金融市場の急速な発展と定量取引の普及により、ますます多くのトレーダーが取引の自動化戦略に頼り始めています。このプロセスでは、戦略間のコミュニケーションと調整が特に重要です。 FMZ (定量取引プラットフォーム) は、実際の取引戦略間の効率的な通信プロトコルを提供することで、トレーダーがシームレスな戦略統合とリアルタイムのデータ共有を実現できるように支援します。
この記事では、FMZ プラットフォームの取引戦略のリアルタイム通信プロトコルを詳細に検討し、その設計コンセプト、機能的特徴、および実際のアプリケーションにおける利点を紹介します。詳細なケース分析を通じて、このプロトコルを使用して効率的で安定した戦略コミュニケーションを実現し、取引戦略の実行と収益性を向上させる方法を説明します。
FMZ を使い始めたばかりの定量取引愛好家であっても、経験豊富なプロのプログラマーであっても、この記事は貴重な洞察と実用的な操作ガイドを提供します。 FMZ の強力な機能を探り、効率的な通信プロトコルを通じて戦略間の連携を実現し、取引効率を向上させ、市場機会を獲得する方法を学びましょう。
需要シナリオ
-
- マルチ戦略協調取引
需要シナリオ:
複雑な市場環境では、単一の戦略ではさまざまな緊急事態や市場の変化に対処できない可能性があります。トレーダーは、トレンドフォロー戦略、平均回帰戦略、裁定取引戦略などの複数の戦略を同時に実行し、これらの戦略をリアルタイムで通信させて市場情報と取引シグナルを共有し、全体的な取引の効率と安定性を向上したいと考えています。
- マルチ戦略協調取引
-
- クロスマーケット裁定取引
需要シナリオ:
トレーダーは、異なる取引市場間で裁定取引を実行したいと考えています。例えば、A株市場と香港株式市場の価格差を利用して裁定取引を行うことができます。ある市場で異常な価格が発生した場合、裁定取引の機会を捉えるために、戦略は他の市場の戦略に速やかに通知し、対応する売買操作を実行する必要があります。
- クロスマーケット裁定取引
-
- リスク管理とヘッジ
需要シナリオ:
1 つの戦略は、市場で高リスク、高リターンの取引を見つけて実行することに責任を持ち、もう 1 つの戦略は、全体的なリスクを監視し、ヘッジ操作を実行することに重点を置いています。高リスク取引中に過度の損失が発生しないようにするには、2 つの戦略がリアルタイムで通信してデータを共有し、ポジションを調整してリスクを適時にヘッジする必要があります。
- リスク管理とヘッジ
-
- 分散型取引システム
需要シナリオ:
大規模な取引機関は、取引システムのフォールト トレランスとパフォーマンスを向上させるために、複数の物理サーバー上で分散取引システムを実行したいと考えています。これらのサーバー上の戦略では、取引システム全体の安定した効率的な運用を確保するために、通信プロトコルを通じてデータを同期し、操作を調整する必要があります。
- 分散型取引システム
-
- 市場の監視と早期警告
需要シナリオ:
戦略は、市場の動向をリアルタイムで監視する役割を担っています。市場に大きな変化(突然の価格の急落や急騰など)があった場合、戦略は他の戦略に迅速に通知して、ポジションのクローズなどの対応するアクションを実行する必要があります。ポジションを調整したり、ポジションを追加してリスクを軽減したり、取引の機会を利用したりします。
- 市場の監視と早期警告
-
- ポートフォリオ戦略管理
需要シナリオ:
トレーダーは、さまざまな資産クラスにわたる投資を管理するために戦略のポートフォリオを使用します。各戦略は、特定の資産クラス (株式、債券、先物など) に重点を置いています。ポートフォリオ投資の全体的な最適化と収益の最大化を実現するには、これらの戦略を伝え、調整する必要があります。
- ポートフォリオ戦略管理
これらの需要シナリオは、実際のアプリケーションにおける FMZ 取引戦略のリアルタイム通信プロトコルのさまざまな可能性と利点を示しています。戦略間の効果的なコミュニケーションを通じて、トレーダーは複雑な市場環境にうまく対応し、取引戦略を最適化し、取引の効率と利益を向上させることができます。
FMZカプセル化通信プロトコルとダイヤル機能
実際のディスク間の通信要件を理解した後、これらの要件をどのように実現するかを検討する必要があります。要求は単純に見えますが、それは実市場 A が実市場 B と情報交換することを望んでいるに過ぎません。ただし、一連の通信プロトコルを使用してさまざまな詳細について合意する必要があります。FMZ は、いくつかの一般的な通信プロトコルをカプセル化しています。
mqtt / nats / amqp / kafka
通信アーキテクチャ
通信アーキテクチャは次のとおりです。
- サーバー(プロキシ)。
サブスクライバーとパブリッシャーの間でメッセージを中継するには、通信プロトコル サーバーを実行する必要があります。このサーバーは、ホストのシステム上にローカルに展開できます (ローカルの実ディスク通信用)。また、リモート サービスとしても展開できます (サーバー間の実ディスク通信用)。 - クライアント(購読者、発行者)。
FMZ 上の戦略プログラムは、通信プロトコルのクライアントとして理解できます。戦略プログラムは、パブリッシャー (pub) またはサブスクライバー (sub) になることができます。
ダイヤル機能
これらのプロトコルをFMZプラットフォームに適用する場合、それは単にmqtt/nats/amqp/kafkaとして理解することができます。これらのプロトコルは、Dial()関数では、Dial()関数は、メッセージの公開やサブスクリプションなどの操作を実行します。これらの公開されたメッセージは、プロトコル サーバーを介してサブスクライブされた実際のディスクにプロキシ (中継) されるため、最初にプロトコル サーバーを実行する必要があります。デモンストレーションを簡単にするために、次の例ではさまざまなプロトコル サーバー イメージの展開を使用します。
ダイヤル機能 API ドキュメント セクション: https://www.fmz.com/syntax-guide#fun_dial
Docker イメージを展開する前に、まず Docker ソフトウェアをインストールすることを忘れないでください。
次に、FMZ でサポートされている通信プロトコル アプリケーションを調べて実践してみましょう。
FMZプラットフォームリアルタイム通信プロトコル実践
MQTTプロトコル
MQTT (Message Queuing Telemetry Transport) は、低帯域幅、高遅延、または信頼性の低いネットワーク環境に特に適した軽量のメッセージ伝送プロトコルです。これは 1999 年に IBM の Andy Stanford-Clark 氏と Arlen Nipper 氏によって提案され、後に ISO 標準 (ISO/IEC PRF 20922) になりました。
MQTTプロトコルの主な機能:パブリッシュ/サブスクライブモード
- パブリッシング: メッセージ プロデューサーがトピックにメッセージを送信します。
- サブスクリプション: メッセージ コンシューマーは関心のあるトピックをサブスクライブし、そのトピックに公開されたメッセージを受信します。
- ブローカー: MQTT はメッセージ ブローカーを仲介として使用してメッセージを転送し、パブリッシャーとサブスクライバー間の分離を保証します。
メッセージの公開とサブスクリプション
MQTT プロキシ サーバーの導入には、MQTT プロトコルに対応したソフトウェアの docker イメージ (eclipse-mosquitto イメージ) を使用するため、事前に docker をインストールしておき、後ほど詳しく説明しません。
イメージを展開するコマンドを実行する前に、プロキシサーバーの設定ファイルを作成する必要があります。mosquitto.conf。
mosquitto.conf
# 配置端口号及远程访问IP
listener 1883 0.0.0.0
# 设置匿名访问
allow_anonymous true
次に、デプロイメント コマンドを実行します。
终端
docker run --rm -p 1883:1883 -v ./mosquitto.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto
プロキシ サーバー イメージが実行されると、次の画面が表示されます。
终端
1723012640: mosquitto version 2.0.18 starting
1723012640: Config loaded from /mosquitto/config/mosquitto.conf.
1723012640: Opening ipv4 listen socket on port 1883.
1723012640: mosquitto version 2.0.18 running
その後、戦略をテストして実践することができます。
javascript
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("mqtt://127.0.0.1:1883?topic=test_topic")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
// 读取
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
戦略コードにおける Dial 関数の主な用途は次のとおりです。
javascript
Dial("mqtt://127.0.0.1:1883?topic=test_topic")
Dial関数の文字列パラメータはmqtt://プロトコル名の後にリスニング アドレスとポートが続きます。 「?」記号の後にサブスクリプション/公開トピック名が続きます。ここでテストされるトピック名は次のとおりです。test_topic。
上記の戦略では、トピックの公開とサブスクライブを同時に行います。実行中のテストは図のようになります。
2 つの実際のディスクを使用して相互にサブスクライブし、トピック情報を公開することもできます。NATS プロトコルの実践セクションではこのような例を使用しますが、他のプロトコルではこの方法を繰り返すことはありません。
NATSプロトコル
NATS プロトコルは、シンプルなテキストベースのパブリッシュ/サブスクライブ スタイルのプロトコルです。クライアントは gnatsd (NATS サーバー) に接続し、gnatsd と通信します。通信は通常の TCP/IP ソケットに基づいており、非常に小さな操作セットを定義します。改行は終了を示します。バイナリ メッセージ形式を使用する従来のメッセージング システムとは異なり、テキストベースの NATS プロトコルではクライアントの実装が簡単になり、さまざまなプログラミング言語やスクリプト言語で簡単に実装できます。
各プロトコルには独自の特徴があります。ここでは詳しく説明しませんので、具体的な文書や資料を参照してください。
NATS プロトコル サーバーを展開します。
docker run --name nats --rm -p 4222:4222 -p 8222:8222 nats --http_port 8222 --auth admin
この docker コマンドは、nats イメージを自動的にダウンロードして実行します。ポート 4222 は、クライアントがアクセスするポートです。イメージがデプロイされると、ポート 8222 で http モニターが開かれます。
终端
Listening for client connections on 0.0.0.0:4222
Server is ready
NATS サーバー イメージが実行を開始し、ポート 4222 でリッスンします。
ローカルデバイス間の通信リアルタイム戦略
2 つの戦略 (実際の取引) を作成する必要があります。ここでは、戦略 A と戦略 B と名付けます。これら 2 つの戦略のコードは基本的に同じです。 FMZ プラットフォームで最も使いやすい言語である JavaScript で書かれています。
-
戦略A
javascriptvar connPub = null var connSub = null function main() { var robotId = _G() Log("当前实盘robotId:", robotId) connPub = Dial("nats://[email protected]:4222?topic=pubRobotA") if (!connPub) { Log("通信失败!") return } connSub = Dial("nats://[email protected]:4222?topic=pubRobotB") if (!connSub) { Log("通信失败!") return } while (true) { connPub.write("robotA发布的消息,robotId: " + robotId + ", time:" + _D()) var msgRead = connSub.read(10000) if (msgRead) { Log("msgRead:", msgRead) } LogStatus(_D()) Sleep(10000) } } function onexit() { connPub.close() connSub.close() } -
戦略B
javascriptvar connPub = null var connSub = null function main() { var robotId = _G() Log("当前实盘robotId:", robotId) connPub = Dial("nats://[email protected]:4222?topic=pubRobotB") if (!connPub) { Log("通信失败!") return } connSub = Dial("nats://[email protected]:4222?topic=pubRobotA") if (!connSub) { Log("通信失败!") return } while (true) { connPub.write("robotB发布的消息,robotId: " + robotId + ", time:" + _D()) var msgRead = connSub.read(10000) if (msgRead) { Log("msgRead:", msgRead) } LogStatus(_D()) Sleep(10000) } } function onexit() { connPub.close() connSub.close() }
これら 2 つの戦略は、相互に公開およびサブスクライブすることと、サブスクライブされたトピック、公開されたトピック、および公開された情報が異なることを除いて、基本的に同じです。
戦略 B を例に挙げます。
-
- 使用
Dial()関数はクライアント接続サーバーオブジェクトを作成しますconnPubトピックメッセージの公開に使用されます:
var connPub = Dial("nats://[email protected]:4222?topic=pubRobotB")
Dial関数のパラメータ文字列は
nats://NATSプロトコルが通信に使用されていることを示します。adminDockerイメージをデプロイする際に設定される簡単な検証情報ですauth admin、文字「@」を使用して次のコンテンツを区切り、その後にサービスアドレスとポートを入力します。127.0.0.1:4222最後に、パブリッシュ/サブスクライブトピックです。topic=pubRobotB前のアドレスの間に「?」記号があることに注意してください。 - 使用
-
- 使用
Dial()関数はクライアント接続サーバーオブジェクトを作成しますconnSubトピックメッセージのサブスクリプションに使用されます:
var connSub = Dial("nats://[email protected]:4222?topic=pubRobotA")
唯一の違いは
topic=pubRobotA異なるのは、戦略Aが情報を送信するトピックを購読する必要があるためです。pubRobotA。 - 使用
戦略 A でのサブスクリプションおよびパブリッシング接続オブジェクトの作成と使用は、上記で説明したものと同じです。
このようにして、実ディスク A と実ディスク B がメッセージをサブスクライブおよび公開して相互に通信する、NATS プロトコル アプリケーションの簡単な例が実装されます。
amqp プロトコル
AMQP プロトコル キュー
非同期通信では、メッセージはすぐに受信者に届くのではなく、コンテナに保存されます。特定の条件が満たされると、メッセージはコンテナによって受信者に送信されます。このコンテナがメッセージキューです。この機能を完了するには、双方ともコンテナとそのコンポーネントが統一された規則とルールに準拠する必要があります。AMQP はそのようなプロトコルです。メッセージの送信者と受信者の両方がこのプロトコルに準拠することで非同期通信を実現できます。このプロトコルは、メッセージの形式とその動作方法を指定します。
各プロトコルには独自の特徴があります。ここでは詳しく説明しませんので、具体的な文書や資料を参照してください。
amqp プロトコル サーバーをデプロイします。
docker run --rm --hostname my-rabbit --name rabbit -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=q -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3-management
Docker イメージをデプロイすると、自動的にダウンロードおよびデプロイされ、完了すると次のように表示されます。
终端
2024-08-06 09:02:46.248936+00:00 [info] <0.9.0> Time to start RabbitMQ: 15569 ms
サーバー イメージがデプロイされたら、テスト例を記述します。
javascript
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("amqp://q:[email protected]:5672/?queue=robotA_Queue")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 读取
Log("read:", conn.read(1000), "#FF0000")
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
AMQP プロトコル キューを使用する場合、公開されたメッセージはキューに保持されることに注意してください。たとえば、上記のサンプル コードを 1 回実行してみましょう。 10 件のメッセージがキューに書き込まれます。次に 2 回目に実行すると、読み取り時に最初に書き込まれた情報が再度読み取られることがわかります。図に示すように:
スクリーンショットの赤い矢印で示されている 2 つのログ メッセージの時間が一致していないことがわかります。その理由は、赤いメッセージは、戦略コードが最初に実行されたときに読み取られ、キューに書き込まれたメッセージだからです。
この機能に基づいて、いくつかの要件を満たすことができます。たとえば、戦略を再起動した後でも、記録された市場データをキューから取得して、初期化計算やその他の操作を行うことができます。
Kafka プロトコル
Apache Kafka は、ストリーミング データをリアルタイムで取り込んで処理するために最適化された分散データ ストアです。ストリーミング データとは、何千ものデータ ソースによって継続的に生成され、多くの場合データ レコードが同時に送信されるデータのことです。ストリーミング プラットフォームは、この継続的に流れるデータを処理し、順番に段階的に処理する必要があります。
Kafka はユーザーに 3 つの主な機能を提供します。
- レコードストリームの公開と購読
- レコードのストリームを生成された順に効率的に保存する
- レコードのストリームをリアルタイムで処理する
Kafka は主に、データ ストリームに適応するリアルタイム ストリーミング データ パイプラインとアプリケーションの構築に使用されます。メッセージング、ストレージ、ストリーム処理機能を組み合わせて、履歴データとリアルタイムデータの両方を保存します。
メッセージの公開とサブスクリプション
Kafka プロキシの Docker イメージをデプロイします。
终端
docker run --rm --name kafka-server --hostname kafka-server -p 9092:9092 -p 9093:9093 \
-e KAFKA_CFG_NODE_ID=0 \
-e KAFKA_CFG_PROCESS_ROLES=controller,broker \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-server:9093 \
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
bitnami/kafka:latest
テストコードを使用してテストします。
javascript
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("kafka://localhost:9092/test_topic")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
// 读取
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
Dial 関数で Kafka プロトコルを使用してメッセージを公開およびサブスクライブする方法を見てみましょう。
javascript
Dial("kafka://localhost:9092/test_topic")
他のいくつかのプロトコルと同様に、最初の部分はプロトコル名です。次に、リスニング アドレスに従います。localhost:9092。次に、記号「/」を区切りとして使用し、その後に購読/出版のトピックを続けます。ここでは、テストトピックは次のように設定されています。test_topic。
テスト結果:
- 1







