TradFi多品种网格(纯网格版)


创建日期: 2026-05-11 15:02:11 最后修改: 2026-05-11 15:59:18
复制: 21 点击次数: 107
avatar of ianzeng123 ianzeng123
2
关注
469
关注者
策略源码
"""
TradFi 多品种网格策略(纯网格版)
================================================================
核心逻辑:低买高卖,循环往复。
 
【选品逻辑】只做一件事:找波动最大的品种
  波动得分 = 过去N根日线的平均日振幅(%)
  选出波动最大的 TOP_N 个品种,在它们身上建网格
  波动大 = 每天穿越格子次数多 = 止盈次数多 = 赚得多
 
【网格逻辑】
  - 在 [当前价×(1-LOWER_RANGE), 当前价×(1+UPPER_RANGE)] 区间内等比建格
  - 每格间距 GRID_RATIO
  - 每格开仓 GRID_VALUE USDT
  - 价格下穿格子:买入开多
  - 价格上穿格子:平多止盈
  - 止盈后自动在原位重挂买单,循环
 
【换仓逻辑】
  - 每隔 REBALANCE_HOURS 重新计算各品种波动得分
  - 新品种波动得分 > 当前持仓品种 × (1+HYSTERESIS) 才触发换仓
  - 换仓 = 平掉旧品种所有仓位 + 在新品种上建新网格
 
【资金分配】
  - 各品种平均分配总资金
  - 每格金额 = GRID_VALUE(固定,不随品种波动调整)
"""
 
import json
import time
 
# ═══════════════════════════════
#  参数默认值(界面未配置时使用)
# ═══════════════════════════════
try:    TOP_N
except: TOP_N = 3
 
try:    REBALANCE_HOURS
except: REBALANCE_HOURS = 48
 
try:    HYSTERESIS
except: HYSTERESIS = 0.20
 
try:    LEVERAGE
except: LEVERAGE = 3
 
try:    GRID_RATIO
except: GRID_RATIO = 0.015
 
try:    GRID_VALUE
except: GRID_VALUE = 50
 
try:    LOWER_RANGE
except: LOWER_RANGE = 0.10
 
try:    STOP_LOSS_RATIO
except: STOP_LOSS_RATIO = 0.3
 
try:    LOOP_INTERVAL
except: LOOP_INTERVAL = 30
 
try:    KLINE_COUNT
except: KLINE_COUNT = 20
 
try:    CATEGORY_FILTER
except: CATEGORY_FILTER = "ALL"
 
try:    EXCLUDE_SYMBOLS
except: EXCLUDE_SYMBOLS = "NATGAS,CRCL"
 
# EXCLUDE_SYMBOLS 字符串解析为列表
if isinstance(EXCLUDE_SYMBOLS, str):
    EXCLUDE_SYMBOLS = [s.strip().upper() for s in EXCLUDE_SYMBOLS.split(',') if s.strip()]
 
# ═══════════════════════════════
#  全局状态
# ═══════════════════════════════
g_state          = "RUN"
g_all_tradfi     = []       # 全部TradFi品种
g_active         = []       # 当前持仓品种列表
g_states         = {}       # 每个品种的网格状态
g_init_equity    = 0.0
g_total_profit   = 0.0
g_last_rebalance = 0
g_score_log      = []       # 上次评分结果
 
 
# ═══════════════════════════════
#  精度工具
# ═══════════════════════════════
 
def _floor_to_step(val, step):
    """将 val 向下对齐到 step 的整数倍,避免浮点误差"""
    if step <= 0: return val
    # 用字符串解析 step 的小数位数,避免 0.01*3=0.030000000000000002
    s = f"{step:.10f}".rstrip('0')
    dec = len(s.split('.')[1]) if '.' in s else 0
    result = int(val / step) * step
    return round(result, dec)
 
def fp(price, sym):
    """价格对齐到 tickSz 整数倍"""
    tick = g_states.get(sym, {}).get("tick", 0)
    if tick > 0:
        return _floor_to_step(price, tick)
    return round(price, g_states.get(sym, {}).get("pp", 2))
 
def fa(amt, sym):
    """数量对齐到 lotSz 整数倍,且不低于 minSz"""
    if amt <= 0: return 0
    lot   = g_states.get(sym, {}).get("lot", 0)
    min_s = g_states.get(sym, {}).get("mq", 0.001)
    if lot > 0:
        r = _floor_to_step(amt, lot)
    else:
        r = _floor_to_step(amt, min_s)
    return r if r >= min_s else 0
 
def usdt_to_ct(usdt, price, sym):
    cv = g_states.get(sym, {}).get("cv", 1)
    return usdt / (cv * price) if cv * price > 0 else 0
 
def min_qty(sym):
    return g_states.get(sym, {}).get("mq", 0.001)
 
 
# ═══════════════════════════════
#  品种扫描
# ═══════════════════════════════
 
def scan_tradfi():
    """
    过滤逻辑:instCategory != 1 且以 USDT.swap 结尾
    (instCategory=1 是普通加密货币永续,非1 = TradFi品种)
    """
    global g_all_tradfi
    try:
        markets = exchange.GetMarkets()
        result  = []
        for sym, mkt in markets.items():
            if not sym.endswith("USDT.swap"):
                continue
            try:
                info          = mkt.get("Info") or {}
                inst_category = int(info.get("instCategory", 1))
                if inst_category == 1:
                    continue
            except:
                continue
            # 提取 base(SPY_USDT.swap → SPY)
            base = sym.replace("_USDT.swap", "").replace("USDT.swap", "")
            if base in EXCLUDE_SYMBOLS:
                continue
            cat = (mkt.get("Info") or {}).get("underlyingType", "EQUITY")
            if CATEGORY_FILTER != "ALL" and cat != CATEGORY_FILTER:
                continue
            result.append({"sym": sym, "base": base, "cat": cat})
        g_all_tradfi = result
        Log(f"扫描到 {len(result)} 个TradFi品种: {[r['base'] for r in result]}")
    except Exception as e:
        Log(f"扫描异常: {e}")
 
 
# ═══════════════════════════════
#  选品:只看波动大小
# ═══════════════════════════════
 
def to_swap(sym):
    """
    sym 从 GetMarkets 拿到时已经是 SPY_USDT.swap 格式
    直接返回即可
    """
    return sym
 
 
def score_symbol(info):
    """
    唯一指标:过去N根日线的平均日振幅(%)
    振幅 = (High - Low) / Close × 100
    振幅越大 = 每天穿越格子次数越多 = 网格赚得越多
    """
    try:
        sc = to_swap(info["sym"])   # SPYUSDT → SPY_USDT.swap
        bars = exchange.GetRecords(sc, PERIOD_D1, KLINE_COUNT + 2)
        Sleep(150)
        if not bars or len(bars) < 3:
            return None
        bars = bars[-KLINE_COUNT:]
        atr_pcts = [(b["High"] - b["Low"]) / b["Close"] * 100 for b in bars if b["Close"] > 0]
        if not atr_pcts:
            return None
        avg_atr = sum(atr_pcts) / len(atr_pcts)
        # 过滤:日振幅必须至少是网格间距的1.5倍,否则网格根本触发不了
        if avg_atr < GRID_RATIO * 100 * 1.5:
            return None
        return {
            "sym":   info["sym"],
            "base":  info["base"],
            "cat":   info["cat"],
            "atr":   round(avg_atr, 3),
            "price": bars[-1]["Close"],
        }
    except Exception as e:
        Log(f"评分 {info['sym']} 异常: {e}")
        return None
 
 
def select_top():
    global g_score_log
    Log(f"开始选品评分({len(g_all_tradfi)}个品种)...")
    scored = []
    for info in g_all_tradfi:
        r = score_symbol(info)
        if r:
            scored.append(r)
    scored.sort(key=lambda x: x["atr"], reverse=True)
    g_score_log = scored
    selected = scored[:TOP_N]
    Log(f"通过筛选: {len(scored)}个,选出: {[s['base']+'('+str(s['atr'])+'%)' for s in selected]}")
    return selected
 
 
# ═══════════════════════════════
#  换仓磁滞判断
# ═══════════════════════════════
 
def needs_rebalance(new_selected):
    """
    只有当新品种的ATR显著高于现有品种时才换仓
    避免因为微小波动频繁换仓(换仓本身有手续费成本)
    """
    if not g_active:
        return True, new_selected
 
    # 当前持仓品种的ATR
    cur_scores = {s["sym"]: s["atr"] for s in g_score_log if s["sym"] in g_active}
 
    truly_new  = []  # 确实要引入的新品种
    keep_syms  = set(g_active)  # 继续持有的品种
 
    for s in new_selected:
        if s["sym"] in g_active:
            truly_new.append(s)
            continue
        # 找当前持仓中ATR最低的(候选被替换的)
        if cur_scores:
            weakest_atr = min(cur_scores.values())
        else:
            weakest_atr = 0
        threshold = weakest_atr * (1 + HYSTERESIS)
        if s["atr"] >= threshold:
            truly_new.append(s)
            Log(f"  ✅ {s['base']} ATR={s['atr']:.2f}% > 阈值={threshold:.2f}%,触发换仓")
        else:
            Log(f"  🚫 {s['base']} ATR={s['atr']:.2f}% < 阈值={threshold:.2f}%,磁滞保持")
            # 保持当前对应品种不换
            pass
 
    exit_syms  = [s for s in g_active if s not in [x["sym"] for x in truly_new]]
    enter_syms = [s["sym"] for s in truly_new if s["sym"] not in g_active]
 
    has_change = bool(exit_syms or enter_syms)
    return has_change, truly_new
 
 
# ═══════════════════════════════
#  订单工具
# ═══════════════════════════════
 
def cancel_sym(sym):
    try:
        sc = to_swap(sym)
        orders = exchange.GetOrders(sc)
        if orders:
            for o in orders:
                exchange.CancelOrder(o["Id"])
                Sleep(100)
    except Exception as e:
        Log(f"[{sym}] 撤单异常: {e}")
 
def check_order(oid):
    if not oid: return "unknown", 0, 0
    try:
        o = exchange.GetOrder(oid)
        if not o: return "unknown", 0, 0
        st = o.get("Status", -1)
        deal = o.get("DealAmount", 0) or 0
        avgp = o.get("AvgPrice", 0) or o.get("Price", 0) or 0
        if st == 1:      return "filled",    deal, avgp
        if st == 2:      return "cancelled", deal, avgp
        if st in (0, 3): return "active",    deal, avgp
        return "unknown", deal, avgp
    except:
        return "unknown", 0, 0
 
def buy_open(sym, price, usdt):
    sc  = to_swap(sym)
    ct  = fa(usdt_to_ct(usdt, price, sym), sym)
    if ct <= 0 or ct < min_qty(sym): return None
    return exchange.CreateOrder(sc, "buy", fp(price, sym), ct)
 
def sell_close(sym, price, ct):
    sc = to_swap(sym)
    fc = fa(ct, sym)
    if fc <= 0: return None
    return exchange.CreateOrder(sc, "closebuy", fp(price, sym), fc)
 
def close_all(sym):
    """平掉某品种所有多仓"""
    cancel_sym(sym)
    Sleep(500)
    try:
        sc = to_swap(sym)
        positions = exchange.GetPosition()
        if not positions: return
        for pos in positions:
            if pos.get("Symbol") != sc: continue
            if pos["Type"] == 0 and pos["Amount"] > 0:
                price = g_states[sym].get("price", 0)
                if price > 0:
                    exchange.CreateOrder(sc, "closebuy", fp(price * 0.999, sym), fa(pos["Amount"], sym))
                    Sleep(300)
    except Exception as e:
        Log(f"[{sym}] 平仓异常: {e}")
 
 
# ═══════════════════════════════
#  网格初始化
# ═══════════════════════════════
 
def init_sym_state(sym, score_info):
    try:
        markets = exchange.GetMarkets()
        sc  = to_swap(sym)          # SPY_USDT.swap 格式
        mkt = markets.get(sc) or {}
        info = mkt.get("Info") or {}
 
        # 从 Info 里读原始精度字段(比 FMZ 封装更准确)
        tick = float(info.get("tickSz", 0) or 0)   # 价格最小变动单位
        lot  = float(info.get("lotSz",  0) or 0)   # 数量最小变动单位
        mq   = float(info.get("minSz",  0) or 0)   # 最小下单量
        if mq  <= 0: mq  = mkt.get("MinQty", 0.001)
        if tick <= 0: tick = mkt.get("TickSize", 0)
        if lot  <= 0: lot  = mq
 
        g_states[sym] = {
            "sym":   sym,
            "sc":    sc,
            "base":  score_info["base"],
            "price": score_info["price"],
            "pp":    mkt.get("PricePrecision", 4),
            "ap":    mkt.get("AmountPrecision", 2),
            "tick":  tick,   # tickSz:价格对齐单位
            "lot":   lot,    # lotSz:数量对齐单位
            "mq":    mq,     # minSz:最小下单量
            "cv":    mkt.get("CtVal", 1),
            "grids": [],
            "profit": 0.0,
            "trades": 0,
            "atr":   score_info["atr"],
        }
        Log(f"[{sym}] 精度: tick={tick}  lot={lot}  minSz={mq}  ctVal={mkt.get('CtVal',1)}")
    except Exception as e:
        Log(f"初始化 {sym} 状态异常: {e}")
 
 
def build_grid(sym, price):
    low   = round(price * (1 - LOWER_RANGE), g_states[sym]["pp"])
    # 仅做多网格:上边界只是止盈挂单用,设为当前价 + 一格止盈空间即可
    # 用 LOWER_RANGE 同比例作为向上止盈空间(对称)
    high  = round(price * (1 + LOWER_RANGE), g_states[sym]["pp"])
    grids = []
    p = low
    while p <= high * 1.001:
        grids.append(round(p, g_states[sym]["pp"]))
        p = p * (1 + GRID_RATIO)
    grids = sorted(set(grids))
 
    grid_list = []
    for i in range(len(grids) - 1):
        buy_p  = grids[i]
        sell_p = grids[i + 1]
        g = {
            "bp": buy_p, "sp": sell_p,
            "status": "empty",
            "buy_oid": None, "sell_oid": None,
            "ct": 0, "fp": 0,
        }
        if buy_p < price:
            oid = buy_open(sym, buy_p, GRID_VALUE)
            Sleep(300)   # 避免 Too Many Requests
            if oid:
                g["buy_oid"] = oid
                g["status"]  = "pending_buy"
            else:
                g["status"]  = "skip"
        else:
            g["status"] = "above"
        grid_list.append(g)
 
    g_states[sym]["grids"] = grid_list
    pend = sum(1 for g in grid_list if g["status"] == "pending_buy")
    Log(f"[{sym}] 网格建立: {len(grid_list)}格  挂单{pend}张  区间[{low},{high}]  间距{GRID_RATIO*100:.1f}%")
 
 
# ═══════════════════════════════
#  网格同步(核心:低买高卖)
# ═══════════════════════════════
 
def sync(sym):
    global g_total_profit
    state  = g_states[sym]
    grids  = state["grids"]
    cv     = state["cv"]
 
    sc = to_swap(sym)
    ticker = exchange.GetTicker(sc)
    if not ticker: return
    price  = ticker.Last
    state["price"] = price
 
    for g in grids:
        st = g["status"]
 
        # ── 挂单等成交 → 成交后挂止盈 ──
        if st == "pending_buy":
            s, deal, avgp = check_order(g["buy_oid"])
            if s == "filled":
                ct  = fa(deal, sym) if deal > 0 else fa(usdt_to_ct(GRID_VALUE, g["bp"], sym), sym)
                fpx = avgp if avgp > 0 else g["bp"]
                g.update({"buy_oid": None, "ct": ct, "fp": fpx})
                oid = sell_close(sym, g["sp"], ct)
                g["sell_oid"] = oid
                g["status"]   = "pending_sell" if oid else "hold"
            elif s == "cancelled":
                g["buy_oid"] = None
                g["status"]  = "empty"
 
        # ── 止盈等成交 → 成交后重挂买单 ──
        elif st == "pending_sell":
            s, deal, avgp = check_order(g["sell_oid"])
            if s == "filled":
                fpx    = avgp if avgp > 0 else g["sp"]
                profit = g["ct"] * cv * (fpx - g["fp"])
                state["profit"] += profit
                state["trades"] += 1
                g_total_profit  += profit
                g.update({"sell_oid": None, "ct": 0, "fp": 0, "status": "empty"})
                # 重挂买单,循环
                if g["bp"] < price and g_state == "RUN":
                    oid = buy_open(sym, g["bp"], GRID_VALUE)
                    if oid:
                        g["buy_oid"] = oid
                        g["status"]  = "pending_buy"
            elif s == "cancelled":
                g["sell_oid"] = None
                ct = g.get("ct", 0)
                if ct > 0:
                    oid = sell_close(sym, g["sp"], ct)
                    g["sell_oid"] = oid if oid else None
                else:
                    g["status"] = "empty"
 
        # ── 空格:价格回落到格子下方时补挂 ──
        elif st in ("empty", "above", "skip"):
            if g["bp"] < price and g_state == "RUN" and not g.get("buy_oid"):
                oid = buy_open(sym, g["bp"], GRID_VALUE)
                if oid:
                    g["buy_oid"] = oid
                    g["status"]  = "pending_buy"
 
        # ── 持仓但止盈单挂失败 → 重试 ──
        elif st == "hold":
            ct = g.get("ct", 0)
            if ct > 0:
                oid = sell_close(sym, g["sp"], ct)
                if oid:
                    g["sell_oid"] = oid
                    g["status"]   = "pending_sell"
            else:
                g["status"] = "empty"
 
 
# ═══════════════════════════════
#  换仓
# ═══════════════════════════════
 
def do_rebalance():
    global g_active, g_last_rebalance
 
    Log("=" * 50)
    Log("开始换仓评估...")
 
    new_selected = select_top()
    if not new_selected:
        Log("无品种通过筛选,保持现状")
        g_last_rebalance = int(time.time())
        return
 
    has_change, truly_new = needs_rebalance(new_selected)
    if not has_change:
        Log("无需换仓(磁滞保护)")
        g_last_rebalance = int(time.time())
        return
 
    new_syms  = [s["sym"] for s in truly_new]
    exit_syms = [s for s in g_active if s not in new_syms]
 
    for sym in exit_syms:
        Log(f"退出品种: {sym}")
        close_all(sym)
        g_active = [s for s in g_active if s != sym]
 
    Sleep(1000)
 
    for s in truly_new:
        if s["sym"] not in g_active:
            Log(f"进入品种: {s['sym']}")
            init_sym_state(s["sym"], s)
            build_grid(s["sym"], s["price"])
            g_active.append(s["sym"])
 
    g_last_rebalance = int(time.time())
    Log(f"换仓完成 → 当前持仓: {g_active}")
    Log("=" * 50)
 
 
# ═══════════════════════════════
#  止损检查
# ═══════════════════════════════
 
def check_stop():
    global g_state
    if STOP_LOSS_RATIO <= 0 or g_init_equity <= 0: return
    try:
        acc = exchange.GetAccount()
        if not acc: return
        loss = (g_init_equity - acc.Equity) / g_init_equity
        if loss < STOP_LOSS_RATIO: return
        Log(f"★ 触发止损!亏损={loss*100:.1f}% → 全部平仓停止")
        for sym in list(g_active):
            close_all(sym)
        g_state = "STOP"
    except Exception as e:
        Log(f"止损检查异常: {e}")
 
 
# ═══════════════════════════════
#  状态面板
# ═══════════════════════════════
 
def show():
    try:
        acc    = exchange.GetAccount()
        equity = acc.Equity if acc else 0
        profit = equity - g_init_equity
        ppct   = profit / g_init_equity * 100 if g_init_equity > 0 else 0
        next_h = max(0, REBALANCE_HOURS - (time.time() - g_last_rebalance) / 3600) if g_last_rebalance else 0
 
        main = {
            "type": "table",
            "title": f"TradFi多品种网格 | {'✅运行中' if g_state=='RUN' else '🛑已停止'} | 下次换仓:{next_h:.1f}h后",
            "cols": ["项目", "数值", "项目", "数值"],
            "rows": [
                ["账户权益", f"${equity:.2f}", "初始权益", f"${g_init_equity:.2f}"],
                ["总盈亏",   f"{profit:+.2f}U ({ppct:+.2f}%)", "网格总收益", f"{g_total_profit:+.4f}U"],
                ["持仓品种", str([g_states[s]["base"] for s in g_active]), "换仓间隔", f"{REBALANCE_HOURS}h"],
                ["TOP_N",   str(TOP_N), "磁滞阈值", f"{HYSTERESIS*100:.0f}%"],
                ["杠杆",    f"{LEVERAGE}x", "止损",  f"亏损{STOP_LOSS_RATIO*100:.0f}%" if STOP_LOSS_RATIO > 0 else "禁用"],
            ]
        }
 
        sym_rows = []
        for sym in g_active:
            st   = g_states.get(sym, {})
            grids= st.get("grids", [])
            pend = sum(1 for g in grids if g["status"] == "pending_buy")
            hold = sum(1 for g in grids if g["status"] == "pending_sell")
            sym_rows.append([
                st.get("base", sym),
                f"${st.get('price', 0):.2f}",
                f"ATR {st.get('atr', 0):.2f}%",
                f"{len(grids)}格",
                f"{pend}挂/{hold}持",
                f"{st.get('profit', 0):+.4f}U({st.get('trades', 0)}次)",
            ])
 
        sym_tbl = {
            "type": "table",
            "title": "持仓品种",
            "cols": ["品种", "现价", "日振幅", "格数", "挂单/持仓", "本品收益"],
            "rows": sym_rows or [["暂无", "-", "-", "-", "-", "-"]],
        }
 
        score_rows = []
        for i, s in enumerate(g_score_log[:10]):
            mark = "★" if s["sym"] in g_active else " "
            score_rows.append([f"{mark}#{i+1}", s["base"], s["cat"], f"{s['atr']:.2f}%"])
 
        score_tbl = {
            "type": "table",
            "title": "波动排行榜(★=当前持仓)",
            "cols": ["排名", "品种", "类型", "日均振幅"],
            "rows": score_rows or [["待评分", "-", "-", "-"]],
        }
 
        LogProfit(profit, "&")
        LogStatus(
            "`" + json.dumps(main,      ensure_ascii=False) + "`\n"
            "`" + json.dumps(sym_tbl,   ensure_ascii=False) + "`\n"
            "`" + json.dumps(score_tbl, ensure_ascii=False) + "`"
        )
    except Exception as e:
        Log(f"面板异常: {e}")
 
 
# ═══════════════════════════════
#  主入口
# ═══════════════════════════════
 
def main():
    global g_init_equity, g_state
 
    SetErrorFilter("502:|503:|tcp|character|unexpected|network|timeout|WSARecv|Connect|GetAddr|no such|reset|http|received|EOF|reused|Unknown")
 
    exchange.SetMarginLevel(LEVERAGE)
 
    acc = exchange.GetAccount()
    Sleep(300)
    if acc:
        g_init_equity = acc.Equity
    Log(f"初始权益=${g_init_equity:.2f}  杠杆={LEVERAGE}x  格距={GRID_RATIO*100:.1f}%  格值=${GRID_VALUE}")
 
    # 扫描品种
    scan_tradfi()
    if not g_all_tradfi:
        raise Exception("未找到TradFi品种,请检查交易所配置")
 
    # 首次选品
    selected = select_top()
    if not selected:
        raise Exception("无品种通过筛选,请检查参数(GRID_RATIO是否太大)")
 
    # 建立初始网格
    for s in selected:
        init_sym_state(s["sym"], s)
        build_grid(s["sym"], s["price"])
        g_active.append(s["sym"])
 
    g_last_rebalance = int(time.time())
    Log(f"启动完成 → 持仓品种: {[g_states[s]['base'] for s in g_active]}")
 
    while True:
        if g_state == "STOP":
            Sleep(60000)
            continue
 
        # 定时换仓
        if (time.time() - g_last_rebalance) >= REBALANCE_HOURS * 3600:
            try:
                do_rebalance()
            except Exception as e:
                Log(f"换仓异常: {e}")
 
        # 止损
        check_stop()
        if g_state == "STOP":
            continue
 
        # 各品种网格同步
        for sym in list(g_active):
            try:
                sync(sym)
            except Exception as e:
                Log(f"[{sym}] 同步异常: {e}")
            Sleep(200)
 
        show()
        Sleep(LOOP_INTERVAL * 1000)