exchange.Go

多线程异步支持函数,可以把所有支持的函数的操作变成异步并发执行。


object

exchange.Go(method)
exchange.Go(method, ...args)

```method```参数用于指定并发的函数名称,注意该参数是函数名称字符串,并非函数引用。

method
true
string
**并发执行函数**的参数,参数```arg```可能有多个。参数```arg```的类型与个数根据**并发执行函数**的参数而定。

arg
false
string、number、bool、object、array、function、空值等系统支持的所有类型

```javascript
function main(){
    // 以下四种操作是并发多线程异步执行, 不会耗时, 立即返回的
    var a = exchange.Go("GetTicker")
    var b = exchange.Go("GetDepth") 
    var c = exchange.Go("Buy", 1000, 0.1)
    var d = exchange.Go("GetRecords", PERIOD_H1)
           
    // 调用wait方法等待返回异步获取ticker结果 
    var ticker = a.wait()            
    // 返回深度, 如果获取失败也是有可能返回null的 
    var depth = b.wait()             
    // 返回订单号, 限定1秒超时, 超时返回undefined, 此对象可以继续调用wait等待如果上次wait超时 
    var orderId = c.wait(1000)
    if(typeof(orderId) == "undefined") {
        // 超时,重新获取
        orderId = c.wait()
    }
    var records = d.wait()
}
def main():
    a = exchange.Go("GetTicker")
    b = exchange.Go("GetDepth")
    c = exchange.Go("Buy", 1000, 0.1)
    d = exchange.Go("GetRecords", PERIOD_H1)            

    ticker, ok = a.wait()
    depth, ok = b.wait()
    orderId, ok = c.wait(1000)
    if ok == False:
        orderId, ok = c.wait()
    records, ok = d.wait()
void main() {
    auto a = exchange.Go("GetTicker");
    auto b = exchange.Go("GetDepth");
    auto c = exchange.Go("Buy", 1000, 0.1);
    auto d = exchange.Go("GetRecords", PERIOD_H1);            

    Ticker ticker;
    Depth depth;
    Records records;
    TId orderId;
    a.wait(ticker);
    b.wait(depth);
    if(!c.wait(orderId, 300)) {
        c.wait(orderId);
    }
    d.wait(records);
}

```javascript
function main() {
    var d = exchange.Go("GetRecords", PERIOD_H1)
    // 等待K线结果
    var records = d.wait()
    // 这里wait了一个已经wait过且结束的异步操作,会返回null,并记录出错信息
    var ret = d.wait()
}
def main():
    d = exchange.Go("GetRecords", PERIOD_H1)
    records, ok = d.wait()
    ret, ok = d.wait()
void main() {
    auto d = exchange.Go("GetRecords", PERIOD_H1);
    Records records;
    d.wait(records);
    Records ret;
    d.wait(ret);
}

对已经释放的并发对象,调用其wait()方法会报错:

function main() {
    while(true) {
        var beginTS = new Date().getTime()
        var arrRoutine = []
        var arrTicker = []
        var arrName = []
        for(var i = 0; i < exchanges.length; i++) {
            arrRoutine.push(exchanges[i].Go("GetTicker"))
            arrName.push(exchanges[i].GetName())
        }            

        for(var i = 0; i < arrRoutine.length; i++) {
            arrTicker.push(arrRoutine[i].wait())
        }
        var endTS = new Date().getTime()            

        var tbl = {
            type: "table", 
            title: "行情", 
            cols: ["索引", "名称", "最新成交价"], 
            rows: []
        }
        
        for(var i = 0; i < arrTicker.length; i++) {
            tbl.rows.push([i, arrName[i], arrTicker[i].Last])
        }            

        LogStatus(_D(), "并发获取多个交易所行情总耗时:", endTS - beginTS, "毫秒", "\n", "`" + JSON.stringify(tbl) + "`")
        Sleep(500)
    }
}
import time 
import json
def main():
    while True:
        beginTS = time.time()
        arrRoutine = []
        arrTicker = []
        arrName = []
        for i in range(len(exchanges)):
            arrRoutine.append(exchanges[i].Go("GetTicker"))
            arrName.append(exchanges[i].GetName())            

        for i in range(len(exchanges)):
            ticker, ok = arrRoutine[i].wait()
            arrTicker.append(ticker)            

        endTS = time.time()
        tbl = {
            "type": "table", 
            "title": "行情", 
            "cols": ["索引", "名称", "最新成交价"], 
            "rows": [] 
        }            

        for i in range(len(arrTicker)):
            tbl["rows"].append([i, arrName[i], arrTicker[i]["Last"]])            

        LogStatus(_D(), "并发获取多个交易所行情总耗时:", endTS - beginTS, "秒", "\n", "`" + json.dumps(tbl) + "`")
        Sleep(500)
void main() {
    while(true) {
        int length = exchanges.size();
        auto beginTS = UnixNano() / 1000000;
        Ticker arrTicker[length] = {};
        string arrName[length] = {};
        
        // 注意,添加几个交易所对象,这里要执行几次exchanges[n].Go 函数,这个例子是需要添加四个交易所对象,具体可以修改
        auto r0 = exchanges[0].Go("GetTicker");
        auto r1 = exchanges[1].Go("GetTicker");
        auto r2 = exchanges[2].Go("GetTicker");
        auto r3 = exchanges[3].Go("GetTicker");
        GoObj *arrRoutine[length] = {&r0, &r1, &r2, &r3};
        
        for(int i = 0; i < length; i++) {
            arrName[i] = exchanges[i].GetName();
        }            

        for(int i = 0; i < length; i++) {
            Ticker ticker;
            arrRoutine[i]->wait(ticker);
            arrTicker[i] = ticker;
        }        
        auto endTS = UnixNano() / 1000000;            

        json tbl = R"({
            "type": "table", 
            "title": "行情", 
            "cols": ["索引", "名称", "最新成交价"], 
            "rows": [] 
        })"_json;            

        for(int i = 0; i < length; i++) {
            json arr = R"(["", "", ""])"_json;
            arr[0] = format("%d", i);
            arr[1] = arrName[i];
            arr[2] = format("%f", arrTicker[i].Last);
            tbl["rows"].push_back(arr);
        }            

        LogStatus(_D(), "并发获取多个交易所行情总耗时:", format("%d", endTS - beginTS), "毫秒", "\n", "`" + tbl.dump() + "`");
        Sleep(500);
    }
}

并发获取多个交易所行情:

function main() {
    /*  
        使用OKX期货下单接口测试
        POST /api/v5/trade/order        
    */
    
    var beginTS = new Date().getTime()
    var param = {"instId":"BTC-USDT-SWAP","tdMode":"cross","side":"buy","ordType":"limit","px":"16000","sz":"1","posSide":"long"}
    var ret1 = exchange.Go("IO", "api", "POST", "/api/v5/trade/order", "", JSON.stringify(param))
    var ret2 = exchange.Go("IO", "api", "POST", "/api/v5/trade/order", "", JSON.stringify(param))
    var ret3 = exchange.Go("IO", "api", "POST", "/api/v5/trade/order", "", JSON.stringify(param))
    
    var id1 = ret1.wait()
    var id2 = ret2.wait()
    var id3 = ret3.wait()
    var endTS = new Date().getTime()                

    Log("id1:", id1)
    Log("id2:", id2)
    Log("id3:", id3)
    Log("并发下单耗时:", endTS - beginTS, "毫秒")
}
import time
import json
def main():
    beginTS = time.time()
    param = {"instId":"BTC-USDT-SWAP","tdMode":"cross","side":"buy","ordType":"limit","px":"16000","sz":"1","posSide":"long"}
    ret1 = exchange.Go("IO", "api", "POST", "/api/v5/trade/order", "", json.dumps(param))
    ret2 = exchange.Go("IO", "api", "POST", "/api/v5/trade/order", "", json.dumps(param))
    ret3 = exchange.Go("IO", "api", "POST", "/api/v5/trade/order", "", json.dumps(param))            

    id1, ok1 = ret1.wait()
    id2, ok2 = ret2.wait()
    id3, ok3 = ret3.wait()
    endTS = time.time()            

    Log("id1:", id1)
    Log("id2:", id2)
    Log("id3:", id3)
    Log("并发下单耗时:", endTS - beginTS, "秒")
void main() {
    auto beginTS = UnixNano() / 1000000;
    json param = R"({"instId":"BTC-USDT-SWAP","tdMode":"cross","side":"buy","ordType":"limit","px":"16000","sz":"1","posSide":"long"})"_json;
    auto ret1 = exchange.Go("IO", "api", "POST", "/api/v5/trade/order", "", param.dump());
    auto ret2 = exchange.Go("IO", "api", "POST", "/api/v5/trade/order", "", param.dump());
    auto ret3 = exchange.Go("IO", "api", "POST", "/api/v5/trade/order", "", param.dump());            

    json id1 = R"({})"_json;
    json id2 = R"({})"_json;
    json id3 = R"({})"_json;            

    ret1.wait(id1);
    ret2.wait(id2);
    ret3.wait(id3);
    auto endTS = UnixNano() / 1000000;            

    Log("id1:", id1);
    Log("id2:", id2);
    Log("id3:", id3);
    Log("并发下单耗时:", endTS - beginTS, "毫秒");
}

exchange.IO("api", ...)函数并发调用:

function main() {
    var counter = 0
    var arr = []                 // 用于测试持续引用并发的相关变量
    var symbols = ["BTC_USDT", "ETH_USDT", "SOL_USDT", "LTC_USDT", "EOS_USDT"]
    while (true) {
        var arrRoutine = []
        for (var symbol of symbols) {
            var r = exchange.Go("GetTicker", symbol)
            arrRoutine.push(r)   // 记录并发对象用于调用 r.wait() 函数获取结果,每一轮循环都清空
            // arr.push(r)       // 如果使用这句代码,运行时持续引用并发的对象,无法自动释放,超过2000个并发时,会报错:```InternalError: too many routine wait, max is 2000```。
            counter++
        }
        
        // 遍历 arrRoutine 调用 r.wait()
        
        LogStatus(_D(), "routine number:", counter)
        Sleep(50)
    }
}

针对自动释放机制的测试

该函数只在实盘运行时创建多线程执行任务,回测不支持多线程并发执行任务(回测可用,但是还是顺序执行)。

当并发的多线程任务执行完毕时,相关变量没有被继续引用,系统底层会自动处理资源回收。

```wait()```方法支持超时参数:
1、不设置超时参数,即```wait()```,或者设置超时参数0,即```wait(0)```。```wait()```函数会阻塞等待,直到并发的线程运行完毕,返回并发线程执行的结果。
2、设置超时参数-1,即```wait(-1)```。```wait()```函数会立即返回,不同编程语言有不同的返回值,具体可以参考本小节的调用例子。
3、设置具体超时参数,即```wait(300)```,```wait()```函数会等待最长300毫秒后返回。

虽然系统底层有自动回收机制,但是如果持续引用相关变量,并发的线程并不会释放。并发的线程数量超过2000个会报错:```"too many routine wait, max is 2000"```。

支持的函数:```GetTicker```,```GetDepth```,```GetTrades```,```GetRecords```,```GetAccount```,```GetOrders```,```GetOrder```,```CancelOrder```,```Buy```,```Sell```,```GetPositions```,```IO```等。
这些函数并发调用时都基于当前{@var/EXCHANGE exchange}交易所对象执行。

Python语言与JavaScript语言中的区别,Python语言中并发对象的```wait()```函数返回两个参数,第一个是异步的api调用返回的结果,第二个表示是异步调用是否完成。
```python
def main():
    d = exchange.Go("GetRecords", PERIOD_D1)
    # ok是一定返回True的, 除非策略被停止
    ret, ok = d.wait()          
    # 如果等待超时, 或者wait了一个已经结束的实例,ok返回False
    ret, ok = d.wait(100)

{@fun/Global/Mail_Go Mail_Go}, {@fun/Global/HttpQuery_Go HttpQuery_Go}, {@fun/Global/EventLoop EventLoop}