[TOC]

Dengan pesatnya perkembangan pasar keuangan dan popularitas perdagangan kuantitatif, semakin banyak pedagang mulai mengandalkan strategi otomatis untuk perdagangan. Dalam proses ini, komunikasi dan koordinasi antar strategi sangatlah penting. FMZ (Platform Perdagangan Kuantitatif) membantu pedagang mencapai integrasi strategi yang mulus dan berbagi data waktu nyata dengan menyediakan protokol komunikasi yang efisien antara strategi perdagangan nyata.
Artikel ini akan mengupas secara mendalam protokol komunikasi real-time dari strategi perdagangan di platform FMZ, dan memperkenalkan konsep desain, fitur fungsional, dan keunggulannya dalam aplikasi praktis. Melalui analisis kasus terperinci, kami akan menunjukkan cara menggunakan protokol ini untuk mencapai komunikasi strategi yang efisien dan stabil serta meningkatkan eksekusi dan profitabilitas strategi perdagangan.
Apakah Anda seorang penggemar perdagangan kuantitatif yang baru memulai dengan FMZ, atau seorang programmer profesional yang berpengalaman, artikel ini akan memberi Anda wawasan berharga dan panduan operasi praktis. Mari kita jelajahi fungsi hebat FMZ dan pelajari cara mencapai kolaborasi antar strategi melalui protokol komunikasi yang efisien, meningkatkan efisiensi perdagangan, dan menangkap peluang pasar.
Skenario permintaan ini menunjukkan berbagai kemungkinan dan keuntungan dari protokol komunikasi real-time strategi perdagangan FMZ dalam aplikasi praktis. Melalui komunikasi yang efektif antar strategi, pedagang dapat mengatasi lingkungan pasar yang kompleks dengan lebih baik, mengoptimalkan strategi perdagangan, dan meningkatkan efisiensi dan keuntungan perdagangan.
Setelah memahami persyaratan komunikasi antara disk nyata, kita perlu mempertimbangkan cara mewujudkan persyaratan ini. Tidak lain hanyalah kasus nyata A yang berharap untuk bertukar informasi dengan kasus nyata B, meskipun permintaannya tampaknya sederhana. Namun, berbagai rincian perlu disetujui dengan menggunakan seperangkat protokol komunikasi. FMZ telah merangkum beberapa protokol komunikasi yang populer.
mqtt / nats / amqp / kafka
Arsitektur komunikasi adalah:
Ketika menerapkan protokol ini pada platform FMZ, dapat dipahami secara sederhana sebagai mqtt/nats/amqp/kafka. Protokol ini terintegrasi ke dalamDial()Dalam fungsi, gunakanDial()Fungsi digunakan untuk menerbitkan dan berlangganan pesan. Pesan-pesan yang diterbitkan ini diproksikan (diteruskan) ke disk nyata yang berlangganan melalui server protokol, sehingga server protokol harus dijalankan terlebih dahulu. Untuk memudahkan demonstrasi, kami menggunakan berbagai penerapan citra server protokol dalam contoh berikut.
Bagian dokumentasi API fungsi Dial: https://www.fmz.com/syntax-guide#fun_dial
Sebelum menyebarkan citra docker, ingatlah untuk menginstal perangkat lunak docker terlebih dahulu.

Selanjutnya, mari kita jelajahi dan praktikkan aplikasi protokol komunikasi yang didukung oleh FMZ.
MQTT (Message Queuing Telemetry Transport) adalah protokol transmisi pesan ringan yang sangat cocok untuk lingkungan jaringan dengan bandwidth rendah, latensi tinggi, atau tidak dapat diandalkan. Diusulkan oleh Andy Stanford-Clark dan Arlen Nipper dari IBM pada tahun 1999 dan kemudian menjadi standar ISO (ISO/IEC PRF 20922).
Fitur utama protokol MQTT: mode publikasi/berlangganan
Karena kami menggunakan citra docker (citra eclipse-mosquitto) dari perangkat lunak yang mendukung protokol MQTT untuk menyebarkan server proxy MQTT, kami telah menginstal docker terlebih dahulu dan tidak akan membahas detailnya nanti.
Sebelum menjalankan perintah untuk menyebarkan gambar, kita perlu menulis file konfigurasi server proxymosquitto.conf。
# 配置端口号及远程访问IP
listener 1883 0.0.0.0
# 设置匿名访问
allow_anonymous true
Kemudian jalankan perintah deployment:
docker run --rm -p 1883:1883 -v ./mosquitto.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto
Setelah gambar server proxy berjalan, tampilan berikut ditampilkan:
1723012640: mosquitto version 2.0.18 starting
1723012640: Config loaded from /mosquitto/config/mosquitto.conf.
1723012640: Opening ipv4 listen socket on port 1883.
1723012640: mosquitto version 2.0.18 running
Setelah itu, kita dapat menguji strategi tersebut untuk mempraktikkannya.
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("mqtt://127.0.0.1:1883?topic=test_topic")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
// 读取
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
Penggunaan utama fungsi Dial dalam kode strategi adalah:
Dial("mqtt://127.0.0.1:1883?topic=test_topic")
Parameter string dari fungsi Dial dimulai denganmqtt://Ini adalah nama protokol, diikuti oleh alamat dan port pendengaran. Simbol “?” diikuti oleh nama topik langganan/penerbitan. Nama topik yang diuji di sini adalah:test_topic。
Strategi di atas menerbitkan dan berlangganan topik pada saat yang sama. Pengujian yang sedang berjalan seperti yang ditunjukkan pada gambar:

Anda juga dapat menggunakan dua disk nyata untuk saling berlangganan dan menerbitkan informasi topik. Kami menggunakan contoh seperti itu di bagian praktik protokol NATS, dan tidak akan mengulangi metode ini di protokol lain.
Protokol NATS adalah protokol gaya terbitkan/berlangganan sederhana berbasis teks. Klien terhubung ke gnatsd (server NATS) dan berkomunikasi dengan gnatsd. Komunikasi ini didasarkan pada soket TCP/IP biasa dan mendefinisikan serangkaian operasi yang sangat kecil. Baris baru menunjukkan penghentian. Tidak seperti sistem pesan tradisional yang menggunakan format pesan biner, protokol NATS berbasis teks membuat implementasi klien menjadi sederhana dan dapat dengan mudah diimplementasikan dalam berbagai bahasa pemrograman atau bahasa skrip.
Setiap protokol memiliki karakteristiknya sendiri. Anda dapat merujuk pada dokumen dan materi tertentu, yang tidak akan diuraikan di sini.
Terapkan server protokol NATS:
docker run –name nats –rm -p 4222:4222 -p 8222:8222 nats –http_port 8222 –auth admin
Perintah docker ini akan mengunduh dan menjalankan image nats secara otomatis. Port 4222 adalah port yang ingin diakses oleh klien. Setelah gambar disebarkan, monitor http akan dibuka pada port 8222.
Listening for client connections on 0.0.0.0:4222
Server is ready
Citra server nats mulai berjalan, mendengarkan pada port 4222.
Kita perlu membuat dua strategi (perdagangan riil), sebut saja Strategi A dan Strategi B untuk saat ini. Kode kedua strategi ini pada dasarnya sama. Ditulis dalam Javascript, bahasa termudah untuk digunakan pada platform FMZ.
var connPub = null
var connSub = null
function main() {
var robotId = _G()
Log("当前实盘robotId:", robotId)
connPub = Dial("nats://[email protected]:4222?topic=pubRobotA")
if (!connPub) {
Log("通信失败!")
return
}
connSub = Dial("nats://[email protected]:4222?topic=pubRobotB")
if (!connSub) {
Log("通信失败!")
return
}
while (true) {
connPub.write("robotA发布的消息,robotId: " + robotId + ", time:" + _D())
var msgRead = connSub.read(10000)
if (msgRead) {
Log("msgRead:", msgRead)
}
LogStatus(_D())
Sleep(10000)
}
}
function onexit() {
connPub.close()
connSub.close()
}
var connPub = null
var connSub = null
function main() {
var robotId = _G()
Log("当前实盘robotId:", robotId)
connPub = Dial("nats://[email protected]:4222?topic=pubRobotB")
if (!connPub) {
Log("通信失败!")
return
}
connSub = Dial("nats://[email protected]:4222?topic=pubRobotA")
if (!connSub) {
Log("通信失败!")
return
}
while (true) {
connPub.write("robotB发布的消息,robotId: " + robotId + ", time:" + _D())
var msgRead = connSub.read(10000)
if (msgRead) {
Log("msgRead:", msgRead)
}
LogStatus(_D())
Sleep(10000)
}
}
function onexit() {
connPub.close()
connSub.close()
}
Kedua strategi ini pada dasarnya sama, kecuali bahwa keduanya saling menerbitkan dan berlangganan, dan topik yang dilanggan, topik yang diterbitkan, dan informasi yang diterbitkan berbeda.
Ambil Strategi B sebagai contoh:
Dial()Fungsi membuat objek server koneksi klienconnPub, digunakan untuk penerbitan pesan topik:var connPub = Dial(“nats://[email protected]:4222?topic=pubRobotB”)
String parameter fungsi Dial dimulai dengannats://Menunjukkan bahwa protokol NATS digunakan untuk komunikasi, makaadminIni adalah kumpulan informasi verifikasi sederhana saat menyebarkan citra dockerauth admin, gunakan karakter “@” untuk memisahkan konten berikut, lalu alamat layanan dan port127.0.0.1:4222, dan terakhir topik publikasikan/berlangganan:topic=pubRobotBPerhatikan bahwa ada simbol “?” di antara alamat sebelumnya.
Dial()Fungsi membuat objek server koneksi klienconnSub, digunakan untuk berlangganan pesan topik:var connSub = Dial(“nats://[email protected]:4222?topic=pubRobotA”)
Satu-satunya perbedaantopic=pubRobotABerbeda, karena Anda perlu berlangganan topik tempat strategi A mengirimkan informasipubRobotA。
Pembuatan dan penggunaan objek koneksi berlangganan dan penerbitan dalam strategi A sama seperti yang dijelaskan di atas.


Dengan cara ini, contoh sederhana penerapan protokol NATS diimplementasikan di mana disk nyata A dan disk nyata B berlangganan dan menerbitkan pesan untuk berkomunikasi satu sama lain.
Dalam komunikasi asinkron, pesan tidak akan langsung sampai ke penerima, tetapi akan disimpan dalam wadah. Jika kondisi tertentu terpenuhi, pesan akan dikirim ke penerima oleh wadah. Wadah ini adalah antrean pesan. Untuk melengkapi fungsi ini , kedua belah pihak perlu mematuhi konvensi dan aturan yang seragam. AMQP adalah salah satu protokol tersebut. Baik pengirim maupun penerima pesan dapat mencapai komunikasi asinkron dengan mematuhi protokol ini. Protokol ini menentukan format pesan dan cara kerjanya.
Setiap protokol memiliki karakteristiknya sendiri. Anda dapat merujuk pada dokumen dan materi tertentu, yang tidak akan diuraikan di sini.
Terapkan server protokol amqp:
docker run –rm –hostname my-rabbit –name rabbit -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=q -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3-management
Ketika menyebarkan citra docker, citra tersebut akan otomatis diunduh dan disebarkan, dan setelah selesai akan ditampilkan:
2024-08-06 09:02:46.248936+00:00 [info] <0.9.0> Time to start RabbitMQ: 15569 ms
Setelah citra server disebarkan, tulis contoh pengujian:
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("amqp://q:[email protected]:5672/?queue=robotA_Queue")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 读取
Log("read:", conn.read(1000), "#FF0000")
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
Saat menggunakan antrean protokol AMQP, harap perhatikan bahwa pesan yang dipublikasikan akan tetap ada dalam antrean. Misalnya, mari kita jalankan kode contoh di atas sekali. 10 pesan akan ditulis ke antrian. Lalu ketika kita menjalankannya untuk kedua kalinya, kita dapat menemukan bahwa ketika membaca, informasi yang ditulis pertama kali akan terbaca lagi. Seperti yang ditunjukkan pada gambar:

Anda dapat melihat bahwa dua pesan log yang ditunjukkan oleh panah merah pada gambar tangkapan layar memiliki waktu yang tidak konsisten. Alasannya adalah bahwa pesan merah adalah pesan yang dibaca dan ditulis ke antrean saat kode strategi pertama kali dijalankan.
Berdasarkan fitur ini, beberapa persyaratan dapat dipenuhi. Misalnya, setelah strategi dimulai ulang, data pasar yang terekam masih dapat diperoleh dari antrean untuk perhitungan inisialisasi dan operasi lainnya.
Apache Kafka adalah penyimpanan data terdistribusi yang dioptimalkan untuk menyerap dan memproses data streaming secara real-time. Data streaming merujuk pada data yang dihasilkan secara terus-menerus oleh ribuan sumber data, yang sering kali mengirimkan rekaman data secara serentak. Platform streaming perlu memproses data yang mengalir terus menerus ini dan memprosesnya langkah demi langkah secara berurutan.
Kafka menyediakan tiga fungsi utama bagi penggunanya:
Kafka terutama digunakan untuk membangun jaringan data streaming waktu nyata dan aplikasi yang beradaptasi dengan aliran data. Menggabungkan kemampuan pengiriman pesan, penyimpanan, dan pemrosesan aliran untuk menyimpan data historis dan data waktu nyata.
Terapkan citra docker dari proxy Kafka:
docker run --rm --name kafka-server --hostname kafka-server -p 9092:9092 -p 9093:9093 \
-e KAFKA_CFG_NODE_ID=0 \
-e KAFKA_CFG_PROCESS_ROLES=controller,broker \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-server:9093 \
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
bitnami/kafka:latest
Uji menggunakan kode uji:
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("kafka://localhost:9092/test_topic")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
// 读取
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
Mari kita lihat cara menggunakan protokol Kafka untuk menerbitkan dan berlangganan pesan dalam fungsi Dial.
Dial("kafka://localhost:9092/test_topic")
Seperti beberapa protokol lainnya, bagian pertama adalah nama protokol. Kemudian ikuti alamat mendengarkannya:localhost:9092. Kemudian gunakan simbol “/” sebagai pemisah, diikuti dengan topik langganan/penerbitan. Di sini topik pengujian ditetapkan ketest_topic。
Hasil pengujian:
