3
focar em
1444
Seguidores

Um modelo de política permite que você use o mercado WebSocket perfeitamente

Criado em: 2024-10-30 09:49:20, atualizado em: 2024-11-05 17:45:31
comments   0
hits   1637

Um modelo de política permite que você use o mercado WebSocket perfeitamente

Este é um modelo de mercado WebSocket desenvolvido oficialmente pela FMZ. Copie e salve como um modelo, então selecione este modelo na nova estratégia para usá-lo: https://www.fmz.com/strategy/470349

Por que precisamos do WebSocket?

Atualmente, a estratégia FMZ é baseada principalmente no encapsulamento REST API tradicional. Cada acesso à API requer o estabelecimento de uma conexão de rede e a obtenção de dados de mercado por meio de polling. Este método é simples e fácil de usar, e é suficiente para a maioria das necessidades.

No entanto, o protocolo REST tem problemas inerentes de latência, que serão ampliados quando vários pares de negociação e várias estratégias de câmbio forem necessários. Embora as funções Go da plataforma possam ser executadas simultaneamente, o problema de atraso ainda existe, o que dificulta atender às necessidades de negociação de estratégia de frequência relativamente alta. Além disso, se houver muitos pares de negociação e a frequência de votação for muito rápido, ele encontrará o limite de frequência de acesso da plataforma de negociação. .

Atualmente, os servidores das exchanges também estão sob grande carga. Todos eles fornecem um protocolo WebSocket completo e o recomendam aos usuários da API. Comparado com o protocolo REST, o WebSocket fornece um método de conexão bidirecional persistente, que permite que a troca envie dados ao cliente em tempo real, evitando solicitações e respostas frequentes e reduzindo bastante a latência. Em termos gerais, se a latência de acesso à API REST for em torno de 20 ms, a latência de envio de dados via WebSocket será em torno de 2 ms. Além disso, o protocolo WebSocket não é limitado pela frequência de acesso da plataforma, sendo basicamente possível assinar dezenas de pares de negociação ao mesmo tempo.

Introdução ao modelo de citação do WebSocket

A plataforma de negociação quantitativa FMZ oferece suporte ao protocolo WebSocket há muito tempo e é relativamente conveniente de chamar, mas para usuários novatos, ainda é muito complicado lidar com várias assinaturas, assinar várias cotações de câmbio e incorporá-las de forma eficiente e conveniente no todo o processo estratégico. . Este modelo público de aceleração de dados de mercado em tempo real do WebSocket resolve esse problema. É muito fácil de usar e totalmente compatível com as chamadas de API encapsuladas atuais. Para a maioria das estratégias REST originais, você pode simplesmente modificá-las e usá-las diretamente para acelerar sua estratégia.

Principais características:

  • Suporte para múltiplas trocas:Esta estratégia suporta conexões WebSocket de várias exchanges, como Binance, OKX, Bybit, Bitget, etc. Os usuários podem imitar o método de empacotamento deste modelo para oferecer suporte a mais exchanges.
  • Assinatura personalizável: Permite a assinatura de canais de mercado específicos (como profundidade, negociação, etc.) e o processamento eficiente dos dados recebidos para uso imediato em estratégias de negociação.
  • Tratamento avançado de erros: Rastreamento de erros integrado e mecanismo de reconexão WebSocket para garantir a confiabilidade e a continuidade do fluxo de dados.

Uma breve introdução ao princípio de implementação

Observe que essa estratégia usa TypeScript, o que pode parecer um pouco estranho se você estiver familiarizado apenas com JavaScript. TypeScript introduz um sistema de tipos e recursos de linguagem mais ricos com base em JavaScript. Para aplicações como negociação quantitativa que precisam processar lógica complexa, usar TypeScript pode reduzir erros potenciais e melhorar a legibilidade e a manutenibilidade do código. Portanto, é recomendável aprendê-lo de forma simples.

Além disso, a estratégia utiliza o mecanismo assíncrono da plataforma FMZ, e o sub-thread do mecanismo pode serA função threadPostMessage envia uma mensagem para o thread principal. Este método é assíncrono e é adequado para notificar o thread principal sobre atualizações de dados geradas no thread filho. O thread principal e o thread filho podem ser conectados através dethreadGetData e__A função threadSetData compartilha dados. Essa abordagem permite que threads acessem e modifiquem o estado compartilhado. Se você quiser aprender sobre multithreading, essa estratégia também é um bom exemplo de aprendizado em conjunto com a documentação da plataforma.

O princípio principal dessa estratégia é conectar-se às principais bolsas de moedas digitais por meio do WebSocket e receber dados de mercado (como informações de profundidade e informações de transações) em tempo real para fornecer suporte de dados para decisões de negociação quantitativa. O processo específico de implementação é o seguinte:

1. Configurações de conexão WebSocket

setupWebsocket Esta função é usada para inicializar uma conexão WebSocket e receber dados de mercado. Ele recebe um parâmetromain_exchanges, indicando a troca que precisa ser conectada.

  • MyDial função: Crie uma conexão WebSocket, registre o tempo de conexão e gere a hora de fechamento quando a conexão for encerrada.
  • updateSymbols função: Verifique regularmente se há novas solicitações de assinatura e atualize a lista atual de pares de negociação conforme necessário.

2. Processamento de dados

supports O objeto define as trocas suportadas e suas funções de processamento (comoBinance). A função de processamento de cada exchange é responsável por analisar as mensagens recebidas e extrair dados relevantes.

  • processMsg função: Processe mensagens de trocas, identifique diferentes tipos de dados (como atualizações de profundidade, transações, etc.) e formate-os em objetos de eventos unificados.

3. Dados de assinatura

Em cada conexão, o sistema assinará os canais de dados de mercado relevantes com base no par de negociação atual.

  • getFunction função: Obtenha a função de processamento correspondente de acordo com o nome da troca.
  • this.wssPublic função: Inicialize a conexão WebSocket e comece a receber dados.

4. Gerenciamento de threads

Inicie um thread para cada troca, receba dados em tempo real e processe os dados por meio de funções de retorno de chamada.

  • threadMarket função: Receba dados no thread filho, analise e armazene as informações mais recentes sobre profundidade e transação.

5. Reescreva o método de aquisição de dados

Reescreva os métodos para obter informações de profundidade e negociação para cada bolsa, dando prioridade ao retorno de dados atualizados em tempo real.

Como usar o modelo

  1. inicialização:usar $.setupWebsocket() Inicialize a conexão WebSocket com a troca de destino.
  2. subscrição: O sistema se inscreverá automaticamente nos canais relevantes (como profundidade, negociação, etc.) para os produtos que você negocia.
  3. Aquisição de dados: Por chamadaGetDepth() e GetTrades() Função que usa automaticamente dados em tempo real do WebSocket para retornar profundidade de mercado e registros de transações.
  4. Tratamento de erros: A política inclui um mecanismo de rastreamento que registra erros de conexão e dados e tenta reconectar automaticamente se a conexão for perdida.

Se a função EventLoop() for adicionada à estratégia, ela será alterada para um mecanismo de gatilho. Quando os dados wss forem atualizados, eles serão obtidos automaticamente imediatamente e, se não houver dados mais recentes, eles esperarão. É equivalente a uma função Sleep inteligente. Claro, você também pode usar Sleep diretamente.

function main() {
    $.setupWebsocket()
    while (true) {
        exchanges.map(e=>{
            Log(e.GetName(), e.GetDepth())
            Log(e.GetName(), e.GetTrades())
        })
        EventLoop(100) // trigger by websocket
    }
}

Consulte meu guia anterior sobre estratégias de negociação multimoeda https://www.fmz.com/digest-topic/10506, onde ele pode ser facilmente modificado para oferecer suporte ao WebSocket:

function MakeOrder() {
    for (let i in Info.trade_symbols) {
        let symbol = Info.trade_symbols[i];
        let buy_price = exchange.GetDepth(symbol + '_USDT').Asks[0].Price;
        let buy_amount = 50 / buy_price;
        if (Info.position[symbol].value < 2000){
            Trade(symbol, "buy", buy_price, buy_amount, symbol);
        }
    }
}

function OnTick() {
    try {
        UpdatePosition();
        MakeOrder();
        UpdateStatus();
    } catch (error) {
        Log("循环出错: " + error);
    }
}

function main() {
    $.setupWebsocket()
    InitInfo();
    while (true) {
        let loop_start_time = Date.now();
        if (Date.now() - Info.time.last_loop_time > Info.interval * 1000) {
            OnTick();
            Info.time.last_loop_time = Date.now();
            Info.time.loop_delay = Date.now() - loop_start_time;
        }
        Sleep(5);
    }
}

Como adicionar uma nova bolsa você mesmo

Basta seguir o modelo de estratégia, imitar o seguinte formato e consultar a documentação da API de câmbio:

    supports["Binance"] = function (ctx:ICtx) {
        let processMsg = function (obj) {
            let event = {
                ts: obj.E,
                instId: obj.s,
                depth: null,
                trades: [],
            }

            if (obj.e == "depthUpdate") {
                let depth = {
                    asks: [],
                    bids: []
                }
                obj.b.forEach(function (item) {
                    depth.bids.push({
                        price: Number(item[0]),
                        qty: Number(item[1])
                    })
                })
                obj.a.forEach(function (item) {
                    depth.asks.push({
                        price: Number(item[0]),
                        qty: Number(item[1])
                    })
                })
                event.depth = depth
            } else if (obj.e == 'bookTicker') {
                event.depth = {
                    asks: [{ price: Number(obj.a), qty: Number(obj.A) }],
                    bids: [{ price: Number(obj.b), qty: Number(obj.B) }]
                }
            } else if (obj.e == 'aggTrade') {
                event.ts = obj.E
                event.trades = [{
                    price: Number(obj.p),
                    qty: Number(obj.q),
                    ts: obj.T,
                    side: obj.m ? "sell" : "buy"
                }]
            } else if (typeof (obj.asks) !== 'undefined') {
                event.ts = obj.E || new Date().getTime()
                let depth = {
                    asks: [],
                    bids: []
                }
                obj.bids.forEach(function (item) {
                    depth.bids.push({
                        price: Number(item[0]),
                        qty: Number(item[1])
                    })
                })
                obj.asks.forEach(function (item) {
                    depth.asks.push({
                        price: Number(item[0]),
                        qty: Number(item[1])
                    })
                })
                event.depth = depth
            } else {
                return
            }
            return event
        }
        let channels = ["depth20@100ms", /*"bookTicker", */"aggTrade"]
 
        let ws = null
        let endPoint = "wss://stream.binance.com/stream"
        if (ctx.name == "Futures_Binance") {
            endPoint = "wss://fstream.binance.com/stream"
        }
        
        while (true) {
            if (!ws) {
                let subscribes = []
                ctx.symbols.forEach(function (symbol) {
                    channels.forEach(function (channel) {
                        subscribes.push(symbol.toLowerCase() + "@" + channel)
                    })
                })
                ws = MyDial(endPoint + (subscribes.length > 0 ? ("?streams=" + subscribes.join("/")) : ""))
            }
            if (!ws) {
                Sleep(1000)
                continue
            }
            updateSymbols(ctx, function(symbol:string, method:string) {
                ws.write(JSON.stringify({ 
                    "method": method.toUpperCase(), 
                    "params": channels.map(c=>symbol.toLowerCase()+'@'+c),
                    "id": 2
                }))
            })
            let msg = ws.read(1000)
            if (!msg) {
                if (msg == "") {
                    trace("websocket is closed")
                    ws.close()
                    ws = null
                }
                continue
            }
            if (msg == 'ping') {
                ws.write('pong')
            } else if (msg == 'pong') {

            } else {
                let obj = JSON.parse(msg)
                if (obj.error) {
                    trace(obj.error.msg, "#ff0000")
                    continue
                }
                if (!obj.stream) {
                    continue
                }
                if (obj.stream.indexOf("depth") != -1) {
                    if (typeof(obj.data.s) !== 'string') {
                        // patch
                        obj.data.s = obj.stream.split('@')[0].toUpperCase()
                    }
                }
                let event = processMsg(obj.data)
                if (event) {
                    ctx.callback(event)
                }
            }
        }
    }