[TOC]

Com o rápido desenvolvimento dos mercados financeiros e a popularidade da negociação quantitativa, cada vez mais traders estão começando a confiar em estratégias automatizadas para negociação. Nesse processo, a comunicação e a coordenação entre estratégias são particularmente importantes. A FMZ (Quantitative Trading Platform) ajuda os traders a obter integração estratégica perfeita e compartilhamento de dados em tempo real, fornecendo um protocolo de comunicação eficiente entre estratégias de negociação reais.
Este artigo explorará em profundidade o protocolo de comunicação em tempo real de estratégias de negociação na plataforma FMZ e apresentará seu conceito de design, recursos funcionais e vantagens em aplicações práticas. Por meio de uma análise detalhada de casos, demonstraremos como usar este protocolo para obter uma comunicação estratégica eficiente e estável e melhorar a execução e a lucratividade das estratégias de negociação.
Seja você um entusiasta de negociação quantitativa que está apenas começando com FMZ ou um programador profissional experiente, este artigo fornecerá insights valiosos e guias práticos de operação. Vamos explorar as funções poderosas do FMZ e aprender como alcançar a colaboração entre estratégias por meio de protocolos de comunicação eficientes, melhorar a eficiência das negociações e capturar oportunidades de mercado.
Esses cenários de demanda demonstram as diversas possibilidades e vantagens do protocolo de comunicação em tempo real da estratégia de negociação FMZ em aplicações práticas. Por meio de uma comunicação eficaz entre estratégias, os traders podem lidar melhor com ambientes de mercado complexos, otimizar estratégias de negociação e melhorar a eficiência e os lucros das negociações.
Depois de entender os requisitos de comunicação entre discos reais, precisamos considerar como atender a esses requisitos. Nada mais é do que o caso real A espera trocar informações com o caso real B, embora a demanda pareça simples. No entanto, vários detalhes precisam ser acordados usando um conjunto de protocolos de comunicação. O FMZ encapsulou vários protocolos de comunicação populares.
mqtt / nats / amqp / kafka
A arquitetura de comunicação é:
Ao aplicar esses protocolos na plataforma FMZ, eles podem ser simplesmente entendidos como mqtt/nats/amqp/kafka. Esses protocolos são integrados emDial()Na função, useDial()As funções são usadas para publicar e assinar mensagens. Essas mensagens publicadas são enviadas por proxy (retransmitidas) para o disco real inscrito por meio do servidor de protocolo, portanto, um servidor de protocolo deve ser executado primeiro. Para facilitar a demonstração, usamos várias implantações de imagem de servidor de protocolo nos exemplos a seguir.
Seção de documentação da API da função de discagem: https://www.fmz.com/syntax-guide#fun_dial
Antes de implantar a imagem do Docker, lembre-se de instalar o software do Docker primeiro.

Em seguida, vamos explorar e praticar as aplicações do protocolo de comunicação suportadas pelo FMZ.
MQTT (Message Queuing Telemetry Transport) é um protocolo leve de transmissão de mensagens que é particularmente adequado para ambientes de rede de baixa largura de banda, alta latência ou não confiáveis. Foi proposto por Andy Stanford-Clark e Arlen Nipper da IBM em 1999 e mais tarde se tornou um padrão ISO (ISO/IEC PRF 20922).
As principais características do protocolo MQTT: modo publicar/assinar
Como usamos a imagem do docker (imagem eclipse-mosquitto) do software que suporta o protocolo MQTT para implantar o servidor proxy MQTT, instalamos o docker com antecedência e não entraremos em detalhes mais tarde.
Antes de executar o comando para implantar a imagem, precisamos escrever um arquivo de configuração do servidor proxymosquitto.conf。
# 配置端口号及远程访问IP
listener 1883 0.0.0.0
# 设置匿名访问
allow_anonymous true
Em seguida, execute o comando de implantação:
docker run --rm -p 1883:1883 -v ./mosquitto.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto
Após a execução da imagem do servidor proxy, a seguinte tela será exibida:
1723012640: mosquitto version 2.0.18 starting
1723012640: Config loaded from /mosquitto/config/mosquitto.conf.
1723012640: Opening ipv4 listen socket on port 1883.
1723012640: mosquitto version 2.0.18 running
Então podemos testar a estratégia para colocá-la em prática.
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("mqtt://127.0.0.1:1883?topic=test_topic")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
// 读取
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
O principal uso da função Dial no código de estratégia é:
Dial("mqtt://127.0.0.1:1883?topic=test_topic")
O parâmetro string da função Dial começa commqtt://É o nome do protocolo, seguido do endereço de escuta e da porta. O símbolo “?” é seguido pelo nome do tópico de assinatura/publicação. O nome do tópico testado aqui é:test_topic。
A estratégia acima publica e assina um tópico ao mesmo tempo. O teste em execução é como mostrado na figura:

Você também pode usar dois discos reais para assinar um ao outro e publicar informações de tópicos. Usamos esse exemplo na seção de prática do protocolo NATS e não repetiremos esse método em outros protocolos.
O protocolo NATS é um protocolo simples, baseado em texto, do tipo publicar/assinar. O cliente se conecta ao gnatsd (servidor NATS) e se comunica com o gnatsd. A comunicação é baseada em soquetes TCP/IP comuns e define um conjunto muito pequeno de operações. Nova linha indica término. Ao contrário dos sistemas de mensagens tradicionais que usam formatos de mensagens binárias, o protocolo NATS baseado em texto simplifica a implementação do cliente e pode ser facilmente implementado em uma variedade de linguagens de programação ou linguagens de script.
Cada protocolo tem suas próprias características. Você pode consultar os documentos e materiais específicos, que não serão elaborados aqui.
Implante o servidor de protocolo NATS:
docker run –name nats –rm -p 4222:4222 -p 8222:8222 nats –http_port 8222 –auth admin
Este comando docker baixará e executará automaticamente a imagem nats. A porta 4222 é a porta que o cliente deseja acessar. Após a implantação da imagem, um monitor http será aberto na porta 8222.
Listening for client connections on 0.0.0.0:4222
Server is ready
A imagem do servidor nats começa a ser executada, escutando na porta 4222.
Precisamos criar duas estratégias (negociação real), vamos chamá-las de Estratégia A e Estratégia B por enquanto. Os códigos dessas duas estratégias são basicamente os mesmos. Escrito em Javascript, a linguagem mais fácil de usar na plataforma FMZ.
var connPub = null
var connSub = null
function main() {
var robotId = _G()
Log("当前实盘robotId:", robotId)
connPub = Dial("nats://[email protected]:4222?topic=pubRobotA")
if (!connPub) {
Log("通信失败!")
return
}
connSub = Dial("nats://[email protected]:4222?topic=pubRobotB")
if (!connSub) {
Log("通信失败!")
return
}
while (true) {
connPub.write("robotA发布的消息,robotId: " + robotId + ", time:" + _D())
var msgRead = connSub.read(10000)
if (msgRead) {
Log("msgRead:", msgRead)
}
LogStatus(_D())
Sleep(10000)
}
}
function onexit() {
connPub.close()
connSub.close()
}
var connPub = null
var connSub = null
function main() {
var robotId = _G()
Log("当前实盘robotId:", robotId)
connPub = Dial("nats://[email protected]:4222?topic=pubRobotB")
if (!connPub) {
Log("通信失败!")
return
}
connSub = Dial("nats://[email protected]:4222?topic=pubRobotA")
if (!connSub) {
Log("通信失败!")
return
}
while (true) {
connPub.write("robotB发布的消息,robotId: " + robotId + ", time:" + _D())
var msgRead = connSub.read(10000)
if (msgRead) {
Log("msgRead:", msgRead)
}
LogStatus(_D())
Sleep(10000)
}
}
function onexit() {
connPub.close()
connSub.close()
}
Essas duas estratégias são basicamente as mesmas, exceto que elas publicam e assinam uma à outra, e os tópicos assinados, tópicos publicados e informações publicadas são diferentes.
Tomemos como exemplo a Estratégia B:
Dial()A função cria um objeto de servidor de conexão de clienteconnPub, usado para publicação de mensagens de tópico:var connPub = Dial(“nats://[email protected]:4222?topic=pubRobotB”)
A sequência de parâmetros da função Dial começa comnats://Indica que o protocolo NATS é usado para comunicação, entãoadminÉ um conjunto simples de informações de verificação ao implantar a imagem do dockerauth admin, use o caractere “@” para separar o conteúdo a seguir e, em seguida, o endereço do serviço e a porta127.0.0.1:4222, e finalmente o tópico publicar/assinar:topic=pubRobotBObserve que há um símbolo “?” entre o endereço anterior.
Dial()A função cria um objeto de servidor de conexão de clienteconnSub, usado para assinatura de mensagem de tópico:var connSub = Dial(“nats://[email protected]:4222?topic=pubRobotA”)
A única diferençatopic=pubRobotADiferente, porque você precisa se inscrever no tópico onde a estratégia A envia informaçõespubRobotA。
A criação e o uso de objetos de conexão de assinatura e publicação na estratégia A são os mesmos descritos acima.


Dessa forma, é implementado um exemplo simples de aplicação do protocolo NATS, no qual o disco real A e o disco real B assinam e publicam mensagens para se comunicarem entre si.
Na comunicação assíncrona, a mensagem não chegará ao receptor imediatamente, mas será armazenada em um contêiner. Quando certas condições forem atendidas, a mensagem será enviada ao receptor pelo contêiner. Este contêiner é a fila de mensagens. Para concluir esta função , ambas as partes precisam O contêiner e seus componentes obedecem a convenções e regras unificadas. AMQP é um desses protocolos. Tanto o remetente quanto o destinatário de mensagens podem obter comunicação assíncrona obedecendo a esse protocolo. Este protocolo especifica o formato das mensagens e como elas funcionam.
Cada protocolo tem suas próprias características. Você pode consultar os documentos e materiais específicos, que não serão elaborados aqui.
Implante o servidor de protocolo amqp:
docker run –rm –hostname my-rabbit –name rabbit -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=q -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3-management
Ao implantar uma imagem do Docker, ela será baixada e implantada automaticamente e, quando concluída, será exibido:
2024-08-06 09:02:46.248936+00:00 [info] <0.9.0> Time to start RabbitMQ: 15569 ms
Após a implantação da imagem do servidor, escreva um exemplo de teste:
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("amqp://q:[email protected]:5672/?queue=robotA_Queue")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 读取
Log("read:", conn.read(1000), "#FF0000")
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
Ao usar a fila do protocolo AMQP, observe que as mensagens publicadas persistirão na fila. Por exemplo, vamos executar o código de exemplo acima uma vez. 10 mensagens serão gravadas na fila. Então, quando o executamos pela segunda vez, podemos descobrir que, ao ler, as informações escritas pela primeira vez serão lidas novamente. Conforme mostrado na figura:

Você pode ver que as duas mensagens de log apontadas pelas setas vermelhas na captura de tela têm tempos inconsistentes. O motivo é que a mensagem vermelha é a que foi lida e gravada na fila quando o código da estratégia foi executado pela primeira vez.
Com base nesse recurso, alguns requisitos podem ser atendidos. Por exemplo, após a estratégia ser reiniciada, os dados de mercado registrados ainda podem ser obtidos da fila para cálculo de inicialização e outras operações.
O Apache Kafka é um armazenamento de dados distribuído otimizado para ingestão e processamento de dados de streaming em tempo real. Dados de streaming referem-se a dados gerados continuamente por milhares de fontes de dados, geralmente enviando registros de dados simultaneamente. A plataforma de streaming precisa processar esses dados que fluem continuamente e processá-los passo a passo em sequência.
O Kafka fornece três funções principais aos seus usuários:
O Kafka é usado principalmente para criar pipelines de dados de streaming em tempo real e aplicativos que se adaptam a fluxos de dados. Ele combina recursos de mensagens, armazenamento e processamento de fluxo para armazenar dados históricos e em tempo real.
Implante a imagem docker do proxy Kafka:
docker run --rm --name kafka-server --hostname kafka-server -p 9092:9092 -p 9093:9093 \
-e KAFKA_CFG_NODE_ID=0 \
-e KAFKA_CFG_PROCESS_ROLES=controller,broker \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-server:9093 \
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
bitnami/kafka:latest
Teste usando o código de teste:
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("kafka://localhost:9092/test_topic")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
// 读取
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
Vamos dar uma olhada em como usar o protocolo Kafka para publicar e assinar mensagens na função Dial.
Dial("kafka://localhost:9092/test_topic")
Como vários outros protocolos, a primeira parte é o nome do protocolo. Em seguida, siga o endereço de escuta:localhost:9092. Em seguida, use o símbolo “/” como separador, seguido do tópico de assinatura/publicação. Aqui, o tópico de teste é definido comotest_topic。
Resultados dos testes:
