Type/to search

AI 产业链 TradFi 合约图谱交易系统

Cryptocurrency
Created: 2026-06-25 16:05:43
Last modified: 5 days ago
2
Follow
483
Followers

策略简介

本策略是一套基于 AI 主题资产轮动逻辑的 TradFi 股票合约联动交易系统。

策略通过交易所市场接口自动扫描币安 TradFi 股票类永续合约,只筛选 underlyingType = EQUITY 的股票合约,然后使用大语言模型为每只股票建立结构化画像,判断其是否属于 AI 算力、AI 存储、AI 服务器、AI 云、AI 模型应用、半导体设备、晶圆代工、光通信网络、电力散热、AI 平台广告等多个 AI 子板块。

与传统单一板块分类不同,本策略允许一只股票同时属于多个 AI basket。例如,GOOGL 既可能属于 AI 云,也可能属于 AI 模型应用和 AI 平台广告;MSFT 既可能属于 AI 云,也可能属于企业 AI 应用;NVDA 既可能属于 AI 算力芯片,也可能影响 AI 服务器和 AI 云基础设施。

策略的核心不是简单判断某只股票是否上涨,而是识别 AI 主题内部的资金轮动过程:当某个 AI 子板块中的 leader 股票出现明显价格异动时,系统会进一步判断是否存在同板块 follower 股票尚未充分反应的机会。

策略同时接入 BraveSearch 作为实时信息源,用于抓取 AI 子板块和重点股票的相关新闻,再交由大模型分析板块趋势、核心催化、核心风险、候选 leader 和候选 follower。策略不会直接根据新闻下单,而是采用“价格异常优先,新闻追因确认”的方式:先检测价格突破,再搜索新闻解释,最后结合 K 线统计关系决定是否生成交易信号。

核心逻辑

(1)自动筛选 TradFi EQUITY 股票合约

策略定时调用交易所市场接口,扫描当前所有合约,只保留满足条件的 TradFi 股票合约。

筛选条件包括:

(1)合约为 USDT 永续合约。
(2)underlyingSubType 包含 TradFi
(3)underlyingType 必须为 EQUITY
(4)通过大模型进一步判断 asset_type,只有普通股票或 ADR 可以进入 basket。
(5)ETF、杠杆 ETF、指数、基金、商品 ETF、地区 ETF 等不允许进入股票 basket。

这样可以避免 SOXL、QQQ、EWT、EWY 等非公司股票类标的污染 AI 股票板块。

(2)使用 LLM 建立股票多 basket 画像

每只股票会被发送给大语言模型进行结构化分析。

模型需要输出:

(1)股票代码。
(2)资产类型。
(3)公司名称。
(4)行业和业务摘要。
(5)是否可纳入策略观察。
(6)所属 AI basket。
(7)每个 basket 的 exposure 暴露度。
(8)该股票在 basket 中更像 leader、follower、both 还是 observer。
(9)搜索关键词和 BraveSearch 查询语句。
(10)可能的 leader 和 follower。
(11)中文 reason。

策略强制要求大模型输出固定字段,并对置信度做统一处理,避免模型一会儿输出 confidence,一会儿输出 model_confidence,或者一会儿输出 0.75,一会儿输出 75

(3)构建 AI 子板块 basket

根据每只股票的大模型画像,策略会自动构建多个 AI 子板块。

当前主要 basket 包括:

(1)ai_compute:AI 算力芯片,包括 GPU、CPU、AI 加速器、ASIC、SoC 等。
(2)ai_memory:AI 存储,包括 HBM、DRAM、NAND、企业级存储等。
(3)ai_server:AI 服务器,包括整机、ODM、企业硬件、液冷服务器等。
(4)ai_cloud:AI 云和算力租赁,包括公有云、GPU 云、AI 基础设施租赁等。
(5)ai_model_application:AI 模型和应用,包括大模型、企业软件、Agent 平台等。
(6)ai_semicap:半导体设备,包括光刻、刻蚀、沉积、量测、先进封装设备等。
(7)ai_foundry:晶圆代工,包括先进制程、CoWoS、晶圆制造等。
(8)ai_optical_networking:光通信和网络,包括光模块、交换机、网络设备、互联芯片等。
(9)ai_power_cooling:电力和散热,包括数据中心电力、液冷、散热、能源基础设施等。
(10)ai_platform_ads:AI 平台广告,包括搜索、推荐、广告、内容平台中的 AI 变现。

同一股票可以出现在多个 basket 中,每个 basket 内有不同的 exposure 和 role。

(4)BraveSearch 板块情报分析

策略会定时使用 BraveSearch 获取每个 AI basket 的板块级新闻和重点股票新闻。

板块情报分析包括:

(1)当前板块趋势。
(2)趋势置信度。
(3)板块摘要。
(4)核心催化因素。
(5)核心风险。
(6)候选 leader。
(7)候选 follower。
(8)后续需要验证的 source → target 关系。

这一步的作用是给策略建立实时市场背景。比如 AI 存储板块是否受 HBM 需求推动,AI 云板块是否受资本开支上调影响,AI 服务器板块是否有订单或财报催化。

(5)实时检测 basket 内价格强弱

策略会定时刷新每个 basket 内所有股票的实时状态。

计算内容包括:

(1)每只股票的实时涨跌幅。
(2)basket 平均涨跌幅。
(3)每只股票相对 basket 的偏离。
(4)标准化 ZScore。
(5)当前 leader。
(6)basket 状态。

basket 状态主要包括:

(1)WATCH_BREAKOUT:板块相对平衡,等待突破。
(2)BALANCE_SEEKING:板块内部已经分化,需要等待重新平衡。
(3)BREAKOUT_DETECTED:某只股票明显偏离板块平均表现,成为当前 leader。

策略关注的是:leader 已经异动,但同板块 follower 尚未充分反应的情况。

(6)K 线统计验证 leader / follower 关系

大模型可以提供产业逻辑,但交易不能只依赖语义判断。

因此,策略会在每个 basket 内对股票之间的历史 K 线关系进行统计验证。

统计内容包括:

(1)source 与 target 的收益率相关性。
(2)source 突破后 target 的跟随成功率。
(3)最佳滞后周期。
(4)平均跟随收益。
(5)综合 edge score。
(6)该关系是否 active。

只有满足统计条件的 source → target 关系,才会用于后续交易判断。

也就是说,即使大模型认为 NVDA 和某个股票有产业关系,如果历史价格统计不支持跟随关系,策略也不会把它作为有效 follower。

(7)价格异常后进行新闻追因

本策略不是新闻驱动交易,而是价格异常后进行新闻追因。

当某个 basket 出现 BREAKOUT_DETECTED 时,策略会检查:

(1)当前 leader 是否存在 active edge。
(2)是否有 follower 仍然处于滞后状态。
(3)该信号是否处于冷却期。
(4)是否配置了 BraveSearch 和 LLM。
(5)是否能搜索到 leader 的相关新闻。

如果条件满足,策略会针对 leader 拉取最新新闻,并将以下信息交给大模型:

(1)当前 basket 信息。
(2)板块情报。
(3)实时价格状态。
(4)leader 的 ZScore 和方向。
(5)候选 follower。
(6)已验证的统计关系。
(7)BraveSearch 搜索到的 leader 新闻。

大模型需要判断:

(1)新闻是否能解释 leader 的价格异动。
(2)新闻方向是否和价格方向一致。
(3)是否允许交易。
(4)可能影响哪些 follower。
(5)每个 follower 的 impact 强度。
(6)为什么这些 follower 可能跟随。

只有新闻解释成立、方向一致、置信度足够、trade_allowed = true,策略才会生成 confirmed signal。

(8)交易执行逻辑

策略支持两种模式:

(1)notify 模式
只记录和展示信号,不实际下单。适合观察策略信号质量。

(2)trade 模式
当 confirmed signal 满足条件后,自动执行交易。

交易对象主要是 follower,而不是已经突破的 leader。因为 leader 往往已经发生明显异动,直接追 leader 容易追高。策略更关注同板块中尚未充分反应的 follower。

对于同一合约已有持仓的情况,策略处理方式如下:

(1)如果已有同向持仓,则执行加仓。
(2)如果已有反向持仓,则先平仓,再按新方向开仓。
(3)如果没有持仓,则直接按信号方向开仓。

这种设计适合处理一只股票同时属于多个 AI basket 的情况。

(9)移动止损与硬止损

策略内置硬止损和移动止损。

硬止损用于控制单笔亏损。当价格达到预设亏损阈值时,直接平仓。

移动止损用于保护浮盈。当持仓浮盈达到启动阈值后,策略开始记录最高浮盈。如果后续盈利从峰值回撤超过允许范围,则触发平仓。

这种方式的思路是:亏损快速截断,盈利尽量让趋势继续发展。

交易流程

(1)扫描交易所市场,筛选 TradFi EQUITY 股票合约。
(2)使用 LLM 为每只股票建立多 basket 画像。
(3)根据画像构建 AI 子板块 basket。
(4)使用 BraveSearch 获取板块和重点股票信息。
(5)LLM 汇总板块情报。
(6)实时计算 basket 内涨跌幅、ZScore、leader 和状态。
(7)每日统计 basket 内 source → target 的 K 线跟随关系。
(8)当 leader 出现价格突破后,检查是否存在滞后 follower。
(9)针对 leader 搜索最新新闻。
(10)LLM 判断新闻是否解释本次价格异动。
(11)判断新闻是否可能传导到 follower。
(12)生成 confirmed signal。
(13)notify 模式只提示,trade 模式自动交易。
(14)持仓后使用硬止损和移动止损进行风控。

策略特点

(1)自动识别 TradFi 股票类合约,不需要手动维护股票池。
(2)只允许普通股票或 ADR 进入 basket,避免 ETF、指数、杠杆 ETF 污染统计。
(3)支持一只股票同时属于多个 AI 子板块,更贴近真实市场叙事。
(4)使用 LLM 对股票进行结构化画像,自动生成板块归属、业务解释和搜索关键词。
(5)使用 BraveSearch 获取实时板块信息和个股新闻。
(6)使用 LLM 汇总 AI 子板块趋势、催化和风险。
(7)通过 ZScore 识别 basket 内 leader 异动。
(8)通过 K 线统计验证 leader / follower 关系。
(9)采用“价格异常优先,新闻追因确认”的交易逻辑。
(10)支持 notify 和 trade 两种模式。
(11)支持同向加仓、反向平仓再开。
(12)内置硬止损和移动止损。
(13)使用发明者 LogStatus 表格展示 basket 实时状态、板块情报、信号、交易和步骤日志。

适用场景

本策略适合用于观察和研究 AI 主题内部的资金轮动和板块扩散。

尤其适合以下场景:

(1)AI 算力、HBM、AI 服务器、光模块、云计算等子板块轮动明显时。
(2)某个 AI 子板块出现 leader 明显突破时。
(3)市场围绕某个 AI 叙事进行扩散炒作时。
(4)希望用 LLM 辅助理解公司业务、板块关系和新闻影响时。
(5)希望先用 notify 模式观察 AI 股票合约联动信号时。

本策略更适合作为 AI 主题轮动研究工具和半自动交易系统,而不是一个可以直接无脑运行的成熟盈利系统。

风险提示

(1)TradFi 股票永续合约并不等同于直接持有美股,没有分红和股东权益。
(2)股票类永续合约可能存在资金费率、流动性、溢价折价和交易规则变化风险。
(3)LLM 可能错误分类股票,或误判新闻影响。
(4)BraveSearch 新闻可能滞后、不完整,或存在噪声。
(5)价格突破后,leader 可能已经充分反应,followers 不一定补涨。
(6)历史统计关系不代表未来一定有效。
(7)AI 热点板块波动较大,容易出现快速冲高回落。
(8)策略目前仍需要继续加强账户级风控、成交确认、单板块敞口限制和信号复盘。
(9)实盘前建议长期使用 notify 模式观察 confirmed signal 的质量。
(10)不建议高杠杆运行。

使用建议

(1)首次运行建议执行重新分析,让系统重新建立股票画像和 basket。
(2)先使用 notify 模式运行一段时间,观察股票分类是否合理。
(3)重点检查 confirmed signal,而不是 price_only 信号。
(4)观察大模型新闻追因是否能解释真实价格异动。
(5)检查 followers 是否真的存在滞后反应。
(6)确认 K 线统计关系 active edge 是否稳定。
(7)如果发现 ETF、指数或不相关股票混入 basket,应重新分析或手动调整 prompt。
(8)切换 trade 模式前,应先确认风控参数和仓位比例。
(9)建议从小仓位开始,不建议直接重仓运行。
(10)定期复盘信号命中率、滑点、资金费率和移动止损效果。

Source
Python
# -*- coding: utf-8 -*-
"""
AI产业链 TradFi EQUITY Basket 策略 - 发明者量化 Python V2

核心变化:
1. 只筛选 underlyingType == EQUITY 的 TradFi 合约。
2. LLM 对每个股票建模时允许分配到多个 basket。
   例如 GOOGL 可以同时属于 ai_cloud、ai_model_application、ai_ads_platform。
3. 每个 basket 独立做实时涨跌、ZScore、leader/follower 分析。
4. 每个 basket 内再做 K线统计:相关性、突破后跟随概率、最佳滞后。
5. 每一步都写入 store 并记录 step_logs,便于复盘和断点恢复。

默认只通知,不实盘。
"""

import json
import math
import re
import time
import urllib.parse
import urllib.request


# =========================
# 参数区
# =========================

# 以下常量由平台参数注入(PARAMS_UPDATE),此处不再硬编码:
#   LLM_API_KEY / LLM_BASE_URL / LLM_MODEL / BRAVE_KEY / TRADE_MODE
#   CHECK_INTERVAL_S / MARKET_REFRESH_S / REALTIME_REFRESH_S / STATS_REFRESH_S
#   INTEL_REFRESH_S / PRICE_NEWS_COOLDOWN_S / SIGNAL_COOLDOWN_S
#   KLINE_LIMIT / SYNC_WINDOW / RETURN_WINDOW / MAX_LAG_BARS
#   BALANCE_THRESHOLD / BREAKOUT_THRESHOLD / MIN_CORR / MIN_FOLLOW_SUCCESS
#   MIN_EDGE_SCORE / MIN_NEWS_CONFIDENCE / BASE_POS_PCT / MAX_SINGLE_POS_PCT
#   MAX_TOTAL_POSITIONS / LEVERAGE / HARD_STOP_PCT / TRAIL_ACTIVATE_PCT / TRAIL_GIVEBACK_PCT

# 非参数常量(结构化/枚举型,保留在代码中)
STORE_KEY = "ai_equity_basket_store_v2"
KLINE_PERIOD = PERIOD_H1

BASKET_DEFINITIONS = [
    {"id": "ai_compute", "name": "AI算力芯片", "desc": "GPU、CPU、AI加速器、ASIC、SoC"},
    {"id": "ai_memory", "name": "AI存储", "desc": "HBM、DRAM、NAND、企业存储"},
    {"id": "ai_server", "name": "AI服务器", "desc": "AI服务器、整机、ODM、企业硬件"},
    {"id": "ai_cloud", "name": "AI云和算力租赁", "desc": "公有云、GPU云、AI基础设施租赁"},
    {"id": "ai_model_application", "name": "AI模型和应用", "desc": "大模型、企业软件、AI应用、Agent平台"},
    {"id": "ai_semicap", "name": "半导体设备", "desc": "光刻、刻蚀、沉积、量测、先进封装设备"},
    {"id": "ai_foundry", "name": "晶圆代工", "desc": "先进制程、CoWoS、晶圆制造"},
    {"id": "ai_optical_networking", "name": "光通信和网络", "desc": "光模块、交换机、网络设备、互联芯片"},
    {"id": "ai_power_cooling", "name": "电力和散热", "desc": "数据中心电力、液冷、散热、能源基础设施"},
    {"id": "ai_platform_ads", "name": "AI平台广告", "desc": "搜索、推荐、广告、内容平台中的AI变现"},
]


# =========================
# 主循环
# =========================

def main():
    Log("AI Equity Basket V2 启动 | mode:", TRADE_MODE, "#00AAFF")
    store = load_store()
    last_dashboard = 0

    while True:
        try:
            handle_command(store)
            now = int(time.time())

            if now - store["last_market_refresh"] > MARKET_REFRESH_S:
                refresh_equity_universe(store, force_reanalyze=False)
                store["last_market_refresh"] = now
                save_store(store)

            if now - store["last_realtime_refresh"] > REALTIME_REFRESH_S:
                refresh_basket_realtime(store)
                store["last_realtime_refresh"] = now
                save_store(store)
                show_dashboard(store)
                last_dashboard = now

            if now - store["last_stats_refresh"] > STATS_REFRESH_S:
                rebuild_basket_stats(store)
                store["last_stats_refresh"] = now
                save_store(store)

            if BRAVE_KEY and now - store["last_intel_refresh"] > INTEL_REFRESH_S:
                refresh_basket_intelligence(store)
                store["last_intel_refresh"] = now
                save_store(store)

            scan_price_breakouts(store)
            monitor_positions(store)

            if now - last_dashboard > 60:
                show_dashboard(store)
                last_dashboard = now

            save_store(store)

        except Exception as e:
            Log("主循环异常:", str(e), "#FF0000")

        Sleep(CHECK_INTERVAL_S * 1000)


# =========================
# Store
# =========================

def new_store():
    return {
        "markets": {},              # symbol -> market
        "profiles": {},             # symbol -> LLM profile
        "baskets": {},              # basket_id -> basket object
        "realtime": {},             # basket_id -> realtime analysis
        "edges": [],                # basket内统计关系
        "basket_intel": {},         # basket_id -> module intelligence
        "news_events": [],          # price-triggered news explanation
        "news_probe": {},           # symbol -> last probe timestamp
        "signals": [],
        "trades": [],
        "signal_seen": {},
        "step_logs": [],
        "last_market_refresh": 0,
        "last_realtime_refresh": 0,
        "last_stats_refresh": 0,
        "last_intel_refresh": 0,
    }


def load_store():
    store = _G(STORE_KEY)
    if not store:
        return new_store()
    return migrate_store(store)


def migrate_store(store):
    fresh = new_store()
    for key, value in fresh.items():
        if key not in store:
            store[key] = value
    return store


def save_store(store):
    _G(STORE_KEY, store)


def record_step(store, step, data, color="#999999"):
    row = {
        "time": int(time.time()),
        "step": step,
        "data": data,
    }
    store["step_logs"].insert(0, row)
    store["step_logs"] = store["step_logs"][:500]
    Log("STEP | {} | {}".format(step, json.dumps(data, ensure_ascii=False)[:500]), color)
    save_store(store)


def handle_command(store):
    cmd = GetCommand()
    if not cmd:
        return
    if cmd == "reanalyze:all":
        store.clear()
        store.update(new_store())
        refresh_equity_universe(store, force_reanalyze=True)
        refresh_basket_realtime(store)
        rebuild_basket_stats(store)
        if BRAVE_KEY:
            refresh_basket_intelligence(store)
        save_store(store)
        Log("已重新分析全部EQUITY合约", "#00CC66")
    elif cmd == "refresh:markets":
        refresh_equity_universe(store, force_reanalyze=False)
        save_store(store)
    elif cmd == "refresh:realtime":
        refresh_basket_realtime(store)
        save_store(store)
    elif cmd == "refresh:stats":
        rebuild_basket_stats(store)
        save_store(store)
    elif cmd == "refresh:intel":
        refresh_basket_intelligence(store)
        save_store(store)
    elif cmd == "clear:store":
        store.clear()
        store.update(new_store())
        save_store(store)
        Log("已清空V2 store", "#FFAA00")


# =========================
# 1. EQUITY 合约筛选 + 股票画像
# =========================

def refresh_equity_universe(store, force_reanalyze):
    record_step(store, "start_equity_universe_refresh", {"force_reanalyze": force_reanalyze}, "#00AAFF")
    ms = exchange.GetMarkets()
    symbols = []

    for key, market in ms.items():
        info = market.get("Info", {}) or {}
        sub_type = info.get("underlyingSubType", []) or []
        underlying_type = info.get("underlyingType", "")

        if ".swap" not in key:
            continue
        if "TradFi" not in sub_type:
            continue
        if underlying_type != "EQUITY":
            continue

        symbol = normalize_symbol(key)
        store["markets"][symbol] = {
            "symbol": symbol,
            "contract": key,
            "underlyingType": underlying_type,
            "amountPrecision": market.get("AmountPrecision", 0),
            "pricePrecision": market.get("PricePrecision", 2),
            "ctVal": market.get("CtVal", 1) or 1,
            "minQty": market.get("MinQty", 0) or 0,
        }
        symbols.append(symbol)

    symbols = sorted(symbols)
    todo = [s for s in symbols if force_reanalyze or s not in store["profiles"]]
    record_step(store, "equity_contracts_filtered", {"total_equity": len(symbols), "to_analyze": len(todo)}, "#00AAFF")

    for i, symbol in enumerate(todo):
        market = store["markets"][symbol]
        Log("LLM建模 {}/{} | {} | {}".format(i + 1, len(todo), symbol, market["contract"]), "#00AAFF")
        profile = analyze_equity_profile(symbol, market)
        store["profiles"][symbol] = profile
        record_step(store, "profile_saved", {
            "symbol": symbol,
            "company_name": profile.get("company_name"),
            "baskets": profile.get("baskets"),
            "tradable": profile.get("tradable"),
            "model_confidence": profile.get("model_confidence"),
            "reason": profile.get("reason"),
        }, "#00CC66" if profile.get("tradable") else "#999999")
        Sleep(1000)

    rebuild_baskets_from_profiles(store)
    record_step(store, "universe_refresh_done", {
        "profiles": len(store["profiles"]),
        "baskets": {bid: len(b["members"]) for bid, b in store["baskets"].items()},
    }, "#00AAFF")


def analyze_equity_profile(symbol, market):
    if not LLM_API_KEY:
        return fallback_profile(symbol, market)

    schema = {
        "symbol": symbol,
        "asset_type": "stock|adr|etf|leveraged_etf|index|fund|other",
        "company_name": "",
        "sector": "",
        "business_summary": "",
        "tradable": False,
        "model_confidence": 0,
        "baskets": [
            {
                "id": "ai_cloud",
                "exposure": 0.0,
                "role": "leader|follower|both|observer",
                "sub_theme": "",
                "reason": ""
            }
        ],
        "search_names": [],
        "event_keywords": [],
        "negative_keywords": [],
        "possible_leaders": [],
        "possible_followers": [],
        "brave_queries": [],
        "reason": ""
    }

    system_prompt = (
        "你是美股AI产业链建模器。你只处理EQUITY股票合约。"
        "一个公司可以属于多个basket,例如GOOGL同时属于ai_cloud、ai_model_application、ai_platform_ads。"
        "只输出JSON,不要markdown。所有reason、summary、sub_theme必须使用中文。"
        "字段名必须和schema完全一致,不允许输出confidence替代model_confidence。"
    )

    user_prompt = "\n".join([
        "为该股票合约建模:",
        "symbol: " + symbol,
        "可选basket定义:",
        json.dumps(BASKET_DEFINITIONS, ensure_ascii=False),
        "输出schema:",
        json.dumps(schema, ensure_ascii=False),
        "要求:",
        "0. 必须先判断asset_type。只有普通股票或ADR可以进入basket;ETF、杠杆ETF、指数、基金、商品/能源/地区ETF必须tradable=false且baskets=[]。",
        "1. baskets可以有多个,但只填真正有业务/股价驱动关系的basket。",
        "2. exposure 0-1,表示该公司对该basket的AI产业链暴露。",
        "3. exposure < 0.35 的basket不要填。",
        "4. role说明在该basket中更像leader、follower、both还是observer。",
        "5. tradable=true 表示至少有一个basket exposure>=0.35。",
        "6. search_names使用公司名,不要只用ticker。",
        "7. possible_leaders/possible_followers按产业链和股价传导填写ticker。",
        "8. brave_queries要围绕公司名+AI业务关键词。",
        "9. reason、baskets[].reason、baskets[].sub_theme 必须是中文。",
        "10. 字段必须严格一致:只能使用 model_confidence,禁止使用 confidence。",
    ])

    data = call_llm_json(system_prompt, user_prompt, 1400)
    if not data:
        return fallback_profile(symbol, market)
    return sanitize_profile(symbol, market, normalize_profile_llm_output(data))


def normalize_profile_llm_output(data):
    if "model_confidence" not in data and "confidence" in data:
        data["model_confidence"] = data.get("confidence", 0)
    return data


def sanitize_profile(symbol, market, data):
    valid_ids = {x["id"] for x in BASKET_DEFINITIONS}
    asset_type = str(data.get("asset_type", "other") or "other").lower()
    baskets = []
    if asset_type in ["stock", "adr"]:
        for item in as_list(data.get("baskets")):
            if not isinstance(item, dict):
                continue
            bid = item.get("id")
            exposure = float(item.get("exposure", 0) or 0)
            if bid not in valid_ids or exposure < 0.35:
                continue
            baskets.append({
                "id": bid,
                "exposure": round(min(1, max(0, exposure)), 3),
                "role": normalize_role(item.get("role", "observer")),
                "sub_theme": item.get("sub_theme", "") or "",
                "reason": item.get("reason", "") or "",
            })

    return {
        "symbol": symbol,
        "contract": market["contract"],
        "asset_type": asset_type,
        "company_name": data.get("company_name", "") or "",
        "sector": data.get("sector", "") or "",
        "business_summary": data.get("business_summary", "") or "",
        "tradable": asset_type in ["stock", "adr"] and bool(data.get("tradable")) and len(baskets) > 0,
        "model_confidence": normalize_confidence(data.get("model_confidence", 0)),
        "baskets": baskets,
        "search_names": as_list(data.get("search_names")),
        "event_keywords": as_list(data.get("event_keywords")),
        "negative_keywords": as_list(data.get("negative_keywords")),
        "possible_leaders": unique_upper(as_list(data.get("possible_leaders"))),
        "possible_followers": unique_upper(as_list(data.get("possible_followers"))),
        "brave_queries": as_list(data.get("brave_queries")),
        "reason": data.get("reason", "") or "",
        "updated_at": iso_time(),
    }


def normalize_role(role):
    role = str(role or "observer")
    if role not in ["leader", "follower", "both", "observer"]:
        return "observer"
    return role


def normalize_confidence(value):
    try:
        v = float(value or 0)
    except Exception:
        return 0
    if 0 < v <= 1:
        v = v * 100
    return int(round(max(0, min(100, v))))


def fallback_profile(symbol, market):
    return {
        "symbol": symbol,
        "contract": market["contract"],
        "asset_type": "other",
        "company_name": "",
        "sector": "",
        "business_summary": "fallback; LLM not configured",
        "tradable": False,
        "model_confidence": 0,
        "baskets": [],
        "search_names": [symbol],
        "event_keywords": [],
        "negative_keywords": [],
        "possible_leaders": [],
        "possible_followers": [],
        "brave_queries": [],
        "reason": "fallback",
        "updated_at": iso_time(),
    }


# =========================
# 2. Basket 构建与实时分析
# =========================

def rebuild_baskets_from_profiles(store):
    baskets = {}
    for definition in BASKET_DEFINITIONS:
        baskets[definition["id"]] = {
            "id": definition["id"],
            "name": definition["name"],
            "desc": definition["desc"],
            "members": [],
        }

    for symbol, profile in store["profiles"].items():
        if not profile.get("tradable"):
            continue
        for b in profile.get("baskets", []):
            bid = b["id"]
            if bid not in baskets:
                continue
            baskets[bid]["members"].append({
                "symbol": symbol,
                "exposure": b["exposure"],
                "role": b["role"],
                "sub_theme": b["sub_theme"],
                "reason": b["reason"],
            })

    store["baskets"] = {bid: b for bid, b in baskets.items() if b["members"]}
    save_store(store)


def refresh_basket_realtime(store):
    record_step(store, "start_realtime_analysis", {"baskets": len(store["baskets"])}, "#00AAFF")
    realtime = {}

    for bid, basket in store["baskets"].items():
        symbols = [m["symbol"] for m in basket["members"] if m["symbol"] in store["markets"]]
        if len(symbols) < 2:
            continue

        zdata = calc_basket_zscores(store, symbols)
        members = []
        for symbol in symbols:
            pct = symbol_change_pct(store, symbol)
            profile = store["profiles"][symbol]
            member_info = basket_member_info(basket, symbol)
            members.append({
                "symbol": symbol,
                "company": profile.get("company_name", ""),
                "pct": round(pct, 3),
                "z": zdata["zscores"].get(symbol, 0) if zdata else 0,
                "exposure": member_info.get("exposure", 0),
                "role": member_info.get("role", ""),
                "sub_theme": member_info.get("sub_theme", ""),
            })

        members.sort(key=lambda x: abs(x["z"]), reverse=True)
        leader = members[0]["symbol"] if members else ""
        state = "INIT"
        max_abs = abs(members[0]["z"]) if members else 0
        if zdata:
            state = "WATCH_BREAKOUT" if zdata["maxAbs"] < BALANCE_THRESHOLD else "BREAKOUT_DETECTED" if zdata["maxAbs"] >= BREAKOUT_THRESHOLD else "BALANCE_SEEKING"

        realtime[bid] = {
            "id": bid,
            "name": basket["name"],
            "state": state,
            "balanced": zdata["maxAbs"] < BALANCE_THRESHOLD if zdata else False,
            "maxAbs": round(zdata["maxAbs"], 3) if zdata else 0,
            "leader": leader,
            "avgPct": round(mean([x["pct"] for x in members]), 3) if members else 0,
            "members": members,
            "updatedAt": int(time.time()),
        }

        record_step(store, "basket_realtime_saved", {
            "basket": bid,
            "state": state,
            "leader": leader,
            "maxAbs": realtime[bid]["maxAbs"],
            "avgPct": realtime[bid]["avgPct"],
        }, "#FF0000" if state == "BREAKOUT_DETECTED" else "#00CC66")

    store["realtime"] = realtime
    save_store(store)


def basket_member_info(basket, symbol):
    for m in basket["members"]:
        if m["symbol"] == symbol:
            return m
    return {}


# =========================
# 2.5 BraveSearch 板块情报
# =========================

def refresh_basket_intelligence(store):
    if not BRAVE_KEY:
        record_step(store, "basket_intel_skipped", {"reason": "BRAVE_KEY missing"}, "#FFAA00")
        return
    if not LLM_API_KEY:
        record_step(store, "basket_intel_skipped", {"reason": "LLM_API_KEY missing"}, "#FFAA00")
        return

    record_step(store, "start_basket_intelligence", {"baskets": len(store["baskets"])}, "#00AAFF")
    intel = {}

    for bid, basket in store["baskets"].items():
        members = sorted(basket["members"], key=lambda x: x.get("exposure", 0), reverse=True)
        top_symbols = [m["symbol"] for m in members[:8]]
        basket_news = fetch_basket_news(basket, top_symbols)
        stock_news = {}

        for symbol in top_symbols[:6]:
            profile = store["profiles"].get(symbol, {})
            stock_news[symbol] = fetch_stock_news(profile, bid)
            Sleep(800)

        analysis = analyze_basket_intel_with_llm(store, basket, top_symbols, basket_news, stock_news)
        if analysis:
            analysis["basket"] = bid
            analysis["basket_name"] = basket["name"]
            analysis["symbols"] = top_symbols
            analysis["source_counts"] = {
                "basket_news": len(basket_news),
                "stock_news": sum(len(v) for v in stock_news.values()),
            }
            analysis["updatedAt"] = int(time.time())
            intel[bid] = analysis
            record_step(store, "basket_intel_saved", {
                "basket": bid,
                "trend": analysis.get("trend"),
                "confidence": analysis.get("confidence"),
                "leaders": analysis.get("leader_candidates", []),
            }, "#00CC66")

        Sleep(1000)

    store["basket_intel"] = intel
    save_store(store)


def fetch_basket_news(basket, symbols):
    queries = [
        '"{}" AI stocks supply chain demand'.format(basket["name"]),
        '"{}" data center AI demand'.format(basket["desc"]),
        '"{}" earnings guidance AI'.format(" ".join(symbols[:5])),
    ]
    return brave_news_search(queries, 4)


def fetch_stock_news(profile, basket_id):
    names = profile.get("search_names", []) or [profile.get("company_name") or profile.get("symbol")]
    keywords = profile.get("event_keywords", [])[:4]
    queries = []

    for name in names[:2]:
        if not name:
            continue
        queries.append('"{}" AI earnings guidance'.format(name))
        queries.append('"{}" data center demand'.format(name))

    for q in profile.get("brave_queries", [])[:2]:
        queries.append(q)

    if keywords:
        queries.append('"{}" {}'.format(names[0], " ".join(keywords[:3])))

    return brave_news_search(queries[:5], 3)


def brave_news_search(queries, count):
    out = []
    seen = {}

    for q in queries:
        try:
            url = (
                "https://api.search.brave.com/res/v1/news/search?q=" +
                urllib.parse.quote(q) +
                "&count=" + str(count) +
                "&freshness=pw"
            )
            raw = http_request(url, "GET", {
                "X-Subscription-Token": BRAVE_KEY,
                "Accept": "application/json",
            }, None, 20)
            data = json.loads(raw)
            for item in data.get("results", []):
                title = item.get("title", "") or ""
                desc = item.get("description", "") or ""
                link = item.get("url", "") or title
                if not link or link in seen:
                    continue
                seen[link] = True
                source = item.get("profile", {}) or {}
                out.append({
                    "title": title,
                    "desc": desc,
                    "url": item.get("url", "") or "",
                    "age": item.get("age", "") or "",
                    "source": source.get("name", "") or "",
                    "query": q,
                })
        except Exception as e:
            Log("BraveSearch失败 | {} | {}".format(q, str(e)), "#FFAA00")
        Sleep(1000)

    return out


def analyze_basket_intel_with_llm(store, basket, top_symbols, basket_news, stock_news):
    schema = {
        "trend": "bullish|bearish|mixed|neutral",
        "confidence": 0,
        "summary": "",
        "key_catalysts": [],
        "key_risks": [],
        "leader_candidates": [{"symbol": "", "reason": ""}],
        "follower_candidates": [{"symbol": "", "reason": ""}],
        "watch_events": [],
        "relations_to_verify": [{"source": "", "target": "", "reason": ""}],
    }

    profiles = {}
    for symbol in top_symbols:
        p = store["profiles"].get(symbol, {})
        profiles[symbol] = {
            "company_name": p.get("company_name"),
            "sector": p.get("sector"),
            "business_summary": p.get("business_summary"),
            "basket_exposure": basket_member_info(basket, symbol),
        }

    user_prompt = "\n".join([
        "请基于BraveSearch抓取的信息,分析这个AI产业链板块。",
        "basket:",
        json.dumps(basket, ensure_ascii=False),
        "member_profiles:",
        json.dumps(profiles, ensure_ascii=False),
        "basket_level_news:",
        json.dumps(basket_news[:15], ensure_ascii=False),
        "stock_level_news:",
        json.dumps(stock_news, ensure_ascii=False),
        "输出schema:",
        json.dumps(schema, ensure_ascii=False),
        "要求:",
        "1. 只基于给定新闻和画像分析,不要凭空编造。",
        "2. leader_candidates/follower_candidates只能从member_profiles里选。",
        "3. relations_to_verify用于后续K线统计重点验证。",
        "4. watch_events写未来需要关注的财报、指引、订单、监管、供需事件。",
        "5. 所有summary、reason、key_catalysts、key_risks、watch_events必须使用中文。",
        "6. 字段名必须和schema完全一致,不要新增或改名。",
    ])

    return call_llm_json("你是AI产业链板块研究员,只输出JSON。所有解释字段必须使用中文。", user_prompt, 1500)


def calc_basket_zscores(store, symbols):
    returns = {}
    min_len = 999999
    for symbol in symbols:
        bars = get_records(store["markets"][symbol]["contract"], KLINE_PERIOD, SYNC_WINDOW + 10)
        if len(bars) < SYNC_WINDOW + 2:
            return None
        r = calc_returns(bars)[-SYNC_WINDOW:]
        returns[symbol] = r
        min_len = min(min_len, len(r))

    latest = {s: returns[s][-1] for s in symbols}
    avg_latest = mean(list(latest.values()))
    dev_series = {s: [] for s in symbols}

    for i in range(min_len):
        row = [returns[s][-min_len + i] for s in symbols]
        row_avg = mean(row)
        for s in symbols:
            dev_series[s].append(returns[s][-min_len + i] - row_avg)

    zscores = {}
    max_abs = 0
    for s in symbols:
        sigma = std(dev_series[s])
        z = (latest[s] - avg_latest) / sigma if sigma > 0 else 0
        zscores[s] = round(z, 3)
        max_abs = max(max_abs, abs(z))
    return {"zscores": zscores, "maxAbs": max_abs}


# =========================
# 3. Basket 内K线统计
# =========================

def rebuild_basket_stats(store):
    record_step(store, "start_kline_stats", {"baskets": len(store["baskets"])}, "#00AAFF")
    edges = []

    for bid, basket in store["baskets"].items():
        symbols = [m["symbol"] for m in basket["members"] if m["symbol"] in store["markets"]]
        if len(symbols) < 2:
            continue

        for source in symbols:
            for target in symbols:
                if source == target:
                    continue
                metric = validate_pair(store, source, target)
                if not metric:
                    continue
                source_exp = basket_member_info(basket, source).get("exposure", 0)
                target_exp = basket_member_info(basket, target).get("exposure", 0)
                score = clamp(metric["corr"] * 0.3 + metric["follow"] * 0.4 + min(source_exp, target_exp) * 0.3, 0, 1)
                edge = {
                    "basket": bid,
                    "source": source,
                    "target": target,
                    "corr": round(metric["corr"], 3),
                    "follow": round(metric["follow"], 3),
                    "lag": metric["lag"],
                    "avgReturn": round(metric["avgReturn"], 6),
                    "sample": metric["sample"],
                    "score": round(score, 3),
                    "active": score >= MIN_EDGE_SCORE and metric["corr"] >= MIN_CORR and metric["follow"] >= MIN_FOLLOW_SUCCESS,
                    "updatedAt": int(time.time()),
                }
                edges.append(edge)
                Log("统计关系[{}] | {} | {}->{} | corr:{} follow:{} lag:{} score:{} sample:{}".format(
                    "通过" if edge["active"] else "淘汰",
                    bid,
                    source,
                    target,
                    edge["corr"],
                    edge["follow"],
                    edge["lag"],
                    edge["score"],
                    edge["sample"],
                ), "#00CC66" if edge["active"] else "#999999")
        Sleep(300)

    store["edges"] = edges
    record_step(store, "kline_stats_saved", {
        "edges": len(edges),
        "active": len([e for e in edges if e["active"]]),
    }, "#00AAFF")
    save_store(store)


def validate_pair(store, source, target):
    a = get_records(store["markets"][source]["contract"], KLINE_PERIOD, KLINE_LIMIT)
    b = get_records(store["markets"][target]["contract"], KLINE_PERIOD, KLINE_LIMIT)
    a, b = align_bars(a, b)
    if len(a) < RETURN_WINDOW + MAX_LAG_BARS + 10:
        return None

    ar = calc_returns(a)
    br = calc_returns(b)
    corr = max(0, pearson(ar[-RETURN_WINDOW:], br[-RETURN_WINDOW:]))

    best = {"lag": 1, "follow": 0, "avgReturn": 0, "sample": 0}
    for lag in range(1, MAX_LAG_BARS + 1):
        stat = follow_stat(ar, br, lag)
        if stat["follow"] > best["follow"]:
            best = stat
            best["lag"] = lag
    best["corr"] = corr
    return best


def follow_stat(source_returns, target_returns, lag):
    sigma = std(source_returns)
    if sigma <= 0:
        return {"follow": 0, "avgReturn": 0, "sample": 0}
    threshold = 1.5 * sigma
    hit, total, ret_sum = 0, 0, 0
    for i in range(0, len(source_returns) - lag):
        if abs(source_returns[i]) < threshold:
            continue
        direction = 1 if source_returns[i] > 0 else -1
        ret = target_returns[i + lag] * direction
        if ret > 0:
            hit += 1
        ret_sum += ret
        total += 1
    return {"follow": hit / total if total else 0, "avgReturn": ret_sum / total if total else 0, "sample": total}


# =========================
# 4. 突破扫描
# =========================

def scan_price_breakouts(store):
    for bid, rt in store["realtime"].items():
        if rt["state"] != "BREAKOUT_DETECTED":
            continue
        leader = rt["leader"]
        if not leader:
            continue
        z = 0
        for m in rt["members"]:
            if m["symbol"] == leader:
                z = m["z"]
        direction = "long" if z > 0 else "short"
        edges = [e for e in store["edges"] if e["active"] and e["basket"] == bid and e["source"] == leader]
        if not edges:
            record_step(store, "breakout_skip_no_active_edges", {
                "basket": bid,
                "leader": leader,
                "direction": direction,
                "leaderZ": z,
            }, "#FFAA00")
            continue
        followers = []
        for e in edges:
            target_rt = find_realtime_member(rt, e["target"])
            if target_rt and abs(target_rt["z"]) < BREAKOUT_THRESHOLD * 0.75:
                followers.append(e["target"])
        if not followers:
            record_step(store, "breakout_skip_no_lagging_followers", {
                "basket": bid,
                "leader": leader,
                "direction": direction,
                "edge_targets": [e["target"] for e in edges],
            }, "#FFAA00")
            continue

        key = "{}:{}:{}".format(bid, leader, direction)
        if int(time.time()) - store.get("signal_seen", {}).get(key, 0) < SIGNAL_COOLDOWN_S:
            record_step(store, "breakout_skip_signal_cooldown", {
                "basket": bid,
                "leader": leader,
                "direction": direction,
            }, "#999999")
            continue

        event = explain_breakout_with_news(store, bid, leader, direction, z, followers, edges)
        if not event:
            signal = {
                "time": int(time.time()),
                "status": "price_only",
                "basket": bid,
                "leader": leader,
                "direction": direction,
                "leaderZ": z,
                "followers": followers,
                "reason": "价格异常,但新闻追因未确认",
            }
            store["signals"].insert(0, signal)
            store["signals"] = store["signals"][:100]
            record_step(store, "price_breakout_unconfirmed", signal, "#FFAA00")
            continue

        confirmed_followers = []
        for item in event.get("affected_followers", []):
            symbol = item.get("symbol")
            if symbol in followers and float(item.get("impact", 0) or 0) >= 0.45:
                confirmed_followers.append({
                    "symbol": symbol,
                    "impact": float(item.get("impact", 0) or 0),
                    "reason": item.get("reason", ""),
                })

        if not confirmed_followers:
            record_step(store, "news_confirmed_no_followers", {
                "basket": bid,
                "leader": leader,
                "event": event.get("event_type"),
            }, "#FFAA00")
            continue

        signal = {
            "time": int(time.time()),
            "status": "confirmed",
            "basket": bid,
            "leader": leader,
            "direction": direction,
            "leaderZ": z,
            "followers": [x["symbol"] for x in confirmed_followers],
            "follower_details": confirmed_followers,
            "event": event,
            "reason": event.get("reason", ""),
        }
        store["signals"].insert(0, signal)
        store["signals"] = store["signals"][:100]
        store.setdefault("signal_seen", {})[key] = int(time.time())
        record_step(store, "confirmed_trade_signal", signal, "#FF0000")
        execute_confirmed_signal(store, signal)


def find_realtime_member(rt, symbol):
    for m in rt.get("members", []):
        if m["symbol"] == symbol:
            return m
    return None


def explain_breakout_with_news(store, bid, leader, direction, leader_z, followers, edges):
    if not BRAVE_KEY:
        record_step(store, "breakout_news_skipped", {"leader": leader, "reason": "BRAVE_KEY missing"}, "#FFAA00")
        return None
    if not LLM_API_KEY:
        record_step(store, "breakout_news_skipped", {"leader": leader, "reason": "LLM_API_KEY missing"}, "#FFAA00")
        return None

    now = int(time.time())
    probe_key = "{}:{}".format(bid, leader)
    if now - store.get("news_probe", {}).get(probe_key, 0) < PRICE_NEWS_COOLDOWN_S:
        record_step(store, "breakout_news_cooldown", {
            "basket": bid,
            "leader": leader,
            "remain_s": PRICE_NEWS_COOLDOWN_S - (now - store.get("news_probe", {}).get(probe_key, 0)),
        }, "#999999")
        return None
    store.setdefault("news_probe", {})[probe_key] = now

    profile = store["profiles"].get(leader, {})
    news = fetch_stock_news(profile, bid)
    if not news:
        record_step(store, "breakout_news_empty", {"basket": bid, "leader": leader}, "#999999")
        return None

    event = analyze_breakout_news_with_llm(store, bid, leader, direction, leader_z, followers, edges, news)
    if not event:
        return None

    event["basket"] = bid
    event["leader"] = leader
    event["news"] = news[:8]
    event["createdAt"] = now
    store["news_events"].insert(0, event)
    store["news_events"] = store["news_events"][:100]
    save_store(store)

    if not event.get("explains_price_move"):
        record_step(store, "breakout_news_not_explained", {
            "basket": bid,
            "leader": leader,
            "event_type": event.get("event_type"),
            "confidence": event.get("confidence"),
            "reason": event.get("reason"),
        }, "#FFAA00")
        return None
    if event.get("direction") != ("bullish" if direction == "long" else "bearish"):
        record_step(store, "breakout_news_direction_mismatch", {
            "basket": bid,
            "leader": leader,
            "price_direction": direction,
            "news_direction": event.get("direction"),
        }, "#FFAA00")
        return None
    event["confidence"] = normalize_confidence(event.get("confidence", 0))
    if event["confidence"] < MIN_NEWS_CONFIDENCE:
        return None
    if not event.get("trade_allowed"):
        record_step(store, "breakout_news_trade_not_allowed", {
            "basket": bid,
            "leader": leader,
            "event_type": event.get("event_type"),
            "reason": event.get("reason"),
        }, "#FFAA00")
        return None

    record_step(store, "breakout_news_confirmed", {
        "basket": bid,
        "leader": leader,
        "event_type": event.get("event_type"),
        "confidence": event.get("confidence"),
        "followers": event.get("affected_followers"),
    }, "#00CC66")
    return event


def analyze_breakout_news_with_llm(store, bid, leader, direction, leader_z, followers, edges, news):
    schema = {
        "explains_price_move": False,
        "event_type": "earnings|guidance|order|supply_chain|analyst|product|macro|other",
        "direction": "bullish|bearish|neutral",
        "confidence": 0,
        "affected_followers": [{"symbol": "", "impact": 0, "reason": ""}],
        "trade_allowed": False,
        "reason": ""
    }

    basket = store["baskets"].get(bid, {})
    rt = store["realtime"].get(bid, {})
    intel = store.get("basket_intel", {}).get(bid, {})
    edge_pack = []
    for e in edges:
        if e["target"] in followers:
            edge_pack.append(e)

    prompt = "\n".join([
        "价格异常后进行新闻追因。",
        "basket:",
        json.dumps(basket, ensure_ascii=False),
        "basket_intel:",
        json.dumps(intel, ensure_ascii=False),
        "realtime:",
        json.dumps(rt, ensure_ascii=False),
        "leader:",
        leader,
        "price_direction:",
        direction,
        "leader_z:",
        str(leader_z),
        "candidate_followers:",
        json.dumps(followers, ensure_ascii=False),
        "validated_edges:",
        json.dumps(edge_pack, ensure_ascii=False),
        "leader_news_from_brave:",
        json.dumps(news[:12], ensure_ascii=False),
        "输出schema:",
        json.dumps(schema, ensure_ascii=False),
        "规则:",
        "1. 只基于Brave新闻、板块情报和统计关系判断。",
        "2. explains_price_move=true 只给能解释leader价格异常的明确新闻。",
        "3. affected_followers只能从candidate_followers中选择。",
        "4. impact 0-1,>=0.45才适合交易。",
        "5. 如果新闻方向和价格方向不一致,trade_allowed=false。",
        "6. reason 和 affected_followers[].reason 必须使用中文。",
        "7. 字段名必须和schema完全一致,不要新增或改名。",
    ])

    return call_llm_json("你是价格异动新闻追因和产业链传导分析器,只输出JSON,所有reason必须使用中文。", prompt, 1400)


def execute_confirmed_signal(store, signal):
    for follower in signal.get("followers", []):
        detail = find_follower_detail(signal, follower)
        if not detail:
            continue
        execute_symbol_trade(store, signal, follower, detail)
        Sleep(500)


def find_follower_detail(signal, symbol):
    for x in signal.get("follower_details", []):
        if x.get("symbol") == symbol:
            return x
    return None


def execute_symbol_trade(store, signal, symbol, detail):
    if symbol not in store["markets"]:
        return False

    direction = signal["direction"]
    if TRADE_MODE != "trade":
        return open_or_add_position(store, signal, symbol, detail, add_mode=False)

    pos = get_position_by_symbol(symbol)
    if pos:
        pos_dir = position_direction(pos)
        if pos_dir == direction:
            Log("同向已有持仓,执行加仓 | {} | {}".format(symbol, direction), "#00CC66")
            return open_or_add_position(store, signal, symbol, detail, add_mode=True)
        Log("反向已有持仓,先平仓再反手 | {} | old:{} new:{}".format(symbol, pos_dir, direction), "#FFAA00")
        close_existing_position(store, symbol, pos, "reverse_signal")
        Sleep(1000)

    return open_or_add_position(store, signal, symbol, detail, add_mode=False)


def open_or_add_position(store, signal, symbol, detail, add_mode):
    if TRADE_MODE != "trade":
        record_step(store, "trade_notify", {
            "symbol": symbol,
            "direction": signal["direction"],
            "add_mode": add_mode,
            "basket": signal["basket"],
            "leader": signal["leader"],
            "impact": detail.get("impact"),
            "reason": detail.get("reason"),
        }, "#00CC66")
        return True

    if open_position_count() >= MAX_TOTAL_POSITIONS and not add_mode:
        record_step(store, "trade_blocked", {"symbol": symbol, "reason": "max positions"}, "#FFAA00")
        return False

    market = store["markets"][symbol]
    acc = exchange.GetAccount()
    ticker = exchange.GetTicker(market["contract"])
    if not acc or not ticker or ticker["Last"] <= 0:
        return False

    size_pct = BASE_POS_PCT * clamp(float(detail.get("impact", 0.5) or 0.5), 0.4, 1.0)
    qty = _N(get_equity(acc) * size_pct * LEVERAGE / ticker["Last"] / market["ctVal"], market["amountPrecision"])
    if qty <= 0 or qty < market["minQty"]:
        record_step(store, "trade_blocked", {"symbol": symbol, "reason": "qty too small", "qty": qty}, "#FFAA00")
        return False

    # 单标的最大仓位约束(含已有持仓)
    equity = get_equity(acc)
    existing = _G(position_state_key(symbol)) or {}
    cur_qty = existing.get("qty", 0) if add_mode else 0
    max_qty = _N(equity * MAX_SINGLE_POS_PCT * LEVERAGE / ticker["Last"] / market["ctVal"], market["amountPrecision"])
    if cur_qty + qty > max_qty:
        qty = _N(max(0, max_qty - cur_qty), market["amountPrecision"])
    if qty <= 0 or qty < market["minQty"]:
        record_step(store, "trade_blocked", {"symbol": symbol, "reason": "single position cap reached", "cur": cur_qty, "max": max_qty}, "#FFAA00")
        return False

    side = "buy" if signal["direction"] == "long" else "sell"
    oid = exchange.CreateOrder(market["contract"], side, -1, qty)
    if not oid:
        record_step(store, "trade_failed", {"symbol": symbol, "side": side, "qty": qty}, "#FF0000")
        return False

    state = _G(position_state_key(symbol)) or {
        "symbol": symbol,
        "direction": signal["direction"],
        "qty": 0,
        "entry": ticker["Last"],
        "peakPnlPct": 0,
        "trailActive": False,
        "sources": [],
    }
    state["direction"] = signal["direction"]
    state["qty"] = state.get("qty", 0) + qty
    state["entry"] = ticker["Last"]
    state["stop"] = ticker["Last"] * (1 - HARD_STOP_PCT / 100.0) if signal["direction"] == "long" else ticker["Last"] * (1 + HARD_STOP_PCT / 100.0)
    state["sources"].append({
        "basket": signal["basket"],
        "leader": signal["leader"],
        "event": signal.get("event", {}).get("event_type"),
        "time": int(time.time()),
    })
    _G(position_state_key(symbol), state)

    trade = {
        "time": int(time.time()),
        "type": "add" if add_mode else "open",
        "symbol": symbol,
        "direction": signal["direction"],
        "qty": qty,
        "price": ticker["Last"],
        "basket": signal["basket"],
        "leader": signal["leader"],
        "reason": detail.get("reason"),
    }
    store["trades"].insert(0, trade)
    store["trades"] = store["trades"][:100]
    record_step(store, "trade_opened", trade, "#00CC66")
    return True


def monitor_positions(store):
    for pos in exchange.GetPosition() or []:
        symbol = position_symbol(pos)
        if symbol not in store["markets"]:
            continue
        state = _G(position_state_key(symbol)) or {}
        market = store["markets"][symbol]
        ticker = exchange.GetTicker(market["contract"])
        if not ticker or pos["Price"] <= 0:
            continue

        direction = position_direction(pos)
        pnl_pct = (ticker["Last"] - pos["Price"]) / pos["Price"] * 100 * (1 if direction == "long" else -1)
        if pnl_pct > state.get("peakPnlPct", 0):
            state["peakPnlPct"] = pnl_pct
        if not state.get("trailActive") and state.get("peakPnlPct", 0) >= TRAIL_ACTIVATE_PCT:
            state["trailActive"] = True

        hard_stop = False
        if state.get("stop"):
            hard_stop = ticker["Last"] <= state["stop"] if direction == "long" else ticker["Last"] >= state["stop"]

        trail_hit = False
        if state.get("trailActive"):
            giveback = state.get("peakPnlPct", 0) - pnl_pct
            allowed = max(1.5, state.get("peakPnlPct", 0) * TRAIL_GIVEBACK_PCT / 100.0)
            trail_hit = giveback >= allowed

        if hard_stop or trail_hit:
            close_existing_position(store, symbol, pos, "hard_stop" if hard_stop else "trailing_stop")
        else:
            _G(position_state_key(symbol), state)


def close_existing_position(store, symbol, pos, reason):
    market = store["markets"].get(symbol)
    if not market:
        return False
    side = "closebuy" if position_direction(pos) == "long" else "closesell"
    oid = exchange.CreateOrder(market["contract"], side, -1, pos["Amount"])
    if not oid:
        record_step(store, "trade_close_failed", {"symbol": symbol, "reason": reason}, "#FF0000")
        return False
    _G(position_state_key(symbol), None)
    trade = {
        "time": int(time.time()),
        "type": "close",
        "symbol": symbol,
        "reason": reason,
        "amount": pos["Amount"],
        "profit": pos.get("Profit", 0) or 0,
    }
    store["trades"].insert(0, trade)
    store["trades"] = store["trades"][:100]
    record_step(store, "trade_closed", trade, "#FFAA00")
    return True


def get_position_by_symbol(symbol):
    for pos in exchange.GetPosition() or []:
        if position_symbol(pos) == symbol:
            return pos
    return None


def open_position_count():
    return len(exchange.GetPosition() or [])


def position_state_key(symbol):
    return "ai_basket_pos_v2_" + symbol


def position_symbol(pos):
    return normalize_symbol(pos.get("ContractType") or pos.get("Symbol") or "")


def position_direction(pos):
    long_types = []
    if "PD_LONG" in globals():
        long_types.append(PD_LONG)
    if "PD_LONG_YD" in globals():
        long_types.append(PD_LONG_YD)
    if pos.get("Type") in long_types:
        return "long"
    return "short"


def get_equity(acc):
    return acc.get("Equity") or acc.get("Balance") or acc.get("Stocks") or 0


# =========================
# Dashboard
# =========================

def show_dashboard(store):
    active_edges = len([e for e in store["edges"] if e["active"]])
    header = "AI Equity Basket V2 | {} | mode:{}\n".format(time.strftime("%Y-%m-%d %H:%M:%S"), TRADE_MODE)

    overview = make_table("系统概览", ["项目", "数值"], [
        ["EQUITY合约", len(store["markets"])],
        ["股票画像", len(store["profiles"])],
        ["Basket数量", len(store["baskets"])],
        ["统计关系", "{}/{}".format(active_edges, len(store["edges"]))],
        ["信号", len(store["signals"])],
        ["交易记录", len(store.get("trades", []))],
    ])

    basket_rows = []
    for bid, rt in store["realtime"].items():
        top = []
        for m in rt.get("members", [])[:6]:
            top.append("{}({}%/z{}/e{})".format(m["symbol"], fmt(m["pct"]), fmt(m["z"]), fmt(m["exposure"])))
        basket_rows.append([
            bid,
            rt.get("name", ""),
            rt.get("state", ""),
            fmt(rt.get("avgPct", 0)),
            rt.get("leader", ""),
            fmt(rt.get("maxAbs", 0)),
            len(rt.get("members", [])),
            " ".join(top),
        ])
    baskets_table = make_table("Baskets实时", ["板块", "名称", "状态", "均涨跌%", "Leader", "MaxAbs", "成员", "Top成员"], basket_rows)

    intel_rows = []
    for bid, intel in store.get("basket_intel", {}).items():
        leaders = []
        for x in intel.get("leader_candidates", [])[:3]:
            if isinstance(x, dict):
                leaders.append("{}:{}".format(x.get("symbol", ""), short_text(x.get("reason", ""), 18)))
        intel_rows.append([
            bid,
            intel.get("trend", ""),
            fmt(intel.get("confidence", 0)),
            short_text(intel.get("summary", ""), 46),
            " | ".join(leaders),
            " | ".join([short_text(x, 24) for x in intel.get("key_catalysts", [])[:2]]),
            " | ".join([short_text(x, 24) for x in intel.get("key_risks", [])[:2]]),
        ])
    if not intel_rows:
        intel_rows = [["-", "-", "-", "暂无;配置BRAVE_KEY后自动更新,或发送 refresh:intel", "-", "-", "-"]]
    intel_table = make_table("BraveSearch板块情报", ["板块", "趋势", "置信", "摘要", "候选Leader", "催化", "风险"], intel_rows)

    signal_rows = []
    for s in store.get("signals", [])[:8]:
        signal_rows.append([
            time.strftime("%H:%M:%S", time.localtime(s.get("time", 0))),
            s.get("status", ""),
            s.get("basket", ""),
            s.get("leader", ""),
            s.get("direction", ""),
            fmt(s.get("leaderZ", 0)),
            ",".join(s.get("followers", [])),
            short_text(s.get("reason", ""), 40),
        ])
    if not signal_rows:
        signal_rows = [["-", "-", "-", "-", "-", "-", "-", "暂无"]]
    signals_table = make_table("最近信号", ["时间", "状态", "板块", "Leader", "方向", "Z", "Followers", "原因"], signal_rows)

    trade_rows = []
    for t in store.get("trades", [])[:8]:
        trade_rows.append([
            time.strftime("%H:%M:%S", time.localtime(t.get("time", 0))),
            t.get("type", ""),
            t.get("symbol", ""),
            t.get("direction", ""),
            fmt(t.get("qty", t.get("amount", ""))),
            fmt(t.get("price", "")),
            short_text(t.get("reason", ""), 40),
        ])
    if not trade_rows:
        trade_rows = [["-", "-", "-", "-", "-", "-", "暂无"]]
    trades_table = make_table("最近交易", ["时间", "类型", "标的", "方向", "数量", "价格", "原因"], trade_rows)

    step_rows = []
    for row in store.get("step_logs", [])[:8]:
        step_rows.append([
            time.strftime("%H:%M:%S", time.localtime(row.get("time", 0))),
            row.get("step", ""),
            short_text(json.dumps(row.get("data", {}), ensure_ascii=False), 90),
        ])
    if not step_rows:
        step_rows = [["-", "-", "暂无"]]
    steps_table = make_table("最近步骤", ["时间", "步骤", "内容"], step_rows)

    LogStatus(
        header +
        status_table(overview) +
        status_table(baskets_table) +
        status_table(intel_table) +
        status_table(signals_table) +
        status_table(trades_table) +
        status_table(steps_table)
    )


def make_table(title, cols, rows):
    return {
        "type": "table",
        "title": title,
        "cols": cols,
        "rows": rows,
    }


def status_table(table):
    return "\n`" + json.dumps(table, ensure_ascii=False) + "`\n"


def short_text(text, n):
    text = str(text or "").replace("\n", " ").replace("\r", " ")
    return text[:n] + ("..." if len(text) > n else "")


def fmt(x):
    try:
        return round(float(x), 3)
    except Exception:
        return x


# =========================
# 工具
# =========================

def call_llm_json(system_prompt, user_prompt, max_tokens):
    body = json.dumps({
        "model": LLM_MODEL,
        "temperature": 0.1,
        "max_tokens": max_tokens,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ]
    })
    try:
        raw = http_request(LLM_BASE_URL, "POST", {
            "Content-Type": "application/json",
            "Authorization": "Bearer " + LLM_API_KEY,
        }, body, 40)
        data = json.loads(raw)
        if data.get("error"):
            Log("LLM错误: " + json.dumps(data["error"], ensure_ascii=False), "#FF0000")
            return None
        return json.loads(clean_json_text(data["choices"][0]["message"]["content"]))
    except Exception as e:
        Log("LLM解析失败: " + str(e), "#FFAA00")
        return None


def http_request(url, method, headers, body, timeout):
    data = body.encode("utf-8") if body else None
    req = urllib.request.Request(url, data=data, headers=headers or {}, method=method)
    resp = urllib.request.urlopen(req, timeout=timeout)
    try:
        return resp.read().decode("utf-8", errors="ignore")
    finally:
        resp.close()


def clean_json_text(text):
    text = re.sub(r"<think>[\s\S]*?</think>", "", text or "").strip()
    text = re.sub(r"```json\s*|```\s*", "", text).strip()
    first = text.find("{")
    last = text.rfind("}")
    return text[first:last + 1] if first >= 0 and last > first else text


def get_records(contract, period, limit):
    try:
        bars = exchange.GetRecords(contract, period) or []
        return bars[-limit:]
    except Exception as e:
        Log("GetRecords失败 | {} | {}".format(contract, str(e)), "#FFAA00")
        return []


def symbol_change_pct(store, symbol):
    bars = get_records(store["markets"][symbol]["contract"], KLINE_PERIOD, 2)
    if len(bars) >= 2 and bars[-2]["Close"] > 0:
        return (bars[-1]["Close"] - bars[-2]["Close"]) / bars[-2]["Close"] * 100
    return 0


def align_bars(a, b):
    mp = {}
    for x in a:
        mp[x["Time"]] = x
    aa, bb = [], []
    for y in b:
        if y["Time"] in mp:
            aa.append(mp[y["Time"]])
            bb.append(y)
    return aa, bb


def calc_returns(bars):
    out = []
    for i in range(1, len(bars)):
        prev = bars[i - 1]["Close"]
        cur = bars[i]["Close"]
        if prev > 0:
            out.append((cur - prev) / prev)
    return out


def normalize_symbol(contract):
    return str(contract or "").replace("_USDT.swap", "").replace(".swap", "").replace("USDT", "").upper()


def as_list(x):
    if x is None:
        return []
    if isinstance(x, list):
        return x
    return [x]


def unique_upper(arr):
    seen, out = {}, []
    for x in arr:
        v = str(x or "").upper().strip()
        if v and v not in seen:
            seen[v] = True
            out.append(v)
    return out


def mean(arr):
    return sum(arr) / len(arr) if arr else 0


def std(arr):
    if len(arr) < 2:
        return 0
    m = mean(arr)
    return math.sqrt(sum((x - m) ** 2 for x in arr) / (len(arr) - 1))


def pearson(a, b):
    n = min(len(a), len(b))
    if n < 3:
        return 0
    a = a[-n:]
    b = b[-n:]
    ma, mb = mean(a), mean(b)
    num = sum((a[i] - ma) * (b[i] - mb) for i in range(n))
    da = sum((x - ma) ** 2 for x in a)
    db = sum((x - mb) ** 2 for x in b)
    return num / math.sqrt(da * db) if da > 0 and db > 0 else 0


def clamp(x, lo, hi):
    return max(lo, min(hi, x))


def iso_time():
    return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
Strategy parameters
Strategy parameters
LLM API Key (Optional)
LLM Base URL (Optional)
LLM Model (Optional)
Brave Search Key (Optional)
Trade Mode
Loop Interval(s)
Market Refresh(s)
Realtime Refresh(s)
Stats Refresh(s)
Intel Refresh(s)
News Probe Cooldown(s)
Signal Cooldown(s)
Kline Limit
Sync Window
Return Window
Max Lag Bars
Balance Threshold
Breakout Threshold
Min Correlation
Min Follow Success
Min Edge Score
Min News Confidence
Base Position %
Max Single Position %
Max Total Positions
Leverage
Hard Stop %
Trail Activate %
Trail Giveback %
Commands
Reanalyze All
reanalyze:all
Refresh Markets
refresh:markets
Refresh Realtime
refresh:realtime
Refresh Stats
refresh:stats
Refresh Intel
refresh:intel
Clear Store
clear:store
Comment
All comments (0)
No data
No data
  • 1
iPhone Download
Forums
PINE Language
© 2015 - ∞ INVENTOR PTE LTD (SG)