avatar of 发明者量化-小小梦 发明者量化-小小梦
关注 私信
4
关注
1271
关注者

From Fixed Weights to Neural Networks Machine Learning for a Pine Strategy

创建于: 2025-08-11 17:22:24, 更新于: 2025-08-12 09:03:59
comments   0
hits   150

From Fixed Weights to Neural Networks Machine Learning for a Pine Strategy

Accidentally Discovered an Interesting Pine Strategy

A few days ago, while browsing strategies on the Inventor forum, I came across one called “Panel Pro+ Quantum SmartPrompt”. After reviewing the code, I found this strategy’s approach quite interesting: it uses 10 technical indicators, assigns different weights to each indicator based on market conditions, and finally calculates a score to determine buy/sell decisions. For example, in a bull market state, trend indicators have a weight of 2.0 and RSI has a weight of 1.5; in a bear market state, the weights are different. It feels like it’s mimicking human thinking: focusing on different aspects under different circumstances. Upon closer examination, this structure is quite similar to a neural network:

  • 10 technical indicators as inputs
  • Market state classification acts like a hidden layer
  • Weight matrices serve as connection weights
  • Finally outputs a score

However, the problem is that all weights are hardcoded, for example:

if marketType == "Bull"
    array.set(weights, 0, 2.0) // Trend weight is fixed at 2.0
    array.set(weights, 1, 1.5) // RSI weight is fixed at 1.5

These numbers are completely fixed by the author based on market experience, without any learning or optimization.

Idea: Make the Weights Learnable

Since the structure already resembles a neural network, why not make it truly capable of learning? My idea is simple:

  • 1.Keep the original weight calculation method to get a “weight score”
  • 2.Use this weight score as input to train a small neural network
  • 3.Let the network learn to predict future returns from the weight score
  • 4.Decide whether to open positions based on the predicted return magnitude

This way, we preserve the original strategy’s logic while adding learning capability.

Implementation on the Inventor Platform

I chose the Inventor platform mainly because it supports Python and contains rich data.

Step 1: Rewrite Technical Indicators

I rewrote all indicators from the Pine script in Python, using the talib library to ensure calculation accuracy. This includes common indicators like EMA, MACD, RSI, ATR, as well as volume analysis and simple candlestick pattern recognition.

Step 2: Market State Detection

Following the original strategy’s logic, I determine market types (Bull, Bear, Eagle, Wolf, etc.) based on combinations of various indicators. This part is basically a combination of if-else logic.

Step 3: Weight Score Calculation

This is the core part. I set up two sets of weights:

  • Base weights: [2.0, 1.5, 2.0, 1.3, 1.2, …]
  • Market weights: adjusted according to different market states

Final weight = Base weight × Market weight

Then I use these weights to calculate a weighted sum of the 10 indicators’ raw scores to get the “weight score.”

Step 4: Neural Network Predictor

I built a simple network:

  • Input: 1 feature (weight score)
  • Hidden layer: 16 neurons with ReLU activation
  • Output: predicted return, constrained to ±5% using tanh

Training objective: Use the weight score at time t-1 to predict the price change at time t.

Step 5: Trading Logic

Instead of buying/selling directly based on score levels, I now look at predicted returns:

  • Predicted return > 1.5%: go long or close short and go long
  • Predicted return < -1.5%: go short or close long and go short
  • Other cases: maintain current position

I also keep stop-loss and take-profit mechanisms to ensure controllable risk.

Observations from Actual Operation

Data Collection

The strategy can normally collect training data. Each time there’s a new candlestick, it uses the previous candlestick’s weight score as a feature and the current candlestick’s percentage change relative to the previous one as the label.

The data looks roughly like this:

Weight Score = 15.6, Return = +0.8%
Weight Score = -8.2, Return = -1.2%
Weight Score = 22.1, Return = +0.3%

Model Training

The neural network trains normally, with MSE loss gradually decreasing. I set it to retrain every 4 hours to ensure the model can adapt to market changes.

Prediction Performance

The model’s predictions do show some correlation with actual returns, but it’s not particularly strong. Main issues:

  • 1.Single feature is too simple, possibly insufficient information
  • 2.Short-term price movements have high randomness
  • 3.Futures markets have considerable noise

Trading Performance

Thanks to stop-loss and take-profit protection, risk control for individual trades is decent. However, overall profitability is mediocre, mainly due to insufficient prediction accuracy.

From Fixed Weights to Neural Networks Machine Learning for a Pine Strategy

Problems Encountered

  • Features Too Limited: Using only the weight score as a single feature is indeed too simple. Markets are so complex that one number can hardly capture everything.
  • Unstable Sample Quality: Contract prices have large short-term fluctuations, and often the ups and downs are actually random, making the quality of training samples unstable.
  • Overfitting Risk: Although the network is simple, there’s still potential for overfitting when sample size is limited.
  • Real-time Requirements: Online learning needs to balance training time with real-time performance.

Limited Time, Insufficient Optimization

This strategy has many areas for improvement, but with limited time and energy, I couldn’t optimize it deeply:

  • Feature Aspects: Could add more technical indicators or use statistical features of price sequences.
  • Model Aspects: Could try sequence models like LSTM, or use ensemble methods with multiple models.
  • Data Aspects: Improve sample quality and add data cleaning.
  • Risk Management: Refine dynamic stop-loss and optimize position management.

Gains and Reflections

This exploration taught me an important lesson: good inspiration is all about timely implementation! When I saw the weight matrix design in the Pine script, I immediately thought of the possibility of improving it with neural networks. If I had just thought about it without taking action, or procrastinated, this idea would likely have been forgotten. Fortunately, the Inventor platform provided a Python environment and data interfaces, allowing me to quickly turn the idea into runnable code. From generating the idea to completing the basic implementation took only about a day. Although the final strategy’s performance was mediocre, running it in practice at least validated that this approach was feasible. More importantly, new ideas and improvement directions emerged during the implementation process. Without taking action promptly, none of these subsequent discoveries and thoughts would have occurred. Theoretical discussions can never compare to actually writing code, running data, and examining results. Quantitative trading is like this - there are many ideas, but truly valuable ones are those that get quickly implemented and validated.

”`python “‘backtest start: 2025-07-31 00:00:00 end: 2025-08-07 00:00:00 period: 1h basePeriod: 5m exchanges: [{“eid”:“Futures_Binance”,“currency”:“ETH_USDT”,“balance”:5000000,“fee”:[0.01,0.01]}] “’

import numpy as np from collections import deque import talib as TA

========== Exception Classes ==========

class Error_noSupport(BaseException): def init(self): Log(“Only futures trading is supported! #FF0000”)

class Error_AtBeginHasPosition(BaseException): def init(self): Log(“Futures position exists at startup! #FF0000”)

========== Return Prediction Neural Network ==========

class ReturnPredictor: def init(self, input_size=10, hidden_size=20, output_size=1): “”“Return prediction network: X[t] -> yt+1”“” self.W1 = np.random.randn(input_size, hidden_size) * 0.1 self.b1 = np.zeros((1, hidden_size)) self.W2 = np.random.randn(hidden_size, output_size) * 0.1 self.b2 = np.zeros((1, output_size)) self.learning_rate = 0.001

def sigmoid(self, x):
    return 1 / (1 + np.exp(-np.clip(x, -250, 250)))

def tanh(self, x):
    return np.tanh(x)

def forward(self, X):
    self.z1 = np.dot(X, self.W1) + self.b1
    self.a1 = self.sigmoid(self.z1)
    self.z2 = np.dot(self.a1, self.W2) + self.b2
    # Output predicted return rate, using tanh to limit to reasonable range
    self.a2 = self.tanh(self.z2) * 0.1  # Limit to ±10% range
    return self.a2

def backward(self, X, y, output):
    m = X.shape[0]

    # MSE loss gradient
    dZ2 = (output - y) / m
    # tanh derivative
    tanh_derivative = 1 - (output / 0.1) ** 2
    dZ2 = dZ2 * 0.1 * tanh_derivative

    dW2 = np.dot(self.a1.T, dZ2)
    db2 = np.sum(dZ2, axis=0, keepdims=True)

    dA1 = np.dot(dZ2, self.W2.T)
    dZ1 = dA1 * self.a1 * (1 - self.a1)  # sigmoid derivative
    dW1 = np.dot(X.T, dZ1)
    db1 = np.sum(dZ1, axis=0, keepdims=True)

    # Update weights
    self.W2 -= self.learning_rate * dW2
    self.b2 -= self.learning_rate * db2
    self.W1 -= self.learning_rate * dW1
    self.b1 -= self.learning_rate * db1

def train(self, X, y, epochs=100):
    for i in range(epochs):
        output = self.forward(X)
        self.backward(X, y, output)
        if i % 20 == 0:
            loss = np.mean((output - y) ** 2)
            Log(f"Return prediction training epoch {i}, MSE loss: {loss:.6f}")

def predict(self, X):
    return self.forward(X)

========== Technical Indicators Calculation Class ==========

class TechnicalIndicators: @staticmethod def calculate_indicators(records, use_completed_only=True): “”“Calculate technical indicators and features”“” if len(records) < 55: return None, None

    # Only use completed candlestick data
    if use_completed_only and len(records) > 1:
        working_records = records[:-1]
    else:
        working_records = records

    if len(working_records) < 55:
        return None, None

    closes = np.array([r['Close'] for r in working_records])
    highs = np.array([r['High'] for r in working_records])
    lows = np.array([r['Low'] for r in working_records])
    volumes = np.array([r['Volume'] for r in working_records])
    opens = np.array([r['Open'] for r in working_records])

    try:
        # Basic indicators
        ema_55 = TA.EMA(closes, timeperiod=55)
        sma_vol20 = TA.SMA(volumes, timeperiod=20)
        macd, signal_line, _ = TA.MACD(closes)
        rsi_val = TA.RSI(closes, timeperiod=14)
        atr14 = TA.ATR(highs, lows, closes, timeperiod=14)
        range20 = TA.STDDEV(closes, timeperiod=20)

        # Calculate derived indicators
        sma_atr20 = TA.SMA(atr14, timeperiod=20)
        sma_range20 = TA.SMA(range20, timeperiod=20)
        rvol = volumes / sma_vol20 if sma_vol20[-1] > 0 else np.ones_like(volumes)
        delta = closes - opens

        # Calculate volume thresholds
        vol_abs_thresh = sma_vol20 * 1.2
        sniper_thresh = np.percentile(volumes[-40:], 80) if len(volumes) >= 40 else sma_vol20[-1]

        # Trend
        trend = np.where(closes > ema_55, 1, np.where(closes < ema_55, -1, 0))

        # Simplified candlestick patterns
        body_size = np.abs(closes - opens)
        total_range = highs - lows

        # Hammer pattern
        is_hammer = ((total_range > 3 * body_size) & 
                    ((closes - lows) / (total_range + 0.001) > 0.6) & 
                    ((opens - lows) / (total_range + 0.001) > 0.6))

        # Engulfing pattern
        is_engulfing = np.zeros_like(closes, dtype=bool)
        if len(closes) >= 2:
            is_engulfing[1:] = ((closes[1:] > opens[:-1]) & 
                               (opens[1:] < closes[:-1]) & 
                               (closes[1:] > opens[1:]) & 
                               (opens[1:] < closes[1:]))

        pattern = np.where(is_hammer, 1, np.where(is_engulfing, 2, 0))

        # 🔥 Calculate normalized feature vectors (for neural network input)
        features = []

        # 1. Trend feature
        if len(ema_55) > 0 and not np.isnan(ema_55[-1]):
            trend_feature = (closes[-1] - ema_55[-1]) / ema_55[-1]
            features.append(np.tanh(trend_feature * 100))
        else:
            features.append(0)

        # 2. RSI feature
        if len(rsi_val) > 0 and not np.isnan(rsi_val[-1]):
            rsi_feature = (rsi_val[-1] - 50) / 50
            features.append(rsi_feature)
        else:
            features.append(0)

        # 3. MACD feature
        if len(macd) > 0 and not np.isnan(macd[-1]) and not np.isnan(signal_line[-1]):
            macd_feature = (macd[-1] - signal_line[-1]) / closes[-1] if closes[-1] > 0 else 0
            features.append(np.tanh(macd_feature * 1000))
        else:
            features.append(0)

        # 4. Volume feature
        if len(vol_abs_thresh) > 0 and vol_abs_thresh[-1] > 0:
            vol_feature = volumes[-1] / vol_abs_thresh[-1] - 1
            features.append(np.tanh(vol_feature))
        else:
            features.append(0)

        # 5. Relative volume feature
        if len(rvol) > 0 and not np.isnan(rvol[-1]):
            rvol_feature = rvol[-1] - 1
            features.append(np.tanh(rvol_feature))
        else:
            features.append(0)

        # 6. Delta feature
        if len(delta) > 0 and not np.isnan(delta[-1]) and closes[-1] > 0:
            delta_feature = delta[-1] / closes[-1]
            features.append(np.tanh(delta_feature * 100))
        else:
            features.append(0)

        # 7. ATR feature
        if len(atr14) > 0 and len(sma_atr20) > 0 and sma_atr20[-1] > 0:
            atr_feature = atr14[-1] / sma_atr20[-1] - 1
            features.append(np.tanh(atr_feature))
        else:
            features.append(0)

        # 8. Blocks feature
        if len(volumes) >= 10:
            highest_vol = np.max(volumes[-10:])
            blocks_feature = volumes[-1] / highest_vol - 0.8 if highest_vol > 0 else 0
            features.append(np.tanh(blocks_feature * 5))
        else:
            features.append(0)

        # 9. Tick feature
        if len(sma_vol20) > 0 and sma_vol20[-1] > 0:
            tick_feature = volumes[-1] / sma_vol20[-1] - 1
            features.append(np.tanh(tick_feature))
        else:
            features.append(0)

        # 10. Pattern feature
        pattern_feature = pattern[-1] / 2.0 if len(pattern) > 0 else 0
        features.append(pattern_feature)

        # Ensure correct feature count
        while len(features) < 10:
            features.append(0)

        features = np.array(features[:10]).reshape(1, -1)

        indicators = {
            'ema_55': ema_55,
            'sma_vol20': sma_vol20,
            'macd': macd,
            'signal_line': signal_line,
            'rsi_val': rsi_val,
            'atr14': atr14,
            'range20': range20,
            'sma_atr20': sma_atr20,
            'sma_range20': sma_range20,
            'rvol': rvol,
            'delta': delta,
            'vol_abs_thresh': vol_abs_thresh,
            'sniper_thresh': sniper_thresh,
            'trend': trend,
            'pattern': pattern,
            'volumes': volumes,
            'closes': closes,
            'highs': highs,
            'lows': lows
        }

        return indicators, features

    except Exception as e:
        Log(f"Technical indicators calculation error: {str(e)}")
        return None, None

========== Market State Detection Class ==========

class MarketStateDetector: @staticmethod def detect_market_type(indicators): “”“Detect market state”“” if indicators is None: return “Unknown”

    try:
        # Get latest values
        close = indicators['closes'][-1]
        ema_55 = indicators['ema_55'][-1]
        macd = indicators['macd'][-1]
        signal_line = indicators['signal_line'][-1]
        rsi_val = indicators['rsi_val'][-1]
        atr14 = indicators['atr14'][-1]
        volume = indicators['volumes'][-1]
        sma_vol20 = indicators['sma_vol20'][-1]
        sma_atr20 = indicators['sma_atr20'][-1]
        range20 = indicators['range20'][-1]
        sma_range20 = indicators['sma_range20'][-1]
        rvol = indicators['rvol'][-1]
        delta = indicators['delta'][-1]

        # Check validity
        if (np.isnan(ema_55) or np.isnan(macd) or np.isnan(signal_line) or 
            np.isnan(rsi_val) or np.isnan(atr14) or np.isnan(sma_atr20)):
            return "Unknown"

        # Market type judgment
        is_bull = (close > ema_55 and macd > signal_line and rsi_val > 50 and rvol > 1)
        is_bear = (close < ema_55 and macd < signal_line and rsi_val < 50 and volume > sma_vol20)
        is_sideways = (abs(close - ema_55) < atr14 * 0.5 and atr14 < sma_atr20)
        is_volatile = (atr14 > sma_atr20 * 1.2)

        # Judgments requiring historical data
        if len(indicators['closes']) >= 2:
            price_change = indicators['closes'][-1] - indicators['closes'][-2]
            is_momentum = (price_change > atr14 * 1.5 and volume > sma_vol20 * 1.5)
            is_wolf = (price_change < -atr14 and close < ema_55)
        else:
            is_momentum = False
            is_wolf = False

        is_mean_rev = (rsi_val > 70 or rsi_val < 30)
        is_box = (is_sideways and range20 < sma_range20 * 0.8)
        is_macro = (abs(delta) > atr14 * 2) if not np.isnan(delta) else False
        is_eagle = (is_bull and atr14 < sma_atr20 * 0.8)

        # Priority judgment
        if is_eagle:
            return "Eagle"
        elif is_bull:
            return "Bull"
        elif is_wolf:
            return "Wolf"
        elif is_bear:
            return "Bear"
        elif is_box:
            return "Box"
        elif is_sideways:
            return "Sideways"
        elif is_volatile:
            return "Volatile"
        elif is_momentum:
            return "Momentum"
        elif is_mean_rev:
            return "MeanRev"
        elif is_macro:
            return "Macro"
        else:
            return "Unknown"

    except Exception as e:
        Log(f"Market state detection error: {str(e)}")
        return "Unknown"

========== Dynamic Weight Generator ==========

class DynamicWeightGenerator: @staticmethod def generate_weights_from_predicted_return(predicted_return, market_type): “”“Generate dynamic weights based on predicted return rate and market state”“”

    # Base weight matrix (different market types)
    base_weights_matrix = {
        "Bull": [2.0, 1.5, 2.0, 1.3, 1.2, 1.0, 1.2, 1.0, 1.0, 1.0],
        "Bear": [2.0, 1.5, 2.0, 1.5, 1.3, 1.1, 1.2, 1.1, 1.0, 1.0],
        "Eagle": [2.2, 1.4, 2.1, 1.2, 1.3, 1.1, 1.1, 1.0, 1.0, 1.1],
        "Wolf": [1.8, 1.6, 1.8, 1.6, 1.2, 1.0, 1.3, 1.2, 1.0, 0.9],
        "Momentum": [1.5, 1.2, 1.8, 2.0, 2.0, 1.5, 1.5, 1.3, 1.2, 1.0],
        "Sideways": [1.0, 1.4, 1.0, 0.8, 0.7, 1.0, 0.9, 0.8, 1.0, 1.3],
        "Volatile": [1.2, 1.5, 1.3, 1.6, 1.8, 1.2, 1.4, 1.3, 1.4, 1.0],
    }

    base_weights = base_weights_matrix.get(market_type, [1.0] * 10)

    # 🔥 Dynamically adjust weights based on predicted return rate
    adjustment_factors = [1.0] * 10

    # Strength of predicted return rate
    return_strength = abs(predicted_return)
    return_direction = 1 if predicted_return > 0 else -1

    if return_strength > 0.02:  # Strong prediction signal > 2%
        if return_direction > 0:  # Predict upward
            adjustment_factors[0] *= 1.3  # Enhance trend weight
            adjustment_factors[2] *= 1.2  # Enhance MACD weight
            adjustment_factors[4] *= 1.15 # Enhance relative volume weight
            adjustment_factors[1] *= 0.9  # Reduce RSI weight
        else:  # Predict downward
            adjustment_factors[1] *= 1.3  # Enhance RSI weight
            adjustment_factors[3] *= 1.2  # Enhance volume weight
            adjustment_factors[0] *= 0.9  # Reduce trend weight

    elif return_strength > 0.01:  # Medium prediction signal 1%-2%
        if return_direction > 0:
            adjustment_factors[0] *= 1.15
            adjustment_factors[2] *= 1.1
        else:
            adjustment_factors[1] *= 1.15
            adjustment_factors[3] *= 1.1

    # Volatility adjustment
    if return_strength > 0.03:  # High volatility expectation > 3%
        adjustment_factors[4] *= 1.2  # Enhance relative volume weight
        adjustment_factors[6] *= 1.15 # Enhance sniper weight
        adjustment_factors[7] *= 1.1  # Enhance blocks weight

    # Generate final dynamic weights
    dynamic_weights = [base_weights[i] * adjustment_factors[i] for i in range(10)]

    # Weight normalization (optional)
    # total_weight = sum(dynamic_weights)
    # dynamic_weights = [w / total_weight * 10 for w in dynamic_weights]

    return dynamic_weights

========== Smart Scoring System ==========

class SmartScoringSystem: def init(self): self.return_predictor = ReturnPredictor() self.weight_generator = DynamicWeightGenerator() self.is_model_trained = False

def calculate_score(self, indicators, market_type, features=None):
    """Calculate trading score (using dynamic weights from predicted return rate)"""
    if indicators is None:
        return 50.0

    try:
        # 🔥 Core logic: Use current indicators to predict next period return rate
        if self.is_model_trained and features is not None:
            predicted_return = self.return_predictor.predict(features)[0, 0]

        else:
            predicted_return = 0.0
            Log(f"📊 Using base weights for calculation")

        # Generate dynamic weights based on predicted return rate
        dynamic_weights = self.weight_generator.generate_weights_from_predicted_return(
            predicted_return, market_type)

        # Get latest indicator values
        trend = indicators['trend'][-1]
        rsi_val = indicators['rsi_val'][-1]
        macd = indicators['macd'][-1]
        signal_line = indicators['signal_line'][-1]
        volume = indicators['volumes'][-1]
        vol_abs_thresh = indicators['vol_abs_thresh'][-1]
        sma_vol20 = indicators['sma_vol20'][-1]
        rvol = indicators['rvol'][-1]
        delta = indicators['delta'][-1]
        sniper_thresh = indicators['sniper_thresh']
        pattern = indicators['pattern'][-1]

        # Calculate individual scores
        base_score = 0.0

        # 1. Trend score
        trend_score = 20 if trend == 1 else (-20 if trend == -1 else 0)
        base_score += trend_score * dynamic_weights[0]

        # 2. RSI score
        rsi_score = -10 if rsi_val > 70 else (10 if rsi_val < 30 else 0)
        base_score += rsi_score * dynamic_weights[1]

        # 3. MACD score
        macd_score = 10 if macd > signal_line else -10
        base_score += macd_score * dynamic_weights[2]

        # 4. Volume score
        vol_score = 8 if volume > vol_abs_thresh else (-8 if volume < sma_vol20 else 0)
        base_score += vol_score * dynamic_weights[3]

        # 5. Relative volume score
        rvol_score = 7 if rvol > 1.5 else (-7 if rvol < 0.8 else 0)
        base_score += rvol_score * dynamic_weights[4]

        # 6. Delta score
        delta_score = 6 if delta > 0 else -6
        base_score += delta_score * dynamic_weights[5]

        # 7. Sniper score
        sniper_score = 8 if volume > sniper_thresh else (-8 if volume < sma_vol20 else 0)
        base_score += sniper_score * dynamic_weights[6]

        # 8. Blocks score
        if len(indicators['volumes']) >= 10:
            highest_vol = np.max(indicators['volumes'][-10:])
            blocks_score = 5 if volume > highest_vol * 0.8 else (-5 if volume < sma_vol20 else 0)
        else:
            blocks_score = 0
        base_score += blocks_score * dynamic_weights[7]

        # 9. Tick score
        tick_score = 5 if volume > sma_vol20 else -5
        base_score += tick_score * dynamic_weights[8]

        # 10. Pattern score
        pattern_score = 7 if pattern == 1 else (5 if pattern == 2 else 0)
        base_score += pattern_score * dynamic_weights[9]

        # Convert to percentage score
        score_pct = max(0, min(100, 50 + base_score))

        return score_pct

    except Exception as e:
        Log(f"Score calculation error: {str(e)}")
        return 50.0

def train_return_predictor(self, X, y):
    """Train return rate predictor"""
    if len(X) < 20:
        Log("Insufficient training data, skipping return predictor training")
        return False

    X_array = np.array(X)
    y_array = np.array(y).reshape(-1, 1)

    Log(f"🧠 Starting return predictor training, sample count: {len(X_array)}")
    Log(f"📊 Return rate range: [{np.min(y_array)*100:.3f}%, {np.max(y_array)*100:.3f}%]")

    self.return_predictor.train(X_array, y_array, epochs=100)
    self.is_model_trained = True

    # Validate model prediction performance
    predictions = self.return_predictor.predict(X_array)
    mse = np.mean((predictions - y_array) ** 2)
    correlation = np.corrcoef(predictions.flatten(), y_array.flatten())[0, 1]

    Log(f"✅ Return predictor training completed")
    Log(f"📈 MSE: {mse:.6f}, Correlation coefficient: {correlation:.4f}")

    return True

========== Dynamic Parameter Manager ==========

class DynamicParameterManager: def init(self): self.market_params = { “Bull”: {“stop_loss”: 0.02, “take_profit”: 0.05}, “Bear”: {“stop_loss”: 0.02, “take_profit”: 0.05}, “Eagle”: {“stop_loss”: 0.015, “take_profit”: 0.06}, “Wolf”: {“stop_loss”: 0.025, “take_profit”: 0.04}, “Momentum”: {“stop_loss”: 0.025, “take_profit”: 0.06}, “Sideways”: {“stop_loss”: 0.01, “take_profit”: 0.02}, “Volatile”: {“stop_loss”: 0.03, “take_profit”: 0.07}, “Unknown”: {“stop_loss”: 0.02, “take_profit”: 0.03} }

def get_params(self, market_type):
    return self.market_params.get(market_type, self.market_params["Unknown"])

========== Main Strategy Class ==========

class PredictiveNeuralTradingStrategy: def init(self): self.data_buffer = deque(maxlen=200) self.feature_buffer = deque(maxlen=100) self.label_buffer = deque(maxlen=100) # Store return rate labels self.scoring_system = SmartScoringSystem() self.param_manager = DynamicParameterManager()

    # Training control
    self.last_retrain_time = 0
    self.retrain_interval = 3600 * 6  # Retrain every 6 hours
    self.min_train_samples = 30

    # Trading state
    self.POSITION_NONE = 0
    self.POSITION_LONG = 1
    self.POSITION_SHORT = 2
    self.position_state = self.POSITION_NONE

    # Trading records
    self.open_price = 0
    self.counter = {'win': 0, 'loss': 0}

    # K-line data management
    self.last_processed_time = 0

def get_current_position(self):
    """Get current futures position status"""
    try:
        positions = exchange.GetPosition()
        if not positions:
            return self.POSITION_NONE, 0

        long_amount = 0
        short_amount = 0

        for pos in positions:
            amount = pos.get('Amount', 0)
            pos_type = pos.get('Type', -1)

            if amount > 0:
                if pos_type == 0:  # Long position
                    long_amount += amount
                elif pos_type == 1:  # Short position
                    short_amount += amount

        net_position = long_amount - short_amount

        if net_position > 0:
            return self.POSITION_LONG, net_position
        elif net_position < 0:
            return self.POSITION_SHORT, abs(net_position)
        else:
            return self.POSITION_NONE, 0

    except Exception as e:
        Log(f"Get position error: {str(e)}")
        return self.POSITION_NONE, 0

def collect_data(self, records):
    """Collect data and generate training samples"""
    if not records or len(records) < 55:
        return False

    # Check if there's a new completed K-line
    if len(records) > 1:
        latest_completed = records[-2]
        current_time = latest_completed['Time']

        # Skip if this K-line has already been processed
        if current_time <= self.last_processed_time:
            return False

        self.last_processed_time = current_time

    # Add completed K-lines to buffer
    completed_records = records[:-1] if len(records) > 1 else []
    if completed_records:
        self.data_buffer.extend(completed_records[-5:])

    # 🔥 Generate training samples: X[t] -> y[t+1]
    if len(self.data_buffer) >= 2:
        # Use the second-to-last record as features, last record to calculate return rate label
        buffer_list = list(self.data_buffer)

        # Calculate t-1 moment indicators as features
        feature_records = buffer_list[:-1] if len(buffer_list) > 1 else buffer_list
        indicators, features = TechnicalIndicators.calculate_indicators(
            feature_records, use_completed_only=False)

        if indicators is not None and features is not None:
            # Calculate t moment return rate relative to t-1 moment as label
            if len(buffer_list) >= 2:
                current_close = buffer_list[-1]['Close']
                previous_close = buffer_list[-2]['Close']

                if previous_close > 0:
                    return_rate = (current_close - previous_close) / previous_close

                    # Add to training set
                    self.feature_buffer.append(features[0])
                    self.label_buffer.append(return_rate)

                    Log(f"📈 New sample: Return rate={return_rate*100:.3f}%, Feature dimensions={features.shape}")

    return True

def should_retrain(self):
    """Determine if retraining is needed"""
    import time
    current_time = time.time()
    return (current_time - self.last_retrain_time > self.retrain_interval and 
            len(self.feature_buffer) >= self.min_train_samples)

def train_model(self):
    """Train return rate predictor"""
    if len(self.feature_buffer) < self.min_train_samples:
        Log("Insufficient training data, skipping training")
        return False

    X = list(self.feature_buffer)
    y = list(self.label_buffer)

    success = self.scoring_system.train_return_predictor(X, y)

    if success:
        import time
        self.last_retrain_time = time.time()

    return success

def get_trading_signals(self, records):
    """Get trading signals"""
    # Calculate current moment technical indicators
    indicators, features = TechnicalIndicators.calculate_indicators(
        list(self.data_buffer), use_completed_only=False)
    if indicators is None:
        return 50.0, "Unknown"

    # Detect market type
    market_type = MarketStateDetector.detect_market_type(indicators)

    # 🔥 Calculate score using dynamic weights from predicted return rate
    score = self.scoring_system.calculate_score(indicators, market_type, features)

    return score, market_type

def check_entry_conditions(self, score, market_type):
    """Check entry conditions"""
    # Long conditions
    long_condition = ((market_type in ["Bull", "Eagle", "Momentum"]) and score > 65)

    # Short conditions  
    short_condition = ((market_type in ["Bear", "Wolf"]) and score < 35)

    return long_condition, short_condition

def open_long(self):
    """Open long position"""
    try:
        ticker = exchange.GetTicker()
        if not ticker:
            return False

        buy_price = ticker['Last'] + 20
        order_id = exchange.CreateOrder("", "buy", buy_price, AmountOP)

        if order_id:
            Sleep(2000)
            order_info = exchange.GetOrder(order_id)
            if order_info and order_info.get('Status') == 1:
                self.open_price = order_info.get('AvgPrice', buy_price)
                self.position_state = self.POSITION_LONG
                Log(f"🚀 Long position opened successfully: Price={self.open_price}, Amount={AmountOP}")
                return True
            else:
                exchange.CancelOrder(order_id)
                Log("Long position order not fully filled, cancelled")

        return False

    except Exception as e:
        Log(f"Open long position error: {str(e)}")
        return False

def open_short(self):
    """Open short position"""
    try:
        ticker = exchange.GetTicker()
        if not ticker:
            return False

        sell_price = ticker['Last'] - 20
        order_id = exchange.CreateOrder("", "sell", sell_price, AmountOP)

        if order_id:
            Sleep(2000)
            order_info = exchange.GetOrder(order_id)
            if order_info and order_info.get('Status') == 1:
                self.open_price = order_info.get('AvgPrice', sell_price)
                self.position_state = self.POSITION_SHORT
                Log(f"🎯 Short position opened successfully: Price={self.open_price}, Amount={AmountOP}")
                return True
            else:
                exchange.CancelOrder(order_id)
                Log("Short position order not fully filled, cancelled")

        return False

    except Exception as e:
        Log(f"Open short position error: {str(e)}")
        return False

def close_position(self):
    """Close position"""
    try:
        positions = exchange.GetPosition()
        if not positions:
            Log("No position to close")
            self.position_state = self.POSITION_NONE
            self.open_price = 0
            return True

        ticker = exchange.GetTicker()
        if not ticker:
            return False

        close_success = True

        for pos in positions:
            if pos['Amount'] == 0:
                continue

            amount = pos['Amount']
            pos_type = pos['Type']

            if pos_type == 0:  # Close long position
                close_price = ticker['Last'] - 20
                order_id = exchange.CreateOrder("", "closebuy", close_price, amount)
                Log(f"📤 Close long position: Price={close_price}, Amount={amount}")

            elif pos_type == 1:  # Close short position
                close_price = ticker['Last'] + 20
                order_id = exchange.CreateOrder("", "closesell", close_price, amount)
                Log(f"📤 Close short position: Price={close_price}, Amount={amount}")

            if order_id:
                Sleep(2000)
                order_info = exchange.GetOrder(order_id)
                if order_info and order_info.get('Status') == 1:
                    close_price = order_info.get('AvgPrice', close_price)
                    Log(f"✅ Position closed successfully: Fill price={close_price}")
                    self.update_profit_stats(close_price)
                else:
                    exchange.CancelOrder(order_id)
                    close_success = False
                    Log(f"Close position order not fully filled, cancelled")
            else:
                close_success = False
                Log("Failed to create close position order")

        if close_success:
            self.position_state = self.POSITION_NONE
            self.open_price = 0

        return close_success

    except Exception as e:
        Log(f"Close position error: {str(e)}")
        return False

def update_profit_stats(self, close_price):
    """Update profit/loss statistics"""
    if self.open_price == 0:
        return

    if self.position_state == self.POSITION_LONG:
        if close_price > self.open_price:
            self.counter['win'] += 1
            Log("💰 Long position profitable")
        else:
            self.counter['loss'] += 1
            Log("💸 Long position loss")
    elif self.position_state == self.POSITION_SHORT:
        if close_price < self.open_price:
            self.counter['win'] += 1
            Log("💰 Short position profitable")
        else:
            self.counter['loss'] += 1
            Log("💸 Short position loss")

def check_stop_loss_take_profit(self, current_price, params):
    """Check stop loss and take profit, execute closing"""
    if self.open_price == 0 or self.position_state == self.POSITION_NONE:
        return False

    stop_loss_pct = params["stop_loss"]
    take_profit_pct = params["take_profit"]

    if self.position_state == self.POSITION_LONG:
        profit_pct = (current_price - self.open_price) / self.open_price

        if profit_pct <= -stop_loss_pct:
            Log(f"🔴 Long position stop loss triggered: Open price={self.open_price:.2f}, Current price={current_price:.2f}, Loss={profit_pct:.4f}")
            return self.execute_close_position("Stop Loss")
        elif profit_pct >= take_profit_pct:
            Log(f"🟢 Long position take profit triggered: Open price={self.open_price:.2f}, Current price={current_price:.2f}, Profit={profit_pct:.4f}")
            return self.execute_close_position("Take Profit")

    elif self.position_state == self.POSITION_SHORT:
        profit_pct = (self.open_price - current_price) / self.open_price

        if profit_pct <= -stop_loss_pct:
            Log(f"🔴 Short position stop loss triggered: Open price={self.open_price:.2f}, Current price={current_price:.2f}, Loss={profit_pct:.4f}")
            return self.execute_close_position("Stop Loss")
        elif profit_pct >= take_profit_pct:
            Log(f"🟢 Short position take profit triggered: Open price={self.open_price:.2f}, Current price={current_price:.2f}, Profit={profit_pct:.4f}")
            return self.execute_close_position("Take Profit")

    return False

def execute_close_position(self, reason):
    """Execute close position operation (specifically for stop loss/take profit)"""
    try:
        positions = exchange.GetPosition()
        if not positions:
            Log(f"{reason} close position: No position")
            self.position_state = self.POSITION_NONE
            self.open_price = 0
            return True

        ticker = exchange.GetTicker()
        if not ticker:
            Log(f"{reason} close position failed: Cannot get ticker")
            return False

        Log(f"🚨 Executing {reason} close position operation...")
        close_success = True

        for pos in positions:
            if pos['Amount'] == 0:
                continue

            amount = pos['Amount']
            pos_type = pos['Type']
            order_id = None

            if pos_type == 0:  # Close long position
                close_price = ticker['Last'] - 50
                order_id = exchange.CreateOrder("", "closebuy", close_price, amount)
                Log(f"📤 {reason} close long position order: Price={close_price}, Amount={amount}")

            elif pos_type == 1:  # Close short position
                close_price = ticker['Last'] + 50
                order_id = exchange.CreateOrder("", "closesell", close_price, amount)
                Log(f"📤 {reason} close short position order: Price={close_price}, Amount={amount}")

            if order_id:
                Log(f"📋 {reason} close position order ID: {order_id}")
                Sleep(1500)

                for retry in range(2):
                    order_info = exchange.GetOrder(order_id)
                    if order_info:
                        status = order_info.get('Status', -1)
                        if status == 1:
                            close_price = order_info.get('AvgPrice', close_price)
                            Log(f"✅ {reason} close position successful: Fill price={close_price}")
                            self.update_profit_stats(close_price)
                            break
                        elif status == 0:
                            if retry == 0:
                                Log(f"⏳ {reason} close position order executing, waiting...")
                                Sleep(1500)
                            else:
                                Log(f"⚠️ {reason} close position order not fully filled, force cancel")
                                exchange.CancelOrder(order_id)
                                close_success = False
                        else:
                            Log(f"❌ {reason} close position order status abnormal: {status}")
                            exchange.CancelOrder(order_id)
                            close_success = False
                            break
                    else:
                        Log(f"⚠️ Cannot get {reason} close position order info, retry {retry+1}/2")
                        if retry == 1:
                            close_success = False
            else:
                Log(f"❌ {reason} close position order creation failed")
                close_success = False

        if close_success:
            Sleep(1000)
            new_positions = exchange.GetPosition()
            total_amount = sum(pos['Amount'] for pos in new_positions) if new_positions else 0

            if total_amount == 0:
                Log(f"✅ {reason} close position completed, position cleared")
                self.position_state = self.POSITION_NONE
                self.open_price = 0
                return True
            else:
                Log(f"⚠️ {reason} close position incomplete, remaining position: {total_amount}")
                return False
        else:
            Log(f"❌ {reason} close position failed")
            return False

    except Exception as e:
        Log(f"❌ {reason} close position error: {str(e)}")
        return False

def execute_trade_logic(self, score, market_type, current_price):
    """Execute trading logic"""
    params = self.param_manager.get_params(market_type)

    # Get current actual position status
    actual_position, position_amount = self.get_current_position()

    # Synchronize internal state
    self.position_state = actual_position

    # First check stop loss/take profit (highest priority)
    if self.position_state != self.POSITION_NONE:
        if self.check_stop_loss_take_profit(current_price, params):
            Log("🚨 Stop loss/take profit triggered, executed close position, skipping other trading signals")
            return

    # Get entry conditions
    long_condition, short_condition = self.check_entry_conditions(score, market_type)

    # Execute trading logic
    if long_condition and self.position_state <= self.POSITION_NONE:
        Log(f"📈 Long position signal: Market={market_type}, Prediction score={score:.1f} > 65")
        self.open_long()

    if short_condition and self.position_state >= self.POSITION_NONE:
        Log(f"📉 Short position signal: Market={market_type}, Prediction score={score:.1f} < 35")
        self.open_short()

    if not long_condition and self.position_state > self.POSITION_NONE:
        Log(f"📤 Close long position signal: Market={market_type}, Prediction score={score:.1f}")
        self.close_position()

    if not short_condition and self.position_state < self.POSITION_NONE:
        Log(f"📤 Close short position signal: Market={market_type}, Prediction score={score:.1f}")
        self.close_position()

def CancelPendingOrders(): “”“Cancel all pending orders”“” while True: orders = exchange.GetOrders() if not orders: break for order in orders: exchange.CancelOrder(order[‘Id’]) Sleep(500)

def main(): global AmountOP, LoopInterval

# Check initial position
initial_positions = exchange.GetPosition()
if initial_positions and any(pos['Amount'] > 0 for pos in initial_positions):
    raise Error_AtBeginHasPosition()

# Cancel all pending orders
CancelPendingOrders()

# Initialize strategy
strategy = PredictiveNeuralTradingStrategy()

Log("🔮 Predictive Neural Network Futures Trading Strategy Started")
LogProfitReset()

# Data warm-up period
Log("Entering data warm-up period...")
warmup_count = 0
warmup_target = 60

while warmup_count < warmup_target:
    records = exchange.GetRecords()
    if records and len(records) >= 55:
        if strategy.collect_data(records):
            warmup_count += 1
            if warmup_count % 10 == 0:
                Log(f"Warm-up progress: {warmup_count}/{warmup_target}")
    Sleep(5000)

Log("Data warm-up completed, starting initial return rate predictor training...")
strategy.train_model()

# Main trading loop
loop_count = 0
while True:
    loop_count += 1

    # Get K-line data
    records = exchange.GetRecords()
    if not records or len(records) < 55:
        Sleep(LoopInterval * 1000)
        continue

    # Data processing
    data_updated = strategy.collect_data(records)

    # Check if retraining is needed
    if strategy.should_retrain():
        Log("🔄 Retraining return rate predictor...")
        strategy.train_model()

    # Get trading signals
    score, market_type = strategy.get_trading_signals(records)

    # Get current real-time price
    ticker = exchange.GetTicker()
    if ticker:
        current_price = ticker['Last']
    else:
        current_price = records[-1]['Close']

    # Get current parameters
    params = strategy.param_manager.get_params(market_type)

    # Priority check for stop loss/take profit (using real-time price)
    if strategy.position_state != strategy.POSITION_NONE:
        if strategy.check_stop_loss_take_profit(current_price, params):
            Log("⚡ Stop loss/take profit triggered, executed close position")
            Sleep(LoopInterval * 1000)
            continue

    # Execute trading logic (only when there's new data)
    if data_updated:
        strategy.execute_trade_logic(score, market_type, current_price)

    # Status display
    pos_state_name = {
        strategy.POSITION_NONE: "No Position",
        strategy.POSITION_LONG: "Long", 
        strategy.POSITION_SHORT: "Short"
    }.get(strategy.position_state, "Unknown")

    data_status = "📊New Data" if data_updated else "⏸️Waiting"
    model_status = "🔮Prediction" if strategy.scoring_system.is_model_trained else "📊Basic"

    # Get entry conditions for display
    long_cond, short_cond = strategy.check_entry_conditions(score, market_type)
    signal_status = ""
    if long_cond:
        signal_status 
相关推荐