[TOC]

Với sự phát triển nhanh chóng của thị trường tài chính và sự phổ biến của giao dịch định lượng, ngày càng nhiều nhà giao dịch bắt đầu dựa vào các chiến lược giao dịch tự động. Trong quá trình này, việc giao tiếp và phối hợp giữa các chiến lược có vai trò đặc biệt quan trọng. FMZ (Nền tảng giao dịch định lượng) giúp các nhà giao dịch đạt được sự tích hợp chiến lược liền mạch và chia sẻ dữ liệu theo thời gian thực bằng cách cung cấp giao thức truyền thông hiệu quả giữa các chiến lược giao dịch thực tế.
Bài viết này sẽ đi sâu tìm hiểu về giao thức truyền thông thời gian thực của các chiến lược giao dịch trên nền tảng FMZ và giới thiệu khái niệm thiết kế, các tính năng chức năng và lợi thế của nó trong các ứng dụng thực tế. Thông qua phân tích trường hợp chi tiết, chúng tôi sẽ chứng minh cách sử dụng giao thức này để đạt được mục tiêu truyền đạt chiến lược hiệu quả và ổn định, đồng thời cải thiện việc thực hiện và lợi nhuận của các chiến lược giao dịch.
Cho dù bạn là người đam mê giao dịch định lượng mới bắt đầu sử dụng FMZ hay là một lập trình viên chuyên nghiệp giàu kinh nghiệm, bài viết này sẽ cung cấp cho bạn những hiểu biết có giá trị và hướng dẫn vận hành thực tế. Hãy cùng khám phá những chức năng mạnh mẽ của FMZ và tìm hiểu cách đạt được sự hợp tác giữa các chiến lược thông qua các giao thức truyền thông hiệu quả, cải thiện hiệu quả giao dịch và nắm bắt cơ hội thị trường.
Các kịch bản nhu cầu này chứng minh những khả năng và lợi thế khác nhau của giao thức truyền thông thời gian thực chiến lược giao dịch FMZ trong các ứng dụng thực tế. Thông qua việc giao tiếp hiệu quả giữa các chiến lược, các nhà giao dịch có thể ứng phó tốt hơn với môi trường thị trường phức tạp, tối ưu hóa các chiến lược giao dịch và cải thiện hiệu quả giao dịch cũng như lợi nhuận.
Sau khi hiểu được các yêu cầu giao tiếp giữa các đĩa thực, chúng ta cần xem xét cách thực hiện các yêu cầu này. Nó không có gì hơn là trường hợp thực tế A hy vọng trao đổi thông tin với trường hợp thực tế B, mặc dù yêu cầu này có vẻ đơn giản. Tuy nhiên, cần phải thống nhất nhiều chi tiết khác nhau bằng cách sử dụng một bộ giao thức truyền thông. FMZ đã đóng gói một số giao thức truyền thông phổ biến.
mqtt / nats / amqp / kafka
Kiến trúc truyền thông là:
Khi áp dụng các giao thức này trên nền tảng FMZ, có thể hiểu đơn giản là mqtt/nats/amqp/kafka. Các giao thức này được tích hợp vàoDial()Trong hàm, sử dụngDial()Các hàm thực hiện các hoạt động như đăng tin nhắn và đăng ký. Những thông điệp được công bố này được chuyển tiếp đến đĩa thực đã đăng ký thông qua máy chủ giao thức, do đó trước tiên phải chạy máy chủ giao thức. Để dễ trình bày, chúng tôi sử dụng nhiều triển khai hình ảnh máy chủ giao thức khác nhau trong các ví dụ sau.
Phần tài liệu API chức năng quay số: https://www.fmz.com/syntax-guide#fun_dial
Trước khi triển khai ảnh docker, hãy nhớ cài đặt phần mềm docker trước.

Tiếp theo, chúng ta hãy khám phá và thực hành các ứng dụng giao thức truyền thông được FMZ hỗ trợ.
MQTT (Message Queuing Telemetry Transport) là một giao thức truyền tin nhẹ, đặc biệt phù hợp với môi trường mạng băng thông thấp, độ trễ cao hoặc không đáng tin cậy. Tiêu chuẩn này được Andy Stanford-Clark và Arlen Nipper của IBM đề xuất vào năm 1999 và sau đó trở thành tiêu chuẩn ISO (ISO/IEC PRF 20922).
Các tính năng chính của giao thức MQTT: chế độ xuất bản/đăng ký
Vì chúng tôi sử dụng hình ảnh docker (hình ảnh eclipse-mosquitto) của phần mềm hỗ trợ giao thức MQTT để triển khai máy chủ proxy MQTT nên chúng tôi đã cài đặt docker trước và sẽ không đi sâu vào chi tiết sau.
Trước khi chạy lệnh để triển khai hình ảnh, chúng ta cần viết một tệp cấu hình máy chủ proxymosquitto.conf。
# 配置端口号及远程访问IP
listener 1883 0.0.0.0
# 设置匿名访问
allow_anonymous true
Sau đó thực hiện lệnh triển khai:
docker run --rm -p 1883:1883 -v ./mosquitto.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto
Sau khi hình ảnh máy chủ proxy chạy, màn hình hiển thị sau sẽ hiển thị:
1723012640: mosquitto version 2.0.18 starting
1723012640: Config loaded from /mosquitto/config/mosquitto.conf.
1723012640: Opening ipv4 listen socket on port 1883.
1723012640: mosquitto version 2.0.18 running
Sau đó, chúng ta có thể thử nghiệm chiến lược để đưa nó vào thực tế.
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("mqtt://127.0.0.1:1883?topic=test_topic")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
// 读取
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
Công dụng chính của hàm Dial trong mã chiến lược là:
Dial("mqtt://127.0.0.1:1883?topic=test_topic")
Tham số chuỗi của hàm Dial bắt đầu bằngmqtt://Đây là tên giao thức, theo sau là địa chỉ lắng nghe và cổng. Biểu tượng “?” theo sau là tên chủ đề đăng ký/xuất bản. Tên chủ đề được thử nghiệm ở đây là:test_topic。
Chiến lược trên xuất bản và đăng ký một chủ đề cùng một lúc. Bài kiểm tra đang chạy như trong hình:

Bạn cũng có thể sử dụng hai đĩa thực để đăng ký với nhau và xuất bản thông tin chủ đề. Chúng tôi sử dụng ví dụ như vậy trong phần thực hành giao thức NATS và sẽ không lặp lại phương pháp này trong các giao thức khác.
Giao thức NATS là một giao thức theo kiểu xuất bản/đăng ký đơn giản dựa trên văn bản. Máy khách kết nối với gnatsd (máy chủ NATS) và giao tiếp với gnatsd. Giao tiếp dựa trên socket TCP/IP thông thường và xác định một tập hợp rất nhỏ các hoạt động. Dòng mới biểu thị sự kết thúc. Không giống như các hệ thống nhắn tin truyền thống sử dụng định dạng tin nhắn nhị phân, giao thức NATS dựa trên văn bản giúp việc triển khai máy khách trở nên đơn giản và có thể dễ dàng triển khai bằng nhiều ngôn ngữ lập trình hoặc ngôn ngữ kịch bản.
Mỗi giao thức có những đặc điểm riêng. Bạn có thể tham khảo các tài liệu và vật liệu cụ thể, sẽ không trình bày chi tiết ở đây.
Triển khai máy chủ giao thức NATS:
docker run –name nats –rm -p 4222:4222 -p 8222:8222 nats –http_port 8222 –auth admin
Lệnh docker này sẽ tự động tải xuống và chạy hình ảnh nats. Cổng 4222 là cổng mà máy khách muốn truy cập. Sau khi hình ảnh được triển khai, trình giám sát http sẽ được mở trên cổng 8222.
Listening for client connections on 0.0.0.0:4222
Server is ready
Hình ảnh máy chủ nats bắt đầu chạy, lắng nghe trên cổng 4222.
Chúng ta cần tạo ra hai chiến lược (giao dịch thực), tạm gọi là Chiến lược A và Chiến lược B. Mã của hai chiến lược này về cơ bản là giống nhau. Được viết bằng Javascript, ngôn ngữ dễ sử dụng nhất trên nền tảng FMZ.
var connPub = null
var connSub = null
function main() {
var robotId = _G()
Log("当前实盘robotId:", robotId)
connPub = Dial("nats://[email protected]:4222?topic=pubRobotA")
if (!connPub) {
Log("通信失败!")
return
}
connSub = Dial("nats://[email protected]:4222?topic=pubRobotB")
if (!connSub) {
Log("通信失败!")
return
}
while (true) {
connPub.write("robotA发布的消息,robotId: " + robotId + ", time:" + _D())
var msgRead = connSub.read(10000)
if (msgRead) {
Log("msgRead:", msgRead)
}
LogStatus(_D())
Sleep(10000)
}
}
function onexit() {
connPub.close()
connSub.close()
}
var connPub = null
var connSub = null
function main() {
var robotId = _G()
Log("当前实盘robotId:", robotId)
connPub = Dial("nats://[email protected]:4222?topic=pubRobotB")
if (!connPub) {
Log("通信失败!")
return
}
connSub = Dial("nats://[email protected]:4222?topic=pubRobotA")
if (!connSub) {
Log("通信失败!")
return
}
while (true) {
connPub.write("robotB发布的消息,robotId: " + robotId + ", time:" + _D())
var msgRead = connSub.read(10000)
if (msgRead) {
Log("msgRead:", msgRead)
}
LogStatus(_D())
Sleep(10000)
}
}
function onexit() {
connPub.close()
connSub.close()
}
Hai chiến lược này về cơ bản giống nhau, ngoại trừ việc chúng xuất bản và đăng ký lẫn nhau, còn các chủ đề đã đăng ký, chủ đề đã xuất bản và thông tin đã xuất bản là khác nhau.
Lấy Chiến lược B làm ví dụ:
Dial()Chức năng tạo ra một đối tượng máy chủ kết nối máy kháchconnPub, được sử dụng để xuất bản tin nhắn chủ đề:var connPub = Dial(“nats://[email protected]:4222?topic=pubRobotB”)
Chuỗi tham số của hàm Dial bắt đầu bằngnats://Chỉ ra rằng giao thức NATS được sử dụng để giao tiếp, sau đóadminĐây là một tập thông tin xác minh đơn giản khi triển khai hình ảnh dockerauth admin, sử dụng ký tự “@” để phân tách nội dung sau, sau đó là địa chỉ dịch vụ và cổng127.0.0.1:4222và cuối cùng là chủ đề xuất bản/đăng ký:topic=pubRobotBLưu ý rằng có một ký hiệu “?” giữa địa chỉ trước đó.
Dial()Chức năng tạo ra một đối tượng máy chủ kết nối máy kháchconnSub, được sử dụng để đăng ký tin nhắn chủ đề:var connSub = Dial(“nats://[email protected]:4222?topic=pubRobotA”)
Sự khác biệt duy nhấttopic=pubRobotAKhác nhau, vì bạn cần đăng ký chủ đề mà chiến lược A gửi thông tinpubRobotA。
Việc tạo và sử dụng các đối tượng kết nối đăng ký và xuất bản trong chiến lược A giống như mô tả ở trên.


Theo cách này, một ví dụ đơn giản về ứng dụng giao thức NATS được triển khai trong đó đĩa thực A và đĩa thực B đăng ký và phát hành tin nhắn để giao tiếp với nhau.
Trong giao tiếp không đồng bộ, tin nhắn sẽ không đến được người nhận ngay lập tức mà sẽ được lưu trữ trong một container. Khi đáp ứng được một số điều kiện nhất định, tin nhắn sẽ được container gửi đến người nhận. Container này là hàng đợi tin nhắn. Để hoàn thành chức năng này , cả hai bên đều cần Container và các thành phần của nó tuân thủ các quy ước và quy tắc thống nhất. AMQP là một giao thức như vậy. Cả người gửi và người nhận tin nhắn đều có thể đạt được giao tiếp không đồng bộ bằng cách tuân thủ giao thức này. Giao thức này chỉ định định dạng của tin nhắn và cách chúng hoạt động.
Mỗi giao thức có những đặc điểm riêng. Bạn có thể tham khảo các tài liệu và vật liệu cụ thể, sẽ không trình bày chi tiết ở đây.
Triển khai máy chủ giao thức amqp:
docker run –rm –hostname my-rabbit –name rabbit -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=q -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3-management
Khi triển khai hình ảnh docker, nó sẽ tự động tải xuống và triển khai, và khi hoàn tất sẽ hiển thị:
2024-08-06 09:02:46.248936+00:00 [info] <0.9.0> Time to start RabbitMQ: 15569 ms
Sau khi hình ảnh máy chủ được triển khai, hãy viết một ví dụ thử nghiệm:
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("amqp://q:[email protected]:5672/?queue=robotA_Queue")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 读取
Log("read:", conn.read(1000), "#FF0000")
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
Khi sử dụng hàng đợi giao thức AMQP, xin lưu ý rằng các tin nhắn đã xuất bản sẽ vẫn nằm trong hàng đợi. Ví dụ, hãy chạy mã ví dụ ở trên một lần. 10 tin nhắn sẽ được ghi vào hàng đợi. Sau đó, khi chạy lại lần thứ hai, chúng ta có thể thấy rằng khi đọc, thông tin được viết lần đầu sẽ được đọc lại. Như thể hiện trong hình:

Bạn có thể thấy rằng hai thông báo nhật ký được trỏ bởi các mũi tên màu đỏ trong ảnh chụp màn hình có thời gian không nhất quán. Lý do là thông báo màu đỏ là thông báo đã được đọc và ghi vào hàng đợi khi mã chiến lược được chạy lần đầu tiên.
Dựa trên tính năng này, một số yêu cầu có thể được đáp ứng. Ví dụ, sau khi chiến lược được khởi động lại, dữ liệu thị trường đã ghi lại vẫn có thể được lấy từ hàng đợi để tính toán khởi tạo và các hoạt động khác.
Apache Kafka là kho lưu trữ dữ liệu phân tán được tối ưu hóa để thu thập và xử lý dữ liệu phát trực tuyến theo thời gian thực. Dữ liệu phát trực tuyến là dữ liệu được tạo liên tục bởi hàng nghìn nguồn dữ liệu, thường gửi các bản ghi dữ liệu cùng lúc. Nền tảng phát trực tuyến cần xử lý dữ liệu liên tục này và xử lý từng bước theo trình tự.
Kafka cung cấp ba chức năng chính cho người dùng:
Kafka chủ yếu được sử dụng để xây dựng các đường ống dữ liệu phát trực tuyến theo thời gian thực và các ứng dụng thích ứng với các luồng dữ liệu. Nó kết hợp các khả năng nhắn tin, lưu trữ và xử lý luồng để lưu trữ cả dữ liệu lịch sử và dữ liệu thời gian thực.
Triển khai hình ảnh docker của proxy Kafka:
docker run --rm --name kafka-server --hostname kafka-server -p 9092:9092 -p 9093:9093 \
-e KAFKA_CFG_NODE_ID=0 \
-e KAFKA_CFG_PROCESS_ROLES=controller,broker \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-server:9093 \
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
bitnami/kafka:latest
Kiểm tra bằng cách sử dụng mã kiểm tra:
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("kafka://localhost:9092/test_topic")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
// 读取
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
Chúng ta hãy cùng xem cách sử dụng giao thức Kafka để xuất bản và đăng ký tin nhắn trong hàm Dial.
Dial("kafka://localhost:9092/test_topic")
Giống như một số giao thức khác, phần đầu tiên là tên giao thức. Sau đó theo địa chỉ nghe:localhost:9092. Sau đó sử dụng ký hiệu “/” làm dấu phân cách, theo sau là chủ đề đăng ký/xuất bản. Ở đây, chủ đề kiểm tra được đặt thànhtest_topic。
Kết quả thử nghiệm:
