
Wenn Sie im ursprünglichen FMZ-Strategiedesign asynchrone gleichzeitige Operationen verwenden müssen, können Sie nurexchange.Go()Die Funktion wird verwendet, um die Parallelität der FMZ-Kapselungsschnittstelle zu implementieren, und einige benutzerdefinierte Vorgänge (Funktionen) können nicht gleichzeitig ausgeführt werden. Obwohl dieser Entwurf die Effizienz der Ausführung von Richtlinienprogrammen erheblich verbessert, ist er für Studierende, die Erfahrung mit parallelem Entwurf in nativen Programmiersprachen haben, immer noch sehr ungewohnt.
Selbst einige neue Studenten, die neu im quantitativen Handel mit FMZ sind, verstehen nichtexchange.Go()Nutzung von Funktionen, Nutzungexchange.Go()Es sieht immer noch so aus, als würden Anweisungen im sequenziell ausgeführten Code einzeln ausgeführt. In diesem Artikel erkunden wir die neuen Concurrent-Threading-Funktionen der FMZ-Plattform:__Thread()Die Verwendung einer Reihe von Funktionen und Strategieprogrammen erfolgt asynchron.
Wenn wir einen untergeordneten Thread parallel zur Ausführung einer benutzerdefinierten Funktion ausführen möchten, die wir geschrieben haben, während der Hauptthread der Strategie ausgeführt wird, können wir ein Design verwenden, das dem folgenden Code ähnelt. Anpassen einer Funktion im StrategiecodeGetTickerAsync(), schreiben Sie die spezifische Funktionalität dieser Funktion. Diese Funktion führt eine Endlosschleife aus.whileDie FMZ API Schnittstelle wird in einer Schleife fortlaufend aufgerufen:GetTicker()Um Marktdaten zu erhalten.
Dann verwenden Sie__threadSetData(0, "ticker", t)Dieser Satz schreibt Daten in den Haupt-Thread. Der Datenname istticker, der Datenwert isttIm AugenblickGetTicker()Der Rückgabewert von .
__threadSetData(0, "ticker", t)
Nachdem wir die benutzerdefinierte Funktion für die gleichzeitige Ausführung von Threads entworfen haben, können wir schreibenmain()Der Code in der Funktion istmain()Zu Beginn der Funktion verwenden wir:
__Thread(GetTickerAsync, 0) // GetTickerAsync为需要并发执行的自定义函数,0为这个传入GetTickerAsync函数的参数
Erstellen Sie einen parallelen Thread, der mit der Ausführung beginnt.GetTickerAsync()Funktion. Dannmain()Die Funktion beginnt mit der Ausführung ihrerwhileSchleife, Empfang in SchleifeGetTickerAsync()Die Funktion aktualisiert die Daten und druckt dann:
var t = __threadGetData(0, "ticker")
Log(t)
Vollständiges Codebeispiel:
function GetTickerAsync(index) {
while (true) {
var t = exchanges[index].GetTicker()
__threadSetData(0, "ticker", t)
Sleep(500)
}
}
function main() {
__Thread(GetTickerAsync, 0)
while(true) {
var t = __threadGetData(0, "ticker")
Log(t)
Sleep(1000)
}
}
Echter Festplattenbetriebstest:

Dies ist das einfachste Anwendungsdesign. Als nächstes schauen wir uns einige andere Nachfragedesigns an.
Eine Funktion kann so konzipiert werden, dass 10 Threads gleichzeitig erstellt werden und jeder Thread eine Order-Operation-Funktion ausführt. existierenmain()Entwerfen einer FunktionwhileSchleife, Anweisungen zur Interaktion mit der Erkennungsstrategie. Erhalten Sie interaktive Anweisungen:placeMultipleOrdersRufen Sie einfach diese gleichzeitige Bestellfunktion auftestPlaceMultipleOrders()。
if (cmd == "placeMultipleOrders") {
// ...
}
Fügen Sie auf der Strategiebearbeitungsseite das Strategieinteraktionsdesign hinzu und legen Sie eine Schaltfläche mit dem Befehl „placeMultipleOrders“ fest.

Vollständiges Codebeispiel:
function placeOrder(exIndex, type, price, amount) {
var id = null
if (type == "Buy") {
id = exchanges[exIndex].Buy(price, amount)
} else if (type == "Sell") {
id = exchanges[exIndex].Sell(price, amount)
} else {
throw "type error! type:" + type
}
}
function testPlaceMultipleOrders(index, beginPrice, endPrice, step, type, amount) {
Log("beginPrice:", beginPrice, ", endPrice:", endPrice, ", step:", step, ", type:", type, ", amount:", amount)
var tids = []
for (var p = beginPrice; p <= endPrice; p += step) {
var tid = __Thread(placeOrder, index, type, p, amount)
tids.push(tid)
Sleep(10)
}
Sleep(1000)
for (var i = 0; i < tids.length; i++) {
__threadTerminate(tids[i])
}
}
function main() {
while(true) {
LogStatus(_D())
var cmd = GetCommand()
if (cmd) {
if (cmd == "placeMultipleOrders") {
var t = _C(exchange.GetTicker)
var beginPrice = t.Last * 0.8
var endPrice = t.Last * 0.9
var step = t.Last * 0.01
testPlaceMultipleOrders(0, beginPrice, endPrice, step, "Buy", 0.01)
var orders = exchange.GetOrders()
for (var i = 0; i < orders.length; i++) {
Log(orders[i])
}
}
}
Sleep(1000)
}
}



Diese Anforderung wurde von einem FMZ-Benutzer gestellt, der ein einfaches Beispiel zur Demonstration der Verwendung in gleichzeitigen Threads wünschte.WebSocketVerbinden und entwerfen Sie, wie Daten an den Hauptthread übergeben werdenmain()Funktion.
Dies ist eigentlich ganz einfach und ähnelt der Erstellung gleichzeitiger Threads im vorherigen Beispiel. Es wird nur die Kommunikation zwischen Threads genutzt__threadPeekMessage()Funktionen und__threadPostMessage()Funktion. Am Beispiel des WebSocket-API-Schnittstellenaufrufs von Binance Exchange müssen wir beim Entwurf auch auf den Schließvorgang der WebSocket-Verbindung achten. Das folgende Beispiel zeigt auch, wie ein parallel laufender Thread benachrichtigt wird, um ihn zu stoppen.
Vollständiges Codebeispiel:
var tid = null
function createWS() {
// wss://stream.binance.com:9443/ws/<streamName> , <symbol>@ticker
var stream = "wss://stream.binance.com:9443/ws/btcusdt@ticker"
var ws = Dial(stream)
Log("创建WS连接:", stream)
while (true) {
var data = ws.read()
if (data) {
__threadPostMessage(0, data)
}
Log("接收到WS链接推送的数据,data:", data)
// __threadPeekMessage 超时参数设置-1,不阻塞
var msg = __threadPeekMessage(-1)
if (msg) {
if (msg == "stop") {
Log("并发线程Id:", __threadId(), "接收到stop指令")
break
}
}
}
Log("并发线程执行完毕,关闭ws连接")
ws.close()
}
function main() {
tid = __Thread(createWS)
Log("创建并发线程,线程Id:", tid)
while(true) {
// __threadPeekMessage 的超时参数设置为0,阻塞等待数据
var data = __threadPeekMessage(0)
Log("接收到并发线程", ", Id:", tid, ", 发送的数据,data:", data, "#FF0000")
var tbl = {
type : "table",
title : "<symbol>@ticker频道推送消息",
cols : ["事件类型", "事件时间", "交易对", "24小时价格变化", "24小时价格变化百分比", "平均价格", "最新成交价格", "24小时内成交量", "24小时内成交额"],
rows : []
}
try {
data = JSON.parse(data)
tbl.rows.push([data.e, _D(data.E), data.s, data.p, data.P, data.w, data.c, data.v, data.q])
} catch (e) {
Log("e.name:", e.name, "e.stack:", e.stack, "e.message:", e.message)
}
LogStatus(_D(), "\n`" + JSON.stringify(tbl) + "`")
}
}
function onexit() {
Log("扫尾函数,向Id为", tid, "的并发线程发送stop指令")
__threadPostMessage(tid, "stop")
Log("等待Id为", tid, "的并发线程停止")
__threadJoin(tid)
Log("扫尾函数执行完毕")
}
Echter Festplattenbetriebstest:

Sie können sehenmain()Die Funktion empfängt kontinuierlich Marktdaten von der vom gleichzeitigen Thread erstellten WebSocket-Verbindung.
Wenn die Strategie gestoppt wird, beginnt die Sweep-Funktion zu arbeiten:
