Trình điều khiển tăng tốc WebSocket


Ngày tạo: 2024-10-24 18:11:09 sửa đổi lần cuối: 2024-11-14 20:04:22
sao chép: 0 Số nhấp chuột: 1692
11
tập trung vào
607
Người theo dõi

Thư viện mẫu tăng tốc dữ liệu thực tế dựa trên WebSocket

mô tả:

Chiến lược này tối ưu hóa xử lý dữ liệu thực tế của nhiều sàn giao dịch thông qua kết nối WebSocket đa luồng. Sử dụng kết nối WebSocket trong nền tảng giao dịch định lượng FMZ để có được sổ đặt hàng sâu và dữ liệu giao dịch, làm giảm đáng kể độ trễ trong việc lấy dữ liệu thị trường và nâng cao hiệu suất của hệ thống giao dịch tần số cao.

Các tính năng chính:

  • Hỗ trợ nhiều sàn giao dịchChiến lược này hỗ trợ kết nối WebSocket với nhiều sàn giao dịch như Binance, OKX, Bybit, Bitget, cung cấp luồng dữ liệu nhanh hơn và ổn định hơn so với các cuộc thăm dò API REST truyền thống.
  • Đăng ký tùy chỉnh: Cho phép đăng ký các kênh thị trường cụ thể (như độ sâu, giao dịch, v.v.) và xử lý hiệu quả dữ liệu nhận được để sử dụng ngay cho các chiến lược giao dịch.
  • Xử lý lỗi nâng cao: Cơ chế theo dõi lỗi tích hợp và kết nối lại WebSocket để đảm bảo độ tin cậy và tính liên tục của luồng dữ liệu.
  • Kiểm tra CRC32Đối với các sàn giao dịch như OKX, tính năng kiểm tra CRC32 được tích hợp để đảm bảo tính toàn vẹn của dữ liệu sổ đơn đặt hàng được nhận.

Giải pháp dựa trên WebSocket này thay thế các cuộc thăm dò API truyền thống và phù hợp với các nhà giao dịch cần tối thiểu hóa độ trễ và tối đa hóa tốc độ phản ứng của thị trường.

Hướng dẫn:

  1. khởi tạo:sử dụng $.setupWebsocket() Khởi tạo kết nối WebSocket tới điểm trao đổi mục tiêu.
  2. đăng ký:Hệ thống sẽ tự động đăng ký các kênh liên quan (như độ sâu, giao dịch, v.v.) cho các sản phẩm bạn giao dịch.
  3. Thu thập dữ liệu: Bằng cách gọiGetDepth()GetTrades() Chức năng này tự động sử dụng dữ liệu thời gian thực của WebSocket để trả về thông tin chi tiết về thị trường và hồ sơ giao dịch.
  4. Xử lý lỗi:Chính sách này bao gồm cơ chế theo dõi ghi lại lỗi kết nối và dữ liệu và tự động cố gắng kết nối lại nếu mất kết nối.

Các kịch bản này được thiết kế để chạy trên nền tảng định lượng FMZ, cung cấp truy cập dữ liệu thị trường nhanh, đáng tin cậy và có thể mở rộng cho nhiều sàn giao dịch.

function main() {
    $.setupWebsocket()
    while (true) {
        exchanges.map(e=>{
            Log(e.GetName(), e.GetDepth())
            Log(e.GetName(), e.GetTrades())
            // support custom and auto subsribe Eg: e.GetDepth('ETH_USDT')
        })
        EventLoop(100) // trigger by websocket or use Sleep control delay
    }
}
Mã nguồn chiến lược
// @ts-check

$.setupWebsocket = function(main_exchanges) {
    let crc32 = function (r) {
        for (var a, o = [], c = 0; c < 256; c++) {
            a = c;
            for (var f = 0; f < 8; f++) a = 1 & a ? 3988292384 ^ a >>> 1 : a >>> 1;
            o[c] = a
        }
        for (var n = -1, t = 0; t < r.length; t++) n = n >>> 8 ^ o[255 & (n ^ r.charCodeAt(t))];
        return (-1 ^ n) >>> 0
    }

    let trace = function (...args: any[]) {
        args.unshift('#' + __threadId())
        Log.apply(this, args)
    }
    
    let MyDial = function(address) : IDial {
        let ws = Dial(address)
        if (ws) {
            let createTime = new Date().getTime()
            let oldClose = ws.close
            ws.close = function() {
                oldClose.apply(ws)
                trace(address, "closed after", (new Date().getTime() - createTime)/1e6, "seconds")
            }
        }
        trace("connect", address, ws ? "success" : "failed")
        return ws
    }

    interface ICtx {
        name: string
        symbols: string[]
        callback?: Function
    }
    let lastUpdateSymbolsTime = new Date().getTime()
    let updateSymbols = function(ctx: ICtx, callBack: Function) {
        let ts = new Date().getTime()
        if (ts - lastUpdateSymbolsTime < 5000) {
            return
        }
        lastUpdateSymbolsTime = ts
        let e = __threadPeekMessage(-1)
        if (e) {
            trace("subscribe", e, "#ff0000")
            callBack(e.symbol, e.method)
            if (e.method == "subscribe") {
                ctx.symbols.push(e.symbol)
            } else {
                ctx.symbols = ctx.symbols.filter(symbol=>symbol!=e.symbol)
            }
        }
    }

    let supports = {}
    supports["Binance"] = function (ctx:ICtx) {
        let processMsg = function (obj) {
            let event = {
                ts: obj.E,
                instId: obj.s,
                depth: null,
                trades: [],
            }

            if (obj.e == "depthUpdate") {
                let depth = {
                    asks: [],
                    bids: []
                }
                obj.b.forEach(function (item) {
                    depth.bids.push({
                        price: Number(item[0]),
                        qty: Number(item[1])
                    })
                })
                obj.a.forEach(function (item) {
                    depth.asks.push({
                        price: Number(item[0]),
                        qty: Number(item[1])
                    })
                })
                event.depth = depth
            } else if (obj.e == 'bookTicker') {
                event.depth = {
                    asks: [{ price: Number(obj.a), qty: Number(obj.A) }],
                    bids: [{ price: Number(obj.b), qty: Number(obj.B) }]
                }
            } else if (obj.e == 'aggTrade') {
                event.ts = obj.E
                event.trades = [{
                    price: Number(obj.p),
                    qty: Number(obj.q),
                    ts: obj.T,
                    side: obj.m ? "sell" : "buy"
                }]
            } else if (typeof (obj.asks) !== 'undefined') {
                event.ts = obj.E || new Date().getTime()
                let depth = {
                    asks: [],
                    bids: []
                }
                obj.bids.forEach(function (item) {
                    depth.bids.push({
                        price: Number(item[0]),
                        qty: Number(item[1])
                    })
                })
                obj.asks.forEach(function (item) {
                    depth.asks.push({
                        price: Number(item[0]),
                        qty: Number(item[1])
                    })
                })
                event.depth = depth
            } else {
                return
            }
            return event
        }
        let channels = ["depth20@100ms", /*"bookTicker", */"aggTrade"]
 
        let ws = null
        let endPoint = "wss://stream.binance.com/stream"
        if (ctx.name == "Futures_Binance") {
            endPoint = "wss://fstream.binance.com/stream"
        }
        
        while (true) {
            if (!ws) {
                let subscribes = []
                ctx.symbols.forEach(function (symbol) {
                    channels.forEach(function (channel) {
                        subscribes.push(symbol.toLowerCase() + "@" + channel)
                    })
                })
                ws = MyDial(endPoint + (subscribes.length > 0 ? ("?streams=" + subscribes.join("/")) : ""))
            }
            if (!ws) {
                Sleep(1000)
                continue
            }
            updateSymbols(ctx, function(symbol:string, method:string) {
                ws.write(JSON.stringify({ 
                    "method": method.toUpperCase(), 
                    "params": channels.map(c=>symbol.toLowerCase()+'@'+c),
                    "id": 2
                }))
            })
            let ts = new Date().getTime()
            let msg = ws.read(1000)
            if (!msg) {
                if (msg == "") {
                    trace("websocket is closed")
                    ws.close()
                    ws = null
                }
                continue
            }
            if (msg == 'ping') {
                ws.write('pong')
            } else if (msg == 'pong') {

            } else {
                let obj = JSON.parse(msg)
                if (obj.error) {
                    trace(obj.error.msg, "#ff0000")
                    continue
                }
                if (!obj.stream) {
                    continue
                }
                if (obj.stream.indexOf("depth") != -1) {
                    if (typeof(obj.data.s) !== 'string') {
                        // patch
                        obj.data.s = obj.stream.split('@')[0].toUpperCase()
                    }
                }
                let event = processMsg(obj.data)
                if (event) {
                    ctx.callback(event)
                }
            }
        }
    }

    supports["OK"] = function (ctx:ICtx) {
        let depthDicAll = {}
        let processMsg = function (msg) {
            let obj = JSON.parse(msg)
            if (obj.error) {
                trace(obj.error.msg, "#ff0000")
                return
            }
    
            if (obj.event == 'subscribe') {
                // ignore
            } else if (obj.event == 'error') {
                trace(obj.msg, "#ff0000")
                return
            } else if (obj.event == 'login') {
            } else {
                if (!obj.data) {
                    trace(msg)
                    return
                }
                let instId = obj.arg.instId
    
                let event = {
                    ts: new Date().getTime(),
                    instId: instId,
                    depth: null,
                    trades: [],
                }
    
                obj.data.forEach(function (item) {
                    if (obj.arg.channel == 'trades') {
                        event.ts = Number(item.ts)
                        event.trades.push({
                            ts: Number(item.ts),
                            side: item.side,
                            price: Number(item.px),
                            qty: Number(item.sz)
                        })
                    } else if (obj.arg.channel == 'books5' || obj.arg.channel == 'books' || obj.arg.channel == 'books50-l2-tbt') {
                        let depth = {
                            asks: [],
                            bids: []
                        }
                        if (obj.arg.channel == 'books5') {
                            item.asks.forEach(function (pair) {
                                depth.asks.push({
                                    price: Number(pair[0]),
                                    qty: Number(pair[1])
                                })
                            })
                            item.bids.forEach(function (pair) {
                                depth.bids.push({
                                    price: Number(pair[0]),
                                    qty: Number(pair[1])
                                })
                            })
                        } else {
                            let depthDic = depthDicAll[instId]
                            if (typeof (depthDic) === 'undefined') {
                                depthDic = {
                                    count: 0,
                                    dic: {
                                        asks: {},
                                        bids: {}
                                    }
                                }
                                depthDicAll[instId] = depthDic
                            }
                            depthDic.count += 1
                            for (let k in depthDic.dic) {
                                if (obj.action == 'snapshot') {
                                    depthDic.dic[k] = {}
                                }
                                let mp = depthDic.dic[k]
                                item[k].forEach(function (book) {
                                    if (book[1] == '0') {
                                        delete mp[book[0]]
                                    } else {
                                        mp[book[0]] = [book[1], book[3], item['ts']]
                                    }
                                })
                            }
    
                            for (let k in depth) {
                                let n = k == 'asks' ? 1 : -1
                                let mp = depthDic.dic[k]
                                Object.keys(depthDic.dic[k]).sort(function (a, b) {
                                    return n * (Number(a) - Number(b))
                                }).forEach(function (x) {
                                    // keep string for 
                                    depth[k].push({
                                        price: x,
                                        qty: mp[x][0]
                                    })
                                })
                            }
    
                            if (depthDic.count % 5000 == 0) {
                                let s = []
                                for (let i = 0; i < 25; i++) {
                                    ['bids', 'asks'].forEach(function (k) {
                                        if (i < depth[k].length) {
                                            s.push(depth[k][i].price + ':' + depth[k][i].qty)
                                        }
                                    })
                                }
                                if (crc32(s.join(":")) != Uint32Array.from(Int32Array.of(item.checksum))[0]) {
                                    throw "depth checksum error"
                                }
                            }
                            // convert to number
                            for (let dir in depth) {
                                let books = depth[dir]
                                for (let i = 0; i < books.length; i++) {
                                    books[i].price = Number(books[i].price)
                                    books[i].qty = Number(books[i].qty)
                                }
                            }
                        }
                        event.depth = depth
                        event.ts = Number(item.ts)
                    }
                })
                return event
            }
        }
        let channels = ['books5', 'trades']
        let ws = null
        let lastPing = new Date().getTime()
        while (true) {
            if (!ws) {
                depthDicAll = {} // reset
                ws = MyDial("wss://ws.okx.com:8443/ws/v5/public")
                if (ws) {
                    let subscribes = []
                    ctx.symbols.forEach(function (symbol) {
                        channels.forEach(function (channel) {
                            subscribes.push({
                                channel: channel,
                                instId: symbol // "BTC-USDT"
                            })
                        })
                    })
                    if (subscribes.length > 0) {
                        ws.write(JSON.stringify({ "op": "subscribe", "args": subscribes }))
                    }
                }
            }
            if (!ws) {
                Sleep(1000)
                continue
            }
            updateSymbols(ctx, function(symbol:string, method:string) {
                ws.write(JSON.stringify({ 
                    "op": method, 
                    "args": channels.map(c=>({channel: c, instId: symbol})) 
                }))
            })
            // every 10 seconds
            let ts = new Date().getTime()
            if (ts - lastPing > 10000) {
                ws.write("ping")
                lastPing = ts
            }
            let msg = ws.read(1000)
            if (!msg) {
                if (msg == "") {
                    trace("websocket is closed")
                    ws.close()
                    ws = null
                }
                continue
            } 
            if (msg != 'pong') {
                let event = processMsg(msg)
                if (event && (event.depth || (event.trades && event.trades.length > 0))) {
                    ctx.callback(event)
                }
            }
        }
    }

    supports["Bybit"] = function (ctx:ICtx) {
        let depthMp = {}
        let processMsg = function (obj:any) {
            let event = {
                ts: obj.ts,
                instId: obj.data.s,
                depth: null,
                trades: [],
            }

            if (obj.topic.startsWith("orderbook")) {
                let depthDic = depthMp[event.instId]
                if (typeof(depthDic) === 'undefined') {
                    depthDic = {}
                    depthMp[event.instId] = depthDic
                }
                let data = {asks: obj.data.a, bids: obj.data.b}
                let depth = {asks: [], bids: []}
                if (obj.type == "snapshot") {
                    depthDic.asks = {}
                    depthDic.bids = {}
                    for (let k in depth) {
                        if (!data[k]) {
                            continue
                        }
                        depth[k] = data[k].map(function(item) {
                            depthDic[k][item[0]] = item[1]
                            return {
                                price: Number(item[0]),
                                qty: Number(item[1]),
                            }
                        })
                    }
                } else if (obj.type == "delta") {
                    depthDic.count++
                    for (let k in depth) {
                        if (!data[k]) {
                            continue
                        }
                        data[k].forEach(function(item) {
                            if (item[1] == '0') {
                                delete depthDic[k][item[0]]
                            } else {
                                depthDic[k][item[0]] = item[1]
                            }
                        })
                    }
                    // build depth
                    for (let k in depth) {
                        let n = k == 'asks' ? 1 : -1
                        let mp = depthDic[k]
                        Object.keys(depthDic[k]).sort(function(a, b) {
                            return n * (Number(a) - Number(b))
                        }).forEach(function(x) {
                            depth[k].push({
                                price: x,
                                qty: mp[x]
                            })
                        })
                    }
                    // convert to number
                    for (let dir in depth) {
                        depth[dir].forEach(function(item) {
                            item.price = Number(item.price)
                            item.qty = Number(item.qty)
                        })
                    }
                }
                depth.asks = depth.asks.slice(0, 50)
                depth.bids = depth.bids.slice(0, 50)
                event.depth = depth
            } else if (obj.topic.startsWith("publicTrade")) {
                event.trades = obj.data.map(function(item) {
                    if (!event.instId) {
                        event.instId = item.s
                    }
                    return {
                        //id: item.i,
                        ts: item.T,
                        price: Number(item.p),
                        qty: Number(item.v),
                        side: item.S.toLowerCase()
                    }
                })
            }
            return event
        }
        let channels = ["orderbook.50", "publicTrade"]

        let ws = null
        let endPoint = "wss://stream.bybit.com/v5/public/spot"
        let isInverse = false
        ctx.symbols.forEach(s=>{
            let prefix = s.split('.')[0]
            if (!prefix.endsWith('USDT') && !prefix.endsWith('USDC')) {
                isInverse = true;
            } else {
                if (isInverse) {
                    throw "symbols type not support concurrent"
                }
            }
        })
        if (ctx.name == "Futures_Bybit") {
            endPoint = "wss://stream.bybit.com/v5/public/linear"
            if (isInverse) {
                endPoint = "wss://stream.bybit.com/v5/public/inverse"
            }
        }
        let lastPing = new Date().getTime()
        while (true) {
            if (!ws) {
                ws = MyDial(endPoint)
                if (ws) {
                    let subscribes = []
                    ctx.symbols.forEach(function (symbol) {
                        channels.forEach(function (channel) {
                            subscribes.push(channel+ "." + symbol)
                        })
                    })
                    if (subscribes.length > 0) {
                        ws.write(JSON.stringify({ op:"subscribe", args: subscribes }))
                    }
                }
            }
            if (!ws) {
                Sleep(1000)
                continue
            }
            // every 10 seconds
            let ts = new Date().getTime()
            if (ts - lastPing > 10000) {
                ws.write(JSON.stringify({ op: "ping" }))
                lastPing = ts
            }

            updateSymbols(ctx, function(symbol:string, method:string) {
                ws.write(JSON.stringify({ 
                    "op": method, 
                    "args": channels.map(c=>c+"."+symbol)
                }))
            })
            let msg = ws.read(1000)
            if (!msg) {
                if (msg == "") {
                    trace("websocket is closed")
                    ws.close()
                    ws = null
                    depthMp = {}
                }
                continue
            } else if (msg) {
                let obj = JSON.parse(msg)
                if (obj.op == "ping" && obj.ret_msg == "pong") {
                    // ignore
                } else if (obj.op == 'pong') {
                } else if (obj.op == 'subscribe' || obj.op == 'unsubscribe' || !obj.topic) {
                    trace(obj)
                } else {
                    try {
                        let event = processMsg(obj)
                        if (event && (event.depth || event.trades.length > 0)) {
                            ctx.callback(event)
                        }
                    } catch(e) {
                        trace("Error:", msg)
                        ws.close()
                    }
                }
            }
        }
    }

    supports["Bitget"] = function (ctx:ICtx) {
        let processMsg = function (obj) {
            let event = {
                ts: Number(obj.ts),
                instId: obj.arg.instId,
                depth: null,
                trades: [],
            }

            if (obj.arg.channel == 'trade') {
                event.trades = obj.data.map(function(item) {
                    return {
                        //id: Number(item.tradeId),
                        ts: Number(item.ts),
                        price: Number(item.price),
                        qty: Number(item.size),
                        side: item.side.toLowerCase()
                    }
                })
            } else if (obj.arg.channel == 'books15') {
                let depth = {asks: [], bids: []}
                for (let k in depth) {
                    if (!obj.data[0][k]) {
                        continue
                    }
                    depth[k] = obj.data[0][k].map(function(item) {
                        return { price: Number(item[0]), qty: Number(item[1]) }
                    })
                }
                event.depth = depth
            }
            return event
        }
        let geInstType = function(s) {
            if (ctx.name.indexOf("Futures_") == 0) {
                let supports = {
                    "USDT": "USDT-FUTURES",
                    "USD": "COIN-FUTURES",
                    "USDC": "USDC-FUTURES"
                        //"SUMCBL": "SUSDT-FUTURES",
                        //"SDMCBL": "SCOIN-FUTURES",
                        //"SCMCBL": "SUSDC-FUTURES"
                }
                let quotes = Object.keys(supports)
                for (let i = 0; i < quotes.length; i++) {
                    if (s.endsWith(quotes[i])) {
                        return supports[quotes[i]]
                    }
                }
            }
            return "SPOT";
        }
        
        let channels = ['books15', 'trade']
        let ws = null
        let lastPing = new Date().getTime()
        while (true) {
            if (!ws) {
                ws = MyDial("wss://ws.bitget.com/v2/ws/public")
                if (ws) {
                    let subscribes = []
                    ctx.symbols.forEach(function (symbol) {
                        channels.forEach(function (channel) {
                            subscribes.push({instType: geInstType(symbol), channel: channel, instId: symbol})
                        })
                    })
                    if (subscribes.length > 0) {                 
                        ws.write(JSON.stringify({ op: "subscribe", args: subscribes }))
                    }
                }
            }
            if (!ws) {
                Sleep(1000)
                continue
            }
            updateSymbols(ctx, function(symbol:string, method:string) {
                ws.write(JSON.stringify({ 
                    "op": method, 
                    "args": channels.map(c=>({instType: geInstType(symbol), channel: c, instId: symbol}))
                }))
            })
            // every 10 seconds
            let ts = new Date().getTime()
            if (ts - lastPing > 10000) {
                ws.write("ping")
                lastPing = ts
            }
            let msg = ws.read(1000)
            if (!msg) {
                // is closed
                if (msg == "") {
                    trace("websocket is closed")
                    ws.close()
                    ws = null
                }
                continue
            }
            if (msg == "ping" || msg == "pong") {
                continue
            }
            let obj = JSON.parse(msg)
            if (obj.event == "ping") {
                ws.write(JSON.stringify({event: "ping"}))
            } else if (obj.data && obj.arg && obj.arg.channel) {
                let event = processMsg(obj)
                if (event && (event.depth || (event.trades && event.trades.length > 0))) {
                    ctx.callback(event)
                }
            } else {
                trace(msg)
            }
        }
    }

    let getFunction = function (eName:String) {
        for (let key in supports) {
            if (eName.toLowerCase().indexOf(key.toLowerCase()) != -1) {
                return supports[key]
            }
        }
    }

    this.wssPublic = function (ctx:ICtx) {
        let func = getFunction(ctx.name)
        if (!func) {
            throw "not support " + ctx.name
        }
        trace("#"+__threadId(), "init websocket for", ctx.name)
        return func(ctx)
    }
    
    function threadMarket(eName: string, symbols: string[]) {
        let trades = []
        let tradeId = 0
        let currentTid = __threadId()
        this.wssPublic({ name: eName, symbols: symbols, callback: function (event) {
            let tick = {event: 'tick', name: eName, method: ''}
            if (event.depth) {
                tick.method = event.instId+'_depth'
                __threadSetData(currentTid, tick.method, event)
            } else if (event.trades) {
                tick.method = event.instId+'_trades'
                event.trades.forEach(item=>{
                    trades.push({
                        Id: ++tradeId,
                        Time: item.ts, 
                        Price: item.price,
                        Amount: item.qty,
                        Type: item.side == 'buy' ? 0 : 1
                    })
                })
                let tradePos = __threadGetData(currentTid, "tradePos")
                if (typeof(tradePos) === 'number') {
                    while (trades.length > 0 && (trades[0].Id <= tradePos || trades.length > 1000)) {
                        trades.shift()
                    }
                }
                __threadSetData(currentTid, tick.method, trades)
            }
        }})
    }

    if (__threadId() > 0) {
        return
    }
    // init inside main thread
    let tidMap = {}
    if (typeof(main_exchanges) === 'undefined') {
        main_exchanges = exchanges
    }
    main_exchanges.forEach(function(e) {
        let markets = e.GetMarkets()
        let eName = e.GetName()

        if (typeof(getFunction(eName)) !== 'function') {
            Log(eName, "websocket driver is not implemented yet")
            return
        }

        let tid = tidMap[eName]
        let subscribe = {}
        if (typeof(tid) !== 'number') {
            tid = __Thread([threadMarket, eName, []], [$.setupWebsocket])
            tidMap[eName] = tid
        }

        let getSymbol = function(symbol) {
            if (typeof(symbol) === 'undefined') {
                symbol = e.GetCurrency()
                if (e.GetName().indexOf("Futures_") != -1) {
                    symbol += '.' + e.GetContractType()
                }
            }
            if (typeof(subscribe[symbol]) === 'undefined') {
                let info = markets[symbol]
                if (info) {
                    subscribe[symbol] = true
                    __threadPostMessage(tid, {method: 'subscribe', symbol: info.Symbol})
                }
            }
            return symbol                
        }
        let origin_GetDepth = e.GetDepth
        let origin_GetTrades = e.GetTrades
        e.GetDepth = function(symbol) : IDepth {
            symbol = getSymbol(symbol)
            let info = markets[symbol]
            if (info) {
                let obj = __threadGetData(tid, info.Symbol+'_depth')
                if (obj) {
                    let delay = new Date().getTime() - obj.ts
                    if (delay < 3000) {
                        let ret = {Time: obj.ts, Asks: [], Bids: [], Symbol: symbol, Info: null, Alias: info.Symbol, Ref: 'websocket', Delay: delay};
                        obj.depth.asks.forEach(item=>{ret.Asks.push({Price: item.price, Amount: item.qty})})
                        obj.depth.bids.forEach(item=>{ret.Bids.push({Price: item.price, Amount: item.qty})})
                        return ret
                    }
                }
            }
            return origin_GetDepth(symbol) ;
        }
        e.GetTrades = function(symbol) : ITrade[] {
            symbol = getSymbol(symbol)
            let info = markets[symbol]
            if (info) {
                let obj = __threadGetData(tid, info.Symbol+'_trades')
                if (obj) {
                    if (obj.length > 0) {
                        __threadSetData(tid, "tradePos", obj[obj.length-1].Id)
                    }
                    return obj
                }
            }
            return origin_GetTrades(symbol) ;
        }
    })
}

function main() {
    $.setupWebsocket()
    if (typeof(threading) === 'undefined') {
        throw "please update docker to latest version"
    }
    while (true) {
        exchanges.map(e=>{
            Log(e.GetName(), e.GetDepth())
            Log(e.GetName(), e.GetTrades())
        })
        EventLoop(100) // trigger by websocket
    }
}