策略源码
"""
TradFi 多品种网格策略(纯网格版)
================================================================
核心逻辑:低买高卖,循环往复。
【选品逻辑】只做一件事:找波动最大的品种
波动得分 = 过去N根日线的平均日振幅(%)
选出波动最大的 TOP_N 个品种,在它们身上建网格
波动大 = 每天穿越格子次数多 = 止盈次数多 = 赚得多
【网格逻辑】
- 在 [当前价×(1-LOWER_RANGE), 当前价×(1+UPPER_RANGE)] 区间内等比建格
- 每格间距 GRID_RATIO
- 每格开仓 GRID_VALUE USDT
- 价格下穿格子:买入开多
- 价格上穿格子:平多止盈
- 止盈后自动在原位重挂买单,循环
【换仓逻辑】
- 每隔 REBALANCE_HOURS 重新计算各品种波动得分
- 新品种波动得分 > 当前持仓品种 × (1+HYSTERESIS) 才触发换仓
- 换仓 = 平掉旧品种所有仓位 + 在新品种上建新网格
【资金分配】
- 各品种平均分配总资金
- 每格金额 = GRID_VALUE(固定,不随品种波动调整)
"""
import json
import time
# ═══════════════════════════════
# 参数默认值(界面未配置时使用)
# ═══════════════════════════════
try: TOP_N
except: TOP_N = 3
try: REBALANCE_HOURS
except: REBALANCE_HOURS = 48
try: HYSTERESIS
except: HYSTERESIS = 0.20
try: LEVERAGE
except: LEVERAGE = 3
try: GRID_RATIO
except: GRID_RATIO = 0.015
try: GRID_VALUE
except: GRID_VALUE = 50
try: LOWER_RANGE
except: LOWER_RANGE = 0.10
try: STOP_LOSS_RATIO
except: STOP_LOSS_RATIO = 0.3
try: LOOP_INTERVAL
except: LOOP_INTERVAL = 30
try: KLINE_COUNT
except: KLINE_COUNT = 20
try: CATEGORY_FILTER
except: CATEGORY_FILTER = "ALL"
try: EXCLUDE_SYMBOLS
except: EXCLUDE_SYMBOLS = "NATGAS,CRCL"
# EXCLUDE_SYMBOLS 字符串解析为列表
if isinstance(EXCLUDE_SYMBOLS, str):
EXCLUDE_SYMBOLS = [s.strip().upper() for s in EXCLUDE_SYMBOLS.split(',') if s.strip()]
# ═══════════════════════════════
# 全局状态
# ═══════════════════════════════
g_state = "RUN"
g_all_tradfi = [] # 全部TradFi品种
g_active = [] # 当前持仓品种列表
g_states = {} # 每个品种的网格状态
g_init_equity = 0.0
g_total_profit = 0.0
g_last_rebalance = 0
g_score_log = [] # 上次评分结果
# ═══════════════════════════════
# 精度工具
# ═══════════════════════════════
def _floor_to_step(val, step):
"""将 val 向下对齐到 step 的整数倍,避免浮点误差"""
if step <= 0: return val
# 用字符串解析 step 的小数位数,避免 0.01*3=0.030000000000000002
s = f"{step:.10f}".rstrip('0')
dec = len(s.split('.')[1]) if '.' in s else 0
result = int(val / step) * step
return round(result, dec)
def fp(price, sym):
"""价格对齐到 tickSz 整数倍"""
tick = g_states.get(sym, {}).get("tick", 0)
if tick > 0:
return _floor_to_step(price, tick)
return round(price, g_states.get(sym, {}).get("pp", 2))
def fa(amt, sym):
"""数量对齐到 lotSz 整数倍,且不低于 minSz"""
if amt <= 0: return 0
lot = g_states.get(sym, {}).get("lot", 0)
min_s = g_states.get(sym, {}).get("mq", 0.001)
if lot > 0:
r = _floor_to_step(amt, lot)
else:
r = _floor_to_step(amt, min_s)
return r if r >= min_s else 0
def usdt_to_ct(usdt, price, sym):
cv = g_states.get(sym, {}).get("cv", 1)
return usdt / (cv * price) if cv * price > 0 else 0
def min_qty(sym):
return g_states.get(sym, {}).get("mq", 0.001)
# ═══════════════════════════════
# 品种扫描
# ═══════════════════════════════
def scan_tradfi():
"""
过滤逻辑:instCategory != 1 且以 USDT.swap 结尾
(instCategory=1 是普通加密货币永续,非1 = TradFi品种)
"""
global g_all_tradfi
try:
markets = exchange.GetMarkets()
result = []
for sym, mkt in markets.items():
if not sym.endswith("USDT.swap"):
continue
try:
info = mkt.get("Info") or {}
inst_category = int(info.get("instCategory", 1))
if inst_category == 1:
continue
except:
continue
# 提取 base(SPY_USDT.swap → SPY)
base = sym.replace("_USDT.swap", "").replace("USDT.swap", "")
if base in EXCLUDE_SYMBOLS:
continue
cat = (mkt.get("Info") or {}).get("underlyingType", "EQUITY")
if CATEGORY_FILTER != "ALL" and cat != CATEGORY_FILTER:
continue
result.append({"sym": sym, "base": base, "cat": cat})
g_all_tradfi = result
Log(f"扫描到 {len(result)} 个TradFi品种: {[r['base'] for r in result]}")
except Exception as e:
Log(f"扫描异常: {e}")
# ═══════════════════════════════
# 选品:只看波动大小
# ═══════════════════════════════
def to_swap(sym):
"""
sym 从 GetMarkets 拿到时已经是 SPY_USDT.swap 格式
直接返回即可
"""
return sym
def score_symbol(info):
"""
唯一指标:过去N根日线的平均日振幅(%)
振幅 = (High - Low) / Close × 100
振幅越大 = 每天穿越格子次数越多 = 网格赚得越多
"""
try:
sc = to_swap(info["sym"]) # SPYUSDT → SPY_USDT.swap
bars = exchange.GetRecords(sc, PERIOD_D1, KLINE_COUNT + 2)
Sleep(150)
if not bars or len(bars) < 3:
return None
bars = bars[-KLINE_COUNT:]
atr_pcts = [(b["High"] - b["Low"]) / b["Close"] * 100 for b in bars if b["Close"] > 0]
if not atr_pcts:
return None
avg_atr = sum(atr_pcts) / len(atr_pcts)
# 过滤:日振幅必须至少是网格间距的1.5倍,否则网格根本触发不了
if avg_atr < GRID_RATIO * 100 * 1.5:
return None
return {
"sym": info["sym"],
"base": info["base"],
"cat": info["cat"],
"atr": round(avg_atr, 3),
"price": bars[-1]["Close"],
}
except Exception as e:
Log(f"评分 {info['sym']} 异常: {e}")
return None
def select_top():
global g_score_log
Log(f"开始选品评分({len(g_all_tradfi)}个品种)...")
scored = []
for info in g_all_tradfi:
r = score_symbol(info)
if r:
scored.append(r)
scored.sort(key=lambda x: x["atr"], reverse=True)
g_score_log = scored
selected = scored[:TOP_N]
Log(f"通过筛选: {len(scored)}个,选出: {[s['base']+'('+str(s['atr'])+'%)' for s in selected]}")
return selected
# ═══════════════════════════════
# 换仓磁滞判断
# ═══════════════════════════════
def needs_rebalance(new_selected):
"""
只有当新品种的ATR显著高于现有品种时才换仓
避免因为微小波动频繁换仓(换仓本身有手续费成本)
"""
if not g_active:
return True, new_selected
# 当前持仓品种的ATR
cur_scores = {s["sym"]: s["atr"] for s in g_score_log if s["sym"] in g_active}
truly_new = [] # 确实要引入的新品种
keep_syms = set(g_active) # 继续持有的品种
for s in new_selected:
if s["sym"] in g_active:
truly_new.append(s)
continue
# 找当前持仓中ATR最低的(候选被替换的)
if cur_scores:
weakest_atr = min(cur_scores.values())
else:
weakest_atr = 0
threshold = weakest_atr * (1 + HYSTERESIS)
if s["atr"] >= threshold:
truly_new.append(s)
Log(f" ✅ {s['base']} ATR={s['atr']:.2f}% > 阈值={threshold:.2f}%,触发换仓")
else:
Log(f" 🚫 {s['base']} ATR={s['atr']:.2f}% < 阈值={threshold:.2f}%,磁滞保持")
# 保持当前对应品种不换
pass
exit_syms = [s for s in g_active if s not in [x["sym"] for x in truly_new]]
enter_syms = [s["sym"] for s in truly_new if s["sym"] not in g_active]
has_change = bool(exit_syms or enter_syms)
return has_change, truly_new
# ═══════════════════════════════
# 订单工具
# ═══════════════════════════════
def cancel_sym(sym):
try:
sc = to_swap(sym)
orders = exchange.GetOrders(sc)
if orders:
for o in orders:
exchange.CancelOrder(o["Id"])
Sleep(100)
except Exception as e:
Log(f"[{sym}] 撤单异常: {e}")
def check_order(oid):
if not oid: return "unknown", 0, 0
try:
o = exchange.GetOrder(oid)
if not o: return "unknown", 0, 0
st = o.get("Status", -1)
deal = o.get("DealAmount", 0) or 0
avgp = o.get("AvgPrice", 0) or o.get("Price", 0) or 0
if st == 1: return "filled", deal, avgp
if st == 2: return "cancelled", deal, avgp
if st in (0, 3): return "active", deal, avgp
return "unknown", deal, avgp
except:
return "unknown", 0, 0
def buy_open(sym, price, usdt):
sc = to_swap(sym)
ct = fa(usdt_to_ct(usdt, price, sym), sym)
if ct <= 0 or ct < min_qty(sym): return None
return exchange.CreateOrder(sc, "buy", fp(price, sym), ct)
def sell_close(sym, price, ct):
sc = to_swap(sym)
fc = fa(ct, sym)
if fc <= 0: return None
return exchange.CreateOrder(sc, "closebuy", fp(price, sym), fc)
def close_all(sym):
"""平掉某品种所有多仓"""
cancel_sym(sym)
Sleep(500)
try:
sc = to_swap(sym)
positions = exchange.GetPosition()
if not positions: return
for pos in positions:
if pos.get("Symbol") != sc: continue
if pos["Type"] == 0 and pos["Amount"] > 0:
price = g_states[sym].get("price", 0)
if price > 0:
exchange.CreateOrder(sc, "closebuy", fp(price * 0.999, sym), fa(pos["Amount"], sym))
Sleep(300)
except Exception as e:
Log(f"[{sym}] 平仓异常: {e}")
# ═══════════════════════════════
# 网格初始化
# ═══════════════════════════════
def init_sym_state(sym, score_info):
try:
markets = exchange.GetMarkets()
sc = to_swap(sym) # SPY_USDT.swap 格式
mkt = markets.get(sc) or {}
info = mkt.get("Info") or {}
# 从 Info 里读原始精度字段(比 FMZ 封装更准确)
tick = float(info.get("tickSz", 0) or 0) # 价格最小变动单位
lot = float(info.get("lotSz", 0) or 0) # 数量最小变动单位
mq = float(info.get("minSz", 0) or 0) # 最小下单量
if mq <= 0: mq = mkt.get("MinQty", 0.001)
if tick <= 0: tick = mkt.get("TickSize", 0)
if lot <= 0: lot = mq
g_states[sym] = {
"sym": sym,
"sc": sc,
"base": score_info["base"],
"price": score_info["price"],
"pp": mkt.get("PricePrecision", 4),
"ap": mkt.get("AmountPrecision", 2),
"tick": tick, # tickSz:价格对齐单位
"lot": lot, # lotSz:数量对齐单位
"mq": mq, # minSz:最小下单量
"cv": mkt.get("CtVal", 1),
"grids": [],
"profit": 0.0,
"trades": 0,
"atr": score_info["atr"],
}
Log(f"[{sym}] 精度: tick={tick} lot={lot} minSz={mq} ctVal={mkt.get('CtVal',1)}")
except Exception as e:
Log(f"初始化 {sym} 状态异常: {e}")
def build_grid(sym, price):
low = round(price * (1 - LOWER_RANGE), g_states[sym]["pp"])
# 仅做多网格:上边界只是止盈挂单用,设为当前价 + 一格止盈空间即可
# 用 LOWER_RANGE 同比例作为向上止盈空间(对称)
high = round(price * (1 + LOWER_RANGE), g_states[sym]["pp"])
grids = []
p = low
while p <= high * 1.001:
grids.append(round(p, g_states[sym]["pp"]))
p = p * (1 + GRID_RATIO)
grids = sorted(set(grids))
grid_list = []
for i in range(len(grids) - 1):
buy_p = grids[i]
sell_p = grids[i + 1]
g = {
"bp": buy_p, "sp": sell_p,
"status": "empty",
"buy_oid": None, "sell_oid": None,
"ct": 0, "fp": 0,
}
if buy_p < price:
oid = buy_open(sym, buy_p, GRID_VALUE)
Sleep(300) # 避免 Too Many Requests
if oid:
g["buy_oid"] = oid
g["status"] = "pending_buy"
else:
g["status"] = "skip"
else:
g["status"] = "above"
grid_list.append(g)
g_states[sym]["grids"] = grid_list
pend = sum(1 for g in grid_list if g["status"] == "pending_buy")
Log(f"[{sym}] 网格建立: {len(grid_list)}格 挂单{pend}张 区间[{low},{high}] 间距{GRID_RATIO*100:.1f}%")
# ═══════════════════════════════
# 网格同步(核心:低买高卖)
# ═══════════════════════════════
def sync(sym):
global g_total_profit
state = g_states[sym]
grids = state["grids"]
cv = state["cv"]
sc = to_swap(sym)
ticker = exchange.GetTicker(sc)
if not ticker: return
price = ticker.Last
state["price"] = price
for g in grids:
st = g["status"]
# ── 挂单等成交 → 成交后挂止盈 ──
if st == "pending_buy":
s, deal, avgp = check_order(g["buy_oid"])
if s == "filled":
ct = fa(deal, sym) if deal > 0 else fa(usdt_to_ct(GRID_VALUE, g["bp"], sym), sym)
fpx = avgp if avgp > 0 else g["bp"]
g.update({"buy_oid": None, "ct": ct, "fp": fpx})
oid = sell_close(sym, g["sp"], ct)
g["sell_oid"] = oid
g["status"] = "pending_sell" if oid else "hold"
elif s == "cancelled":
g["buy_oid"] = None
g["status"] = "empty"
# ── 止盈等成交 → 成交后重挂买单 ──
elif st == "pending_sell":
s, deal, avgp = check_order(g["sell_oid"])
if s == "filled":
fpx = avgp if avgp > 0 else g["sp"]
profit = g["ct"] * cv * (fpx - g["fp"])
state["profit"] += profit
state["trades"] += 1
g_total_profit += profit
g.update({"sell_oid": None, "ct": 0, "fp": 0, "status": "empty"})
# 重挂买单,循环
if g["bp"] < price and g_state == "RUN":
oid = buy_open(sym, g["bp"], GRID_VALUE)
if oid:
g["buy_oid"] = oid
g["status"] = "pending_buy"
elif s == "cancelled":
g["sell_oid"] = None
ct = g.get("ct", 0)
if ct > 0:
oid = sell_close(sym, g["sp"], ct)
g["sell_oid"] = oid if oid else None
else:
g["status"] = "empty"
# ── 空格:价格回落到格子下方时补挂 ──
elif st in ("empty", "above", "skip"):
if g["bp"] < price and g_state == "RUN" and not g.get("buy_oid"):
oid = buy_open(sym, g["bp"], GRID_VALUE)
if oid:
g["buy_oid"] = oid
g["status"] = "pending_buy"
# ── 持仓但止盈单挂失败 → 重试 ──
elif st == "hold":
ct = g.get("ct", 0)
if ct > 0:
oid = sell_close(sym, g["sp"], ct)
if oid:
g["sell_oid"] = oid
g["status"] = "pending_sell"
else:
g["status"] = "empty"
# ═══════════════════════════════
# 换仓
# ═══════════════════════════════
def do_rebalance():
global g_active, g_last_rebalance
Log("=" * 50)
Log("开始换仓评估...")
new_selected = select_top()
if not new_selected:
Log("无品种通过筛选,保持现状")
g_last_rebalance = int(time.time())
return
has_change, truly_new = needs_rebalance(new_selected)
if not has_change:
Log("无需换仓(磁滞保护)")
g_last_rebalance = int(time.time())
return
new_syms = [s["sym"] for s in truly_new]
exit_syms = [s for s in g_active if s not in new_syms]
for sym in exit_syms:
Log(f"退出品种: {sym}")
close_all(sym)
g_active = [s for s in g_active if s != sym]
Sleep(1000)
for s in truly_new:
if s["sym"] not in g_active:
Log(f"进入品种: {s['sym']}")
init_sym_state(s["sym"], s)
build_grid(s["sym"], s["price"])
g_active.append(s["sym"])
g_last_rebalance = int(time.time())
Log(f"换仓完成 → 当前持仓: {g_active}")
Log("=" * 50)
# ═══════════════════════════════
# 止损检查
# ═══════════════════════════════
def check_stop():
global g_state
if STOP_LOSS_RATIO <= 0 or g_init_equity <= 0: return
try:
acc = exchange.GetAccount()
if not acc: return
loss = (g_init_equity - acc.Equity) / g_init_equity
if loss < STOP_LOSS_RATIO: return
Log(f"★ 触发止损!亏损={loss*100:.1f}% → 全部平仓停止")
for sym in list(g_active):
close_all(sym)
g_state = "STOP"
except Exception as e:
Log(f"止损检查异常: {e}")
# ═══════════════════════════════
# 状态面板
# ═══════════════════════════════
def show():
try:
acc = exchange.GetAccount()
equity = acc.Equity if acc else 0
profit = equity - g_init_equity
ppct = profit / g_init_equity * 100 if g_init_equity > 0 else 0
next_h = max(0, REBALANCE_HOURS - (time.time() - g_last_rebalance) / 3600) if g_last_rebalance else 0
main = {
"type": "table",
"title": f"TradFi多品种网格 | {'✅运行中' if g_state=='RUN' else '🛑已停止'} | 下次换仓:{next_h:.1f}h后",
"cols": ["项目", "数值", "项目", "数值"],
"rows": [
["账户权益", f"${equity:.2f}", "初始权益", f"${g_init_equity:.2f}"],
["总盈亏", f"{profit:+.2f}U ({ppct:+.2f}%)", "网格总收益", f"{g_total_profit:+.4f}U"],
["持仓品种", str([g_states[s]["base"] for s in g_active]), "换仓间隔", f"{REBALANCE_HOURS}h"],
["TOP_N", str(TOP_N), "磁滞阈值", f"{HYSTERESIS*100:.0f}%"],
["杠杆", f"{LEVERAGE}x", "止损", f"亏损{STOP_LOSS_RATIO*100:.0f}%" if STOP_LOSS_RATIO > 0 else "禁用"],
]
}
sym_rows = []
for sym in g_active:
st = g_states.get(sym, {})
grids= st.get("grids", [])
pend = sum(1 for g in grids if g["status"] == "pending_buy")
hold = sum(1 for g in grids if g["status"] == "pending_sell")
sym_rows.append([
st.get("base", sym),
f"${st.get('price', 0):.2f}",
f"ATR {st.get('atr', 0):.2f}%",
f"{len(grids)}格",
f"{pend}挂/{hold}持",
f"{st.get('profit', 0):+.4f}U({st.get('trades', 0)}次)",
])
sym_tbl = {
"type": "table",
"title": "持仓品种",
"cols": ["品种", "现价", "日振幅", "格数", "挂单/持仓", "本品收益"],
"rows": sym_rows or [["暂无", "-", "-", "-", "-", "-"]],
}
score_rows = []
for i, s in enumerate(g_score_log[:10]):
mark = "★" if s["sym"] in g_active else " "
score_rows.append([f"{mark}#{i+1}", s["base"], s["cat"], f"{s['atr']:.2f}%"])
score_tbl = {
"type": "table",
"title": "波动排行榜(★=当前持仓)",
"cols": ["排名", "品种", "类型", "日均振幅"],
"rows": score_rows or [["待评分", "-", "-", "-"]],
}
LogProfit(profit, "&")
LogStatus(
"`" + json.dumps(main, ensure_ascii=False) + "`\n"
"`" + json.dumps(sym_tbl, ensure_ascii=False) + "`\n"
"`" + json.dumps(score_tbl, ensure_ascii=False) + "`"
)
except Exception as e:
Log(f"面板异常: {e}")
# ═══════════════════════════════
# 主入口
# ═══════════════════════════════
def main():
global g_init_equity, g_state
SetErrorFilter("502:|503:|tcp|character|unexpected|network|timeout|WSARecv|Connect|GetAddr|no such|reset|http|received|EOF|reused|Unknown")
exchange.SetMarginLevel(LEVERAGE)
acc = exchange.GetAccount()
Sleep(300)
if acc:
g_init_equity = acc.Equity
Log(f"初始权益=${g_init_equity:.2f} 杠杆={LEVERAGE}x 格距={GRID_RATIO*100:.1f}% 格值=${GRID_VALUE}")
# 扫描品种
scan_tradfi()
if not g_all_tradfi:
raise Exception("未找到TradFi品种,请检查交易所配置")
# 首次选品
selected = select_top()
if not selected:
raise Exception("无品种通过筛选,请检查参数(GRID_RATIO是否太大)")
# 建立初始网格
for s in selected:
init_sym_state(s["sym"], s)
build_grid(s["sym"], s["price"])
g_active.append(s["sym"])
g_last_rebalance = int(time.time())
Log(f"启动完成 → 持仓品种: {[g_states[s]['base'] for s in g_active]}")
while True:
if g_state == "STOP":
Sleep(60000)
continue
# 定时换仓
if (time.time() - g_last_rebalance) >= REBALANCE_HOURS * 3600:
try:
do_rebalance()
except Exception as e:
Log(f"换仓异常: {e}")
# 止损
check_stop()
if g_state == "STOP":
continue
# 各品种网格同步
for sym in list(g_active):
try:
sync(sym)
except Exception as e:
Log(f"[{sym}] 同步异常: {e}")
Sleep(200)
show()
Sleep(LOOP_INTERVAL * 1000)