From Fixed Weights to Neural Networks: Machine Learning for a Pine Strategy
Created 2025-08-11 17:22:24  Updated 2025-08-12 09:03:59


Accidentally Discovered an Interesting Pine Strategy

A few days ago, while browsing strategies on the Inventor forum, I came across one called "Panel Pro+ Quantum SmartPrompt". Its approach caught my attention: it uses 10 technical indicators, assigns each indicator a weight that depends on the current market state, and sums the weighted scores to decide whether to buy or sell. For example, in a bull market state the trend indicator has a weight of 2.0 and RSI has a weight of 1.5; in a bear market state the weights are different. It mimics how a human trader thinks: focusing on different signals under different circumstances.
Upon closer examination, this structure is quite similar to a neural network:

  • 10 technical indicators as inputs
  • Market state classification acts like a hidden layer
  • Weight matrices serve as connection weights
  • Finally outputs a score

However, the problem is that all weights are hardcoded, for example:

pine
if marketType == "Bull"
    array.set(weights, 0, 2.0)  // Trend weight is fixed at 2.0
    array.set(weights, 1, 1.5)  // RSI weight is fixed at 1.5

These numbers are completely fixed by the author based on market experience, without any learning or optimization.

Idea: Make the Weights Learnable

Since the structure already resembles a neural network, why not make it truly capable of learning?
My idea is simple:

  1. Keep the original weight calculation method to get a "weight score"
  2. Use this weight score as input to train a small neural network
  3. Let the network learn to predict future returns from the weight score
  4. Decide whether to open positions based on the predicted return magnitude

This way, we preserve the original strategy's logic while adding learning capability.

Implementation on the Inventor Platform

I chose the Inventor platform mainly because it supports Python and provides rich market data.

Step 1: Rewrite Technical Indicators

I rewrote all indicators from the Pine script in Python, using the talib library to ensure calculation accuracy. This includes common indicators like EMA, MACD, RSI, ATR, as well as volume analysis and simple candlestick pattern recognition.

Step 2: Market State Detection

Following the original strategy's logic, I determine market types (Bull, Bear, Eagle, Wolf, etc.) based on combinations of various indicators. This part is basically a combination of if-else logic.
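The if-else classification can be sketched roughly as follows. This is a simplified illustration, not the strategy's exact rule set: it covers only four states, the function signature is hypothetical, and the thresholds mirror a subset of the rules in the full listing at the end of the article.

```python
def detect_market_type(close, ema_55, macd, signal_line, rsi, atr, atr_avg, rvol):
    """Classify the market with plain if/else rules (simplified sketch).

    All arguments are the latest values of each indicator; atr_avg is a
    moving average of ATR and rvol is volume relative to its average.
    """
    is_bull = close > ema_55 and macd > signal_line and rsi > 50 and rvol > 1
    is_bear = close < ema_55 and macd < signal_line and rsi < 50
    is_sideways = abs(close - ema_55) < 0.5 * atr and atr < atr_avg
    # "Eagle" is treated here as a calm bull market (low volatility).
    is_eagle = is_bull and atr < 0.8 * atr_avg

    # More specific states are checked first.
    if is_eagle:
        return "Eagle"
    if is_bull:
        return "Bull"
    if is_bear:
        return "Bear"
    if is_sideways:
        return "Sideways"
    return "Unknown"


print(detect_market_type(110, 100, 1.0, 0.5, 60, 2.0, 3.0, 1.2))
```

The order of the checks matters: because every "Eagle" market also satisfies the "Bull" rule, the narrower state has to be tested first.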

Step 3: Weight Score Calculation

This is the core part. I set up two sets of weights:

  • Base weights: [2.0, 1.5, 2.0, 1.3, 1.2, ...]
  • Market weights: adjusted according to different market states

Final weight = Base weight × Market weight

Then I use these weights to calculate a weighted sum of the 10 indicators' raw scores to get the "weight score."
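As a minimal sketch of this calculation: the snippet below multiplies base weights by market weights and takes the weighted sum of the raw scores. Only the first five base weights come from the article; the remaining base weights, the market multipliers, and the raw scores are placeholder values chosen for illustration.

```python
import numpy as np

# First five values are from the article; the last five are placeholders.
BASE_WEIGHTS = np.array([2.0, 1.5, 2.0, 1.3, 1.2, 1.0, 1.0, 1.0, 1.0, 1.0])

# Hypothetical per-market multipliers; unknown states fall back to 1.0.
MARKET_WEIGHTS = {
    "Bull": np.array([1.2, 0.9, 1.1, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]),
    "Bear": np.array([1.0, 1.2, 1.0, 1.1, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]),
}


def weight_score(raw_scores, market_type):
    """Weighted sum of the 10 raw indicator scores for one candlestick."""
    raw = np.asarray(raw_scores, dtype=float)
    market = MARKET_WEIGHTS.get(market_type, np.ones_like(BASE_WEIGHTS))
    final_weights = BASE_WEIGHTS * market  # final = base x market
    return float(np.dot(final_weights, raw))


# Placeholder raw scores for the 10 indicators.
raw = [20, 0, 10, 8, 7, 6, 8, 5, 5, 0]
print(weight_score(raw, "Bull"))
```

Keeping the two weight sets separate means the market multipliers can later be replaced or learned without touching the base weights.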

Step 4: Neural Network Predictor

I built a simple network:

  • Input: 1 feature (weight score)
  • Hidden layer: 16 neurons with ReLU activation
  • Output: predicted return, constrained to ±5% using tanh

Training objective: Use the weight score at time t-1 to predict the price change at time t.
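A network of this shape can be sketched in plain NumPy as below. This is an illustrative implementation under stated assumptions, not the code used in the strategy: the class name, learning rate, initialization scale, and the toy training data are all hypothetical.

```python
import numpy as np


class TinyReturnNet:
    """1 input -> 16 ReLU units -> tanh output scaled to +/- max_return."""

    def __init__(self, hidden=16, max_return=0.05, lr=0.1, seed=0):
        rng = np.random.default_rng(seed)
        self.W1 = rng.normal(0.0, 0.1, size=(1, hidden))
        self.b1 = np.zeros((1, hidden))
        self.W2 = rng.normal(0.0, 0.1, size=(hidden, 1))
        self.b2 = np.zeros((1, 1))
        self.max_return = max_return
        self.lr = lr

    def forward(self, x):
        self.z1 = x @ self.W1 + self.b1
        self.a1 = np.maximum(self.z1, 0.0)             # ReLU
        self.t = np.tanh(self.a1 @ self.W2 + self.b2)  # in (-1, 1)
        return self.max_return * self.t                # in +/- max_return

    def train_step(self, x, y):
        """One full-batch gradient step on MSE; returns the loss before the step."""
        pred = self.forward(x)
        m = x.shape[0]
        d_pred = 2.0 * (pred - y) / m                  # dMSE/dpred
        d_z2 = d_pred * self.max_return * (1.0 - self.t ** 2)
        d_W2 = self.a1.T @ d_z2
        d_b2 = d_z2.sum(axis=0, keepdims=True)
        d_z1 = (d_z2 @ self.W2.T) * (self.z1 > 0)      # ReLU gradient
        d_W1 = x.T @ d_z1
        d_b1 = d_z1.sum(axis=0, keepdims=True)
        self.W2 -= self.lr * d_W2
        self.b2 -= self.lr * d_b2
        self.W1 -= self.lr * d_W1
        self.b1 -= self.lr * d_b1
        return float(np.mean((pred - y) ** 2))


# Toy data: scaled weight scores in [-1, 1] and made-up returns.
x = np.linspace(-1.0, 1.0, 20).reshape(-1, 1)
y = 0.02 * x
net = TinyReturnNet()
for _ in range(300):
    loss = net.train_step(x, y)
print(f"final MSE: {loss:.8f}")
```

Raw weight scores such as 15.6 or 22.1 are large relative to the initial weights, so dividing them by a constant (for example 100) before feeding them to the network is a reasonable assumption here.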

Step 5: Trading Logic

Instead of buying/selling directly based on score levels, I now look at predicted returns:

  • Predicted return > 1.5%: go long (closing any short position first)
  • Predicted return < -1.5%: go short (closing any long position first)
  • Other cases: maintain current position

I also keep stop-loss and take-profit mechanisms to ensure controllable risk.
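The threshold rules and the protective exits above can be sketched as two small functions. The position labels, function names, and the 2% / 5% stop-loss and take-profit defaults are assumptions for illustration; only the 1.5% threshold comes from the text.

```python
def decide_action(predicted_return, position, threshold=0.015):
    """Return the ordered list of actions for one candlestick.

    position is one of "flat", "long", "short" (hypothetical labels).
    """
    if predicted_return > threshold:
        if position == "short":
            return ["close_short", "open_long"]
        if position == "flat":
            return ["open_long"]
    elif predicted_return < -threshold:
        if position == "long":
            return ["close_long", "open_short"]
        if position == "flat":
            return ["open_short"]
    return []  # hold: keep the current position unchanged


def exit_reason(entry_price, price, position, stop_loss=0.02, take_profit=0.05):
    """Return "stop_loss", "take_profit", or None for an open position."""
    if position == "flat" or entry_price <= 0:
        return None
    change = (price - entry_price) / entry_price
    pnl = change if position == "long" else -change  # shorts gain when price falls
    if pnl <= -stop_loss:
        return "stop_loss"
    if pnl >= take_profit:
        return "take_profit"
    return None


print(decide_action(0.02, "short"))      # strong upward prediction while short
print(exit_reason(100.0, 97.9, "long"))  # long position down 2.1%
```

In a live loop the exit check would normally run before the signal check, so that a stop-loss is never delayed by a new entry signal.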

Observations from Actual Operation

Data Collection

The strategy collects training data as expected. Each time a new candlestick completes, it uses the previous candlestick's weight score as the feature and the current candlestick's percentage change relative to the previous close as the label.

The data looks roughly like this:

Weight Score = 15.6, Return = +0.8%
Weight Score = -8.2, Return = -1.2%
Weight Score = 22.1, Return = +0.3%
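Pairing each feature with the next candlestick's return can be sketched as follows. The function name and the sample numbers are hypothetical; the point is only the one-step shift between feature and label.

```python
def build_samples(weight_scores, closes):
    """Pair the weight score at t-1 with the return from t-1 to t.

    weight_scores[i] and closes[i] belong to the same completed candlestick.
    Returns a list of (feature, label) tuples.
    """
    samples = []
    for t in range(1, min(len(weight_scores), len(closes))):
        prev_close = closes[t - 1]
        if prev_close <= 0:
            continue  # skip bad data instead of dividing by zero
        label = (closes[t] - prev_close) / prev_close
        samples.append((weight_scores[t - 1], label))
    return samples


scores = [15.6, -8.2, 22.1]
closes = [100.0, 101.0, 99.99]
for feature, label in build_samples(scores, closes):
    print(f"Weight Score = {feature}, Return = {label * 100:+.2f}%")
```

The last weight score has no label yet, because the candlestick that follows it has not closed; it becomes a training sample only on the next update.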

Model Training

The neural network trains as expected, with MSE loss gradually decreasing. I set it to retrain every 4 hours so the model can adapt to market changes.
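The periodic retraining check can be sketched as a small helper. The class name and the minimum sample count of 30 are assumptions for illustration; the 4-hour interval is the one mentioned above.

```python
import time


class RetrainScheduler:
    """Decide when the model should be retrained (hypothetical helper)."""

    def __init__(self, interval_seconds=4 * 3600, min_samples=30):
        self.interval_seconds = interval_seconds
        self.min_samples = min_samples
        self.last_train_time = None  # None means "never trained"

    def should_retrain(self, sample_count, now=None):
        now = time.time() if now is None else now
        if sample_count < self.min_samples:
            return False  # not enough data to train on yet
        if self.last_train_time is None:
            return True   # first training run
        return now - self.last_train_time >= self.interval_seconds

    def mark_trained(self, now=None):
        self.last_train_time = time.time() if now is None else now


scheduler = RetrainScheduler()
print(scheduler.should_retrain(sample_count=10, now=0))  # too few samples
print(scheduler.should_retrain(sample_count=30, now=0))  # first run allowed
```

Passing `now` explicitly keeps the helper easy to test and lets a backtest supply candlestick timestamps instead of wall-clock time.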

Prediction Performance

The model's predictions do show some correlation with actual returns, but it's not particularly strong. Main issues:

  1. A single feature is too simple and may not carry enough information
  2. Short-term price movements have high randomness
  3. Futures markets have considerable noise
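One way to quantify "some correlation, but not strong" is to track the Pearson correlation together with a directional hit rate. The sketch below uses made-up numbers, and the function name is hypothetical.

```python
import numpy as np


def evaluate_predictions(predicted, actual):
    """Return (Pearson correlation, share of predictions with the correct sign)."""
    predicted = np.asarray(predicted, dtype=float)
    actual = np.asarray(actual, dtype=float)
    corr = float(np.corrcoef(predicted, actual)[0, 1])
    hit_rate = float(np.mean(np.sign(predicted) == np.sign(actual)))
    return corr, hit_rate


# Made-up predictions and realized returns, for illustration only.
pred = [0.01, 0.02, -0.01, -0.02]
real = [0.01, -0.01, -0.02, 0.03]
corr, hit_rate = evaluate_predictions(pred, real)
print(f"correlation={corr:.3f}, hit rate={hit_rate:.2f}")
```

These metrics are only meaningful on samples the model has not been trained on; measured on the training set they will overstate the model's skill.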

Trading Performance

Thanks to stop-loss and take-profit protection, risk control for individual trades is decent. However, overall profitability is mediocre, mainly due to insufficient prediction accuracy.


Problems Encountered

  • Features Too Limited: Using only the weight score as a single feature is indeed too simple. Markets are so complex that one number can hardly capture everything.
  • Unstable Sample Quality: Contract prices have large short-term fluctuations, and often the ups and downs are actually random, making the quality of training samples unstable.
  • Overfitting Risk: Although the network is simple, there's still potential for overfitting when sample size is limited.
  • Real-time Requirements: Online learning needs to balance training time with real-time performance.

Limited Time, Insufficient Optimization

This strategy has many areas for improvement, but with limited time and energy, I couldn't optimize it deeply:

  • Feature Aspects: Could add more technical indicators or use statistical features of price sequences.
  • Model Aspects: Could try sequence models like LSTM, or use ensemble methods with multiple models.
  • Data Aspects: Improve sample quality and add data cleaning.
  • Risk Management: Refine dynamic stop-loss and optimize position management.

Gains and Reflections

This exploration taught me an important lesson: good inspiration is all about timely implementation! When I saw the weight matrix design in the Pine script, I immediately thought of the possibility of improving it with neural networks. If I had just thought about it without taking action, or procrastinated, this idea would likely have been forgotten.
Fortunately, the Inventor platform provided a Python environment and data interfaces, allowing me to quickly turn the idea into runnable code. From generating the idea to completing the basic implementation took only about a day.
Although the final strategy's performance was mediocre, running it in practice at least validated that this approach was feasible. More importantly, new ideas and improvement directions emerged during the implementation process. Without taking action promptly, none of these subsequent discoveries and thoughts would have occurred.
Theoretical discussions can never compare to actually writing code, running data, and examining results. Quantitative trading is like this: there are many ideas, but the truly valuable ones are those that get implemented and validated quickly.

python
'''backtest
start: 2025-07-31 00:00:00
end: 2025-08-07 00:00:00
period: 1h
basePeriod: 5m
exchanges: [{"eid":"Futures_Binance","currency":"ETH_USDT","balance":5000000,"fee":[0.01,0.01]}]
'''

import numpy as np
from collections import deque
import talib as TA


# ========== Exception Classes ==========
class Error_noSupport(BaseException):
    def __init__(self):
        Log("Only futures trading is supported! #FF0000")


class Error_AtBeginHasPosition(BaseException):
    def __init__(self):
        Log("Futures position exists at startup! #FF0000")


# ========== Return Prediction Neural Network ==========
class ReturnPredictor:
    def __init__(self, input_size=10, hidden_size=20, output_size=1):
        """Return prediction network: X[t] -> y[t+1] (return rate)"""
        self.W1 = np.random.randn(input_size, hidden_size) * 0.1
        self.b1 = np.zeros((1, hidden_size))
        self.W2 = np.random.randn(hidden_size, output_size) * 0.1
        self.b2 = np.zeros((1, output_size))
        self.learning_rate = 0.001

    def sigmoid(self, x):
        return 1 / (1 + np.exp(-np.clip(x, -250, 250)))

    def tanh(self, x):
        return np.tanh(x)

    def forward(self, X):
        self.z1 = np.dot(X, self.W1) + self.b1
        self.a1 = self.sigmoid(self.z1)
        self.z2 = np.dot(self.a1, self.W2) + self.b2
        # Output predicted return rate, using tanh to limit to reasonable range
        self.a2 = self.tanh(self.z2) * 0.1  # Limit to ±10% range
        return self.a2

    def backward(self, X, y, output):
        m = X.shape[0]

        # MSE loss gradient
        dZ2 = (output - y) / m
        # tanh derivative
        tanh_derivative = 1 - (output / 0.1) ** 2
        dZ2 = dZ2 * 0.1 * tanh_derivative

        dW2 = np.dot(self.a1.T, dZ2)
        db2 = np.sum(dZ2, axis=0, keepdims=True)

        dA1 = np.dot(dZ2, self.W2.T)
        dZ1 = dA1 * self.a1 * (1 - self.a1)  # sigmoid derivative
        dW1 = np.dot(X.T, dZ1)
        db1 = np.sum(dZ1, axis=0, keepdims=True)

        # Update weights
        self.W2 -= self.learning_rate * dW2
        self.b2 -= self.learning_rate * db2
        self.W1 -= self.learning_rate * dW1
        self.b1 -= self.learning_rate * db1

    def train(self, X, y, epochs=100):
        for i in range(epochs):
            output = self.forward(X)
            self.backward(X, y, output)
            if i % 20 == 0:
                loss = np.mean((output - y) ** 2)
                Log(f"Return prediction training epoch {i}, MSE loss: {loss:.6f}")

    def predict(self, X):
        return self.forward(X)


# ========== Technical Indicators Calculation Class ==========
class TechnicalIndicators:
    @staticmethod
    def calculate_indicators(records, use_completed_only=True):
        """Calculate technical indicators and features"""
        if len(records) < 55:
            return None, None

        # Only use completed candlestick data
        if use_completed_only and len(records) > 1:
            working_records = records[:-1]
        else:
            working_records = records

        if len(working_records) < 55:
            return None, None

        closes = np.array([r['Close'] for r in working_records])
        highs = np.array([r['High'] for r in working_records])
        lows = np.array([r['Low'] for r in working_records])
        volumes = np.array([r['Volume'] for r in working_records])
        opens = np.array([r['Open'] for r in working_records])

        try:
            # Basic indicators
            ema_55 = TA.EMA(closes, timeperiod=55)
            sma_vol20 = TA.SMA(volumes, timeperiod=20)
            macd, signal_line, _ = TA.MACD(closes)
            rsi_val = TA.RSI(closes, timeperiod=14)
            atr14 = TA.ATR(highs, lows, closes, timeperiod=14)
            range20 = TA.STDDEV(closes, timeperiod=20)

            # Calculate derived indicators
            sma_atr20 = TA.SMA(atr14, timeperiod=20)
            sma_range20 = TA.SMA(range20, timeperiod=20)
            rvol = volumes / sma_vol20 if sma_vol20[-1] > 0 else np.ones_like(volumes)
            delta = closes - opens

            # Calculate volume thresholds
            vol_abs_thresh = sma_vol20 * 1.2
            sniper_thresh = np.percentile(volumes[-40:], 80) if len(volumes) >= 40 else sma_vol20[-1]

            # Trend
            trend = np.where(closes > ema_55, 1, np.where(closes < ema_55, -1, 0))

            # Simplified candlestick patterns
            body_size = np.abs(closes - opens)
            total_range = highs - lows

            # Hammer pattern
            is_hammer = ((total_range > 3 * body_size) &
                         ((closes - lows) / (total_range + 0.001) > 0.6) &
                         ((opens - lows) / (total_range + 0.001) > 0.6))

            # Engulfing pattern
            is_engulfing = np.zeros_like(closes, dtype=bool)
            if len(closes) >= 2:
                is_engulfing[1:] = ((closes[1:] > opens[:-1]) &
                                    (opens[1:] < closes[:-1]) &
                                    (closes[1:] > opens[1:]) &
                                    (opens[1:] < closes[1:]))

            pattern = np.where(is_hammer, 1, np.where(is_engulfing, 2, 0))

            # 🔥 Calculate normalized feature vectors (for neural network input)
            features = []

            # 1. Trend feature
            if len(ema_55) > 0 and not np.isnan(ema_55[-1]):
                trend_feature = (closes[-1] - ema_55[-1]) / ema_55[-1]
                features.append(np.tanh(trend_feature * 100))
            else:
                features.append(0)

            # 2. RSI feature
            if len(rsi_val) > 0 and not np.isnan(rsi_val[-1]):
                rsi_feature = (rsi_val[-1] - 50) / 50
                features.append(rsi_feature)
            else:
                features.append(0)

            # 3. MACD feature
            if len(macd) > 0 and not np.isnan(macd[-1]) and not np.isnan(signal_line[-1]):
                macd_feature = (macd[-1] - signal_line[-1]) / closes[-1] if closes[-1] > 0 else 0
                features.append(np.tanh(macd_feature * 1000))
            else:
                features.append(0)

            # 4. Volume feature
            if len(vol_abs_thresh) > 0 and vol_abs_thresh[-1] > 0:
                vol_feature = volumes[-1] / vol_abs_thresh[-1] - 1
                features.append(np.tanh(vol_feature))
            else:
                features.append(0)

            # 5. Relative volume feature
            if len(rvol) > 0 and not np.isnan(rvol[-1]):
                rvol_feature = rvol[-1] - 1
                features.append(np.tanh(rvol_feature))
            else:
                features.append(0)

            # 6. Delta feature
            if len(delta) > 0 and not np.isnan(delta[-1]) and closes[-1] > 0:
                delta_feature = delta[-1] / closes[-1]
                features.append(np.tanh(delta_feature * 100))
            else:
                features.append(0)

            # 7. ATR feature
            if len(atr14) > 0 and len(sma_atr20) > 0 and sma_atr20[-1] > 0:
                atr_feature = atr14[-1] / sma_atr20[-1] - 1
                features.append(np.tanh(atr_feature))
            else:
                features.append(0)

            # 8. Blocks feature
            if len(volumes) >= 10:
                highest_vol = np.max(volumes[-10:])
                blocks_feature = volumes[-1] / highest_vol - 0.8 if highest_vol > 0 else 0
                features.append(np.tanh(blocks_feature * 5))
            else:
                features.append(0)

            # 9. Tick feature
            if len(sma_vol20) > 0 and sma_vol20[-1] > 0:
                tick_feature = volumes[-1] / sma_vol20[-1] - 1
                features.append(np.tanh(tick_feature))
            else:
                features.append(0)

            # 10. Pattern feature
            pattern_feature = pattern[-1] / 2.0 if len(pattern) > 0 else 0
            features.append(pattern_feature)

            # Ensure correct feature count
            while len(features) < 10:
                features.append(0)

            features = np.array(features[:10]).reshape(1, -1)

            indicators = {
                'ema_55': ema_55,
                'sma_vol20': sma_vol20,
                'macd': macd,
                'signal_line': signal_line,
                'rsi_val': rsi_val,
                'atr14': atr14,
                'range20': range20,
                'sma_atr20': sma_atr20,
                'sma_range20': sma_range20,
                'rvol': rvol,
                'delta': delta,
                'vol_abs_thresh': vol_abs_thresh,
                'sniper_thresh': sniper_thresh,
                'trend': trend,
                'pattern': pattern,
                'volumes': volumes,
                'closes': closes,
                'highs': highs,
                'lows': lows
            }

            return indicators, features

        except Exception as e:
            Log(f"Technical indicators calculation error: {str(e)}")
            return None, None


# ========== Market State Detection Class ==========
class MarketStateDetector:
    @staticmethod
    def detect_market_type(indicators):
        """Detect market state"""
        if indicators is None:
            return "Unknown"

        try:
            # Get latest values
            close = indicators['closes'][-1]
            ema_55 = indicators['ema_55'][-1]
            macd = indicators['macd'][-1]
            signal_line = indicators['signal_line'][-1]
            rsi_val = indicators['rsi_val'][-1]
            atr14 = indicators['atr14'][-1]
            volume = indicators['volumes'][-1]
            sma_vol20 = indicators['sma_vol20'][-1]
            sma_atr20 = indicators['sma_atr20'][-1]
            range20 = indicators['range20'][-1]
            sma_range20 = indicators['sma_range20'][-1]
            rvol = indicators['rvol'][-1]
            delta = indicators['delta'][-1]

            # Check validity
            if (np.isnan(ema_55) or np.isnan(macd) or np.isnan(signal_line) or
                    np.isnan(rsi_val) or np.isnan(atr14) or np.isnan(sma_atr20)):
                return "Unknown"

            # Market type judgment
            is_bull = (close > ema_55 and macd > signal_line and
                       rsi_val > 50 and rvol > 1)
            is_bear = (close < ema_55 and macd < signal_line and
                       rsi_val < 50 and volume > sma_vol20)
            is_sideways = (abs(close - ema_55) < atr14 * 0.5 and atr14 < sma_atr20)
            is_volatile = (atr14 > sma_atr20 * 1.2)

            # Judgments requiring historical data
            if len(indicators['closes']) >= 2:
                price_change = indicators['closes'][-1] - indicators['closes'][-2]
                is_momentum = (price_change > atr14 * 1.5 and volume > sma_vol20 * 1.5)
                is_wolf = (price_change < -atr14 and close < ema_55)
            else:
                is_momentum = False
                is_wolf = False

            is_mean_rev = (rsi_val > 70 or rsi_val < 30)
            is_box = (is_sideways and range20 < sma_range20 * 0.8)
            is_macro = (abs(delta) > atr14 * 2) if not np.isnan(delta) else False
            is_eagle = (is_bull and atr14 < sma_atr20 * 0.8)

            # Priority judgment
            if is_eagle:
                return "Eagle"
            elif is_bull:
                return "Bull"
            elif is_wolf:
                return "Wolf"
            elif is_bear:
                return "Bear"
            elif is_box:
                return "Box"
            elif is_sideways:
                return "Sideways"
            elif is_volatile:
                return "Volatile"
            elif is_momentum:
                return "Momentum"
            elif is_mean_rev:
                return "MeanRev"
            elif is_macro:
                return "Macro"
            else:
                return "Unknown"

        except Exception as e:
            Log(f"Market state detection error: {str(e)}")
            return "Unknown"


# ========== Dynamic Weight Generator ==========
class DynamicWeightGenerator:
    @staticmethod
    def generate_weights_from_predicted_return(predicted_return, market_type):
        """Generate dynamic weights based on predicted return rate and market state"""
        # Base weight matrix (different market types)
        base_weights_matrix = {
            "Bull": [2.0, 1.5, 2.0, 1.3, 1.2, 1.0, 1.2, 1.0, 1.0, 1.0],
            "Bear": [2.0, 1.5, 2.0, 1.5, 1.3, 1.1, 1.2, 1.1, 1.0, 1.0],
            "Eagle": [2.2, 1.4, 2.1, 1.2, 1.3, 1.1, 1.1, 1.0, 1.0, 1.1],
            "Wolf": [1.8, 1.6, 1.8, 1.6, 1.2, 1.0, 1.3, 1.2, 1.0, 0.9],
            "Momentum": [1.5, 1.2, 1.8, 2.0, 2.0, 1.5, 1.5, 1.3, 1.2, 1.0],
            "Sideways": [1.0, 1.4, 1.0, 0.8, 0.7, 1.0, 0.9, 0.8, 1.0, 1.3],
            "Volatile": [1.2, 1.5, 1.3, 1.6, 1.8, 1.2, 1.4, 1.3, 1.4, 1.0],
        }

        base_weights = base_weights_matrix.get(market_type, [1.0] * 10)

        # 🔥 Dynamically adjust weights based on predicted return rate
        adjustment_factors = [1.0] * 10

        # Strength of predicted return rate
        return_strength = abs(predicted_return)
        return_direction = 1 if predicted_return > 0 else -1

        if return_strength > 0.02:  # Strong prediction signal > 2%
            if return_direction > 0:  # Predict upward
                adjustment_factors[0] *= 1.3   # Enhance trend weight
                adjustment_factors[2] *= 1.2   # Enhance MACD weight
                adjustment_factors[4] *= 1.15  # Enhance relative volume weight
                adjustment_factors[1] *= 0.9   # Reduce RSI weight
            else:  # Predict downward
                adjustment_factors[1] *= 1.3   # Enhance RSI weight
                adjustment_factors[3] *= 1.2   # Enhance volume weight
                adjustment_factors[0] *= 0.9   # Reduce trend weight
        elif return_strength > 0.01:  # Medium prediction signal 1%-2%
            if return_direction > 0:
                adjustment_factors[0] *= 1.15
                adjustment_factors[2] *= 1.1
            else:
                adjustment_factors[1] *= 1.15
                adjustment_factors[3] *= 1.1

        # Volatility adjustment
        if return_strength > 0.03:  # High volatility expectation > 3%
            adjustment_factors[4] *= 1.2   # Enhance relative volume weight
            adjustment_factors[6] *= 1.15  # Enhance sniper weight
            adjustment_factors[7] *= 1.1   # Enhance blocks weight

        # Generate final dynamic weights
        dynamic_weights = [base_weights[i] * adjustment_factors[i] for i in range(10)]

        # Weight normalization (optional)
        # total_weight = sum(dynamic_weights)
        # dynamic_weights = [w / total_weight * 10 for w in dynamic_weights]

        return dynamic_weights


# ========== Smart Scoring System ==========
class SmartScoringSystem:
    def __init__(self):
        self.return_predictor = ReturnPredictor()
        self.weight_generator = DynamicWeightGenerator()
        self.is_model_trained = False

    def calculate_score(self, indicators, market_type, features=None):
        """Calculate trading score (using dynamic weights from predicted return rate)"""
        if indicators is None:
            return 50.0

        try:
            # 🔥 Core logic: Use current indicators to predict next period return rate
            if self.is_model_trained and features is not None:
                predicted_return = self.return_predictor.predict(features)[0, 0]
            else:
                predicted_return = 0.0
                Log(f"📊 Using base weights for calculation")

            # Generate dynamic weights based on predicted return rate
            dynamic_weights = self.weight_generator.generate_weights_from_predicted_return(
                predicted_return, market_type)

            # Get latest indicator values
            trend = indicators['trend'][-1]
            rsi_val = indicators['rsi_val'][-1]
            macd = indicators['macd'][-1]
            signal_line = indicators['signal_line'][-1]
            volume = indicators['volumes'][-1]
            vol_abs_thresh = indicators['vol_abs_thresh'][-1]
            sma_vol20 = indicators['sma_vol20'][-1]
            rvol = indicators['rvol'][-1]
            delta = indicators['delta'][-1]
            sniper_thresh = indicators['sniper_thresh']
            pattern = indicators['pattern'][-1]

            # Calculate individual scores
            base_score = 0.0

            # 1. Trend score
            trend_score = 20 if trend == 1 else (-20 if trend == -1 else 0)
            base_score += trend_score * dynamic_weights[0]

            # 2. RSI score
            rsi_score = -10 if rsi_val > 70 else (10 if rsi_val < 30 else 0)
            base_score += rsi_score * dynamic_weights[1]

            # 3. MACD score
            macd_score = 10 if macd > signal_line else -10
            base_score += macd_score * dynamic_weights[2]

            # 4. Volume score
            vol_score = 8 if volume > vol_abs_thresh else (-8 if volume < sma_vol20 else 0)
            base_score += vol_score * dynamic_weights[3]

            # 5. Relative volume score
            rvol_score = 7 if rvol > 1.5 else (-7 if rvol < 0.8 else 0)
            base_score += rvol_score * dynamic_weights[4]

            # 6. Delta score
            delta_score = 6 if delta > 0 else -6
            base_score += delta_score * dynamic_weights[5]

            # 7. Sniper score
            sniper_score = 8 if volume > sniper_thresh else (-8 if volume < sma_vol20 else 0)
            base_score += sniper_score * dynamic_weights[6]

            # 8. Blocks score
            if len(indicators['volumes']) >= 10:
                highest_vol = np.max(indicators['volumes'][-10:])
                blocks_score = 5 if volume > highest_vol * 0.8 else (-5 if volume < sma_vol20 else 0)
            else:
                blocks_score = 0
            base_score += blocks_score * dynamic_weights[7]

            # 9. Tick score
            tick_score = 5 if volume > sma_vol20 else -5
            base_score += tick_score * dynamic_weights[8]

            # 10. Pattern score
            pattern_score = 7 if pattern == 1 else (5 if pattern == 2 else 0)
            base_score += pattern_score * dynamic_weights[9]

            # Convert to percentage score
            score_pct = max(0, min(100, 50 + base_score))

            return score_pct

        except Exception as e:
            Log(f"Score calculation error: {str(e)}")
            return 50.0

    def train_return_predictor(self, X, y):
        """Train return rate predictor"""
        if len(X) < 20:
            Log("Insufficient training data, skipping return predictor training")
            return False

        X_array = np.array(X)
        y_array = np.array(y).reshape(-1, 1)

        Log(f"🧠 Starting return predictor training, sample count: {len(X_array)}")
        Log(f"📊 Return rate range: [{np.min(y_array)*100:.3f}%, {np.max(y_array)*100:.3f}%]")

        self.return_predictor.train(X_array, y_array, epochs=100)
        self.is_model_trained = True

        # Validate model prediction performance
        predictions = self.return_predictor.predict(X_array)
        mse = np.mean((predictions - y_array) ** 2)
        correlation = np.corrcoef(predictions.flatten(), y_array.flatten())[0, 1]

        Log(f"✅ Return predictor training completed")
        Log(f"📈 MSE: {mse:.6f}, Correlation coefficient: {correlation:.4f}")

        return True


# ========== Dynamic Parameter Manager ==========
class DynamicParameterManager:
    def __init__(self):
        self.market_params = {
            "Bull": {"stop_loss": 0.02, "take_profit": 0.05},
            "Bear": {"stop_loss": 0.02, "take_profit": 0.05},
            "Eagle": {"stop_loss": 0.015, "take_profit": 0.06},
            "Wolf": {"stop_loss": 0.025, "take_profit": 0.04},
            "Momentum": {"stop_loss": 0.025, "take_profit": 0.06},
            "Sideways": {"stop_loss": 0.01, "take_profit": 0.02},
            "Volatile": {"stop_loss": 0.03, "take_profit": 0.07},
            "Unknown": {"stop_loss": 0.02, "take_profit": 0.03}
        }

    def get_params(self, market_type):
        return self.market_params.get(market_type, self.market_params["Unknown"])


# ========== Main Strategy Class ==========
class PredictiveNeuralTradingStrategy:
    def __init__(self):
        self.data_buffer = deque(maxlen=200)
        self.feature_buffer = deque(maxlen=100)
        self.label_buffer = deque(maxlen=100)  # Store return rate labels
        self.scoring_system = SmartScoringSystem()
        self.param_manager = DynamicParameterManager()

        # Training control
        self.last_retrain_time = 0
        self.retrain_interval = 3600 * 6  # Retrain every 6 hours
        self.min_train_samples = 30

        # Trading state
        self.POSITION_NONE = 0
        self.POSITION_LONG = 1
        self.POSITION_SHORT = 2
        self.position_state = self.POSITION_NONE

        # Trading records
        self.open_price = 0
        self.counter = {'win': 0, 'loss': 0}

        # K-line data management
        self.last_processed_time = 0

    def get_current_position(self):
        """Get current futures position status"""
        try:
            positions = exchange.GetPosition()
            if not positions:
                return self.POSITION_NONE, 0

            long_amount = 0
            short_amount = 0

            for pos in positions:
                amount = pos.get('Amount', 0)
                pos_type = pos.get('Type', -1)

                if amount > 0:
                    if pos_type == 0:  # Long position
                        long_amount += amount
                    elif pos_type == 1:  # Short position
                        short_amount += amount

            net_position = long_amount - short_amount

            if net_position > 0:
                return self.POSITION_LONG, net_position
            elif net_position < 0:
                return self.POSITION_SHORT, abs(net_position)
            else:
                return self.POSITION_NONE, 0

        except Exception as e:
            Log(f"Get position error: {str(e)}")
            return self.POSITION_NONE, 0

    def collect_data(self, records):
        """Collect data and generate training samples"""
        if not records or len(records) < 55:
            return False

        # Check if there's a new completed K-line
        if len(records) > 1:
            latest_completed = records[-2]
            current_time = latest_completed['Time']

            # Skip if this K-line has already been processed
            if current_time <= self.last_processed_time:
                return False

            self.last_processed_time = current_time

        # Add completed K-lines to buffer
        completed_records = records[:-1] if len(records) > 1 else []
        if completed_records:
            self.data_buffer.extend(completed_records[-5:])

        # 🔥 Generate training samples: X[t] -> y[t+1]
        if len(self.data_buffer) >= 2:
            # Use the second-to-last record as features, last record to calculate return rate label
            buffer_list = list(self.data_buffer)

            # Calculate t-1 moment indicators as features
            feature_records = buffer_list[:-1] if len(buffer_list) > 1 else buffer_list
            indicators, features = TechnicalIndicators.calculate_indicators(
                feature_records, use_completed_only=False)

            if indicators is not None and features is not None:
                # Calculate t moment return rate relative to t-1 moment as label
                if len(buffer_list) >= 2:
                    current_close = buffer_list[-1]['Close']
                    previous_close = buffer_list[-2]['Close']

                    if previous_close > 0:
                        return_rate = (current_close - previous_close) / previous_close

                        # Add to training set
                        self.feature_buffer.append(features[0])
                        self.label_buffer.append(return_rate)

                        Log(f"📈 New sample: Return rate={return_rate*100:.3f}%, Feature dimensions={features.shape}")

        return True

    def should_retrain(self):
        """Determine if retraining is needed"""
        import time
        current_time = time.time()
        return (current_time - self.last_retrain_time > self.retrain_interval and
                len(self.feature_buffer) >= self.min_train_samples)

    def train_model(self):
        """Train return rate predictor"""
        if len(self.feature_buffer) < self.min_train_samples:
            Log("Insufficient training data, skipping training")
            return False

        X = list(self.feature_buffer)
        y = list(self.label_buffer)

        success = self.scoring_system.train_return_predictor(X, y)

        if success:
            import time
            self.last_retrain_time = time.time()

        return success

    def get_trading_signals(self, records):
        """Get trading signals"""
        # Calculate current moment technical indicators
        indicators, features = TechnicalIndicators.calculate_indicators(
            list(self.data_buffer), use_completed_only=False)
        if indicators is None:
            return 50.0, "Unknown"

        # Detect market type
        market_type = MarketStateDetector.detect_market_type(indicators)

        # 🔥 Calculate score using dynamic weights from predicted return rate
        score = self.scoring_system.calculate_score(indicators, market_type, features)

        return score, market_type

    def check_entry_conditions(self, score, market_type):
        """Check entry conditions"""
        # Long conditions
        long_condition = ((market_type in ["Bull", "Eagle", "Momentum"]) and score > 65)

        # Short conditions
        short_condition = ((market_type in ["Bear", "Wolf"]) and score < 35)

        return long_condition, short_condition

    def open_long(self):
        """Open long position"""
        try:
            ticker = exchange.GetTicker()
            if not ticker:
                return False

            buy_price = ticker['Last'] + 20
            order_id = exchange.CreateOrder("", "buy", buy_price, AmountOP)

            if order_id:
                Sleep(2000)
                order_info = exchange.GetOrder(order_id)
                if order_info and order_info.get('Status') == 1:
                    self.open_price = order_info.get('AvgPrice', buy_price)
                    self.position_state = self.POSITION_LONG
                    Log(f"🚀 Long position opened successfully: Price={self.open_price}, Amount={AmountOP}")
                    return True
                else:
                    exchange.CancelOrder(order_id)
                    Log("Long position order not fully filled, cancelled")

            return False

        except Exception as e:
            Log(f"Open long position error: {str(e)}")
            return False

    def open_short(self):
        """Open short position"""
        try:
            ticker = exchange.GetTicker()
            if not ticker:
                return False

            sell_price = ticker['Last'] - 20
            order_id = exchange.CreateOrder("", "sell", sell_price, AmountOP)

            if order_id:
                Sleep(2000)
                order_info = exchange.GetOrder(order_id)
                if order_info and order_info.get('Status') == 1:
                    self.open_price = order_info.get('AvgPrice', sell_price)
                    self.position_state = self.POSITION_SHORT
                    Log(f"🎯 Short position opened successfully: Price={self.open_price}, Amount={AmountOP}")
                    return True
                else:
                    exchange.CancelOrder(order_id)
                    Log("Short position order not fully filled, cancelled")

            return False

        except Exception as e:
            Log(f"Open short position error: {str(e)}")
            return False

    def close_position(self):
        """Close position"""
        try:
            positions = exchange.GetPosition()
            if not positions:
                Log("No position to close")
                self.position_state = self.POSITION_NONE
                self.open_price = 0
                return True

            ticker = exchange.GetTicker()
            if not ticker:
                return False

            close_success = True

            for pos in positions:
                if pos['Amount'] == 0:
                    continue

                amount = pos['Amount']
                pos_type = pos['Type']

                if pos_type == 0:  # Close long position
                    close_price = ticker['Last'] - 20
                    order_id = exchange.CreateOrder("", "closebuy", close_price, amount)
                    Log(f"📤 Close long position: Price={close_price}, Amount={amount}")
                elif pos_type == 1:  # Close short position
                    close_price = ticker['Last'] + 20
                    order_id = exchange.CreateOrder("", "closesell", close_price, amount)
                    Log(f"📤 Close short position: Price={close_price}, Amount={amount}")

                if order_id:
                    Sleep(2000)
                    order_info = exchange.GetOrder(order_id)
                    if order_info and order_info.get('Status') == 1:
                        close_price = order_info.get('AvgPrice', close_price)
                        Log(f"✅ Position closed successfully: Fill price={close_price}")
                        self.update_profit_stats(close_price)
                    else:
                        exchange.CancelOrder(order_id)
                        close_success = False
                        Log(f"Close position order not fully filled, cancelled")
                else:
                    close_success = False
                    Log("Failed to create close position order")

            if close_success:
                self.position_state = self.POSITION_NONE
                self.open_price = 0

            return close_success

        except Exception as e:
            Log(f"Close position error: {str(e)}")
            return False

    def update_profit_stats(self, close_price):
        """Update profit/loss statistics"""
        if self.open_price == 0:
            return

        if self.position_state == self.POSITION_LONG:
            if close_price > self.open_price:
                self.counter['win'] += 1
                Log("💰 Long position profitable")
            else:
                self.counter['loss'] += 1
                Log("💸 Long position loss")
        elif self.position_state == self.POSITION_SHORT:
            if close_price < self.open_price:
                self.counter['win'] += 1
                Log("💰 Short position profitable")
            else:
                self.counter['loss'] += 1
                Log("💸 Short position loss")

    def check_stop_loss_take_profit(self, current_price, params):
        """Check stop loss and take profit, execute closing"""
        if self.open_price == 0 or self.position_state == self.POSITION_NONE:
            return False

        stop_loss_pct = params["stop_loss"]
        take_profit_pct = params["take_profit"]

        if self.position_state == self.POSITION_LONG:
            profit_pct = (current_price - self.open_price) / self.open_price

            if profit_pct <= -stop_loss_pct:
                Log(f"🔴 Long position stop loss triggered: Open price={self.open_price:.2f}, Current price={current_price:.2f}, Loss={profit_pct:.4f}")
                return self.execute_close_position("Stop Loss")
            elif profit_pct >= take_profit_pct:
                Log(f"🟢 Long position take profit triggered: Open price={self.open_price:.2f}, Current price={current_price:.2f}, Profit={profit_pct:.4f}")
                return self.execute_close_position("Take Profit")

        elif self.position_state == self.POSITION_SHORT:
            profit_pct = (self.open_price - current_price) / self.open_price

            if profit_pct <= -stop_loss_pct:
                Log(f"🔴 Short position stop loss triggered: Open price={self.open_price:.2f}, Current price={current_price:.2f}, Loss={profit_pct:.4f}")
                return self.execute_close_position("Stop Loss")
            elif profit_pct >= take_profit_pct:
                Log(f"🟢 Short position take profit triggered: Open price={self.open_price:.2f}, Current price={current_price:.2f}, Profit={profit_pct:.4f}")
                return self.execute_close_position("Take Profit")

        return False

    def execute_close_position(self, reason):
        """Execute close position operation (specifically for stop loss/take profit)"""
        try:
            positions = exchange.GetPosition()
            if not positions:
                Log(f"{reason} close position: No position")
                self.position_state = self.POSITION_NONE
                self.open_price = 0
                return True

            ticker = exchange.GetTicker()
            if not ticker:
                Log(f"{reason} close position failed: Cannot get ticker")
                return False

            Log(f"🚨 Executing {reason} close position operation...")

            close_success = True

            for pos in positions:
                if pos['Amount'] == 0:
                    continue

                amount = pos['Amount']
                pos_type = pos['Type']
                order_id = None

                if pos_type == 0:  # Close long position
                    close_price = ticker['Last'] - 50
                    order_id = exchange.CreateOrder("", "closebuy", close_price, amount)
                    Log(f"📤 {reason} close long position order: Price={close_price}, Amount={amount}")
                elif pos_type == 1:  # Close short position
                    close_price = ticker['Last'] + 50
                    order_id = exchange.CreateOrder("", "closesell", close_price, amount)
                    Log(f"📤 {reason} close short position order: Price={close_price}, Amount={amount}")

                if order_id:
                    Log(f"📋 {reason} close position order ID: {order_id}")
                    Sleep(1500)

                    for retry in range(2):
                        order_info = exchange.GetOrder(order_id)
                        if order_info:
                            status = order_info.get('Status', -1)
                            if status == 1:
                                close_price = order_info.get('AvgPrice', close_price)
                                Log(f"✅ {reason} close position successful: Fill price={close_price}")
                                self.update_profit_stats(close_price)
                                break
                            elif status == 0:
                                if retry == 0:
                                    Log(f"⏳ {reason} close position order executing, waiting...")
                                    Sleep(1500)
                                else:
                                    Log(f"⚠️ {reason} close position order not fully filled, force cancel")
                                    exchange.CancelOrder(order_id)
                                    close_success = False
                            else:
                                Log(f"❌ {reason} close position order status abnormal: {status}")
                                exchange.CancelOrder(order_id)
                                close_success = False
                                break
                        else:
                            Log(f"⚠️ Cannot get {reason} close position order info, retry {retry+1}/2")
                            if retry == 1:
                                close_success = False
                else:
                    Log(f"❌ {reason} close position order creation failed")
                    close_success = False

            if close_success:
                Sleep(1000)
                new_positions = exchange.GetPosition()
                total_amount = sum(pos['Amount'] for pos in new_positions) if new_positions else 0

                if total_amount == 0:
                    Log(f"✅ {reason} close position completed, position cleared")
                    self.position_state = self.POSITION_NONE
                    self.open_price = 0
                    return True
                else:
                    Log(f"⚠️ {reason} close position incomplete, remaining position: {total_amount}")
                    return False
            else:
                Log(f"❌ {reason} close position failed")
                return False

        except Exception as e:
            Log(f"❌ {reason} close position error: {str(e)}")
            return False

    def execute_trade_logic(self, score, market_type, current_price):
        """Execute trading logic"""
        params = self.param_manager.get_params(market_type)

        # Get current actual position status
        actual_position, position_amount = self.get_current_position()

        # Synchronize internal state
        self.position_state = actual_position

        # First check stop loss/take profit (highest priority)
        if self.position_state != self.POSITION_NONE:
            if self.check_stop_loss_take_profit(current_price, params):
                Log("🚨 Stop loss/take profit triggered, executed close position, skipping other trading signals")
                return

        # Get entry conditions
        long_condition, short_condition = self.check_entry_conditions(score, market_type)

        # Execute trading logic
        if long_condition and self.position_state <= self.POSITION_NONE:
            Log(f"📈 Long position signal: Market={market_type}, Prediction score={score:.1f} > 65")
            self.open_long()

        if short_condition and self.position_state >= self.POSITION_NONE:
            Log(f"📉 Short position signal: Market={market_type}, Prediction score={score:.1f} < 35")
            self.open_short()

        if not long_condition and self.position_state > self.POSITION_NONE:
            Log(f"📤 Close long position signal: Market={market_type}, Prediction score={score:.1f}")
            self.close_position()

        if not short_condition and self.position_state < self.POSITION_NONE:
            Log(f"📤 Close short position signal: Market={market_type}, Prediction score={score:.1f}")
            self.close_position()


def CancelPendingOrders():
    """Cancel all pending orders"""
    while True:
        orders = exchange.GetOrders()
        if not orders:
            break
        for order in orders:
            exchange.CancelOrder(order['Id'])
            Sleep(500)


def main():
    global AmountOP, LoopInterval

    # Check initial position
    initial_positions = exchange.GetPosition()
    if initial_positions and any(pos['Amount'] > 0 for pos in initial_positions):
        raise Error_AtBeginHasPosition()

    # Cancel all pending orders
    CancelPendingOrders()

    # Initialize strategy
    strategy = PredictiveNeuralTradingStrategy()

    Log("🔮 Predictive Neural Network Futures Trading Strategy Started")
    LogProfitReset()

    # Data warm-up period
    Log("Entering data warm-up period...")
    warmup_count = 0
    warmup_target = 60

    while warmup_count < warmup_target:
        records = exchange.GetRecords()
        if records and len(records) >= 55:
            if strategy.collect_data(records):
                warmup_count += 1
                if warmup_count % 10 == 0:
                    Log(f"Warm-up progress: {warmup_count}/{warmup_target}")
        Sleep(5000)

    Log("Data warm-up completed, starting initial
return rate predictor training...") strategy.train_model() # Main trading loop loop_count = 0 while True: loop_count += 1 # Get K-line data records = exchange.GetRecords() if not records or len(records) < 55: Sleep(LoopInterval * 1000) continue # Data processing data_updated = strategy.collect_data(records) # Check if retraining is needed if strategy.should_retrain(): Log("🔄 Retraining return rate predictor...") strategy.train_model() # Get trading signals score, market_type = strategy.get_trading_signals(records) # Get current real-time price ticker = exchange.GetTicker() if ticker: current_price = ticker['Last'] else: current_price = records[-1]['Close'] # Get current parameters params = strategy.param_manager.get_params(market_type) # Priority check for stop loss/take profit (using real-time price) if strategy.position_state != strategy.POSITION_NONE: if strategy.check_stop_loss_take_profit(current_price, params): Log("⚡ Stop loss/take profit triggered, executed close position") Sleep(LoopInterval * 1000) continue # Execute trading logic (only when there's new data) if data_updated: strategy.execute_trade_logic(score, market_type, current_price) # Status display pos_state_name = { strategy.POSITION_NONE: "No Position", strategy.POSITION_LONG: "Long", strategy.POSITION_SHORT: "Short" }.get(strategy.position_state, "Unknown") data_status = "📊New Data" if data_updated else "⏸️Waiting" model_status = "🔮Prediction" if strategy.scoring_system.is_model_trained else "📊Basic" # Get entry conditions for display long_cond, short_cond = strategy.check_entry_conditions(score, market_type) signal_status = "" if long_cond: signal_status = "📈Long" elif short_cond: signal_status = "📉Short" else: signal_status = "🔄Hold" # Display training sample count sample_count = len(strategy.feature_buffer) LogStatus(f"Loop: {loop_count}, Price: {current_price:.2f}, " f"Prediction Score: {score:.1f}, Market: {market_type}, " f"Position: {pos_state_name}, Signal: {signal_status}, " f"Status: 
{data_status}, Mode: {model_status}, " f"Samples: {sample_count}, " f"Win: {strategy.counter['win']}, Loss: {strategy.counter['loss']}") Sleep(LoopInterval * 1000) # ========== Parameter Settings ========== AmountOP = 1 # Futures contract quantity LoopInterval = 3 # Loop interval (seconds) if __name__ == "__main__": main()
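The stop-loss/take-profit test inside `check_stop_loss_take_profit` can be pulled out into a pure function, so the thresholds can be checked offline without touching the exchange. The following is only a minimal sketch, not part of the original strategy: the function name `exit_reason`, the `-1/0/1` position encoding (inferred from the `<=` and `>=` comparisons in the listing), and the 2% / 4% thresholds in the usage lines are all assumptions made for illustration.

```python
from typing import Optional

# Assumed encoding, inferred from the comparisons in execute_trade_logic;
# the real constants are defined on the strategy class.
POSITION_SHORT, POSITION_NONE, POSITION_LONG = -1, 0, 1


def exit_reason(position: int, open_price: float, current_price: float,
                stop_loss: float, take_profit: float) -> Optional[str]:
    """Return "Stop Loss", "Take Profit", or None.

    stop_loss and take_profit are fractions of the entry price
    (0.02 means 2%), as in params["stop_loss"] / params["take_profit"].
    """
    if position == POSITION_NONE or open_price == 0:
        return None
    # Signed return of the open position: positive means the trade is in profit.
    direction = 1 if position == POSITION_LONG else -1
    profit_pct = direction * (current_price - open_price) / open_price
    if profit_pct <= -stop_loss:
        return "Stop Loss"
    if profit_pct >= take_profit:
        return "Take Profit"
    return None


# Hypothetical thresholds: 2% stop loss, 4% take profit.
print(exit_reason(POSITION_LONG, 100.0, 97.0, 0.02, 0.04))   # Stop Loss
print(exit_reason(POSITION_SHORT, 100.0, 95.0, 0.02, 0.04))  # Take Profit
print(exit_reason(POSITION_LONG, 100.0, 101.0, 0.02, 0.04))  # None
```

Keeping the decision separate from order placement means the same rule can be replayed over historical prices before it is allowed to call `execute_close_position`.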