
В оригинальной стратегии FMZ, если вам нужно использовать асинхронные параллельные операции, вы можете использовать толькоexchange.Go()Функция используется для реализации параллелизма интерфейса инкапсуляции FMZ, и некоторые пользовательские операции (функции) не могут выполняться одновременно. Хотя эта конструкция значительно повышает эффективность выполнения программной политики, она все еще очень незнакома студентам, имеющим опыт параллельного проектирования на родных языках программирования.
Даже некоторые новые студенты, которые только начинают изучать количественную торговлю с использованием FMZ, не понимаютexchange.Go()Использование функций, использованиеexchange.Go()По-прежнему создается впечатление, что операторы выполняются один за другим в последовательно выполняемом коде. В этой статье давайте рассмотрим новые возможности параллельной обработки на платформе FMZ:__Thread()Использование ряда функций и стратегии асинхронного проектирования программы.
Если мы хотим запустить дочерний поток одновременно для выполнения пользовательской функции, которую мы написали, пока выполняется основной поток стратегии, мы можем использовать конструкцию, похожую на следующий код. Настройте функцию в коде стратегииGetTickerAsync(), напишите конкретную функциональность этой функции. Эта функция выполняет бесконечный цикл.whileИнтерфейс API FMZ вызывается непрерывно в цикле:GetTicker()Для получения рыночных данных.
Тогда используйте__threadSetData(0, "ticker", t)Это предложение записывает данные в основной поток. Имя данных —ticker, значение данных равноtПрямо сейчасGetTicker()Возвращаемое значение .
__threadSetData(0, "ticker", t)
После разработки пользовательской функции для одновременного выполнения потоков мы можем написатьmain()Код в функции:main()В начале функции мы используем:
__Thread(GetTickerAsync, 0) // GetTickerAsync为需要并发执行的自定义函数,0为这个传入GetTickerAsync函数的参数
Создайте параллельный поток, который начнет выполнятьсяGetTickerAsync()функция. затемmain()Функция начинает выполнятьсяwhileЦикл, прием в циклеGetTickerAsync()Функция обновляет данные, а затем печатает:
var t = __threadGetData(0, "ticker")
Log(t)
Полный пример кода:
function GetTickerAsync(index) {
while (true) {
var t = exchanges[index].GetTicker()
__threadSetData(0, "ticker", t)
Sleep(500)
}
}
function main() {
__Thread(GetTickerAsync, 0)
while(true) {
var t = __threadGetData(0, "ticker")
Log(t)
Sleep(1000)
}
}
Тест реальной работы диска:

Это простейший дизайн приложения. Далее рассмотрим некоторые другие дизайны спроса.
Функцию можно разработать для создания 10 потоков одновременно, и каждый поток выполняет функцию операции заказа. существоватьmain()Разработать функциюwhileЦикл, инструкции по взаимодействию стратегии обнаружения. Получите интерактивные инструкции:placeMultipleOrdersПросто вызовите эту функцию параллельного заказаtestPlaceMultipleOrders()。
if (cmd == "placeMultipleOrders") {
// ...
}
Добавьте дизайн взаимодействия стратегии на страницу редактирования стратегии и установите кнопку с командой: placeMultipleOrders

Полный пример кода:
function placeOrder(exIndex, type, price, amount) {
var id = null
if (type == "Buy") {
id = exchanges[exIndex].Buy(price, amount)
} else if (type == "Sell") {
id = exchanges[exIndex].Sell(price, amount)
} else {
throw "type error! type:" + type
}
}
function testPlaceMultipleOrders(index, beginPrice, endPrice, step, type, amount) {
Log("beginPrice:", beginPrice, ", endPrice:", endPrice, ", step:", step, ", type:", type, ", amount:", amount)
var tids = []
for (var p = beginPrice; p <= endPrice; p += step) {
var tid = __Thread(placeOrder, index, type, p, amount)
tids.push(tid)
Sleep(10)
}
Sleep(1000)
for (var i = 0; i < tids.length; i++) {
__threadTerminate(tids[i])
}
}
function main() {
while(true) {
LogStatus(_D())
var cmd = GetCommand()
if (cmd) {
if (cmd == "placeMultipleOrders") {
var t = _C(exchange.GetTicker)
var beginPrice = t.Last * 0.8
var endPrice = t.Last * 0.9
var step = t.Last * 0.01
testPlaceMultipleOrders(0, beginPrice, endPrice, step, "Buy", 0.01)
var orders = exchange.GetOrders()
for (var i = 0; i < orders.length; i++) {
Log(orders[i])
}
}
}
Sleep(1000)
}
}



Это требование было выдвинуто пользователем FMZ, который хотел получить простой пример, демонстрирующий, как использовать его в параллельных потоках.WebSocketПодключите и спроектируйте, как передавать данные в основной потокmain()функция.
На самом деле это очень просто и похоже на создание параллельных потоков в предыдущем примере. Используется только связь между потоками.__threadPeekMessage()Функции и__threadPostMessage()функция. Если взять вызов интерфейса WebSocket API Binance Exchange в качестве примера, то нам также нужно обратить внимание на операцию закрытия соединения WebSocket в дизайне. Следующий пример также показывает, как уведомить параллельный поток о необходимости его остановки.
Полный пример кода:
var tid = null
function createWS() {
// wss://stream.binance.com:9443/ws/<streamName> , <symbol>@ticker
var stream = "wss://stream.binance.com:9443/ws/btcusdt@ticker"
var ws = Dial(stream)
Log("创建WS连接:", stream)
while (true) {
var data = ws.read()
if (data) {
__threadPostMessage(0, data)
}
Log("接收到WS链接推送的数据,data:", data)
// __threadPeekMessage 超时参数设置-1,不阻塞
var msg = __threadPeekMessage(-1)
if (msg) {
if (msg == "stop") {
Log("并发线程Id:", __threadId(), "接收到stop指令")
break
}
}
}
Log("并发线程执行完毕,关闭ws连接")
ws.close()
}
function main() {
tid = __Thread(createWS)
Log("创建并发线程,线程Id:", tid)
while(true) {
// __threadPeekMessage 的超时参数设置为0,阻塞等待数据
var data = __threadPeekMessage(0)
Log("接收到并发线程", ", Id:", tid, ", 发送的数据,data:", data, "#FF0000")
var tbl = {
type : "table",
title : "<symbol>@ticker频道推送消息",
cols : ["事件类型", "事件时间", "交易对", "24小时价格变化", "24小时价格变化百分比", "平均价格", "最新成交价格", "24小时内成交量", "24小时内成交额"],
rows : []
}
try {
data = JSON.parse(data)
tbl.rows.push([data.e, _D(data.E), data.s, data.p, data.P, data.w, data.c, data.v, data.q])
} catch (e) {
Log("e.name:", e.name, "e.stack:", e.stack, "e.message:", e.message)
}
LogStatus(_D(), "\n`" + JSON.stringify(tbl) + "`")
}
}
function onexit() {
Log("扫尾函数,向Id为", tid, "的并发线程发送stop指令")
__threadPostMessage(tid, "stop")
Log("等待Id为", tid, "的并发线程停止")
__threadJoin(tid)
Log("扫尾函数执行完毕")
}
Тест реальной работы диска:

Вы можете увидетьmain()Функция непрерывно получает рыночные данные из соединения WebSocket, созданного параллельным потоком.
После остановки стратегии начнет работать функция подчистки:
