今天我将分享一个FMZ上完整的量化交易系统实现,它不是一个简单的回测脚本,而是一个面向生产环境的实时交易框架。这个系统已经在加密货币市场(ETH/USDT)上实际运行,具备以下特性:
* 实时数据处理 - WebSocket流式数据
* 混合AI模型 - LightGBM + Transformer
* 自动运维 - 特征漂移检测 + 自动再训练
* 生产就绪 - 热切换 + 状态持久化 + 实时监控
* 完整工程化 - 配置管理 + 错误处理 + 性能优化
## 第一部分:环境搭建
### 安装依赖
```bash
# 核心依赖
pip install websockets lightgbm torch scikit-learn
pip install bayesian-optimization pandas numpy scipy
# 可选:钉钉通知;读取 config.yaml 需要 pyyaml
pip install requests pyyaml
# 可选:Numba 加速(性能优化部分会用到)
pip install numba
```
## 代码结构
```text
quant_trading/
├── config.yaml # 配置文件
├── models_v4/ # 模型存储目录
├── strategy_state/ # 运行时状态保存
├── main.py # 主程序(2000+行完整代码)
├── requirements.txt # 依赖列表
└── README.md # 项目说明
```
class Config:
    """Configuration manager: built-in defaults plus an external config file."""

    def __init__(self, config_file="config.yaml"):
        # Built-in defaults, one dict per section.
        trading_section = {
            "pair": "ETH_USDT",  # trading pair
            "train_bars": 1440,  # amount of training data (24 hours)
            "predict_horizon": 10,  # how many minutes ahead to predict
            "spread_threshold": 0.002,  # trading threshold
        }
        transformer_section = {
            "enabled": True,  # turn the Transformer on
            "seq_len": 30,  # sequence length
            "d_model": 32,  # feature dimension
            "train_epochs": 10,  # number of training epochs
        }
        self.defaults = {
            "trading": trading_section,
            "transformer": transformer_section,
        }

        # The external file is processed after the defaults are in place.
        # NOTE(review): the original comment advertises hot reload of the
        # external file; `_load_external_config` is not part of this excerpt,
        # so that behaviour cannot be confirmed from here.
        self._load_external_config(config_file)
## 实时数据流处理
async def websocket_producer(uri, queue):
    """Stream JSON messages from a WebSocket endpoint into an async queue.

    Connects to ``uri``, decodes every received frame with ``json.loads`` and
    awaits ``queue.put`` with the decoded object. When the connection fails,
    the error is logged and the coroutine sleeps before reconnecting; the
    delay starts at 5 seconds, doubles after every failure up to 300 seconds,
    and is reset to 5 seconds once a connection is established. The loop
    never returns on its own.

    Args:
        uri: WebSocket URL handed to ``websockets.connect``.
        queue: Object whose awaitable ``put`` receives each decoded message.
    """
    initial_delay = 5
    max_delay = 300
    reconnect_delay = initial_delay
    while True:
        try:
            async with websockets.connect(uri, ping_interval=20) as ws:
                reconnect_delay = initial_delay  # connected: reset the backoff
                while True:
                    data = await ws.recv()
                    try:
                        parsed = json.loads(data)
                    except ValueError as e:
                        # json.JSONDecodeError (and UnicodeDecodeError for
                        # bytes frames) derive from ValueError. One bad frame
                        # should be dropped, not force a reconnect with a
                        # growing backoff while the socket is still healthy.
                        Log(f"丢弃无法解析的消息: {e}", "#ff0000")
                        continue
                    await queue.put(parsed)
        except Exception as e:
            # Connection-level boundary: log, back off, then retry.
            Log(f"连接断开: {e}, {reconnect_delay}秒后重连", "#ff0000")
            await asyncio.sleep(reconnect_delay)
            reconnect_delay = min(max_delay, reconnect_delay * 2)
async def kline_generator():
    """Assemble one 1-minute candle per cycle and append it to the feature store.

    Each cycle sleeps until 0.5 s past the next minute boundary, reads the
    ticks returned by ``get_last_minute_ticks()`` and, when there are any,
    builds an OHLCV dict stamped with ``last_minute_start_ts``, appends it to
    ``FeatureStore.klines_1min`` and keeps only candles whose ``ts`` is newer
    than 24 hours ago.

    NOTE(review): ``get_last_minute_ticks`` and ``last_minute_start_ts`` are
    defined outside this excerpt. The pruning arithmetic treats ``ts`` as
    milliseconds -- confirm against the tick source.
    """
    retention_ms = 24*3600*1000
    while True:
        # Wake up half a second after the minute rolls over (0.5 s buffer).
        seconds_into_minute = time.time() % 60
        await asyncio.sleep(60.5 - seconds_into_minute)

        minute_ticks = get_last_minute_ticks()
        if minute_ticks:
            prices = [tick["price"] for tick in minute_ticks]
            candle = {
                "ts": last_minute_start_ts,
                "open": prices[0],
                "high": max(prices),
                "low": min(prices),
                "close": prices[-1],
                "volume": sum(tick["qty"] for tick in minute_ticks),
            }
            FeatureStore.klines_1min.append(candle)

            # Drop candles that fell out of the 24-hour window.
            # NOTE(review): the source lost its indentation; pruning is kept
            # alongside the append (only when a candle was produced).
            cutoff_ms = time.time() * 1000 - retention_ms
            FeatureStore.klines_1min = [
                kline for kline in FeatureStore.klines_1min
                if kline["ts"] > cutoff_ms
            ]
def calculate_tabular_features_and_labels_vectorized(klines, ticks, order_books, is_realtime=False):
    """Compute the tabular technical-indicator features (58 per the original note), leak-free version.

    NOTE(review): this is an abridged excerpt. ``feature_dict``, ``closes``,
    ``volumes``, ``price_changes``, ``ask_price``, ``bid_price``,
    ``bid_volume`` and ``ask_volume`` are used below but never defined here,
    none of the parameters is read, and ``features`` / ``labels`` are returned
    exactly as initialised (empty lists). The window remarks below assume
    ``closes[-1]`` is the current bar t -- TODO confirm against the full code.
    """
    features, labels = [], []
    # Basic price features
    feature_dict["price_change_1m"] = (closes[-1] - closes[-2]) / closes[-2]
    feature_dict["price_change_5m"] = (closes[-1] - closes[-6]) / closes[-6]
    # Volatility features (key point: the current bar is excluded from the window)
    feature_dict["volatility_10m"] = np.std(closes[-11:-1])  # 10 values: t-10 .. t-1 (the original comment said t-11)
    feature_dict["volatility_30m"] = np.std(closes[-31:-1])  # 30 values: t-30 .. t-1
    # Volume feature
    # NOTE(review): no guard against a zero mean over volumes[-5:-1] (4 values).
    feature_dict["volume_ratio_5m"] = volumes[-1] / np.mean(volumes[-5:-1])
    # Technical indicators
    feature_dict["rsi_14"] = calculate_rsi(price_changes[-15:-1])  # historical data only (last element excluded)
    feature_dict["macd"], feature_dict["macd_hist"] = calculate_macd(closes[:-1])
    # Order-book features
    feature_dict["bid_ask_spread"] = ask_price - bid_price
    # NOTE(review): no guard against bid_volume + ask_volume == 0.
    feature_dict["order_imbalance"] = (bid_volume - ask_volume) / (bid_volume + ask_volume)
    # Higher-order statistics
    feature_dict["price_skewness_30"] = skew(closes[-32:-2])  # 30 values: t-31 .. t-2
    feature_dict["price_kurtosis_30"] = kurtosis(closes[-32:-2])
    # Interaction feature
    feature_dict["rsi_x_volatility"] = feature_dict["rsi_14"] * feature_dict["volatility_30m"]
    return features, labels
特征类别:价格特征、成交量特征、技术指标、订单簿特征、统计特征、交互特征。
def update_feature_names_with_transformer():
    """Rebuild ``ModelRegistry.feature_names``, adding Transformer features when enabled.

    The list always starts with the fixed tabular feature names. When
    ``config.TRANSFORMER_ENABLED`` is truthy, one ``transformer_feat_<i>``
    name is appended for every ``i`` below ``config.TRANSFORMER_D_MODEL``.
    The resulting count is logged.
    """
    feature_names = [
        "obv_change_rate", "vpt_zscore_20", "cmf_20", "price_to_vwap_ratio",
        "price_change_1m", "price_change_5m", "price_change_15m",
        "volatility_10m", "volatility_30m",
        "volume_1m", "volume_5m", "volume_change_5m",
        "rsi_14", "hour_of_day", "alpha_5m", "wobi_10s", "spread_10s",
        "depth_imbalance_5", "trade_imbalance_10s",
        "macd", "macd_hist", "bollinger_width",
        "return_rolling_mean_5", "return_rolling_std_5", "rsi_x_volatility_30m",
        "trend_strength", "price_skewness_30", "price_kurtosis_30", "atr_14",
    ]
    if config.TRANSFORMER_ENABLED:
        # One name per Transformer output dimension, after the tabular ones.
        feature_names = feature_names + [
            f"transformer_feat_{i}" for i in range(config.TRANSFORMER_D_MODEL)
        ]
    ModelRegistry.feature_names = feature_names
    Log(f"特征名称已更新: 共 {len(ModelRegistry.feature_names)} 个特征")
## 混合模型架构 - LightGBM + Transformer
# Transformer模型 - 处理序列数据
class TimeSeriesTransformer(nn.Module):
    """Transformer encoder stack with a linear input projection and a linear head.

    Only layer construction is part of this excerpt; no ``forward`` method is
    defined here.

    Args:
        input_dim: Size of each input feature vector fed to ``input_proj``.
        d_model: Width of the encoder layers.
        nhead: Number of attention heads per encoder layer.
        num_layers: Number of stacked encoder layers (default 2, the value
            that used to be hard-coded).
        num_classes: Output size of ``classifier`` (default 3: up / down /
            sideways, the value that used to be hard-coded).
    """

    def __init__(self, input_dim=5, d_model=32, nhead=4, num_layers=2, num_classes=3):
        super().__init__()
        # Construction order is kept so seeded parameter initialisation matches.
        self.input_proj = nn.Linear(input_dim, d_model)
        encoder_layer = nn.TransformerEncoderLayer(d_model, nhead, batch_first=True)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers)
        self.classifier = nn.Linear(d_model, num_classes)
# LightGBM模型 - 处理表格特征
def train_lightgbm_with_bayesian_optimization(X, y):
    """Tune LightGBM hyper-parameters with Bayesian optimisation.

    Each candidate is scored by the mean accuracy of a 3-class
    ``LGBMClassifier`` over a 5-fold ``TimeSeriesSplit`` of ``X`` / ``y``.
    The optimiser runs 5 initial points and 25 iterations and maximises that
    score.

    Args:
        X: Feature matrix, indexed with integer index arrays (``X[train_idx]``).
        y: Labels, indexed the same way as ``X``.

    Returns:
        dict with the best ``num_leaves`` (int), ``max_depth`` (int) and
        ``learning_rate`` (float). ``objective`` / ``num_class`` are not
        included.
    """
    # Splitter is loop-invariant, so it is built once for all evaluations.
    tscv = TimeSeriesSplit(n_splits=5)

    def lgbm_objective(num_leaves, max_depth, learning_rate):
        params = {
            # The optimiser proposes floats; tree-shape parameters must be ints.
            'num_leaves': int(num_leaves),
            'max_depth': int(max_depth),
            'learning_rate': learning_rate,
            'objective': 'multiclass',
            'num_class': 3
        }
        accuracies = []
        for train_idx, val_idx in tscv.split(X):
            X_train, X_val = X[train_idx], X[val_idx]
            y_train, y_val = y[train_idx], y[val_idx]
            model = lgb.LGBMClassifier(**params)
            model.fit(X_train, y_train)
            preds = model.predict(X_val)
            accuracies.append(accuracy_score(y_val, preds))
        return np.mean(accuracies)  # the optimiser maximises accuracy

    optimizer = BayesianOptimization(
        f=lgbm_objective,
        pbounds={'num_leaves': (20, 200), 'max_depth': (5, 50), 'learning_rate': (0.01, 0.1)}
    )
    optimizer.maximize(init_points=5, n_iter=25)

    # optimizer.max['params'] holds the raw float proposals. Apply the same
    # int() truncation the objective used, so the returned values are exactly
    # the ones that were scored and can be passed straight to LightGBM.
    best_params = dict(optimizer.max['params'])
    best_params['num_leaves'] = int(best_params['num_leaves'])
    best_params['max_depth'] = int(best_params['max_depth'])
    return best_params
```text
表格特征(58维)   → LightGBM    → 特征向量(32维)
序列特征(30×5维) → Transformer → 特征向量(32维)
                      ↓
拼接(64维) → 全连接层 → 最终预测(3类)
```
def check_feature_drift(realtime_features):
    """Compare live feature values with the training distribution and react to drift.

    For every name in ``ModelRegistry.feature_names`` the value at the same
    position in ``realtime_features`` is turned into an absolute z-score
    against the stored training mean/std. When the mean of those scores
    exceeds ``config.FEATURE_DRIFT_THRESHOLD`` an alert is logged and
    ``trigger_auto_retrain()`` is called.

    Args:
        realtime_features: Sequence indexed by position, in the same order as
            ``ModelRegistry.feature_names``.
    """
    z_scores = []
    for position, feature_name in enumerate(ModelRegistry.feature_names):
        stats = ModelRegistry.training_feature_dist[feature_name]
        train_mean, train_std = stats["mean"], stats["std"]
        # The small epsilon keeps a zero training std from dividing by zero.
        deviation = abs(realtime_features[position] - train_mean)
        z_scores.append(deviation / (train_std + 1e-10))

    avg_drift = np.mean(z_scores)
    # Kept as a plain ">" test: a NaN average must not raise the alarm.
    if avg_drift > config.FEATURE_DRIFT_THRESHOLD:
        Log(f" 特征漂移警报: {avg_drift:.4f}", "#ff0000")
        trigger_auto_retrain()  # kick off automatic retraining
def hot_switch_model():
    """Promote the staged ("next") model set to be the live one.

    Does nothing when ``ModelRegistry.next_lgbm_model`` is falsy. Otherwise
    the staged LightGBM model, Transformer model, scaler and version replace
    the live ones, every staging slot is reset to ``None``, and the switch is
    logged before and after.
    """
    if not ModelRegistry.next_lgbm_model:
        return

    Log(f" 热切换模型: {ModelRegistry.current_model_version} → {ModelRegistry.next_model_version}")

    # Swap all four live references back to back (no await in between).
    ModelRegistry.lgbm_model = ModelRegistry.next_lgbm_model
    ModelRegistry.transformer_model = ModelRegistry.next_transformer_model
    ModelRegistry.scaler = ModelRegistry.next_scaler
    ModelRegistry.current_model_version = ModelRegistry.next_model_version

    # Clear every staging slot, not just the model and version. A leftover
    # next_transformer_model / next_scaler would be promoted again by a later
    # switch that only stages a new LightGBM model, pairing it with a stale
    # scaler or Transformer.
    ModelRegistry.next_lgbm_model = None
    ModelRegistry.next_transformer_model = None
    ModelRegistry.next_scaler = None
    ModelRegistry.next_model_version = None

    Log(" 模型热切换完成", "#00ff00")
class StatePersistence:
    """Persists the strategy's runtime state to disk."""

    @staticmethod
    def save_state():
        """Pickle the current runtime state to ``strategy_state/strategy_state.pkl``.

        The snapshot holds the save time, the latest 1000 one-minute candles,
        the monitor's performance log and active signal, the live model
        version and the latest 100 signals.

        The target directory is created when missing, and the data is written
        to a temporary file that is then moved over the real one, so an
        interruption mid-write cannot leave a truncated state file behind.

        Raises:
            OSError: If the directory or file cannot be written.
        """
        import os

        state_path = "strategy_state/strategy_state.pkl"
        tmp_path = state_path + ".tmp"

        state_data = {
            "timestamp": time.time(),
            "klines_1min": FeatureStore.klines_1min[-1000:],  # latest 1000 candles
            "performance_log": RealtimeMonitor.performance_log,
            "active_signal": RealtimeMonitor.active_signal,
            "model_version": ModelRegistry.current_model_version,
            "signal_history": ModelRegistry.signal_history[-100:]  # latest 100 signals
        }

        os.makedirs(os.path.dirname(state_path), exist_ok=True)
        with open(tmp_path, "wb") as f:
            pickle.dump(state_data, f)
        # os.replace swaps the file in one step; readers see either the old
        # snapshot or the new one, never a partial write.
        os.replace(tmp_path, state_path)
        Log(" 状态已保存", "#00ff00")
try:
    from numba import jit
except ImportError:
    def jit(*_args, **_kwargs):
        """No-op stand-in for ``numba.jit`` used when Numba is not installed."""
        def _identity(func):
            return func
        return _identity


@jit(nopython=True)
def calculate_ewma_fast(data, span):
    """Return the exponentially weighted moving average of ``data``.

    Uses ``alpha = 2 / (span + 1)`` with ``ewma[0] = data[0]`` and
    ``ewma[i] = alpha * data[i] + (1 - alpha) * ewma[i - 1]``.

    Args:
        data: NumPy array of samples (iterated along its first axis).
        span: Smoothing span; larger values smooth more.

    Returns:
        float64 array with the same shape as ``data``; empty input yields an
        empty result.
    """
    alpha = 2.0 / (span + 1.0)
    # Always accumulate in float64: an output buffer with the input's dtype
    # would silently truncate every step for integer input.
    ewma = np.empty(data.shape, dtype=np.float64)
    n = data.shape[0]
    if n == 0:
        # Nothing to seed from; indexing element 0 would read out of bounds.
        return ewma
    ewma[0] = data[0]
    for i in range(1, n):
        ewma[i] = alpha * data[i] + (1.0 - alpha) * ewma[i - 1]
    return ewma
# 性能对比:纯Python vs Numba
# 计算10000次EMA,Numba快50倍以上
async def batch_predict(features_batch):
    """Predict class probabilities for a batch of feature rows.

    A batch holding more than one row is scaled with ``ModelRegistry.scaler``
    and passed to ``ModelRegistry.lgbm_model.predict_proba`` in one call.
    Otherwise the first row is delegated to ``single_predict``.

    Args:
        features_batch: Sized, indexable collection of feature rows.

    Returns:
        The ``predict_proba`` output for a multi-row batch, or whatever
        ``single_predict`` returns for the first row.
    """
    if len(features_batch) <= 1:
        # Single-row path: hand the only row to the per-sample predictor.
        return await single_predict(features_batch[0])

    scaled_batch = ModelRegistry.scaler.transform(features_batch)
    return ModelRegistry.lgbm_model.predict_proba(scaled_batch)
from functools import lru_cache
class FeatureCache:
    """Shared memo for computed feature values with first-in-first-out eviction."""

    # Maximum number of entries kept; assign on the class to resize the cache.
    MAX_ENTRIES = 1000
    # key -> cached result, shared by all callers; insertion order is eviction order.
    _cache = {}

    @staticmethod
    def calculate_with_cache(key, calculate_func, *args):
        """Return the cached value for ``key``, computing and storing it on a miss.

        Args:
            key: Hashable cache key.
            calculate_func: Callable invoked as ``calculate_func(*args)`` on a miss.
            *args: Positional arguments forwarded to ``calculate_func``.

        Returns:
            The cached or freshly computed result.
        """
        cache = FeatureCache._cache
        try:
            return cache[key]
        except KeyError:
            pass

        result = calculate_func(*args)
        cache[key] = result

        # Evict the earliest-inserted keys until the cache fits again. A loop
        # (not a single delete) keeps this correct after MAX_ENTRIES is lowered.
        limit = max(0, FeatureCache.MAX_ENTRIES)
        while len(cache) > limit:
            del cache[next(iter(cache))]
        return result
这个量化交易系统展示了如何将现代机器学习技术应用于金融市场。关键收获:
* 工程化思维:交易系统不只是算法,更是完整的工程问题
* 避免数据泄露:严格的时序数据处理是成功的关键
* 混合模型优势:传统ML与深度学习互补能提升性能
* 生产环境考量:监控、运维、稳定性同样重要
* 持续优化:量化交易是一个不断迭代的过程
目前策略尚未实现下单接口,有需要的读者可以自行补全。
重要提示:
本文仅供技术学习交流,量化交易有风险,实盘前请充分测试,过去表现不代表未来收益。