本策略是一款专为震荡行情设计的动态区间网格量化交易策略,适用于黄金(PAXG/USDC)等波动率较低的资产永续合约交易。策略核心思路是:先识别震荡区间,再在区间内布设网格低买高卖,通过高频套利累积收益。区间一旦失效(止损或涨破),立即全部清仓并等待下一次机会,避免在趋势行情中持续损耗。
传统网格用ATR判断震荡,存在逻辑矛盾(ATR越小区间越窄越难确认)。本策略改用三重独立过滤器:
三重全部通过,才确认为有效震荡区间并激活网格。
连续2根M1 K线收盘价突破区间上沿,视为上涨趋势确立,全部止盈清仓,等待新的震荡区间形成再重新入场。
若格子空仓状态持续超过150根主K线,自动触发重建流程,防止区间已失效但策略仍在运行的情况。
止损/涨破重置后,K线缓冲完全清空,需重新积累足够K线才能再次确认区间,防止旧数据导致立即复活同一失效区间。
| 参数 | 默认值 | 说明 |
|---|---|---|
KLINE_TYPE |
600秒 | 主K线周期(区间识别用) |
ATR_PERIOD |
30 | ATR计算周期 |
RANGE_CONFIRM_BARS |
20 | 区间识别所需K线根数 |
LEVERAGE |
10 | 杠杆倍数 |
MAX_GRIDS |
10 | 最大格子数量 |
FEE_RATE |
万3 | 单边手续费率 |
FEE_PROFIT_MULTI |
3.0 | 格距最少覆盖双边手续费倍数 |
STOP_BUFFER_RATIO |
0.8 | 止损缓冲:下沿 - ATR × 此值 |
STOP_CONFIRM_BARS |
3 | 连续几根M1 K线收盘价破位才止损 |
BREAK_TOP_BARS |
2 | 连续几根M1 K线收盘价超上沿才重建 |
STOP_TRAIL_ENABLE |
True | 止损线动态跟踪(只降不升) |
SLOPE_THRESHOLD |
0.04%/根 | 趋势斜率阈值 |
BOX_PCT |
±0.8% | 百分比箱体宽度 |
BOX_MIN_RATIO |
75% | 箱体内K线占比阈值 |
EMPTY_TIMEOUT_BARS |
150 | 空仓超时重建阈值(根K线) |
SCANNING(扫描)
├─ 积累K线 → 三重过滤判断震荡区间
└─ 三重通过 → 激活网格 → ACTIVE
ACTIVE(运行)
├─ 每根主K线:更新ATR + 动态止损线
├─ 每根M1 K线:检查止损 / 涨破
├─ 每轮轮询:同步格子挂单状态
└─ 触发重置 → 撤单 + 平仓 + 清空缓冲 → SCANNING
适合:黄金、稳定币等低波动率资产,震荡盘整行情
不适合:强趋势行情(策略会频繁止损,建议降低仓位或暂停运行)
⚠️ 本策略为量化研究用途,回测不代表实盘表现。合约交易含杠杆,存在爆仓风险,请在充分了解风险后合理控制仓位使用。
"""
黄金动态网格测试策略
========================================================
【核心逻辑流程】
─────────────────────────────────────────────────────
SCANNING(扫描)
│
├─ 每根新K线:更新 kline_buffer、ATR
│
├─ 三重过滤判断是否为震荡区间:
│ ① 线性回归斜率 < SLOPE_THRESHOLD(无趋势)
│ ② 最近HH_LL_BARS根K线高低点不同时单调(不单边)
│ ③ 最近RANGE_CONFIRM_BARS根收盘价≥BOX_MIN_RATIO在±BOX_PCT箱内
│
└─ 三重全通过 → activate_grids() → STATE_ACTIVE
ACTIVE(运行)
│
├─ 每根新主K线:update_stop_price()(动态止损只降不升)
│
├─ 每根新M1 K线:check_break_and_top()
│ ├─ 连续STOP_CONFIRM_BARS根M1收盘价 < 止损线 → 止损重置
│ ├─ 连续BREAK_TOP_BARS根M1收盘价 > 上沿 → 涨破重置
│ └─ 空仓超过EMPTY_TIMEOUT_BARS根K线 → 超时重置
│
└─ 每轮:sync_grid_orders()(格子状态机)
├─ pending_buy → 检查成交 → 成交则挂止盈
├─ pending_sell → 检查成交 → 成交则重挂买单
└─ empty/skip → 价格低于买价则重新挂单
任何重置(reset_to_scanning):
├─ 撤所有挂单
├─ 平所有持仓
├─ 清空 grids / 区间参数 / 计数器
└─ ★ 清空 kline_buffer / m1_kline_buffer(重新积累,不复用旧数据)
【止损改进说明】
① 用M1收盘价确认,不用Tick,过滤噪声
② STOP_CONFIRM_BARS 根连续K线确认才触发
③ STOP_BUFFER_RATIO=0.8,比原版(0.3)更宽
④ 动态跟踪:每根新K线用最新ATR重算止损线(只降不升)
【区间识别说明】
不依赖ATR判断震荡(ATR小→区间窄→更难确认,逻辑矛盾)
改用三重过滤:斜率+结构+箱体,区间边界用真实最高/最低价
━━━ 参数说明 ━━━
STOP_BUFFER_RATIO : 止损线 = 下沿 - ATR × 此值
STOP_CONFIRM_BARS : 连续多少根M1 K线收盘价 < 止损线才触发
BREAK_TOP_BARS : 连续多少根M1 K线收盘价 > 上沿才触发重建
STOP_TRAIL_ENABLE : 止损线动态跟踪(True=只降不升)
SLOPE_THRESHOLD : 线性回归斜率阈值(相对价格比例)
BOX_PCT : 百分比箱体宽度(±比例)
HH_LL_BARS : 高低点结构判断所用K线数
BOX_MIN_RATIO : 箱内K线比例阈值
"""
import json
import math
# ═══════════════════════════════════════════
# 用户参数
# ═══════════════════════════════════════════
FEE_RATE = 0.0003 # 单边手续费率(万3)
FEE_PROFIT_MULTI = 3.0 # 格距至少覆盖双边手续费的倍数
MAX_GRIDS = 10 # 最大格数上限
ATR_PERIOD = 30 # ATR 周期(根)
RANGE_CONFIRM_BARS = 20 # 区间确认所需K线数
EMPTY_TIMEOUT_BARS = 150 # 空仓超时→重建(根K线)
LEVERAGE = 10 # 杠杆
KLINE_TYPE = 600 # K线周期(秒)
POLL_INTERVAL = 1000 # 轮询间隔 ms
INIT_FUNDING = 0 # 初始金额
# ── 止损相关参数 ──────────────────────────────────────────
STOP_BUFFER_RATIO = 0.8 # 止损缓冲:下沿 - ATR × 此值
STOP_CONFIRM_BARS = 3 # 连续N根M1 K线收盘价 < 止损线才触发
BREAK_TOP_BARS = 2 # 连续N根M1 K线收盘价 > 上沿才触发重建
STOP_TRAIL_ENABLE = True # 止损线动态跟踪(只降不升)
# ── 区间识别三重过滤参数 ────────────────────────────────────
SLOPE_THRESHOLD = 0.0004 # 线性回归斜率阈值(相对价格比例,0.05%/根)
BOX_PCT = 0.008 # 百分比箱体宽度 ±0.8%
HH_LL_BARS = 10 # 判断高低点结构所用K线数
BOX_MIN_RATIO = 0.75 # 至少75%的K线收盘价在箱体内
# ═══════════════════════════════════════════
# 状态常量
# ═══════════════════════════════════════════
STATE_SCANNING = "SCANNING"
STATE_ACTIVE = "ACTIVE"
# ═══════════════════════════════════════════
# 全局状态
# ═══════════════════════════════════════════
state = STATE_SCANNING
current_atr = 0.0
# 当前区间信息
range_low = 0.0
range_high = 0.0
stop_price = 0.0
grid_prices = []
grid_usdt = []
# 每格订单状态
grids = []
# 区间识别
empty_bars_count = 0
# 止损/涨破 K线确认计数
stop_confirm_count = 0
break_top_count = 0
# 三重过滤实时诊断数据(供状态面板使用)
diag = {
"slope": 0.0,
"slope_rel": 0.0,
"slope_ok": False,
"h_monotone": False,
"l_monotone": False,
"structure_ok": False,
"box_lo": 0.0,
"box_hi": 0.0,
"in_box": 0,
"in_box_total": 0,
"in_box_pct": 0.0,
"box_ok": False,
"center": 0.0,
"failed": [],
}
Funding = 0.0
kline_buffer = [] # 主K线缓冲(用于区间识别、ATR计算)
last_closed_ktime = 0
last_kline_count = 0
# M1 K线缓冲(专门用于止损/涨破确认)
m1_kline_buffer = []
m1_last_closed_ktime = 0
positions_cache = []
assets = {"USDT": {"total_balance": 0, "margin_balance": 0}}
symbol = ""
base_currency = ""
# ═══════════════════════════════════════════
# 信号处理
# ═══════════════════════════════════════════
def compute_atr(klines):
if len(klines) < ATR_PERIOD + 1:
return None
recent = klines[-(ATR_PERIOD + 1):]
trs = []
for i in range(1, len(recent)):
h, l, pc = recent[i][2], recent[i][3], recent[i - 1][4]
trs.append(max(h - l, abs(h - pc), abs(l - pc)))
return sum(trs) / len(trs)
def fetch_klines():
"""拉取主K线,仅在有新已收盘K线时返回True"""
global kline_buffer, last_closed_ktime, last_kline_count
swapcode = symbol + ".swap"
bars = exchange.GetRecords(swapcode, KLINE_TYPE, ATR_PERIOD + 60)
Sleep(500)
if not bars or len(bars) < 2:
return False
closed = bars[:-1] # 去掉未收盘的最后一根
t = closed[-1]["Time"]
if t == last_closed_ktime:
return False # 没有新K线
last_closed_ktime = t
kline_buffer = [
[b["Time"], b["Open"], b["High"], b["Low"], b["Close"], b["Volume"]]
for b in closed
]
last_kline_count = len(kline_buffer)
return True
def fetch_m1_klines():
"""拉取M1 K线,仅在有新已收盘K线时返回True"""
global m1_kline_buffer, m1_last_closed_ktime
swapcode = symbol + ".swap"
bars = exchange.GetRecords(swapcode, 60, max(STOP_CONFIRM_BARS, BREAK_TOP_BARS) + 10)
Sleep(500)
if not bars or len(bars) < 2:
return False
closed = bars[:-1]
t = closed[-1]["Time"]
if t == m1_last_closed_ktime:
return False
m1_last_closed_ktime = t
m1_kline_buffer = [
[b["Time"], b["Open"], b["High"], b["Low"], b["Close"], b["Volume"]]
for b in closed
]
return True
# ═══════════════════════════════════════════
# 精度 / 换算
# ═══════════════════════════════════════════
def fp(price):
try:
return round(price, assets[base_currency]["PricePrecision"])
except:
return price
def fa(amount):
try:
if amount <= 0:
return 0
r = round(amount, assets[base_currency]["AmountPrecision"])
return r if r > 0 else 0
except:
return 0
def usdt_to_contracts(usdt, price):
try:
return usdt / (assets[base_currency]["ctVal"] * price)
except:
return 0
def contracts_to_usdt(contracts, price):
try:
return contracts * assets[base_currency]["ctVal"] * price
except:
return 0
def get_min_qty():
try:
return assets[base_currency]["MinQty"]
except:
return 0.001
def get_price():
return assets[base_currency]["price"]
# ═══════════════════════════════════════════
# 账户 / 行情
# ═══════════════════════════════════════════
def update_price():
ticker = exchange.GetTicker()
if not ticker:
return False
assets[base_currency]["price"] = ticker.Last
return True
def update_account():
try:
acc = exchange.GetAccount()
Sleep(500)
if not acc:
return False
assets["USDT"]["equity"] = acc.Equity
assets["USDT"]["total_balance"] = acc.Balance + acc.FrozenBalance
assets["USDT"]["margin_balance"] = acc.Balance
global Funding
Funding = acc.Equity
amt, hp, pft = _get_position()
assets[base_currency]["amount"] = amt
assets[base_currency]["hold_price"] = hp
assets[base_currency]["unrealised_profit"] = pft
global positions_cache
try:
positions_cache = exchange.GetPosition() or []
Sleep(500)
except Exception:
positions_cache = []
return True
except Exception as e:
Log(f"更新账户异常: {e}")
return False
def _get_position():
try:
positions = exchange.GetPosition()
Sleep(500)
if not positions:
return 0, 0, 0
for pos in positions:
if pos["ContractType"] == "swap" and pos["Amount"] > 0 and pos["Type"] == 0:
return pos["Amount"], pos["Price"], pos["Profit"]
return 0, 0, 0
except Exception as e:
Log(f"获取持仓异常: {e}")
return 0, 0, 0
def get_total_position():
amt, _, _ = _get_position()
return amt
# ═══════════════════════════════════════════
# 止损线动态更新(只降不升)
# ═══════════════════════════════════════════
def update_stop_price():
"""
每根新主K线调用一次。
用最新ATR重新计算止损线,若比当前低则下移(只降不升)。
"""
global stop_price
if not STOP_TRAIL_ENABLE or state != STATE_ACTIVE:
return
if current_atr <= 0 or range_low <= 0:
return
new_stop = fp(range_low - current_atr * STOP_BUFFER_RATIO)
if new_stop < stop_price:
Log(f"止损线下移: {stop_price:.4f} → {new_stop:.4f} (ATR={current_atr:.4f})")
stop_price = new_stop
# ═══════════════════════════════════════════
# 订单操作
# ═══════════════════════════════════════════
def cancel_all_orders(swapcode):
try:
orders = exchange.GetOrders(swapcode)
if orders:
for o in orders:
exchange.CancelOrder(o["Id"])
Sleep(100)
except Exception as e:
Log(f"撤单异常: {e}")
def cancel_order_safe(oid):
if not oid:
return
try:
exchange.CancelOrder(oid)
Sleep(100)
except Exception as e:
Log(f"撤单{oid}异常: {e}")
def check_order_status(oid):
if oid is None:
return "unknown", 0, 0
try:
order = exchange.GetOrder(oid)
if not order:
return "unknown", 0, 0
status = order.get("Status", -1)
deal = order.get("DealAmount", 0) or 0
avg_px = order.get("AvgPrice", 0) or order.get("Price", 0) or 0
if status == 1:
return "filled", deal, avg_px
elif status == 2:
return "cancelled", deal, avg_px
elif status in (0, 3):
return "active", deal, avg_px
else:
return "unknown", deal, avg_px
except Exception as e:
Log(f"查询订单{oid}异常: {e}")
return "unknown", 0, 0
def place_limit_buy(buy_price, usdt_amount, label=""):
raw = usdt_to_contracts(usdt_amount, buy_price)
contracts = fa(raw)
if contracts <= 0:
return None
min_qty = assets[base_currency]["MinQty"]
if contracts < min_qty:
return "skip_min_detail", raw, min_qty
ct_val = assets[base_currency]["ctVal"]
order_u = contracts * ct_val * buy_price
price_f = fp(buy_price)
Log(f"[{label}] 限价买开 @{price_f} {contracts}张 ≈{order_u:.2f}U")
oid = exchange.CreateOrder(f"{symbol}.swap", "buy", price_f, contracts)
Log(f" → {'成功 ID=' + str(oid) if oid else '失败'}")
return oid
def place_limit_close(sell_price, contracts, label=""):
fc = fa(contracts)
if fc <= 0:
return None
price_f = fp(sell_price)
Log(f"[{label}] 限价卖平 @{price_f} {fc}张")
oid = exchange.CreateOrder(f"{symbol}.swap", "closebuy", price_f, fc)
Log(f" → {'成功 ID=' + str(oid) if oid else '失败'}")
return oid
def close_grids_by_contracts(label="平仓"):
"""
止损/重置时平掉所有格子持仓。
直接用实时价挂限价单(合约平多时限价≥市价会立即以市价成交)。
最后兜底:查询真实持仓,若有剩余再补平一次。
"""
price = get_price()
sell_p = fp(price)
total_fc = 0
for i, g in enumerate(grids):
fc = fa(g.get("buy_contracts", 0))
if fc <= 0:
continue
Log(f"[{label}] 格{i} 限价平 {fc}张 @{sell_p}")
exchange.CreateOrder(f"{symbol}.swap", "closebuy", sell_p, fc)
total_fc += fc
Sleep(100)
Sleep(500)
# 兜底:查真实持仓,防止格子记录与实际持仓不一致
remaining, _, _ = _get_position()
if remaining > 0:
fc = fa(remaining)
Log(f"[{label}] 兜底平剩余{fc}张 @{sell_p} (格子记录合计{total_fc}张)")
Log(exchange.GetPositions())
Sleep(500)
exchange.CreateOrder(f"{symbol}.swap", "closebuy", sell_p, fc)
Sleep(300)
# ═══════════════════════════════════════════
# 动态格数计算
# ═══════════════════════════════════════════
def calc_grids(r_low, r_high):
"""
根据区间宽度和资金量动态计算格数:
- 格距 ≥ 双边手续费 × FEE_PROFIT_MULTI
- 格数 ≤ 资金能支撑的最小张数格数
- 格数 ≤ MAX_GRIDS
仓位按"距上沿距离"加权分配(越低格分配越多仓位)。
"""
price = get_price()
range_width = r_high - r_low
total_position_usdt = Funding * LEVERAGE / 2
min_step_pct = FEE_RATE * 2 * FEE_PROFIT_MULTI
min_step_abs = price * min_step_pct
min_step_abs = max(min_step_abs, 10 ** (-assets[base_currency]["PricePrecision"]))
max_by_range = int(range_width / min_step_abs)
min_qty = get_min_qty()
ct_val = assets[base_currency]["ctVal"]
total_contracts = total_position_usdt / (ct_val * price)
max_by_capital = int(total_contracts / min_qty) if min_qty > 0 else MAX_GRIDS
n = min(max_by_range, max_by_capital, MAX_GRIDS)
if n < 1:
Log(f"警告: 区间太窄或资金不足,无法建立有效格子 "
f"(by_range={max_by_range}, by_capital={max_by_capital})")
return [], []
step = range_width / n
prices = [fp(r_low + i * step) for i in range(n + 1)]
prices[-1] = fp(r_high)
# 距上沿越远 → 分配越多仓位(更低的格子风险更小)
dists = [r_high - prices[i] for i in range(n)]
total_dist = sum(dists)
usdt_list = []
for i in range(n):
u = total_position_usdt * dists[i] / total_dist if total_dist > 0 else total_position_usdt / n
usdt_list.append(round(u, 4))
Log(f"动态格数计算: 区间宽度={range_width:.4f} 最小格距={min_step_abs:.4f}")
Log(f" 理论格数={max_by_range} 资金格数={max_by_capital} 实际格数={n}")
Log(f" 总仓位={total_position_usdt:.2f}U 总张数≈{total_contracts:.4f} 最小单张={min_qty} 格距={step:.4f}")
Log(f" 最小格金额={min(usdt_list):.2f}U 最大格金额={max(usdt_list):.2f}U 合计={sum(usdt_list):.2f}U")
return prices, usdt_list
# ═══════════════════════════════════════════
# 区间识别 - 三重过滤
# ═══════════════════════════════════════════
def try_confirm_range():
"""
三重过滤,全部通过才确认区间:
① 线性回归斜率 < SLOPE_THRESHOLD → 无趋势
② 最近HH_LL_BARS根K线高低点不同时单调 → 不单边运动
③ 最近RANGE_CONFIRM_BARS根收盘价
有≥BOX_MIN_RATIO在±BOX_PCT箱内 → 震荡幅度合理
区间边界 = 最近RANGE_CONFIRM_BARS根K线的真实最高/最低价。
诊断数据写入全局 diag,供状态面板展示。
"""
global diag
if len(kline_buffer) < RANGE_CONFIRM_BARS:
diag["failed"] = [f"K线不足{len(kline_buffer)}/{RANGE_CONFIRM_BARS}"]
return False, 0, 0
recent = kline_buffer[-RANGE_CONFIRM_BARS:]
closes = [b[4] for b in recent]
highs = [b[2] for b in recent]
lows = [b[3] for b in recent]
n = len(closes)
center = sum(closes) / n
# ① 线性回归斜率
x_mean = (n - 1) / 2
num = sum((i - x_mean) * (closes[i] - center) for i in range(n))
den = sum((i - x_mean) ** 2 for i in range(n))
slope = (num / den) if den > 0 else 0
slope_rel = abs(slope) / center
slope_ok = slope_rel < SLOPE_THRESHOLD
# ② 高低点结构(不能同时单调)
chk_bars = kline_buffer[-HH_LL_BARS:]
h_seq = [b[2] for b in chk_bars]
l_seq = [b[3] for b in chk_bars]
h_monotone = all(h_seq[i] >= h_seq[i - 1] for i in range(1, len(h_seq)))
l_monotone = all(l_seq[i] <= l_seq[i - 1] for i in range(1, len(l_seq)))
structure_ok = not (h_monotone or l_monotone)
# ③ 百分比箱体
box_half = center * BOX_PCT
box_lo = center - box_half
box_hi = center + box_half
in_box = sum(1 for c in closes if box_lo <= c <= box_hi)
in_box_pct = in_box / n
box_ok = in_box_pct >= BOX_MIN_RATIO
failed = [s for s, ok in [("①斜率", slope_ok), ("②结构", structure_ok), ("③箱体", box_ok)] if not ok]
diag.update({
"slope": round(slope, 6),
"slope_rel": round(slope_rel * 100, 4),
"slope_ok": slope_ok,
"h_monotone": h_monotone,
"l_monotone": l_monotone,
"structure_ok": structure_ok,
"box_lo": round(box_lo, 4),
"box_hi": round(box_hi, 4),
"in_box": in_box,
"in_box_total": n,
"in_box_pct": round(in_box_pct * 100, 1),
"box_ok": box_ok,
"center": round(center, 4),
"failed": failed,
})
if slope_ok and structure_ok and box_ok:
zone_lo = fp(min(lows))
zone_hi = fp(max(highs))
Log(f"区间确认: [{zone_lo}, {zone_hi}] 中轴={center:.4f} 斜率={slope_rel*100:.4f}% 箱内={in_box}/{n}")
return True, zone_lo, zone_hi
else:
return False, 0, 0
# ═══════════════════════════════════════════
# 重置 / 激活
# ═══════════════════════════════════════════
def reset_to_scanning(reason=""):
"""
重置到扫描状态:
1. 撤所有挂单
2. 平所有持仓
3. 清空所有区间/格子/计数器状态
4. ★ 清空K线缓冲 → 重新从零积累,防止止损后立即复活区间
"""
global state, empty_bars_count
global grids, grid_prices, grid_usdt
global range_low, range_high, stop_price
global stop_confirm_count, break_top_count
global kline_buffer, last_closed_ktime # ★ 新增
global m1_kline_buffer, m1_last_closed_ktime # ★ 新增
Log(f"→ SCANNING 原因: {reason}")
Log(exchange.GetPositions('PAXG_USDC.swap'))
Sleep(500)
cancel_all_orders('PAXG_USDC.swap')
Sleep(500)
if get_total_position() > 0:
close_grids_by_contracts(f"重置平仓({reason})")
Sleep(1000)
# ── 区间/格子/计数器 ──────────────────────────────────────
state = STATE_SCANNING
grids = []
grid_prices = []
grid_usdt = []
range_low = 0.0
range_high = 0.0
stop_price = 0.0
stop_confirm_count = 0
break_top_count = 0
empty_bars_count = 0
# ── ★ 清空K线缓冲,强制重新积累 ─────────────────────────────
# 目的:防止止损/涨破后立即用旧K线重新确认同一区间
kline_buffer = []
last_closed_ktime = 0
m1_kline_buffer = []
m1_last_closed_ktime = 0
def activate_grids(r_low, r_high):
"""
区间确认后激活格子:
- 计算止损线(下沿 - ATR × STOP_BUFFER_RATIO)
- 动态计算格数和仓位
- 当前价格以下的格子立即挂限价买单
- 当前价格以上的格子标记为 skip_above
"""
global state, grids, grid_prices, grid_usdt, range_low, range_high, stop_price
global stop_confirm_count, break_top_count
range_low = r_low
range_high = r_high
stop_price = fp(r_low - current_atr * STOP_BUFFER_RATIO)
stop_confirm_count = 0
break_top_count = 0
prices, usdt_list = calc_grids(r_low, r_high)
if not prices:
Log("格子计算失败,继续扫描")
return
grid_prices = prices
grid_usdt = usdt_list
n = len(usdt_list)
Log(f"区间激活: 下沿={r_low} 上沿={r_high} 止损={stop_price} 格数={n}")
Log(f" 止损缓冲=ATR×{STOP_BUFFER_RATIO} ({current_atr:.4f}×{STOP_BUFFER_RATIO}={current_atr * STOP_BUFFER_RATIO:.4f})")
Log(f" 止损确认={STOP_CONFIRM_BARS}根M1K线 涨破确认={BREAK_TOP_BARS}根M1K线")
for i in range(n):
Log(f" 格{i}: 买@{prices[i]}→卖@{prices[i + 1]} {usdt_list[i]:.2f}U")
grids = [{"buy_oid": None, "sell_oid": None, "status": "empty",
"buy_contracts": 0, "filled_price": 0} for _ in range(n)]
price = get_price()
for i in range(n):
if prices[i] < price:
_place_grid_buy(i)
else:
grids[i]["status"] = "skip_above"
state = STATE_ACTIVE
def _place_grid_buy(i):
n = len(grids)
if i >= n:
return
g = grids[i]
if g["status"] in ("holding", "pending_sell", "pending_buy"):
return
result = place_limit_buy(grid_prices[i], grid_usdt[i],
label=f"格{i} 买@{grid_prices[i]}")
if isinstance(result, tuple) and result[0] == "skip_min_detail":
_, raw_contracts, min_qty = result
g["status"] = "skip_min"
g["skip_raw"] = round(raw_contracts, 6)
g["skip_min"] = min_qty
elif result:
g["buy_oid"] = result
g["status"] = "pending_buy"
else:
g["status"] = "skip_min"
def _place_grid_sell(i, contracts):
n = len(grids)
if i >= n:
return
g = grids[i]
sell_price = grid_prices[i + 1]
oid = place_limit_close(sell_price, contracts,
label=f"格{i} 卖@{sell_price}")
if oid:
g["sell_oid"] = oid
g["buy_contracts"] = contracts
g["status"] = "pending_sell"
# ═══════════════════════════════════════════
# ACTIVE:格子状态机
# ═══════════════════════════════════════════
def sync_grid_orders():
"""
每轮轮询执行:
pending_buy → 检查成交 → 成交则挂止盈卖单
pending_sell → 检查成交 → 成交则重挂该格买单
empty/skip → 若价格已低于买价则挂买单
同时统计空仓计数(用于超时重建)。
"""
global empty_bars_count
price = get_price()
n = len(grids)
if n == 0:
return
for i in range(n):
g = grids[i]
if g["status"] == "pending_buy":
st, deal, avg_px = check_order_status(g["buy_oid"])
if st == "filled":
contracts = fa(deal) if deal > 0 else fa(usdt_to_contracts(grid_usdt[i], grid_prices[i]))
filled_px = avg_px if avg_px > 0 else grid_prices[i]
Log(f"格{i} 买单成交 均价={filled_px} 实际{contracts}张")
g["buy_oid"] = None
g["buy_contracts"] = contracts
g["filled_price"] = filled_px
_place_grid_sell(i, contracts)
elif st == "cancelled":
Log(f"格{i} 买单已撤销,重置为 empty")
g["buy_oid"] = None
g["status"] = "empty"
elif g["status"] == "pending_sell":
st, deal, avg_px = check_order_status(g["sell_oid"])
if st == "filled":
filled_px = avg_px if avg_px > 0 else grid_prices[i + 1]
Log(f"格{i} 止盈成交 均价={filled_px} → 重置买单")
g["sell_oid"] = None
g["buy_contracts"] = 0
g["filled_price"] = 0
g["status"] = "empty"
if grid_prices[i] < price:
_place_grid_buy(i)
elif st == "cancelled":
Log(f"格{i} 止盈单已撤销,重新挂止盈单")
g["sell_oid"] = None
contracts = g.get("buy_contracts", 0)
if contracts > 0:
_place_grid_sell(i, contracts)
else:
g["status"] = "empty"
elif g["status"] in ("empty", "skip_min", "skip_above"):
if grid_prices[i] < price:
_place_grid_buy(i)
# 空仓计数(无持仓且无挂单中的止盈单)
total_amt = get_total_position()
has_holding = any(g["status"] == "pending_sell" for g in grids)
if total_amt <= 0 and not has_holding:
empty_bars_count += 1
else:
empty_bars_count = 0
def check_break_and_top(new_m1_kline=False):
"""
检查三种重置条件(仅在有新M1 K线时执行止损/涨破检测):
1. 空仓超时:empty_bars_count ≥ EMPTY_TIMEOUT_BARS
2. 止损:连续STOP_CONFIRM_BARS根M1收盘价 < stop_price
3. 涨破:连续BREAK_TOP_BARS根M1收盘价 > range_high
返回 True 表示已触发重置。
"""
global stop_confirm_count, break_top_count
# 空仓超时
if empty_bars_count >= EMPTY_TIMEOUT_BARS:
Log(f"空仓超过{EMPTY_TIMEOUT_BARS}根K线 → 重建")
reset_to_scanning("空仓超时")
return True
if new_m1_kline and len(m1_kline_buffer) >= 1:
last_close = m1_kline_buffer[-1][4]
# 止损检测
if last_close < stop_price:
stop_confirm_count += 1
Log(f"【止损确认】M1收盘价{last_close:.4f} < 止损线{stop_price:.4f} "
f"计数{stop_confirm_count}/{STOP_CONFIRM_BARS}")
if stop_confirm_count >= STOP_CONFIRM_BARS:
Log(f"连续{STOP_CONFIRM_BARS}根M1 K线低于止损线 → 触发止损重建")
reset_to_scanning("M1收盘止损")
return True
else:
if stop_confirm_count > 0:
Log(f"止损确认计数重置({last_close:.4f} ≥ {stop_price:.4f})")
stop_confirm_count = 0
# 涨破上沿检测
if last_close > range_high:
break_top_count += 1
Log(f"【涨破确认】M1收盘价{last_close:.4f} > 上沿{range_high:.4f} "
f"计数{break_top_count}/{BREAK_TOP_BARS}")
if break_top_count >= BREAK_TOP_BARS:
Log(f"连续{BREAK_TOP_BARS}根M1 K线高于上沿 → 全部止盈重建")
reset_to_scanning("M1收盘涨破上沿")
return True
else:
break_top_count = 0
return False
# ═══════════════════════════════════════════
# 状态机主入口
# ═══════════════════════════════════════════
def run_state_machine(new_kline=False, new_m1_kline=False):
global current_atr
# ── SCANNING ──────────────────────────────────────────────
if state == STATE_SCANNING:
need = max(RANGE_CONFIRM_BARS, ATR_PERIOD + 1)
if len(kline_buffer) < need:
Log(f"SCANNING 等待K线 {len(kline_buffer)}/{need}")
return
if kline_buffer:
atr = compute_atr(kline_buffer)
if atr:
current_atr = atr
confirmed, r_lo, r_hi = try_confirm_range()
if confirmed:
Log(f"区间确认: {r_lo} ~ {r_hi} ATR={current_atr:.4f}")
activate_grids(r_lo, r_hi)
return
# ── ACTIVE ────────────────────────────────────────────────
if state == STATE_ACTIVE:
if new_kline:
# 每根新主K线更新ATR和动态止损线
if kline_buffer:
atr = compute_atr(kline_buffer)
if atr:
current_atr = atr
update_stop_price()
if check_break_and_top(new_m1_kline=new_m1_kline):
return
sync_grid_orders()
return
# ═══════════════════════════════════════════
# 初始化
# ═══════════════════════════════════════════
def init():
global symbol, base_currency, Funding, INIT_FUNDING
Log("小仓位动态区间网格 启动")
exchange.SetMarginLevel(LEVERAGE)
currency = exchange.GetCurrency()
symbol = currency
base_currency = symbol.split("_")[0]
swapcode = symbol + ".swap"
ticker = exchange.GetTicker(swapcode)
data = exchange.GetMarkets().get(swapcode)
if not data:
raise Exception(f"无法获取市场信息: {swapcode}")
assets[base_currency] = {
"amount": 0, "hold_price": 0, "price": 0, "unrealised_profit": 0,
"AmountPrecision": data["AmountPrecision"],
"PricePrecision": data["PricePrecision"],
"MinQty": data["MinQty"],
"ctVal": data["CtVal"]
}
cancel_all_orders(swapcode)
acc = exchange.GetAccount()
if acc:
INIT_FUNDING = acc.Equity
Funding = INIT_FUNDING
Log(f"账户资金={Funding:.2f}U")
Log(f"手续费=万{FEE_RATE * 10000:.0f} 格距覆盖费{FEE_PROFIT_MULTI}倍 最大格数={MAX_GRIDS}")
Log(f"最小下单张数={get_min_qty()}")
Log(f"【止损参数】缓冲=ATR×{STOP_BUFFER_RATIO} K线确认={STOP_CONFIRM_BARS}根M1 "
f"涨破确认={BREAK_TOP_BARS}根M1 动态跟踪={'开启' if STOP_TRAIL_ENABLE else '关闭'}")
Log(f"【区间识别】斜率阈值={SLOPE_THRESHOLD * 100:.4f}%/根 "
f"箱体宽度=±{BOX_PCT * 100:.1f}% 箱内比例≥{BOX_MIN_RATIO * 100:.0f}% "
f"确认K线数={RANGE_CONFIRM_BARS}根 结构判断={HH_LL_BARS}根")
# ═══════════════════════════════════════════
# 状态面板
# ═══════════════════════════════════════════
def refresh_tables():
try:
price = get_price()
bal = assets["USDT"]["total_balance"]
equity = assets["USDT"].get("equity", bal)
unreal = assets[base_currency]["unrealised_profit"]
profit = round(equity - INIT_FUNDING, 4)
ppct = round(profit / INIT_FUNDING * 100, 2) if INIT_FUNDING else 0
amt = assets[base_currency]["amount"]
n = len(grids)
pending = sum(1 for g in grids if g["status"] == "pending_buy")
holding = sum(1 for g in grids if g["status"] in ("holding", "pending_sell"))
state_label = {
STATE_SCANNING: "🔍 扫描区间",
STATE_ACTIVE: f"✅ 网格运行 ({n}格 买单{pending} 持仓{holding})",
}.get(state, state)
rng_w = round(range_high - range_low, 4) if range_high > 0 else 0
rng_pct = round(rng_w / range_low * 100, 3) if range_low > 0 else 0
main = {
"type": "table",
"title": f"动态区间网格 | {symbol} | {state_label}",
"cols": ["项目", "数值", "项目", "数值", "项目", "数值"],
"rows": [
["状态", state_label, "账户权益", f"{equity:.4f}U",
"总盈亏", f"{profit:+.4f}U ({ppct:+.2f}%)"],
["当前价格", f"{price:.4f}", "持仓张数", f"{amt:.4f}",
"未实现盈亏", f"{unreal:.4f}U"],
["区间下沿", f"{range_low:.4f}", "区间上沿", f"{range_high:.4f}",
"区间宽度", f"{rng_w} ({rng_pct}%)"],
["止损线", f"{stop_price:.4f}",
"止损确认", f"{stop_confirm_count}/{STOP_CONFIRM_BARS}根M1",
"涨破确认", f"{break_top_count}/{BREAK_TOP_BARS}根M1"],
["ATR", f"{current_atr:.4f}",
"止损缓冲", f"ATR×{STOP_BUFFER_RATIO}={current_atr * STOP_BUFFER_RATIO:.4f}",
"动态止损", "开启" if STOP_TRAIL_ENABLE else "关闭"],
["账户资金", f"{Funding:.4f}U",
"空仓超时", f"{empty_bars_count}/{EMPTY_TIMEOUT_BARS}",
"区间识别", f"三重过滤/{RANGE_CONFIRM_BARS}根K线"],
["斜率阈值", f"{SLOPE_THRESHOLD * 100:.4f}%/根",
"箱体宽度", f"±{BOX_PCT * 100:.1f}%",
"箱内比例", f"≥{BOX_MIN_RATIO * 100:.0f}%"],
]
}
ct_val = assets[base_currency].get("ctVal", 1)
def short_id(oid):
return str(oid)[-8:] if oid else "-"
grid_rows = []
for i, g in enumerate(grids):
buy_p = grid_prices[i] if i < len(grid_prices) else 0
sell_p = grid_prices[i + 1] if i + 1 < len(grid_prices) else 0
usdt = grid_usdt[i] if i < len(grid_usdt) else 0
contracts = g.get("buy_contracts", 0)
filled_px = g.get("filled_price", 0)
status_cn = {
"empty": "⬜ 等待",
"pending_buy": "🟡 挂单买入",
"holding": "🟢 持仓",
"pending_sell": "🔵 持仓止盈中",
"skip_above": "⬆️ 等待入场",
}.get(g["status"], g["status"])
if g["status"] == "skip_min":
skip_raw = g.get("skip_raw", 0)
skip_min = g.get("skip_min", assets[base_currency].get("MinQty", "?"))
status_cn = f"⛔ 不足最小({skip_raw}张 < {skip_min}张)"
hold_p_str = f"{filled_px:.4f}" if (contracts > 0 and filled_px > 0) else "-"
if contracts > 0 and filled_px > 0:
pnl = contracts * ct_val * (price - filled_px)
pnl_pct = pnl / (contracts * ct_val * filled_px) * 100
pnl_str = f"{pnl:+.2f}U ({pnl_pct:+.2f}%)"
else:
pnl_str = "-"
buy_id_str = f"买:{short_id(g.get('buy_oid'))}" if g.get("buy_oid") else "-"
sell_id_str = f"卖:{short_id(g.get('sell_oid'))}" if g.get("sell_oid") else "-"
order_str = " / ".join(x for x in [buy_id_str, sell_id_str] if x != "-") or "-"
grid_rows.append([
f"格{i}",
f"{buy_p}",
f"{sell_p}",
f"{contracts}张" if contracts > 0 else "-",
hold_p_str,
f"{price:.4f}",
pnl_str,
order_str,
status_cn,
])
if not grid_rows:
grid_rows = [["无格子", "-", "-", "-", "-", "-", "-", "-", "等待区间识别"]]
grid_tbl = {
"type": "table",
"title": f"格子持仓 | 下沿={range_low} 上沿={range_high} 止损={stop_price}",
"cols": ["格子", "挂单买价", "止盈价", "开仓数量", "开仓价", "实时价", "浮动盈亏", "挂单ID", "状态"],
"rows": grid_rows
}
LogProfit(profit, "&")
# 三重过滤诊断表格
d = diag
def ok_icon(v): return "✅ 通过" if v else "❌ 未通过"
diag_rows = [
[
"① 趋势斜率",
f"{d['slope_rel']:.4f}% / 根",
f"阈值 < {SLOPE_THRESHOLD * 100:.4f}%",
ok_icon(d["slope_ok"]),
f"原始斜率={d['slope']:.4f} 中轴={d['center']:.4f}",
],
[
"② 高低点结构",
f"高点单调={d['h_monotone']} 低点单调={d['l_monotone']}",
f"判断最近{HH_LL_BARS}根K线",
ok_icon(d["structure_ok"]),
"高低点同时单调=趋势,至少一个不单调=震荡",
],
[
"③ 百分比箱体",
f"箱内={d['in_box']}/{d['in_box_total']}根 ({d['in_box_pct']}%)",
f"箱体=[{d['box_lo']:.2f}, {d['box_hi']:.2f}] 阈值≥{BOX_MIN_RATIO*100:.0f}%",
ok_icon(d["box_ok"]),
f"箱宽=±{BOX_PCT*100:.1f}% 中轴={d['center']:.4f}",
],
[
"📊 综合结论",
f"失败项: {d['failed'] if d['failed'] else '无'}",
f"确认需{RANGE_CONFIRM_BARS}根K线",
"✅ 区间确认" if not d["failed"] else "❌ 等待确认",
f"当前K线数={len(kline_buffer)}",
],
]
diag_tbl = {
"type": "table",
"title": f"🔍 三重过滤区间识别诊断",
"cols": ["过滤项", "当前数值", "判断条件", "结果", "说明"],
"rows": diag_rows,
}
LogStatus(
"`" + json.dumps(main, ensure_ascii=False) + "`\n"
"`" + json.dumps(diag_tbl, ensure_ascii=False) + "`\n"
"`" + json.dumps(grid_tbl, ensure_ascii=False) + "`"
)
except Exception as e:
Log(f"状态面板异常: {e}")
# ═══════════════════════════════════════════
# 主入口
# ═══════════════════════════════════════════
def main():
_G("null")
LogProfitReset(0)
LogReset()
SetErrorFilter(
"502:|503:|tcp|character|unexpected|network|timeout|"
"WSARecv|Connect|GetAddr|no such|reset|http|received|EOF|reused|Unknown"
)
init()
Log("=" * 60)
Log(f"策略: 动态区间网格 杠杆={LEVERAGE}x")
Log(f"格数: 由区间宽度÷最小盈利格距动态计算(上限{MAX_GRIDS}格)")
Log(f"区间识别: 三重过滤(斜率+结构+箱体)")
Log(f" ① 线性回归斜率 < {SLOPE_THRESHOLD * 100:.4f}%/根")
Log(f" ② 最近{HH_LL_BARS}根K线高低点不同时单调")
Log(f" ③ 最近{RANGE_CONFIRM_BARS}根K线收盘价≥{BOX_MIN_RATIO * 100:.0f}%在±{BOX_PCT * 100:.1f}%箱内")
Log(f"止损: 缓冲ATR×{STOP_BUFFER_RATIO} 连续{STOP_CONFIRM_BARS}根M1收盘价低于止损线才触发")
Log(f"涨破: 连续{BREAK_TOP_BARS}根M1收盘价高于上沿才触发重建")
Log(f"动态止损: {'开启(只降不升)' if STOP_TRAIL_ENABLE else '关闭'}")
Log(f"重置后: K线缓冲完全清空,重新积累{max(RANGE_CONFIRM_BARS, ATR_PERIOD + 1)}根K线才能重建区间")
Log("=" * 60)
while True:
if not update_account():
Sleep(5000)
continue
if not update_price():
Sleep(5000)
continue
new_kline = fetch_klines()
new_m1_kline = fetch_m1_klines()
# 主K线有更新时同步更新ATR(SCANNING/ACTIVE 均需要)
if new_kline and kline_buffer:
atr = compute_atr(kline_buffer)
if atr:
current_atr = atr
run_state_machine(new_kline=new_kline, new_m1_kline=new_m1_kline)
refresh_tables()
Sleep(POLL_INTERVAL)