多线程异步支持函数,可将所有支持的函数操作转换为异步并发执行。
object
exchange.Go(method)
exchange.Go(method, ...args)
```method```参数用于指定并发函数的名称,注意该参数为函数名称字符串,而非函数引用。
method
true
string
**并发执行函数**的参数,```arg```参数可能有多个。```arg```参数的类型与数量取决于**并发执行函数**的参数要求。
arg
false
string / number / bool / object / array / function / any (平台支持的任意类型)
```javascript
function main(){
// 以下四种操作是并发多线程异步执行, 不会耗时, 立即返回的
var a = exchange.Go("GetTicker")
var b = exchange.Go("GetDepth")
var c = exchange.Go("Buy", 1000, 0.1)
var d = exchange.Go("GetRecords", PERIOD_H1)
// 调用wait方法等待返回异步获取ticker结果
var ticker = a.wait()
// 返回深度, 如果获取失败也是有可能返回null的
var depth = b.wait()
// 返回订单号, 限定1秒超时, 超时返回undefined, 此对象可以继续调用wait等待如果上次wait超时
var orderId = c.wait(1000)
if(typeof(orderId) == "undefined") {
// 超时,重新获取
orderId = c.wait()
}
var records = d.wait()
}```
```python
def main():
a = exchange.Go("GetTicker")
b = exchange.Go("GetDepth")
c = exchange.Go("Buy", 1000, 0.1)
d = exchange.Go("GetRecords", PERIOD_H1)
ticker, ok = a.wait()
depth, ok = b.wait()
orderId, ok = c.wait(1000)
if ok == False:
orderId, ok = c.wait()
records, ok = d.wait()```
```cpp
void main() {
auto a = exchange.Go("GetTicker");
auto b = exchange.Go("GetDepth");
auto c = exchange.Go("Buy", 1000, 0.1);
auto d = exchange.Go("GetRecords", PERIOD_H1);
Ticker ticker;
Depth depth;
Records records;
TId orderId;
a.wait(ticker);
b.wait(depth);
if(!c.wait(orderId, 300)) {
c.wait(orderId);
}
d.wait(records);
}```
```exchange.Go()```函数使用示例,判断```undefined```需要使用```typeof(xx) === "undefined"```,因为在JavaScript中```null == undefined```的判断结果为真。
```javascript
function main() {
var d = exchange.Go("GetRecords", PERIOD_H1)
// 等待K线结果
var records = d.wait()
// 这里wait了一个已经wait过且结束的异步操作,会返回null,并记录出错信息
var ret = d.wait()
}```
```python
def main():
d = exchange.Go("GetRecords", PERIOD_H1)
records, ok = d.wait()
ret, ok = d.wait()```
```cpp
void main() {
auto d = exchange.Go("GetRecords", PERIOD_H1);
Records records;
d.wait(records);
Records ret;
d.wait(ret);
}```
对已释放的并发对象调用其```wait()```方法将触发错误:
```javascript
function main() {
while(true) {
var beginTS = new Date().getTime()
var arrRoutine = []
var arrTicker = []
var arrName = []
for(var i = 0; i < exchanges.length; i++) {
arrRoutine.push(exchanges[i].Go("GetTicker"))
arrName.push(exchanges[i].GetName())
}
for(var i = 0; i < arrRoutine.length; i++) {
arrTicker.push(arrRoutine[i].wait())
}
var endTS = new Date().getTime()
var tbl = {
type: "table",
title: "行情",
cols: ["索引", "名称", "最新成交价"],
rows: []
}
for(var i = 0; i < arrTicker.length; i++) {
tbl.rows.push([i, arrName[i], arrTicker[i].Last])
}
LogStatus(_D(), "Total time for concurrent ticker retrieval:", endTS - beginTS, "ms", "\n", "`" + JSON.stringify(tbl) + "`")
Sleep(500)
}
}```
```python
import time
import json
def main():
while True:
beginTS = time.time()
arrRoutine = []
arrTicker = []
arrName = []
for i in range(len(exchanges)):
arrRoutine.append(exchanges[i].Go("GetTicker"))
arrName.append(exchanges[i].GetName())
for i in range(len(exchanges)):
ticker, ok = arrRoutine[i].wait()
arrTicker.append(ticker)
endTS = time.time()
tbl = {
"type": "table",
"title": "行情",
"cols": ["索引", "名称", "最新成交价"],
"rows": []
}
for i in range(len(arrTicker)):
tbl["rows"].append([i, arrName[i], arrTicker[i]["Last"]])
LogStatus(_D(), "Total time for concurrent ticker retrieval:", endTS - beginTS, "seconds", "\n", "`" + json.dumps(tbl) + "`")
Sleep(500)```
```cpp
void main() {
while(true) {
int length = exchanges.size();
auto beginTS = UnixNano() / 1000000;
Ticker arrTicker[length] = {};
string arrName[length] = {};
// 注意:根据添加的交易所对象数量,需要相应执行exchanges[n].Go函数。本示例需要添加四个交易所对象,可根据实际需求修改
auto r0 = exchanges[0].Go("GetTicker");
auto r1 = exchanges[1].Go("GetTicker");
auto r2 = exchanges[2].Go("GetTicker");
auto r3 = exchanges[3].Go("GetTicker");
GoObj *arrRoutine[length] = {&r0, &r1, &r2, &r3};
for(int i = 0; i < length; i++) {
arrName[i] = exchanges[i].GetName();
}
for(int i = 0; i < length; i++) {
Ticker ticker;
arrRoutine[i]->wait(ticker);
arrTicker[i] = ticker;
}
auto endTS = UnixNano() / 1000000;
json tbl = R"({
"type": "table",
"title": "行情",
"cols": ["索引", "名称", "最新成交价"],
"rows": []
})"_json;
for(int i = 0; i < length; i++) {
json arr = R"(["", "", ""])"_json;
arr[0] = format("%d", i);
arr[1] = arrName[i];
arr[2] = format("%f", arrTicker[i].Last);
tbl["rows"].push_back(arr);
}
LogStatus(_D(), "Total time for concurrent ticker retrieval:", format("%d", endTS - beginTS), "ms", "\n", "`" + tbl.dump() + "`");
Sleep(500);
}
}```
并发获取多个交易所行情数据:
```javascript
function main() {
/*
使用OKX期货下单接口测试
POST /api/v5/trade/order
*/
var beginTS = new Date().getTime()
var param = {"instId":"BTC-USDT-SWAP","tdMode":"cross","side":"buy","ordType":"limit","px":"16000","sz":"1","posSide":"long"}
var ret1 = exchange.Go("IO", "api", "POST", "/api/v5/trade/order", "", JSON.stringify(param))
var ret2 = exchange.Go("IO", "api", "POST", "/api/v5/trade/order", "", JSON.stringify(param))
var ret3 = exchange.Go("IO", "api", "POST", "/api/v5/trade/order", "", JSON.stringify(param))
var id1 = ret1.wait()
var id2 = ret2.wait()
var id3 = ret3.wait()
var endTS = new Date().getTime()
Log("id1:", id1)
Log("id2:", id2)
Log("id3:", id3)
Log("Concurrent order time:", endTS - beginTS, "ms")
}```
```python
import time
import json
def main():
beginTS = time.time()
param = {"instId":"BTC-USDT-SWAP","tdMode":"cross","side":"buy","ordType":"limit","px":"16000","sz":"1","posSide":"long"}
ret1 = exchange.Go("IO", "api", "POST", "/api/v5/trade/order", "", json.dumps(param))
ret2 = exchange.Go("IO", "api", "POST", "/api/v5/trade/order", "", json.dumps(param))
ret3 = exchange.Go("IO", "api", "POST", "/api/v5/trade/order", "", json.dumps(param))
id1, ok1 = ret1.wait()
id2, ok2 = ret2.wait()
id3, ok3 = ret3.wait()
endTS = time.time()
Log("id1:", id1)
Log("id2:", id2)
Log("id3:", id3)
Log("Concurrent order time:", endTS - beginTS, "seconds")```
```cpp
void main() {
auto beginTS = UnixNano() / 1000000;
json param = R"({"instId":"BTC-USDT-SWAP","tdMode":"cross","side":"buy","ordType":"limit","px":"16000","sz":"1","posSide":"long"})"_json;
auto ret1 = exchange.Go("IO", "api", "POST", "/api/v5/trade/order", "", param.dump());
auto ret2 = exchange.Go("IO", "api", "POST", "/api/v5/trade/order", "", param.dump());
auto ret3 = exchange.Go("IO", "api", "POST", "/api/v5/trade/order", "", param.dump());
json id1 = R"({})"_json;
json id2 = R"({})"_json;
json id3 = R"({})"_json;
ret1.wait(id1);
ret2.wait(id2);
ret3.wait(id3);
auto endTS = UnixNano() / 1000000;
Log("id1:", id1);
Log("id2:", id2);
Log("id3:", id3);
Log("Concurrent order time:", endTS - beginTS, "ms");
}```
对```exchange.IO("api", ...)```函数进行并发调用:
```javascript
function main() {
var counter = 0
var arr = [] // 用于测试持续引用并发相关变量
var symbols = ["BTC_USDT", "ETH_USDT", "SOL_USDT", "LTC_USDT", "EOS_USDT"]
while (true) {
var arrRoutine = []
for (var symbol of symbols) {
var r = exchange.Go("GetTicker", symbol)
arrRoutine.push(r) // 记录并发对象以调用 r.wait() 函数获取结果,每轮循环都会清空
// arr.push(r) // 如果使用此行代码,运行时会持续引用并发对象,无法自动释放。当并发数超过2000时,将报错:```InternalError: too many routine wait, max is 2000```。
counter++
}
// 遍历 arrRoutine 调用 r.wait()
LogStatus(_D(), "routine number:", counter)
Sleep(50)
}
}```
针对自动释放机制的测试
该函数仅在实盘运行时创建多线程执行任务,回测不支持多线程并发执行任务(回测环境下可用,但仍为顺序执行)。
```exchange.Go()```函数返回对象后,通过调用该对象的```wait()```方法获取线程返回的数据。当并发的多线程任务执行完毕且相关变量不再被引用时,系统底层将自动进行资源回收。
```wait()```方法支持超时参数:
1、不设置超时参数,即```wait()```,或设置超时参数为0,即```wait(0)```。```wait()```函数将阻塞等待,直到并发线程运行完毕,返回并发线程的执行结果。
2、设置超时参数为-1,即```wait(-1)```。```wait()```函数将立即返回,不同编程语言的返回值有所差异,具体请参考本节的调用示例。
3、设置具体的超时参数,如```wait(300)```,```wait()```函数将等待最长300毫秒后返回。
虽然系统底层具有自动回收机制,但如果持续引用相关变量,并发线程将不会被释放。当并发线程数量超过2000个时将报错:```"too many routine wait, max is 2000"```。
支持的函数:```GetTicker```、```GetDepth```、```GetTrades```、```GetRecords```、```GetAccount```、```GetOrders```、```GetOrder```、```CancelOrder```、```Buy```、```Sell```、```GetPositions```、```IO```等。这些函数并发调用时均基于当前{@var/EXCHANGE exchange}交易所对象执行。
Python语言与JavaScript语言的差异:Python语言中并发对象的```wait()```函数返回两个参数,第一个是异步API调用返回的结果,第二个表示异步调用是否完成。
```python
def main():
d = exchange.Go("GetRecords", PERIOD_D1)
# ok一定返回True,除非策略被停止
ret, ok = d.wait()
# 如果等待超时,或者wait了一个已经结束的实例,ok返回False
ret, ok = d.wait(100)
{@fun/Global/Mail_Go Mail_Go}, {@fun/Global/HttpQuery_Go HttpQuery_Go}, {@fun/Global/EventLoop EventLoop}, {@fun/Trade/exchange.IO exchange.IO}(API限流控制)