
オリジナルのFMZ戦略設計では、非同期並行操作を使用する必要がある場合は、exchange.Go()関数は FMZ カプセル化インターフェースの同時実行性を実装するために使用され、一部のカスタム操作 (関数) は同時に実行できません。この設計により、ポリシー プログラムの実行効率が大幅に向上しますが、ネイティブ プログラミング言語での並行設計の経験がある学生にとってはまだ馴染みのないものです。
FMZを使った定量取引を初めて学ぶ学生の中には、exchange.Go()機能の使用、使用exchange.Go()順次実行されるコードでは、ステートメントが 1 つずつ実行されているように見えます。この記事では、FMZ プラットフォームの新しい同時スレッド機能について説明します。__Thread()一連の機能と戦略プログラム非同期設計の使用。
戦略メイン スレッドの実行中に、子スレッドを同時に実行して、記述したカスタム関数を実行する場合は、次のコードに似た設計を使用できます。戦略コード内の関数をカスタマイズするGetTickerAsync()、この関数の具体的な機能を記述します。この関数は無限ループを実行します。whileFMZ API インターフェースはループ内で継続的に呼び出されます。GetTicker()市場データを取得するため。
次に__threadSetData(0, "ticker", t)この文はメインスレッドにデータを書き込みます。データ名はticker、データ値はt今すぐGetTicker()の戻り値。
__threadSetData(0, "ticker", t)
スレッドの同時実行のためのカスタム関数を設計した後、次のように記述できます。main()関数内のコードはmain()関数の先頭では、以下を使用します。
__Thread(GetTickerAsync, 0) // GetTickerAsync为需要并发执行的自定义函数,0为这个传入GetTickerAsync函数的参数
同時実行スレッドを作成し、実行を開始するGetTickerAsync()関数。それからmain()関数は実行を開始し、whileループ、ループ内で受信GetTickerAsync()この関数はデータを更新し、次を出力します:
var t = __threadGetData(0, "ticker")
Log(t)
完全なコード例:
function GetTickerAsync(index) {
while (true) {
var t = exchanges[index].GetTicker()
__threadSetData(0, "ticker", t)
Sleep(500)
}
}
function main() {
__Thread(GetTickerAsync, 0)
while(true) {
var t = __threadGetData(0, "ticker")
Log(t)
Sleep(1000)
}
}
実際のディスク動作テスト:

これは最もシンプルなアプリケーション設計です。次に、他の需要設計を見てみましょう。
関数は同時に 10 個のスレッドを作成するように設計でき、各スレッドは順序操作関数を実行します。存在するmain()機能を設計するwhileループ、検出戦略の相互作用の指示。対話型の指示を受け取る:placeMultipleOrdersこの同時実行命令関数を呼び出すだけですtestPlaceMultipleOrders()。
if (cmd == "placeMultipleOrders") {
// ...
}
戦略編集ページに戦略インタラクションデザインを追加し、コマンドplaceMultipleOrdersでボタンを設定します。

完全なコード例:
function placeOrder(exIndex, type, price, amount) {
var id = null
if (type == "Buy") {
id = exchanges[exIndex].Buy(price, amount)
} else if (type == "Sell") {
id = exchanges[exIndex].Sell(price, amount)
} else {
throw "type error! type:" + type
}
}
function testPlaceMultipleOrders(index, beginPrice, endPrice, step, type, amount) {
Log("beginPrice:", beginPrice, ", endPrice:", endPrice, ", step:", step, ", type:", type, ", amount:", amount)
var tids = []
for (var p = beginPrice; p <= endPrice; p += step) {
var tid = __Thread(placeOrder, index, type, p, amount)
tids.push(tid)
Sleep(10)
}
Sleep(1000)
for (var i = 0; i < tids.length; i++) {
__threadTerminate(tids[i])
}
}
function main() {
while(true) {
LogStatus(_D())
var cmd = GetCommand()
if (cmd) {
if (cmd == "placeMultipleOrders") {
var t = _C(exchange.GetTicker)
var beginPrice = t.Last * 0.8
var endPrice = t.Last * 0.9
var step = t.Last * 0.01
testPlaceMultipleOrders(0, beginPrice, endPrice, step, "Buy", 0.01)
var orders = exchange.GetOrders()
for (var i = 0; i < orders.length; i++) {
Log(orders[i])
}
}
}
Sleep(1000)
}
}



この要件は、同時スレッドでの使用方法を示す簡単な例を希望した FMZ ユーザーによって提起されました。WebSocketメインスレッドにデータを渡す方法を接続して設計するmain()関数。
これは実際には非常に単純で、前の例で同時スレッドを作成するのと似ています。スレッド間の通信のみが使用されます__threadPeekMessage()機能と__threadPostMessage()関数。 Binance Exchange の WebSocket API インターフェース呼び出しを例にとると、設計では WebSocket 接続の終了操作にも注意を払う必要があります。次の例では、同時実行スレッドに停止を通知する方法も示しています。
完全なコード例:
var tid = null
function createWS() {
// wss://stream.binance.com:9443/ws/<streamName> , <symbol>@ticker
var stream = "wss://stream.binance.com:9443/ws/btcusdt@ticker"
var ws = Dial(stream)
Log("创建WS连接:", stream)
while (true) {
var data = ws.read()
if (data) {
__threadPostMessage(0, data)
}
Log("接收到WS链接推送的数据,data:", data)
// __threadPeekMessage 超时参数设置-1,不阻塞
var msg = __threadPeekMessage(-1)
if (msg) {
if (msg == "stop") {
Log("并发线程Id:", __threadId(), "接收到stop指令")
break
}
}
}
Log("并发线程执行完毕,关闭ws连接")
ws.close()
}
function main() {
tid = __Thread(createWS)
Log("创建并发线程,线程Id:", tid)
while(true) {
// __threadPeekMessage 的超时参数设置为0,阻塞等待数据
var data = __threadPeekMessage(0)
Log("接收到并发线程", ", Id:", tid, ", 发送的数据,data:", data, "#FF0000")
var tbl = {
type : "table",
title : "<symbol>@ticker频道推送消息",
cols : ["事件类型", "事件时间", "交易对", "24小时价格变化", "24小时价格变化百分比", "平均价格", "最新成交价格", "24小时内成交量", "24小时内成交额"],
rows : []
}
try {
data = JSON.parse(data)
tbl.rows.push([data.e, _D(data.E), data.s, data.p, data.P, data.w, data.c, data.v, data.q])
} catch (e) {
Log("e.name:", e.name, "e.stack:", e.stack, "e.message:", e.message)
}
LogStatus(_D(), "\n`" + JSON.stringify(tbl) + "`")
}
}
function onexit() {
Log("扫尾函数,向Id为", tid, "的并发线程发送stop指令")
__threadPostMessage(tid, "stop")
Log("等待Id为", tid, "的并发线程停止")
__threadJoin(tid)
Log("扫尾函数执行完毕")
}
実際のディスク動作テスト:

見ることができますmain()この関数は、同時実行スレッドによって作成された WebSocket 接続から市場データを継続的に受信します。
戦略が停止されると、スイープ機能が動作を開始します。
