Dial

用于原始的Socket访问,支持tcpudptlsunix协议。支持4种流行的通信协议:mqttnatsamqpkafka。支持连接数据库,支持的数据库包括:sqlite3mysqlpostgresclickhouse

如果超时,Dial()函数返回空值。正常调用时返回一个连接对象,该对象包含三个方法:readwritecloseread方法用于读取数据,write方法用于发送数据,close方法用于关闭连接。

- 不传参数时,阻塞直到接收到消息后返回。例如:```ws.read()```。
- 传入参数时,单位为毫秒,指定消息等待超时时间。例如:```ws.read(2000)```指定超时时间为2秒(2000毫秒)。
- 以下两个参数仅对WebSocket有效:
  传入参数```-1```时,无论是否有消息,函数都立即返回。例如:```ws.read(-1)```。
  传入参数```-2```时,无论是否有消息,函数都立即返回,但只返回最新的消息,缓冲区中的其他消息将被丢弃。例如:```ws.read(-2)```。
```read()```函数缓冲区说明:

WebSocket协议推送的数据,如果策略中```read()```函数调用的时间间隔过长,可能会造成数据累积。这些数据存储在缓冲区中,缓冲区的数据结构为队列,上限为2000条。超过2000条后,最新的数据进入缓冲区,最旧的数据将被清除。

|场景|无参数|参数:-1|参数:-2|参数:2000,单位是毫秒|
| - | - | - | - | - |
| 缓冲区已有数据 | 立即返回最旧数据 | 立即返回最旧数据 | 立即返回最新数据 | 立即返回最旧数据 |
| 缓冲区无数据 | 阻塞直到有数据时返回 | 立即返回空值 | 立即返回空值 | 等待2000毫秒,无数据返回空值,有数据则返回 |
| WebSocket连接断开或底层重连时 | read()函数返回空字符串,即:"",write()函数返回0。检测到该情况后,可以使用close()函数关闭连接。如果设置了自动重连,则无需关闭,系统底层会自动重连。|
-- | -- | -- |
object

Dial(address)
Dial(address, timeout)
Dial(address, options)

请求地址。
address
true
string
超时时间(秒)。
timeout
false
number
配置选项。
options
false
object

```javascript
function main(){
    // Dial支持tcp://、udp://、tls://、unix://协议,可添加一个参数指定超时秒数
    var client = Dial("tls://www.baidu.com:443")
    if (client) {
        // write可再添加一个数字参数指定超时时间,write返回成功发送的字节数
        client.write("GET / HTTP/1.1\nConnection: Closed\n\n")
        while (true) {
            // read可再添加一个数字参数指定超时时间,单位:毫秒。返回null表示出错、超时或socket已关闭
            var buf = client.read()
            if (!buf) {
                break
            }
            Log(buf)
        }
        client.close()
    }
}```
```python
def main():
    client = Dial("tls://www.baidu.com:443")
    if client:
        client.write("GET / HTTP/1.1\nConnection: Closed\n\n")
        while True:
            buf = client.read()
            if not buf:
                break
            Log(buf)
        client.close()```
```cpp
void main() {
    auto client = Dial("tls://www.baidu.com:443");
    if(client.Valid) {
        client.write("GET / HTTP/1.1\nConnection: Closed\n\n");
        while(true) {
            auto buf = client.read();
            if(buf == "") {
                break;
            }
            Log(buf);
        }
        client.close();
    }
}```
Dial函数调用示例:
```javascript
function main() {
    LogStatus("正在连接...")
    // 访问币安的 WebSocket 接口
    var client = Dial("wss://stream.binance.com:9443/ws/!ticker@arr")
    if (!client) {
        Log("Connection failed, exiting")
        return
    }

    while (true) {
        // read 只返回调用 read 之后获取的数据
        var buf = client.read()
        if (!buf) {
            break
        }
        var table = {
            type: 'table',
            title: '行情图表',
            cols: ['币种', '最高', '最低', '买一', '卖一', '最后成交价', '成交量', '更新时间'],
            rows: []
        }
        var obj = JSON.parse(buf)
        _.each(obj, function(ticker) {
            table.rows.push([ticker.s, ticker.h, ticker.l, ticker.b, ticker.a, ticker.c, ticker.q, _D(ticker.E)])
        })
        LogStatus('`' + JSON.stringify(table) + '`')
    }
    client.close()
}```
```python
import json
def main():
    LogStatus("正在连接...")
    client = Dial("wss://stream.binance.com:9443/ws/!ticker@arr")
    if not client:
        Log("Connection failed, exiting")
        return

    while True:
        buf = client.read()
        if not buf:
            break
        table = {
            "type" : "table",
            "title" : "行情图表",
            "cols" : ["币种", "最高", "最低", "买一", "卖一", "最后成交价", "成交量", "更新时间"],
            "rows" : []
        }
        obj = json.loads(buf)
        for i in range(len(obj)):
            table["rows"].append([obj[i]["s"], obj[i]["h"], obj[i]["l"], obj[i]["b"], obj[i]["a"], obj[i]["c"], obj[i]["q"], _D(int(obj[i]["E"]))])
        LogStatus('`' + json.dumps(table) + '`')
    client.close()```
```cpp
void main() {
    LogStatus("正在连接...");
    auto client = Dial("wss://stream.binance.com:9443/ws/!ticker@arr");
    if(!client.Valid) {
        Log("Connection failed, exiting");
        return;
    }

    while(true) {
        auto buf = client.read();
        if(buf == "") {
            break;
        }
        json table = R"({
            "type" : "table",
            "title" : "行情图表",
            "cols" : ["币种", "最高", "最低", "买一", "卖一", "最后成交价", "成交量", "更新时间"],
            "rows" : []
        })"_json;
        json obj = json::parse(buf);
        for(auto& ele : obj.items()) {
            table["rows"].push_back({ele.value()["s"], ele.value()["h"], ele.value()["l"], ele.value()["b"], ele.value()["a"], ele.value()["c"],
                ele.value()["q"], _D(ele.value()["E"])});
        }
        LogStatus("`" + table.dump() + "`");
    }
    client.close();
}```
访问币安(Binance)的 WebSocket 行情接口:
```javascript
function main() {
    let options = {"headers": {"X-MBX-APIKEY": "your access key"}}
    let random = `fmz${UnixNano()}`
    let ts = new Date().getTime()
    let secretKey = "your secret key"
    let topic = "com_announcement_en"
    let payload = `random=${random}&topic=${topic}&recvWindow=30000&timestamp=${ts}`
    let signature = Encode("sha256", "string", "hex", payload, "string", secretKey)
    let query = `?${payload}&signature=${signature}`

    Log("query:", query)
    let conn = Dial(`wss://api.binance.com/sapi/wss${query}`, options)

    for (var i = 0 ; i < 10 ; i++) {
        let ret = conn.read()
        Log(ret)
    }
}```
```python
import time

def main():
    options = {"headers": {"X-MBX-APIKEY": "your access key"}}
    random = "fmz" + str(UnixNano())
    ts = int(time.time() * 1000)
    secretKey = "your secret key"
    topic = "com_announcement_en"
    payload = f"random={random}&topic={topic}&recvWindow=30000&timestamp={ts}"
    signature = Encode("sha256", "string", "hex", payload, "string", secretKey)
    query = f"?{payload}&signature={signature}"

    Log("query:", query)
    conn = Dial(f"wss://api.binance.com/sapi/wss{query}", options)

    for i in range(10):
        ret = conn.read()
        Log(ret)```
```cpp
// 暂不支持```
访问币安的WebSocket接口,设置WSS请求头。
```javascript
var ws = null
function main(){
    var param = {
        "op": "subscribe",
        "args": [{
            "channel": "tickers",
            "instId": "BTC-USDT"
        }]
    }
    // 在调用Dial函数时,指定reconnect=true即设置为重连模式,指定payload即为重连时发送的消息。在WebSocket连接断开后,会自动重连,自动发送消息
    ws = Dial("wss://ws.okx.com:8443/ws/v5/public|compress=gzip_raw&mode=recv&reconnect=true&payload=" + JSON.stringify(param))
    if(ws){
        var pingCyc = 1000 * 20
        var lastPingTime = new Date().getTime()
        while(true){
            var nowTime = new Date().getTime()
            var ret = ws.read()
            Log("ret:", ret)
            if(nowTime - lastPingTime > pingCyc){
                var retPing = ws.write("ping")
                lastPingTime = nowTime
                Log("Sending: ping", "#FF0000")
            }
            LogStatus("当前时间:", _D())
            Sleep(1000)
        }
    }
}

function onexit() {
    ws.close()
    Log("Exiting")
}```
```python
import json
import time

ws = None
def main():
    global ws
    param = {
        "op": "subscribe",
        "args": [{
            "channel": "tickers",
            "instId": "BTC-USDT"
        }]
    }
    ws = Dial("wss://ws.okx.com:8443/ws/v5/public|compress=gzip_raw&mode=recv&reconnect=true&payload=" + json.dumps(param))
    if ws:
        pingCyc = 1000 * 20
        lastPingTime = time.time() * 1000
        while True:
            nowTime = time.time() * 1000
            ret = ws.read()
            Log("ret:", ret)
            if nowTime - lastPingTime > pingCyc:
                retPing = ws.write("ping")
                lastPingTime = nowTime
                Log("Sending: ping", "#FF0000")
            LogStatus("当前时间:", _D())
            Sleep(1000)

def onexit():
    ws.close()
    Log("Exiting")```
```cpp
auto objWS = Dial("wss://ws.okx.com:8443/ws/v5/public|compress=gzip_raw&mode=recv&reconnect=true");

void main() {
    json param = R"({
        "op": "subscribe",
        "args": [{
            "channel": "tickers",
            "instId": "BTC-USDT"
        }]
    })"_json;

    objWS.write(param.dump());
    if(objWS.Valid) {
        uint64_t pingCyc = 1000 * 20;
        uint64_t lastPingTime = Unix() * 1000;
        while(true) {
            uint64_t nowTime = Unix() * 1000;
            auto ret = objWS.read();
            Log("ret:", ret);
            if(nowTime - lastPingTime > pingCyc) {
                auto retPing = objWS.write("ping");
                lastPingTime = nowTime;
                Log("Sending: ping", "#FF0000");
            }
            LogStatus("当前时间:", _D());
            Sleep(1000);
        }
    }
}

void onexit() {
    objWS.close();
    Log("Exiting");
}```
访问OKX的WebSocket行情接口:
```javascript
var ws = null

function main(){
    var param = {"sub": "market.btcusdt.detail", "id": "id1"}
    ws = Dial("wss://api.huobi.pro/ws|compress=gzip&mode=recv&reconnect=true&payload=" + JSON.stringify(param))
    if(ws){
        while(1){
            var ret = ws.read()
            Log("ret:", ret)
            // 响应心跳包操作
            try {
                var jsonRet = JSON.parse(ret)
                if(typeof(jsonRet.ping) == "number") {
                    var strPong = JSON.stringify({"pong" : jsonRet.ping})
                    ws.write(strPong)
                    Log("Responding to ping, sending pong:", strPong, "#FF0000")
                }
            } catch(e) {
                Log("e.name:", e.name, "e.stack:", e.stack, "e.message:", e.message)
            }

            LogStatus("当前时间:", _D())
            Sleep(1000)
        }
    }
}

function onexit() {
    ws.close()
    Log("Executing ws.close()")
}```
```python
import json
ws = None

def main():
    global ws
    param = {"sub" : "market.btcusdt.detail", "id" : "id1"}
    ws = Dial("wss://api.huobi.pro/ws|compress=gzip&mode=recv&reconnect=true&payload=" + json.dumps(param))
    if ws:
        while True:
            ret = ws.read()
            Log("ret:", ret)
            # 响应心跳包操作
            try:
                jsonRet = json.loads(ret)
                if "ping" in jsonRet and type(jsonRet["ping"]) == int:
                    strPong = json.dumps({"pong" : jsonRet["ping"]})
                    ws.write(strPong)
                    Log("Responding to ping, sending pong:", strPong, "#FF0000")
            except Exception as e:
                Log("e:", e)

            LogStatus("当前时间:", _D())
            Sleep(1000)

def onexit():
    ws.close()
    Log("Executing ws.close()")```
```cpp
using namespace std;

void main() {
    json param = R"({"sub" : "market.btcusdt.detail", "id" : "id1"})"_json;
    auto ws = Dial("wss://api.huobi.pro/ws|compress=gzip&mode=recv&reconnect=true&payload=" + param.dump());
    if(ws.Valid) {
        while(true) {
            auto ret = ws.read();
            Log("ret:", ret);
            // 响应心跳包操作
            try
            {
                auto jsonRet = json::parse(ret);
                if(jsonRet["ping"].is_number()) {
                    json pong = R"({"pong" : 0})"_json;
                    pong["pong"] = jsonRet["ping"];
                    auto strPong = pong.dump();
                    ws.write(strPong);
                    Log("Responding to ping, sending pong:", strPong, "#FF0000");
                }
            } catch(exception &e)
            {
                Log("e:", e.what());
            }

            LogStatus("当前时间:", _D());
            Sleep(1000);
        }
    }
}

void onexit() {
    // ws.close();
    Log("Executing ws.close()");
}```
访问火币的WebSocket行情接口:
```javascript
function getLogin(pAccessKey, pSecretKey, pPassphrase) {
    // 签名函数,用于登录
    var ts = (new Date().getTime() / 1000).toString()
    var login = {
        "op": "login",
        "args":[{
            "apiKey"    : pAccessKey,
            "passphrase" : pPassphrase,
            "timestamp" : ts,
            "sign" : exchange.HMAC("sha256", "base64", ts + "GET" + "/users/self/verify", pSecretKey)   // exchange.HMAC已废弃,暂时支持。请使用最新的exchange.Encode函数替换
        }]
    }
    return login
}

var client_private = null
function main() {
    // 因为read函数使用了超时设置,需要过滤超时错误,否则会产生冗余错误输出
    SetErrorFilter("timeout")

    // 持仓频道订阅信息
    var posSubscribe = {
        "op": "subscribe",
        "args": [{
            "channel": "positions",
            "instType": "ANY"
        }]
    }

    var accessKey = "xxx"
    var secretKey = "xxx"
    var passphrase = "xxx"

    client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
    client_private.write(JSON.stringify(getLogin(accessKey, secretKey, passphrase)))
    Sleep(3000)  // 登录后不能立即订阅私有频道,需要等待服务器响应
    client_private.write(JSON.stringify(posSubscribe))
    if (client_private) {
        var lastPingTS = new Date().getTime()
        while (true) {
            var buf = client_private.read(-1)
            if (buf) {
                Log(buf)
            }

            // 检测断线并重连
            if (buf == "" && client_private.write(JSON.stringify(posSubscribe)) == 0) {
                Log("Detected disconnection, closing connection, reconnecting")
                client_private.close()
                client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
                client_private.write(JSON.stringify(getLogin(accessKey, secretKey, passphrase)))
                Sleep(3000)
                client_private.write(JSON.stringify(posSubscribe))
            }

            // 发送心跳包
            var nowPingTS = new Date().getTime()
            if (nowPingTS - lastPingTS > 10 * 1000) {
                client_private.write("ping")
                lastPingTS = nowPingTS
            }
        }
    }
}

function onexit() {
    var ret = client_private.close()
    Log("Connection closed!", ret)
}```
```python
import json
import time

def getLogin(pAccessKey, pSecretKey, pPassphrase):
    ts = str(time.time())
    login = {
        "op": "login",
        "args":[{
            "apiKey"    : pAccessKey,
            "passphrase" : pPassphrase,
            "timestamp" : ts,
            "sign" : exchange.HMAC("sha256", "base64", ts + "GET" + "/users/self/verify", pSecretKey)
        }]
    }
    return login

client_private = None
def main():
    global client_private
    SetErrorFilter("timeout")

    posSubscribe = {
        "op": "subscribe",
        "args": [{
            "channel": "positions",
            "instType": "ANY"
        }]
    }

    accessKey = "xxx"
    secretKey = "xxx"
    passphrase = "xxx"

    client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
    client_private.write(json.dumps(getLogin(accessKey, secretKey, passphrase)))
    Sleep(3000)
    client_private.write(json.dumps(posSubscribe))
    if client_private:
        lastPingTS = time.time() * 1000
        while True:
            buf = client_private.read(-1)
            if buf:
                Log(buf)

            if buf == "" and client_private.write(json.dumps(posSubscribe)) == 0:
                Log("Detected disconnection, closing connection, reconnecting")
                ret = client_private.close()
                client_private = Dial("wss://ws.okx.com:8443/ws/v5/private")
                client_private.write(json.dumps(getLogin(accessKey, secretKey, passphrase)))
                Sleep(3000)
                client_private.write(json.dumps(posSubscribe))

            nowPingTS = time.time() * 1000
            if nowPingTS - lastPingTS > 10 * 1000:
                client_private.write("ping")
                lastPingTS = nowPingTS

def onexit():
    ret = client_private.close()
    Log("Connection closed!", ret)```
```cpp
auto client_private = Dial("wss://ws.okx.com:8443/ws/v5/private");

json getLogin(string pAccessKey, string pSecretKey, string
pPassphrase) {
    auto ts = std::to_string(Unix());
    json login = R"({
        "op": "login",
        "args": [{
            "apiKey": "",
            "passphrase": "",
            "timestamp": "",
            "sign": ""
        }]
    })"_json;
    login["args"][0]["apiKey"] = pAccessKey;
    login["args"][0]["passphrase"] = pPassphrase;
    login["args"][0]["timestamp"] = ts;
    login["args"][0]["sign"] = exchange.HMAC("sha256", "base64", ts + "GET" + "/users/self/verify", pSecretKey);
    return login;
}

void main() {
    SetErrorFilter("timeout");
    json posSubscribe = R"({
        "op": "subscribe",
        "args": [{
            "channel": "positions",
            "instType": "ANY"
        }]
    })"_json;

    auto accessKey = "xxx";
    auto secretKey = "xxx";
    auto passphrase = "xxx";

    client_private.write(getLogin(accessKey, secretKey, passphrase).dump());
    Sleep(3000);
    client_private.write(posSubscribe.dump());

    if (client_private.Valid) {
        uint64_t lastPingTS = Unix() * 1000;

        while (true) {
            auto buf = client_private.read(-1);
            if (buf != "") {
                Log(buf);
            }
            if (buf == "") {
                if (client_private.write(posSubscribe.dump()) == 0) {
                    Log("Detected disconnection, closing connection, reconnecting");
                    client_private.close();
                    client_private = Dial("wss://ws.okx.com:8443/ws/v5/private");
                    client_private.write(getLogin(accessKey, secretKey, passphrase).dump());
                    Sleep(3000);
                    client_private.write(posSubscribe.dump());
                }
            }

            uint64_t nowPingTS = Unix() * 1000;
            if (nowPingTS - lastPingTS > 10 * 1000) {
                client_private.write("ping");
                lastPingTS = nowPingTS;
            }
        }
    }
}

void onexit() {
    client_private.close();
    Log("Exiting");
}```
访问OKX的WebSocket认证接口:
```javascript
var conn = null
function main() {
    var accessKey = "your accessKey"

    var ts = new Date().getTime()
    var signature = exchange.Encode("sha256", "string", "hex", String(ts), "string", "{{secretkey}}")
    Log("signature:", signature)

    var payload = {
        "id": 1,
        "method": "server.sign",
        "params": {
            "access_id": accessKey,
            "signed_str": signature,
            "timestamp": ts,
        }
    }
    Log(`JSON.stringify(payload):`, JSON.stringify(payload))

    conn = Dial("wss://socket.coinex.com/v2/futures|compress=gzip&mode=recv&payload=" + JSON.stringify(payload))
    if (!conn) {
        throw "stop"
    }
    Log("Dial ... ", conn.read())

    // 订阅持仓推送
    conn.write(JSON.stringify({
        "method": "position.subscribe",
        "params": {"market_list": ["BTCUSDT"]},
        "id": 1
    }))

    while (true) {
        var msg = conn.read()
        if (msg) {
            Log("msg:", msg)
        }
    }
}

function onexit() {
    conn.close()
}```
```python
// 略```
```cpp
// 略```
访问CoinEx的WebSocket认证接口:
```javascript
let strPushDataV3ApiWrapper = `syntax = "proto3";



option java_package = "com.mxc.push.common.protobuf";
option optimize_for = SPEED;
option java_multiple_files = true;
option java_outer_classname = "PushDataV3ApiWrapperProto";

message PublicAggreDealsV3Api {

  repeated PublicAggreDealsV3ApiItem deals  = 1;
  string eventType = 2;
}

message PublicAggreDealsV3ApiItem {
  string price = 1;
  string quantity = 2;
  int32 tradeType = 3;
  int64 time = 4;
}

message PushDataV3ApiWrapper {
  string channel = 1;
  oneof body {
    PublicAggreDealsV3Api publicAggreDeals = 314;
  }

  optional string symbol = 3;
  optional string symbolId = 4;
  optional int64 createTime = 5;
  optional int64 sendTime = 6;
}`

let code = HttpQuery("https://cdnjs.cloudflare.com/ajax/libs/protobufjs/7.5.3/protobuf.js")
let exports = {}
let module = { exports }
new Function("module", "exports", code)(module, exports)
let protobuf = module.exports

function main() {
    const PushDataV3ApiWrapper = protobuf.parse(strPushDataV3ApiWrapper).root.lookupType("PushDataV3ApiWrapper")

    var payload = {
        "method": "SUBSCRIPTION",
        "params": [
            "[email protected]@100ms@BTCUSDT"
        ]
    }

    // proxy=socks5://x.x.x.x:xxxx
    var conn = Dial("wss://wbs-api.mexc.com/ws|payload=" + JSON.stringify(payload))

    var data = null
    while (true) {
        var ret = conn.read()
        if (ret) {
            const uint8arrayData = new Uint8Array(ret)
            const message = PushDataV3ApiWrapper.decode(uint8arrayData)

            data = PushDataV3ApiWrapper.toObject(message, {
              longs: String,
              enums: String,
              bytes: String,
              defaults: true,
              arrays: true,
              objects: true
            })
            Log("data:", data)
        }
        LogStatus(_D(), data)
    }
}```
```python
# 可以使用Python中相应的库实现编码和解码。```
```cpp
// 略```
访问MEXC交易所的`Websocket`接口,订阅`public.aggre.deals.v3.api.pb`频道,使用`protobuf.js`解码二进制数据的示例:
```javascript
var client = null
function main() {
    // client = Dial("sqlite3://:memory:")   // 使用内存数据库
    client = Dial("sqlite3://test1.db")      // 打开/连接托管者所在目录的数据库文件

    // 记录句柄
    var sqlite3Handle = client.fd()
    Log("sqlite3Handle:", sqlite3Handle)

    // 查询数据库中的表
    var ret = client.exec("SELECT name FROM sqlite_master WHERE type='table'")
    Log(ret)
}

function onexit() {
    Log("Executing client.close()")
    client.close()
}```
```python
// 不支持```
```cpp
// 不支持```
Dial函数连接数据库时返回的连接对象具有2个独有的方法:

    - ```exec(sqlString)```: 用于执行SQL语句,使用方式类似于```DBExec()```函数。

    - ```fd()```: ```fd()```函数返回一个句柄(例如:句柄变量为handle),用于其他线程重连(即使Dial创建的对象已经执行```close()```函数关闭连接),将句柄传入```Dial()```函数,例如:```Dial(handle)```重用连接。

    以下是Dial函数连接```sqlite3```数据库的示例。

```address```参数的详细说明:在标准地址```wss://ws.okx.com:8443/ws/v5/public```后,使用```|```符号作为分隔符。如果参数字符串中包含```|```字符,则使用```||```作为分隔符。分隔符后的部分用于设置功能参数,各参数之间使用```&```字符连接。

例如,同时设置```socks5```代理和压缩参数时可以写作:
```Dial("wss://ws.okx.com:8443/ws/v5/public|proxy=socks5://xxx:9999&compress=gzip_raw&mode=recv")```
| Dial函数的address参数支持的功能 | 参数说明 |
| - | - |
| WebSocket协议数据压缩相关的参数:compress=参数值 | compress为压缩方式,compress参数可选gzip_raw、gzip等。如果gzip方式非标准gzip,可以使用扩展方式:gzip_raw |
| WebSocket协议数据压缩相关的参数:mode=参数值 | mode为压缩模式,mode参数可选dual、send、recv三种。dual为双向压缩,发送和接收压缩数据。send为发送压缩数据。recv为接收压缩数据,本地进行解压缩。 |
| WebSocket协议启用compression设置:enableCompression=true | 使用enableCompression=false关闭该设置,默认不启用。 |
| WebSocket协议设置底层自动重连相关的参数:reconnect=参数值 | reconnect用于设置是否重连,reconnect=true为启用重连。不设置该参数时默认不重连。 |
| WebSocket协议设置底层自动重连相关的参数:interval=参数值 | interval为重试时间间隔,单位为毫秒,interval=10000表示重试间隔为10秒,不设置时默认为1秒,即interval=1000。 |
| WebSocket协议设置底层自动重连相关的参数:payload=参数值 | payload为WebSocket重连时需要发送的订阅消息,例如:payload=okok。 |
| socks5代理的相关参数:proxy=参数值 | proxy为socks5代理设置,参数值格式:socks5://name:[email protected]:1080,其中name为socks5服务端用户名,pwd为socks5服务端登录密码,1080为socks5服务端口。 |
```Dial()```函数仅支持实盘环境。
使用Dial函数连接数据库时,连接字符串的编写请参考各数据库对应的Go语言驱动项目。

| 支持的数据库 | 驱动项目 | 连接字符串(Connection String) | 备注 |
| - | - | - | - |
| sqlite3 | github.com/mattn/go-sqlite3 | sqlite3://file:test.db?cache=shared&mode=memory | ```sqlite3://```前缀表示使用sqlite3数据库,调用示例:```Dial("sqlite3://test1.db")``` |
| mysql | github.com/go-sql-driver/mysql | mysql://username:yourpassword@tcp(localhost:3306)/yourdatabase?charset=utf8mb4 | -- |
| postgres | github.com/lib/pq | postgres://user=postgres dbname=yourdatabase sslmode=disable password=yourpassword host=localhost port=5432 | -- |
| clickhouse | github.com/ClickHouse/clickhouse-go | clickhouse://tcp://host:9000?username=username&password=yourpassword&database=youdatabase | -- |
需要注意,当```address```参数中设置的```payload```内容包含字符```=```或其他特殊字符时,可能会影响```Dial```函数的```address```参数解析。请参考以下示例。

backPack交易所WebSocket私有接口调用示例:
```js
var client = null

function main() {
    // base64编码的秘钥对公钥,即在FMZ上配置的access key
    var base64ApiKey = "xxx"

    var ts = String(new Date().getTime())
    var data = "instruction=subscribe&timestamp=" + ts + "&window=5000"

    // 由于signEd25519最终返回的是base64编码,其中会有字符"="
    var signature = signEd25519(data)

    // payload 被JSON编码后可能包含字符"="
    payload = {
        "method": "SUBSCRIBE",
        "params": ["account.orderUpdate"],
        "signature": [base64ApiKey, signature, ts, "5000"]
    }

    client = Dial("wss://ws.backpack.exchange")
    client.write(JSON.stringify(payload))
    if (!client) {
        Log("Connection failed, exiting")
        return
    }

    while (true) {
        var buf = client.read()
        Log(buf)
    }
}

function onexit() {
    client.close()
}

function signEd25519(data) {
    return exchange.Encode("ed25519.seed", "raw", "base64", data, "base64", "{{secretkey}}")
}

代码中采用以下调用方式可以正常工作:

client = Dial("wss://ws.backpack.exchange")
client.write(JSON.stringify(payload))

如果直接将payload写入参数中则无法正常工作,例如:

client = Dial("wss://ws.backpack.exchange|payload=" +
JSON.stringify(payload))

目前仅JavaScript语言支持Dial函数中使用mqttnatsamqpkafka通信协议。以下以JavaScript语言策略代码为例,展示mqttnatsamqpkafka四种协议的使用示例:

// 需要先配置、部署完成各个协议的代理服务器
// 为了便于演示,主题test_topic的订阅(read操作)、发布(write操作)都在当前这个策略中进行

var arrConn = []
var arrName = []

function main() {
    LogReset(1)
    conn_nats = Dial("nats://[email protected]:4222?topic=test_topic")
    conn_mqtt = Dial("mqtt://127.0.0.1:1883?topic=test_topic")
    conn_amqp = Dial("amqp://q:[email protected]:5672/?queue=test_Queue")
    conn_kafka = Dial("kafka://localhost:9092/test_topic")
    arrConn = [conn_nats, conn_amqp, conn_mqtt, conn_kafka]
    arrName = ["nats", "amqp", "mqtt", "kafka"]

    while (true) {
        for (var i in arrConn) {
            var conn = arrConn[i]
            var name = arrName[i]

            // 写数据
            conn.write(name + ", time: " + _D() + ", test msg.")

            // 读数据
            var readMsg = conn.read(1000)
            Log(name + " readMsg: ", readMsg, "#FF0000")
        }

        Sleep(1000)
    }
}

function onexit() {
    for (var i in arrConn) {
        arrConn[i].close()
        Log("Closing", arrName[i], "connection")
    }
}

详细文档请参考:探索FMZ:交易策略实盘间通信协议实践