策略实盘间通信

功能概述

策略实盘间通信功能允许不同的实盘策略之间进行数据共享和状态同步。通过频道机制,一个实盘可以将自身的状态数据广播给其他实盘,实现跨实盘、跨托管者、跨服务器的数据通信。

核心概念

  • 频道(Channel):每个实盘都拥有一个独立的频道,频道ID即为实盘ID
  • 广播端:使用SetChannelData()函数在频道上发布数据的实盘
  • 订阅端:使用GetChannelData()函数订阅其他实盘频道数据的实盘
  • 状态覆盖:频道仅保存最新状态,新数据会覆盖旧数据,而非消息队列机制

主要特性

  • 非阻塞通信:所有函数调用均为非阻塞式,不会影响策略主流程执行
  • 跨平台支持:支持跨实盘、跨托管者、跨服务器的数据传输
  • 多频道订阅:单个实盘可同时订阅多个不同实盘的频道
  • 灵活的数据格式:支持任何可JSON序列化的数据结构

应用场景

  • 主从策略协同:主策略分析市场并广播交易信号,从策略接收信号并执行交易
  • 多账户同步:在多个交易账户之间同步交易信号和仓位信息
  • 策略监控:监控策略广播运行状态,监控实盘订阅并进行展示或告警
  • 数据共享:共享行情分析、指标计算等结果,避免重复计算

基本用法

广播端示例 - 发布市场数据

function main() {
    var updateId = 0
    var robotId = _G()  // 获取当前实盘ID

    while(true) {
        // 获取市场数据
        var ticker = exchange.GetTicker("BTC_USDT")
        if (!ticker) {
            Sleep(5000)
            continue
        }

        // 准备通道状态数据
        var channelState = {
            robotId: robotId,
            updateId: ++updateId,
            timestamp: Date.now(),
            symbol: "BTC_USDT",
            lastPrice: ticker.Last,
            volume: ticker.Volume,
            high: ticker.High,
            low: ticker.Low
        }

        // 在通道上发布最新状态(覆盖旧状态)
        SetChannelData(channelState)

        // 显示当前通道状态
        LogStatus("通道广播端 [实盘ID: " + robotId + "]\n" +
                  "更新ID: #" + channelState.updateId + "\n" +
                  "时间: " + _D(channelState.timestamp) + "\n" +
                  "交易对: " + channelState.symbol + "\n" +
                  "最新价: $" + channelState.lastPrice.toFixed(2))

        Sleep(60000)  // 每分钟更新一次通道状态
    }
}```
```python
def main():
    updateId = 0
    robotId = _G()  # 获取当前实盘ID

    while True:
        # 获取市场数据
        ticker = exchange.GetTicker("BTC_USDT")
        if not ticker:
            Sleep(5000)
            continue

        # 准备通道状态数据
        channelState = {
            "robotId": robotId,
            "updateId": updateId + 1,
            "timestamp": time.time() * 1000,
            "symbol": "BTC_USDT",
            "lastPrice": ticker["Last"],
            "volume": ticker["Volume"],
            "high": ticker["High"],
            "low": ticker["Low"]
        }
        updateId += 1

        # 在通道上发布最新状态(覆盖旧状态)
        SetChannelData(channelState)

        # 显示当前通道状态
        LogStatus("通道广播端 [实盘ID: {}]\n".format(robotId) +
                  "更新ID: #{}\n".format(channelState["updateId"]) +
                  "时间: {}\n".format(_D(channelState["timestamp"])) +
                  "最新价: ${:.2f}".format(channelState["lastPrice"]))

        Sleep(60000)  # 每分钟更新一次通道状态```
```cpp
function main() {
    // 需要订阅的两个频道ID(根据实际情况修改)
    var channelId1 = "632799"  // 频道1的实盘ID
    var channelId2 = "632800"  // 频道2的实盘ID

    while(true) {
        // 订阅频道1的当前状态
        var state1 = GetChannelData(channelId1)

        // 订阅频道2的当前状态
        var state2 = GetChannelData(channelId2)

        // 构建状态显示
        var statusMsg = "频道订阅端 - 当前订阅状态\n\n"

        // 显示频道1状态
        statusMsg += "═══ 频道1 [" + channelId1 + "] ═══\n"
        if (state1 !== null) {
            statusMsg += "更新ID: #" + state1.updateId + "\n"
            statusMsg += "时间: " + _D(state1.timestamp) + "\n"
            statusMsg += "交易对: " + state1.symbol + "\n"
            statusMsg += "最新价: $" + state1.lastPrice.toFixed(2) + "\n"
        } else {
            statusMsg += "状态: 等待中... (首次调用返回 null)\n"
        }

        statusMsg += "\n"

        // 显示频道2状态
        statusMsg += "═══ 频道2 [" + channelId2 + "] ═══\n"
        if (state2 !== null) {
            statusMsg += "更新ID: #" + state2.updateId + "\n"
            statusMsg += "时间: " + _D(state2.timestamp) + "\n"
            statusMsg += "最新价: $" + state2.lastPrice.toFixed(2) + "\n"
        } else {
            statusMsg += "状态: 等待中... (首次调用返回 null)\n"
        }

        LogStatus(statusMsg)

        Sleep(5000)  // 每5秒订阅一次频道
    }
}```
```python
def main():
    # 需要订阅的两个频道ID(根据实际情况修改)
    channelId1 = "632799"  # 频道1的实盘ID
    channelId2 = "632800"  # 频道2的实盘ID

    while True:
        # 订阅频道1的当前状态
        state1 = GetChannelData(channelId1)

        # 订阅频道2的当前状态
        state2 = GetChannelData(channelId2)

        # 构建状态显示
        statusMsg = "频道订阅端 - 当前订阅状态\n\n"

        # 显示频道1状态
        statusMsg += "═══ 频道1 [{}] ═══\n".format(channelId1)
        if state1 is not None:
            statusMsg += "更新ID: #{}\n".format(state1["updateId"])
            statusMsg += "时间: {}\n".format(_D(state1["timestamp"]))
            statusMsg += "最新价: ${:.2f}\n".format(state1["lastPrice"])
        else:
            statusMsg += "状态: 等待中... (首次调用返回 None)\n"

        statusMsg += "\n"

        # 显示频道2状态
        statusMsg += "═══ 频道2 [{}] ═══\n".format(channelId2)
        if state2 is not None:
            statusMsg += "更新ID: #{}\n".format(state2["updateId"])
            statusMsg += "时间: {}\n".format(_D(state2["timestamp"]))
            statusMsg += "最新价: ${:.2f}\n".format(state2["lastPrice"])
        else:
            statusMsg += "状态: 等待中... (首次调用返回 None)\n"

        LogStatus(statusMsg)

        Sleep(5000)  # 每5秒订阅一次频道```
```cpp

订阅端示例 - 订阅多个频道

function main() {
    var robotId = _G()
    Log("主策略启动,实盘ID:", robotId)

    while(true) {
        // 分析市场,生成交易信号
        var records = exchange.GetRecords("BTC_USDT")
        if (!records || records.length < 20) {
            Sleep(5000)
            continue
        }

        // 简单的均线策略
        var ma5 = TA.MA(records, 5)
        var ma20 = TA.MA(records, 20)
        var signal = "HOLD"

        if (ma5[ma5.length-1] > ma20[ma20.length-1] &&
            ma5[ma5.length-2] <= ma20[ma20.length-2]) {
            signal = "BUY"
        } else if (ma5[ma5.length-1] < ma20[ma20.length-1] &&
                   ma5[ma5.length-2] >= ma20[ma20.length-2]) {
            signal = "SELL"
        }

        // 广播交易信号
        var signalData = {
            timestamp: Date.now(),
            symbol: "BTC_USDT",
            signal: signal,
            price: records[records.length-1].Close,
            ma5: ma5[ma5.length-1],
            ma20: ma20[ma20.length-1]
        }

        SetChannelData(signalData)
        LogStatus("主策略 - 信号广播\n" +
                  "信号: " + signal + "\n" +
                  "价格: $" + signalData.price.toFixed(2) + "\n" +
                  "MA5: " + signalData.ma5.toFixed(2) + "\n" +
                  "MA20: " + signalData.ma20.toFixed(2))

        Sleep(60000)
    }
}```
```python
def main():
    robotId = _G()
    Log("主策略启动,实盘ID:", robotId)

    while True:
        # 分析市场,生成交易信号
        records = exchange.GetRecords("BTC_USDT")
        if not records or len(records) < 20:
            Sleep(5000)
            continue

        # 简单的均线策略
        ma5 = TA.MA(records, 5)
        ma20 = TA.MA(records, 20)
        signal = "HOLD"

        if ma5[-1] > ma20[-1] and ma5[-2] <= ma20[-2]:
            signal = "BUY"
        elif ma5[-1] < ma20[-1] and ma5[-2] >= ma20[-2]:
            signal = "SELL"

        # 广播交易信号
        signalData = {
            "timestamp": time.time() * 1000,
            "symbol": "BTC_USDT",
            "signal": signal,
            "price": records[-1]["Close"],
            "ma5": ma5[-1],
            "ma20": ma20[-1]
        }

        SetChannelData(signalData)
        LogStatus("主策略 - 信号广播\n" +
                  "信号: {}\n".format(signal) +
                  "价格: ${:.2f}\n".format(signalData["price"]) +
                  "MA5: {:.2f}\n".format(signalData["ma5"]) +
                  "MA20: {:.2f}".format(signalData["ma20"]))

        Sleep(60000)```
```cpp

实际应用场景

场景1:主从策略协同交易

主策略(信号广播端)

function main() {
    var masterRobotId = "632799"  // 主策略的实盘ID
    var lastSignal = null

    Log("从策略启动,订阅主策略:", masterRobotId)

    while(true) {
        // 获取主策略的信号
        var signalData = GetChannelData(masterRobotId)

        if (signalData === null) {
            LogStatus("等待主策略信号...")
            Sleep(5000)
            continue
        }

        // 检查是否有新信号
        if (lastSignal !== signalData.signal) {
            Log("收到新信号:", signalData.signal, "价格:", signalData.price)

            // 执行交易
            if (signalData.signal === "BUY") {
                var ticker = exchange.GetTicker(signalData.symbol)
                if (ticker) {
                    exchange.Buy(ticker.Last, 0.01)
                    Log("执行买入,价格:", ticker.Last)
                }
            } else if (signalData.signal === "SELL") {
                var ticker = exchange.GetTicker(signalData.symbol)
                if (ticker) {
                    exchange.Sell(ticker.Last, 0.01)
                    Log("执行卖出,价格:", ticker.Last)
                }
            }

            lastSignal = signalData.signal
        }

        LogStatus("从策略 - 跟随主策略\n" +
                  "当前信号: " + signalData.signal + "\n" +
                  "信号价格: $" + signalData.price.toFixed(2) + "\n" +
                  "信号时间: " + _D(signalData.timestamp))

        Sleep(5000)
    }
}```
```python
def main():
    masterRobotId = "632799"  # 主策略的实盘ID
    lastSignal = None

    Log("从策略启动,订阅主策略:", masterRobotId)

    while True:
        # 获取主策略的信号
        signalData = GetChannelData(masterRobotId)

        if signalData is None:
            LogStatus("等待主策略信号...")
            Sleep(5000)
            continue

        # 检查是否有新信号
        if lastSignal != signalData["signal"]:
            Log("收到新信号:", signalData["signal"], "价格:", signalData["price"])

            # 执行交易
            if signalData["signal"] == "BUY":
                ticker = exchange.GetTicker(signalData["symbol"])
                if ticker:
                    exchange.Buy(ticker["Last"], 0.01)
                    Log("执行买入,价格:", ticker["Last"])
            elif signalData["signal"] == "SELL":
                ticker = exchange.GetTicker(signalData["symbol"])
                if ticker:
                    exchange.Sell(ticker["Last"], 0.01)
                    Log("执行卖出,价格:", ticker["Last"])

            lastSignal = signalData["signal"]

        LogStatus("从策略 - 跟随主策略\n" +
                  "当前信号: {}\n".format(signalData["signal"]) +
                  "信号价格: ${:.2f}\n".format(signalData["price"]) +
                  "信号时间: {}".format(_D(signalData["timestamp"])))

        Sleep(5000)```
```cpp

从策略(信号接收执行端)

function main() {
    // 需要监控的策略实盘ID列表
    var monitorList = ["632799", "632800", "632801"]

    while(true) {
        var table = {
            type: "table",
            title: "策略运行状态监控",
            cols: ["实盘ID", "状态", "最后更新", "交易对", "当前价格", "盈亏"],
            rows: []
        }

        for (var i = 0; i < monitorList.length; i++) {
            var robotId = monitorList[i]
            var data = GetChannelData(robotId)

            if (data !== null) {
                var updateTime = _D(data.timestamp)
                var timeDiff = Date.now() - data.timestamp
                var status = timeDiff < 120000 ? "运行中" : "异常"

                table.rows.push([
                    robotId,
                    status,
                    updateTime,
                    data.symbol || "-",
                    data.lastPrice ? "$" + data.lastPrice.toFixed(2) : "-",
                    data.profit ? data.profit.toFixed(2) + "%" : "-"
                ])
            } else {
                table.rows.push([
                    robotId,
                    "等待数据",
                    "-",
                    "-",
                    "-",
                    "-"
                ])
            }
        }

        LogStatus("`" + JSON.stringify(table) + "`")
        Sleep(10000)
    }
}```
```python
def main():
    # 需要监控的策略实盘ID列表
    monitorList = ["632799", "632800", "632801"]

    while True:
        table = {
            "type": "table",
            "title": "策略运行状态监控",
            "cols": ["实盘ID", "状态", "最后更新", "交易对", "当前价格", "盈亏"],
            "rows": []
        }

        for robotId in monitorList:
            data = GetChannelData(robotId)

            if data is not None:
                updateTime = _D(data["timestamp"])
                timeDiff = time.time() * 1000 - data["timestamp"]
                status = "运行中" if timeDiff < 120000 else "异常"

                table["rows"].append([
                    robotId,
                    status,
                    updateTime,
                    data.get("symbol", "-"),
                    "${:.2f}".format(data["lastPrice"]) if "lastPrice" in data else "-",
                    "{:.2f}%".format(data["profit"]) if "profit" in data else "-"
                ])
            else:
                table["rows"].append([
                    robotId,
                    "等待数据",
                    "-",
                    "-",
                    "-",
                    "-"
                ])

        LogStatus("`" + json.dumps(table) + "`")
        Sleep(10000)```
```cpp

场景2:多策略状态监控

监控策略

API函数说明

SetChannelData(data)

功能:在频道上发布最新状态数据

参数: - data:需要发布的数据,可以是任何可JSON序列化的数据结构

返回值:无

特性: - 非阻塞调用 - 覆盖之前的数据,不累积历史记录 - 自动使用当前实盘ID作为频道ID

数据长度限制: - JSON序列化后不超过1024字节 - 建议仅传输必要的状态信息

详细文档SetChannelData

GetChannelData(robotId)

功能:订阅指定实盘的频道数据

参数: - robotId:要订阅的实盘ID(字符串或数字)

返回值: - 首次调用返回null,需要重试 - 成功后返回频道的最新数据

特性: - 非阻塞调用 - 可以订阅多个频道 - 可以订阅自己的频道

详细文档GetChannelData

注意事项

  • 首次调用返回nullGetChannelData()函数首次调用时会返回null,这是正常现象,需要等待数据同步完成。建议在代码中进行null值判断。

  • 数据覆盖机制:频道上仅保存最新状态,调用SetChannelData()会覆盖之前的数据。如需保存历史数据,应在订阅端自行记录。

  • 非阻塞特性:所有频道通信函数均为非阻塞调用,不会影响策略的主流程执行。但这也意味着无法保证数据的实时性。

  • 数据大小限制:传入SetChannelData的数据在JSON序列化后不得超过1024字节。应仅传输必要的状态信息,如交易信号、价格、持仓等关键数据,避免传输完整的K线数组或大量历史数据。

  • 实盘环境限制:频道通信功能主要适用于实盘环境,在回测系统中可能受限或不可用。

  • 实盘ID获取:可通过_G()函数获取当前实盘ID,也可在平台界面查看实盘ID。

  • 安全性考虑:频道数据可能被其他有权限的实盘订阅,请勿在频道中传输敏感信息(如API密钥等)。

最佳实践

  • 合理的更新频率:根据实际需求设置数据更新频率,避免过于频繁的更新造成资源浪费。

  • 数据结构设计:设计清晰的数据结构,包含必要的元数据(如时间戳、版本号等),便于订阅端处理。

  • 错误处理:订阅端应处理null返回值,广播端应确保数据格式正确。

  • 状态版本控制:在数据中包含版本号或更新ID,帮助订阅端判断是否有新数据。

  • 监控与告警:对于关键的通信链路,建议实现超时监控和告警机制。

  • 测试验证:在正式使用前,先在测试环境中验证频道通信的稳定性和延迟。

  • 文档记录:记录频道数据格式和通信协议,便于后续维护和多人协作。

{@fun/Global/SetChannelData SetChannelData} {@fun/Global/GetChannelData GetChannelData} {@fun/Global/G _G}