Loading ...

OKEX & Binance Websocket High-Frequency Multi-Symbols Trading Template

Author: 发明者量化, Date: 2022-12-27 15:37:42
Tags:

event.positions) { Log(“account update”, event) } if (event.orders) { Log(“private orders update”, event) }

// include orders, positions, balance for latest
Log("account", ctx.wsPrivate.account)

// for test only
if (false) {
    return {
        amendOrders: [{
            instId: event.instId,
            clOrdId: "xxxx*****",
            cxlOnFail: true,
            newSz: "2",
        }],
        newOrders: [{
            instId: event.instId,
            clOrdId: UUID(),
            side: "sell",
            tdMode: "cross",
            ordType: "post_only",
            px: event.depth.asks[0].price.toFixed(4),
            sz: "1",
        }, {
            instId: event.instId,
            clOrdId: UUID(),
            side: "sell",
            tdMode: "cross",
            ordType: "post_only",
            px: event.depth.asks[0].price.toFixed(4),
            sz: "1",
        }],
        cancelOrders: [{
            instId: order.instId,
            clOrdId: order.Id
        }]
    }
}

}

function main() { let instId = exchange.SetContractType(“swap”).InstrumentID

let ctx = $.NewWSS(exchange, function(ws) {
    let msg = null
    if (exchange.GetName() === 'Futures_OKCoin') {
        msg = {
            op: "subscribe",
            args: [{
                channel: "books5",
                instId: instId,
            }, {
                channel: "trades",
                instId: instId,
            }]
        }
    } else {
        let symbol = exchange.GetCurrency().replace('_', '').toLowerCase()
        msg = {
            method: "SUBSCRIBE",
            params: [symbol + "@aggTrade", symbol + "@depth20@100ms"],
            id: "1",
        }
    }
    ws.write(JSON.stringify(msg))
    Log("subscribe", msg, "channel")
    LogStatus("Ready")
}, onTick, Debug)

while (true) {
    ctx.poll()
    EventLoop(1000)
}

}


The running status is as follows, you can also subscribe to multiple symbols in the subscribe function


 ![img](/upload/asset/107646c11b93b32c493.png) 
 
 
The following is the a common market maker robot for OKX and Binance based on this template


 ![img](/upload/asset/73393d2122c1f41a2a.png) 
 
 
 ![img](/upload/asset/17efd8896a74dc9e271.gif) 


/* jshint esversion: 6 */

function NewOKXWebSocketPrivate(ctx, verbose) {
    let self = {
        ws: null,
        verbose: verbose,
        isReadyDic: {},
        lastPing: 0,
        ctx: ctx,
        account: {
            orders: {},
            cancelPending: {},
            ordersCount: {
                pending: 0,
                buy: 0,
                amend: 0,
                canceled: 0,
                filled: 0,
                push: 0,
                sell: 0
            },
            positions: {},
            balance: {},
            init_balance: {},
            balanceUpdate: 0,
            positionsUpdate: 0,
            ordersUpdate: 0
        }
    }
    let acc = _C(ctx.e.GetAccount)
    let pair = ctx.e.GetCurrency().split('_')
    self.account.balance[pair[1]] = {
        free: acc.Balance,
        borrowed: 0,
        locked: 0
    }
    self.account.balance[pair[0]] = {
        free: acc.Stock,
        borrowed: 0,
        locked: 0
    }
    self.account.init_balance[pair[1]] = {
        free: acc.Balance,
        borrowed: 0,
        locked: 0
    }
    self.account.init_balance[pair[0]] = {
        free: acc.Stock,
        borrowed: 0,
        locked: 0
    }

    self.reset = function() {
        if (self.ws) {
            self.ws.close()
            self.ws = null
        }
        self.isReadyDic = {}
    }
    self.send = function(msg) {
        if (!self.ws) {
            return
        }
        self.ws.write(JSON.stringify(msg))
    }
    self.pushOrder = function(instId, px, sz, amount, ordType) {
        self.ws.write(JSON.stringify({
            "instId": instId,
            "tdMode": "cross",
            "side": side,
            "ordType": ordType,
            "px": px.toFixed(2),
            "sz": px.toFixed(0)
        }))
    }
    self.getOrdersMap = function(instId) {
        let mp = self.account.orders[instId]
        if (typeof(mp) === 'undefined') {
            mp = {}
            self.account.orders[instId] = mp
        }
        return mp
    }

    self.batchOrders = function(orders) {
        for (let i = 0; i < orders.length; i += 20) {
            self.send({
                "id": "batchOrders",
                "op": "batch-orders",
                "args": orders.slice(i, i + 20)
            })
            if (self.verbose) {
                Log("batchOrders", orders.slice(i, i + 20))
            }
        }
        let ts = new Date().getTime()
        orders.forEach(function(item) {
            let mp = self.getOrdersMap(item.instId)
            mp[item.clOrdId] = {
                Id: item.clOrdId,
                Created: ts,
                instId: item.instId,
                Price: Number(item.px),
                Amount: Number(item.sz),
                Type: item.side == "sell" ? ORDER_TYPE_SELL : ORDER_TYPE_BUY,
                DealAmount: 0,
                Status: ORDER_STATE_PENDING
            }
        })
    }

    self.cancelOrders = function(orders) {
        let ts = new Date().getTime()

        orders.forEach(function(item) {
            if (typeof(self.account.cancelPending[item.clOrdId]) === 'undefined') {
                self.account.cancelPending[item.clOrdId] = {
                    retry: 0,
                    ts: ts,
                    data: item
                }
            }
        })

        // remove from orders
        orders.forEach(function(item) {
            delete self.getOrdersMap(item.instId)[item.clOrdId]
        })

        for (let id in self.account.cancelPending) {
            let item = self.account.cancelPending[id]
            if (ts - item.ts > 10000) {
                // recancel
                orders.push(item.data)
                Log("remove timeout order", item, "#ff0000")
                item.retry += 1
                item.ts = ts
            }
            if (item.retry > 10) {
                Log("force remove order", item, "#ff0000")
                delete self.account.cancelPending[id]
            }
        }

        for (let i = 0; i < orders.length; i += 20) {
            self.send({
                "id": "cancelOrders",
                "op": "batch-cancel-orders",
                "args": orders.slice(i, i + 20)
            })
        }

        if (self.verbose) {
            Log("cancelOrders", orders)
        }
    }

    self.amendOrders = function(orders) {
        let ts = new Date().getTime()
        orders.forEach(function(item) {
            let order = self.getOrdersMap(item.instId)[item.clOrdId]
            if (order) {
                order.Price = Number(item.newPx)
                order.Amount = Number(item.newSz)
                order.Update = undefined
                order.Created = ts
            }
        })

        self.send({
            "id": "amendOrders",
            "op": "batch-amend-orders",
            "args": orders,
        })

        if (self.verbose) {
            Log("amendOrders", orders)
        }
    }

    self.processOrders = function(data, reset) {
        let ret = []
        if (Debug) {
            Log("ORDERS: ", data)
        }
        if (reset) {
            self.account.orders = {}
            for (let k in self.account.orderCount) {
                self.account.ordersCount[k] = 0
            }
        }

        data.forEach(function(item) {
            let mp = self.getOrdersMap(item.instId)
            let dataOrder = {
                Id: item.clOrdId,
                instId: item.instId,
                Info: item,
                Price: Number(item.px),
                Amount: Number(item.sz),
                Update: Number(item.uTime || item.cTime),
                Created: Number(item.cTime),
                Type: item.side == "buy" ? ORDER_TYPE_BUY : ORDER_TYPE_SELL,
                DealAmount: Number(item.accFillSz)
            }
            if (item.state == "canceled") {
                dataOrder.Status = ORDER_STATE_CANCELED
                self.account.ordersCount.canceled += 1
                self.account.ordersCount.pending -= 1
            } else if (item.state == "live") {
                dataOrder.Status = ORDER_STATE_PENDING
                if (item.amendResult == "0") {
                    self.account.ordersCount.amend += 1
                } else {
                    self.account.ordersCount.push += 1
                    self.account.ordersCount.pending += 1
                }
            } else if (item.state == "partially_filled") {
                dataOrder.Status = ORDER_STATE_PENDING
            } else if (item.state == "filled") {
                dataOrder.Status = ORDER_STATE_CLOSED
                self.account.ordersCount.filled += 1
                self.account.ordersCount.pending -= 1
            } else {
                throw "unknow order state " + JSON.stringify(item)
            }

            // remove from cancelPending orders
            if (dataOrder.Status != ORDER_STATE_PENDING) {
                delete self.account.cancelPending[dataOrder.Id]
                if (Debug) {
                    Log("remove from cancel pending orders", dataOrder)
                }
            }


            // update anyway
            let oldOrder = mp[dataOrder.Id]

            if ((!oldOrder) || (dataOrder.DealAmount != oldOrder.DealAmount)) {
                if (item.state == "partially_filled" || item.state == "filled") {
                    Log(item.state, item.side, dataOrder, (item.clOrdId.indexOf('YYYYY') == 0 ? '#ff0000' : ''))
                }
            }

            let update = false
            if (dataOrder.Status == ORDER_STATE_PENDING) {
                // 修改订单的命令还没返回就收到了new订单的update导致时间序列错乱
                if (oldOrder) {
                    update = true
                    Object.assign(oldOrder, dataOrder)
                } else {
                    mp[dataOrder.Id] = dataOrder
                    update = true
                }
                if (self.verbose) {
                    let suffix = ''
                    if (self.ctx.depth) {
                        suffix = 'bid:' + JSON.stringify(self.ctx.depth.bids[0]) + ', ask:' + JSON.stringify(self.ctx.depth.asks[0])
                    }
                    Log("update order", Boolean(oldOrder), mp[dataOrder.Id], item, suffix)
                }
            } else {
                update = true
                if (oldOrder) {
                    // avoid ref
                    oldOrder.Status = dataOrder.Status
                    delete mp[dataOrder.Id]
                }
                if (self.verbose) {
                    Log("order " + item.state, dataOrder)
                }
            }

            if (update) {
                ret.push(dataOrder)
            }
            self.account.ordersUpdate = Number(item.uTime || item.cTime)
        })
        return ret
    }

    self.processMsg = function(msg) {
        let obj = JSON.parse(msg)
        if (obj.event == "error") {
            Log("Error:", obj.msg)
            Sleep(1000)
            self.reset()
        } else if (obj.event == "login") {
            Log("Login success")
            self.ws.write(JSON.stringify({
                "op": "subscribe",
                "args": [{
                    "channel": "balance_and_position"
                }, {
                    "channel": "orders",
                    "instType": "ANY"
                }]
            }))
        } else if (obj.event == "subscribe") {
            Log("subscribe OK", obj.arg.channel)
            if (obj.arg.channel == "orders" && !self.isReadyDic["orders"]) {
                let ret = self.ctx.e.IO("api", "GET", "/api/v5/trade/orders-pending")
                if (ret && ret.code == "0") {
                    self.processOrders(ret.data, true)
                    Log("pocess orders ok", self.account.ordersCount)
                } else {
                    Log("process order failed", ret, "#ff0000")
                }
                self.isReadyDic["orders"] = true
            }
        } else if (obj && obj.arg && obj.data && obj.data.length > 0) {
            let event = {}
            if (obj.arg.channel == 'balance_and_position') {
                if (self.verbose) {
                    Log(obj, "#ff0000")
                }
                event.ts = Number(obj.data[0].pTime)
                if (obj.data[0].posData) {
                    let positions = {}
                    obj.data[0].posData.forEach(function(item) {
                        if (typeof(positions[item.instId]) === 'undefined') {
                            positions[item.instId] = 0
                        }
                        positions[item.instId] = {
                            Amount: Number(item.pos),
                            Price: Number(item.avgPx)
                        }
                        self.account.positionsUpdate = Number(item.uTime)
                    })
                    for (let instId in positions) {
                        self.account.positions[instId] = positions[instId]
                    }
                    event.positions = positions
                }
                if (obj.data[0].balData) {
                    let balance = {}
                    obj.data[0].balData.forEach(function(item) {
                        balance[item.ccy] = {
                            free: Number(item.cashBal),
                            locked: 0
                        }
                        self.account.balanceUpdate = Number(item.uTime)
                    })
                    for (let instId in balance) {
                        self.account.balance[instId] = balance[instId]
                    }
                    event.balance = balance
                }
                self.isReadyDic["account"] = true
            } else if (obj.arg.channel == 'orders') {
                event.orders = self.processOrders(obj.data)
                event.ts = Number(obj.data[0].uTime)
            } else {
                // {"id":"amendOrders","op":"batch-amend-orders" ...}
                //Log("DATA RECV", "<" + msg + ">")
            }
            // position change
            if (event.ts) {
                self.ctx.processTick(event)
            }
        } else {
            //Log("RECV", "<" + msg + ">")
        }
    }

    self.poll = function(timeout) {
        let ts = new Date().getTime()
        if (self.lastPing == 0) {
            self.lastPing = ts
        }
        if (self.ws == null) {
            self.ws = Dial("wss://ws.okx.com:8443/ws/v5/private")
            if (self.ws) {
                let tsStr = (ts / 1000).toString()
                let authMsg = {
                    "op": "login",
                    "args": [{
                        "apiKey": AccessKey,
                        "passphrase": Passphrase,
                        "timestamp": tsStr,
                        "sign": self.ctx.e.HMAC("sha256", "base64", tsStr + "GET" + "/users/self/verify", "{{secretkey}}")
                    }]
                }
                self.ws.write(JSON.stringify(authMsg))
            }
        }
        if (!self.ws) {
            return;
        }

        if (ts - self.lastPing > 10000) {
            self.ws.write("ping")
            self.lastPing = ts
        }

        let lastRead = false
        while (true) {
            let msg = self.ws.read(-1)
            if (msg == "") {
                self.reset()
                break
            }
            if (msg == null) {
                if (typeof(timeout) == 'number' && timeout > 0) {
                    msg = self.ws.read(timeout)
                    lastRead = true
                }
            }
            if (msg != null && msg != "") {
                if (msg != "pong" && msg != "ping") {
                    self.processMsg(msg)
                }
            } else {
                break
            }
            if (lastRead) {
                break
            }
        }
    }
    return self
}


function NewOKXWebSocketPublic(e, onLogin, onTick, verbose) {
    var crc32 = function(r) {
        for (var a, o = [], c = 0; c < 256; c++) {
            a = c;
            for (var f = 0; f < 8; f++) a = 1 & a ? 3988292384 ^ a >>> 1 : a >>> 1;
            o[c] = a
        }
        for (var n = -1, t = 0; t < r.length; t++) n = n >>> 8 ^ o[255 & (n ^ r.charCodeAt(t))];
        return (-1 ^ n) >>> 0
    };

    let self = {
        e: e,
        key: e.GetName() + '/' + e.GetCurrency(),
        quoteCurrency: e.GetQuoteCurrency(),
        name: e.GetName(),
        verbose: verbose,
        isFutures: e.GetName().indexOf("Futures_") == 0,
        ws: null,
        channles: [],
        depthCount: 0,
        depthConsumed: 0,
        lastPing: 0,
        depthDic: {
            asks: {},
            bids: {}
        },
        trades: [],
        lastMarket: 0
    }
    self.wsPrivate = NewOKXWebSocketPrivate(self, verbose)

    self.processTick = function(event) {
        if (typeof(onTick) !== 'function') {
            return
        }
        let ret = onTick(self, event)
        if (ret) {
            if (ret.newOrders && ret.newOrders.length > 0) {
                ret.ctx.wsPrivate.batchOrders(ret.newOrders)
            }
            if (ret.amendOrders && ret.amendOrders.length > 0) {
                ret.ctx.wsPrivate.amendOrders(ret.amendOrders)
            }
            if (ret.cancelOrders && ret.cancelOrders.length > 0) {
                ret.ctx.wsPrivate.cancelOrders(ret.cancelOrders)
            }
        }
    }

    self.processMsg = function(msg) {
        let obj = JSON.parse(msg)
        if (obj.event == 'subscribe') {
            self.channles.push(obj)
        } else if (obj.event == 'error') {
            throw obj.msg
        } else if (obj.event == 'login') {
            self.channles = []
            if (typeof(onLogin) === 'function') {
                onLogin(self.ws)
            }
        } else {
            self.lastMarket = new Date().getTime();
            let instId = obj.arg.instId
            let event = {
                instId: instId
            }
            obj.data.forEach(function(item) {
                if (obj.arg.channel == 'trades') {
                    if (typeof(event.trades) === 'undefined') {
                        event.trades = []
                    }
                    event.ts = Number(item.ts)
                    event.trades.push({
                        ts: Number(item.ts),
                        side: item.side,
                        price: Number(item.px),
                        qty: Number(item.sz)
                    })
                } else if (obj.arg.channel == 'books5' || obj.arg.channel == 'books' || obj.arg.channel == 'books50-l2-tbt') {
                    let tsBegin = UnixNano()
                    let depth = {
                        asks: [],
                        bids: []
                    }
                    if (obj.arg.channel == 'books5') {
                        item.asks.forEach(function(pair) {
                            depth.asks.push({
                                price: Number(pair[0]),
                                qty: Number(pair[1])
                            })
                        })
                        item.bids.forEach(function(pair) {
                            depth.bids.push({
                                price: Number(pair[0]),
                                qty: Number(pair[1])
                            })
                        })
                    } else {
                        let depthDic = self.depthDic[instId]
                        if (typeof(depthDic) === 'undefined') {
                            depthDic = {
                                count: 0,
                                dic: {}
                            }
                            self.depthDic[instId] = depthDic
                        }
                        depthDic.count += 1
                        for (let k in depthDic.dic) {
                            if (obj.action == 'snapshort') {
                                depthDic.dic[k] = {}
                            }
                            let mp = depthDic.dic[k]
                            item[k].forEach(function(book) {
                                if (book[1] == '0') {
                                    delete mp[book[0]]
                                } else {
                                    mp[book[0]] = [book[1], book[3], item['ts']]
                                }
                            })
                        }

                        for (let k in depth) {
                            let n = k == 'asks' ? 1 : -1
                            let mp = depthDic.dic[k]
                            Object.keys(depthDic.dic[k]).sort(function(a, b) {
                                return n * (Number(a) - Number(b))
                            }).forEach(function(x) {
                                // keep string for 
                                depth[k].push({
                                    price: x,
                                    qty: mp[x][0]
                                })
                            })
                        }

                        if (depthDic.count % 5000 == 0) {
                            let s = []
                            for (let i = 0; i < 25; i++) {
                                ['bids', 'asks'].forEach(function(k) {
                                    if (i < depth[k].length) {
                                        s.push(depth[k][i].price + ':' + depth[k][i].qty)
                                    }
                                })
                            }
                            if (crc32(s.join(":")) != Uint32Array.from(Int32Array.of(item.checksum))[0]) {
                                throw "depth checksum error"
                            }
                        }
                        // convert to number
                        for (let dir in depth) {
                            let books = depth[dir]
                            for (let i = 0; i < books.length; i++) {
                                books[i].price = Number(books[i].price)
                                books[i].qty = Number(books[i].qty)
                            }
                        }
                    }
                    event.depth = depth
                    event.ts = Number(item.ts)
                }
            })
            if (event.ts) {
                self.processTick(event)
            }
        }
    }

    self.isReady = function() {
        return self.wsPrivate.isReadyDic["account"] && self.wsPrivate.isReadyDic["orders"]
    }

    self.reset = function() {
        if (self.ws) {
            self.ws.close()
            self.ws = null
        }
        self.isReadyDic = {}
    }

    self.poll = function(timeout) {
        let ts = new Date().getTime()
        if (self.lastPing == 0) {
            self.lastPing = ts
        }
        if (!self.ws) {
            self.ws = Dial("wss://ws.okx.com:8443/ws/v5/public")
            if (!self.ws) {
                return
            }
            let tsStr = (new Date().getTime() / 1000).toString()
            let authMsg = {
                "op": "login",
                "args": [{
                    "apiKey": AccessKey,
                    "passphrase": Passphrase,
                    "timestamp": tsStr,
                    "sign": self.e.HMAC("sha256", "base64", tsStr + "GET" + "/users/self/verify", "{{secretkey}}")
                }]
            }
            self.ws.write(JSON.stringify(authMsg))
        }
        if (!self.ws) {
            return
        }
        if (ts - self.lastPing > 10000) {
            self.ws.write("ping")
            self.lastPing = ts
        }
        let lastRead = false
        while (true) {
            let msg = self.ws.read(-1)
            if (msg == "") {
                self.reset()
                break
            }
            if (msg == null) {
                if (typeof(timeout) == 'number' && timeout > 0) {
                    msg = self.ws.read(timeout)
                    lastRead = true
                }
            }
            if (msg != null && msg != "") {
                if (msg != "pong") {
                    self.wsPrivate.poll()
                    self.processMsg(msg)
                    self.wsPrivate.poll()
                }
            } else {
                break
            }
            if (lastRead) {
                break
            }
        }
        self.wsPrivate.poll()
    }
    return self
}


function NewBinanceSocketPrivate(ctx, verbose) {
    let self = {
        ws: null,
        verbose: verbose,
        isFutures: ctx.isFutures,
        isReadyDic: {},
        lastPing: 0,
        ctx: ctx,
        useMargin: UseMargin,
        quoteCurrency: ctx.e.GetQuoteCurrency(),
        listenKey: null,
        listenKeyUpdate: 0,
        account: {
            cancelPending: {},
            orders: {},
            ordersCount: {
                pending: 0,
                buy: 0,
                amend: 0,
                canceled: 0,
                filled: 0,
                push: 0,
                sell: 0
            },
            positions: {},
            balance: {},
            init_balance: {},
            balanceUpdate: 0,
            positionsUpdate: 0,
            ordersUpdate: 0
        }
    }

    if (ctx.isFutures) {
        // TODO change to api
        let positions = _C(exchange.GetPosition)
        positions.forEach(function(pos) {
            // avoid self.instId
            self.account.positions[self.instId] = {
                Amount: pos.Amount * (pos.Type == PD_LONG ? 1 : -1),
                Price: pos.Price
            }
        })
    }
    let pair = ctx.e.GetCurrency().split('_')
    if (self.useMargin && !ctx.isFutures) {
        let ret = _C(ctx.e.IO, "api", "GET", "/sapi/v1/margin/account")
        ret.userAssets.forEach(function(item) {
            if (item.asset == pair[0] || item.asset == pair[1]) {
                self.account.balance[item.asset] = {
                    free: Number(item.free),
                    locked: Number(item.locked),
                    borrowed: Number(item.borrowed),
                }
                self.account.init_balance[item.asset] = {
                    free: Number(item.free),
                    locked: Number(item.locked),
                    borrowed: Number(item.borrowed)
                }
            }
        })
        self.tradePath = "/sapi/v1/margin/order"
    } else {
        let acc = _C(ctx.e.GetAccount)
        self.account.balance[pair[1]] = {
            free: acc.Balance,
            locked: acc.FrozenBalance,
            borrowed: 0
        }
        self.account.balance[pair[0]] = {
            free: acc.Stocks,
            locked: acc.FrozenStocks,
            borrowed: 0
        }
        self.account.init_balance[pair[1]] = {
            free: acc.Balance,
            locked: acc.FrozenBalance,
            borrowed: 0
        }
        self.account.init_balance[pair[0]] = {
            free: acc.Stocks,
            locked: acc.FrozenStocks,
            borrowed: 0
        }
        self.tradePath = "/api/v3/order"
    }
    Log(ctx.e.GetName(), self.account.init_balance)

    self.getOrdersMap = function(instId) {
        let mp = self.account.orders[instId]
        if (typeof(mp) === 'undefined') {
            mp = {}
            self.account.orders[instId] = mp
        }
        return mp
    }

    self.reset = function() {
        if (self.ws) {
            self.ws.close()
            self.ws = null
        }
        self.isReadyDic = {}
    }

    self.send = function(msg) {
        if (!self.ws) {
            return
        }
        self.ws.write(JSON.stringify(msg))
    }

    self.batchOrders = function(orders) {
        let args = []
        let ts = new Date().getTime()
        orders.forEach(function(item) {
            args.push({
                symbol: item.instId,
                side: item.side.toUpperCase(),
                newClientOrderId: item.clOrdId,
                type: "LIMIT",
                price: item.px,
                quantity: item.sz,
                timeInForce: "GTX",
            })
            self.getOrdersMap(item.instId)[item.clOrdId] = {
                Id: item.clOrdId,
                Created: ts,
                instId: item.instId,
                Price: Number(item.px),
                Amount: Number(item.sz),
                Type: item.side == "sell" ? ORDER_TYPE_SELL : ORDER_TYPE_BUY,
                DealAmount: 0,
                Status: ORDER_STATE_PENDING
            }
        })
        let rets = []
        if (self.ctx.isFutures) {
            for (let i = 0; i < args.length; i += 5) {
                let ret = self.ctx.e.IO("api", "POST", "/fapi/v1/batchOrders", "", JSON.stringify({
                    batchOrders: args.slice(i, i + 5)
                }))
                if (self.verbose) {
                    Log("batchOrders", args, ret)
                }
                rets.push(ret)
            }
        } else {
            orders.forEach(function(order) {
                let ret = self.ctx.e.IO("api", "POST", self.tradePath, "", JSON.stringify(order))
                if (self.verbose) {
                    Log("batchOrders", args, ret)
                }
                rets.push(ret)
            })
        }
        return rets
    }

    self.cancelOrders = function(orders) {
        let ts = new Date().getTime()

        orders.forEach(function(item) {
            if (typeof(self.account.cancelPending[item.clOrdId]) === 'undefined') {
                self.account.cancelPending[item.clOrdId] = {
                    retry: 0,
                    ts: ts,
                    data: item
                }
            }
        })

        for (let id in self.account.cancelPending) {
            let item = self.account.cancelPending[id]
            if (ts - item.ts > 10000) {
                // recancel
                orders.push(item.data)
                Log("remove timeout order", item, "#ff0000")
                item.retry += 1
                item.ts = ts
            }
            if (item.retry > 10) {
                Log("force remove order", item, "#ff0000")
                delete self.account.cancelPending[id]
            }
        }


        let mp = {}
        orders.forEach(function(item) {
            if (typeof(mp[item.instId]) === 'undefined') {
                mp[item.instId] = []
            }
            mp[item.instId].push(item.clOrdId)
            delete self.getOrdersMap(item.instId)[item.clOrdId]
        })
        let rets = []
        if (self.ctx.isFutures) {
            for (let instId in mp) {
                let arr = mp[instId]
                for (let i = 0; i < arr.length; i += 5) {
                    let ids = arr.slice(i, i + 5)
                    let ret = self.ctx.e.IO("api", "DELETE", "/fapi/v1/batchOrders", "", JSON.stringify({
                        symbol: instId,
                        origClientOrderIdList: ids
                    }))
                    if (self.verbose) {
                        Log("cancelOrders", instId, arr, ret)
                    }
                    // cancel success
                    if (ret && ret.length == ids.length) {
                        ids.forEach(function(id) {
                            delete self.account.cancelPending[id]
                        })
                    }
                    rets.push(ret)
                }
            }
        } else {
            for (let i = 0; i < orders.length; i += 5) {
                let ret = self.ctx.e.IO("api", "DELETE", self.tradePath, "", JSON.stringify(orders[i]))
                if (self.verbose) {
                    Log("cancelOrders", orders[i], ret)
                }
                rets.push(ret)
            }
        }

        return rets
    }

    self.processOrders = function(ts, data, reset) {
        if (Debug) {
            Log("ORDERS: ", data)
        }
        if (reset) {
            self.account.orders = {}
            for (let k in self.account.orderCount) {
                self.account.ordersCount[k] = 0
            }
        }
        let ret = []
        data.forEach(function(item) {
            let instId = item.s
            let dataOrder = {
                Id: item.C || item.c,
                instId: instId,
                Info: item,
                Price: Number(item.p),
                Amount: Number(item.q),
                Update: Number(item.T || item.E || ts),
                Created: Number(item.O || item.E || ts),
                Type: item.S == "BUY" ? ORDER_TYPE_BUY : ORDER_TYPE_SELL,
                DealAmount: Number(item.z)
            }
            if (typeof(item.ap) !== 'undefined') {
                dataOrder.AvgPrice = Number(item.ap)
            } else if (Number(item.z) > 0 && typeof(item.Z) !== 'undefined') {
                dataOrder.AvgPrice = Number(item.Z) / Number(item.z)
            }

            if (item.X == "NEW") {
                dataOrder.Status = ORDER_STATE_PENDING
                self.account.ordersCount.push += 1
                self.account.ordersCount.pending += 1
            } else if (item.X == "PARTIALLY_FILLED") {
                dataOrder.Status = ORDER_STATE_PENDING
            } else if (item.X == "FILLED") {
                dataOrder.Status = ORDER_STATE_CLOSED
                self.account.ordersCount.filled += 1
                self.account.ordersCount.pending -= 1
            } else if (item.X == "CANCELED" || item.X == "EXPIRED" || item.X == "REJECT" || item.X == "NEW_INSURANCE" || item.X == "NEW_ADL") {
                dataOrder.Status = ORDER_STATE_CANCELED
                self.account.ordersCount.canceled += 1
                self.account.ordersCount.pending -= 1
            } else {
                throw "unknow order state " + JSON.stringify(item)
            }

            // remove from cancelPending orders
            if (dataOrder.Status != ORDER_STATE_PENDING) {
                delete self.account.cancelPending[dataOrder.Id]
                if (Debug) {
                    Log("remove from cancel pending orders", dataOrder)
                }
            }

            let mp = self.getOrdersMap(instId)
            let oldOrder = mp[dataOrder.Id]

            if ((!oldOrder) || (dataOrder.DealAmount != oldOrder.DealAmount)) {
                if (item.X == "PARTIALLY_FILLED" || item.X == "FILLED") {
                    Log(item.X, item.S, dataOrder)
                }
            }

            let update = false
            if (dataOrder.Status == ORDER_STATE_PENDING) {
                // 修改订单的命令还没返回就收到了new订单的update导致时间序列错乱
                if (oldOrder) {
                    update = true
                    Object.assign(oldOrder, dataOrder)
                } else {
                    mp[dataOrder.Id] = dataOrder
                    update = true
                }
                if (self.verbose) {
                    Log("update order", Boolean(oldOrder), mp[dataOrder.Id], item)
                }
            } else {
                update = true
                if (oldOrder) {
                    // avoid ref
                    Object.assign(oldOrder, dataOrder)
                    delete mp[dataOrder.Id]
                }
                if (self.verbose) {
                    Log("order " + item.X, dataOrder)
                }
            }
            if (update) {
                ret.push(dataOrder)
            }
            self.account.ordersUpdate = ts
        })
        return ret
    }

    self.processPrivateMsg = function(msg) {
        let obj = JSON.parse(msg)
        if (self.verbose) {
            Log(obj, "#ff0000")
        }
        let event = {}
        if (obj.e == "ORDER_TRADE_UPDATE") {
            event.orders = self.processOrders(obj.E, [obj.o])
            event.ts = obj.E
            self.isReadyDic["orders"] = true
        } else if (obj.e == "ACCOUNT_UPDATE" && obj.a) {
            event.ts = obj.E
            self.account.balanceUpdate = obj.E
            if (obj.a.B) {
                obj.a.B.forEach(function(item) {
                    self.account.balance[item.a] = {
                        free: Number(item.wb),
                        locked: 0
                    }
                })
                event.balance = self.account.balance
            }
            if (obj.a.P) {
                obj.a.P.forEach(function(item) {
                    self.account.positions[item.s] = {
                        Amount: Number(item.pa),
                        Price: Number(item.ep)
                    }
                    self.account.positionsUpdate = obj.E
                })
                event.positions = self.account.positions
            }
            self.isReadyDic["account"] = true
        } else if (obj.e == "outboundAccountPosition") {
            event.ts = obj.E
            self.account.balanceUpdate = obj.E
            if (obj.B) {
                obj.B.forEach(function(item) {
                    self.account.balance[item.a] = {
                        free: Number(item.f),
                        locked: Number(item.l)
                    }
                })
                event.balance = self.account.balance
            }
            self.isReadyDic["account"] = true
        } else if (obj.e == "executionReport") {
            event.orders = self.processOrders(obj.E, [obj])
            event.ts = obj.E
            self.isReadyDic["orders"] = true
        } else {
            //Log("RECV", "<" + msg + ">")
        }

        if (event.ts) {
            self.ctx.processTick(event)
        }
    }

    self.poll = function(timeout) {
        let ts = new Date().getTime()
        if (self.lastPing == 0) {
            self.lastPing = ts
        }
        if (!self.listenKey || ts - self.listenKeyUpdate > 60000 * 30) {
            let ret = self.ctx.e.IO("api", "POST", self.ctx.isFutures ? "/fapi/v1/listenKey" : (self.useMargin ? "/sapi/v1/userDataStream" : "/api/v3/userDataStream"))
            if (ret && ret.listenKey) {
                self.listenKey = ret.listenKey
                self.listenKeyUpdate = ts
                //Log("ListenKey Update", ret)
            }
        }
        if (!self.listenKey) {
            return
        }
        if (self.ws == null) {
            let base = (self.ctx.isFutures ? 'wss://fstream.binance.com' : 'wss://stream.binance.com:9443') + "/ws"
            Log("Dial: ", base + "/*****")
            self.ws = Dial(base + "/" + self.listenKey)
        }

        if (!self.ws) {
            return;
        }

        let lastRead = false
        while (true) {
            let msg = self.ws.read(-1)
            if (msg == "") {
                Log("Binance private websocket reset")
                self.reset()
                break
            }
            if (msg == null) {
                if (typeof(timeout) == 'number' && timeout > 0) {
                    msg = self.ws.read(timeout)
                    lastRead = true
                }
            }
            if (msg != null && msg != "") {
                //Log("BP", msg)
                if (msg == "ping") {
                    self.ws.write("pong")
                    self.lastPing = ts
                } else if (msg != "pong") {
                    self.processPrivateMsg(msg)
                }
            } else {
                break
            }
            if (lastRead) {
                break
            }
        }
    }
    return self
}

function NewBinanceWebSocketPublic(e, onLogin, onTick, verbose) {
    let self = {
        e: e,
        key: e.GetName() + '/' + e.GetCurrency(),
        quoteCurrency: e.GetQuoteCurrency(),
        name: e.GetName(),
        isFutures: e.GetName().indexOf("Futures_") == 0,
        verbose: verbose,
        ws: null,
        depthCount: 0,
        depthConsumed: 0,
        lastPing: 0,
        isReadyDic: {},
        trades: []
    }
    self.base = self.isFutures ? "wss://fstream.binance.com/ws" : "wss://stream.binance.com/ws"

    self.wsPrivate = NewBinanceSocketPrivate(self, verbose)

    self.isReady = function() {
        return self.depth && self.wsPrivate.isReadyDic["account"] && self.wsPrivate.isReadyDic["orders"]
    }

    self.reset = function() {
        if (self.ws) {
            self.ws.close()
            self.ws = null
        }
        self.isReadyDic = {}
    }

    self.processTick = function(event) {
        if (typeof(onTick) !== 'function') {
            return
        }
        let ret = onTick(self, event)
        if (ret) {
            if (ret.amendOrders && ret.amendOrders.length > 0) {
                ret.ctx.wsPrivate.amendOrders(ret.amendOrders)
            }
            if (ret.cancelOrders && ret.cancelOrders.length > 0) {
                ret.ctx.wsPrivate.cancelOrders(ret.cancelOrders)
            }
            if (ret.newOrders && ret.newOrders.length > 0) {
                ret.ctx.wsPrivate.batchOrders(ret.newOrders)
            }
        }
    }

    self.processMsg = function(msg) {
        let obj = JSON.parse(msg)
        if (obj.error) {
            Log(obj)
            throw obj.error.msg
        }

        let event = {
            ts: obj.E,
            instId: obj.s
        }

        if (obj.e == "depthUpdate") {
            let depth = {
                asks: [],
                bids: []
            }
            obj.b.forEach(function(item) {
                depth.bids.push({
                    price: Number(item[0]),
                    qty: Number(item[1])
                })
            })
            obj.a.forEach(function(item) {
                depth.asks.push({
                    price: Number(item[0]),
                    qty: Number(item[1])
                })
            })
            event.depth = depth
            //self.depthTime = obj.data.E
        } else if (obj.e == 'aggTrade') {
            event.ts = obj.E
            event.trades = [{
                price: Number(obj.p),
                qty: Number(obj.q),
                ts: obj.T,
                side: obj.m ? "sell" : "buy"
            }]
        } else if (typeof(obj.asks) !== 'undefined') {
            let depth = {
                asks: [],
                bids: []
            }
            obj.bids.forEach(function(item) {
                depth.bids.push({
                    price: Number(item[0]),
                    qty: Number(item[1])
                })
            })
            obj.asks.forEach(function(item) {
                depth.asks.push({
                    price: Number(item[0]),
                    qty: Number(item[1])
                })
            })
            event.ts = obj.E || new Date().getTime()
            event.depth = depth
        } else {
            Log(">>>", msg)
            return
        }

        self.processTick(event)
    }

    self.isReady = function() {
        return true
    }

    self.poll = function(timeout) {
        let ts = new Date().getTime()
        if (!self.ws) {
            Log("Dial: ", self.base)
            self.ws = Dial(self.base)
            if (self.ws) {
                onLogin(self.ws)
            }
        }
        if (!self.ws) {
            return
        }
        let lastRead = false
        while (true) {
            let msg = self.ws.read(-1)
            if (msg == "") {
                if (Debug) {
                    Log("DEBUG> RESET websocket", self.base, "#ff0000")
                }
                self.reset()
                break
            }
            if (msg == null) {
                if (typeof(timeout) == 'number' && timeout > 0) {
                    msg = self.ws.read(timeout)
                    lastRead = true
                }
            }
            if (msg != null && msg != "") {
                if (msg == "ping") {
                    self.ws.write("pong")
                    self.lastPing = ts
                } else if (msg == "pong") {
                    // ignore
                } else {
                    self.wsPrivate.poll()
                    self.processMsg(msg)
                    self.wsPrivate.poll()
                }
            } else {
                break
            }
            if (lastRead) {
                break
            }
        }
        self.wsPrivate.poll()
    }
    return self
}

$.NewWSS = function(e, onLogin, onTick, Verbose) {
    let pfn = null
    if (e.GetName().indexOf("Binance") != -1) {
        pfn = NewBinanceWebSocketPublic
        Log("New Binance websocket", e.GetName(), e.GetCurrency())
    } else {
        pfn = NewOKXWebSocketPublic
        Log("New OKX Websocket", e.GetName(), e.GetCurrency())
    }
    return pfn(e, onLogin, onTick, Verbose)
}


function onTick(ctx, event) {
    if (event.depth) {
        Log("depth update", event)
    }
    if (event.trades) {
        Log("trades received", event)
    }
    if (event.balance || event.positions) {
        Log("account update", event)
    }
    if (event.orders) {
        Log("private orders update", event)
    }
    
    // include orders, positions, balance for latest
    Log("account", ctx.wsPrivate.account)
    
    // for test only
    if (false) {
        return {
            amendOrders: [{
                instId: event.instId,
                clOrdId: "xxxx*****",
                cxlOnFail: true,
                newSz: "2",
            }],
            newOrders: [{
                instId: event.instId,
                clOrdId: UUID(),
                side: "sell",
                tdMode: "cross",
                ordType: "post_only",
                px: event.depth.asks[0].price.toFixed(4),
                sz: "1",
            }, {
                instId: event.instId,
                clOrdId: UUID(),
                side: "sell",
                tdMode: "cross",
                ordType: "post_only",
                px: event.depth.asks[0].price.toFixed(4),
                sz: "1",
            }],
            cancelOrders: [{
                instId: order.instId,
                clOrdId: order.Id
            }]
        }
    }
}

function main() {
    let instId = exchange.SetContractType("swap").InstrumentID

    let ctx = $.NewWSS(exchange, function(ws) {
        let msg = null
        if (exchange.GetName() === 'Futures_OKCoin') {
            msg = {
                op: "subscribe",
                args: [{
                    channel: "books5",
                    instId: instId,
                }, {
                    channel: "trades",
                    instId: instId,
                }]
            }
        } else {
            let symbol = exchange.GetCurrency().replace('_', '').toLowerCase()
            msg = {
                method: "SUBSCRIBE",
                params: [symbol + "@aggTrade", symbol + "@depth20@100ms"],
                id: "1",
            }
        }
        ws.write(JSON.stringify(msg))
        Log("subscribe", msg, "channel")
        LogStatus("Ready")
    }, onTick, Debug)

    while (true) {
        ctx.poll()
        EventLoop(1000)
    }
}



More

allez-z 能不能简单说下EventLoop函数的执行逻辑,我看api文档立面没有相关介绍

奥克量化 Zero总牛🍺

发明者量化 晚些会加到文档里去