2
پر توجہ دیں
45
پیروکار

Python (LightGBM + ٹرانسفارمر ہائبرڈ حکمت عملی) کا استعمال کرتے ہوئے AI سے چلنے والے مقداری تجارتی نظام کی تعمیر

میں تخلیق کیا: 2025-12-24 10:44:43, تازہ کاری: 2025-12-30 10:02:31
comments   2
hits   671

تعارف

آج میں ایف ایم زیڈ پر ایک مکمل کوانٹم ٹریڈنگ سسٹم کا نفاذ شیئر کروں گا۔ یہ ایک سادہ ریٹرننگ اسکرپٹ نہیں ہے بلکہ یہ ایک ریئل ٹائم ٹریڈنگ فریم ورک ہے۔ یہ سسٹم پہلے سے ہی کرپٹو کرنسی مارکیٹ (ETH/USDT) پر عملی طور پر کام کر رہا ہے اور اس میں درج ذیل خصوصیات ہیں:

  • ریئل ٹائم ڈیٹا پروسیسنگ- WebSocket سٹریم ڈیٹا
  • مخلوط اے آئی ماڈل - LightGBM + Transformer
  • خودکار بحالی- خصوصیت بہاؤ کا پتہ لگانے + خود کار طریقے سے دوبارہ تربیت
  • پیداوار کے لئے تیار- گرم، شہوت انگیز سوئچنگ + حالت کو برقرار رکھنے + اصل وقت کی نگرانی
  • مکمل انجنیئرنگ- ترتیب مینجمنٹ + خرابی ہینڈلنگ + کارکردگی کی اصلاح

سسٹم فن تعمیر کا منظر

  • ڈیٹا کی پرت: Binance WebSocket → ریئل ٹائم K لائن ترکیب → خصوصیت انجینئرنگ
  • ماڈل پرت: لائٹ جی بی ایم ((ٹیبل کی خصوصیت) + ٹرانسفارمر ((سیریل کی خصوصیت) → انضمام کی پیش گوئی
  • فیصلہ سازی کی سطح: سگنل کی توثیق → رسک مینجمنٹ → ٹرانزیکشن پر عملدرآمد
  • نگرانی کی پرت: کارکردگی کی نگرانی + بہاؤ کا پتہ لگانے + خود کار طریقے سے دوبارہ تربیت
## 第一部分:环境搭建

###  安装依赖
```bash
# 核心依赖
pip install websockets lightgbm torch scikit-learn 
pip install bayesian-optimization pandas numpy scipy

# 可选:钉钉通知
pip install requests pyyaml

کوڈ کی ساخت

quant_trading/
├── config.yaml              # 配置文件
├── models_v4/              # 模型存储目录
├── strategy_state/         # 运行时状态保存
├── main.py                 # 主程序(2000+行完整代码)
├── requirements.txt        # 依赖列表
└── README.md              # 项目说明

بنیادی کوڈ

class Config:
    """智能配置管理系统"""
    def __init__(self, config_file="config.yaml"):
        # 默认配置
        self.defaults = {
            "trading": {
                "pair": "ETH_USDT",        # 交易对
                "train_bars": 1440,        # 训练数据量(24小时)
                "predict_horizon": 10,     # 预测未来几分钟
                "spread_threshold": 0.002  # 交易阈值
            },
            "transformer": {
                "enabled": True,           # 启用Transformer
                "seq_len": 30,             # 序列长度
                "d_model": 32,             # 特征维度
                "train_epochs": 10         # 训练轮数
            }
        }
        # 支持外部配置文件热重载
        self._load_external_config(config_file)

ریئل ٹائم ڈیٹا اسٹریم پروسیسنگ

async def websocket_producer(uri, queue):
    """WebSocket数据生产者"""
    reconnect_delay = 5  # 智能重连机制
    while True:
        try:
            async with websockets.connect(uri, ping_interval=20) as ws:
                reconnect_delay = 5  # 重置延迟
                while True:
                    data = await ws.recv()
                    parsed = json.loads(data)
                    await queue.put(parsed)  # 放入异步队列
        except Exception as e:
            Log(f"连接断开: {e}, {reconnect_delay}秒后重连", "#ff0000")
            await asyncio.sleep(reconnect_delay)
            reconnect_delay = min(300, reconnect_delay * 2) 
async def kline_generator():
    """分钟K线合成器"""
    while True:
        # 精确等待下一分钟
        now = time.time()
        wait_seconds = 60.5 - (now % 60)  # 0.5秒缓冲
        await asyncio.sleep(wait_seconds)
        
        # 合成K线
        minute_ticks = get_last_minute_ticks()
        if minute_ticks:
            new_kline = {
                "ts": last_minute_start_ts,
                "open": minute_ticks[0]["price"],
                "high": max(t["price"] for t in minute_ticks),
                "low": min(t["price"] for t in minute_ticks),
                "close": minute_ticks[-1]["price"],
                "volume": sum(t["qty"] for t in minute_ticks)
            }
            FeatureStore.klines_1min.append(new_kline)
            
            # 自动清理旧数据
            twenty_four_hours_ago = time.time() * 1000 - 24*3600*1000
            FeatureStore.klines_1min = [
                k for k in FeatureStore.klines_1min 
                if k["ts"] > twenty_four_hours_ago
            ]

خصوصیت کی تعمیر - 58 تکنیکی اشارے حاصل

def calculate_tabular_features_and_labels_vectorized(klines, ticks, order_books, is_realtime=False):
    """计算58个技术指标(避免数据泄露版本)"""
    features, labels = [], []
    
    # 基础价格特征
    feature_dict["price_change_1m"] = (closes[-1] - closes[-2]) / closes[-2]
    feature_dict["price_change_5m"] = (closes[-1] - closes[-6]) / closes[-6]
    
    # 波动率特征(关键:不使用未来数据!)
    feature_dict["volatility_10m"] = np.std(closes[-11:-1])  # t-11到t-1
    feature_dict["volatility_30m"] = np.std(closes[-31:-1])  # t-31到t-1
    
    # 成交量特征
    feature_dict["volume_ratio_5m"] = volumes[-1] / np.mean(volumes[-5:-1])
    
    # 技术指标
    feature_dict["rsi_14"] = calculate_rsi(price_changes[-15:-1])  # 使用历史数据
    feature_dict["macd"], feature_dict["macd_hist"] = calculate_macd(closes[:-1])
    
    # 订单簿特征
    feature_dict["bid_ask_spread"] = ask_price - bid_price
    feature_dict["order_imbalance"] = (bid_volume - ask_volume) / (bid_volume + ask_volume)
    
    # 高级统计特征
    feature_dict["price_skewness_30"] = skew(closes[-32:-2])  # t-31到t-2
    feature_dict["price_kurtosis_30"] = kurtosis(closes[-32:-2])
    
    # 交互特征
    feature_dict["rsi_x_volatility"] = feature_dict["rsi_14"] * feature_dict["volatility_30m"]
    
    return features, labels

خصوصیت کی درجہ بندی

قیمت کی خصوصیت حجم کی خصوصیت تکنیکی اشارے آرڈر بک کی خصوصیت شماریاتی خصوصیت تعامل کی خصوصیت

def update_feature_names_with_transformer():
    """更新特征名称列表以包含 Transformer 特征"""
    base_features = [
        "obv_change_rate", "vpt_zscore_20", "cmf_20", "price_to_vwap_ratio", "price_change_1m", "price_change_5m", 
        "price_change_15m", "volatility_10m", "volatility_30m", "volume_1m", "volume_5m", 
        "volume_change_5m", "rsi_14", "hour_of_day", "alpha_5m", "wobi_10s", "spread_10s", 
        "depth_imbalance_5", "trade_imbalance_10s", "macd", "macd_hist", "bollinger_width", 
        "return_rolling_mean_5", "return_rolling_std_5", "rsi_x_volatility_30m", 
        "trend_strength", "price_skewness_30", "price_kurtosis_30", "atr_14"
    ]
    
    if config.TRANSFORMER_ENABLED:
        transformer_features = [f"transformer_feat_{i}" for i in range(config.TRANSFORMER_D_MODEL)]
        ModelRegistry.feature_names = base_features + transformer_features
    else:
        ModelRegistry.feature_names = base_features
    
    Log(f"特征名称已更新: 共 {len(ModelRegistry.feature_names)} 个特征")

مخلوط ماڈل فن تعمیر - LightGBM + ٹرانسفارمر

# Transformer模型 - 处理序列数据
class TimeSeriesTransformer(nn.Module):
    def __init__(self, input_dim=5, d_model=32, nhead=4):
        super().__init__()
        self.input_proj = nn.Linear(input_dim, d_model)
        encoder_layer = nn.TransformerEncoderLayer(d_model, nhead, batch_first=True)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, 2)
        self.classifier = nn.Linear(d_model, 3)  # 3类:上涨/下跌/盘整

# LightGBM模型 - 处理表格特征
def train_lightgbm_with_bayesian_optimization(X, y):
    """贝叶斯优化调参"""
    def lgbm_objective(num_leaves, max_depth, learning_rate):
        params = {
            'num_leaves': int(num_leaves),
            'max_depth': int(max_depth),
            'learning_rate': learning_rate,
            'objective': 'multiclass',
            'num_class': 3
        }
        
        # 时间序列交叉验证
        tscv = TimeSeriesSplit(n_splits=5)
        accuracies = []
        
        for train_idx, val_idx in tscv.split(X):
            X_train, X_val = X[train_idx], X[val_idx]
            y_train, y_val = y[train_idx], y[val_idx]
            
            model = lgb.LGBMClassifier(**params)
            model.fit(X_train, y_train)
            preds = model.predict(X_val)
            accuracies.append(accuracy_score(y_val, preds))
        
        return np.mean(accuracies)  # 贝叶斯优化最大化准确率
    
    # 运行贝叶斯优化
    optimizer = BayesianOptimization(
        f=lgbm_objective,
        pbounds={'num_leaves': (20, 200), 'max_depth': (5, 50), 'learning_rate': (0.01, 0.1)}
    )
    optimizer.maximize(init_points=5, n_iter=25)
    
    return optimizer.max['params']  # 返回最佳参数

ماڈل انضمام کی حکمت عملی

表格特征(58维) → LightGBM → 特征向量(32维)
序列特征(30×5维) → Transformer → 特征向量(32维)
                      ↓
                拼接(64维) → 全连接层 → 最终预测(3类)

ماحولیاتی خصوصیات - نظام کو مستحکم چلانے کے لئے

خصوصیت بہاؤ کا پتہ لگانے

def check_feature_drift(realtime_features):
    """检测数据分布变化"""
    drifts = []
    for i, name in enumerate(ModelRegistry.feature_names):
        train_mean = ModelRegistry.training_feature_dist[name]["mean"]
        train_std = ModelRegistry.training_feature_dist[name]["std"]
        
        # 计算Z-score漂移
        drift = abs(realtime_features[i] - train_mean) / (train_std + 1e-10)
        drifts.append(drift)
    
    avg_drift = np.mean(drifts)
    if avg_drift > config.FEATURE_DRIFT_THRESHOLD:
        Log(f" 特征漂移警报: {avg_drift:.4f}", "#ff0000")
        trigger_auto_retrain()  # 触发自动再训练 

ماڈل گرم سوئچ

def hot_switch_model():
    """无中断更新模型"""
    if ModelRegistry.next_lgbm_model:
        Log(f" 热切换模型: {ModelRegistry.current_model_version} → {ModelRegistry.next_model_version}")
        
        # 原子性切换
        ModelRegistry.lgbm_model = ModelRegistry.next_lgbm_model
        ModelRegistry.transformer_model = ModelRegistry.next_transformer_model
        ModelRegistry.scaler = ModelRegistry.next_scaler
        ModelRegistry.current_model_version = ModelRegistry.next_model_version
        
        # 清理临时变量
        ModelRegistry.next_lgbm_model = None
        ModelRegistry.next_model_version = None
        
        Log(" 模型热切换完成", "#00ff00")

حالت کا مستقل ہونا

class StatePersistence:
    @staticmethod
    def save_state():
        """保存所有运行时状态"""
        state_data = {
            "timestamp": time.time(),
            "klines_1min": FeatureStore.klines_1min[-1000:],  # 保存最近1000条
            "performance_log": RealtimeMonitor.performance_log,
            "active_signal": RealtimeMonitor.active_signal,
            "model_version": ModelRegistry.current_model_version,
            "signal_history": ModelRegistry.signal_history[-100:]  # 最近100个信号
        }
        
        with open("strategy_state/strategy_state.pkl", "wb") as f:
            pickle.dump(state_data, f)
        
        Log(" 状态已保存", "#00ff00")

Numba کی طرف سے ایکسلریٹڈ عددی حساب کتاب

from numba import jit

@jit(nopython=True)
def calculate_ewma_fast(data, span):
    alpha = 2.0 / (span + 1.0)
    ewma = np.empty_like(data)
    ewma[0] = data[0]
    for i in range(1, len(data)):
        ewma[i] = alpha * data[i] + (1.0 - alpha) * ewma[i-1]
    return ewma

# 性能对比:纯Python vs Numba
# 计算10000次EMA,Numba快50倍以上

غیر ہم آہنگ بیچنگ

async def batch_predict(features_batch):
    if len(features_batch) > 1:
        scaled_batch = ModelRegistry.scaler.transform(features_batch)

        predictions = ModelRegistry.lgbm_model.predict_proba(scaled_batch)
        return predictions
    else:
        return await single_predict(features_batch[0])

کیشے حساب کے نتائج

from functools import lru_cache

class FeatureCache:
    _cache = {}
    
    @staticmethod
    def calculate_with_cache(key, calculate_func, *args):
        if key in FeatureCache._cache:
            return FeatureCache._cache[key]
        
        result = calculate_func(*args)
        FeatureCache._cache[key] = result
        
        # 清理旧缓存
        if len(FeatureCache._cache) > 1000:
            oldest_key = next(iter(FeatureCache._cache))
            del FeatureCache._cache[oldest_key]
        
        return result

اس کوانٹم ٹریڈنگ سسٹم نے مالیاتی منڈیوں میں جدید مشین لرننگ ٹیکنالوجی کو لاگو کرنے کا مظاہرہ کیا ہے۔

  • انجینئرنگ سوچ: ٹرانزیکشن سسٹم صرف الگورتھم نہیں بلکہ انجینئرنگ کا ایک مکمل مسئلہ ہے

  • ڈیٹا لیک سے بچنے کے لیے: سخت ٹائم لائن ڈیٹا پروسیسنگ کامیابی کی کلید ہے

  • ہائبرڈ ماڈل کے فوائد: روایتی ایم ایل اور گہری سیکھنے کی کارکردگی میں اضافہ

  • پیداوار کے ماحول پر غور: نگرانی، بحالی اور استحکام بھی اہم ہیں

  • مسلسل بہتر بنانا: تجارت کو مقدار میں بڑھانا ایک بار بار چلنے والا عمل ہے

اس وقت کی حکمت عملی میں ایک انٹرفیس نہیں لکھا گیا ہے۔

ڈس کلیمر

اہم نوٹ:

یہ مضمون صرف تکنیکی سیکھنے اور تبادلہ خیال کے لئے ہے ، مقداری تجارت میں خطرہ ہے ، براہ کرم عملی طور پر ٹیسٹ کرنے سے پہلے اچھی طرح سے ٹیسٹ کریں ، ماضی کی کارکردگی مستقبل کی آمدنی کی نمائندگی نہیں کرتی ہے۔