Type/to search
3
Follow
1505
Followers
一个策略模板让你无缝使用WebSocket行情
Discussions
Created 2024-10-30 09:49:20  Updated 2024-11-05 17:45:31
 0
 2162

img

这是FMZ官方开发的一个WebSocket行情模板,复制另存为模板,在新策略中勾选这个模板就可以使用:https://www.fmz.com/strategy/470349

为什么需要WebSocket

目前FMZ策略主要是传统的REST API封装,每一步API访问都要建立一个网络连接,通过轮询的方式获取市场数据。这种方法简单易用,对于大部分需求,这样做完全足够。

但是,REST协议有固有的延迟问题,当需要多交易对、多交易所策略时,延时问题会被放大。虽然用平台的Go函数可以并发执行,但延时问题依旧存在,难以满足相对高频一点的策略交易的需求,并且交易对太多,轮询频率过快还会遇到交易平台的访问频率限制。

目前交易所的服务器负担也很重,都提供了完善的WebSocket协议,并且推荐API用户使用。相对REST协议,WebSocket提供了一种持久化的双向连接方式,使得交易所能够实时推送数据到客户端,避免了频繁的请求和响应,极大地降低了延迟。一般来说,如果访问REST API的延时在20ms左右,WebSocket的推送数据大概延时2ms左右。并且链接WebSocket协议不受平台访问频率限制,基本上可以做到一次订阅几十个交易对。

WebSocket行情模板介绍

FMZ量化交易平台很早就支持了WebSocket协议,并且相对方便的调用,但对于新手用户来说,处理多个订阅,订阅多个交易所行情,并且高效方便的嵌入整个策略流程中,还是过于复杂。
这个公开的WebSocket实时行情数据加速模板,就解决了这个痛点,非常易用,完全和当前封装的API调用兼容,对于大部分原来的REST策略策略,简单改动直接使用,加速你的策略。

主要特点:

  • 多交易所支持:该策略支持币安、OKX、Bybit、Bitget等多个交易所的WebSocket连接,用户自己可以仿照这个模板的封装方法,自己支持更多的交易所。
  • 可定制订阅:允许订阅特定市场频道(如深度、交易等),并高效处理接收的数据,供交易策略即时使用。
  • 高级错误处理:内置错误跟踪和WebSocket重连机制,确保数据流的可靠性和持续性。

实现原理简单介绍

注意到这个策略使用了TypeScript,如果你只是熟悉在JavaScript的话,看起来会有点陌生。 TypeScript 在JavaScript 的基础上引入了类型系统和更丰富的语言特性,对于量化交易等需要处理复杂逻辑的应用,使用 TypeScript 可以减少潜在的错误,提高代码的可读性和可维护性。因此推荐可以简单的学习下。

另外策略使用了FMZ平台的异步机制,机制子线程可以通过 __threadPostMessage 函数向主线程发送消息。这种方式是异步的,适用于在子线程中产生的数据更新通知主线程。主线程和子线程之间可以通过 __threadGetData 和 __threadSetData 函数共享数据。这种方式允许线程访问和修改共享的状态。如果你想学习下多线程,结合平台文档,这个策略也是一个很好的学习范例。

这个策略的主要原理是通过WebSocket连接主流数字货币交易所,实时接收市场数据(如深度信息和交易信息),以便为量化交易决策提供数据支持。具体实现流程如下:

1.WebSocket 连接设置

setupWebsocket 函数用于初始化WebSocket连接,接收市场数据。它接收一个参数 main_exchanges,表示需要连接的交易所。

  • MyDial 函数:创建WebSocket连接,记录连接时间,并在关闭连接时输出关闭时间。
  • updateSymbols 函数:定时检查是否有新的订阅请求,并根据需要更新当前的交易对列表。

2. 数据处理

supports 对象定义了支持的交易所及其处理函数(如 Binance)。每个交易所的处理函数负责解析接收到的消息并提取相关数据。

  • processMsg 函数:处理来自交易所的消息,识别不同类型的数据(如深度更新、交易等),并格式化为统一的事件对象。

3. 订阅数据

在每次连接时,系统会根据当前的交易对订阅相关的市场数据通道。

  • getFunction 函数:根据交易所名称获取相应的处理函数。
  • this.wssPublic 函数:初始化WebSocket连接并启动数据接收。

4. 线程管理

为每个交易所启动一个线程,实时接收数据并通过回调函数处理数据。

  • threadMarket 函数:在子线程中接收数据,解析并存储最新的深度和交易信息。

5. 数据获取方法重写

为每个交易所重写获取深度和交易信息的方法,优先返回实时更新的数据。

模板使用方法

  1. 初始化:使用 $.setupWebsocket() 初始化目标交易所的WebSocket连接。
  2. 订阅:系统会自动为你交易的品种订阅相关频道(如深度、交易等)。
  3. 数据获取:通过调用 GetDepth()GetTrades() 函数,自动使用WebSocket实时数据进行市场深度和交易记录的返回。
  4. 错误处理:策略包括一个追踪机制,用于记录连接和数据错误,并在连接中断时自动尝试重新连接。

如果策略中加入EventLoop()函数,会改成触发机制,当有wss数据更新时会自动立即获取,没有最新数据就等待。相当于智能的Sleep函数,当然也可以直接使用Sleep。

function main() { $.setupWebsocket() while (true) { exchanges.map(e=>{ Log(e.GetName(), e.GetDepth()) Log(e.GetName(), e.GetTrades()) }) EventLoop(100) // trigger by websocket } }

参考我的上一篇多币种交易策略指南 https://www.fmz.com/digest-topic/10506 ,这里可以非常方便的对其进行改造支持WebSocket:

function MakeOrder() { for (let i in Info.trade_symbols) { let symbol = Info.trade_symbols[i]; let buy_price = exchange.GetDepth(symbol + '_USDT').Asks[0].Price; let buy_amount = 50 / buy_price; if (Info.position[symbol].value < 2000){ Trade(symbol, "buy", buy_price, buy_amount, symbol); } } } function OnTick() { try { UpdatePosition(); MakeOrder(); UpdateStatus(); } catch (error) { Log("循环出错: " + error); } } function main() { $.setupWebsocket() InitInfo(); while (true) { let loop_start_time = Date.now(); if (Date.now() - Info.time.last_loop_time > Info.interval * 1000) { OnTick(); Info.time.last_loop_time = Date.now(); Info.time.loop_delay = Date.now() - loop_start_time; } Sleep(5); } }

如何自己添加新的交易所

只要按照策略的模板,自己仿照下面的格式,自己参考交易所API文档即可:

supports["Binance"] = function (ctx:ICtx) { let processMsg = function (obj) { let event = { ts: obj.E, instId: obj.s, depth: null, trades: [], } if (obj.e == "depthUpdate") { let depth = { asks: [], bids: [] } obj.b.forEach(function (item) { depth.bids.push({ price: Number(item[0]), qty: Number(item[1]) }) }) obj.a.forEach(function (item) { depth.asks.push({ price: Number(item[0]), qty: Number(item[1]) }) }) event.depth = depth } else if (obj.e == 'bookTicker') { event.depth = { asks: [{ price: Number(obj.a), qty: Number(obj.A) }], bids: [{ price: Number(obj.b), qty: Number(obj.B) }] } } else if (obj.e == 'aggTrade') { event.ts = obj.E event.trades = [{ price: Number(obj.p), qty: Number(obj.q), ts: obj.T, side: obj.m ? "sell" : "buy" }] } else if (typeof (obj.asks) !== 'undefined') { event.ts = obj.E || new Date().getTime() let depth = { asks: [], bids: [] } obj.bids.forEach(function (item) { depth.bids.push({ price: Number(item[0]), qty: Number(item[1]) }) }) obj.asks.forEach(function (item) { depth.asks.push({ price: Number(item[0]), qty: Number(item[1]) }) }) event.depth = depth } else { return } return event } let channels = ["depth20@100ms", /*"bookTicker", */"aggTrade"] let ws = null let endPoint = "wss://stream.binance.com/stream" if (ctx.name == "Futures_Binance") { endPoint = "wss://fstream.binance.com/stream" } while (true) { if (!ws) { let subscribes = [] ctx.symbols.forEach(function (symbol) { channels.forEach(function (channel) { subscribes.push(symbol.toLowerCase() + "@" + channel) }) }) ws = MyDial(endPoint + (subscribes.length > 0 ? ("?streams=" + subscribes.join("/")) : "")) } if (!ws) { Sleep(1000) continue } updateSymbols(ctx, function(symbol:string, method:string) { ws.write(JSON.stringify({ "method": method.toUpperCase(), "params": channels.map(c=>symbol.toLowerCase()+'@'+c), "id": 2 })) }) let msg = ws.read(1000) if (!msg) { if (msg == "") { trace("websocket is closed") ws.close() ws = null } continue } if (msg == 'ping') { ws.write('pong') } else if (msg == 'pong') { } else { let obj = JSON.parse(msg) if (obj.error) { trace(obj.error.msg, "#ff0000") continue } if (!obj.stream) { continue } if (obj.stream.indexOf("depth") != -1) { if (typeof(obj.data.s) !== 'string') { // patch obj.data.s = obj.stream.split('@')[0].toUpperCase() } } let event = processMsg(obj.data) if (event) { ctx.callback(event) } } } }
Comment
All comments (0)
No data
No data
  • 1
iPhone Download
Forums
PINE Language
© 2015 - ∞ INVENTOR PTE LTD (SG)