Ensinar-lhe a atualizar o coletor de mercado backtest a fonte de dados personalizados

Autora:Bem-estar, Criado: 2020-06-06 08:53:02, Atualizado: 2023-11-01 20:28:58

img

Artigo anteriorEnsinar-lhe a implementar um coletor de cotações de mercadoNós implementamos um programa robótico que coleta cotações de mercado juntos.

Como usar os dados de mercado depois de coletá-los? ele será usado para o sistema de backtest.

Portanto, podemos dar o Market Quote Collector uma atualização! deixe o coletor de mercado também pode servir como uma fonte de dados personalizados para fornecer dados para o sistema backtest.

Preparem-se.

É diferente do trabalho de preparação no último artigo. A última vez foi um programa docker rodando em meu computador MAC local, instalando o banco de dados mongodb para iniciar o serviço de banco de dados. Desta vez, mudamos o ambiente operacional para VPS e usamos o servidor Alibaba Cloud Linux para executar nosso conjunto de programas.

  • Base de dados Mongodb

Como no artigo anterior, precisamos instalar o banco de dados mongodb no dispositivo onde o programa market collector está sendo executado e iniciar o serviço. É basicamente o mesmo que instalar mongodb em um computador MAC. Há muitos tutoriais na Internet, você pode pesquisar, é muito simples.

  • Instalar Python 3

O programa usa o python3, preste atenção ao uso de algumas bibliotecas de python, se elas não estiverem instaladas, você precisa instalá-las primeiro.

pymongo http urllib

  • Docker

Uma doca FMZ a funcionar será suficiente.

Transforme o Coletor de Cotações de Mercado

O coletor de cotações de mercado é este:https://www.fmz.com/strategy/199120(RecordsCollector) estratégia.

Vamos fazer algumas modificações:

Antes que o programa entre no loop enquanto para coletar dados, uma biblioteca multi-threaded é usada, e a execução simultânea inicia um serviço para monitorar a solicitação de dados do sistema de backtest da plataforma FMZ. (Outros detalhes podem ser ignorados)

RecordsCollector (upgrade para fornecer função de fonte de dados personalizada)

import _thread
import pymongo
import json
import math
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse

def url2Dict(url):
    query = urlparse(url).query  
    params = parse_qs(query)  
    result = {key: params[key][0] for key in params}  
    return result

class Provider(BaseHTTPRequestHandler):
    def do_GET(self):
        try:
            self.send_response(200)
            self.send_header("Content-type", "application/json")
            self.end_headers()

            dictParam = url2Dict(self.path)
            Log("The custom data source service receives the request, self.path:", self.path, "query parameter:", dictParam)
            
            # At present, the backtesting system can only select the exchange name from the list. When adding a custom data source, set it to Binance, that is: Binance
            exName = exchange.GetName()                                     
            # Note that period is the bottom K-line period
            tabName = "%s_%s" % ("records", int(int(dictParam["period"]) / 1000))  
            priceRatio = math.pow(10, int(dictParam["round"]))
            amountRatio = math.pow(10, int(dictParam["vround"]))
            fromTS = int(dictParam["from"]) * int(1000)
            toTS = int(dictParam["to"]) * int(1000)
            
            
            # Connect to the database
            Log("Connect to the database service to obtain data, the database:", exName, "table:", tabName)
            myDBClient = pymongo.MongoClient("mongodb://localhost:27017")
            ex_DB = myDBClient[exName]
            exRecords = ex_DB[tabName]
            
            
            # Request data
            data = {
                "schema" : ["time", "open", "high", "low", "close", "vol"],
                "data" : []
            }
            
            # Construct query condition: greater than a certain value{'age': {'$gt': 20}} Less than a certain value{'age': {'$lt': 20}}
            dbQuery = {"$and":[{'Time': {'$gt': fromTS}}, {'Time': {'$lt': toTS}}]}
            Log("Query conditions:", dbQuery, "Number of inquiries:", exRecords.find(dbQuery).count(), "Total number of databases:", exRecords.find().count())
            
            for x in exRecords.find(dbQuery).sort("Time"):
                # Need to process data accuracy according to request parameters round and vround
                bar = [x["Time"], int(x["Open"] * priceRatio), int(x["High"] * priceRatio), int(x["Low"] * priceRatio), int(x["Close"] * priceRatio), int(x["Volume"] * amountRatio)]
                data["data"].append(bar)
            
            Log("data:", data, "Respond to backtest system requests.")
            # Write data reply
            self.wfile.write(json.dumps(data).encode())
        except BaseException as e:
            Log("Provider do_GET error, e:", e)


def createServer(host):
    try:
        server = HTTPServer(host, Provider)
        Log("Starting server, listen at: %s:%s" % host)
        server.serve_forever()
    except BaseException as e:
        Log("createServer error, e:", e)
        raise Exception("stop")

def main():
    LogReset(1)
    exName = exchange.GetName()
    period = exchange.GetPeriod()
    Log("collect", exName, "Exchange K-line data,", "K line cycle:", period, "second")
    
    # Connect to the database service, service address mongodb://127.0.0.1:27017 See the settings of mongodb installed on the server
    Log("Connect to the mongodb service of the hosting device, mongodb://localhost:27017")
    myDBClient = pymongo.MongoClient("mongodb://localhost:27017")   
    # Create a database
    ex_DB = myDBClient[exName]
    
    # Print the current database table
    collist = ex_DB.list_collection_names()
    Log("mongodb ", exName, " collist:", collist)
    
    # Check if the table is deleted
    arrDropNames = json.loads(dropNames)
    if isinstance(arrDropNames, list):
        for i in range(len(arrDropNames)):
            dropName = arrDropNames[i]
            if isinstance(dropName, str):
                if not dropName in collist:
                    continue
                tab = ex_DB[dropName]
                Log("dropName:", dropName, "delete:", dropName)
                ret = tab.drop()
                collist = ex_DB.list_collection_names()
                if dropName in collist:
                    Log(dropName, "failed to delete")
                else :
                    Log(dropName, "successfully deleted")
    
    # Start a thread to provide a custom data source service
    try:
        # _thread.start_new_thread(createServer, (("localhost", 9090), ))     # local computer test
        _thread.start_new_thread(createServer, (("0.0.0.0", 9090), ))         # Test on VPS server
        Log("Open the custom data source service thread", "#FF0000")
    except BaseException as e:
        Log("Failed to start the custom data source service!")
        Log("Error message:", e)
        raise Exception("stop")
    
    # Create the records table
    ex_DB_Records = ex_DB["%s_%d" % ("records", period)]
    Log("Start collecting", exName, "K-line data", "cycle:", period, "Open (create) the database table:", "%s_%d" % ("records", period), "#FF0000")
    preBarTime = 0
    index = 1
    while True:
        r = _C(exchange.GetRecords)
        if len(r) < 2:
            Sleep(1000)
            continue
        if preBarTime == 0:
            # Write all BAR data for the first time
            for i in range(len(r) - 1):
                bar = r[i]
                # Write line by line, you need to determine whether the data already exists in the current database table, based on timestamp detection, if there is the data, then skip, if not write in
                retQuery = ex_DB_Records.find({"Time": bar["Time"]})
                if retQuery.count() > 0:
                    continue
                
                # Write bar to the database table
                ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})                
                index += 1
            preBarTime = r[-1]["Time"]
        elif preBarTime != r[-1]["Time"]:
            bar = r[-2]
            # Check before writing data, whether the data already exists, based on time stamp detection
            retQuery = ex_DB_Records.find({"Time": bar["Time"]})
            if retQuery.count() > 0:
                continue
            
            ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})
            index += 1
            preBarTime = r[-1]["Time"]
        LogStatus(_D(), "preBarTime:", preBarTime, "_D(preBarTime):", _D(preBarTime/1000), "index:", index)
        # adding drawing display
        ext.PlotRecords(r, "%s_%d" % ("records", period))
        Sleep(10000)

Teste

Configure o robô

img

Dirige o robô, dirige o coletor de cotações de mercado.

img

Abra uma estratégia de teste para backtest.

function main() {
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords().length)
}

Configure a opção de backtest, defina a troca para Binance porque a fonte de dados personalizada temporária ainda não pode formular um nome de troca por conta própria, você só pode emprestar uma das configurações de troca na lista, o backtest mostra que Binance, o real É os dados do mercado de simulação de WexApp.

img

Compare se o gráfico gerado pelo sistema de backtest baseado no coletor de cotações de mercado como uma fonte de dados personalizada é o mesmo que o gráfico de linha K de 1 hora na página de troca do wexApp.

img img

Desta forma, o robô no VPS pode coletar dados de linha K por si só, e podemos obter os dados coletados a qualquer momento e backtest diretamente no sistema de backtest.

Você pode continuar a expandir, por exemplo, tentar o backtest de nível real fontes de dados personalizados, e multi-variedade, coleta de dados multi-mercado e outras funções.


Relacionados

Mais.