avatar of 发明者量化-小小梦 发明者量化-小小梦
Seguir Mensajes Privados
4
Seguir
1271
Seguidores

Le enseñaremos paso a paso cómo actualizar la función de fuente de datos personalizada de backtest del recopilador de mercado

Creado el: 2020-05-07 17:43:54, Actualizado el: 2023-10-09 22:47:43
comments   15
hits   2523

Le enseñaremos paso a paso cómo actualizar la función de fuente de datos personalizada de backtest del recopilador de mercado

Le enseñaremos paso a paso cómo actualizar la función de fuente de datos personalizada de backtest del recopilador de mercado

Artículo anteriorTe enseñamos paso a paso a implementar un colector de mercadoJuntos hemos implementado un programa robótico para recopilar datos de mercado. ¿Cómo utilizamos los datos de mercado recopilados? Por supuesto, se utiliza para el sistema de backtesting. Aquí, confiando en la función de fuente de datos personalizada del sistema de backtesting de la plataforma de comercio cuantitativo del inventor, podemos utilizar directamente los datos recopilados como fuente de datos del sistema de backtesting, de modo que Puede realizar el sistema de backtesting El sistema de pruebas se puede aplicar a cualquier mercado donde queramos realizar backtesting de datos históricos.

¡Por lo tanto, podemos actualizar el “Recolector de mercado”! El recopilador de mercado también se puede utilizar como fuente de datos personalizada para proporcionar datos al sistema de backtesting.

Si tienes una necesidad, ¡toma acción!

Preparar

Los preparativos son diferentes a los del último artículo. La última vez, ejecuté el programa host en mi computadora MAC local, instalé la base de datos mongodb e inicié el servicio de base de datos. Esta vez cambiamos el entorno operativo a VPS y utilizamos el servidor Linux de Alibaba Cloud para ejecutar nuestro programa.

  • base de datos mongodb

Al igual que en el artículo anterior, es necesario instalar la base de datos MongoDB en el dispositivo donde se ejecuta el programa recopilador de mercado e iniciar el servicio. Es básicamente lo mismo que instalar mongodb en una computadora MAC. Hay muchos tutoriales en Internet. Puedes buscarlos. Es muy sencillo.

  • Instalar Python 3 El programa utiliza Python 3. Tenga en cuenta que se utilizan algunas bibliotecas y es necesario instalarlas si no están disponibles.

    • pymongo
    • http
    • urllib
  • Anfitrión Simplemente ejecute un custodio de la plataforma de comercio cuantitativo Inventor.

Modificar el “Recopilador de información de mercado”

Coleccionista de mercadoRecordsCollecter (Tutorial)Esta estrategia. Vamos a hacerle algunos cambios: Antes de que el programa ingrese al bucle while para recopilar datos, se utiliza una biblioteca multiproceso para iniciar simultáneamente un servicio para escuchar las solicitudes de datos del sistema de pruebas retrospectivas de la Plataforma de Comercio Cuantitativo de Inventor. (Algunos otros detalles pueden ignorarse)

RecordsCollector (actualizado para proporcionar una función de fuente de datos personalizada)

import _thread
import pymongo
import json
import math
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse

def url2Dict(url):
    query = urlparse(url).query  
    params = parse_qs(query)  
    result = {key: params[key][0] for key in params}  
    return result

class Provider(BaseHTTPRequestHandler):
    def do_GET(self):
        try:
            self.send_response(200)
            self.send_header("Content-type", "application/json")
            self.end_headers()

            dictParam = url2Dict(self.path)
            Log("自定义数据源服务接收到请求,self.path:", self.path, "query 参数:", dictParam)
            
            # 目前回测系统只能从列表中选择交易所名称,在添加自定义数据源时,设置为币安,即:Binance
            exName = exchange.GetName()                                     
            # 注意,period为底层K线周期
            tabName = "%s_%s" % ("records", int(int(dictParam["period"]) / 1000))  
            priceRatio = math.pow(10, int(dictParam["round"]))
            amountRatio = math.pow(10, int(dictParam["vround"]))
            fromTS = int(dictParam["from"]) * int(1000)
            toTS = int(dictParam["to"]) * int(1000)
            
            
            # 连接数据库
            Log("连接数据库服务,获取数据,数据库:", exName, "表:", tabName)
            myDBClient = pymongo.MongoClient("mongodb://localhost:27017")
            ex_DB = myDBClient[exName]
            exRecords = ex_DB[tabName]
            
            
            # 要求应答的数据
            data = {
                "schema" : ["time", "open", "high", "low", "close", "vol"],
                "data" : []
            }
            
            # 构造查询条件:大于某个值{'age': {'$gt': 20}} 小于某个值{'age': {'$lt': 20}}
            dbQuery = {"$and":[{'Time': {'$gt': fromTS}}, {'Time': {'$lt': toTS}}]}
            Log("查询条件:", dbQuery, "查询条数:", exRecords.find(dbQuery).count(), "数据库总条数:", exRecords.find().count())
            
            for x in exRecords.find(dbQuery).sort("Time"):
                # 需要根据请求参数round和vround,处理数据精度
                bar = [x["Time"], int(x["Open"] * priceRatio), int(x["High"] * priceRatio), int(x["Low"] * priceRatio), int(x["Close"] * priceRatio), int(x["Volume"] * amountRatio)]
                data["data"].append(bar)
            
            Log("数据:", data, "响应回测系统请求。")
            # 写入数据应答
            self.wfile.write(json.dumps(data).encode())
        except BaseException as e:
            Log("Provider do_GET error, e:", e)


def createServer(host):
    try:
        server = HTTPServer(host, Provider)
        Log("Starting server, listen at: %s:%s" % host)
        server.serve_forever()
    except BaseException as e:
        Log("createServer error, e:", e)
        raise Exception("stop")

def main():
    LogReset(1)
    exName = exchange.GetName()
    period = exchange.GetPeriod()
    Log("收集", exName, "交易所的K线数据,", "K线周期:", period, "秒")
    
    # 连接数据库服务,服务地址 mongodb://127.0.0.1:27017 具体看服务器上安装的mongodb设置
    Log("连接托管者所在设备mongodb服务,mongodb://localhost:27017")
    myDBClient = pymongo.MongoClient("mongodb://localhost:27017")   
    # 创建数据库
    ex_DB = myDBClient[exName]
    
    # 打印目前数据库表
    collist = ex_DB.list_collection_names()
    Log("mongodb ", exName, " collist:", collist)
    
    # 检测是否删除表
    arrDropNames = json.loads(dropNames)
    if isinstance(arrDropNames, list):
        for i in range(len(arrDropNames)):
            dropName = arrDropNames[i]
            if isinstance(dropName, str):
                if not dropName in collist:
                    continue
                tab = ex_DB[dropName]
                Log("dropName:", dropName, "删除:", dropName)
                ret = tab.drop()
                collist = ex_DB.list_collection_names()
                if dropName in collist:
                    Log(dropName, "删除失败")
                else :
                    Log(dropName, "删除成功")
    
    # 开启一个线程,提供自定义数据源服务
    try:
        # _thread.start_new_thread(createServer, (("localhost", 9090), ))     # 本机测试
        _thread.start_new_thread(createServer, (("0.0.0.0", 9090), ))         # VPS服务器上测试
        Log("开启自定义数据源服务线程", "#FF0000")
    except BaseException as e:
        Log("启动自定义数据源服务失败!")
        Log("错误信息:", e)
        raise Exception("stop")
    
    # 创建records表
    ex_DB_Records = ex_DB["%s_%d" % ("records", period)]
    Log("开始收集", exName, "K线数据", "周期:", period, "打开(创建)数据库表:", "%s_%d" % ("records", period), "#FF0000")
    preBarTime = 0
    index = 1
    while True:
        r = _C(exchange.GetRecords)
        if len(r) < 2:
            Sleep(1000)
            continue
        if preBarTime == 0:
            # 首次写入所有BAR数据
            for i in range(len(r) - 1):
                bar = r[i]
                # 逐根写入,需要判断当前数据库表中是否已经有该条数据,基于时间戳检测,如果有该条数据,则跳过,没有则写入
                retQuery = ex_DB_Records.find({"Time": bar["Time"]})
                if retQuery.count() > 0:
                    continue
                
                # 写入bar到数据库表
                ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})                
                index += 1
            preBarTime = r[-1]["Time"]
        elif preBarTime != r[-1]["Time"]:
            bar = r[-2]
            # 写入数据前检测,数据是否已经存在,基于时间戳检测
            retQuery = ex_DB_Records.find({"Time": bar["Time"]})
            if retQuery.count() > 0:
                continue
            
            ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})
            index += 1
            preBarTime = r[-1]["Time"]
        LogStatus(_D(), "preBarTime:", preBarTime, "_D(preBarTime):", _D(preBarTime/1000), "index:", index)
        # 增加画图展示
        ext.PlotRecords(r, "%s_%d" % ("records", period))
        Sleep(10000)
        

prueba

Configurar el robot Le enseñaremos paso a paso cómo actualizar la función de fuente de datos personalizada de backtest del recopilador de mercado

Ejecute el robot y ejecute el recolector de mercado. Le enseñaremos paso a paso cómo actualizar la función de fuente de datos personalizada de backtest del recopilador de mercado

Abra una estrategia de prueba y realice una prueba retrospectiva, como esta estrategia de prueba retrospectiva, y pruébela.

function main() {
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords().length)
}

Configure las opciones de backtesting y establezca el exchange en Binance porque la fuente de datos personalizada no puede establecer un nombre de exchange por sí sola por el momento. Solo puede usar un exchange en la lista para la configuración. Binance se muestra durante el backtesting. Son los datos del disco de simulación wexApp.

Le enseñaremos paso a paso cómo actualizar la función de fuente de datos personalizada de backtest del recopilador de mercado

Compare el gráfico generado por el sistema de backtesting basado en el recopilador de mercado como fuente de datos personalizada y el gráfico de velas de 1 hora en la página de intercambio de wexApp para ver si son iguales.

Le enseñaremos paso a paso cómo actualizar la función de fuente de datos personalizada de backtest del recopilador de mercado

Le enseñaremos paso a paso cómo actualizar la función de fuente de datos personalizada de backtest del recopilador de mercado

De esta manera, el robot en el VPS puede recopilar los datos de la línea K por sí mismo, y podemos obtener los datos recopilados en cualquier momento y realizar pruebas retrospectivas directamente en el sistema de backtesting. Esto es solo una gota en el océano. Los expertos pueden seguir ampliando este aspecto, por ejemplo, admitiendo fuentes de datos personalizadas para pruebas retrospectivas en tiempo real, admitiendo la recopilación de datos de múltiples variedades y mercados y otras funciones.

Bienvenido a dejar un mensaje.