avatar of 发明者量化-小小梦 发明者量化-小小梦
focar em Mensagem privada
4
focar em
1271
Seguidores

Ensina passo a passo como atualizar a função de fonte de dados personalizada de backtest do coletor de mercado

Criado em: 2020-05-07 17:43:54, atualizado em: 2023-10-09 22:47:43
comments   15
hits   2523

Ensina passo a passo como atualizar a função de fonte de dados personalizada de backtest do coletor de mercado

Ensina passo a passo como atualizar a função de fonte de dados personalizada de backtest do coletor de mercado

Artigo anteriorEnsinamos passo a passo como implementar um coletor de mercadoJuntos, implementamos um programa de robô para coletar dados de mercado. Como usamos os dados de mercado coletados? Claro, ele é usado para o sistema de backtesting. Aqui, confiando na função de fonte de dados personalizada do sistema de backtesting da plataforma de negociação quantitativa do inventor, podemos usar diretamente os dados coletados como a fonte de dados do sistema de backtesting, para que possamos pode fazer o sistema de backtesting O sistema de teste pode ser aplicado a qualquer mercado onde queremos fazer backtest de dados históricos.

Portanto, podemos atualizar o “Market Collector”! O coletor de mercado também pode ser usado como uma fonte de dados personalizada para fornecer dados ao sistema de backtesting.

Se você tem uma necessidade, tome uma atitude!

Preparar

Os preparativos são diferentes do último artigo. Da última vez, executei o programa host no meu computador MAC local, instalei o banco de dados mongodb e iniciei o serviço de banco de dados. Desta vez, mudamos o ambiente operacional para VPS e usamos o servidor Alibaba Cloud Linux para executar nosso programa.

  • banco de dados mongodb

Assim como no artigo anterior, você precisa instalar o banco de dados MongoDB no dispositivo onde o programa coletor de mercado é executado e iniciar o serviço. É basicamente o mesmo que instalar o mongodb em um computador MAC. Há muitos tutoriais na Internet. Você pode procurá-los. É muito simples.

  • Instalar Python 3 O programa usa Python 3. Observe que algumas bibliotecas são usadas e precisam ser instaladas caso não estejam disponíveis.

    • pymongo
    • http
    • urllib
  • Hospedar Basta executar um custodiante da plataforma de negociação quantitativa Inventor.

Modifique o “Coletor de Informações de Mercado”

Colecionador de MercadoRecordsCollecter (Tutorial)Esta estratégia. Vamos fazer algumas alterações: Antes que o programa entre no loop while para coletar dados, uma biblioteca multithread é usada para iniciar simultaneamente um serviço para escutar solicitações de dados do sistema de backtesting da Plataforma de Negociação Quantitativa do Inventor. (Alguns outros detalhes podem ser ignorados)

RecordsCollecter (atualizado para fornecer função de fonte de dados personalizada)

import _thread
import pymongo
import json
import math
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse

def url2Dict(url):
    query = urlparse(url).query  
    params = parse_qs(query)  
    result = {key: params[key][0] for key in params}  
    return result

class Provider(BaseHTTPRequestHandler):
    def do_GET(self):
        try:
            self.send_response(200)
            self.send_header("Content-type", "application/json")
            self.end_headers()

            dictParam = url2Dict(self.path)
            Log("自定义数据源服务接收到请求,self.path:", self.path, "query 参数:", dictParam)
            
            # 目前回测系统只能从列表中选择交易所名称,在添加自定义数据源时,设置为币安,即:Binance
            exName = exchange.GetName()                                     
            # 注意,period为底层K线周期
            tabName = "%s_%s" % ("records", int(int(dictParam["period"]) / 1000))  
            priceRatio = math.pow(10, int(dictParam["round"]))
            amountRatio = math.pow(10, int(dictParam["vround"]))
            fromTS = int(dictParam["from"]) * int(1000)
            toTS = int(dictParam["to"]) * int(1000)
            
            
            # 连接数据库
            Log("连接数据库服务,获取数据,数据库:", exName, "表:", tabName)
            myDBClient = pymongo.MongoClient("mongodb://localhost:27017")
            ex_DB = myDBClient[exName]
            exRecords = ex_DB[tabName]
            
            
            # 要求应答的数据
            data = {
                "schema" : ["time", "open", "high", "low", "close", "vol"],
                "data" : []
            }
            
            # 构造查询条件:大于某个值{'age': {'$gt': 20}} 小于某个值{'age': {'$lt': 20}}
            dbQuery = {"$and":[{'Time': {'$gt': fromTS}}, {'Time': {'$lt': toTS}}]}
            Log("查询条件:", dbQuery, "查询条数:", exRecords.find(dbQuery).count(), "数据库总条数:", exRecords.find().count())
            
            for x in exRecords.find(dbQuery).sort("Time"):
                # 需要根据请求参数round和vround,处理数据精度
                bar = [x["Time"], int(x["Open"] * priceRatio), int(x["High"] * priceRatio), int(x["Low"] * priceRatio), int(x["Close"] * priceRatio), int(x["Volume"] * amountRatio)]
                data["data"].append(bar)
            
            Log("数据:", data, "响应回测系统请求。")
            # 写入数据应答
            self.wfile.write(json.dumps(data).encode())
        except BaseException as e:
            Log("Provider do_GET error, e:", e)


def createServer(host):
    try:
        server = HTTPServer(host, Provider)
        Log("Starting server, listen at: %s:%s" % host)
        server.serve_forever()
    except BaseException as e:
        Log("createServer error, e:", e)
        raise Exception("stop")

def main():
    LogReset(1)
    exName = exchange.GetName()
    period = exchange.GetPeriod()
    Log("收集", exName, "交易所的K线数据,", "K线周期:", period, "秒")
    
    # 连接数据库服务,服务地址 mongodb://127.0.0.1:27017 具体看服务器上安装的mongodb设置
    Log("连接托管者所在设备mongodb服务,mongodb://localhost:27017")
    myDBClient = pymongo.MongoClient("mongodb://localhost:27017")   
    # 创建数据库
    ex_DB = myDBClient[exName]
    
    # 打印目前数据库表
    collist = ex_DB.list_collection_names()
    Log("mongodb ", exName, " collist:", collist)
    
    # 检测是否删除表
    arrDropNames = json.loads(dropNames)
    if isinstance(arrDropNames, list):
        for i in range(len(arrDropNames)):
            dropName = arrDropNames[i]
            if isinstance(dropName, str):
                if not dropName in collist:
                    continue
                tab = ex_DB[dropName]
                Log("dropName:", dropName, "删除:", dropName)
                ret = tab.drop()
                collist = ex_DB.list_collection_names()
                if dropName in collist:
                    Log(dropName, "删除失败")
                else :
                    Log(dropName, "删除成功")
    
    # 开启一个线程,提供自定义数据源服务
    try:
        # _thread.start_new_thread(createServer, (("localhost", 9090), ))     # 本机测试
        _thread.start_new_thread(createServer, (("0.0.0.0", 9090), ))         # VPS服务器上测试
        Log("开启自定义数据源服务线程", "#FF0000")
    except BaseException as e:
        Log("启动自定义数据源服务失败!")
        Log("错误信息:", e)
        raise Exception("stop")
    
    # 创建records表
    ex_DB_Records = ex_DB["%s_%d" % ("records", period)]
    Log("开始收集", exName, "K线数据", "周期:", period, "打开(创建)数据库表:", "%s_%d" % ("records", period), "#FF0000")
    preBarTime = 0
    index = 1
    while True:
        r = _C(exchange.GetRecords)
        if len(r) < 2:
            Sleep(1000)
            continue
        if preBarTime == 0:
            # 首次写入所有BAR数据
            for i in range(len(r) - 1):
                bar = r[i]
                # 逐根写入,需要判断当前数据库表中是否已经有该条数据,基于时间戳检测,如果有该条数据,则跳过,没有则写入
                retQuery = ex_DB_Records.find({"Time": bar["Time"]})
                if retQuery.count() > 0:
                    continue
                
                # 写入bar到数据库表
                ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})                
                index += 1
            preBarTime = r[-1]["Time"]
        elif preBarTime != r[-1]["Time"]:
            bar = r[-2]
            # 写入数据前检测,数据是否已经存在,基于时间戳检测
            retQuery = ex_DB_Records.find({"Time": bar["Time"]})
            if retQuery.count() > 0:
                continue
            
            ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})
            index += 1
            preBarTime = r[-1]["Time"]
        LogStatus(_D(), "preBarTime:", preBarTime, "_D(preBarTime):", _D(preBarTime/1000), "index:", index)
        # 增加画图展示
        ext.PlotRecords(r, "%s_%d" % ("records", period))
        Sleep(10000)
        

teste

Configurar o robô Ensina passo a passo como atualizar a função de fonte de dados personalizada de backtest do coletor de mercado

Execute o robô e execute o coletor de mercado. Ensina passo a passo como atualizar a função de fonte de dados personalizada de backtest do coletor de mercado

Abra uma estratégia de teste e execute um backtest, como esta estratégia de backtest, e teste-a.

function main() {
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords().length)
}

Configure as opções de backtesting e defina a exchange para Binance porque a fonte de dados personalizada não pode definir um nome de exchange por enquanto. Você só pode usar uma exchange na lista. Ao fazer backtesting, Binance é exibido. São os dados do disco de simulação wexApp .

Ensina passo a passo como atualizar a função de fonte de dados personalizada de backtest do coletor de mercado

Compare o gráfico gerado pelo sistema de backtesting com base no coletor de mercado como uma fonte de dados personalizada e o gráfico de velas de 1 hora na página de câmbio do wexApp para ver se eles são iguais.

Ensina passo a passo como atualizar a função de fonte de dados personalizada de backtest do coletor de mercado

Ensina passo a passo como atualizar a função de fonte de dados personalizada de backtest do coletor de mercado

Dessa forma, o robô no VPS pode coletar os dados da linha K sozinho, e podemos obter os dados coletados a qualquer momento e fazer backtest diretamente no sistema de backtesting. Isso é apenas uma gota no oceano. Os especialistas podem continuar a expandir isso, por exemplo, dando suporte a fontes de dados personalizadas para backtesting em tempo real, dando suporte à coleta de dados multi-variedade e multi-mercado e outras funções.

Bem-vindo para deixar uma mensagem.