资源加载中... loading...

探索FMZ:交易策略实盘间通信协议实践

Author: 发明者量化-小小梦, Created: 2024-08-06 14:13:40, Updated: 2024-08-07 15:30:13

[TOC]

img

随着金融市场的快速发展和量化交易的普及,越来越多的交易者开始依赖自动化策略进行交易。在这个过程中,策略之间的通信和协调显得尤为重要。FMZ(量化交易平台)通过提供高效的交易策略实盘间通信协议,帮助交易者实现策略的无缝对接和实时数据共享。

本篇文章将深入探讨FMZ平台中的交易策略实盘间通信协议,介绍其设计理念、功能特点以及在实际应用中的优势。我们将通过详细的案例分析,展示如何利用这一协议实现高效、稳定的策略通信,提升交易策略的执行力和收益表现。

无论你是刚入门FMZ的量化交易爱好者,还是经验丰富的专业程序员老手,本篇文章都将为你提供有价值的见解和实用的操作指南。让我们一同探索FMZ的强大功能,了解如何通过高效的通信协议,实现策略间的协同合作,提升交易效率,捕捉市场机遇。


需求场景

    1. 多策略协同交易 需求场景: 在复杂的市场环境中,单一策略可能无法应对各种突发情况和市场变化。交易者希望同时运行多个策略,如趋势跟踪策略、均值回归策略和套利策略,并让这些策略之间进行实时通信,以便共享市场信息和交易信号,从而提高整体交易效率和稳定性。
    1. 跨市场套利 需求场景: 交易者希望在不同的交易市场之间进行套利交易。例如,利用A股市场和港股市场之间的价差进行套利。当某一市场出现价格异常时,策略需要及时通知其他市场的策略进行相应的买卖操作,以实现套利机会的捕捉。
    1. 风险管理与对冲 需求场景: 某策略负责在市场中寻找和执行高风险高回报的交易,而另一个策略则专注于监控整体风险并执行对冲操作。为了确保在高风险交易过程中不会遭受过多损失,这两个策略需要进行实时通信和数据共享,以便及时调整仓位和对冲风险。
    1. 分布式交易系统 需求场景: 大型交易机构希望在多个物理服务器上运行分布式交易系统,以提高交易系统的容错性和性能。这些服务器上的策略需要通过通信协议进行数据同步和协调操作,从而确保整体交易系统的稳定和高效运行。
    1. 市场监控与预警 需求场景: 某策略专门负责实时监控市场动态,当市场出现重大变动(如突然的价格暴跌或飙升)时,策略需要迅速通知其他策略进行相应的应对操作,例如平仓、调仓或加仓,以降低风险或抓住交易机会。
    1. 组合策略管理 需求场景: 交易者使用一个策略组合来管理不同资产类别的投资,每个策略专注于特定的资产类别(如股票、债券、期货等)。这些策略需要进行通信和协调,以实现组合投资的整体优化和收益最大化。

这些需求场景展示了FMZ交易策略实盘间通信协议在实际应用中的多种可能性和优势。通过有效的策略间通信,交易者可以更好地应对复杂的市场环境,优化交易策略,提升交易效率和收益。


FMZ封装的通信协议与Dial函数

了解过实盘之间的通信需求后,就是要考虑如何实现这些需求了。无非就是实盘A希望可以和实盘B交互信息,虽然看上去需求很简单。不过其中有各种细节就需要使用一套通信协议来约定,FMZ已经封装了几种比较流行的通信协议。

mqtt / nats / amqp / kafka

通信架构

通信架构为:

  • 服务端(代理)。 需要运行起来一个通信协议的服务端,用来中继订阅者、发布者之间传递的消息。这个服务端可以部署在托管者所在系统本地(本地实盘间通信);也可以是远程的服务(用来跨服务器实盘间通信)。
  • 客户端(订阅者、发布者)。 FMZ上的策略实盘程序可以理解为一个通信协议的客户端,策略实盘可以是发布者(pub),也可以是订阅者(sub)。

Dial函数

在FMZ平台上应用这些协议时,可以简单理解为 mqtt / nats / amqp / kafka 这些协议都集成在了Dial()函数中,使用Dial()函数进行消息发布、订阅等操作。这些发布的消息通过协议的服务端代理(中继)给订阅的实盘,所以首先要运行起来一个协议的服务端。为了便于演示,以下例子中我们都使用各种协议服务端镜像部署。

Dial函数所在API文档章节: https://www.fmz.com/syntax-guide#fun_dial

在部署docker镜像之前,要记得先安装docker软件。

img

接下来就让我们一起来探索、实践FMZ支持的通信协议应用。


FMZ平台实盘通信协议实践

mqtt 协议

MQTT(Message Queuing Telemetry Transport)是一种轻量级的消息传输协议,特别适用于低带宽、高延迟或不可靠的网络环境。它由 IBM 的 Andy Stanford-Clark 和 Arlen Nipper 在 1999 年提出,后来成为 ISO 标准(ISO/IEC PRF 20922)。

MQTT 协议的主要特点:发布/订阅模式

  • 发布:消息生产者将消息发送到主题(Topic)上。
  • 订阅:消息消费者订阅感兴趣的主题,从而接收发布到该主题的消息。
  • 中介:MQTT 使用消息代理(Broker)作为中介来转发消息,确保发布者和订阅者之间的解耦。

消息发布与订阅

因为我们使用支持MQTT协议的软件的docker镜像(eclipse-mosquitto镜像)部署MQTT代理服务器,所以提前安装好docker,后续不在赘述。

在运行部署镜像的命令之前,我们需要先写好一个代理服务器配置文件mosquitto.conf

# 配置端口号及远程访问IP
listener 1883 0.0.0.0
# 设置匿名访问
allow_anonymous true

然后执行部署命令:

docker run --rm -p 1883:1883 -v ./mosquitto.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto

代理服务器镜像运行起来后有如下显示:

1723012640: mosquitto version 2.0.18 starting
1723012640: Config loaded from /mosquitto/config/mosquitto.conf.
1723012640: Opening ipv4 listen socket on port 1883.
1723012640: mosquitto version 2.0.18 running

然后我们可以测试策略,来实践一下。

var conn = null

function main() {
    LogReset(1)
    var robotId = _G()
    Log("当前实盘robotId:", robotId)

    conn = Dial("mqtt://127.0.0.1:1883?topic=test_topic")
    if (!conn) {
        Log("通信失败!")
        return 
    }

    for (var i = 0; i < 10; i++) {
        // 写入
        var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()        
        conn.write(msg)
        Log("向testQueue写入消息:", msg)

        // 读取
        Log("read:", conn.read(1000), "#FF0000")

        Sleep(1000)
    }    
}

function onexit() {
    conn.close()
    Log("关闭conn")
}

策略代码中主要是Dial函数的使用:

Dial("mqtt://127.0.0.1:1883?topic=test_topic")

Dial函数的字符串参数开头的mqtt://是协议名称,然后紧跟监听地址、端口。符号"?"后是订阅/发布主题名称,这里测试的主题名称为:test_topic

以上策略针对一个主题一边发布、一边订阅,运行测试如图:

img

也可以使用2个实盘相互订阅、发布主题信息,在nats协议实践章节我们使用这样的例子,其它的协议中不再赘述该方式。


nats 协议

NATS的协议是一个简单的、基于文本的发布/订阅风格的协议。客户端连接到 gnatsd(NATS服务器),并与 gnatsd 进行通信,通信基于普通的 TCP/IP 套接字,并定义了很小的操作集,换行表示终止。与传统的、使用了二进制消息格式的消息通信系统不同,使用了基于文本的 NATS 协议,使得客户端实现很简单,可以方便地选择多种编程语言或脚本语言来实现。

每个协议都有各自的特点,可以查阅具体文档、资料,这里不在赘述。

部署 nats 协议服务端:

docker run –name nats –rm -p 4222:4222 -p 8222:8222 nats –http_port 8222 –auth admin

这条docker命令会自动下载、运行 nats 镜像,4222端口为客户端所要访问的端口。镜像部署好之后也会有一个http monitor在8222端口开放。

Listening for client connections on 0.0.0.0:4222
Server is ready

nats服务端镜像就开始运行了,监听4222端口。

本地设备实盘策略间通信

我们需要创建两个策略(实盘),暂且就命名策略A、策略B吧,这两个策略代码基本一样。使用FMZ平台上最易上手的Javascript语言编写。

  • 策略A

    var connPub = null 
    var connSub = null
    
    function main() {
        var robotId = _G()
        Log("当前实盘robotId:", robotId)
    
        connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
        if (!connPub) {
            Log("通信失败!")
            return 
        }
    
        connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
        if (!connSub) {
            Log("通信失败!")
            return 
        }
    
        while (true) {
            connPub.write("robotA发布的消息,robotId: " + robotId + ", time:" + _D())
            var msgRead = connSub.read(10000)
            if (msgRead) {
                Log("msgRead:", msgRead)
            }
    
            LogStatus(_D())
            Sleep(10000)
        }
    }
    
    function onexit() {
        connPub.close()
        connSub.close()
    }
    
  • 策略B

    var connPub = null 
    var connSub = null
    
    function main() {
        var robotId = _G()
        Log("当前实盘robotId:", robotId)
    
        connPub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotB")
        if (!connPub) {
            Log("通信失败!")
            return 
        }
    
        connSub = Dial("nats://admin@127.0.0.1:4222?topic=pubRobotA")
        if (!connSub) {
            Log("通信失败!")
            return 
        }
    
        while (true) {
            connPub.write("robotB发布的消息,robotId: " + robotId + ", time:" + _D())
            var msgRead = connSub.read(10000)
            if (msgRead) {
                Log("msgRead:", msgRead)
            }
    
            LogStatus(_D())
            Sleep(10000)
        }
    }
    
    function onexit() {
        connPub.close()
        connSub.close()
    }
    

这两个策略基本上一样,只不过是相互发布、订阅,订阅主题、发布主题、发布信息不一样而已。

以策略B举例:

  • 1、使用Dial()函数创建客户端连接服务端对象connPub,用于主题消息发布:

    var connPub = Dial(“nats://admin@127.0.0.1:4222?topic=pubRobotB”)

    Dial函数的参数字符串,开头的nats://表示使用的是nats协议通信,然后admin是在部署docker镜像时设置的简单校验信息auth admin,使用字符"@“与后面的内容间隔,然后是服务地址和端口127.0.0.1:4222,最后是发布/订阅主题:topic=pubRobotB注意与前面的地址之间用”?"符号间隔。

  • 2、使用Dial()函数创建客户端连接服务端对象connSub,用于主题消息订阅:

    var connSub = Dial(“nats://admin@127.0.0.1:4222?topic=pubRobotA”)

    差别只是topic=pubRobotA不同,因为需要订阅策略A发送信息的主题pubRobotA

对于策略A中的订阅、发布连接对象的创建和使用与以上描述同理。

  • 策略A运行

    img

  • 策略B运行

    img

这样就实现了一个简单的实盘A与实盘B之间相互订阅、发布消息进行通信的nats协议应用例子。


amqp 协议

amqp 协议的 queue (队列)

在异步通讯中,消息不会立刻到达接收方,而是被存放到一个容器中,当满足一定的条件之后,消息会被容器发送给接收方,这个容器即消息队列,而完成这个功能需要双方和容器以及其中的各个组件遵守统一的约定和规则,AMQP就是这样的一种协议,消息发送与接受的双方遵守这个协议可以实现异步通讯。这个协议约定了消息的格式和工作方式。

每个协议都有各自的特点,可以查阅具体文档、资料,这里不在赘述。

部署 amqp 协议服务端:

docker run –rm –hostname my-rabbit –name rabbit -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=q -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3-management

部署docker镜像时,会自动下载部署,完成以后会显示:

2024-08-06 09:02:46.248936+00:00 [info] <0.9.0> Time to start RabbitMQ: 15569 ms

服务端镜像部署好之后,编写测试例子测试:

var conn = null

function main() {
    LogReset(1)
    var robotId = _G()
    Log("当前实盘robotId:", robotId)

    conn = Dial("amqp://q:admin@127.0.0.1:5672/?queue=robotA_Queue")
    if (!conn) {
        Log("通信失败!")
        return 
    }

    for (var i = 0; i < 10; i++) {
        // 读取
        Log("read:", conn.read(1000), "#FF0000")
        
        // 写入
        var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()        
        conn.write(msg)
        Log("向testQueue写入消息:", msg)

        Sleep(1000)
    }    
}

function onexit() {
    conn.close()
    Log("关闭conn")
}

使用amqp协议的queue需要注意,发布后的消息会持久存在于queue中,例如我们先运行一次上面的例子代码。会向queue写入10条信息。然后我们第二次运行可以发现读取的时候会再次读取第一次写入的信息。如图所示:

img

可以看到截图中红色箭头指向的两条日志信息,时间并不一致,原因是红色的这条是读取出的消息是策略代码第一次运行时写入queue的。

基于此特性可以实现一些需求,例如:策略实盘重启后,依然可以从queue中获取已记录的行情数据,用于初始化计算等操作。


kafka 协议

Apache Kafka 是一种分布式数据存储,经过优化以实时提取和处理流数据。流数据是指由数千个数据源持续生成的数据,通常可同时发送数据记录。流平台需要处理这些持续流入的数据,按照顺序逐步处理。

Kafka 为其用户提供三项主要功能:

  • 发布和订阅记录流
  • 按照记录的生成顺序高效地存储记录流
  • 实时处理记录流

Kafka 主要用于构建适应数据流的实时流数据管道和应用程序。它结合了消息收发、存储和流处理功能,能够存储历史和实时数据。

消息发布与订阅

部署Kafka代理的docker镜像:

docker run --rm --name kafka-server --hostname kafka-server -p 9092:9092 -p 9093:9093 \
        -e KAFKA_CFG_NODE_ID=0 \
        -e KAFKA_CFG_PROCESS_ROLES=controller,broker \
        -e KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 \
        -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
        -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
        -e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-server:9093 \
        -e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
        bitnami/kafka:latest

使用测试代码测试:

var conn = null

function main() {
    LogReset(1)
    var robotId = _G()
    Log("当前实盘robotId:", robotId)

    conn = Dial("kafka://localhost:9092/test_topic")
    if (!conn) {
        Log("通信失败!")
        return 
    }

    for (var i = 0; i < 10; i++) {
        // 写入
        var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()        
        conn.write(msg)
        Log("向testQueue写入消息:", msg)

        // 读取
        Log("read:", conn.read(1000), "#FF0000")

        Sleep(1000)
    }    
}

function onexit() {
    conn.close()
    Log("关闭conn")
}

我们来看一下如何在Dial函数中使用kafka协议进行消息发布与订阅。

Dial("kafka://localhost:9092/test_topic")

与其它几种协议一样,开头部分是协议名称。然后紧跟监听地址:localhost:9092。接着使用符号"/"作为间隔,后面写订阅/发布的主题,这里测试主题设置为test_topic

测试结果:

img


More