Apprendre à mettre à niveau le collecteur de marché backtest la source de données personnalisée

Auteur:La bonté, Créé: 2020-06-06 08:53:02, Mis à jour: 2023-11-01 20:28:58

img

Article précédentVous apprendre à mettre en œuvre un collecteur de cotations de marchéNous avons mis en place un programme robotique qui collecte les cotations de marché ensemble.

Comment utiliser les données du marché après les avoir collectées? il sera utilisé pour le système de backtest. En nous appuyant sur la fonction de source de données personnalisée du système de backtest de la plate-forme FMZ, nous pouvons utiliser directement les données collectées comme source de données du système de backtest, afin que nous puissions laisser le système de backtest utiliser dans n'importe quel marché où nous voulons backtest les données historiques.

Par conséquent, nous pouvons donner le Market Quote Collector une mise à niveau! laissez le collecteur de marché peut également servir de source de données personnalisée pour fournir des données au système de backtest.

Préparez-vous

Il est différent du travail de préparation dans le dernier article. La dernière fois était un programme docker en cours d'exécution sur mon ordinateur MAC local, l'installation de la base de données mongodb pour démarrer le service de base de données. Cette fois, nous avons changé l'environnement d'exploitation à VPS et utilisé le serveur Alibaba Cloud Linux pour exécuter notre ensemble de programmes.

  • Base de données Mongodb

Comme dans l'article précédent, nous devons installer la base de données mongodb sur le périphérique où le programme collecteur de marché est en cours d'exécution et démarrer le service.

  • Installez Python 3

Le programme utilise python3, faites attention à l'utilisation de certaines bibliothèques python, si elles ne sont pas installées, vous devez d'abord les installer.

le pymongo http - Je ne sais pas

  • Docker

Un FMZ qui fonctionne suffira.

Transformez le Collecteur de devis de marché

Le collecteur de cotations de marché est celui-ci:https://www.fmz.com/strategy/199120La stratégie de l'entreprise.

Faisons quelques modifications:

Avant que le programme n'entre dans la boucle while pour collecter des données, une bibliothèque multi-threaded est utilisée et l'exécution simultanée démarre un service pour surveiller la demande de données du système de backtest de la plate-forme FMZ (d'autres détails peuvent être ignorés).

RecordsCollector (mise à niveau pour fournir une fonction de source de données personnalisée)

import _thread
import pymongo
import json
import math
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse

def url2Dict(url):
    query = urlparse(url).query  
    params = parse_qs(query)  
    result = {key: params[key][0] for key in params}  
    return result

class Provider(BaseHTTPRequestHandler):
    def do_GET(self):
        try:
            self.send_response(200)
            self.send_header("Content-type", "application/json")
            self.end_headers()

            dictParam = url2Dict(self.path)
            Log("The custom data source service receives the request, self.path:", self.path, "query parameter:", dictParam)
            
            # At present, the backtesting system can only select the exchange name from the list. When adding a custom data source, set it to Binance, that is: Binance
            exName = exchange.GetName()                                     
            # Note that period is the bottom K-line period
            tabName = "%s_%s" % ("records", int(int(dictParam["period"]) / 1000))  
            priceRatio = math.pow(10, int(dictParam["round"]))
            amountRatio = math.pow(10, int(dictParam["vround"]))
            fromTS = int(dictParam["from"]) * int(1000)
            toTS = int(dictParam["to"]) * int(1000)
            
            
            # Connect to the database
            Log("Connect to the database service to obtain data, the database:", exName, "table:", tabName)
            myDBClient = pymongo.MongoClient("mongodb://localhost:27017")
            ex_DB = myDBClient[exName]
            exRecords = ex_DB[tabName]
            
            
            # Request data
            data = {
                "schema" : ["time", "open", "high", "low", "close", "vol"],
                "data" : []
            }
            
            # Construct query condition: greater than a certain value{'age': {'$gt': 20}} Less than a certain value{'age': {'$lt': 20}}
            dbQuery = {"$and":[{'Time': {'$gt': fromTS}}, {'Time': {'$lt': toTS}}]}
            Log("Query conditions:", dbQuery, "Number of inquiries:", exRecords.find(dbQuery).count(), "Total number of databases:", exRecords.find().count())
            
            for x in exRecords.find(dbQuery).sort("Time"):
                # Need to process data accuracy according to request parameters round and vround
                bar = [x["Time"], int(x["Open"] * priceRatio), int(x["High"] * priceRatio), int(x["Low"] * priceRatio), int(x["Close"] * priceRatio), int(x["Volume"] * amountRatio)]
                data["data"].append(bar)
            
            Log("data:", data, "Respond to backtest system requests.")
            # Write data reply
            self.wfile.write(json.dumps(data).encode())
        except BaseException as e:
            Log("Provider do_GET error, e:", e)


def createServer(host):
    try:
        server = HTTPServer(host, Provider)
        Log("Starting server, listen at: %s:%s" % host)
        server.serve_forever()
    except BaseException as e:
        Log("createServer error, e:", e)
        raise Exception("stop")

def main():
    LogReset(1)
    exName = exchange.GetName()
    period = exchange.GetPeriod()
    Log("collect", exName, "Exchange K-line data,", "K line cycle:", period, "second")
    
    # Connect to the database service, service address mongodb://127.0.0.1:27017 See the settings of mongodb installed on the server
    Log("Connect to the mongodb service of the hosting device, mongodb://localhost:27017")
    myDBClient = pymongo.MongoClient("mongodb://localhost:27017")   
    # Create a database
    ex_DB = myDBClient[exName]
    
    # Print the current database table
    collist = ex_DB.list_collection_names()
    Log("mongodb ", exName, " collist:", collist)
    
    # Check if the table is deleted
    arrDropNames = json.loads(dropNames)
    if isinstance(arrDropNames, list):
        for i in range(len(arrDropNames)):
            dropName = arrDropNames[i]
            if isinstance(dropName, str):
                if not dropName in collist:
                    continue
                tab = ex_DB[dropName]
                Log("dropName:", dropName, "delete:", dropName)
                ret = tab.drop()
                collist = ex_DB.list_collection_names()
                if dropName in collist:
                    Log(dropName, "failed to delete")
                else :
                    Log(dropName, "successfully deleted")
    
    # Start a thread to provide a custom data source service
    try:
        # _thread.start_new_thread(createServer, (("localhost", 9090), ))     # local computer test
        _thread.start_new_thread(createServer, (("0.0.0.0", 9090), ))         # Test on VPS server
        Log("Open the custom data source service thread", "#FF0000")
    except BaseException as e:
        Log("Failed to start the custom data source service!")
        Log("Error message:", e)
        raise Exception("stop")
    
    # Create the records table
    ex_DB_Records = ex_DB["%s_%d" % ("records", period)]
    Log("Start collecting", exName, "K-line data", "cycle:", period, "Open (create) the database table:", "%s_%d" % ("records", period), "#FF0000")
    preBarTime = 0
    index = 1
    while True:
        r = _C(exchange.GetRecords)
        if len(r) < 2:
            Sleep(1000)
            continue
        if preBarTime == 0:
            # Write all BAR data for the first time
            for i in range(len(r) - 1):
                bar = r[i]
                # Write line by line, you need to determine whether the data already exists in the current database table, based on timestamp detection, if there is the data, then skip, if not write in
                retQuery = ex_DB_Records.find({"Time": bar["Time"]})
                if retQuery.count() > 0:
                    continue
                
                # Write bar to the database table
                ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})                
                index += 1
            preBarTime = r[-1]["Time"]
        elif preBarTime != r[-1]["Time"]:
            bar = r[-2]
            # Check before writing data, whether the data already exists, based on time stamp detection
            retQuery = ex_DB_Records.find({"Time": bar["Time"]})
            if retQuery.count() > 0:
                continue
            
            ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})
            index += 1
            preBarTime = r[-1]["Time"]
        LogStatus(_D(), "preBarTime:", preBarTime, "_D(preBarTime):", _D(preBarTime/1000), "index:", index)
        # adding drawing display
        ext.PlotRecords(r, "%s_%d" % ("records", period))
        Sleep(10000)

Test de détection

Configurer le robot

img

Faites tourner le robot, faites tourner le collecteur de devis.

img

Ouvrez une stratégie de test pour le backtest. Par exemple:

function main() {
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords().length)
}

Configurez l'option de backtest, définissez l'échange sur Binance car la source de données personnalisée temporaire ne peut pas encore formuler un nom d'échange par elle-même, vous ne pouvez emprunter qu'une des configurations d'échange de la liste, le backtest montre que Binance, le réel Il est les données du marché de simulation de WexApp.

img

Comparez si le graphique généré par le système de backtest basé sur le collecteur de cotations de marché en tant que source de données personnalisée est le même que le graphique K-line d'une heure sur la page d'échange de wexApp.

img img

De cette façon, le robot sur le VPS peut collecter des données de ligne K par lui-même, et nous pouvons obtenir les données collectées à tout moment et backtest directement dans le système de backtest.

Vous pouvez continuer à étendre, par exemple, essayer les sources de données personnalisées de backtest au niveau réel, et la collecte de données multi-variété, multi-marché et d'autres fonctions.


Relationnée

Plus de