
A few days ago, while browsing strategies on the Inventor forum, I came across one called “Panel Pro+ Quantum SmartPrompt”. After reviewing the code, I found this strategy’s approach quite interesting: it uses 10 technical indicators, assigns different weights to each indicator based on market conditions, and finally calculates a score to determine buy/sell decisions. For example, in a bull market state, trend indicators have a weight of 2.0 and RSI has a weight of 1.5; in a bear market state, the weights are different. It feels like it’s mimicking human thinking: focusing on different aspects under different circumstances. Upon closer examination, this structure is quite similar to a neural network:
However, the problem is that all weights are hardcoded, for example:
if marketType == "Bull"
array.set(weights, 0, 2.0) // Trend weight is fixed at 2.0
array.set(weights, 1, 1.5) // RSI weight is fixed at 1.5
These numbers are completely fixed by the author based on market experience, without any learning or optimization.
Since the structure already resembles a neural network, why not make it truly capable of learning? My idea is simple:
This way, we preserve the original strategy’s logic while adding learning capability.
I chose the Inventor platform mainly because it supports Python and contains rich data.
I rewrote all indicators from the Pine script in Python, using the talib library to ensure calculation accuracy. This includes common indicators like EMA, MACD, RSI, ATR, as well as volume analysis and simple candlestick pattern recognition.
Following the original strategy’s logic, I determine market types (Bull, Bear, Eagle, Wolf, etc.) based on combinations of various indicators. This part is basically a combination of if-else logic.
This is the core part. I set up two sets of weights:
Final weight = Base weight × Market weight
Then I use these weights to calculate a weighted sum of the 10 indicators’ raw scores to get the “weight score.”
I built a simple network:
Training objective: Use the weight score at time t-1 to predict the price change at time t.
Instead of buying/selling directly based on score levels, I now look at predicted returns:
I also keep stop-loss and take-profit mechanisms to ensure controllable risk.
The strategy can normally collect training data. Each time there’s a new candlestick, it uses the previous candlestick’s weight score as a feature and the current candlestick’s percentage change relative to the previous one as the label.
The data looks roughly like this:
Weight Score = 15.6, Return = +0.8%
Weight Score = -8.2, Return = -1.2%
Weight Score = 22.1, Return = +0.3%
The neural network trains normally, with MSE loss gradually decreasing. I set it to retrain every 4 hours to ensure the model can adapt to market changes.
The model’s predictions do show some correlation with actual returns, but it’s not particularly strong. Main issues:
Thanks to stop-loss and take-profit protection, risk control for individual trades is decent. However, overall profitability is mediocre, mainly due to insufficient prediction accuracy.

This strategy has many areas for improvement, but with limited time and energy, I couldn’t optimize it deeply:
This exploration taught me an important lesson: good inspiration is all about timely implementation! When I saw the weight matrix design in the Pine script, I immediately thought of the possibility of improving it with neural networks. If I had just thought about it without taking action, or procrastinated, this idea would likely have been forgotten. Fortunately, the Inventor platform provided a Python environment and data interfaces, allowing me to quickly turn the idea into runnable code. From generating the idea to completing the basic implementation took only about a day. Although the final strategy’s performance was mediocre, running it in practice at least validated that this approach was feasible. More importantly, new ideas and improvement directions emerged during the implementation process. Without taking action promptly, none of these subsequent discoveries and thoughts would have occurred. Theoretical discussions can never compare to actually writing code, running data, and examining results. Quantitative trading is like this - there are many ideas, but truly valuable ones are those that get quickly implemented and validated.
”`python “‘backtest start: 2025-07-31 00:00:00 end: 2025-08-07 00:00:00 period: 1h basePeriod: 5m exchanges: [{“eid”:“Futures_Binance”,“currency”:“ETH_USDT”,“balance”:5000000,“fee”:[0.01,0.01]}] “’
import numpy as np from collections import deque import talib as TA
class Error_noSupport(BaseException): def init(self): Log(“Only futures trading is supported! #FF0000”)
class Error_AtBeginHasPosition(BaseException): def init(self): Log(“Futures position exists at startup! #FF0000”)
class ReturnPredictor: def init(self, input_size=10, hidden_size=20, output_size=1): “”“Return prediction network: X[t] -> yt+1”“” self.W1 = np.random.randn(input_size, hidden_size) * 0.1 self.b1 = np.zeros((1, hidden_size)) self.W2 = np.random.randn(hidden_size, output_size) * 0.1 self.b2 = np.zeros((1, output_size)) self.learning_rate = 0.001
def sigmoid(self, x):
return 1 / (1 + np.exp(-np.clip(x, -250, 250)))
def tanh(self, x):
return np.tanh(x)
def forward(self, X):
self.z1 = np.dot(X, self.W1) + self.b1
self.a1 = self.sigmoid(self.z1)
self.z2 = np.dot(self.a1, self.W2) + self.b2
# Output predicted return rate, using tanh to limit to reasonable range
self.a2 = self.tanh(self.z2) * 0.1 # Limit to ±10% range
return self.a2
def backward(self, X, y, output):
m = X.shape[0]
# MSE loss gradient
dZ2 = (output - y) / m
# tanh derivative
tanh_derivative = 1 - (output / 0.1) ** 2
dZ2 = dZ2 * 0.1 * tanh_derivative
dW2 = np.dot(self.a1.T, dZ2)
db2 = np.sum(dZ2, axis=0, keepdims=True)
dA1 = np.dot(dZ2, self.W2.T)
dZ1 = dA1 * self.a1 * (1 - self.a1) # sigmoid derivative
dW1 = np.dot(X.T, dZ1)
db1 = np.sum(dZ1, axis=0, keepdims=True)
# Update weights
self.W2 -= self.learning_rate * dW2
self.b2 -= self.learning_rate * db2
self.W1 -= self.learning_rate * dW1
self.b1 -= self.learning_rate * db1
def train(self, X, y, epochs=100):
for i in range(epochs):
output = self.forward(X)
self.backward(X, y, output)
if i % 20 == 0:
loss = np.mean((output - y) ** 2)
Log(f"Return prediction training epoch {i}, MSE loss: {loss:.6f}")
def predict(self, X):
return self.forward(X)
class TechnicalIndicators: @staticmethod def calculate_indicators(records, use_completed_only=True): “”“Calculate technical indicators and features”“” if len(records) < 55: return None, None
# Only use completed candlestick data
if use_completed_only and len(records) > 1:
working_records = records[:-1]
else:
working_records = records
if len(working_records) < 55:
return None, None
closes = np.array([r['Close'] for r in working_records])
highs = np.array([r['High'] for r in working_records])
lows = np.array([r['Low'] for r in working_records])
volumes = np.array([r['Volume'] for r in working_records])
opens = np.array([r['Open'] for r in working_records])
try:
# Basic indicators
ema_55 = TA.EMA(closes, timeperiod=55)
sma_vol20 = TA.SMA(volumes, timeperiod=20)
macd, signal_line, _ = TA.MACD(closes)
rsi_val = TA.RSI(closes, timeperiod=14)
atr14 = TA.ATR(highs, lows, closes, timeperiod=14)
range20 = TA.STDDEV(closes, timeperiod=20)
# Calculate derived indicators
sma_atr20 = TA.SMA(atr14, timeperiod=20)
sma_range20 = TA.SMA(range20, timeperiod=20)
rvol = volumes / sma_vol20 if sma_vol20[-1] > 0 else np.ones_like(volumes)
delta = closes - opens
# Calculate volume thresholds
vol_abs_thresh = sma_vol20 * 1.2
sniper_thresh = np.percentile(volumes[-40:], 80) if len(volumes) >= 40 else sma_vol20[-1]
# Trend
trend = np.where(closes > ema_55, 1, np.where(closes < ema_55, -1, 0))
# Simplified candlestick patterns
body_size = np.abs(closes - opens)
total_range = highs - lows
# Hammer pattern
is_hammer = ((total_range > 3 * body_size) &
((closes - lows) / (total_range + 0.001) > 0.6) &
((opens - lows) / (total_range + 0.001) > 0.6))
# Engulfing pattern
is_engulfing = np.zeros_like(closes, dtype=bool)
if len(closes) >= 2:
is_engulfing[1:] = ((closes[1:] > opens[:-1]) &
(opens[1:] < closes[:-1]) &
(closes[1:] > opens[1:]) &
(opens[1:] < closes[1:]))
pattern = np.where(is_hammer, 1, np.where(is_engulfing, 2, 0))
# 🔥 Calculate normalized feature vectors (for neural network input)
features = []
# 1. Trend feature
if len(ema_55) > 0 and not np.isnan(ema_55[-1]):
trend_feature = (closes[-1] - ema_55[-1]) / ema_55[-1]
features.append(np.tanh(trend_feature * 100))
else:
features.append(0)
# 2. RSI feature
if len(rsi_val) > 0 and not np.isnan(rsi_val[-1]):
rsi_feature = (rsi_val[-1] - 50) / 50
features.append(rsi_feature)
else:
features.append(0)
# 3. MACD feature
if len(macd) > 0 and not np.isnan(macd[-1]) and not np.isnan(signal_line[-1]):
macd_feature = (macd[-1] - signal_line[-1]) / closes[-1] if closes[-1] > 0 else 0
features.append(np.tanh(macd_feature * 1000))
else:
features.append(0)
# 4. Volume feature
if len(vol_abs_thresh) > 0 and vol_abs_thresh[-1] > 0:
vol_feature = volumes[-1] / vol_abs_thresh[-1] - 1
features.append(np.tanh(vol_feature))
else:
features.append(0)
# 5. Relative volume feature
if len(rvol) > 0 and not np.isnan(rvol[-1]):
rvol_feature = rvol[-1] - 1
features.append(np.tanh(rvol_feature))
else:
features.append(0)
# 6. Delta feature
if len(delta) > 0 and not np.isnan(delta[-1]) and closes[-1] > 0:
delta_feature = delta[-1] / closes[-1]
features.append(np.tanh(delta_feature * 100))
else:
features.append(0)
# 7. ATR feature
if len(atr14) > 0 and len(sma_atr20) > 0 and sma_atr20[-1] > 0:
atr_feature = atr14[-1] / sma_atr20[-1] - 1
features.append(np.tanh(atr_feature))
else:
features.append(0)
# 8. Blocks feature
if len(volumes) >= 10:
highest_vol = np.max(volumes[-10:])
blocks_feature = volumes[-1] / highest_vol - 0.8 if highest_vol > 0 else 0
features.append(np.tanh(blocks_feature * 5))
else:
features.append(0)
# 9. Tick feature
if len(sma_vol20) > 0 and sma_vol20[-1] > 0:
tick_feature = volumes[-1] / sma_vol20[-1] - 1
features.append(np.tanh(tick_feature))
else:
features.append(0)
# 10. Pattern feature
pattern_feature = pattern[-1] / 2.0 if len(pattern) > 0 else 0
features.append(pattern_feature)
# Ensure correct feature count
while len(features) < 10:
features.append(0)
features = np.array(features[:10]).reshape(1, -1)
indicators = {
'ema_55': ema_55,
'sma_vol20': sma_vol20,
'macd': macd,
'signal_line': signal_line,
'rsi_val': rsi_val,
'atr14': atr14,
'range20': range20,
'sma_atr20': sma_atr20,
'sma_range20': sma_range20,
'rvol': rvol,
'delta': delta,
'vol_abs_thresh': vol_abs_thresh,
'sniper_thresh': sniper_thresh,
'trend': trend,
'pattern': pattern,
'volumes': volumes,
'closes': closes,
'highs': highs,
'lows': lows
}
return indicators, features
except Exception as e:
Log(f"Technical indicators calculation error: {str(e)}")
return None, None
class MarketStateDetector: @staticmethod def detect_market_type(indicators): “”“Detect market state”“” if indicators is None: return “Unknown”
try:
# Get latest values
close = indicators['closes'][-1]
ema_55 = indicators['ema_55'][-1]
macd = indicators['macd'][-1]
signal_line = indicators['signal_line'][-1]
rsi_val = indicators['rsi_val'][-1]
atr14 = indicators['atr14'][-1]
volume = indicators['volumes'][-1]
sma_vol20 = indicators['sma_vol20'][-1]
sma_atr20 = indicators['sma_atr20'][-1]
range20 = indicators['range20'][-1]
sma_range20 = indicators['sma_range20'][-1]
rvol = indicators['rvol'][-1]
delta = indicators['delta'][-1]
# Check validity
if (np.isnan(ema_55) or np.isnan(macd) or np.isnan(signal_line) or
np.isnan(rsi_val) or np.isnan(atr14) or np.isnan(sma_atr20)):
return "Unknown"
# Market type judgment
is_bull = (close > ema_55 and macd > signal_line and rsi_val > 50 and rvol > 1)
is_bear = (close < ema_55 and macd < signal_line and rsi_val < 50 and volume > sma_vol20)
is_sideways = (abs(close - ema_55) < atr14 * 0.5 and atr14 < sma_atr20)
is_volatile = (atr14 > sma_atr20 * 1.2)
# Judgments requiring historical data
if len(indicators['closes']) >= 2:
price_change = indicators['closes'][-1] - indicators['closes'][-2]
is_momentum = (price_change > atr14 * 1.5 and volume > sma_vol20 * 1.5)
is_wolf = (price_change < -atr14 and close < ema_55)
else:
is_momentum = False
is_wolf = False
is_mean_rev = (rsi_val > 70 or rsi_val < 30)
is_box = (is_sideways and range20 < sma_range20 * 0.8)
is_macro = (abs(delta) > atr14 * 2) if not np.isnan(delta) else False
is_eagle = (is_bull and atr14 < sma_atr20 * 0.8)
# Priority judgment
if is_eagle:
return "Eagle"
elif is_bull:
return "Bull"
elif is_wolf:
return "Wolf"
elif is_bear:
return "Bear"
elif is_box:
return "Box"
elif is_sideways:
return "Sideways"
elif is_volatile:
return "Volatile"
elif is_momentum:
return "Momentum"
elif is_mean_rev:
return "MeanRev"
elif is_macro:
return "Macro"
else:
return "Unknown"
except Exception as e:
Log(f"Market state detection error: {str(e)}")
return "Unknown"
class DynamicWeightGenerator: @staticmethod def generate_weights_from_predicted_return(predicted_return, market_type): “”“Generate dynamic weights based on predicted return rate and market state”“”
# Base weight matrix (different market types)
base_weights_matrix = {
"Bull": [2.0, 1.5, 2.0, 1.3, 1.2, 1.0, 1.2, 1.0, 1.0, 1.0],
"Bear": [2.0, 1.5, 2.0, 1.5, 1.3, 1.1, 1.2, 1.1, 1.0, 1.0],
"Eagle": [2.2, 1.4, 2.1, 1.2, 1.3, 1.1, 1.1, 1.0, 1.0, 1.1],
"Wolf": [1.8, 1.6, 1.8, 1.6, 1.2, 1.0, 1.3, 1.2, 1.0, 0.9],
"Momentum": [1.5, 1.2, 1.8, 2.0, 2.0, 1.5, 1.5, 1.3, 1.2, 1.0],
"Sideways": [1.0, 1.4, 1.0, 0.8, 0.7, 1.0, 0.9, 0.8, 1.0, 1.3],
"Volatile": [1.2, 1.5, 1.3, 1.6, 1.8, 1.2, 1.4, 1.3, 1.4, 1.0],
}
base_weights = base_weights_matrix.get(market_type, [1.0] * 10)
# 🔥 Dynamically adjust weights based on predicted return rate
adjustment_factors = [1.0] * 10
# Strength of predicted return rate
return_strength = abs(predicted_return)
return_direction = 1 if predicted_return > 0 else -1
if return_strength > 0.02: # Strong prediction signal > 2%
if return_direction > 0: # Predict upward
adjustment_factors[0] *= 1.3 # Enhance trend weight
adjustment_factors[2] *= 1.2 # Enhance MACD weight
adjustment_factors[4] *= 1.15 # Enhance relative volume weight
adjustment_factors[1] *= 0.9 # Reduce RSI weight
else: # Predict downward
adjustment_factors[1] *= 1.3 # Enhance RSI weight
adjustment_factors[3] *= 1.2 # Enhance volume weight
adjustment_factors[0] *= 0.9 # Reduce trend weight
elif return_strength > 0.01: # Medium prediction signal 1%-2%
if return_direction > 0:
adjustment_factors[0] *= 1.15
adjustment_factors[2] *= 1.1
else:
adjustment_factors[1] *= 1.15
adjustment_factors[3] *= 1.1
# Volatility adjustment
if return_strength > 0.03: # High volatility expectation > 3%
adjustment_factors[4] *= 1.2 # Enhance relative volume weight
adjustment_factors[6] *= 1.15 # Enhance sniper weight
adjustment_factors[7] *= 1.1 # Enhance blocks weight
# Generate final dynamic weights
dynamic_weights = [base_weights[i] * adjustment_factors[i] for i in range(10)]
# Weight normalization (optional)
# total_weight = sum(dynamic_weights)
# dynamic_weights = [w / total_weight * 10 for w in dynamic_weights]
return dynamic_weights
class SmartScoringSystem: def init(self): self.return_predictor = ReturnPredictor() self.weight_generator = DynamicWeightGenerator() self.is_model_trained = False
def calculate_score(self, indicators, market_type, features=None):
"""Calculate trading score (using dynamic weights from predicted return rate)"""
if indicators is None:
return 50.0
try:
# 🔥 Core logic: Use current indicators to predict next period return rate
if self.is_model_trained and features is not None:
predicted_return = self.return_predictor.predict(features)[0, 0]
else:
predicted_return = 0.0
Log(f"📊 Using base weights for calculation")
# Generate dynamic weights based on predicted return rate
dynamic_weights = self.weight_generator.generate_weights_from_predicted_return(
predicted_return, market_type)
# Get latest indicator values
trend = indicators['trend'][-1]
rsi_val = indicators['rsi_val'][-1]
macd = indicators['macd'][-1]
signal_line = indicators['signal_line'][-1]
volume = indicators['volumes'][-1]
vol_abs_thresh = indicators['vol_abs_thresh'][-1]
sma_vol20 = indicators['sma_vol20'][-1]
rvol = indicators['rvol'][-1]
delta = indicators['delta'][-1]
sniper_thresh = indicators['sniper_thresh']
pattern = indicators['pattern'][-1]
# Calculate individual scores
base_score = 0.0
# 1. Trend score
trend_score = 20 if trend == 1 else (-20 if trend == -1 else 0)
base_score += trend_score * dynamic_weights[0]
# 2. RSI score
rsi_score = -10 if rsi_val > 70 else (10 if rsi_val < 30 else 0)
base_score += rsi_score * dynamic_weights[1]
# 3. MACD score
macd_score = 10 if macd > signal_line else -10
base_score += macd_score * dynamic_weights[2]
# 4. Volume score
vol_score = 8 if volume > vol_abs_thresh else (-8 if volume < sma_vol20 else 0)
base_score += vol_score * dynamic_weights[3]
# 5. Relative volume score
rvol_score = 7 if rvol > 1.5 else (-7 if rvol < 0.8 else 0)
base_score += rvol_score * dynamic_weights[4]
# 6. Delta score
delta_score = 6 if delta > 0 else -6
base_score += delta_score * dynamic_weights[5]
# 7. Sniper score
sniper_score = 8 if volume > sniper_thresh else (-8 if volume < sma_vol20 else 0)
base_score += sniper_score * dynamic_weights[6]
# 8. Blocks score
if len(indicators['volumes']) >= 10:
highest_vol = np.max(indicators['volumes'][-10:])
blocks_score = 5 if volume > highest_vol * 0.8 else (-5 if volume < sma_vol20 else 0)
else:
blocks_score = 0
base_score += blocks_score * dynamic_weights[7]
# 9. Tick score
tick_score = 5 if volume > sma_vol20 else -5
base_score += tick_score * dynamic_weights[8]
# 10. Pattern score
pattern_score = 7 if pattern == 1 else (5 if pattern == 2 else 0)
base_score += pattern_score * dynamic_weights[9]
# Convert to percentage score
score_pct = max(0, min(100, 50 + base_score))
return score_pct
except Exception as e:
Log(f"Score calculation error: {str(e)}")
return 50.0
def train_return_predictor(self, X, y):
"""Train return rate predictor"""
if len(X) < 20:
Log("Insufficient training data, skipping return predictor training")
return False
X_array = np.array(X)
y_array = np.array(y).reshape(-1, 1)
Log(f"🧠 Starting return predictor training, sample count: {len(X_array)}")
Log(f"📊 Return rate range: [{np.min(y_array)*100:.3f}%, {np.max(y_array)*100:.3f}%]")
self.return_predictor.train(X_array, y_array, epochs=100)
self.is_model_trained = True
# Validate model prediction performance
predictions = self.return_predictor.predict(X_array)
mse = np.mean((predictions - y_array) ** 2)
correlation = np.corrcoef(predictions.flatten(), y_array.flatten())[0, 1]
Log(f"✅ Return predictor training completed")
Log(f"📈 MSE: {mse:.6f}, Correlation coefficient: {correlation:.4f}")
return True
class DynamicParameterManager: def init(self): self.market_params = { “Bull”: {“stop_loss”: 0.02, “take_profit”: 0.05}, “Bear”: {“stop_loss”: 0.02, “take_profit”: 0.05}, “Eagle”: {“stop_loss”: 0.015, “take_profit”: 0.06}, “Wolf”: {“stop_loss”: 0.025, “take_profit”: 0.04}, “Momentum”: {“stop_loss”: 0.025, “take_profit”: 0.06}, “Sideways”: {“stop_loss”: 0.01, “take_profit”: 0.02}, “Volatile”: {“stop_loss”: 0.03, “take_profit”: 0.07}, “Unknown”: {“stop_loss”: 0.02, “take_profit”: 0.03} }
def get_params(self, market_type):
return self.market_params.get(market_type, self.market_params["Unknown"])
class PredictiveNeuralTradingStrategy: def init(self): self.data_buffer = deque(maxlen=200) self.feature_buffer = deque(maxlen=100) self.label_buffer = deque(maxlen=100) # Store return rate labels self.scoring_system = SmartScoringSystem() self.param_manager = DynamicParameterManager()
# Training control
self.last_retrain_time = 0
self.retrain_interval = 3600 * 6 # Retrain every 6 hours
self.min_train_samples = 30
# Trading state
self.POSITION_NONE = 0
self.POSITION_LONG = 1
self.POSITION_SHORT = 2
self.position_state = self.POSITION_NONE
# Trading records
self.open_price = 0
self.counter = {'win': 0, 'loss': 0}
# K-line data management
self.last_processed_time = 0
def get_current_position(self):
"""Get current futures position status"""
try:
positions = exchange.GetPosition()
if not positions:
return self.POSITION_NONE, 0
long_amount = 0
short_amount = 0
for pos in positions:
amount = pos.get('Amount', 0)
pos_type = pos.get('Type', -1)
if amount > 0:
if pos_type == 0: # Long position
long_amount += amount
elif pos_type == 1: # Short position
short_amount += amount
net_position = long_amount - short_amount
if net_position > 0:
return self.POSITION_LONG, net_position
elif net_position < 0:
return self.POSITION_SHORT, abs(net_position)
else:
return self.POSITION_NONE, 0
except Exception as e:
Log(f"Get position error: {str(e)}")
return self.POSITION_NONE, 0
def collect_data(self, records):
"""Collect data and generate training samples"""
if not records or len(records) < 55:
return False
# Check if there's a new completed K-line
if len(records) > 1:
latest_completed = records[-2]
current_time = latest_completed['Time']
# Skip if this K-line has already been processed
if current_time <= self.last_processed_time:
return False
self.last_processed_time = current_time
# Add completed K-lines to buffer
completed_records = records[:-1] if len(records) > 1 else []
if completed_records:
self.data_buffer.extend(completed_records[-5:])
# 🔥 Generate training samples: X[t] -> y[t+1]
if len(self.data_buffer) >= 2:
# Use the second-to-last record as features, last record to calculate return rate label
buffer_list = list(self.data_buffer)
# Calculate t-1 moment indicators as features
feature_records = buffer_list[:-1] if len(buffer_list) > 1 else buffer_list
indicators, features = TechnicalIndicators.calculate_indicators(
feature_records, use_completed_only=False)
if indicators is not None and features is not None:
# Calculate t moment return rate relative to t-1 moment as label
if len(buffer_list) >= 2:
current_close = buffer_list[-1]['Close']
previous_close = buffer_list[-2]['Close']
if previous_close > 0:
return_rate = (current_close - previous_close) / previous_close
# Add to training set
self.feature_buffer.append(features[0])
self.label_buffer.append(return_rate)
Log(f"📈 New sample: Return rate={return_rate*100:.3f}%, Feature dimensions={features.shape}")
return True
def should_retrain(self):
"""Determine if retraining is needed"""
import time
current_time = time.time()
return (current_time - self.last_retrain_time > self.retrain_interval and
len(self.feature_buffer) >= self.min_train_samples)
def train_model(self):
"""Train return rate predictor"""
if len(self.feature_buffer) < self.min_train_samples:
Log("Insufficient training data, skipping training")
return False
X = list(self.feature_buffer)
y = list(self.label_buffer)
success = self.scoring_system.train_return_predictor(X, y)
if success:
import time
self.last_retrain_time = time.time()
return success
def get_trading_signals(self, records):
"""Get trading signals"""
# Calculate current moment technical indicators
indicators, features = TechnicalIndicators.calculate_indicators(
list(self.data_buffer), use_completed_only=False)
if indicators is None:
return 50.0, "Unknown"
# Detect market type
market_type = MarketStateDetector.detect_market_type(indicators)
# 🔥 Calculate score using dynamic weights from predicted return rate
score = self.scoring_system.calculate_score(indicators, market_type, features)
return score, market_type
def check_entry_conditions(self, score, market_type):
"""Check entry conditions"""
# Long conditions
long_condition = ((market_type in ["Bull", "Eagle", "Momentum"]) and score > 65)
# Short conditions
short_condition = ((market_type in ["Bear", "Wolf"]) and score < 35)
return long_condition, short_condition
def open_long(self):
"""Open long position"""
try:
ticker = exchange.GetTicker()
if not ticker:
return False
buy_price = ticker['Last'] + 20
order_id = exchange.CreateOrder("", "buy", buy_price, AmountOP)
if order_id:
Sleep(2000)
order_info = exchange.GetOrder(order_id)
if order_info and order_info.get('Status') == 1:
self.open_price = order_info.get('AvgPrice', buy_price)
self.position_state = self.POSITION_LONG
Log(f"🚀 Long position opened successfully: Price={self.open_price}, Amount={AmountOP}")
return True
else:
exchange.CancelOrder(order_id)
Log("Long position order not fully filled, cancelled")
return False
except Exception as e:
Log(f"Open long position error: {str(e)}")
return False
def open_short(self):
"""Open short position"""
try:
ticker = exchange.GetTicker()
if not ticker:
return False
sell_price = ticker['Last'] - 20
order_id = exchange.CreateOrder("", "sell", sell_price, AmountOP)
if order_id:
Sleep(2000)
order_info = exchange.GetOrder(order_id)
if order_info and order_info.get('Status') == 1:
self.open_price = order_info.get('AvgPrice', sell_price)
self.position_state = self.POSITION_SHORT
Log(f"🎯 Short position opened successfully: Price={self.open_price}, Amount={AmountOP}")
return True
else:
exchange.CancelOrder(order_id)
Log("Short position order not fully filled, cancelled")
return False
except Exception as e:
Log(f"Open short position error: {str(e)}")
return False
def close_position(self):
"""Close position"""
try:
positions = exchange.GetPosition()
if not positions:
Log("No position to close")
self.position_state = self.POSITION_NONE
self.open_price = 0
return True
ticker = exchange.GetTicker()
if not ticker:
return False
close_success = True
for pos in positions:
if pos['Amount'] == 0:
continue
amount = pos['Amount']
pos_type = pos['Type']
if pos_type == 0: # Close long position
close_price = ticker['Last'] - 20
order_id = exchange.CreateOrder("", "closebuy", close_price, amount)
Log(f"📤 Close long position: Price={close_price}, Amount={amount}")
elif pos_type == 1: # Close short position
close_price = ticker['Last'] + 20
order_id = exchange.CreateOrder("", "closesell", close_price, amount)
Log(f"📤 Close short position: Price={close_price}, Amount={amount}")
if order_id:
Sleep(2000)
order_info = exchange.GetOrder(order_id)
if order_info and order_info.get('Status') == 1:
close_price = order_info.get('AvgPrice', close_price)
Log(f"✅ Position closed successfully: Fill price={close_price}")
self.update_profit_stats(close_price)
else:
exchange.CancelOrder(order_id)
close_success = False
Log(f"Close position order not fully filled, cancelled")
else:
close_success = False
Log("Failed to create close position order")
if close_success:
self.position_state = self.POSITION_NONE
self.open_price = 0
return close_success
except Exception as e:
Log(f"Close position error: {str(e)}")
return False
def update_profit_stats(self, close_price):
"""Update profit/loss statistics"""
if self.open_price == 0:
return
if self.position_state == self.POSITION_LONG:
if close_price > self.open_price:
self.counter['win'] += 1
Log("💰 Long position profitable")
else:
self.counter['loss'] += 1
Log("💸 Long position loss")
elif self.position_state == self.POSITION_SHORT:
if close_price < self.open_price:
self.counter['win'] += 1
Log("💰 Short position profitable")
else:
self.counter['loss'] += 1
Log("💸 Short position loss")
def check_stop_loss_take_profit(self, current_price, params):
"""Check stop loss and take profit, execute closing"""
if self.open_price == 0 or self.position_state == self.POSITION_NONE:
return False
stop_loss_pct = params["stop_loss"]
take_profit_pct = params["take_profit"]
if self.position_state == self.POSITION_LONG:
profit_pct = (current_price - self.open_price) / self.open_price
if profit_pct <= -stop_loss_pct:
Log(f"🔴 Long position stop loss triggered: Open price={self.open_price:.2f}, Current price={current_price:.2f}, Loss={profit_pct:.4f}")
return self.execute_close_position("Stop Loss")
elif profit_pct >= take_profit_pct:
Log(f"🟢 Long position take profit triggered: Open price={self.open_price:.2f}, Current price={current_price:.2f}, Profit={profit_pct:.4f}")
return self.execute_close_position("Take Profit")
elif self.position_state == self.POSITION_SHORT:
profit_pct = (self.open_price - current_price) / self.open_price
if profit_pct <= -stop_loss_pct:
Log(f"🔴 Short position stop loss triggered: Open price={self.open_price:.2f}, Current price={current_price:.2f}, Loss={profit_pct:.4f}")
return self.execute_close_position("Stop Loss")
elif profit_pct >= take_profit_pct:
Log(f"🟢 Short position take profit triggered: Open price={self.open_price:.2f}, Current price={current_price:.2f}, Profit={profit_pct:.4f}")
return self.execute_close_position("Take Profit")
return False
def execute_close_position(self, reason):
"""Execute close position operation (specifically for stop loss/take profit)"""
try:
positions = exchange.GetPosition()
if not positions:
Log(f"{reason} close position: No position")
self.position_state = self.POSITION_NONE
self.open_price = 0
return True
ticker = exchange.GetTicker()
if not ticker:
Log(f"{reason} close position failed: Cannot get ticker")
return False
Log(f"🚨 Executing {reason} close position operation...")
close_success = True
for pos in positions:
if pos['Amount'] == 0:
continue
amount = pos['Amount']
pos_type = pos['Type']
order_id = None
if pos_type == 0: # Close long position
close_price = ticker['Last'] - 50
order_id = exchange.CreateOrder("", "closebuy", close_price, amount)
Log(f"📤 {reason} close long position order: Price={close_price}, Amount={amount}")
elif pos_type == 1: # Close short position
close_price = ticker['Last'] + 50
order_id = exchange.CreateOrder("", "closesell", close_price, amount)
Log(f"📤 {reason} close short position order: Price={close_price}, Amount={amount}")
if order_id:
Log(f"📋 {reason} close position order ID: {order_id}")
Sleep(1500)
for retry in range(2):
order_info = exchange.GetOrder(order_id)
if order_info:
status = order_info.get('Status', -1)
if status == 1:
close_price = order_info.get('AvgPrice', close_price)
Log(f"✅ {reason} close position successful: Fill price={close_price}")
self.update_profit_stats(close_price)
break
elif status == 0:
if retry == 0:
Log(f"⏳ {reason} close position order executing, waiting...")
Sleep(1500)
else:
Log(f"⚠️ {reason} close position order not fully filled, force cancel")
exchange.CancelOrder(order_id)
close_success = False
else:
Log(f"❌ {reason} close position order status abnormal: {status}")
exchange.CancelOrder(order_id)
close_success = False
break
else:
Log(f"⚠️ Cannot get {reason} close position order info, retry {retry+1}/2")
if retry == 1:
close_success = False
else:
Log(f"❌ {reason} close position order creation failed")
close_success = False
if close_success:
Sleep(1000)
new_positions = exchange.GetPosition()
total_amount = sum(pos['Amount'] for pos in new_positions) if new_positions else 0
if total_amount == 0:
Log(f"✅ {reason} close position completed, position cleared")
self.position_state = self.POSITION_NONE
self.open_price = 0
return True
else:
Log(f"⚠️ {reason} close position incomplete, remaining position: {total_amount}")
return False
else:
Log(f"❌ {reason} close position failed")
return False
except Exception as e:
Log(f"❌ {reason} close position error: {str(e)}")
return False
def execute_trade_logic(self, score, market_type, current_price):
"""Execute trading logic"""
params = self.param_manager.get_params(market_type)
# Get current actual position status
actual_position, position_amount = self.get_current_position()
# Synchronize internal state
self.position_state = actual_position
# First check stop loss/take profit (highest priority)
if self.position_state != self.POSITION_NONE:
if self.check_stop_loss_take_profit(current_price, params):
Log("🚨 Stop loss/take profit triggered, executed close position, skipping other trading signals")
return
# Get entry conditions
long_condition, short_condition = self.check_entry_conditions(score, market_type)
# Execute trading logic
if long_condition and self.position_state <= self.POSITION_NONE:
Log(f"📈 Long position signal: Market={market_type}, Prediction score={score:.1f} > 65")
self.open_long()
if short_condition and self.position_state >= self.POSITION_NONE:
Log(f"📉 Short position signal: Market={market_type}, Prediction score={score:.1f} < 35")
self.open_short()
if not long_condition and self.position_state > self.POSITION_NONE:
Log(f"📤 Close long position signal: Market={market_type}, Prediction score={score:.1f}")
self.close_position()
if not short_condition and self.position_state < self.POSITION_NONE:
Log(f"📤 Close short position signal: Market={market_type}, Prediction score={score:.1f}")
self.close_position()
def CancelPendingOrders(): “”“Cancel all pending orders”“” while True: orders = exchange.GetOrders() if not orders: break for order in orders: exchange.CancelOrder(order[‘Id’]) Sleep(500)
def main(): global AmountOP, LoopInterval
# Check initial position
initial_positions = exchange.GetPosition()
if initial_positions and any(pos['Amount'] > 0 for pos in initial_positions):
raise Error_AtBeginHasPosition()
# Cancel all pending orders
CancelPendingOrders()
# Initialize strategy
strategy = PredictiveNeuralTradingStrategy()
Log("🔮 Predictive Neural Network Futures Trading Strategy Started")
LogProfitReset()
# Data warm-up period
Log("Entering data warm-up period...")
warmup_count = 0
warmup_target = 60
while warmup_count < warmup_target:
records = exchange.GetRecords()
if records and len(records) >= 55:
if strategy.collect_data(records):
warmup_count += 1
if warmup_count % 10 == 0:
Log(f"Warm-up progress: {warmup_count}/{warmup_target}")
Sleep(5000)
Log("Data warm-up completed, starting initial return rate predictor training...")
strategy.train_model()
# Main trading loop
loop_count = 0
while True:
loop_count += 1
# Get K-line data
records = exchange.GetRecords()
if not records or len(records) < 55:
Sleep(LoopInterval * 1000)
continue
# Data processing
data_updated = strategy.collect_data(records)
# Check if retraining is needed
if strategy.should_retrain():
Log("🔄 Retraining return rate predictor...")
strategy.train_model()
# Get trading signals
score, market_type = strategy.get_trading_signals(records)
# Get current real-time price
ticker = exchange.GetTicker()
if ticker:
current_price = ticker['Last']
else:
current_price = records[-1]['Close']
# Get current parameters
params = strategy.param_manager.get_params(market_type)
# Priority check for stop loss/take profit (using real-time price)
if strategy.position_state != strategy.POSITION_NONE:
if strategy.check_stop_loss_take_profit(current_price, params):
Log("⚡ Stop loss/take profit triggered, executed close position")
Sleep(LoopInterval * 1000)
continue
# Execute trading logic (only when there's new data)
if data_updated:
strategy.execute_trade_logic(score, market_type, current_price)
# Status display
pos_state_name = {
strategy.POSITION_NONE: "No Position",
strategy.POSITION_LONG: "Long",
strategy.POSITION_SHORT: "Short"
}.get(strategy.position_state, "Unknown")
data_status = "📊New Data" if data_updated else "⏸️Waiting"
model_status = "🔮Prediction" if strategy.scoring_system.is_model_trained else "📊Basic"
# Get entry conditions for display
long_cond, short_cond = strategy.check_entry_conditions(score, market_type)
signal_status = ""
if long_cond:
signal_status