3
フォロー
1444
フォロワー

ポリシーテンプレートを使用すると、WebSocketマーケットをシームレスに使用できます。

作成日:: 2024-10-30 09:49:20, 更新日:: 2024-11-05 17:45:31
comments   0
hits   1637

ポリシーテンプレートを使用すると、WebSocketマーケットをシームレスに使用できます。

これは、FMZ によって公式に開発された WebSocket マーケット テンプレートです。これをコピーしてテンプレートとして保存し、新しい戦略でこのテンプレートを選択して使用します: https://www.fmz.com/strategy/470349

WebSocket が必要な理由は何ですか?

現在、FMZ 戦略は主に従来の REST API カプセル化に基づいています。各 API アクセスでは、ポーリングを通じて市場データを取得するためにネットワーク接続を確立する必要があります。この方法はシンプルで使いやすく、ほとんどのニーズに十分対応できます。

ただし、REST プロトコルには固有のレイテンシの問題があり、複数の取引ペアや複数の交換戦略が必要な場合には、この問題が拡大します。プラットフォームのGo機能は同時に実行できますが、遅延の問題が依然として存在し、比較的高頻度の戦略取引のニーズを満たすことが困難です。さらに、取引ペアが多すぎてポーリング頻度が高すぎると、高速の場合、取引プラットフォームのアクセス頻度制限に達します。

現在、取引所のサーバーにも大きな負荷がかかっています。取引所はすべて完全な WebSocket プロトコルを提供しており、API ユーザーにそれを推奨しています。 REST プロトコルと比較して、WebSocket は永続的な双方向接続方法を提供します。これにより、交換はデータをクライアントにリアルタイムでプッシュできるため、頻繁な要求と応答が回避され、レイテンシが大幅に削減されます。一般的に、REST API にアクセスする際のレイテンシが約 20 ミリ秒の場合、WebSocket 経由でデータをプッシュする際のレイテンシは約 2 ミリ秒です。さらに、WebSocket プロトコルはプラットフォームのアクセス頻度によって制限されず、基本的に一度に数十の取引ペアをサブスクライブすることが可能です。

WebSocket 見積テンプレートの紹介

FMZ定量取引プラットフォームは長い間WebSocketプロトコルをサポートしており、呼び出しは比較的便利ですが、初心者ユーザーにとっては、複数のサブスクリプションを処理し、複数の取引所の見積もりを購読し、それらを効率的かつ便利に埋め込むことはまだ複雑すぎます。全体的な戦略プロセス。 このパブリック WebSocket リアルタイム市場データ高速化テンプレートは、この問題を解決します。非常に使いやすく、現在のカプセル化された API 呼び出しと完全に互換性があります。元の REST 戦略のほとんどでは、それらを変更して直接使用して高速化することができます。あなたの戦略。

主な特徴:

  • 複数の取引所のサポート:この戦略は、Binance、OKX、Bybit、Bitgetなどの複数の取引所のWebSocket接続をサポートしています。ユーザーはこのテンプレートのパッケージング方法を模倣して、より多くの取引所を自分でサポートすることができます。
  • カスタマイズ可能なサブスクリプション: 特定の市場チャネル (深度、取引など) へのサブスクリプションと、受信したデータの効率的な処理を可能にし、取引戦略ですぐに使用できるようにします。
  • 高度なエラー処理: エラー追跡と WebSocket 再接続メカニズムが組み込まれており、データ フローの信頼性と継続性が確保されます。

実装原則の簡単な紹介

この戦略では TypeScript を使用するので、JavaScript にしか精通していない人にとっては少し馴染みがないかもしれないことに注意してください。 TypeScript は、JavaScript に基づく型システムとより豊富な言語機能を導入します。複雑なロジックを処理する必要がある定量取引などのアプリケーションでは、TypeScript を使用すると潜在的なエラーが減り、コードの読みやすさと保守性が向上します。そのため、簡単に学習することをお勧めします。

さらに、この戦略はFMZプラットフォームの非同期メカニズムを使用し、メカニズムのサブスレッドはthreadPostMessage 関数は、メイン スレッドにメッセージを送信します。このメソッドは非同期であり、子スレッドで生成されたデータの更新をメインスレッドに通知するのに適しています。メインスレッドと子スレッドは、threadGetDataと__threadSetData 関数はデータを共有します。このアプローチにより、スレッドは共有状態にアクセスして変更できるようになります。マルチスレッドについて学習したい場合、この戦略はプラットフォームのドキュメントと組み合わせると良い学習例にもなります。

この戦略の主な原則は、WebSocket を介して主流のデジタル通貨取引所に接続し、市場データ (深度情報や取引情報など) をリアルタイムで受信して、定量的な取引決定のためのデータサポートを提供することです。具体的な実施プロセスは以下のとおりです。

1. WebSocket接続設定

setupWebsocket この関数は、WebSocket 接続を初期化し、市場データを受信するために使用されます。 1つのパラメータを受け取るmain_exchanges接続する必要がある取引所を示します。

  • MyDial 関数: WebSocket 接続を作成し、接続時間を記録し、接続が閉じられたときのクローズ時間を出力します。
  • updateSymbols 関数: 新しいサブスクリプションリクエストを定期的に確認し、必要に応じて現在の取引ペアリストを更新します。

2. データ処理

supports オブジェクトは、サポートされている交換とその処理関数(Binance)。各取引所の処理機能は、受信したメッセージを解析し、関連するデータを抽出する役割を担います。

  • processMsg 関数: 取引所からのメッセージを処理し、さまざまな種類のデータ (深度の更新、トランザクションなど) を識別し、それらを統一されたイベント オブジェクトにフォーマットします。

3. サブスクリプションデータ

各接続ごとに、システムは現在の取引ペアに基づいて関連する市場データ チャネルをサブスクライブします。

  • getFunction 関数: 交換名に応じて対応する処理関数を取得します。
  • this.wssPublic 関数: WebSocket 接続を初期化し、データの受信を開始します。

4. スレッド管理

各交換のスレッドを開始し、データをリアルタイムで受信し、コールバック関数を通じてデータを処理します。

  • threadMarket 関数: 子スレッドでデータを受信し、最新の深度とトランザクション情報を解析して保存します。

5. データ取得方法を書き直す

各取引所の深度情報や取引情報を取得するメソッドを書き直し、リアルタイムで更新されるデータを返すことを優先します。

テンプレートの使い方

  1. 初期化:使用 $.setupWebsocket() ターゲット交換への WebSocket 接続を初期化します。
  2. サブスクリプション: システムは、取引する商品の関連チャネル(深度、取引など)に自動的に登録します。
  3. データ収集: 電話をかけるGetDepth() そして GetTrades() 関数は、WebSocket のリアルタイム データを自動的に使用して、市場の深さとトランザクション レコードを返します。
  4. エラー処理: ポリシーには、接続およびデータ エラーをログに記録し、接続が失われた場合に自動的に再接続を試みる追跡メカニズムが含まれています。

ストラテジーにEventLoop()関数を追加するとトリガー機構に変更されます。wssデータが更新されると自動的に即時取得され、最新データが無い場合は待機するようになります。インテリジェントなスリープ機能に相当します。もちろん、スリープを直接使用することもできます。

function main() {
    $.setupWebsocket()
    while (true) {
        exchanges.map(e=>{
            Log(e.GetName(), e.GetDepth())
            Log(e.GetName(), e.GetTrades())
        })
        EventLoop(100) // trigger by websocket
    }
}

マルチ通貨取引戦略に関する以前のガイド https://www.fmz.com/digest-topic/10506 を参照してください。ここでは、WebSocket をサポートするように簡単に変更できます。

function MakeOrder() {
    for (let i in Info.trade_symbols) {
        let symbol = Info.trade_symbols[i];
        let buy_price = exchange.GetDepth(symbol + '_USDT').Asks[0].Price;
        let buy_amount = 50 / buy_price;
        if (Info.position[symbol].value < 2000){
            Trade(symbol, "buy", buy_price, buy_amount, symbol);
        }
    }
}

function OnTick() {
    try {
        UpdatePosition();
        MakeOrder();
        UpdateStatus();
    } catch (error) {
        Log("循环出错: " + error);
    }
}

function main() {
    $.setupWebsocket()
    InitInfo();
    while (true) {
        let loop_start_time = Date.now();
        if (Date.now() - Info.time.last_loop_time > Info.interval * 1000) {
            OnTick();
            Info.time.last_loop_time = Date.now();
            Info.time.loop_delay = Date.now() - loop_start_time;
        }
        Sleep(5);
    }
}

自分で新しい取引所を追加する方法

戦略テンプレートに従い、次の形式を模倣し、取引所の API ドキュメントを参照してください。

    supports["Binance"] = function (ctx:ICtx) {
        let processMsg = function (obj) {
            let event = {
                ts: obj.E,
                instId: obj.s,
                depth: null,
                trades: [],
            }

            if (obj.e == "depthUpdate") {
                let depth = {
                    asks: [],
                    bids: []
                }
                obj.b.forEach(function (item) {
                    depth.bids.push({
                        price: Number(item[0]),
                        qty: Number(item[1])
                    })
                })
                obj.a.forEach(function (item) {
                    depth.asks.push({
                        price: Number(item[0]),
                        qty: Number(item[1])
                    })
                })
                event.depth = depth
            } else if (obj.e == 'bookTicker') {
                event.depth = {
                    asks: [{ price: Number(obj.a), qty: Number(obj.A) }],
                    bids: [{ price: Number(obj.b), qty: Number(obj.B) }]
                }
            } else if (obj.e == 'aggTrade') {
                event.ts = obj.E
                event.trades = [{
                    price: Number(obj.p),
                    qty: Number(obj.q),
                    ts: obj.T,
                    side: obj.m ? "sell" : "buy"
                }]
            } else if (typeof (obj.asks) !== 'undefined') {
                event.ts = obj.E || new Date().getTime()
                let depth = {
                    asks: [],
                    bids: []
                }
                obj.bids.forEach(function (item) {
                    depth.bids.push({
                        price: Number(item[0]),
                        qty: Number(item[1])
                    })
                })
                obj.asks.forEach(function (item) {
                    depth.asks.push({
                        price: Number(item[0]),
                        qty: Number(item[1])
                    })
                })
                event.depth = depth
            } else {
                return
            }
            return event
        }
        let channels = ["depth20@100ms", /*"bookTicker", */"aggTrade"]
 
        let ws = null
        let endPoint = "wss://stream.binance.com/stream"
        if (ctx.name == "Futures_Binance") {
            endPoint = "wss://fstream.binance.com/stream"
        }
        
        while (true) {
            if (!ws) {
                let subscribes = []
                ctx.symbols.forEach(function (symbol) {
                    channels.forEach(function (channel) {
                        subscribes.push(symbol.toLowerCase() + "@" + channel)
                    })
                })
                ws = MyDial(endPoint + (subscribes.length > 0 ? ("?streams=" + subscribes.join("/")) : ""))
            }
            if (!ws) {
                Sleep(1000)
                continue
            }
            updateSymbols(ctx, function(symbol:string, method:string) {
                ws.write(JSON.stringify({ 
                    "method": method.toUpperCase(), 
                    "params": channels.map(c=>symbol.toLowerCase()+'@'+c),
                    "id": 2
                }))
            })
            let msg = ws.read(1000)
            if (!msg) {
                if (msg == "") {
                    trace("websocket is closed")
                    ws.close()
                    ws = null
                }
                continue
            }
            if (msg == 'ping') {
                ws.write('pong')
            } else if (msg == 'pong') {

            } else {
                let obj = JSON.parse(msg)
                if (obj.error) {
                    trace(obj.error.msg, "#ff0000")
                    continue
                }
                if (!obj.stream) {
                    continue
                }
                if (obj.stream.indexOf("depth") != -1) {
                    if (typeof(obj.data.s) !== 'string') {
                        // patch
                        obj.data.s = obj.stream.split('@')[0].toUpperCase()
                    }
                }
                let event = processMsg(obj.data)
                if (event) {
                    ctx.callback(event)
                }
            }
        }
    }