2
집중하다
45
수행원

파이썬을 활용한 AI 기반 정량적 트레이딩 시스템 구축 (LightGBM + Transformer 하이브리드 전략)

만든 날짜: 2025-12-24 10:44:43, 업데이트 날짜: 2025-12-30 10:02:31
comments   2
hits   671

소개

오늘 저는 FMZ에 대한 완전한 양자 거래 시스템 구현을 공유할 것입니다. 이것은 단순한 재검토 스크립트가 아니라 실시간 거래 프레임워크입니다. 이 시스템은 이미 암호화폐 시장 (ETH/USDT) 에서 실제로 작동하고 있으며 다음과 같은 특징을 가지고 있습니다.

  • 실시간 데이터 처리- 웹소켓 스트림 데이터
  • 혼합 AI 모델 - LightGBM + Transformer
  • 자동화- 특징 지연 탐지 + 자동 재 훈련
  • 생산 준비- 핫 스위치 + 상태 지속 + 실시간 모니터링
  • 완전 엔지니어링- 구성 관리 + 오류 처리 + 성능 최적화

시스템 아키텍처 전체

  • 데이터 계층: 뱅이안 웹소켓 → 실시간 K선 합성 → 특징 엔지니어링
  • 모델 계층: LightGBM ((표 특성) + Transformer ((열 특성) → 융합 예측
  • 의사 결정 계층: 신호 검증 → 위험 관리 → 거래 실행
  • 모니터링 계층: 성능 모니터링 + 지동 탐지 + 자동 재교육
## 第一部分:环境搭建

###  安装依赖
```bash
# 核心依赖
pip install websockets lightgbm torch scikit-learn 
pip install bayesian-optimization pandas numpy scipy

# 可选:钉钉通知
pip install requests pyyaml

코드 구조

quant_trading/
├── config.yaml              # 配置文件
├── models_v4/              # 模型存储目录
├── strategy_state/         # 运行时状态保存
├── main.py                 # 主程序(2000+行完整代码)
├── requirements.txt        # 依赖列表
└── README.md              # 项目说明

핵심 코드

class Config:
    """智能配置管理系统"""
    def __init__(self, config_file="config.yaml"):
        # 默认配置
        self.defaults = {
            "trading": {
                "pair": "ETH_USDT",        # 交易对
                "train_bars": 1440,        # 训练数据量(24小时)
                "predict_horizon": 10,     # 预测未来几分钟
                "spread_threshold": 0.002  # 交易阈值
            },
            "transformer": {
                "enabled": True,           # 启用Transformer
                "seq_len": 30,             # 序列长度
                "d_model": 32,             # 特征维度
                "train_epochs": 10         # 训练轮数
            }
        }
        # 支持外部配置文件热重载
        self._load_external_config(config_file)

실시간 데이터 스트림 처리

async def websocket_producer(uri, queue):
    """WebSocket数据生产者"""
    reconnect_delay = 5  # 智能重连机制
    while True:
        try:
            async with websockets.connect(uri, ping_interval=20) as ws:
                reconnect_delay = 5  # 重置延迟
                while True:
                    data = await ws.recv()
                    parsed = json.loads(data)
                    await queue.put(parsed)  # 放入异步队列
        except Exception as e:
            Log(f"连接断开: {e}, {reconnect_delay}秒后重连", "#ff0000")
            await asyncio.sleep(reconnect_delay)
            reconnect_delay = min(300, reconnect_delay * 2) 
async def kline_generator():
    """分钟K线合成器"""
    while True:
        # 精确等待下一分钟
        now = time.time()
        wait_seconds = 60.5 - (now % 60)  # 0.5秒缓冲
        await asyncio.sleep(wait_seconds)
        
        # 合成K线
        minute_ticks = get_last_minute_ticks()
        if minute_ticks:
            new_kline = {
                "ts": last_minute_start_ts,
                "open": minute_ticks[0]["price"],
                "high": max(t["price"] for t in minute_ticks),
                "low": min(t["price"] for t in minute_ticks),
                "close": minute_ticks[-1]["price"],
                "volume": sum(t["qty"] for t in minute_ticks)
            }
            FeatureStore.klines_1min.append(new_kline)
            
            # 自动清理旧数据
            twenty_four_hours_ago = time.time() * 1000 - 24*3600*1000
            FeatureStore.klines_1min = [
                k for k in FeatureStore.klines_1min 
                if k["ts"] > twenty_four_hours_ago
            ]

특징 프로젝트 - 58 기술 지표 실현

def calculate_tabular_features_and_labels_vectorized(klines, ticks, order_books, is_realtime=False):
    """计算58个技术指标(避免数据泄露版本)"""
    features, labels = [], []
    
    # 基础价格特征
    feature_dict["price_change_1m"] = (closes[-1] - closes[-2]) / closes[-2]
    feature_dict["price_change_5m"] = (closes[-1] - closes[-6]) / closes[-6]
    
    # 波动率特征(关键:不使用未来数据!)
    feature_dict["volatility_10m"] = np.std(closes[-11:-1])  # t-11到t-1
    feature_dict["volatility_30m"] = np.std(closes[-31:-1])  # t-31到t-1
    
    # 成交量特征
    feature_dict["volume_ratio_5m"] = volumes[-1] / np.mean(volumes[-5:-1])
    
    # 技术指标
    feature_dict["rsi_14"] = calculate_rsi(price_changes[-15:-1])  # 使用历史数据
    feature_dict["macd"], feature_dict["macd_hist"] = calculate_macd(closes[:-1])
    
    # 订单簿特征
    feature_dict["bid_ask_spread"] = ask_price - bid_price
    feature_dict["order_imbalance"] = (bid_volume - ask_volume) / (bid_volume + ask_volume)
    
    # 高级统计特征
    feature_dict["price_skewness_30"] = skew(closes[-32:-2])  # t-31到t-2
    feature_dict["price_kurtosis_30"] = kurtosis(closes[-32:-2])
    
    # 交互特征
    feature_dict["rsi_x_volatility"] = feature_dict["rsi_14"] * feature_dict["volatility_30m"]
    
    return features, labels

특징 분류

가격 특성, 거래량 특성, 기술 지표, 주문서기 특성, 통계 특성, 상호 작용 특성

def update_feature_names_with_transformer():
    """更新特征名称列表以包含 Transformer 特征"""
    base_features = [
        "obv_change_rate", "vpt_zscore_20", "cmf_20", "price_to_vwap_ratio", "price_change_1m", "price_change_5m", 
        "price_change_15m", "volatility_10m", "volatility_30m", "volume_1m", "volume_5m", 
        "volume_change_5m", "rsi_14", "hour_of_day", "alpha_5m", "wobi_10s", "spread_10s", 
        "depth_imbalance_5", "trade_imbalance_10s", "macd", "macd_hist", "bollinger_width", 
        "return_rolling_mean_5", "return_rolling_std_5", "rsi_x_volatility_30m", 
        "trend_strength", "price_skewness_30", "price_kurtosis_30", "atr_14"
    ]
    
    if config.TRANSFORMER_ENABLED:
        transformer_features = [f"transformer_feat_{i}" for i in range(config.TRANSFORMER_D_MODEL)]
        ModelRegistry.feature_names = base_features + transformer_features
    else:
        ModelRegistry.feature_names = base_features
    
    Log(f"特征名称已更新: 共 {len(ModelRegistry.feature_names)} 个特征")

혼합 모델 구조 - LightGBM + Transformer

# Transformer模型 - 处理序列数据
class TimeSeriesTransformer(nn.Module):
    def __init__(self, input_dim=5, d_model=32, nhead=4):
        super().__init__()
        self.input_proj = nn.Linear(input_dim, d_model)
        encoder_layer = nn.TransformerEncoderLayer(d_model, nhead, batch_first=True)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, 2)
        self.classifier = nn.Linear(d_model, 3)  # 3类:上涨/下跌/盘整

# LightGBM模型 - 处理表格特征
def train_lightgbm_with_bayesian_optimization(X, y):
    """贝叶斯优化调参"""
    def lgbm_objective(num_leaves, max_depth, learning_rate):
        params = {
            'num_leaves': int(num_leaves),
            'max_depth': int(max_depth),
            'learning_rate': learning_rate,
            'objective': 'multiclass',
            'num_class': 3
        }
        
        # 时间序列交叉验证
        tscv = TimeSeriesSplit(n_splits=5)
        accuracies = []
        
        for train_idx, val_idx in tscv.split(X):
            X_train, X_val = X[train_idx], X[val_idx]
            y_train, y_val = y[train_idx], y[val_idx]
            
            model = lgb.LGBMClassifier(**params)
            model.fit(X_train, y_train)
            preds = model.predict(X_val)
            accuracies.append(accuracy_score(y_val, preds))
        
        return np.mean(accuracies)  # 贝叶斯优化最大化准确率
    
    # 运行贝叶斯优化
    optimizer = BayesianOptimization(
        f=lgbm_objective,
        pbounds={'num_leaves': (20, 200), 'max_depth': (5, 50), 'learning_rate': (0.01, 0.1)}
    )
    optimizer.maximize(init_points=5, n_iter=25)
    
    return optimizer.max['params']  # 返回最佳参数

모델 융합 전략

表格特征(58维) → LightGBM → 特征向量(32维)
序列特征(30×5维) → Transformer → 特征向量(32维)
                      ↓
                拼接(64维) → 全连接层 → 最终预测(3类)

환경 특성 - 시스템을 안정적으로 작동시키는 것

특징 기어 탐지

def check_feature_drift(realtime_features):
    """检测数据分布变化"""
    drifts = []
    for i, name in enumerate(ModelRegistry.feature_names):
        train_mean = ModelRegistry.training_feature_dist[name]["mean"]
        train_std = ModelRegistry.training_feature_dist[name]["std"]
        
        # 计算Z-score漂移
        drift = abs(realtime_features[i] - train_mean) / (train_std + 1e-10)
        drifts.append(drift)
    
    avg_drift = np.mean(drifts)
    if avg_drift > config.FEATURE_DRIFT_THRESHOLD:
        Log(f" 特征漂移警报: {avg_drift:.4f}", "#ff0000")
        trigger_auto_retrain()  # 触发自动再训练 

모형 열전환

def hot_switch_model():
    """无中断更新模型"""
    if ModelRegistry.next_lgbm_model:
        Log(f" 热切换模型: {ModelRegistry.current_model_version} → {ModelRegistry.next_model_version}")
        
        # 原子性切换
        ModelRegistry.lgbm_model = ModelRegistry.next_lgbm_model
        ModelRegistry.transformer_model = ModelRegistry.next_transformer_model
        ModelRegistry.scaler = ModelRegistry.next_scaler
        ModelRegistry.current_model_version = ModelRegistry.next_model_version
        
        # 清理临时变量
        ModelRegistry.next_lgbm_model = None
        ModelRegistry.next_model_version = None
        
        Log(" 模型热切换完成", "#00ff00")

상태의 지속

class StatePersistence:
    @staticmethod
    def save_state():
        """保存所有运行时状态"""
        state_data = {
            "timestamp": time.time(),
            "klines_1min": FeatureStore.klines_1min[-1000:],  # 保存最近1000条
            "performance_log": RealtimeMonitor.performance_log,
            "active_signal": RealtimeMonitor.active_signal,
            "model_version": ModelRegistry.current_model_version,
            "signal_history": ModelRegistry.signal_history[-100:]  # 最近100个信号
        }
        
        with open("strategy_state/strategy_state.pkl", "wb") as f:
            pickle.dump(state_data, f)
        
        Log(" 状态已保存", "#00ff00")

Numba 가속값 계산

from numba import jit

@jit(nopython=True)
def calculate_ewma_fast(data, span):
    alpha = 2.0 / (span + 1.0)
    ewma = np.empty_like(data)
    ewma[0] = data[0]
    for i in range(1, len(data)):
        ewma[i] = alpha * data[i] + (1.0 - alpha) * ewma[i-1]
    return ewma

# 性能对比:纯Python vs Numba
# 计算10000次EMA,Numba快50倍以上

비동기 대량 처리

async def batch_predict(features_batch):
    if len(features_batch) > 1:
        scaled_batch = ModelRegistry.scaler.transform(features_batch)

        predictions = ModelRegistry.lgbm_model.predict_proba(scaled_batch)
        return predictions
    else:
        return await single_predict(features_batch[0])

캐시 계산 결과

from functools import lru_cache

class FeatureCache:
    _cache = {}
    
    @staticmethod
    def calculate_with_cache(key, calculate_func, *args):
        if key in FeatureCache._cache:
            return FeatureCache._cache[key]
        
        result = calculate_func(*args)
        FeatureCache._cache[key] = result
        
        # 清理旧缓存
        if len(FeatureCache._cache) > 1000:
            oldest_key = next(iter(FeatureCache._cache))
            del FeatureCache._cache[oldest_key]
        
        return result

이 양적 거래 시스템은 현대 기계 학습 기술을 금융 시장에 적용하는 방법을 보여줍니다.

  • 엔지니어링 사고: 거래 시스템은 알고리즘이 아니라 전체적인 엔지니어링 문제입니다.

  • 데이터 유출 방지: 엄격한 시간 순서 데이터 처리가 성공의 열쇠

  • 혼합 모델의 장점: 전통적인 ML와 딥러닝이 상호 보완하여 성능을 향상시킵니다.

  • 생산환경 고려: 모니터링, 유지보수, 안정성도 마찬가지

  • 지속적인 최적화: 양적 거래는 반복되는 과정입니다.

현재 정책은 단일 인터페이스를 작성하지 않고 필요한 것은 작성할 수 있습니다.

부인 성명

중요 사항:

이 글은 기술 학습 교류를 위한 내용입니다. 양적 거래에는 위험성이 있으며, 실전 투입 전에 충분히 테스트해 보시기 바랍니다. 과거의 성과는 미래의 수익을 의미하지 않습니다.