多线程异步支持函数,可以把所有支持的函数的操作变成异步并发执行。
object
exchange.Go(method)
exchange.Go(method, ...args)
```method```参数用于指定并发的函数名称,注意该参数是函数名称字符串,并非函数引用。
method
true
string
**并发执行函数**的参数,参数```arg```可能有多个。参数```arg```的类型与个数根据**并发执行函数**的参数而定。
arg
false
string、number、bool、object、array、function、空值等系统支持的所有类型
```javascript
function main(){
// 以下四种操作是并发多线程异步执行, 不会耗时, 立即返回的
var a = exchange.Go("GetTicker")
var b = exchange.Go("GetDepth")
var c = exchange.Go("Buy", 1000, 0.1)
var d = exchange.Go("GetRecords", PERIOD_H1)
// 调用wait方法等待返回异步获取ticker结果
var ticker = a.wait()
// 返回深度, 如果获取失败也是有可能返回null的
var depth = b.wait()
// 返回订单号, 限定1秒超时, 超时返回undefined, 此对象可以继续调用wait等待如果上次wait超时
var orderId = c.wait(1000)
if(typeof(orderId) == "undefined") {
// 超时,重新获取
orderId = c.wait()
}
var records = d.wait()
}
def main():
a = exchange.Go("GetTicker")
b = exchange.Go("GetDepth")
c = exchange.Go("Buy", 1000, 0.1)
d = exchange.Go("GetRecords", PERIOD_H1)
ticker, ok = a.wait()
depth, ok = b.wait()
orderId, ok = c.wait(1000)
if ok == False:
orderId, ok = c.wait()
records, ok = d.wait()
void main() {
auto a = exchange.Go("GetTicker");
auto b = exchange.Go("GetDepth");
auto c = exchange.Go("Buy", 1000, 0.1);
auto d = exchange.Go("GetRecords", PERIOD_H1);
Ticker ticker;
Depth depth;
Records records;
TId orderId;
a.wait(ticker);
b.wait(depth);
if(!c.wait(orderId, 300)) {
c.wait(orderId);
}
d.wait(records);
}
```javascript
function main() {
var d = exchange.Go("GetRecords", PERIOD_H1)
// 等待K线结果
var records = d.wait()
// 这里wait了一个已经wait过且结束的异步操作,会返回null,并记录出错信息
var ret = d.wait()
}
def main():
d = exchange.Go("GetRecords", PERIOD_H1)
records, ok = d.wait()
ret, ok = d.wait()
void main() {
auto d = exchange.Go("GetRecords", PERIOD_H1);
Records records;
d.wait(records);
Records ret;
d.wait(ret);
}
对已经释放的并发对象,调用其wait()
方法会报错:
function main() {
while(true) {
var beginTS = new Date().getTime()
var arrRoutine = []
var arrTicker = []
var arrName = []
for(var i = 0; i < exchanges.length; i++) {
arrRoutine.push(exchanges[i].Go("GetTicker"))
arrName.push(exchanges[i].GetName())
}
for(var i = 0; i < arrRoutine.length; i++) {
arrTicker.push(arrRoutine[i].wait())
}
var endTS = new Date().getTime()
var tbl = {
type: "table",
title: "行情",
cols: ["索引", "名称", "最新成交价"],
rows: []
}
for(var i = 0; i < arrTicker.length; i++) {
tbl.rows.push([i, arrName[i], arrTicker[i].Last])
}
LogStatus(_D(), "并发获取多个交易所行情总耗时:", endTS - beginTS, "毫秒", "\n", "`" + JSON.stringify(tbl) + "`")
Sleep(500)
}
}
import time
import json
def main():
while True:
beginTS = time.time()
arrRoutine = []
arrTicker = []
arrName = []
for i in range(len(exchanges)):
arrRoutine.append(exchanges[i].Go("GetTicker"))
arrName.append(exchanges[i].GetName())
for i in range(len(exchanges)):
ticker, ok = arrRoutine[i].wait()
arrTicker.append(ticker)
endTS = time.time()
tbl = {
"type": "table",
"title": "行情",
"cols": ["索引", "名称", "最新成交价"],
"rows": []
}
for i in range(len(arrTicker)):
tbl["rows"].append([i, arrName[i], arrTicker[i]["Last"]])
LogStatus(_D(), "并发获取多个交易所行情总耗时:", endTS - beginTS, "秒", "\n", "`" + json.dumps(tbl) + "`")
Sleep(500)
void main() {
while(true) {
int length = exchanges.size();
auto beginTS = UnixNano() / 1000000;
Ticker arrTicker[length] = {};
string arrName[length] = {};
// 注意,添加几个交易所对象,这里要执行几次exchanges[n].Go 函数,这个例子是需要添加四个交易所对象,具体可以修改
auto r0 = exchanges[0].Go("GetTicker");
auto r1 = exchanges[1].Go("GetTicker");
auto r2 = exchanges[2].Go("GetTicker");
auto r3 = exchanges[3].Go("GetTicker");
GoObj *arrRoutine[length] = {&r0, &r1, &r2, &r3};
for(int i = 0; i < length; i++) {
arrName[i] = exchanges[i].GetName();
}
for(int i = 0; i < length; i++) {
Ticker ticker;
arrRoutine[i]->wait(ticker);
arrTicker[i] = ticker;
}
auto endTS = UnixNano() / 1000000;
json tbl = R"({
"type": "table",
"title": "行情",
"cols": ["索引", "名称", "最新成交价"],
"rows": []
})"_json;
for(int i = 0; i < length; i++) {
json arr = R"(["", "", ""])"_json;
arr[0] = format("%d", i);
arr[1] = arrName[i];
arr[2] = format("%f", arrTicker[i].Last);
tbl["rows"].push_back(arr);
}
LogStatus(_D(), "并发获取多个交易所行情总耗时:", format("%d", endTS - beginTS), "毫秒", "\n", "`" + tbl.dump() + "`");
Sleep(500);
}
}
并发获取多个交易所行情:
function main() {
/*
使用OKX期货下单接口测试
POST /api/v5/trade/order
*/
var beginTS = new Date().getTime()
var param = {"instId":"BTC-USDT-SWAP","tdMode":"cross","side":"buy","ordType":"limit","px":"16000","sz":"1","posSide":"long"}
var ret1 = exchange.Go("IO", "api", "POST", "/api/v5/trade/order", "", JSON.stringify(param))
var ret2 = exchange.Go("IO", "api", "POST", "/api/v5/trade/order", "", JSON.stringify(param))
var ret3 = exchange.Go("IO", "api", "POST", "/api/v5/trade/order", "", JSON.stringify(param))
var id1 = ret1.wait()
var id2 = ret2.wait()
var id3 = ret3.wait()
var endTS = new Date().getTime()
Log("id1:", id1)
Log("id2:", id2)
Log("id3:", id3)
Log("并发下单耗时:", endTS - beginTS, "毫秒")
}
import time
import json
def main():
beginTS = time.time()
param = {"instId":"BTC-USDT-SWAP","tdMode":"cross","side":"buy","ordType":"limit","px":"16000","sz":"1","posSide":"long"}
ret1 = exchange.Go("IO", "api", "POST", "/api/v5/trade/order", "", json.dumps(param))
ret2 = exchange.Go("IO", "api", "POST", "/api/v5/trade/order", "", json.dumps(param))
ret3 = exchange.Go("IO", "api", "POST", "/api/v5/trade/order", "", json.dumps(param))
id1, ok1 = ret1.wait()
id2, ok2 = ret2.wait()
id3, ok3 = ret3.wait()
endTS = time.time()
Log("id1:", id1)
Log("id2:", id2)
Log("id3:", id3)
Log("并发下单耗时:", endTS - beginTS, "秒")
void main() {
auto beginTS = UnixNano() / 1000000;
json param = R"({"instId":"BTC-USDT-SWAP","tdMode":"cross","side":"buy","ordType":"limit","px":"16000","sz":"1","posSide":"long"})"_json;
auto ret1 = exchange.Go("IO", "api", "POST", "/api/v5/trade/order", "", param.dump());
auto ret2 = exchange.Go("IO", "api", "POST", "/api/v5/trade/order", "", param.dump());
auto ret3 = exchange.Go("IO", "api", "POST", "/api/v5/trade/order", "", param.dump());
json id1 = R"({})"_json;
json id2 = R"({})"_json;
json id3 = R"({})"_json;
ret1.wait(id1);
ret2.wait(id2);
ret3.wait(id3);
auto endTS = UnixNano() / 1000000;
Log("id1:", id1);
Log("id2:", id2);
Log("id3:", id3);
Log("并发下单耗时:", endTS - beginTS, "毫秒");
}
对exchange.IO("api", ...)
函数并发调用:
function main() {
var counter = 0
var arr = [] // 用于测试持续引用并发的相关变量
var symbols = ["BTC_USDT", "ETH_USDT", "SOL_USDT", "LTC_USDT", "EOS_USDT"]
while (true) {
var arrRoutine = []
for (var symbol of symbols) {
var r = exchange.Go("GetTicker", symbol)
arrRoutine.push(r) // 记录并发对象用于调用 r.wait() 函数获取结果,每一轮循环都清空
// arr.push(r) // 如果使用这句代码,运行时持续引用并发的对象,无法自动释放,超过2000个并发时,会报错:```InternalError: too many routine wait, max is 2000```。
counter++
}
// 遍历 arrRoutine 调用 r.wait()
LogStatus(_D(), "routine number:", counter)
Sleep(50)
}
}
针对自动释放机制的测试
该函数只在实盘运行时创建多线程执行任务,回测不支持多线程并发执行任务(回测可用,但是还是顺序执行)。
当并发的多线程任务执行完毕时,相关变量没有被继续引用,系统底层会自动处理资源回收。
```wait()```方法支持超时参数:
1、不设置超时参数,即```wait()```,或者设置超时参数0,即```wait(0)```。```wait()```函数会阻塞等待,直到并发的线程运行完毕,返回并发线程执行的结果。
2、设置超时参数-1,即```wait(-1)```。```wait()```函数会立即返回,不同编程语言有不同的返回值,具体可以参考本小节的调用例子。
3、设置具体超时参数,即```wait(300)```,```wait()```函数会等待最长300毫秒后返回。
虽然系统底层有自动回收机制,但是如果持续引用相关变量,并发的线程并不会释放。并发的线程数量超过2000个会报错:```"too many routine wait, max is 2000"```。
支持的函数:```GetTicker```,```GetDepth```,```GetTrades```,```GetRecords```,```GetAccount```,```GetOrders```,```GetOrder```,```CancelOrder```,```Buy```,```Sell```,```GetPositions```,```IO```等。
这些函数并发调用时都基于当前{@var/EXCHANGE exchange}交易所对象执行。
Python语言与JavaScript语言中的区别,Python语言中并发对象的```wait()```函数返回两个参数,第一个是异步的api调用返回的结果,第二个表示是异步调用是否完成。
```python
def main():
d = exchange.Go("GetRecords", PERIOD_D1)
# ok是一定返回True的, 除非策略被停止
ret, ok = d.wait()
# 如果等待超时, 或者wait了一个已经结束的实例,ok返回False
ret, ok = d.wait(100)
{@fun/Global/Mail_Go Mail_Go}, {@fun/Global/HttpQuery_Go HttpQuery_Go}, {@fun/Global/EventLoop EventLoop}