2
关注
43
关注者

用Python搭建AI量化交易系统(LightGBM+Transformer混合策略)

创建于: 2025-12-24 10:44:43, 更新于: 2025-12-30 10:02:31
comments   2
hits   328

简介

今天我将分享一个FMZ上完整的量化交易系统实现,它不是一个简单的回测脚本,而是一个的实时交易框架。这个系统已经在加密货币市场(ETH/USDT)上实际运行,具备以下特性: * 实时数据处理 - WebSocket流式数据
* 混合AI模型 - LightGBM + Transformer
* 自动运维 - 特征漂移检测 + 自动再训练
* 生产就绪 - 热切换 + 状态持久化 + 实时监控
* 完整工程化 - 配置管理 + 错误处理 + 性能优化

系统架构全景

  • 数据层:币安WebSocket → 实时K线合成 → 特征工程
  • 模型层:LightGBM(表格特征) + Transformer(序列特征) → 融合预测
  • 决策层:信号验证 → 风险管理 → 交易执行
  • 监控层:性能监控 + 漂移检测 + 自动再训练
## 第一部分:环境搭建

###  安装依赖
```bash
# 核心依赖
pip install websockets lightgbm torch scikit-learn 
pip install bayesian-optimization pandas numpy scipy

# 可选:钉钉通知
pip install requests pyyaml

## 代码结构

quant_trading/
├── config.yaml              # 配置文件
├── models_v4/              # 模型存储目录
├── strategy_state/         # 运行时状态保存
├── main.py                 # 主程序(2000+行完整代码)
├── requirements.txt        # 依赖列表
└── README.md              # 项目说明

核心代码

class Config:
    """智能配置管理系统"""
    def __init__(self, config_file="config.yaml"):
        # 默认配置
        self.defaults = {
            "trading": {
                "pair": "ETH_USDT",        # 交易对
                "train_bars": 1440,        # 训练数据量(24小时)
                "predict_horizon": 10,     # 预测未来几分钟
                "spread_threshold": 0.002  # 交易阈值
            },
            "transformer": {
                "enabled": True,           # 启用Transformer
                "seq_len": 30,             # 序列长度
                "d_model": 32,             # 特征维度
                "train_epochs": 10         # 训练轮数
            }
        }
        # 支持外部配置文件热重载
        self._load_external_config(config_file)

## 实时数据流处理

async def websocket_producer(uri, queue):
    """WebSocket数据生产者"""
    reconnect_delay = 5  # 智能重连机制
    while True:
        try:
            async with websockets.connect(uri, ping_interval=20) as ws:
                reconnect_delay = 5  # 重置延迟
                while True:
                    data = await ws.recv()
                    parsed = json.loads(data)
                    await queue.put(parsed)  # 放入异步队列
        except Exception as e:
            Log(f"连接断开: {e}, {reconnect_delay}秒后重连", "#ff0000")
            await asyncio.sleep(reconnect_delay)
            reconnect_delay = min(300, reconnect_delay * 2) 
async def kline_generator():
    """分钟K线合成器"""
    while True:
        # 精确等待下一分钟
        now = time.time()
        wait_seconds = 60.5 - (now % 60)  # 0.5秒缓冲
        await asyncio.sleep(wait_seconds)
        
        # 合成K线
        minute_ticks = get_last_minute_ticks()
        if minute_ticks:
            new_kline = {
                "ts": last_minute_start_ts,
                "open": minute_ticks[0]["price"],
                "high": max(t["price"] for t in minute_ticks),
                "low": min(t["price"] for t in minute_ticks),
                "close": minute_ticks[-1]["price"],
                "volume": sum(t["qty"] for t in minute_ticks)
            }
            FeatureStore.klines_1min.append(new_kline)
            
            # 自动清理旧数据
            twenty_four_hours_ago = time.time() * 1000 - 24*3600*1000
            FeatureStore.klines_1min = [
                k for k in FeatureStore.klines_1min 
                if k["ts"] > twenty_four_hours_ago
            ]

特征工程 - 58个技术指标实现

def calculate_tabular_features_and_labels_vectorized(klines, ticks, order_books, is_realtime=False):
    """计算58个技术指标(避免数据泄露版本)"""
    features, labels = [], []
    
    # 基础价格特征
    feature_dict["price_change_1m"] = (closes[-1] - closes[-2]) / closes[-2]
    feature_dict["price_change_5m"] = (closes[-1] - closes[-6]) / closes[-6]
    
    # 波动率特征(关键:不使用未来数据!)
    feature_dict["volatility_10m"] = np.std(closes[-11:-1])  # t-11到t-1
    feature_dict["volatility_30m"] = np.std(closes[-31:-1])  # t-31到t-1
    
    # 成交量特征
    feature_dict["volume_ratio_5m"] = volumes[-1] / np.mean(volumes[-5:-1])
    
    # 技术指标
    feature_dict["rsi_14"] = calculate_rsi(price_changes[-15:-1])  # 使用历史数据
    feature_dict["macd"], feature_dict["macd_hist"] = calculate_macd(closes[:-1])
    
    # 订单簿特征
    feature_dict["bid_ask_spread"] = ask_price - bid_price
    feature_dict["order_imbalance"] = (bid_volume - ask_volume) / (bid_volume + ask_volume)
    
    # 高级统计特征
    feature_dict["price_skewness_30"] = skew(closes[-32:-2])  # t-31到t-2
    feature_dict["price_kurtosis_30"] = kurtosis(closes[-32:-2])
    
    # 交互特征
    feature_dict["rsi_x_volatility"] = feature_dict["rsi_14"] * feature_dict["volatility_30m"]
    
    return features, labels

特征分类

价格特征 成交量特征 技术指标 订单簿特征 统计特征 交互特征

def update_feature_names_with_transformer():
    """更新特征名称列表以包含 Transformer 特征"""
    base_features = [
        "obv_change_rate", "vpt_zscore_20", "cmf_20", "price_to_vwap_ratio", "price_change_1m", "price_change_5m", 
        "price_change_15m", "volatility_10m", "volatility_30m", "volume_1m", "volume_5m", 
        "volume_change_5m", "rsi_14", "hour_of_day", "alpha_5m", "wobi_10s", "spread_10s", 
        "depth_imbalance_5", "trade_imbalance_10s", "macd", "macd_hist", "bollinger_width", 
        "return_rolling_mean_5", "return_rolling_std_5", "rsi_x_volatility_30m", 
        "trend_strength", "price_skewness_30", "price_kurtosis_30", "atr_14"
    ]
    
    if config.TRANSFORMER_ENABLED:
        transformer_features = [f"transformer_feat_{i}" for i in range(config.TRANSFORMER_D_MODEL)]
        ModelRegistry.feature_names = base_features + transformer_features
    else:
        ModelRegistry.feature_names = base_features
    
    Log(f"特征名称已更新: 共 {len(ModelRegistry.feature_names)} 个特征")

## 混合模型架构 - LightGBM + Transformer

# Transformer模型 - 处理序列数据
class TimeSeriesTransformer(nn.Module):
    def __init__(self, input_dim=5, d_model=32, nhead=4):
        super().__init__()
        self.input_proj = nn.Linear(input_dim, d_model)
        encoder_layer = nn.TransformerEncoderLayer(d_model, nhead, batch_first=True)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, 2)
        self.classifier = nn.Linear(d_model, 3)  # 3类:上涨/下跌/盘整

# LightGBM模型 - 处理表格特征
def train_lightgbm_with_bayesian_optimization(X, y):
    """贝叶斯优化调参"""
    def lgbm_objective(num_leaves, max_depth, learning_rate):
        params = {
            'num_leaves': int(num_leaves),
            'max_depth': int(max_depth),
            'learning_rate': learning_rate,
            'objective': 'multiclass',
            'num_class': 3
        }
        
        # 时间序列交叉验证
        tscv = TimeSeriesSplit(n_splits=5)
        accuracies = []
        
        for train_idx, val_idx in tscv.split(X):
            X_train, X_val = X[train_idx], X[val_idx]
            y_train, y_val = y[train_idx], y[val_idx]
            
            model = lgb.LGBMClassifier(**params)
            model.fit(X_train, y_train)
            preds = model.predict(X_val)
            accuracies.append(accuracy_score(y_val, preds))
        
        return np.mean(accuracies)  # 贝叶斯优化最大化准确率
    
    # 运行贝叶斯优化
    optimizer = BayesianOptimization(
        f=lgbm_objective,
        pbounds={'num_leaves': (20, 200), 'max_depth': (5, 50), 'learning_rate': (0.01, 0.1)}
    )
    optimizer.maximize(init_points=5, n_iter=25)
    
    return optimizer.max['params']  # 返回最佳参数

模型融合策略

表格特征(58维) → LightGBM → 特征向量(32维)
序列特征(30×5维) → Transformer → 特征向量(32维)
                      ↓
                拼接(64维) → 全连接层 → 最终预测(3类)

环境特性 - 让系统稳定运行

特征漂移检测

def check_feature_drift(realtime_features):
    """检测数据分布变化"""
    drifts = []
    for i, name in enumerate(ModelRegistry.feature_names):
        train_mean = ModelRegistry.training_feature_dist[name]["mean"]
        train_std = ModelRegistry.training_feature_dist[name]["std"]
        
        # 计算Z-score漂移
        drift = abs(realtime_features[i] - train_mean) / (train_std + 1e-10)
        drifts.append(drift)
    
    avg_drift = np.mean(drifts)
    if avg_drift > config.FEATURE_DRIFT_THRESHOLD:
        Log(f" 特征漂移警报: {avg_drift:.4f}", "#ff0000")
        trigger_auto_retrain()  # 触发自动再训练 

模型热切换

def hot_switch_model():
    """无中断更新模型"""
    if ModelRegistry.next_lgbm_model:
        Log(f" 热切换模型: {ModelRegistry.current_model_version} → {ModelRegistry.next_model_version}")
        
        # 原子性切换
        ModelRegistry.lgbm_model = ModelRegistry.next_lgbm_model
        ModelRegistry.transformer_model = ModelRegistry.next_transformer_model
        ModelRegistry.scaler = ModelRegistry.next_scaler
        ModelRegistry.current_model_version = ModelRegistry.next_model_version
        
        # 清理临时变量
        ModelRegistry.next_lgbm_model = None
        ModelRegistry.next_model_version = None
        
        Log(" 模型热切换完成", "#00ff00")

状态持久化

class StatePersistence:
    @staticmethod
    def save_state():
        """保存所有运行时状态"""
        state_data = {
            "timestamp": time.time(),
            "klines_1min": FeatureStore.klines_1min[-1000:],  # 保存最近1000条
            "performance_log": RealtimeMonitor.performance_log,
            "active_signal": RealtimeMonitor.active_signal,
            "model_version": ModelRegistry.current_model_version,
            "signal_history": ModelRegistry.signal_history[-100:]  # 最近100个信号
        }
        
        with open("strategy_state/strategy_state.pkl", "wb") as f:
            pickle.dump(state_data, f)
        
        Log(" 状态已保存", "#00ff00")

使用Numba加速数值计算

from numba import jit

@jit(nopython=True)
def calculate_ewma_fast(data, span):
    alpha = 2.0 / (span + 1.0)
    ewma = np.empty_like(data)
    ewma[0] = data[0]
    for i in range(1, len(data)):
        ewma[i] = alpha * data[i] + (1.0 - alpha) * ewma[i-1]
    return ewma

# 性能对比:纯Python vs Numba
# 计算10000次EMA,Numba快50倍以上

异步批处理

async def batch_predict(features_batch):
    if len(features_batch) > 1:
        scaled_batch = ModelRegistry.scaler.transform(features_batch)

        predictions = ModelRegistry.lgbm_model.predict_proba(scaled_batch)
        return predictions
    else:
        return await single_predict(features_batch[0])

缓存计算结果

from functools import lru_cache

class FeatureCache:
    _cache = {}
    
    @staticmethod
    def calculate_with_cache(key, calculate_func, *args):
        if key in FeatureCache._cache:
            return FeatureCache._cache[key]
        
        result = calculate_func(*args)
        FeatureCache._cache[key] = result
        
        # 清理旧缓存
        if len(FeatureCache._cache) > 1000:
            oldest_key = next(iter(FeatureCache._cache))
            del FeatureCache._cache[oldest_key]
        
        return result

这个量化交易系统展示了如何将现代机器学习技术应用于金融市场。关键收获:

  • 工程化思维:交易系统不只是算法,更是完整的工程问题

  • 避免数据泄露:严格的时序数据处理是成功的关键

  • 混合模型优势:传统ML与深度学习互补能提升性能

  • 生产环境考量:监控、运维、稳定性同样重要

  • 持续优化:量化交易是一个不断迭代的过程

目前策略没有写下单接口 有需要的可写完。

免责声明

重要提示:

本文仅供技术学习交流,量化交易有风险,实盘前请充分测试,过去表现不代表未来收益。

相关推荐
全部留言
avatar of ianzeng123
ianzeng123
🐮🐮🐮!
2025-12-24 17:29:07
avatar of 发明者量化-小小梦
发明者量化-小小梦
感谢分享!
2025-12-24 11:38:39