avatar of ianzeng123 ianzeng123
关注 私信
2
关注
319
关注者

从固定权重到神经网络:一个Pine策略的机器学习改造实践

创建于: 2025-08-08 17:29:51, 更新于: 2025-08-11 11:58:38
comments   2
hits   533

从固定权重到神经网络:一个Pine策略的机器学习改造实践

偶然发现一个有趣的Pine策略

前几天在发明者论坛翻阅策略,看到一个叫 Panel Pro+ Quantum SmartPrompt 的策略。看了一遍代码,发现这个策略的思路挺有意思:用10个技术指标,根据市场状态给每个指标分配不同的权重,最后算出一个评分来决定买卖。比如在牛市状态下,趋势指标权重是2.0,RSI权重是1.5;在熊市状态下,权重又不一样。感觉像是在模仿人的思考方式:不同情况下,关注的重点不同。

仔细一想,这个结构挺像神经网络的: - 10个技术指标当输入 - 市场状态分类像隐藏层 - 权重矩阵就是连接权重 - 最后输出一个评分

但问题是所有权重都是写死的,比如:

if marketType == "Bull"
    array.set(weights, 0, 2.0) // 趋势权重固定是2.0
    array.set(weights, 1, 1.5) // RSI权重固定是1.5

这些数字完全是作者根据市场经验固定出来的,没有经过任何学习或优化。

想法:让权重能够学习

既然结构已经很像神经网络了,为什么不让它真的能学习呢?

我的想法很简单: 1. 保留原来的权重计算方式,得到一个”权重评分” 2. 用这个权重评分作为输入,训练一个小的神经网络 3. 让网络学习从权重评分预测未来收益率 4. 根据预测的收益率大小来决定要不要开仓

这样既保留了原策略的思路,又增加了学习能力。

在发明者平台动手实现

选择发明者平台主要因为它支持Python,而且包含丰富的数据。

第一步:重写技术指标

把Pine脚本里的所有指标用Python重新写了一遍,用talib库保证计算准确。包括EMA、MACD、RSI、ATR这些常用指标,还有成交量分析和简单的K线形态识别。

第二步:市场状态检测

照着原策略的逻辑,根据各种指标的组合判断市场类型:Bull、Bear、Eagle、Wolf等等。这部分基本就是if-else逻辑的组合。

第三步:权重评分计算

这是核心部分。我设了两套权重: - 基础权重:[2.0, 1.5, 2.0, 1.3, 1.2, …] - 市场权重:根据不同市场状态调整

最终权重 = 基础权重 × 市场权重

然后用这个权重对10个指标的原始得分加权求和,得到”权重评分”。

第四步:神经网络预测器

写了个很简单的网络: - 输入:1个特征(权重评分) - 隐藏层:16个神经元,ReLU激活 - 输出:预测收益率,用tanh限制在±5%

训练目标:用t-1时刻的权重评分,预测t时刻的价格变化。

第五步:交易逻辑

不再根据评分高低直接买卖,而是看预测的收益率: - 预测收益率 > 1.5%:开多或平空开多 - 预测收益率 < -1.5%:开空或平多开空 - 其他情况:保持现状

同时保留止盈止损,确保风险可控。

实际运行的一些观察

数据收集

策略能正常收集训练数据。每次有新K线时,会用前一根K线的权重评分作为特征,当前K线相对前一根的涨跌幅作为标签。

数据大概是这样的:

权重评分=15.6, 收益率=+0.8%
权重评分=-8.2, 收益率=-1.2%
权重评分=22.1, 收益率=+0.3%

模型训练

神经网络能正常训练,MSE损失会逐步下降。设置每4小时重新训练一次,保证模型能适应市场变化。

预测效果

模型的预测和实际收益率确实有一定相关性,但不是特别强。主要问题: 1. 单个特征太简单,可能信息不够 2. 短期价格变动随机性很大 3. 合约市场噪声比较多

交易表现

由于有止盈止损保护,单笔交易的风险控制得不错。但整体盈利能力一般,主要还是预测准确性不够高。 从固定权重到神经网络:一个Pine策略的机器学习改造实践

遇到的一些问题

特征太单一:只用权重评分一个特征,确实有点简单。市场这么复杂,一个数字很难完全概括。

样本质量不稳定:合约价格短期波动很大,很多时候涨跌其实是随机的,这让训练样本的质量不太稳定。

过拟合风险:虽然网络很简单,但样本量有限时还是可能过拟合。

实时性要求:在线学习需要平衡训练时间和实时性。

时间有限,优化不够

这个策略还有很多可以改进的地方,但时间和精力有限,没法深入优化:

特征方面:可以加更多技术指标,或者用价格序列的统计特征。

模型方面:可以试试LSTM这种序列模型,或者用多个模型集成。

数据方面:改进样本质量,增加数据清洗。

风控方面:完善动态止损,优化仓位管理。

收获和感想

这次探索让我明白了一个道理:好的灵感重点在于及时实现!当时看到Pine脚本的权重矩阵设计,马上就想到了用神经网络改进的可能性。如果当时只是想想而已,或者拖着不动手,这个想法很可能就被遗忘了。恰好发明者平台提供了Python环境和数据接口,让我能够快速把想法变成可运行的代码。从产生想法到完成基本实现,前后也就花了一天时间。虽然最终的策略效果一般,但通过实际运行,至少验证了这个思路是可行的。更重要的是,在实现过程中又产生了新的想法和改进思路。如果没有及时动手,这些后续的发现和思考都不会有。纸上谈兵永远比不上真刀真枪地写代码、跑数据、看结果。量化交易就是这样,想法很多,但真正有价值的是那些被快速实现和验证的想法。

'''backtest
start: 2025-07-31 00:00:00
end: 2025-08-07 00:00:00
period: 1h
basePeriod: 5m
exchanges: [{"eid":"Futures_Binance","currency":"ETH_USDT","balance":5000000,"fee":[0.01,0.01]}]
'''

import numpy as np
from collections import deque
import talib as TA

# ========== 异常类 ==========
class Error_noSupport(BaseException):
    def __init__(self):
        Log("只支持期货交易!#FF0000")

class Error_AtBeginHasPosition(BaseException):
    def __init__(self):
        Log("启动时有期货持仓! #FF0000")

# ========== 收益率预测神经网络 ==========
class ReturnPredictor:
    def __init__(self, input_size=10, hidden_size=20, output_size=1):
        """收益率预测网络: X[t] -> y[t+1] (收益率)"""
        self.W1 = np.random.randn(input_size, hidden_size) * 0.1
        self.b1 = np.zeros((1, hidden_size))
        self.W2 = np.random.randn(hidden_size, output_size) * 0.1
        self.b2 = np.zeros((1, output_size))
        self.learning_rate = 0.001
        
    def sigmoid(self, x):
        return 1 / (1 + np.exp(-np.clip(x, -250, 250)))
    
    def tanh(self, x):
        return np.tanh(x)
        
    def forward(self, X):
        self.z1 = np.dot(X, self.W1) + self.b1
        self.a1 = self.sigmoid(self.z1)
        self.z2 = np.dot(self.a1, self.W2) + self.b2
        # 输出预测收益率,使用tanh限制在合理范围
        self.a2 = self.tanh(self.z2) * 0.1  # 限制在±10%范围内
        return self.a2
    
    def backward(self, X, y, output):
        m = X.shape[0]
        
        # MSE损失的梯度
        dZ2 = (output - y) / m
        # tanh的导数
        tanh_derivative = 1 - (output / 0.1) ** 2
        dZ2 = dZ2 * 0.1 * tanh_derivative
        
        dW2 = np.dot(self.a1.T, dZ2)
        db2 = np.sum(dZ2, axis=0, keepdims=True)
        
        dA1 = np.dot(dZ2, self.W2.T)
        dZ1 = dA1 * self.a1 * (1 - self.a1)  # sigmoid导数
        dW1 = np.dot(X.T, dZ1)
        db1 = np.sum(dZ1, axis=0, keepdims=True)
        
        # 更新权重
        self.W2 -= self.learning_rate * dW2
        self.b2 -= self.learning_rate * db2
        self.W1 -= self.learning_rate * dW1
        self.b1 -= self.learning_rate * db1
    
    def train(self, X, y, epochs=100):
        for i in range(epochs):
            output = self.forward(X)
            self.backward(X, y, output)
            if i % 20 == 0:
                loss = np.mean((output - y) ** 2)
                Log(f"收益率预测训练轮次 {i}, MSE损失: {loss:.6f}")
    
    def predict(self, X):
        return self.forward(X)

# ========== 技术指标计算类 ==========
class TechnicalIndicators:
    @staticmethod
    def calculate_indicators(records, use_completed_only=True):
        """计算技术指标和特征"""
        if len(records) < 55:
            return None, None
        
        # 只使用已完成的K线数据
        if use_completed_only and len(records) > 1:
            working_records = records[:-1]
        else:
            working_records = records
            
        if len(working_records) < 55:
            return None, None
            
        closes = np.array([r['Close'] for r in working_records])
        highs = np.array([r['High'] for r in working_records])
        lows = np.array([r['Low'] for r in working_records])
        volumes = np.array([r['Volume'] for r in working_records])
        opens = np.array([r['Open'] for r in working_records])
        
        try:
            # 基础指标
            ema_55 = TA.EMA(closes, timeperiod=55)
            sma_vol20 = TA.SMA(volumes, timeperiod=20)
            macd, signal_line, _ = TA.MACD(closes)
            rsi_val = TA.RSI(closes, timeperiod=14)
            atr14 = TA.ATR(highs, lows, closes, timeperiod=14)
            range20 = TA.STDDEV(closes, timeperiod=20)
            
            # 计算派生指标
            sma_atr20 = TA.SMA(atr14, timeperiod=20)
            sma_range20 = TA.SMA(range20, timeperiod=20)
            rvol = volumes / sma_vol20 if sma_vol20[-1] > 0 else np.ones_like(volumes)
            delta = closes - opens
            
            # 计算量能阈值
            vol_abs_thresh = sma_vol20 * 1.2
            sniper_thresh = np.percentile(volumes[-40:], 80) if len(volumes) >= 40 else sma_vol20[-1]
            
            # 趋势
            trend = np.where(closes > ema_55, 1, np.where(closes < ema_55, -1, 0))
            
            # 简化K线形态
            body_size = np.abs(closes - opens)
            total_range = highs - lows
            
            # 锤子线
            is_hammer = ((total_range > 3 * body_size) & 
                        ((closes - lows) / (total_range + 0.001) > 0.6) & 
                        ((opens - lows) / (total_range + 0.001) > 0.6))
            
            # 吞噬形态
            is_engulfing = np.zeros_like(closes, dtype=bool)
            if len(closes) >= 2:
                is_engulfing[1:] = ((closes[1:] > opens[:-1]) & 
                                   (opens[1:] < closes[:-1]) & 
                                   (closes[1:] > opens[1:]) & 
                                   (opens[1:] < closes[1:]))
            
            pattern = np.where(is_hammer, 1, np.where(is_engulfing, 2, 0))
            
            # 🔥 计算标准化特征向量(用于神经网络输入)
            features = []
            
            # 1. 趋势特征
            if len(ema_55) > 0 and not np.isnan(ema_55[-1]):
                trend_feature = (closes[-1] - ema_55[-1]) / ema_55[-1]
                features.append(np.tanh(trend_feature * 100))
            else:
                features.append(0)
            
            # 2. RSI特征
            if len(rsi_val) > 0 and not np.isnan(rsi_val[-1]):
                rsi_feature = (rsi_val[-1] - 50) / 50
                features.append(rsi_feature)
            else:
                features.append(0)
            
            # 3. MACD特征
            if len(macd) > 0 and not np.isnan(macd[-1]) and not np.isnan(signal_line[-1]):
                macd_feature = (macd[-1] - signal_line[-1]) / closes[-1] if closes[-1] > 0 else 0
                features.append(np.tanh(macd_feature * 1000))
            else:
                features.append(0)
            
            # 4. 成交量特征
            if len(vol_abs_thresh) > 0 and vol_abs_thresh[-1] > 0:
                vol_feature = volumes[-1] / vol_abs_thresh[-1] - 1
                features.append(np.tanh(vol_feature))
            else:
                features.append(0)
            
            # 5. 相对成交量特征
            if len(rvol) > 0 and not np.isnan(rvol[-1]):
                rvol_feature = rvol[-1] - 1
                features.append(np.tanh(rvol_feature))
            else:
                features.append(0)
            
            # 6. Delta特征
            if len(delta) > 0 and not np.isnan(delta[-1]) and closes[-1] > 0:
                delta_feature = delta[-1] / closes[-1]
                features.append(np.tanh(delta_feature * 100))
            else:
                features.append(0)
            
            # 7. ATR特征
            if len(atr14) > 0 and len(sma_atr20) > 0 and sma_atr20[-1] > 0:
                atr_feature = atr14[-1] / sma_atr20[-1] - 1
                features.append(np.tanh(atr_feature))
            else:
                features.append(0)
            
            # 8. Blocks特征
            if len(volumes) >= 10:
                highest_vol = np.max(volumes[-10:])
                blocks_feature = volumes[-1] / highest_vol - 0.8 if highest_vol > 0 else 0
                features.append(np.tanh(blocks_feature * 5))
            else:
                features.append(0)
            
            # 9. Tick特征
            if len(sma_vol20) > 0 and sma_vol20[-1] > 0:
                tick_feature = volumes[-1] / sma_vol20[-1] - 1
                features.append(np.tanh(tick_feature))
            else:
                features.append(0)
            
            # 10. 形态特征
            pattern_feature = pattern[-1] / 2.0 if len(pattern) > 0 else 0
            features.append(pattern_feature)
            
            # 确保特征数量正确
            while len(features) < 10:
                features.append(0)
            
            features = np.array(features[:10]).reshape(1, -1)
            
            indicators = {
                'ema_55': ema_55,
                'sma_vol20': sma_vol20,
                'macd': macd,
                'signal_line': signal_line,
                'rsi_val': rsi_val,
                'atr14': atr14,
                'range20': range20,
                'sma_atr20': sma_atr20,
                'sma_range20': sma_range20,
                'rvol': rvol,
                'delta': delta,
                'vol_abs_thresh': vol_abs_thresh,
                'sniper_thresh': sniper_thresh,
                'trend': trend,
                'pattern': pattern,
                'volumes': volumes,
                'closes': closes,
                'highs': highs,
                'lows': lows
            }
            
            return indicators, features
            
        except Exception as e:
            Log(f"计算技术指标异常: {str(e)}")
            return None, None

# ========== 市场状态检测类 ==========
class MarketStateDetector:
    @staticmethod
    def detect_market_type(indicators):
        """检测市场状态"""
        if indicators is None:
            return "Unknown"
        
        try:
            # 获取最新值
            close = indicators['closes'][-1]
            ema_55 = indicators['ema_55'][-1]
            macd = indicators['macd'][-1]
            signal_line = indicators['signal_line'][-1]
            rsi_val = indicators['rsi_val'][-1]
            atr14 = indicators['atr14'][-1]
            volume = indicators['volumes'][-1]
            sma_vol20 = indicators['sma_vol20'][-1]
            sma_atr20 = indicators['sma_atr20'][-1]
            range20 = indicators['range20'][-1]
            sma_range20 = indicators['sma_range20'][-1]
            rvol = indicators['rvol'][-1]
            delta = indicators['delta'][-1]
            
            # 检查有效性
            if (np.isnan(ema_55) or np.isnan(macd) or np.isnan(signal_line) or 
                np.isnan(rsi_val) or np.isnan(atr14) or np.isnan(sma_atr20)):
                return "Unknown"
            
            # 市场类型判断
            is_bull = (close > ema_55 and macd > signal_line and rsi_val > 50 and rvol > 1)
            is_bear = (close < ema_55 and macd < signal_line and rsi_val < 50 and volume > sma_vol20)
            is_sideways = (abs(close - ema_55) < atr14 * 0.5 and atr14 < sma_atr20)
            is_volatile = (atr14 > sma_atr20 * 1.2)
            
            # 需要历史数据的判断
            if len(indicators['closes']) >= 2:
                price_change = indicators['closes'][-1] - indicators['closes'][-2]
                is_momentum = (price_change > atr14 * 1.5 and volume > sma_vol20 * 1.5)
                is_wolf = (price_change < -atr14 and close < ema_55)
            else:
                is_momentum = False
                is_wolf = False
            
            is_mean_rev = (rsi_val > 70 or rsi_val < 30)
            is_box = (is_sideways and range20 < sma_range20 * 0.8)
            is_macro = (abs(delta) > atr14 * 2) if not np.isnan(delta) else False
            is_eagle = (is_bull and atr14 < sma_atr20 * 0.8)
            
            # 优先级判断
            if is_eagle:
                return "Eagle"
            elif is_bull:
                return "Bull"
            elif is_wolf:
                return "Wolf"
            elif is_bear:
                return "Bear"
            elif is_box:
                return "Box"
            elif is_sideways:
                return "Sideways"
            elif is_volatile:
                return "Volatile"
            elif is_momentum:
                return "Momentum"
            elif is_mean_rev:
                return "MeanRev"
            elif is_macro:
                return "Macro"
            else:
                return "Unknown"
                
        except Exception as e:
            Log(f"市场状态检测异常: {str(e)}")
            return "Unknown"

# ========== 动态权重生成器 ==========
class DynamicWeightGenerator:
    @staticmethod
    def generate_weights_from_predicted_return(predicted_return, market_type):
        """根据预测收益率和市场状态生成动态权重"""
        
        # 基础权重矩阵(不同市场类型)
        base_weights_matrix = {
            "Bull": [2.0, 1.5, 2.0, 1.3, 1.2, 1.0, 1.2, 1.0, 1.0, 1.0],
            "Bear": [2.0, 1.5, 2.0, 1.5, 1.3, 1.1, 1.2, 1.1, 1.0, 1.0],
            "Eagle": [2.2, 1.4, 2.1, 1.2, 1.3, 1.1, 1.1, 1.0, 1.0, 1.1],
            "Wolf": [1.8, 1.6, 1.8, 1.6, 1.2, 1.0, 1.3, 1.2, 1.0, 0.9],
            "Momentum": [1.5, 1.2, 1.8, 2.0, 2.0, 1.5, 1.5, 1.3, 1.2, 1.0],
            "Sideways": [1.0, 1.4, 1.0, 0.8, 0.7, 1.0, 0.9, 0.8, 1.0, 1.3],
            "Volatile": [1.2, 1.5, 1.3, 1.6, 1.8, 1.2, 1.4, 1.3, 1.4, 1.0],
        }
        
        base_weights = base_weights_matrix.get(market_type, [1.0] * 10)
        
        # 🔥 根据预测收益率动态调整权重
        adjustment_factors = [1.0] * 10
        
        # 预测收益率的强度
        return_strength = abs(predicted_return)
        return_direction = 1 if predicted_return > 0 else -1
        
        if return_strength > 0.02:  # 强预测信号 > 2%
            if return_direction > 0:  # 预测上涨
                adjustment_factors[0] *= 1.3  # 增强趋势权重
                adjustment_factors[2] *= 1.2  # 增强MACD权重
                adjustment_factors[4] *= 1.15 # 增强相对成交量权重
                adjustment_factors[1] *= 0.9  # 降低RSI权重
            else:  # 预测下跌
                adjustment_factors[1] *= 1.3  # 增强RSI权重
                adjustment_factors[3] *= 1.2  # 增强成交量权重
                adjustment_factors[0] *= 0.9  # 降低趋势权重
                
        elif return_strength > 0.01:  # 中等预测信号 1%-2%
            if return_direction > 0:
                adjustment_factors[0] *= 1.15
                adjustment_factors[2] *= 1.1
            else:
                adjustment_factors[1] *= 1.15
                adjustment_factors[3] *= 1.1
        
        # 波动性调整
        if return_strength > 0.03:  # 高波动预期 > 3%
            adjustment_factors[4] *= 1.2  # 增强相对成交量权重
            adjustment_factors[6] *= 1.15 # 增强sniper权重
            adjustment_factors[7] *= 1.1  # 增强blocks权重
        
        # 生成最终动态权重
        dynamic_weights = [base_weights[i] * adjustment_factors[i] for i in range(10)]
        
        # 权重标准化(可选)
        # total_weight = sum(dynamic_weights)
        # dynamic_weights = [w / total_weight * 10 for w in dynamic_weights]
        
        return dynamic_weights

# ========== 智能得分计算系统 ==========
class SmartScoringSystem:
    def __init__(self):
        self.return_predictor = ReturnPredictor()
        self.weight_generator = DynamicWeightGenerator()
        self.is_model_trained = False
        
    def calculate_score(self, indicators, market_type, features=None):
        """计算交易得分(使用预测收益率的动态权重)"""
        if indicators is None:
            return 50.0
        
        try:
            # 🔥 核心逻辑:使用当前指标预测下期收益率
            if self.is_model_trained and features is not None:
                predicted_return = self.return_predictor.predict(features)[0, 0]
                
            else:
                predicted_return = 0.0
                Log(f"📊 使用基础权重计算")
            
            # 根据预测收益率生成动态权重
            dynamic_weights = self.weight_generator.generate_weights_from_predicted_return(
                predicted_return, market_type)
            
            # 获取最新指标值
            trend = indicators['trend'][-1]
            rsi_val = indicators['rsi_val'][-1]
            macd = indicators['macd'][-1]
            signal_line = indicators['signal_line'][-1]
            volume = indicators['volumes'][-1]
            vol_abs_thresh = indicators['vol_abs_thresh'][-1]
            sma_vol20 = indicators['sma_vol20'][-1]
            rvol = indicators['rvol'][-1]
            delta = indicators['delta'][-1]
            sniper_thresh = indicators['sniper_thresh']
            pattern = indicators['pattern'][-1]
            
            # 计算各项得分
            base_score = 0.0
            
            # 1. 趋势得分
            trend_score = 20 if trend == 1 else (-20 if trend == -1 else 0)
            base_score += trend_score * dynamic_weights[0]
            
            # 2. RSI得分
            rsi_score = -10 if rsi_val > 70 else (10 if rsi_val < 30 else 0)
            base_score += rsi_score * dynamic_weights[1]
            
            # 3. MACD得分
            macd_score = 10 if macd > signal_line else -10
            base_score += macd_score * dynamic_weights[2]
            
            # 4. 成交量得分
            vol_score = 8 if volume > vol_abs_thresh else (-8 if volume < sma_vol20 else 0)
            base_score += vol_score * dynamic_weights[3]
            
            # 5. 相对成交量得分
            rvol_score = 7 if rvol > 1.5 else (-7 if rvol < 0.8 else 0)
            base_score += rvol_score * dynamic_weights[4]
            
            # 6. Delta得分
            delta_score = 6 if delta > 0 else -6
            base_score += delta_score * dynamic_weights[5]
            
            # 7. Sniper得分
            sniper_score = 8 if volume > sniper_thresh else (-8 if volume < sma_vol20 else 0)
            base_score += sniper_score * dynamic_weights[6]
            
            # 8. Blocks得分
            if len(indicators['volumes']) >= 10:
                highest_vol = np.max(indicators['volumes'][-10:])
                blocks_score = 5 if volume > highest_vol * 0.8 else (-5 if volume < sma_vol20 else 0)
            else:
                blocks_score = 0
            base_score += blocks_score * dynamic_weights[7]
            
            # 9. Tick得分
            tick_score = 5 if volume > sma_vol20 else -5
            base_score += tick_score * dynamic_weights[8]
            
            # 10. 形态得分
            pattern_score = 7 if pattern == 1 else (5 if pattern == 2 else 0)
            base_score += pattern_score * dynamic_weights[9]
            
            # 转换为百分比得分
            score_pct = max(0, min(100, 50 + base_score))
            
            return score_pct
            
        except Exception as e:
            Log(f"得分计算异常: {str(e)}")
            return 50.0
    
    def train_return_predictor(self, X, y):
        """训练收益率预测器"""
        if len(X) < 20:
            Log("训练数据不足,跳过收益率预测器训练")
            return False
            
        X_array = np.array(X)
        y_array = np.array(y).reshape(-1, 1)
        
        Log(f"🧠 开始训练收益率预测器,样本数: {len(X_array)}")
        Log(f"📊 收益率范围: [{np.min(y_array)*100:.3f}%, {np.max(y_array)*100:.3f}%]")
        
        self.return_predictor.train(X_array, y_array, epochs=100)
        self.is_model_trained = True
        
        # 验证模型预测效果
        predictions = self.return_predictor.predict(X_array)
        mse = np.mean((predictions - y_array) ** 2)
        correlation = np.corrcoef(predictions.flatten(), y_array.flatten())[0, 1]
        
        Log(f"✅ 收益率预测器训练完成")
        Log(f"📈 MSE: {mse:.6f}, 相关系数: {correlation:.4f}")
        
        return True

# ========== 动态参数管理器 ==========
class DynamicParameterManager:
    def __init__(self):
        self.market_params = {
            "Bull": {"stop_loss": 0.02, "take_profit": 0.05},
            "Bear": {"stop_loss": 0.02, "take_profit": 0.05},
            "Eagle": {"stop_loss": 0.015, "take_profit": 0.06},
            "Wolf": {"stop_loss": 0.025, "take_profit": 0.04},
            "Momentum": {"stop_loss": 0.025, "take_profit": 0.06},
            "Sideways": {"stop_loss": 0.01, "take_profit": 0.02},
            "Volatile": {"stop_loss": 0.03, "take_profit": 0.07},
            "Unknown": {"stop_loss": 0.02, "take_profit": 0.03}
        }
    
    def get_params(self, market_type):
        return self.market_params.get(market_type, self.market_params["Unknown"])

# ========== 主策略类 ==========
class PredictiveNeuralTradingStrategy:
    def __init__(self):
        self.data_buffer = deque(maxlen=200)
        self.feature_buffer = deque(maxlen=100)
        self.label_buffer = deque(maxlen=100)  # 存储收益率标签
        self.scoring_system = SmartScoringSystem()
        self.param_manager = DynamicParameterManager()
        
        # 训练控制
        self.last_retrain_time = 0
        self.retrain_interval = 3600 * 6  # 6小时重新训练
        self.min_train_samples = 30
        
        # 交易状态
        self.POSITION_NONE = 0
        self.POSITION_LONG = 1
        self.POSITION_SHORT = 2
        self.position_state = self.POSITION_NONE
        
        # 交易记录
        self.open_price = 0
        self.counter = {'win': 0, 'loss': 0}
        
        # K线数据管理
        self.last_processed_time = 0
        
    def get_current_position(self):
        """获取当前期货持仓状态"""
        try:
            positions = exchange.GetPosition()
            if not positions:
                return self.POSITION_NONE, 0
            
            long_amount = 0
            short_amount = 0
            
            for pos in positions:
                amount = pos.get('Amount', 0)
                pos_type = pos.get('Type', -1)
                
                if amount > 0:
                    if pos_type == 0:  # 多仓
                        long_amount += amount
                    elif pos_type == 1:  # 空仓
                        short_amount += amount
            
            net_position = long_amount - short_amount
            
            if net_position > 0:
                return self.POSITION_LONG, net_position
            elif net_position < 0:
                return self.POSITION_SHORT, abs(net_position)
            else:
                return self.POSITION_NONE, 0
                
        except Exception as e:
            Log(f"获取持仓异常: {str(e)}")
            return self.POSITION_NONE, 0
    
    def collect_data(self, records):
        """收集数据并生成训练样本"""
        if not records or len(records) < 55:
            return False
        
        # 检查是否有新的已完成K线
        if len(records) > 1:
            latest_completed = records[-2]
            current_time = latest_completed['Time']
            
            # 如果这根K线已经处理过,跳过
            if current_time <= self.last_processed_time:
                return False
                
            self.last_processed_time = current_time
        
        # 添加已完成的K线到缓冲区
        completed_records = records[:-1] if len(records) > 1 else []
        if completed_records:
            self.data_buffer.extend(completed_records[-5:])
        
        # 🔥 生成训练样本:X[t] -> y[t+1]
        if len(self.data_buffer) >= 2:
            # 使用倒数第二条记录作为特征,最后一条记录计算收益率标签
            buffer_list = list(self.data_buffer)
            
            # 计算t-1时刻的指标作为特征
            feature_records = buffer_list[:-1] if len(buffer_list) > 1 else buffer_list
            indicators, features = TechnicalIndicators.calculate_indicators(
                feature_records, use_completed_only=False)
            
            if indicators is not None and features is not None:
                # 计算t时刻相对于t-1时刻的收益率作为标签
                if len(buffer_list) >= 2:
                    current_close = buffer_list[-1]['Close']
                    previous_close = buffer_list[-2]['Close']
                    
                    if previous_close > 0:
                        return_rate = (current_close - previous_close) / previous_close
                        
                        # 添加到训练集
                        self.feature_buffer.append(features[0])
                        self.label_buffer.append(return_rate)
                        
                        Log(f"📈 新样本: 收益率={return_rate*100:.3f}%, 特征维度={features.shape}")
        
        return True
    
    def should_retrain(self):
        """判断是否需要重新训练"""
        import time
        current_time = time.time()
        return (current_time - self.last_retrain_time > self.retrain_interval and 
                len(self.feature_buffer) >= self.min_train_samples)
    
    def train_model(self):
        """训练收益率预测器"""
        if len(self.feature_buffer) < self.min_train_samples:
            Log("训练数据不足,跳过训练")
            return False
            
        X = list(self.feature_buffer)
        y = list(self.label_buffer)
        
        success = self.scoring_system.train_return_predictor(X, y)
        
        if success:
            import time
            self.last_retrain_time = time.time()
        
        return success
    
    def get_trading_signals(self, records):
        """获取交易信号"""
        # 计算当前时刻的技术指标
        indicators, features = TechnicalIndicators.calculate_indicators(
            list(self.data_buffer), use_completed_only=False)
        if indicators is None:
            return 50.0, "Unknown"
        
        # 检测市场类型
        market_type = MarketStateDetector.detect_market_type(indicators)
        
        # 🔥 使用预测收益率的动态权重计算得分
        score = self.scoring_system.calculate_score(indicators, market_type, features)
        
        return score, market_type
    
    def check_entry_conditions(self, score, market_type):
        """检查开仓条件"""
        # 多头条件
        long_condition = ((market_type in ["Bull", "Eagle", "Momentum"]) and score > 65)
        
        # 空头条件  
        short_condition = ((market_type in ["Bear", "Wolf"]) and score < 35)
        
        return long_condition, short_condition
    
    def open_long(self):
        """开多仓"""
        try:
            ticker = exchange.GetTicker()
            if not ticker:
                return False
            
            buy_price = ticker['Last'] + 20
            order_id = exchange.CreateOrder("", "buy", buy_price, AmountOP)
            
            if order_id:
                Sleep(2000)
                order_info = exchange.GetOrder(order_id)
                if order_info and order_info.get('Status') == 1:
                    self.open_price = order_info.get('AvgPrice', buy_price)
                    self.position_state = self.POSITION_LONG
                    Log(f"🚀 开多仓成功: 价格={self.open_price}, 数量={AmountOP}")
                    return True
                else:
                    exchange.CancelOrder(order_id)
                    Log("开多仓订单未完全成交,已取消")
            
            return False
            
        except Exception as e:
            Log(f"开多仓异常: {str(e)}")
            return False
    
    def open_short(self):
        """开空仓"""
        try:
            ticker = exchange.GetTicker()
            if not ticker:
                return False
            
            sell_price = ticker['Last'] - 20
            order_id = exchange.CreateOrder("", "sell", sell_price, AmountOP)
            
            if order_id:
                Sleep(2000)
                order_info = exchange.GetOrder(order_id)
                if order_info and order_info.get('Status') == 1:
                    self.open_price = order_info.get('AvgPrice', sell_price)
                    self.position_state = self.POSITION_SHORT
                    Log(f"🎯 开空仓成功: 价格={self.open_price}, 数量={AmountOP}")
                    return True
                else:
                    exchange.CancelOrder(order_id)
                    Log("开空仓订单未完全成交,已取消")
            
            return False
            
        except Exception as e:
            Log(f"开空仓异常: {str(e)}")
            return False
    
    def close_position(self):
        """平仓"""
        try:
            positions = exchange.GetPosition()
            if not positions:
                Log("没有持仓需要平仓")
                self.position_state = self.POSITION_NONE
                self.open_price = 0
                return True
            
            ticker = exchange.GetTicker()
            if not ticker:
                return False
            
            close_success = True
            
            for pos in positions:
                if pos['Amount'] == 0:
                    continue
                    
                amount = pos['Amount']
                pos_type = pos['Type']
                
                if pos_type == 0:  # 平多仓
                    close_price = ticker['Last'] - 20
                    order_id = exchange.CreateOrder("", "closebuy", close_price, amount)
                    Log(f"📤 平多仓: 价格={close_price}, 数量={amount}")
                    
                elif pos_type == 1:  # 平空仓
                    close_price = ticker['Last'] + 20
                    order_id = exchange.CreateOrder("", "closesell", close_price, amount)
                    Log(f"📤 平空仓: 价格={close_price}, 数量={amount}")
                
                if order_id:
                    Sleep(2000)
                    order_info = exchange.GetOrder(order_id)
                    if order_info and order_info.get('Status') == 1:
                        close_price = order_info.get('AvgPrice', close_price)
                        Log(f"✅ 平仓成功: 成交价格={close_price}")
                        self.update_profit_stats(close_price)
                    else:
                        exchange.CancelOrder(order_id)
                        close_success = False
                        Log(f"平仓订单未完全成交,已取消")
                else:
                    close_success = False
                    Log("平仓订单创建失败")
            
            if close_success:
                self.position_state = self.POSITION_NONE
                self.open_price = 0
            
            return close_success
            
        except Exception as e:
            Log(f"平仓异常: {str(e)}")
            return False
    
    def update_profit_stats(self, close_price):
        """更新盈亏统计"""
        if self.open_price == 0:
            return
            
        if self.position_state == self.POSITION_LONG:
            if close_price > self.open_price:
                self.counter['win'] += 1
                Log("💰 多仓盈利")
            else:
                self.counter['loss'] += 1
                Log("💸 多仓亏损")
        elif self.position_state == self.POSITION_SHORT:
            if close_price < self.open_price:
                self.counter['win'] += 1
                Log("💰 空仓盈利")
            else:
                self.counter['loss'] += 1
                Log("💸 空仓亏损")
    
    def check_stop_loss_take_profit(self, current_price, params):
        """检查止损止盈并执行平仓"""
        if self.open_price == 0 or self.position_state == self.POSITION_NONE:
            return False
            
        stop_loss_pct = params["stop_loss"]
        take_profit_pct = params["take_profit"]
        
        if self.position_state == self.POSITION_LONG:
            profit_pct = (current_price - self.open_price) / self.open_price
            
            if profit_pct <= -stop_loss_pct:
                Log(f"🔴 多仓止损触发: 开仓价={self.open_price:.2f}, 当前价={current_price:.2f}, 亏损={profit_pct:.4f}")
                return self.execute_close_position("止损")
            elif profit_pct >= take_profit_pct:
                Log(f"🟢 多仓止盈触发: 开仓价={self.open_price:.2f}, 当前价={current_price:.2f}, 盈利={profit_pct:.4f}")
                return self.execute_close_position("止盈")
                
        elif self.position_state == self.POSITION_SHORT:
            profit_pct = (self.open_price - current_price) / self.open_price
            
            if profit_pct <= -stop_loss_pct:
                Log(f"🔴 空仓止损触发: 开仓价={self.open_price:.2f}, 当前价={current_price:.2f}, 亏损={profit_pct:.4f}")
                return self.execute_close_position("止损")
            elif profit_pct >= take_profit_pct:
                Log(f"🟢 空仓止盈触发: 开仓价={self.open_price:.2f}, 当前价={current_price:.2f}, 盈利={profit_pct:.4f}")
                return self.execute_close_position("止盈")
        
        return False
    
    def execute_close_position(self, reason):
        """执行平仓操作(专门用于止盈止损)"""
        try:
            positions = exchange.GetPosition()
            if not positions:
                Log(f"{reason}平仓: 没有持仓")
                self.position_state = self.POSITION_NONE
                self.open_price = 0
                return True
            
            ticker = exchange.GetTicker()
            if not ticker:
                Log(f"{reason}平仓失败: 无法获取ticker")
                return False
            
            Log(f"🚨 执行{reason}平仓操作...")
            close_success = True
            
            for pos in positions:
                if pos['Amount'] == 0:
                    continue
                    
                amount = pos['Amount']
                pos_type = pos['Type']
                order_id = None
                
                if pos_type == 0:  # 平多仓
                    close_price = ticker['Last'] - 50
                    order_id = exchange.CreateOrder("", "closebuy", close_price, amount)
                    Log(f"📤 {reason}平多仓订单: 价格={close_price}, 数量={amount}")
                    
                elif pos_type == 1:  # 平空仓
                    close_price = ticker['Last'] + 50
                    order_id = exchange.CreateOrder("", "closesell", close_price, amount)
                    Log(f"📤 {reason}平空仓订单: 价格={close_price}, 数量={amount}")
                
                if order_id:
                    Log(f"📋 {reason}平仓订单ID: {order_id}")
                    Sleep(1500)
                    
                    for retry in range(2):
                        order_info = exchange.GetOrder(order_id)
                        if order_info:
                            status = order_info.get('Status', -1)
                            if status == 1:
                                close_price = order_info.get('AvgPrice', close_price)
                                Log(f"✅ {reason}平仓成功: 成交价格={close_price}")
                                self.update_profit_stats(close_price)
                                break
                            elif status == 0:
                                if retry == 0:
                                    Log(f"⏳ {reason}平仓订单执行中,等待...")
                                    Sleep(1500)
                                else:
                                    Log(f"⚠️ {reason}平仓订单未完全成交,强制取消")
                                    exchange.CancelOrder(order_id)
                                    close_success = False
                            else:
                                Log(f"❌ {reason}平仓订单状态异常: {status}")
                                exchange.CancelOrder(order_id)
                                close_success = False
                                break
                        else:
                            Log(f"⚠️ 无法获取{reason}平仓订单信息,重试 {retry+1}/2")
                            if retry == 1:
                                close_success = False
                else:
                    Log(f"❌ {reason}平仓订单创建失败")
                    close_success = False
            
            if close_success:
                Sleep(1000)
                new_positions = exchange.GetPosition()
                total_amount = sum(pos['Amount'] for pos in new_positions) if new_positions else 0
                
                if total_amount == 0:
                    Log(f"✅ {reason}平仓完成,持仓已清零")
                    self.position_state = self.POSITION_NONE
                    self.open_price = 0
                    return True
                else:
                    Log(f"⚠️ {reason}平仓不完全,剩余持仓: {total_amount}")
                    return False
            else:
                Log(f"❌ {reason}平仓失败")
                return False
            
        except Exception as e:
            Log(f"❌ {reason}平仓异常: {str(e)}")
            return False
    
    def execute_trade_logic(self, score, market_type, current_price):
        """执行交易逻辑"""
        params = self.param_manager.get_params(market_type)
        
        # 获取当前实际持仓状态
        actual_position, position_amount = self.get_current_position()
        
        # 同步内部状态
        self.position_state = actual_position
        
        # 先检查止损止盈(最高优先级)
        if self.position_state != self.POSITION_NONE:
            if self.check_stop_loss_take_profit(current_price, params):
                Log("🚨 触发止盈止损,已执行平仓,跳过其他交易信号")
                return
        
        # 获取开仓条件
        long_condition, short_condition = self.check_entry_conditions(score, market_type)
        
        # 执行交易逻辑
        if long_condition and self.position_state <= self.POSITION_NONE:
            Log(f"📈 开多仓信号: 市场={market_type}, 预测得分={score:.1f} > 65")
            self.open_long()
            
        if short_condition and self.position_state >= self.POSITION_NONE:
            Log(f"📉 开空仓信号: 市场={market_type}, 预测得分={score:.1f} < 35")
            self.open_short()
            
        if not long_condition and self.position_state > self.POSITION_NONE:
            Log(f"📤 平多仓信号: 市场={market_type}, 预测得分={score:.1f}")
            self.close_position()
            
        if not short_condition and self.position_state < self.POSITION_NONE:
            Log(f"📤 平空仓信号: 市场={market_type}, 预测得分={score:.1f}")
            self.close_position()

def CancelPendingOrders():
    """取消所有挂单"""
    while True:
        orders = exchange.GetOrders()
        if not orders:
            break
        for order in orders:
            exchange.CancelOrder(order['Id'])
            Sleep(500)

def main():
    global AmountOP, LoopInterval
    
    # 检查初始持仓
    initial_positions = exchange.GetPosition()
    if initial_positions and any(pos['Amount'] > 0 for pos in initial_positions):
        raise Error_AtBeginHasPosition()
    
    # 取消所有挂单
    CancelPendingOrders()
    
    # 初始化策略
    strategy = PredictiveNeuralTradingStrategy()
    
    Log("🔮 预测型神经网络期货交易策略启动")
    LogProfitReset()
    
    # 数据预热期
    Log("进入数据预热期...")
    warmup_count = 0
    warmup_target = 60
    
    while warmup_count < warmup_target:
        records = exchange.GetRecords()
        if records and len(records) >= 55:
            if strategy.collect_data(records):
                warmup_count += 1
                if warmup_count % 10 == 0:
                    Log(f"预热进度: {warmup_count}/{warmup_target}")
        Sleep(5000)
    
    Log("数据预热完成,开始首次收益率预测器训练...")
    strategy.train_model()
    
    # 主交易循环
    loop_count = 0
    while True:
        loop_count += 1
        
        # 获取K线数据
        records = exchange.GetRecords()
        if not records or len(records) < 55:
            Sleep(LoopInterval * 1000)
            continue
        
        # 数据处理
        data_updated = strategy.collect_data(records)
        
        # 检查是否需要重新训练
        if strategy.should_retrain():
            Log("🔄 重新训练收益率预测器...")
            strategy.train_model()
        
        # 获取交易信号
        score, market_type = strategy.get_trading_signals(records)
        
        # 获取当前实时价格
        ticker = exchange.GetTicker()
        if ticker:
            current_price = ticker['Last']
        else:
            current_price = records[-1]['Close']
        
        # 获取当前参数
        params = strategy.param_manager.get_params(market_type)
        
        # 优先检查止损止盈(使用实时价格)
        if strategy.position_state != strategy.POSITION_NONE:
            if strategy.check_stop_loss_take_profit(current_price, params):
                Log("⚡ 触发止盈止损,已执行平仓")
                Sleep(LoopInterval * 1000)
                continue
        
        # 执行交易逻辑(只在有新数据时执行)
        if data_updated:
            strategy.execute_trade_logic(score, market_type, current_price)
        
        # 状态显示
        pos_state_name = {
            strategy.POSITION_NONE: "无仓",
            strategy.POSITION_LONG: "多仓", 
            strategy.POSITION_SHORT: "空仓"
        }.get(strategy.position_state, "未知")
        
        data_status = "📊新数据" if data_updated else "⏸️等待"
        model_status = "🔮预测" if strategy.scoring_system.is_model_trained else "📊基础"
        
        # 获取开仓条件用于显示
        long_cond, short_cond = strategy.check_entry_conditions(score, market_type)
        signal_status = ""
        if long_cond:
            signal_status = "📈多头"
        elif short_cond:
            signal_status = "📉空头"
        else:
            signal_status = "🔄观望"
        
        # 显示训练样本数量
        sample_count = len(strategy.feature_buffer)
        
        LogStatus(f"循环: {loop_count}, 价格: {current_price:.2f}, "
                  f"预测得分: {score:.1f}, 市场: {market_type}, "
                  f"持仓: {pos_state_name}, 信号: {signal_status}, "
                  f"状态: {data_status}, 模式: {model_status}, "
                  f"样本: {sample_count}, "
                  f"胜: {strategy.counter['win']}, 负: {strategy.counter['loss']}")
        
        Sleep(LoopInterval * 1000)

# ========== 参数设置 ==========
AmountOP = 1  # 期货合约数量
LoopInterval = 3  # 循环间隔(秒)

if __name__ == "__main__":
    main()
相关推荐
全部留言
avatar of SEA
SEA
大哥 你这正好是我最近研究的方向。我最近在弄混合模型 Transformer和LightGBM的协同, 实测 60个特征 但是实在说 特征偏移一直是最大的问题!
2025-08-09 03:22:09
avatar of ianzeng123
ianzeng123
兄弟,加油!有好东西发出来看看
2025-08-11 08:50:10