Manual para ensinar você a atualizar o catador de transações para a recuperação de recursos de fontes de dados personalizadas

Autora:Sonhos pequenos, Criado: 2020-05-07 17:43:54, Atualizado: 2023-10-09 22:47:43

img

Manual para ensinar você a atualizar o catador de transações para a recuperação de recursos de fontes de dados personalizadas

Artigo anteriorA mão a mão ensina-te a implementar um coletor de transaçõesJuntos, implementamos um programa robótico para coletar o mercado, coletar dados do mercado e como usá-los em seguida. Claro, para o sistema de retorno, aqui, dependendo do recurso de fontes de dados personalizadas do sistema de retorno da plataforma de negociação quantitativa do inventor, podemos usar diretamente os dados coletados como fontes de dados do sistema de retorno, para que possamos aplicar o sistema de retorno a qualquer mercado em que queiramos retorno de dados históricos.

Assim, podemos fazer uma atualização para o "Recuperador de Mercado" para que o Recuperador de Mercado possa simultaneamente fornecer dados ao sistema de rastreamento como uma fonte de dados personalizada.

Se houver necessidade, ação!

Preparação

A última vez foi um programa administrador que estava sendo executado no meu MAC, instalado no banco de dados mongodb, para iniciar o serviço de banco de dados. Desta vez, mudamos o ambiente de execução para o VPS, usando o servidor Ali Cloud Linux para executar o nosso programa.

  • banco de dados mongodb

    Como no artigo anterior, é necessário instalar o banco de dados mongodb no dispositivo que executa o programa de coleta de transações e abrir o serviço.

  • Instalar o Python 3 O programa usa a linguagem Python3 e note que algumas bibliotecas são usadas, se não, precisam ser instaladas.

    • pymongo
    • http
    • urllib
  • Custódia A empresa é responsável pela criação de uma plataforma de negociação quantitativa.

O "Coleitor de Notícias" é transformado

O coletor de transações éRecordsCollecter (aula de ensino)A estratégia. O que é que ele está a fazer aqui? Antes de um programa entrar no ciclo de enquanto para coletar dados, um serviço é executado simultaneamente para iniciar um banco de dados de vários fios, para ouvir os pedidos de dados do sistema de retorno do inventor. (Alguns outros detalhes podem ser ignorados)

RecordsCollector (upgrade para fornecer recursos de fontes de dados personalizados)

import _thread
import pymongo
import json
import math
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse

def url2Dict(url):
    query = urlparse(url).query  
    params = parse_qs(query)  
    result = {key: params[key][0] for key in params}  
    return result

class Provider(BaseHTTPRequestHandler):
    def do_GET(self):
        try:
            self.send_response(200)
            self.send_header("Content-type", "application/json")
            self.end_headers()

            dictParam = url2Dict(self.path)
            Log("自定义数据源服务接收到请求,self.path:", self.path, "query 参数:", dictParam)
            
            # 目前回测系统只能从列表中选择交易所名称,在添加自定义数据源时,设置为币安,即:Binance
            exName = exchange.GetName()                                     
            # 注意,period为底层K线周期
            tabName = "%s_%s" % ("records", int(int(dictParam["period"]) / 1000))  
            priceRatio = math.pow(10, int(dictParam["round"]))
            amountRatio = math.pow(10, int(dictParam["vround"]))
            fromTS = int(dictParam["from"]) * int(1000)
            toTS = int(dictParam["to"]) * int(1000)
            
            
            # 连接数据库
            Log("连接数据库服务,获取数据,数据库:", exName, "表:", tabName)
            myDBClient = pymongo.MongoClient("mongodb://localhost:27017")
            ex_DB = myDBClient[exName]
            exRecords = ex_DB[tabName]
            
            
            # 要求应答的数据
            data = {
                "schema" : ["time", "open", "high", "low", "close", "vol"],
                "data" : []
            }
            
            # 构造查询条件:大于某个值{'age': {'$gt': 20}} 小于某个值{'age': {'$lt': 20}}
            dbQuery = {"$and":[{'Time': {'$gt': fromTS}}, {'Time': {'$lt': toTS}}]}
            Log("查询条件:", dbQuery, "查询条数:", exRecords.find(dbQuery).count(), "数据库总条数:", exRecords.find().count())
            
            for x in exRecords.find(dbQuery).sort("Time"):
                # 需要根据请求参数round和vround,处理数据精度
                bar = [x["Time"], int(x["Open"] * priceRatio), int(x["High"] * priceRatio), int(x["Low"] * priceRatio), int(x["Close"] * priceRatio), int(x["Volume"] * amountRatio)]
                data["data"].append(bar)
            
            Log("数据:", data, "响应回测系统请求。")
            # 写入数据应答
            self.wfile.write(json.dumps(data).encode())
        except BaseException as e:
            Log("Provider do_GET error, e:", e)


def createServer(host):
    try:
        server = HTTPServer(host, Provider)
        Log("Starting server, listen at: %s:%s" % host)
        server.serve_forever()
    except BaseException as e:
        Log("createServer error, e:", e)
        raise Exception("stop")

def main():
    LogReset(1)
    exName = exchange.GetName()
    period = exchange.GetPeriod()
    Log("收集", exName, "交易所的K线数据,", "K线周期:", period, "秒")
    
    # 连接数据库服务,服务地址 mongodb://127.0.0.1:27017 具体看服务器上安装的mongodb设置
    Log("连接托管者所在设备mongodb服务,mongodb://localhost:27017")
    myDBClient = pymongo.MongoClient("mongodb://localhost:27017")   
    # 创建数据库
    ex_DB = myDBClient[exName]
    
    # 打印目前数据库表
    collist = ex_DB.list_collection_names()
    Log("mongodb ", exName, " collist:", collist)
    
    # 检测是否删除表
    arrDropNames = json.loads(dropNames)
    if isinstance(arrDropNames, list):
        for i in range(len(arrDropNames)):
            dropName = arrDropNames[i]
            if isinstance(dropName, str):
                if not dropName in collist:
                    continue
                tab = ex_DB[dropName]
                Log("dropName:", dropName, "删除:", dropName)
                ret = tab.drop()
                collist = ex_DB.list_collection_names()
                if dropName in collist:
                    Log(dropName, "删除失败")
                else :
                    Log(dropName, "删除成功")
    
    # 开启一个线程,提供自定义数据源服务
    try:
        # _thread.start_new_thread(createServer, (("localhost", 9090), ))     # 本机测试
        _thread.start_new_thread(createServer, (("0.0.0.0", 9090), ))         # VPS服务器上测试
        Log("开启自定义数据源服务线程", "#FF0000")
    except BaseException as e:
        Log("启动自定义数据源服务失败!")
        Log("错误信息:", e)
        raise Exception("stop")
    
    # 创建records表
    ex_DB_Records = ex_DB["%s_%d" % ("records", period)]
    Log("开始收集", exName, "K线数据", "周期:", period, "打开(创建)数据库表:", "%s_%d" % ("records", period), "#FF0000")
    preBarTime = 0
    index = 1
    while True:
        r = _C(exchange.GetRecords)
        if len(r) < 2:
            Sleep(1000)
            continue
        if preBarTime == 0:
            # 首次写入所有BAR数据
            for i in range(len(r) - 1):
                bar = r[i]
                # 逐根写入,需要判断当前数据库表中是否已经有该条数据,基于时间戳检测,如果有该条数据,则跳过,没有则写入
                retQuery = ex_DB_Records.find({"Time": bar["Time"]})
                if retQuery.count() > 0:
                    continue
                
                # 写入bar到数据库表
                ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})                
                index += 1
            preBarTime = r[-1]["Time"]
        elif preBarTime != r[-1]["Time"]:
            bar = r[-2]
            # 写入数据前检测,数据是否已经存在,基于时间戳检测
            retQuery = ex_DB_Records.find({"Time": bar["Time"]})
            if retQuery.count() > 0:
                continue
            
            ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})
            index += 1
            preBarTime = r[-1]["Time"]
        LogStatus(_D(), "preBarTime:", preBarTime, "_D(preBarTime):", _D(preBarTime/1000), "index:", index)
        # 增加画图展示
        ext.PlotRecords(r, "%s_%d" % ("records", period))
        Sleep(10000)
        

Testes

Configurar robôsimg

O que é o que você está fazendo?img

A partir daí, você pode abrir uma política de teste e fazer um retest, por exemplo, uma política de retest como essa, e testar.

function main() {
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords().length)
}

Configurar a opção de retrospecção, definindo a troca como Binance porque, temporariamente, a fonte de dados personalizada ainda não pode criar um nome de troca por conta própria, e só pode usar uma configuração de troca na lista, quando a retrospecção é mostrada como Binance, que é realmente os dados do wexApp.

img

Se o gráfico gerado pelo sistema de retrospecção de contraste é idêntico ao gráfico da linha K de 1 hora na página do wexApp.

img

img

Isso permite que os robôs no VPS coletem os dados da linha K por conta própria, e nós podemos acessar os dados coletados diretamente no sistema de retesting. Além disso, você pode continuar a expandir, como o suporte ao rastreamento de fontes de dados personalizadas no nível do disco real, suporte a várias variedades, coleta de dados de vários mercados, etc.

Mas o que é que ele está a fazer?


Relacionados

Mais.

WuzhentaoE o serviço de fontes de dados personalizadas não recebeu a impressão do registro solicitado.

WuzhentaoQuando você seleciona dados personalizados para revisão, comece com uma transação por dados selecionada abaixo e como lidar com isso.

Lcgs006Como fazer isso, se você coletar dados de pares de moedas que não foram fornecidos acima, para fazer o retraso de alguns pares de moedas de pequenas variedades, como DOT_USDT, que não podem ser personalizados no retraso?

ZltimO topo

AgoraNuncaO teste de retrospecção para a escolha de fontes de dados personalizadas só suporta um par de transações?

Sonhos pequenosO "coletor de transações" mencionado nesta postagem precisa ser executado no servidor; fornece dados como função de fonte de dados personalizada para o sistema de pesquisa FMZ.

WuzhentaoAgora, depois de preencher o endereço de origem de dados personalizado, os dados de retorno abaixo não mostram novos dados.

Sonhos pequenosDepois de usar o recurso de fonte de dados personalizada, você também precisa preencher o endereço do serviço de fonte de dados personalizada no controle à direita.

Sonhos pequenosEste disco deve ser executado em um servidor com um IP externo para que a página do sistema de pesquisa possa ser acessada.

Sonhos pequenosVocê ainda não entendeu o que eu quero dizer, eu quero dizer que os dados fornecidos pela sua fonte de dados personalizada, por exemplo, são realmente do EOS_USDT, mas no FMZ, você só pode selecionar um par de transações como BTC_USDT, você pode usar os dados reais do EOS_USDT como o sistema de retorno do BTC_USDT para o FMZ.

Lcgs006O que é que é necessário para substituí-lo, ou há algum tutorial relacionado?

Sonhos pequenosO preço é o que você coleta, não é o mesmo que trocar nomes.

Sonhos pequenosO programa de serviço fornecido ao fonte de dados pode escrever várias transações diferentes sobre os dados fornecidos, e o sistema de retrospecção irá chamar o que for necessário.