基于 Kronos 金融 K 线基础模型的 AI 趋势跟踪策略。Kronos 是首个开源金融 K 线基础模型,训练于 12B+ 条 K 线数据。
策略逻辑:模型预测未来价格区间(P10/P90),当预测中位数有足够利润空间时开仓,到达目标价止盈,触及止损价离场,预测到期强制平仓。一把开一把清,不加仓不减仓。
自动适配现货和期货交易所:现货仅做多,期货支持双向。支持纸面交易和实盘交易两种模式。
━━━ 部署前需修改 ━━━
使用本策略前,请根据你的本地环境修改以下两处:
1. 代码第 1 行的 shebang(Python 解释器路径): 当前值为 #!/Users/xxxxxx,请改为你本地 Kronos 项目 venv 中的 Python 路径,例如 #!/home/yourname/Kronos/.venv/bin/python3。
2. 策略参数「Kronos 仓库路径」: 当前默认值为 /Users/xxxxxx,请改为你本地 git clone 的 Kronos 仓库的绝对路径。
━━━ 环境准备 ━━━
#!/Users/xxxxxx
import sys
import time
import json
import numpy as np
import pandas as pd
# ── Global runtime state ──
predictor = None  # Kronos predictor; assigned by init_kronos()
trade_history = []  # trade records appended on every open / close
total_trades = cycle_count = 0
init_equity = 0.0

# Position bookkeeping
current_position = 0.0  # direction: -1.0 = fully short, 0.0 = flat, 1.0 = fully long
entry_price = 0.0  # price at which the position was opened
target_price = 0.0  # take-profit target (predicted median)
stop_price = 0.0  # stop-loss price (P10 or P90)
bars_held = 0  # number of K-line bars the current position has been held

# Risk control
peak_equity = 0.0  # highest equity seen so far (used for max drawdown)
consecutive_losses = 0  # number of consecutive losing trades
halted = False  # circuit-breaker flag
bars_since_close = 0  # bars elapsed since the last close (cool-down counter)

chart = None
is_futures = False  # whether the exchange is a futures venue (detected at runtime)

# ── Paper-trading virtual account ──
paper_balance = 0.0
paper_stocks = 0.0  # positive = long holding, negative = short holding (futures only)
paper_init_equity = 0.0

# ── Persistence key ──
STATE_KEY = "kronos_state"

# ── K-line period label → seconds ──
PERIOD_MAP = {
    "1m": 60,
    "5m": 5 * 60,
    "15m": 15 * 60,
    "30m": 30 * 60,
    "1h": 60 * 60,
    "4h": 4 * 60 * 60,
    "1d": 24 * 60 * 60,
}

# ── Trading precision (initialised at runtime) ──
price_precision = 2
amount_precision = 6
min_amount = 0.001

last_predict_ts_ms = 0  # K-line timestamp of the last prediction (de-duplication)

# ── Position-sync throttling ──
last_sync_ts = 0
def detect_exchange_type():
    """Set the global ``is_futures`` flag from the exchange name and log it.

    The exchange counts as a futures venue when its name contains the
    substring "Futures".
    """
    global is_futures
    exchange_name = exchange.GetName()
    is_futures = "Futures" in exchange_name
    if is_futures:
        kind_label = "期货"
    else:
        kind_label = "现货"
    Log("交易所:", exchange_name, "| 类型:", kind_label)
def init_precision():
    """Initialise the price / amount precision and the minimum order amount.

    A non-negative ``PricePrecision`` / ``AmountPrecision`` parameter is used
    as-is.  When either one is negative, the market table returned by
    ``exchange.GetMarkets()`` is consulted for the current ``Symbol``; the
    minimum quantity is read from the same entry.  The resulting values are
    stored in module globals and logged.
    """
    global price_precision, amount_precision, min_amount

    auto_price = PricePrecision < 0
    auto_amount = AmountPrecision < 0
    if not auto_price:
        price_precision = int(PricePrecision)
    if not auto_amount:
        amount_precision = int(AmountPrecision)

    if auto_price or auto_amount:
        markets = exchange.GetMarkets()
        if markets:
            pair = str(Symbol)
            if pair in markets:
                info = markets[pair]
                if auto_price and "PricePrecision" in info:
                    price_precision = int(info["PricePrecision"])
                if auto_amount and "AmountPrecision" in info:
                    amount_precision = int(info["AmountPrecision"])
                if "MinQty" in info and info["MinQty"] > 0:
                    min_amount = float(info["MinQty"])
                Log("交易精度(自动) | 价格:", price_precision, "位 | 数量:", amount_precision,
                    "位 | 最小量:", min_amount)
                return

    # Manual precision, or the market entry could not be found.
    Log("交易精度 | 价格:", price_precision, "位 | 数量:", amount_precision, "位 | 最小量:", min_amount)
def round_price(price):
    """Return *price* passed through ``_N`` with the configured price precision."""
    digits = price_precision
    return _N(price, digits)
def round_amount(amount):
    """Return *amount* passed through ``_N`` with the configured amount precision."""
    digits = amount_precision
    return _N(amount, digits)
def save_state():
    """Serialise the strategy state to JSON and store it under ``STATE_KEY``.

    Only the 100 most recent trade records are persisted; every other field
    is the current value of the module global of the same name.
    """
    snapshot = dict(
        trade_history=trade_history[-100:],
        total_trades=total_trades,
        cycle_count=cycle_count,
        init_equity=init_equity,
        current_position=current_position,
        entry_price=entry_price,
        target_price=target_price,
        stop_price=stop_price,
        bars_held=bars_held,
        peak_equity=peak_equity,
        consecutive_losses=consecutive_losses,
        halted=halted,
        bars_since_close=bars_since_close,
        paper_balance=paper_balance,
        paper_stocks=paper_stocks,
        paper_init_equity=paper_init_equity,
        last_predict_ts_ms=last_predict_ts_ms,
    )
    payload = json.dumps(snapshot)
    _G(STATE_KEY, payload)
def restore_state():
    """Load the persisted strategy state back into the module globals.

    Returns:
        True when a stored snapshot was found and applied, False when nothing
        is stored under ``STATE_KEY`` or the snapshot could not be decoded.
        Missing fields fall back to their defaults.
    """
    raw = _G(STATE_KEY)
    if raw is None:
        return False
    try:
        saved = json.loads(raw)
        # (field name, default) pairs; the table is rebuilt on every call so
        # the list default is never shared between calls.
        fields = (
            ("trade_history", []),
            ("total_trades", 0),
            ("cycle_count", 0),
            ("init_equity", 0.0),
            ("current_position", 0.0),
            ("entry_price", 0.0),
            ("target_price", 0.0),
            ("stop_price", 0.0),
            ("bars_held", 0),
            ("peak_equity", 0.0),
            ("consecutive_losses", 0),
            ("halted", False),
            ("bars_since_close", 0),
            ("paper_balance", 0.0),
            ("paper_stocks", 0.0),
            ("paper_init_equity", 0.0),
            ("last_predict_ts_ms", 0),
        )
        # Read every field before touching any global, so a malformed snapshot
        # cannot leave the state half-updated.
        loaded = {key: saved.get(key, default) for key, default in fields}
        globals().update(loaded)

        if current_position > 0:
            pos_label = "多"
        elif current_position < 0:
            pos_label = "空"
        else:
            pos_label = "无"
        halt_info = " | 熔断: 是" if halted else ""
        Log("已恢复状态 | 周期:", cycle_count, "| 交易:", total_trades,
            "| 仓位:", pos_label + halt_info)
        return True
    except Exception as e:
        Log("状态恢复失败:", str(e))
        return False
def validate_state():
    """Drop a restored short position when the exchange is not a futures venue."""
    global current_position, entry_price, target_price, stop_price, bars_held
    # Nothing to fix on futures, or when the restored position is flat / long.
    if is_futures or current_position >= 0:
        return
    Log("警告:现货交易所不支持空头,已清除恢复的空头仓位")
    current_position = 0.0
    entry_price = target_price = stop_price = 0.0
    bars_held = 0
def sync_position():
    """Reconcile the local position state with the exchange (live mode only).

    Does nothing in paper mode (``TradeMode == 0``) and runs at most once
    every 300 seconds.  When the local state claims a position that the
    exchange does not report, the local position fields are cleared.  A
    position found on the exchange while the local state is flat is only
    reported in the log.
    """
    global current_position, entry_price, target_price, stop_price, bars_held, last_sync_ts
    if TradeMode == 0:
        return
    now = time.time()
    if now - last_sync_ts < 300:
        return
    last_sync_ts = now

    clear_message = None  # set when the local position must be dropped
    if is_futures:
        long_amt = _get_position_amount(0)
        short_amt = _get_position_amount(1)
        # A negative amount is the helper's "query failed" marker.
        if long_amt < 0 or short_amt < 0:
            Log("持仓查询失败,跳过本轮同步")
            return
        if current_position > 0 and long_amt == 0:
            clear_message = "持仓同步: 本地多头但交易所无持仓,清除本地状态"
        elif current_position < 0 and short_amt == 0:
            clear_message = "持仓同步: 本地空头但交易所无持仓,清除本地状态"
        elif current_position == 0 and (long_amt > 0 or short_amt > 0):
            Log("持仓同步警告: 本地无仓位但交易所有持仓,请手动处理")
    else:
        account = exchange.GetAccount()
        if account:
            held = float(account.get("Stocks", 0))
            if current_position > 0 and held < min_amount:
                clear_message = "持仓同步: 本地多头但交易所无持仓,清除本地状态"

    if clear_message is None:
        return
    Log(clear_message)
    current_position = 0.0
    entry_price = target_price = stop_price = 0.0
    bars_held = 0
def reset_all():
    """Reset every piece of strategy state, persisted data, logs and the chart.

    After wiping the counters and position fields, the stored snapshot is
    deleted, the platform logs and profit log are reset, and the starting
    equity is re-seeded: from ``PaperInitBalance`` in paper mode, from the
    live account otherwise.
    """
    global trade_history, total_trades, cycle_count, init_equity, current_position
    global entry_price, target_price, stop_price, bars_held
    global peak_equity, consecutive_losses, halted, bars_since_close
    global paper_balance, paper_stocks, paper_init_equity, last_predict_ts_ms

    trade_history = []
    total_trades = cycle_count = 0
    current_position = 0.0
    entry_price = target_price = stop_price = 0.0
    bars_held = consecutive_losses = bars_since_close = 0
    halted = False
    last_predict_ts_ms = 0

    _G(STATE_KEY, None)
    LogReset()
    LogProfitReset()
    if chart:
        chart.reset()

    if TradeMode != 0:
        init_equity = get_equity()
    else:
        paper_stocks = 0.0
        paper_balance = PaperInitBalance
        paper_init_equity = PaperInitBalance
        init_equity = PaperInitBalance
    peak_equity = init_equity

    Log("=== 已重置所有数据 ===")
    Log("初始资产:", str(_N(init_equity, 2)), "USDT")
def init_kronos():
    """Load the Kronos model and tokenizer once and store the global predictor.

    The call is a no-op when ``predictor`` is already set.  ``KronosPath`` is
    put at the front of ``sys.path`` so that the repository's ``model``
    package can be imported.  The compute device is chosen in the order
    MPS, CUDA, CPU.
    """
    global predictor
    if predictor is not None:
        return
    if KronosPath not in sys.path:
        sys.path.insert(0, KronosPath)
    import torch
    from model import Kronos, KronosTokenizer, KronosPredictor

    # torch builds without the MPS backend do not expose torch.backends.mps.
    mps_backend = getattr(torch.backends, "mps", None)
    if mps_backend is not None and mps_backend.is_available():
        device = "mps"
    elif torch.cuda.is_available():
        device = "cuda:0"
    else:
        device = "cpu"

    model_size = str(ModelSize)
    # The tokenizer must match the checkpoint: per the Kronos model zoo,
    # Kronos-mini is paired with Kronos-Tokenizer-2k while small/base use
    # Kronos-Tokenizer-base.
    if model_size.strip().lower() == "mini":
        tokenizer_repo = "NeoQuasar/Kronos-Tokenizer-2k"
    else:
        tokenizer_repo = "NeoQuasar/Kronos-Tokenizer-base"

    Log("Kronos 初始化 | 设备:", device, "| 模型: Kronos-" + model_size)
    tokenizer = KronosTokenizer.from_pretrained(tokenizer_repo)
    model = Kronos.from_pretrained("NeoQuasar/Kronos-" + model_size)
    # max_context stays at 512, which every published checkpoint supports.
    predictor = KronosPredictor(model, tokenizer, device=device, max_context=512)
    Log("Kronos 模型加载完成")
def init_chart():
    """Create the price / prediction chart, store it in ``chart`` and reset it.

    Series order (relied on by the ``chart.add`` callers): 0 actual price,
    1 predicted median, 2 predicted P10-P90 band, 3 buy markers,
    4 sell markers.
    """
    global chart

    def marker_series(name, color, symbol):
        # Scatter series used for the buy / sell markers.
        return {"name": name, "type": "scatter", "color": color,
                "marker": {"symbol": symbol, "radius": 6}, "data": []}

    series = [
        {"name": "实际价格", "type": "line", "color": "#00d4ff", "data": []},
        {"name": "预测中位价", "type": "line", "color": "#ff6b35", "dashStyle": "Dash", "data": []},
        {"name": "预测区间(P10-P90)", "type": "arearange", "color": "#ff6b35",
         "fillOpacity": 0.15, "lineWidth": 0, "data": []},
        marker_series("买入", "#00ff88", "triangle"),
        marker_series("卖出", "#ff4444", "triangle-down"),
    ]
    config = {
        "tooltip": {"xDateFormat": "%Y-%m-%d %H:%M"},
        "title": {"text": "Kronos 预测 vs 实际价格"},
        "xAxis": {"type": "datetime"},
        "yAxis": [{"title": {"text": "价格"}, "opposite": False}],
        "series": series,
    }
    chart = Chart(config)
    chart.reset()
def get_equity():
    """Return the live account equity in quote currency, or 0.0 on failure.

    Futures: available plus frozen balance.
    Spot: (available + frozen balance) plus (available + frozen coins) valued
    at the last traded price.  Frozen amounts are included so that funds
    locked in the strategy's own resting orders are not mistaken for a loss.

    Returns:
        The equity as a float; 0.0 when the account or (for spot) the ticker
        query fails.  Callers must treat 0.0 as "reading unavailable".
    """
    account = exchange.GetAccount()
    if account is None:
        return 0.0
    cash = float(account["Balance"]) + float(account.get("FrozenBalance", 0))
    if is_futures:
        return cash
    ticker = exchange.GetTicker()
    if ticker is None:
        return 0.0
    coins = float(account["Stocks"]) + float(account.get("FrozenStocks", 0))
    return cash + coins * float(ticker["Last"])
def get_paper_equity(current_price):
    """Return the paper account's equity marked at *current_price*.

    Flat or long: cash plus holdings valued at the current price.
    Short (negative ``paper_stocks``): cash plus the value of the short at its
    entry price plus the unrealised profit; the profit term is 0 when no
    entry price is recorded.
    """
    if paper_stocks < 0:
        short_qty = abs(paper_stocks)
        if entry_price > 0:
            unrealised = (entry_price - current_price) * short_qty
        else:
            unrealised = 0
        return paper_balance + short_qty * entry_price + unrealised
    return paper_balance + paper_stocks * current_price
def get_klines():
    """Fetch K-lines and build the inputs for the predictor.

    Returns:
        ``(df, timestamps, future_ts, last_ts_ms)`` where ``df`` holds the
        open/high/low/close/volume columns of the last ``Lookback`` bars,
        ``timestamps`` their times, ``future_ts`` the ``PredLen`` following
        timestamps spaced by the last observed interval, and ``last_ts_ms``
        the millisecond timestamp of the newest bar used.
        ``(None, None, None, None)`` when fewer than ``Lookback + 1`` bars
        are available.
    """
    period_sec = PERIOD_MAP.get(str(KlinePeriod), 3600)
    bars = exchange.GetRecords(period_sec)
    if bars is None or len(bars) < Lookback + 1:
        return None, None, None, None

    # Discard the newest record (presumably the still-forming bar), then keep
    # the most recent Lookback bars.
    window = bars[:-1][-Lookback:]

    column_sources = (
        ("open", "Open"),
        ("high", "High"),
        ("low", "Low"),
        ("close", "Close"),
        ("volume", "Volume"),
    )
    df = pd.DataFrame({
        column: [float(bar[source]) for bar in window]
        for column, source in column_sources
    })

    timestamps = pd.Series([pd.Timestamp(bar["Time"], unit="ms") for bar in window])
    newest = timestamps.iloc[-1]
    step = newest - timestamps.iloc[-2]
    future_ts = pd.Series([newest + step * k for k in range(1, PredLen + 1)])
    return df, timestamps, future_ts, int(window[-1]["Time"])
def kronos_multi_sample(df, x_ts, y_ts):
    """Draw ``SampleCount`` independent forecasts and stack their close paths.

    Returns:
        A NumPy array built from one "close" path per sample.
    """
    def sample_close_path():
        # One predictor call yields one sampled forecast.
        forecast = predictor.predict(
            df=df, x_timestamp=x_ts, y_timestamp=y_ts,
            pred_len=PredLen, T=Temperature, top_p=TopP,
            sample_count=1, verbose=False,
        )
        return forecast["close"].values

    return np.array([sample_close_path() for _ in range(SampleCount)])
def calc_signal(current_close, paths):
    """Derive the trend-following signal from the sampled forecast paths.

    Args:
        current_close: reference close price.
        paths: 2-D array with one forecast path per row; only the last column
            (the path end points) is used.

    Returns:
        ``(signal, pct_change, pred_median, pred_low, pred_high, consistency)``
        where ``signal`` is "long", "short" or "hold", ``pct_change`` is the
        relative move from ``current_close`` to the median end point,
        ``pred_low`` / ``pred_high`` are the 10th / 90th percentile end points
        and ``consistency`` is the share of paths on the majority side.
    """
    final_prices = paths[:, -1]
    pred_median = float(np.median(final_prices))
    low_q, high_q = np.percentile(final_prices, [10, 90])
    pred_low = float(low_q)
    pred_high = float(high_q)

    # Directional agreement between the paths
    moves = (final_prices - current_close) / current_close
    up_votes = int(np.sum(moves > 0))
    down_votes = int(np.sum(moves < 0))
    consistency = float(max(up_votes, down_votes)) / len(moves)

    # Expected move of the median end point
    pct_change = (pred_median - current_close) / current_close
    threshold = SignalThreshold / 100.0
    confident = consistency >= ConsistencyThreshold / 100.0

    # A signal needs both enough room to the median and enough agreement.
    signal = "hold"
    if confident:
        if pct_change > threshold:
            signal = "long"
        elif pct_change < -threshold:
            signal = "short"
    return signal, pct_change, pred_median, pred_low, pred_high, consistency
def calc_position_size(equity, entry, stop):
    """Return the position value for fixed-fraction risk sizing.

    The position is sized so that a move from *entry* to *stop* loses
    ``RiskPerTrade`` percent of *equity*, capped at ``MaxPositionPct`` percent
    of *equity*.  Returns 0.0 when entry and stop are (almost) identical.
    """
    stop_distance = abs(entry - stop)
    if stop_distance < 1e-8:
        return 0.0
    risk_budget = equity * (RiskPerTrade / 100.0)
    risk_sized_value = risk_budget / stop_distance * entry
    # Never exceed MaxPositionPct% of equity
    value_cap = equity * (MaxPositionPct / 100.0)
    return min(risk_sized_value, value_cap)
def calc_stop_price(current_close, signal, pred_low, pred_high):
    """Return the rounded stop price for a new position.

    The stop sits at the forecast band edge (``pred_low`` for a long,
    ``pred_high`` otherwise) but never closer to *current_close* than
    ``MinStopPct`` percent.
    """
    min_distance = current_close * (MinStopPct / 100.0)
    if signal != "long":
        # Short: the stop lies above the price; take the farther of the two.
        return round_price(max(pred_high, current_close + min_distance))
    # Long: the stop lies below the price; take the farther of the two.
    return round_price(min(pred_low, current_close - min_distance))
def check_max_drawdown(equity):
    """Track the equity peak and trip the circuit breaker on excessive drawdown.

    Args:
        equity: current account equity.  Non-positive values are ignored:
            ``get_equity()`` reports a failed account query as 0.0, and
            treating that as a 100% drawdown would halt the strategy until a
            manual unhalt.

    Returns:
        True when the drawdown from the peak exceeds ``MaxDrawdownPct``
        percent (``halted`` is then set), False otherwise.
    """
    global peak_equity, halted
    if equity <= 0:
        return False
    if equity > peak_equity:
        peak_equity = equity
    # equity > 0 guarantees peak_equity > 0 here, so the division is safe.
    drawdown = (peak_equity - equity) / peak_equity * 100
    if drawdown <= MaxDrawdownPct:
        return False
    if not halted:
        # Log only on the transition; this runs on every loop iteration and
        # would otherwise flood the log while the breaker stays tripped.
        halted = True
        Log("触发最大回撤熔断 | 峰值:", str(_N(peak_equity, 2)),
            "| 当前:", str(_N(equity, 2)),
            "| 回撤:", str(_N(drawdown, 2)) + "%")
    return True
def record_trade_result(exit_price, was_long):
    """Update the consecutive-loss counter after a position has been closed.

    A losing trade increments the counter and trips the circuit breaker once
    ``MaxConsecutiveLoss`` is reached; any other result resets the counter.
    Nothing happens when no entry price is recorded.
    """
    global consecutive_losses, halted
    if entry_price <= 0:
        return
    lost = exit_price < entry_price if was_long else exit_price > entry_price
    if not lost:
        consecutive_losses = 0
        return
    consecutive_losses += 1
    if consecutive_losses >= MaxConsecutiveLoss:
        halted = True
        Log("连续亏损", consecutive_losses, "次,触发熔断")
def _send_close_order(was_long):
    """Send the live market order that flattens the position; False means keep local state."""
    if is_futures:
        amount = _get_position_amount(0 if was_long else 1)
        if amount < 0:
            Log("警告: 持仓查询失败,跳过平仓操作")
            return False
        if amount == 0:
            # Nothing left on the exchange: only the local state needs clearing.
            Log("警告: 交易所无持仓记录,同步清除本地仓位状态")
            return True
        if was_long:
            exchange.SetDirection("closebuy")
            order_id = exchange.Sell(-1, amount)
        else:
            exchange.SetDirection("closesell")
            order_id = exchange.Buy(-1, amount)
    else:
        account = exchange.GetAccount()
        if not account:
            Log("警告: 账户查询失败,跳过平仓操作")
            return False
        sell_amount = round_amount(float(account["Stocks"]))
        if sell_amount < min_amount:
            # Holdings below the tradable minimum: treat the position as gone.
            return True
        order_id = exchange.Sell(-1, sell_amount)
    if order_id is None:
        # A rejected order leaves the exchange position open, so the local
        # state must survive for the next loop iteration to retry.
        Log("警告: 平仓下单失败,保留本地仓位状态,下轮重试")
        return False
    return True


def do_close_position(current_price, ts_ms, reason):
    """Close the whole position in one go and record the result.

    Args:
        current_price: price used for the paper fill and the P&L figures.
        ts_ms: millisecond timestamp for the chart marker.
        reason: label of the exit trigger, written to the log and history.

    In live mode the local state is kept untouched (so the close is retried)
    when the position/account query or the order placement fails.
    """
    global current_position, entry_price, target_price, stop_price, bars_held
    global total_trades, bars_since_close
    if current_position == 0:
        return
    was_long = current_position > 0

    if TradeMode == 0:
        if was_long:
            paper_sell(current_price)
        else:
            paper_close_short(current_price)
    elif not _send_close_order(was_long):
        return

    # P&L in percent; without a recorded entry price it cannot be computed,
    # and dividing by zero here would abort after the order was already sent.
    if entry_price > 0:
        if was_long:
            pnl_pct = (current_price - entry_price) / entry_price * 100
        else:
            pnl_pct = (entry_price - current_price) / entry_price * 100
    else:
        pnl_pct = 0.0

    record_trade_result(current_price, was_long)
    total_trades += 1
    direction_label = "平多" if was_long else "平空"
    trade_history.append({"time": _D(), "signal": direction_label, "price": current_price,
                          "reason": reason, "pnl_pct": _N(pnl_pct, 2)})
    Log(reason, direction_label, "| 价格:", str(round_price(current_price)),
        "| 盈亏:", str(_N(pnl_pct, 2)) + "%",
        "| 持仓:", str(bars_held), "根K线")
    if chart:
        # Closing a long is a sell (series 4); covering a short is a buy (series 3).
        chart.add(4 if was_long else 3, [ts_ms, current_price])

    current_position = 0.0
    entry_price = 0.0
    target_price = 0.0
    stop_price = 0.0
    bars_held = 0
    bars_since_close = 0
def _get_position_amount(direction_type):
    """Return the held amount for one position side.

    Args:
        direction_type: position ``Type`` to look for; the callers pass 0 for
            the long side and 1 for the short side.

    Returns:
        The rounded amount of the first matching position, 0.0 when there is
        none, or -1 when the position query itself failed.
    """
    pos = exchange.GetPosition()
    if pos is None:
        return -1
    if len(pos) == 0:
        return 0.0
    for p in pos:
        if int(p.get("Type", -1)) == direction_type:
            return round_amount(float(p["Amount"]))
    # Fallback for a single record with a missing / non-standard Type.  It must
    # not fire when that record is explicitly the *other* side, otherwise a
    # lone long would be reported as the short amount (and vice versa).
    if len(pos) == 1 and int(pos[0].get("Type", -1)) not in (0, 1):
        return round_amount(float(pos[0]["Amount"]))
    return 0.0
def paper_buy(pos_value, current_price):
    """Paper trading: buy up to *pos_value* worth at *current_price*.

    The spend is limited to the available paper balance and the received
    amount is reduced by the 0.999 factor.  Returns the amount bought, or 0.0
    when less than 1 unit of quote currency can be spent.
    """
    global paper_balance, paper_stocks
    spend = min(pos_value, paper_balance)
    if spend < 1:
        return 0.0
    filled = spend / current_price * 0.999
    paper_stocks += filled
    paper_balance -= spend
    return filled
def paper_sell(current_price):
    """Paper trading: sell the whole long holding at *current_price*.

    The proceeds are reduced by the 0.999 factor.  Returns the amount sold,
    or 0.0 when there is no long holding.
    """
    global paper_balance, paper_stocks
    if paper_stocks <= 0:
        return 0.0
    sold = paper_stocks
    paper_stocks = 0.0
    paper_balance += sold * current_price * 0.999
    return sold
def paper_open_short(pos_value, current_price):
    """Paper trading: open a short worth up to *pos_value* at *current_price*.

    The margin is limited to the available paper balance and is deducted from
    it; the shorted amount (reduced by the 0.999 factor) is subtracted from
    ``paper_stocks``.  Returns the shorted amount, or 0.0 when less than
    1 unit of quote currency can be committed.
    """
    global paper_balance, paper_stocks
    margin = min(pos_value, paper_balance)
    if margin < 1:
        return 0.0
    shorted = margin / current_price * 0.999
    paper_stocks -= shorted
    paper_balance -= margin
    return shorted
def paper_close_short(current_price):
    """Paper trading: cover the whole short at *current_price*.

    The amount credited back is the short's value at the entry price plus its
    profit, floored at zero (a loss beyond that is logged as a blow-up) and
    reduced by the 0.999 factor.  Returns the amount covered, or 0.0 when
    there is nothing to cover.
    """
    global paper_balance, paper_stocks
    short_qty = abs(paper_stocks)
    if short_qty < 1e-10:
        return 0.0
    pnl = (entry_price - current_price) * short_qty
    proceeds = short_qty * entry_price + pnl
    if proceeds < 0:
        proceeds = 0
        Log("纸面交易空头爆仓: 亏损超过保证金")
    paper_stocks = 0.0
    paper_balance += proceeds * 0.999
    return short_qty
def open_position(signal, current_price, pred_median, pred_low, pred_high, equity, ts_ms):
    """Open one full position and set its take-profit and stop-loss prices.

    Args:
        signal: "long" or "short"; anything else is ignored.
        current_price: price used as the entry price.
        pred_median: predicted median, stored as the take-profit target.
        pred_low, pred_high: forecast band edges used for the stop price.
        equity: account equity used for risk-based sizing.
        ts_ms: millisecond timestamp for the chart marker.

    Shorts are only opened on futures exchanges.  In live mode the local
    state is updated only after the exchange accepted the order; a rejected
    order leaves the strategy flat.
    """
    global current_position, entry_price, target_price, stop_price, bars_held, total_trades, bars_since_close
    if signal not in ("long", "short"):
        return
    is_long = signal == "long"
    if not is_long and not is_futures:
        return  # spot exchanges cannot go short

    # Stop price first, then the risk-based position value.
    stop = calc_stop_price(current_price, signal, pred_low, pred_high)
    pos_value = calc_position_size(equity, current_price, stop)
    if pos_value < 1:
        Log("仓位过小,跳过开仓 | 信号:", signal)
        return

    if TradeMode == 0:
        if is_long:
            amount = paper_buy(pos_value, current_price)
        else:
            amount = paper_open_short(pos_value, current_price)
        if amount <= 0:
            return
    else:
        amount = round_amount(pos_value / current_price)
        if amount < min_amount:
            return
        # Limit orders priced 0.2% through the market.
        if is_long:
            if is_futures:
                exchange.SetDirection("buy")
            order_id = exchange.Buy(round_price(current_price * 1.002), amount)
        else:
            exchange.SetDirection("sell")
            order_id = exchange.Sell(round_price(current_price * 0.998), amount)
        if order_id is None:
            # The order was rejected: do not pretend a position exists.
            Log("警告: 开仓下单失败,本地仓位保持空仓 | 信号:", signal)
            return

    current_position = 1.0 if is_long else -1.0
    entry_price = current_price
    target_price = round_price(pred_median)
    stop_price = stop
    bars_held = 0
    total_trades += 1
    bars_since_close = -1

    label = "开多" if is_long else "开空"
    trade_history.append({"time": _D(), "signal": label, "price": current_price,
                          "reason": "信号", "target": target_price, "stop": stop_price})
    Log(label + " | 价格:", str(round_price(current_price)),
        "| 目标:", str(target_price), "| 止损:", str(stop_price),
        "| 仓位价值:", str(_N(pos_value, 2)), "USDT")
    if chart:
        # Series 3 holds buy markers, series 4 sell markers.
        chart.add(3 if is_long else 4, [ts_ms, current_price])
def check_exit_conditions(current_price, signal, consistency, ts_ms):
    """Close the position when one exit rule fires.

    Rules are checked in priority order: take-profit, stop-loss, expiry
    (held for at least ``PredLen`` bars) and signal reversal.  The reversal
    rule is skipped when *signal* or *consistency* is None.
    """
    if current_position == 0:
        return

    if current_position > 0:
        target_hit = current_price >= target_price
        stop_hit = current_price <= stop_price
        opposite_signal = "short"
    else:
        target_hit = current_price <= target_price
        stop_hit = current_price >= stop_price
        opposite_signal = "long"

    if target_hit:
        reason = "止盈"
    elif stop_hit:
        reason = "止损"
    elif bars_held >= PredLen:
        # Held past the forecast horizon: force the exit.
        reason = "到期"
    elif (signal is not None and consistency is not None
          and signal == opposite_signal
          and consistency >= ConsistencyThreshold / 100.0):
        # Fresh forecast points the other way with sufficient agreement.
        reason = "反转"
    else:
        return
    do_close_position(current_price, ts_ms, reason)
def update_chart(current_price, pred_median, pred_low, pred_high, ts_ms):
    """Append the price, predicted median and predicted band to the chart.

    Does nothing when the chart has not been created.
    """
    if chart is None:
        return
    points = (
        (0, [ts_ms, current_price]),
        (1, [ts_ms, pred_median]),
        (2, [ts_ms, pred_low, pred_high]),
    )
    for series_index, point in points:
        chart.add(series_index, point)
def build_status(current_price, signal, pct, pred_median, pred_low, pred_high,
                 consistency, elapsed, equity, profit_pct):
    """Render the status bar: a headline plus three tables.

    The tables cover the prediction signal, the account / risk figures and
    the ten most recent trades (newest first).  The result is pushed through
    ``LogStatus`` with the tables JSON-encoded between backticks.
    """
    def two_dp(value):
        # Shared "round to two decimals and stringify" formatting.
        return str(_N(value, 2))

    signal_text = {"long": "LONG", "short": "SHORT", "hold": "HOLD"}
    signal_cn = {"long": "做多", "short": "做空", "hold": "观望"}
    mode = "实盘交易" if TradeMode != 0 else "纸面交易"
    ex_type = "期货" if is_futures else "现货"
    in_position = current_position != 0
    if current_position > 0:
        pos_label = "多头持仓"
    elif current_position < 0:
        pos_label = "空头持仓"
    else:
        pos_label = "空仓"
    halt_text = " [已熔断]" if halted else ""

    # Unrealised P&L of the open position
    hold_pnl = "-"
    if in_position and entry_price > 0:
        if current_position > 0:
            pnl = (current_price - entry_price) / entry_price * 100
        else:
            pnl = (entry_price - current_price) / entry_price * 100
        hold_pnl = two_dp(pnl) + "%"

    if in_position:
        bars_text = str(bars_held) + " / " + str(PredLen)
        cooldown_text = "持仓中"
    else:
        bars_text = "-"
        cooldown_text = str(max(0, CooldownBars - bars_since_close)) + " 根"

    signal_rows = [
        ["交易模式", mode + " (" + ex_type + ")" + halt_text],
        ["仓位状态", pos_label],
        ["建仓价", two_dp(entry_price) if entry_price > 0 else "-"],
        ["目标价(止盈)", two_dp(target_price) if target_price > 0 else "-"],
        ["止损价", two_dp(stop_price) if stop_price > 0 else "-"],
        ["持仓盈亏", hold_pnl],
        ["持仓K线数", bars_text],
        ["当前价格", two_dp(current_price)],
        ["预测中位价", two_dp(pred_median)],
        ["预测区间(P10-P90)", two_dp(pred_low) + " ~ " + two_dp(pred_high)],
        ["预期变化", two_dp(pct * 100) + "%"],
        ["方向一致性", str(_N(consistency * 100, 1)) + "% (" + str(SampleCount) + "路径)"],
        ["当前信号", signal_cn[signal]],
    ]
    table1 = {
        "type": "table",
        "title": "Kronos 预测信号",
        "cols": ["项目", "数值"],
        "rows": signal_rows,
    }

    dd = (peak_equity - equity) / peak_equity * 100 if peak_equity > 0 else 0.0
    account_rows = [
        ["交易对", str(Symbol)],
        ["总资产", two_dp(equity) + " USDT"],
        ["累计收益", two_dp(profit_pct) + "%"],
        ["峰值资产", two_dp(peak_equity) + " USDT"],
        ["当前回撤", two_dp(dd) + "% / " + str(_N(MaxDrawdownPct, 1)) + "%"],
        ["连续亏损", str(consecutive_losses) + " / " + str(MaxConsecutiveLoss)],
        ["冷却剩余", cooldown_text],
        ["单笔风险", str(_N(RiskPerTrade, 1)) + "% 权益"],
        ["模型", "Kronos-" + str(ModelSize)],
        ["推理耗时", str(_N(elapsed, 1)) + "s"],
        ["运行周期", str(cycle_count)],
        ["累计交易", str(total_trades) + " 次"],
    ]
    table2 = {
        "type": "table",
        "title": "账户与风控",
        "cols": ["项目", "数值"],
        "rows": account_rows,
    }

    # Ten most recent trades, newest first
    trade_rows = []
    for record in reversed(trade_history[-10:]):
        if "pnl_pct" in record:
            pnl_str = str(record["pnl_pct"]) + "%"
        else:
            pnl_str = "-"
        trade_rows.append([
            str(record["time"]),
            str(record["signal"]),
            two_dp(record["price"]),
            pnl_str,
            record.get("reason", "信号"),
        ])
    if not trade_rows:
        trade_rows.append(["暂无交易记录", "-", "-", "-", "-"])
    table3 = {
        "type": "table",
        "title": "最近交易 (最新 10 条)",
        "cols": ["时间", "方向", "价格", "盈亏", "原因"],
        "rows": trade_rows,
    }

    headline = "".join([
        _D(), " | ", mode, halt_text,
        " | ", pos_label,
        " | 信号: ", signal_text[signal],
        " | 预期: ", two_dp(pct * 100), "%",
        " | 收益: ", two_dp(profit_pct), "%\n",
    ])
    LogStatus(headline + "`" + json.dumps([table1, table2, table3]) + "`")
def main():
    """Strategy entry point: initialise everything, then trade in an endless loop.

    Start-up: coerces the platform-injected parameters to numeric types,
    selects the trading pair, detects the exchange type, initialises the
    precision, the model and the chart, and then resets, restores or freshly
    seeds the strategy state.

    Every loop iteration handles interactive commands, reads the ticker,
    reconciles the position, checks the drawdown breaker and the intra-bar
    exits, refreshes the status bar and persists the state.  Once per newly
    closed bar it additionally runs a prediction, updates the chart,
    evaluates exits / entries and writes the profit point and the cycle log.

    NOTE(review): the nesting of the loop body was reconstructed from a copy
    whose indentation had been lost; confirm which trailing statements were
    meant to run only on a new bar.
    """
    global cycle_count, init_equity, peak_equity, bars_since_close, bars_held, halted
    # Must be global: the unhalt command resets this counter.  Without the
    # declaration the assignment only bound a local and the reset was lost.
    global consecutive_losses
    global paper_balance, paper_stocks, paper_init_equity
    global Lookback, PredLen, SampleCount, Temperature, TopP, SignalThreshold, TradeMode
    global RiskPerTrade, MaxPositionPct, MinStopPct
    global MaxDrawdownPct, MaxConsecutiveLoss, CooldownBars, ConsistencyThreshold
    global PricePrecision, AmountPrecision, ResetOnStart, PaperInitBalance
    global last_predict_ts_ms

    # Parameter type coercion
    Lookback = int(Lookback)
    PredLen = int(PredLen)
    SampleCount = int(SampleCount)
    Temperature = float(Temperature)
    TopP = float(TopP)
    SignalThreshold = float(SignalThreshold)
    TradeMode = int(TradeMode)
    RiskPerTrade = float(RiskPerTrade)
    MaxPositionPct = float(MaxPositionPct)
    MinStopPct = float(MinStopPct)
    MaxDrawdownPct = float(MaxDrawdownPct)
    MaxConsecutiveLoss = int(MaxConsecutiveLoss)
    CooldownBars = int(CooldownBars)
    ConsistencyThreshold = float(ConsistencyThreshold)
    PricePrecision = int(PricePrecision)
    AmountPrecision = int(AmountPrecision)
    ResetOnStart = int(ResetOnStart)
    PaperInitBalance = float(PaperInitBalance)
    if SampleCount < 2:
        SampleCount = 2
        Log("采样次数已自动调整为 2")

    exchange.SetCurrency(str(Symbol))
    detect_exchange_type()
    init_precision()
    init_kronos()
    init_chart()

    if ResetOnStart == 1:
        reset_all()
        Log("已根据参数重置所有数据")
    elif not restore_state():
        if TradeMode == 0:
            paper_balance = PaperInitBalance
            paper_stocks = 0.0
            paper_init_equity = PaperInitBalance
            init_equity = PaperInitBalance
        else:
            init_equity = get_equity()
        peak_equity = init_equity
        Log("=== Kronos AI 预测策略启动(全新) ===")
        Log("初始资产:", str(_N(init_equity, 2)), "USDT")
    else:
        validate_state()
        Log("=== Kronos AI 预测策略启动(已恢复) ===")

    Log("交易模式:", "纸面交易" if TradeMode == 0 else "实盘交易",
        "| 交易所类型:", "期货" if is_futures else "现货")
    Log("交易对:", str(Symbol), "| K 线周期:", str(KlinePeriod),
        "| 历史:", Lookback, "根 | 预测:", PredLen, "根")
    Log("采样:", SampleCount, "条 | 信号阈值: ±" + str(SignalThreshold) + "%",
        "| 单笔风险:", str(RiskPerTrade) + "% | 最大仓位:", str(MaxPositionPct) + "%",
        "| 最小止损:", str(MinStopPct) + "%",
        "| 回撤熔断:", str(MaxDrawdownPct) + "% | 连亏熔断:", MaxConsecutiveLoss, "次",
        "| 冷却:", CooldownBars, "根")

    # Latest prediction results, cached across iterations for the status bar.
    last_signal = "hold"
    last_consistency = 0.0
    last_pct = 0.0
    last_pred_median = 0.0
    last_pred_low = 0.0
    last_pred_high = 0.0
    last_elapsed = 0.0

    while True:
        try:
            cmd = GetCommand()
            if cmd:
                Log("收到指令:", cmd)
                if cmd.startswith("cmdUnhalt"):
                    halted = False
                    consecutive_losses = 0
                    if TradeMode == 0:
                        unhalt_ticker = exchange.GetTicker()
                        if unhalt_ticker is not None:
                            peak_equity = get_paper_equity(float(unhalt_ticker["Last"]))
                        else:
                            # No price available: a zero peak is re-seeded by
                            # the next drawdown check instead of re-tripping
                            # the breaker against the stale peak.
                            peak_equity = 0.0
                    else:
                        peak_equity = get_equity()
                    Log("已手动解除熔断 | 连续亏损已清零 | 峰值已重置为:", str(_N(peak_equity, 2)))
                elif cmd.startswith("cmdForcePredict"):
                    last_predict_ts_ms = 0
                    Log("手动触发预测,将在本轮执行")

            # Current price
            ticker = exchange.GetTicker()
            if ticker is None:
                Sleep(5 * 1000)
                continue
            current_price = float(ticker["Last"])

            # Position reconciliation
            sync_position()

            # Equity.  get_equity() reports a failed query as 0.0; such a
            # reading must not reach the drawdown breaker or the profit log.
            if TradeMode == 0:
                equity = get_paper_equity(current_price)
                equity_ok = True
            else:
                equity = get_equity()
                equity_ok = equity > 0
                if not equity_ok:
                    Log("警告: 账户权益查询失败,本轮跳过回撤检查、开仓与收益统计")

            # Max-drawdown check
            if equity_ok:
                check_max_drawdown(equity)

            # In position: check take-profit / stop-loss on every iteration
            # without waiting for a new bar.
            if current_position != 0:
                ts_now_ms = int(time.time() * 1000)
                check_exit_conditions(current_price, None, None, ts_now_ms)

            # Fetch K-lines
            result = get_klines()
            if result[0] is None:
                LogStatus(_D() + " | 等待 K 线数据... 需要 " + str(Lookback) + " 根(已收盘)")
                Sleep(30 * 1000)
                continue
            df, x_ts, y_ts, ts_ms = result

            # Predict only once per new bar
            new_bar = ts_ms != last_predict_ts_ms
            if new_bar:
                last_predict_ts_ms = ts_ms

                # Multi-path sampling
                t0 = time.time()
                paths = kronos_multi_sample(df, x_ts, y_ts)
                last_elapsed = time.time() - t0
                last_close = float(df.iloc[-1]["close"])
                (last_signal, last_pct, last_pred_median, last_pred_low,
                 last_pred_high, last_consistency) = calc_signal(last_close, paths)
                cycle_count += 1

                # Bar counters
                if current_position != 0:
                    bars_held += 1
                else:
                    bars_since_close += 1

                update_chart(current_price, last_pred_median, last_pred_low, last_pred_high, ts_ms)

                # In position: check reversal and expiry
                if current_position != 0:
                    check_exit_conditions(current_price, last_signal, last_consistency, ts_ms)

                # Flat: check whether to open
                if (current_position == 0 and not halted and equity_ok
                        and last_signal != "hold" and bars_since_close >= CooldownBars):
                    open_position(last_signal, current_price, last_pred_median,
                                  last_pred_low, last_pred_high, equity, ts_ms)

            if equity_ok:
                # Profit relative to the starting equity
                ref_equity = paper_init_equity if TradeMode == 0 else init_equity
                profit_pct = ((equity - ref_equity) / ref_equity * 100) if ref_equity > 0 else 0.0
                if new_bar:
                    LogProfit(equity - ref_equity)

                # Status bar
                build_status(current_price, last_signal, last_pct, last_pred_median,
                             last_pred_low, last_pred_high,
                             last_consistency, last_elapsed, equity, profit_pct)

                # Per-cycle log line
                if new_bar:
                    direction = {"long": "买入", "short": "卖出", "hold": "观望"}
                    pos_label = "多头" if current_position > 0 else ("空头" if current_position < 0 else "空仓")
                    Log("第", cycle_count, "轮 |", direction[last_signal],
                        "| 仓位:", pos_label,
                        "| 预期:", str(_N(last_pct * 100, 2)) + "%",
                        "| 收益:", str(_N(profit_pct, 2)) + "%",
                        "| 耗时:", str(_N(last_elapsed, 1)) + "s")

            # Persist on every iteration so intra-bar exits survive a restart.
            save_state()
        except Exception as e:
            Log("异常:", str(e))
            import traceback
            Log(traceback.format_exc())
            LogStatus(_D() + " | 异常: " + str(e))
        Sleep(30 * 1000)