Angesichts der rasanten Entwicklung der Finanzmärkte und der Popularität des quantitativen Handels verlassen sich immer mehr Händler beim Handel auf automatisierte Strategien. Dabei kommt der Kommunikation und Abstimmung der Strategien eine besondere Bedeutung zu. FMZ (Quantitative Trading Platform) unterstützt Händler bei der nahtlosen Strategieintegration und dem Echtzeit-Datenaustausch, indem es ein effizientes Kommunikationsprotokoll zwischen realen Handelsstrategien bereitstellt.
In diesem Artikel wird das Echtzeit-Kommunikationsprotokoll von Handelsstrategien auf der FMZ-Plattform eingehend untersucht und sein Designkonzept, seine Funktionsmerkmale und seine Vorteile in der praktischen Anwendung vorgestellt. Anhand einer detaillierten Fallanalyse zeigen wir, wie Sie mit diesem Protokoll eine effiziente und stabile Strategiekommunikation erreichen und die Ausführung und Rentabilität von Handelsstrategien verbessern können.
Egal, ob Sie ein quantitativer Handelsenthusiast sind, der gerade erst mit FMZ anfängt, oder ein erfahrener professioneller Programmierer, dieser Artikel wird Ihnen wertvolle Einblicke und praktische Bedienungshandbücher liefern. Lassen Sie uns die leistungsstarken Funktionen von FMZ erkunden und erfahren, wie Sie durch effiziente Kommunikationsprotokolle eine Zusammenarbeit zwischen Strategien erreichen, die Handelseffizienz verbessern und Marktchancen nutzen können.
Nachfrageszenario
-
- Kollaborativer Handel mit mehreren Strategien
Bedarfsszenario:
In einem komplexen Marktumfeld ist eine einzelne Strategie möglicherweise nicht in der Lage, mit verschiedenen Notfällen und Marktveränderungen umzugehen. Händler möchten mehrere Strategien gleichzeitig ausführen, beispielsweise Trendfolgestrategien, Mean-Reversion-Strategien und Arbitragestrategien, und diese Strategien in Echtzeit kommunizieren lassen, um Marktinformationen und Handelssignale auszutauschen und so die allgemeine Handelseffizienz und -stabilität zu verbessern.
- Kollaborativer Handel mit mehreren Strategien
-
- Cross-Market-Arbitrage
Bedarfsszenario:
Händler möchten Arbitragegeschäfte zwischen verschiedenen Handelsmärkten durchführen. Arbitrage ist beispielsweise möglich, indem man den Preisunterschied zwischen dem A-Aktienmarkt und der Börse von Hongkong ausnutzt. Wenn in einem bestimmten Markt abnormale Preise auftreten, muss die Strategie umgehend Strategien in anderen Märkten benachrichtigen, um entsprechende Kauf- und Verkaufstransaktionen durchzuführen und so Arbitragemöglichkeiten zu nutzen.
- Cross-Market-Arbitrage
-
- Risikomanagement und Absicherung
Bedarfsszenario:
Eine Strategie ist für das Auffinden und Ausführen von Handelsgeschäften mit hohem Risiko und hoher Rendite auf dem Markt zuständig, während sich eine andere Strategie auf die Überwachung des Gesamtrisikos und die Durchführung von Absicherungsgeschäften konzentriert. Um sicherzustellen, dass bei risikoreichen Transaktionen keine übermäßigen Verluste entstehen, müssen die beiden Strategien in Echtzeit kommunizieren und Daten austauschen, damit Positionen rechtzeitig angepasst und Risiken abgesichert werden können.
- Risikomanagement und Absicherung
-
- Verteiltes Handelssystem
Bedarfsszenario:
Große Handelsinstitute hoffen, verteilte Handelssysteme auf mehreren physischen Servern ausführen zu können, um die Fehlertoleranz und Leistung der Handelssysteme zu verbessern. Die Strategien auf diesen Servern müssen Daten synchronisieren und Vorgänge über Kommunikationsprotokolle koordinieren, um den stabilen und effizienten Betrieb des gesamten Handelssystems sicherzustellen.
- Verteiltes Handelssystem
-
- Marktbeobachtung und Frühwarnung
Bedarfsszenario:
Eine Strategie ist für die Echtzeitüberwachung der Marktdynamik verantwortlich. Bei größeren Veränderungen auf dem Markt (wie einem plötzlichen Preissturz oder -anstieg) muss die Strategie andere Strategien schnell benachrichtigen, um entsprechende Maßnahmen zu ergreifen, wie z. B. das Schließen von Positionen, Anpassen von Positionen oder Hinzufügen von Positionen zur Risikoreduzierung. Oder nutzen Sie eine Handelsmöglichkeit.
- Marktbeobachtung und Frühwarnung
-
- Portfoliostrategiemanagement
Bedarfsszenario:
Händler verwenden ein Portfolio von Strategien, um Investitionen in verschiedenen Anlageklassen zu verwalten, wobei sich jede Strategie auf eine bestimmte Anlageklasse (wie Aktien, Anleihen, Futures usw.) konzentriert. Diese Strategien müssen kommuniziert und koordiniert werden, um eine Gesamtoptimierung der Portfolioinvestitionen und eine Maximierung der Erträge zu erreichen.
- Portfoliostrategiemanagement
Diese Anforderungsszenarien demonstrieren die vielfältigen Möglichkeiten und Vorteile des FMZ Handelsstrategie Echtzeit-Kommunikationsprotokolls in der praktischen Anwendung. Durch eine effektive Kommunikation zwischen Strategien können Händler besser mit komplexen Marktumgebungen umgehen, Handelsstrategien optimieren und die Handelseffizienz und den Gewinn verbessern.
FMZ gekapseltes Kommunikationsprotokoll und Wählfunktion
Nachdem wir die Kommunikationsanforderungen zwischen realen Festplatten verstanden haben, müssen wir überlegen, wie wir diese Anforderungen realisieren können. Es handelt sich lediglich darum, dass der reale Fall A einen Informationsaustausch mit dem realen Fall B erhofft, auch wenn die Forderung einfach erscheint. Allerdings müssen verschiedene Details mithilfe einer Reihe von Kommunikationsprotokollen vereinbart werden. FMZ hat mehrere gängige Kommunikationsprotokolle gekapselt.
mqtt / nats / amqp / kafka
Kommunikationsarchitektur
Die Kommunikationsarchitektur ist:
- Server (Proxy).
Um Nachrichten zwischen Abonnenten und Herausgebern weiterzuleiten, muss ein Kommunikationsprotokollserver ausgeführt werden. Dieser Server kann lokal auf dem System des Hosts bereitgestellt werden (für lokale Real-Disk-Kommunikation); er kann auch ein Remote-Dienst sein (für serverübergreifende Real-Disk-Kommunikation). - Kunden (Abonnenten, Herausgeber).
Das Strategieprogramm auf FMZ kann als Client eines Kommunikationsprotokolls verstanden werden. Das Strategieprogramm kann Herausgeber (Pub) oder Abonnent (Sub) sein.
Wählfunktion
Wenn diese Protokolle auf der FMZ-Plattform angewendet werden, kann man sie einfach als mqtt/nats/amqp/kafka verstehen. Diese Protokolle sind integriert inDial()Verwenden Sie in der FunktionDial()Funktionen führen Vorgänge wie die Veröffentlichung und das Abonnieren von Nachrichten aus. Diese veröffentlichten Nachrichten werden über den Protokollserver an die abonnierte reale Festplatte weitergeleitet, daher muss zuerst ein Protokollserver ausgeführt werden. Zur Vereinfachung der Demonstration verwenden wir in den folgenden Beispielen verschiedene Bereitstellungen von Protokollserver-Images.
Abschnitt zur API-Dokumentation der Wählfunktion: https://www.fmz.com/syntax-guide#fun_dial
Denken Sie daran, vor der Bereitstellung des Docker-Image zuerst die Docker-Software zu installieren.
Lassen Sie uns als Nächstes die von FMZ unterstützten Kommunikationsprotokollanwendungen erkunden und üben.
Praxis des Echtzeit-Kommunikationsprotokolls der FMZ-Plattform
MQTT-Protokoll
MQTT (Message Queuing Telemetry Transport) ist ein leichtes Nachrichtenübertragungsprotokoll, das sich besonders für Netzwerkumgebungen mit geringer Bandbreite, hoher Latenz oder unzuverlässigen Netzwerkumgebungen eignet. Es wurde 1999 von Andy Stanford-Clark und Arlen Nipper von IBM vorgeschlagen und wurde später zum ISO-Standard (ISO/IEC PRF 20922).
Die Hauptfunktionen des MQTT-Protokolls: Publish/Subscribe-Modus
- Veröffentlichen: Der Nachrichtenproduzent sendet die Nachricht an das Thema.
- Abonnement: Ein Nachrichtenkonsument abonniert ein Thema, das ihn interessiert, und erhält dadurch zu diesem Thema veröffentlichte Nachrichten.
- Broker: MQTT verwendet einen Nachrichtenbroker als Vermittler zum Weiterleiten von Nachrichten und stellt so die Entkopplung zwischen Herausgebern und Abonnenten sicher.
Nachrichtenveröffentlichung und -abonnement
Da wir zum Bereitstellen des MQTT-Proxyservers das Docker-Image (Eclipse-Mosquitto-Image) einer Software verwenden, die das MQTT-Protokoll unterstützt, haben wir Docker im Voraus installiert und gehen später nicht näher darauf ein.
Bevor wir den Befehl zum Bereitstellen des Images ausführen, müssen wir eine Proxyserver-Konfigurationsdatei schreibenmosquitto.conf。
mosquitto.conf
# 配置端口号及远程访问IP
listener 1883 0.0.0.0
# 设置匿名访问
allow_anonymous true
Führen Sie dann den Bereitstellungsbefehl aus:
终端
docker run --rm -p 1883:1883 -v ./mosquitto.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto
Nachdem das Proxy-Server-Image ausgeführt wurde, wird die folgende Anzeige angezeigt:
终端
1723012640: mosquitto version 2.0.18 starting
1723012640: Config loaded from /mosquitto/config/mosquitto.conf.
1723012640: Opening ipv4 listen socket on port 1883.
1723012640: mosquitto version 2.0.18 running
Dann können wir die Strategie testen und in die Praxis umsetzen.
javascript
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("mqtt://127.0.0.1:1883?topic=test_topic")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
// 读取
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
Die Dial-Funktion wird im Strategiecode hauptsächlich wie folgt verwendet:
javascript
Dial("mqtt://127.0.0.1:1883?topic=test_topic")
Der String-Parameter der Dial-Funktion beginnt mitmqtt://Es handelt sich um den Protokollnamen, gefolgt von der Abhöradresse und dem Port. Auf das Symbol "?" folgt der Name des Abonnement-/Veröffentlichungsthemas. Der hier getestete Themenname lautet:test_topic。
Die obige Strategie veröffentlicht und abonniert gleichzeitig ein Thema. Der laufende Test sieht wie in der Abbildung dargestellt aus:
Sie können auch zwei reale Festplatten verwenden, um sich gegenseitig zu abonnieren und Themeninformationen zu veröffentlichen. Wir verwenden ein solches Beispiel im Praxisabschnitt des NATS-Protokolls und werden diese Methode in anderen Protokollen nicht wiederholen.
NATS-Protokoll
Das NATS-Protokoll ist ein einfaches, textbasiertes Publish/Subscribe-Protokoll. Der Client stellt eine Verbindung zu gnatsd (NATS-Server) her und kommuniziert mit gnatsd. Die Kommunikation basiert auf normalen TCP/IP-Sockets und definiert eine sehr kleine Anzahl von Operationen. Ein Zeilenumbruch zeigt die Beendigung an. Im Gegensatz zu herkömmlichen Nachrichtensystemen, die binäre Nachrichtenformate verwenden, vereinfacht das textbasierte NATS-Protokoll die Client-Implementierung und kann problemlos in einer Vielzahl von Programmiersprachen oder Skriptsprachen implementiert werden.
Jedes Protokoll hat seine eigenen Merkmale. Sie können auf die spezifischen Dokumente und Materialien verweisen, auf die hier nicht näher eingegangen wird.
Stellen Sie den NATS-Protokollserver bereit:
docker run --name nats --rm -p 4222:4222 -p 8222:8222 nats --http_port 8222 --auth admin
Dieser Docker-Befehl lädt das NAT-Image automatisch herunter und führt es aus. Port 4222 ist der Port, auf den der Client zugreifen möchte. Nachdem das Image bereitgestellt wurde, wird auf Port 8222 ein HTTP-Monitor geöffnet.
终端
Listening for client connections on 0.0.0.0:4222
Server is ready
Das NAT-Server-Image wird gestartet und lauscht auf Port 4222.
Kommunikation zwischen lokalen Geräten in Echtzeitstrategien
Wir müssen zwei Strategien erstellen (echtes Trading), nennen wir sie vorerst Strategie A und Strategie B. Die Codes dieser beiden Strategien sind grundsätzlich gleich. Geschrieben in Javascript, der am einfachsten zu verwendenden Sprache auf der FMZ-Plattform.
-
Strategie A
javascriptvar connPub = null var connSub = null function main() { var robotId = _G() Log("当前实盘robotId:", robotId) connPub = Dial("nats://[email protected]:4222?topic=pubRobotA") if (!connPub) { Log("通信失败!") return } connSub = Dial("nats://[email protected]:4222?topic=pubRobotB") if (!connSub) { Log("通信失败!") return } while (true) { connPub.write("robotA发布的消息,robotId: " + robotId + ", time:" + _D()) var msgRead = connSub.read(10000) if (msgRead) { Log("msgRead:", msgRead) } LogStatus(_D()) Sleep(10000) } } function onexit() { connPub.close() connSub.close() } -
Strategie B
javascriptvar connPub = null var connSub = null function main() { var robotId = _G() Log("当前实盘robotId:", robotId) connPub = Dial("nats://[email protected]:4222?topic=pubRobotB") if (!connPub) { Log("通信失败!") return } connSub = Dial("nats://[email protected]:4222?topic=pubRobotA") if (!connSub) { Log("通信失败!") return } while (true) { connPub.write("robotB发布的消息,robotId: " + robotId + ", time:" + _D()) var msgRead = connSub.read(10000) if (msgRead) { Log("msgRead:", msgRead) } LogStatus(_D()) Sleep(10000) } } function onexit() { connPub.close() connSub.close() }
Diese beiden Strategien sind grundsätzlich gleich, mit der Ausnahme, dass sie sich gegenseitig veröffentlichen und abonnieren und dass die abonnierten Themen, die veröffentlichten Themen und die veröffentlichten Informationen unterschiedlich sind.
Nehmen wir Strategie B als Beispiel:
-
- Verwendung
Dial()Funktion erstellt ein Client-VerbindungsserverobjektconnPub, wird zum Veröffentlichen von Themennachrichten verwendet:
var connPub = Dial("nats://[email protected]:4222?topic=pubRobotB")
Der Parameterstring der Dial-Funktion beginnt mit
nats://Zeigt an, dass das NATS-Protokoll für die Kommunikation verwendet wird.adminEs handelt sich um eine einfache Überprüfungsinformation, die beim Bereitstellen des Docker-Image festgelegt wirdauth admin, verwenden Sie das Zeichen "@", um den folgenden Inhalt zu trennen, und dann die Serviceadresse und den Port127.0.0.1:4222und schließlich das Publizieren/Abonnieren-Thema:topic=pubRobotBBeachten Sie, dass zwischen der vorherigen Adresse ein „?“-Symbol steht. - Verwendung
-
- Verwendung
Dial()Funktion erstellt ein Client-VerbindungsserverobjektconnSub, wird für das Abonnement von Themennachrichten verwendet:
var connSub = Dial("nats://[email protected]:4222?topic=pubRobotA")
Der einzige Unterschied
topic=pubRobotAAnders, weil Sie das Thema abonnieren müssen, an das Strategie A Informationen sendetpubRobotA。 - Verwendung
Die Erstellung und Verwendung von Abonnement- und Veröffentlichungsverbindungsobjekten in Strategie A sind dieselben wie oben beschrieben.
Auf diese Weise wird ein einfaches Beispiel einer NATS-Protokollanwendung implementiert, bei der die reale Festplatte A und die reale Festplatte B Nachrichten abonnieren und veröffentlichen, um miteinander zu kommunizieren.
AMQP-Protokoll
AMQP-Protokollwarteschlange
Bei der asynchronen Kommunikation erreicht die Nachricht den Empfänger nicht sofort, sondern wird in einem Container gespeichert. Wenn bestimmte Bedingungen erfüllt sind, wird die Nachricht vom Container an den Empfänger gesendet. Dieser Container ist die Nachrichtenwarteschlange. Um diese Funktion abzuschließen , müssen beide Parteien Der Container und seine Komponenten müssen einheitliche Konventionen und Regeln einhalten. AMQP ist ein solches Protokoll. Sowohl der Absender als auch der Empfänger von Nachrichten können durch die Einhaltung dieses Protokolls eine asynchrone Kommunikation erreichen. Dieses Protokoll gibt das Nachrichtenformat und ihre Funktionsweise an.
Jedes Protokoll hat seine eigenen Merkmale. Sie können auf die spezifischen Dokumente und Materialien verweisen, auf die hier nicht näher eingegangen wird.
Stellen Sie den AMQP-Protokollserver bereit:
docker run --rm --hostname my-rabbit --name rabbit -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=q -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3-management
Beim Bereitstellen eines Docker-Images wird es automatisch heruntergeladen und bereitgestellt. Nach Abschluss wird Folgendes angezeigt:
终端
2024-08-06 09:02:46.248936+00:00 [info] <0.9.0> Time to start RabbitMQ: 15569 ms
Nachdem das Server-Image bereitgestellt wurde, schreiben Sie einen Beispieltest:
javascript
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("amqp://q:[email protected]:5672/?queue=robotA_Queue")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 读取
Log("read:", conn.read(1000), "#FF0000")
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
Beachten Sie bei Verwendung der AMQP-Protokollwarteschlange, dass die veröffentlichten Nachrichten in der Warteschlange verbleiben. Lassen Sie uns beispielsweise den obigen Beispielcode einmal ausführen. 10 Nachrichten werden in die Warteschlange geschrieben. Wenn wir es dann zum zweiten Mal ausführen, können wir feststellen, dass beim Lesen die beim ersten Mal geschriebenen Informationen erneut gelesen werden. Wie in der Abbildung gezeigt:
Sie können sehen, dass die beiden Protokollmeldungen, auf die im Screenshot die roten Pfeile zeigen, inkonsistente Zeiten aufweisen. Der Grund dafür ist, dass die rote Meldung diejenige ist, die gelesen und in die Warteschlange geschrieben wurde, als der Strategiecode zum ersten Mal ausgeführt wurde.
Basierend auf dieser Funktion können einige Anforderungen erfüllt werden. Beispielsweise können nach dem Neustart der Strategie die aufgezeichneten Marktdaten weiterhin aus der Warteschlange für Initialisierungsberechnungen und andere Vorgänge abgerufen werden.
Kafka-Protokoll
Apache Kafka ist ein verteilter Datenspeicher, der für die Aufnahme und Verarbeitung von Streaming-Daten in Echtzeit optimiert ist. Beim Streaming-Daten handelt es sich um Daten, die kontinuierlich von Tausenden von Datenquellen generiert werden, wobei Datensätze häufig gleichzeitig gesendet werden. Die Streaming-Plattform muss diese kontinuierlich fließenden Daten verarbeiten und sie Schritt für Schritt nacheinander abarbeiten.
Kafka bietet seinen Benutzern drei Hauptfunktionen:
- Veröffentlichen und Abonnieren von Datensatz-Streams
- Effizientes Speichern eines Datensatzstroms in der Reihenfolge, in der er generiert wurde
- Datenströme in Echtzeit verarbeiten
Kafka wird hauptsächlich zum Erstellen von Echtzeit-Streaming-Datenpipelines und Anwendungen verwendet, die sich an Datenströme anpassen. Es kombiniert Messaging-, Speicher- und Stream-Verarbeitungsfunktionen, um sowohl historische als auch Echtzeitdaten zu speichern.
Nachrichtenveröffentlichung und -abonnement
Stellen Sie das Docker-Image des Kafka-Proxys bereit:
终端
docker run --rm --name kafka-server --hostname kafka-server -p 9092:9092 -p 9093:9093 \
-e KAFKA_CFG_NODE_ID=0 \
-e KAFKA_CFG_PROCESS_ROLES=controller,broker \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-server:9093 \
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
bitnami/kafka:latest
Testen Sie mit dem Testcode:
javascript
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("kafka://localhost:9092/test_topic")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
// 读取
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
Sehen wir uns an, wie das Kafka-Protokoll zum Veröffentlichen und Abonnieren von Nachrichten in der Dial-Funktion verwendet wird.
javascript
Dial("kafka://localhost:9092/test_topic")
Wie bei mehreren anderen Protokollen ist der erste Teil der Protokollname. Anschließend folgt die Höradresse:localhost:9092. Dann verwenden Sie das Symbol "/" als Trennzeichen, gefolgt vom Thema des Abonnements/der Veröffentlichung. Hier wird das Testthema auftest_topic。
Testergebnisse:
- 1







