[TOC]

금융 시장의 급속한 발전과 양적 거래의 인기로 인해 점점 더 많은 거래자가 자동화된 거래 전략에 의존하기 시작했습니다. 이 과정에서는 전략 간의 의사소통과 조정이 특히 중요합니다. FMZ(양적 거래 플랫폼)는 실제 거래 전략 간에 효율적인 통신 프로토콜을 제공함으로써 거래자가 원활한 전략 통합과 실시간 데이터 공유를 달성할 수 있도록 돕습니다.
이 글에서는 FMZ 플랫폼의 거래 전략에 대한 실시간 통신 프로토콜을 심층적으로 살펴보고, 실제 적용 시의 설계 개념, 기능적 특징과 장점을 소개합니다. 자세한 사례 분석을 통해 이 프로토콜을 사용하여 효율적이고 안정적인 전략적 커뮤니케이션을 달성하고 거래 전략의 실행과 수익성을 개선하는 방법을 보여드리겠습니다.
FMZ를 처음 사용하는 양적 거래 애호가이든, 경험이 풍부한 전문 프로그래머이든, 이 글은 귀하에게 귀중한 통찰력과 실용적인 운영 가이드를 제공할 것입니다. FMZ의 강력한 기능을 살펴보고 효율적인 커뮤니케이션 프로토콜을 통해 전략 간 협업을 달성하고, 거래 효율성을 개선하며, 시장 기회를 포착하는 방법을 알아보겠습니다.
이러한 수요 시나리오는 실제 응용 분야에서 FMZ 거래 전략 실시간 통신 프로토콜의 다양한 가능성과 이점을 보여줍니다. 전략 간 효과적인 소통을 통해 트레이더는 복잡한 시장 환경에 더 잘 대처하고, 거래 전략을 최적화하고, 거래 효율성과 수익을 개선할 수 있습니다.
실제 디스크 간의 통신 요구 사항을 이해한 후에는 이러한 요구 사항을 실현하는 방법을 고려해야 합니다. 요구 사항이 간단해 보이지만, 실제 사례 A가 실제 사례 B와 정보를 교환하기를 바랄 뿐일 뿐입니다. 그러나 다양한 세부 사항은 일련의 통신 프로토콜을 사용하여 합의해야 합니다. FMZ는 여러 인기 있는 통신 프로토콜을 캡슐화했습니다.
mqtt / nats / amqp / kafka
통신 아키텍처는 다음과 같습니다.
FMZ 플랫폼에 이러한 프로토콜을 적용할 때, 간단히 mqtt/nats/amqp/kafka로 이해할 수 있습니다. 이러한 프로토콜은 다음에 통합됩니다.Dial()함수에서 사용하세요Dial()함수는 메시지 게시, 구독 등의 작업을 수행합니다. 이렇게 게시된 메시지는 프로토콜 서버를 통해 구독된 실제 디스크로 프록시(릴레이)되므로 먼저 프로토콜 서버를 실행해야 합니다. 다음 예제에서는 데모의 편의성을 위해 다양한 프로토콜 서버 이미지 배포를 사용합니다.
다이얼 기능 API 문서 섹션: https://www.fmz.com/syntax-guide#fun_dial
Docker 이미지를 배포하기 전에 먼저 Docker 소프트웨어를 설치하세요.

다음으로, FMZ가 지원하는 통신 프로토콜 애플리케이션을 살펴보고 연습해 보겠습니다.
MQTT(Message Queuing Telemetry Transport)는 대역폭이 낮고 지연 시간이 길거나 신뢰할 수 없는 네트워크 환경에 특히 적합한 가벼운 메시지 전송 프로토콜입니다. 이는 1999년에 IBM의 Andy Stanford-Clark와 Arlen Nipper가 제안하였고 나중에 ISO 표준(ISO/IEC PRF 20922)이 되었습니다.
MQTT 프로토콜의 주요 특징: 게시/구독 모드
MQTT 프로토콜을 지원하는 소프트웨어의 docker 이미지(eclipse-mosquitto 이미지)를 사용하여 MQTT 프록시 서버를 배포하기 때문에 docker를 미리 설치하였고 나중에 자세한 내용은 설명하지 않겠습니다.
이미지를 배포하기 위한 명령을 실행하기 전에 프록시 서버 구성 파일을 작성해야 합니다.mosquitto.conf。
# 配置端口号及远程访问IP
listener 1883 0.0.0.0
# 设置匿名访问
allow_anonymous true
그런 다음 배포 명령을 실행합니다.
docker run --rm -p 1883:1883 -v ./mosquitto.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto
프록시 서버 이미지가 실행되면 다음과 같은 화면이 표시됩니다.
1723012640: mosquitto version 2.0.18 starting
1723012640: Config loaded from /mosquitto/config/mosquitto.conf.
1723012640: Opening ipv4 listen socket on port 1883.
1723012640: mosquitto version 2.0.18 running
그런 다음 전략을 실제로 적용해 테스트해 볼 수 있습니다.
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("mqtt://127.0.0.1:1883?topic=test_topic")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
// 读取
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
전략 코드에서 Dial 함수의 주요 용도는 다음과 같습니다.
Dial("mqtt://127.0.0.1:1883?topic=test_topic")
Dial 함수의 문자열 매개변수는 다음으로 시작합니다.mqtt://프로토콜 이름 뒤에 수신 주소와 포트가 옵니다. “?” 기호 뒤에 구독/게시 주제 이름이 옵니다. 여기서 테스트한 주제 이름은 다음과 같습니다.test_topic。
위의 전략은 토픽을 동시에 게시하고 구독합니다. 실행 테스트는 그림과 같습니다.

두 개의 실제 디스크를 사용하여 서로를 구독하고 주제 정보를 게시할 수도 있습니다. NATS 프로토콜 연습 섹션에서 이러한 예를 사용하며, 다른 프로토콜에서는 이 방법을 반복하지 않습니다.
NATS 프로토콜은 간단한 텍스트 기반의 게시/구독 스타일 프로토콜입니다. 클라이언트는 gnatsd(NATS 서버)에 연결하고 gnatsd와 통신합니다. 통신은 일반적인 TCP/IP 소켓을 기반으로 하며 매우 작은 작업 집합을 정의합니다. 줄바꿈은 종료를 나타냅니다. 이진 메시지 형식을 사용하는 기존 메시징 시스템과 달리, 텍스트 기반 NATS 프로토콜은 클라이언트 구현을 간소화하고 다양한 프로그래밍 언어나 스크립팅 언어로 쉽게 구현할 수 있습니다.
각 프로토콜에는 고유한 특성이 있습니다. 여기서 자세히 설명하지 않을 구체적인 문서와 자료를 참조할 수 있습니다.
NATS 프로토콜 서버를 배포합니다.
docker run –name nats –rm -p 4222:4222 -p 8222:8222 nats –http_port 8222 –auth admin
이 docker 명령어는 자동으로 nats 이미지를 다운로드하고 실행합니다. 포트 4222는 클라이언트가 액세스하려는 포트입니다. 이미지가 배포되면 8222 포트에서 http 모니터가 열립니다.
Listening for client connections on 0.0.0.0:4222
Server is ready
nats 서버 이미지가 실행되기 시작하고 포트 4222에서 수신 대기합니다.
우리는 두 가지 전략(실제 거래)을 만들어야 합니다. 지금은 전략 A와 전략 B라고 이름 붙입니다. 이 두 전략의 코드는 기본적으로 동일합니다. FMZ 플랫폼에서 가장 사용하기 쉬운 언어인 Javascript로 작성되었습니다.
var connPub = null
var connSub = null
function main() {
var robotId = _G()
Log("当前实盘robotId:", robotId)
connPub = Dial("nats://[email protected]:4222?topic=pubRobotA")
if (!connPub) {
Log("通信失败!")
return
}
connSub = Dial("nats://[email protected]:4222?topic=pubRobotB")
if (!connSub) {
Log("通信失败!")
return
}
while (true) {
connPub.write("robotA发布的消息,robotId: " + robotId + ", time:" + _D())
var msgRead = connSub.read(10000)
if (msgRead) {
Log("msgRead:", msgRead)
}
LogStatus(_D())
Sleep(10000)
}
}
function onexit() {
connPub.close()
connSub.close()
}
var connPub = null
var connSub = null
function main() {
var robotId = _G()
Log("当前实盘robotId:", robotId)
connPub = Dial("nats://[email protected]:4222?topic=pubRobotB")
if (!connPub) {
Log("通信失败!")
return
}
connSub = Dial("nats://[email protected]:4222?topic=pubRobotA")
if (!connSub) {
Log("通信失败!")
return
}
while (true) {
connPub.write("robotB发布的消息,robotId: " + robotId + ", time:" + _D())
var msgRead = connSub.read(10000)
if (msgRead) {
Log("msgRead:", msgRead)
}
LogStatus(_D())
Sleep(10000)
}
}
function onexit() {
connPub.close()
connSub.close()
}
이 두 가지 전략은 기본적으로 동일하지만, 서로를 게시하고 구독한다는 점이 다르고, 구독된 주제, 게시된 주제, 게시된 정보가 다릅니다.
전략 B를 예로 들어 보겠습니다.
Dial()함수는 클라이언트 연결 서버 객체를 생성합니다.connPub, 주제 메시지 게시에 사용됨:var connPub = Dial(“nats://[email protected]:4222?topic=pubRobotB”)
Dial 함수의 매개변수 문자열은 다음으로 시작합니다.nats://NATS 프로토콜이 통신에 사용됨을 나타냅니다.admin도커 이미지를 배포할 때 설정하는 간단한 검증 정보입니다.auth admin, 다음 내용을 구분하기 위해 문자 “@“를 사용하고 그 다음에 서비스 주소와 포트를 입력합니다.127.0.0.1:4222마지막으로 게시/구독 주제:topic=pubRobotB이전 주소 사이에 “?” 기호가 있다는 점에 주의하세요.
Dial()함수는 클라이언트 연결 서버 객체를 생성합니다.connSub, 주제 메시지 구독에 사용됨:var connSub = Dial(“nats://[email protected]:4222?topic=pubRobotA”)
유일한 차이점은topic=pubRobotA다릅니다. 전략 A가 정보를 보내는 주제를 구독해야 하기 때문입니다.pubRobotA。
전략 A에서 구독 및 게시 연결 객체를 생성하고 사용하는 것은 위에서 설명한 것과 동일합니다.


이런 방식으로 NATS 프로토콜 응용 프로그램의 간단한 예가 구현되는데, 여기서 실제 디스크 A와 실제 디스크 B는 메시지를 구독하고 게시하여 서로 통신합니다.
비동기 통신에서 메시지는 수신자에게 즉시 도달하지 않고 컨테이너에 저장됩니다. 특정 조건이 충족되면 컨테이너가 메시지를 수신자에게 보냅니다. 이 컨테이너는 메시지 큐입니다. 이 기능을 완료하려면 , 양측은 컨테이너와 그 구성 요소가 통합된 관례와 규칙을 준수해야 합니다. AMQP는 그러한 프로토콜입니다. 메시지의 발신자와 수신자는 모두 이 프로토콜을 준수함으로써 비동기 통신을 달성할 수 있습니다. 이 프로토콜은 메시지의 형식과 작동 방식을 지정합니다.
각 프로토콜에는 고유한 특성이 있습니다. 여기서 자세히 설명하지 않을 구체적인 문서와 자료를 참조할 수 있습니다.
amqp 프로토콜 서버를 배포합니다.
docker run –rm –hostname my-rabbit –name rabbit -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=q -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3-management
Docker 이미지를 배포하면 자동으로 다운로드되어 배포되며, 완료되면 다음과 같이 표시됩니다.
2024-08-06 09:02:46.248936+00:00 [info] <0.9.0> Time to start RabbitMQ: 15569 ms
서버 이미지가 배포된 후 테스트 예제 테스트를 작성합니다.
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("amqp://q:[email protected]:5672/?queue=robotA_Queue")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 读取
Log("read:", conn.read(1000), "#FF0000")
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
AMQP 프로토콜 큐를 사용할 때 게시된 메시지는 큐에 유지된다는 점에 유의하세요. 예를 들어, 위의 예제 코드를 한 번 실행해 보겠습니다. 10개의 메시지가 대기열에 작성됩니다. 그런 다음 두 번째로 실행하면 읽을 때 처음에 작성된 정보가 다시 읽히는 것을 확인할 수 있습니다. 그림에서 보는 바와 같이:

스크린샷에서 빨간색 화살표가 가리키는 두 로그 메시지의 시간이 일치하지 않는 것을 볼 수 있습니다. 그 이유는 빨간색 메시지가 전략 코드가 처음 실행되었을 때 큐에 읽혀지고 쓰여진 메시지이기 때문입니다.
이 기능을 기반으로 일부 요구 사항을 충족할 수 있습니다. 예를 들어, 전략이 재시작된 후에도 기록된 시장 데이터는 초기화 계산 및 기타 작업을 위해 대기열에서 계속 가져올 수 있습니다.
Apache Kafka는 실시간으로 스트리밍 데이터를 수집하고 처리하도록 최적화된 분산 데이터 저장소입니다. 스트리밍 데이터란 수천 개의 데이터 소스에서 지속적으로 생성되는 데이터를 말하며, 종종 동시에 데이터 레코드를 전송합니다. 스트리밍 플랫폼은 이처럼 지속적으로 흐르는 데이터를 처리하고, 단계별로 순서대로 처리해야 합니다.
카프카는 사용자에게 세 가지 주요 기능을 제공합니다.
카프카는 주로 데이터 스트림에 적응하는 실시간 스트리밍 데이터 파이프라인과 애플리케이션을 구축하는 데 사용됩니다. 메시징, 스토리지, 스트림 처리 기능을 결합하여 과거 데이터와 실시간 데이터를 모두 저장합니다.
Kafka 프록시의 Docker 이미지를 배포합니다.
docker run --rm --name kafka-server --hostname kafka-server -p 9092:9092 -p 9093:9093 \
-e KAFKA_CFG_NODE_ID=0 \
-e KAFKA_CFG_PROCESS_ROLES=controller,broker \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-server:9093 \
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
bitnami/kafka:latest
테스트 코드를 사용하여 테스트하세요.
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("kafka://localhost:9092/test_topic")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
// 读取
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
Dial 함수에서 Kafka 프로토콜을 사용하여 메시지를 게시하고 구독하는 방법을 살펴보겠습니다.
Dial("kafka://localhost:9092/test_topic")
다른 여러 프로토콜과 마찬가지로 첫 번째 부분은 프로토콜 이름입니다. 그 다음에는 청취 주소를 따르세요.localhost:9092. 그런 다음 구분 기호로 “/“를 사용하고 구독/게시 주제를 뒤에 붙입니다. 여기서 테스트 주제는 다음과 같이 설정됩니다.test_topic。
테스트 결과:
