Ich zeige Ihnen, wie Sie den Market Collector aktualisieren und die benutzerdefinierte Datenquelle testen.

Schriftsteller:Gutes, Erstellt: 2020-06-06 08:53:02, aktualisiert: 2023-11-01 20:28:58

img

Vorheriger ArtikelIch zeige Ihnen, wie man einen Marktzins-Sammler implementiert.Wir haben ein Roboterprogramm implementiert, das Marktkurse zusammen sammelt.

Wie können wir Marktdaten verwenden, nachdem wir sie gesammelt haben? es wird für das Backtest-System verwendet werden. Auf der Basis der benutzerdefinierten Datenquelle Funktion des FMZ-Plattform-Backtest-Systems können wir die gesammelten Daten direkt als Datenquelle des Backtest-Systems verwenden, so dass wir das Backtest-System in jedem Markt verwenden können, in dem wir historische Daten zurückprüfen möchten.

Daher können wir dem Market Quote Collector ein Upgrade geben! lassen Sie den Market Collector auch als benutzerdefinierte Datenquelle dienen, um Daten an das Backtest-System zu liefern.

Mach dich bereit.

Es ist anders als die Vorbereitungsarbeit im letzten Artikel. Das letzte Mal war ein Docker-Programm auf meinem lokalen MAC-Computer ausgeführt, die Installation der mongodb-Datenbank, um den Datenbankservice zu starten. Diesmal haben wir die Betriebsumgebung auf VPS geändert und den Alibaba Cloud Linux-Server verwendet, um unsere Programme auszuführen.

  • Mongodb-Datenbank

Wie im vorherigen Artikel müssen wir die mongodb Datenbank auf dem Gerät installieren, auf dem das Market Collector Programm ausgeführt wird und den Service starten. Es ist im Grunde das gleiche wie die Installation von mongodb auf einem MAC-Computer. Es gibt viele Tutorials im Internet, Sie können es googeln, es ist sehr einfach.

  • Installieren Sie Python 3

Das Programm verwendet python3, achten Sie auf die Verwendung einiger Python-Bibliotheken, wenn sie nicht installiert sind, müssen Sie sie zuerst installieren.

Pymongo http Urllib

  • Docker

Ein laufender FMZ-Docker genügt.

Verwandeln Sie den Marktzitaten-Sammler

Der Sammler der Marktkurse ist dieser:https://www.fmz.com/strategy/199120(RecordsCollector) Strategie.

Lassen Sie uns einige Änderungen vornehmen:

Bevor das Programm die while-Schleife für die Datenerhebung betritt, wird eine mehrthreaded Bibliothek verwendet, und die gleichzeitige Ausführung startet einen Dienst zur Überwachung der Datenanfrage des FMZ-Plattform-Backtestsystems. (Weitere Details können ignoriert werden)

RecordsCollector (Upgrade zur Bereitstellung einer benutzerdefinierten Datenquelle)

import _thread
import pymongo
import json
import math
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse

def url2Dict(url):
    query = urlparse(url).query  
    params = parse_qs(query)  
    result = {key: params[key][0] for key in params}  
    return result

class Provider(BaseHTTPRequestHandler):
    def do_GET(self):
        try:
            self.send_response(200)
            self.send_header("Content-type", "application/json")
            self.end_headers()

            dictParam = url2Dict(self.path)
            Log("The custom data source service receives the request, self.path:", self.path, "query parameter:", dictParam)
            
            # At present, the backtesting system can only select the exchange name from the list. When adding a custom data source, set it to Binance, that is: Binance
            exName = exchange.GetName()                                     
            # Note that period is the bottom K-line period
            tabName = "%s_%s" % ("records", int(int(dictParam["period"]) / 1000))  
            priceRatio = math.pow(10, int(dictParam["round"]))
            amountRatio = math.pow(10, int(dictParam["vround"]))
            fromTS = int(dictParam["from"]) * int(1000)
            toTS = int(dictParam["to"]) * int(1000)
            
            
            # Connect to the database
            Log("Connect to the database service to obtain data, the database:", exName, "table:", tabName)
            myDBClient = pymongo.MongoClient("mongodb://localhost:27017")
            ex_DB = myDBClient[exName]
            exRecords = ex_DB[tabName]
            
            
            # Request data
            data = {
                "schema" : ["time", "open", "high", "low", "close", "vol"],
                "data" : []
            }
            
            # Construct query condition: greater than a certain value{'age': {'$gt': 20}} Less than a certain value{'age': {'$lt': 20}}
            dbQuery = {"$and":[{'Time': {'$gt': fromTS}}, {'Time': {'$lt': toTS}}]}
            Log("Query conditions:", dbQuery, "Number of inquiries:", exRecords.find(dbQuery).count(), "Total number of databases:", exRecords.find().count())
            
            for x in exRecords.find(dbQuery).sort("Time"):
                # Need to process data accuracy according to request parameters round and vround
                bar = [x["Time"], int(x["Open"] * priceRatio), int(x["High"] * priceRatio), int(x["Low"] * priceRatio), int(x["Close"] * priceRatio), int(x["Volume"] * amountRatio)]
                data["data"].append(bar)
            
            Log("data:", data, "Respond to backtest system requests.")
            # Write data reply
            self.wfile.write(json.dumps(data).encode())
        except BaseException as e:
            Log("Provider do_GET error, e:", e)


def createServer(host):
    try:
        server = HTTPServer(host, Provider)
        Log("Starting server, listen at: %s:%s" % host)
        server.serve_forever()
    except BaseException as e:
        Log("createServer error, e:", e)
        raise Exception("stop")

def main():
    LogReset(1)
    exName = exchange.GetName()
    period = exchange.GetPeriod()
    Log("collect", exName, "Exchange K-line data,", "K line cycle:", period, "second")
    
    # Connect to the database service, service address mongodb://127.0.0.1:27017 See the settings of mongodb installed on the server
    Log("Connect to the mongodb service of the hosting device, mongodb://localhost:27017")
    myDBClient = pymongo.MongoClient("mongodb://localhost:27017")   
    # Create a database
    ex_DB = myDBClient[exName]
    
    # Print the current database table
    collist = ex_DB.list_collection_names()
    Log("mongodb ", exName, " collist:", collist)
    
    # Check if the table is deleted
    arrDropNames = json.loads(dropNames)
    if isinstance(arrDropNames, list):
        for i in range(len(arrDropNames)):
            dropName = arrDropNames[i]
            if isinstance(dropName, str):
                if not dropName in collist:
                    continue
                tab = ex_DB[dropName]
                Log("dropName:", dropName, "delete:", dropName)
                ret = tab.drop()
                collist = ex_DB.list_collection_names()
                if dropName in collist:
                    Log(dropName, "failed to delete")
                else :
                    Log(dropName, "successfully deleted")
    
    # Start a thread to provide a custom data source service
    try:
        # _thread.start_new_thread(createServer, (("localhost", 9090), ))     # local computer test
        _thread.start_new_thread(createServer, (("0.0.0.0", 9090), ))         # Test on VPS server
        Log("Open the custom data source service thread", "#FF0000")
    except BaseException as e:
        Log("Failed to start the custom data source service!")
        Log("Error message:", e)
        raise Exception("stop")
    
    # Create the records table
    ex_DB_Records = ex_DB["%s_%d" % ("records", period)]
    Log("Start collecting", exName, "K-line data", "cycle:", period, "Open (create) the database table:", "%s_%d" % ("records", period), "#FF0000")
    preBarTime = 0
    index = 1
    while True:
        r = _C(exchange.GetRecords)
        if len(r) < 2:
            Sleep(1000)
            continue
        if preBarTime == 0:
            # Write all BAR data for the first time
            for i in range(len(r) - 1):
                bar = r[i]
                # Write line by line, you need to determine whether the data already exists in the current database table, based on timestamp detection, if there is the data, then skip, if not write in
                retQuery = ex_DB_Records.find({"Time": bar["Time"]})
                if retQuery.count() > 0:
                    continue
                
                # Write bar to the database table
                ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})                
                index += 1
            preBarTime = r[-1]["Time"]
        elif preBarTime != r[-1]["Time"]:
            bar = r[-2]
            # Check before writing data, whether the data already exists, based on time stamp detection
            retQuery = ex_DB_Records.find({"Time": bar["Time"]})
            if retQuery.count() > 0:
                continue
            
            ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})
            index += 1
            preBarTime = r[-1]["Time"]
        LogStatus(_D(), "preBarTime:", preBarTime, "_D(preBarTime):", _D(preBarTime/1000), "index:", index)
        # adding drawing display
        ext.PlotRecords(r, "%s_%d" % ("records", period))
        Sleep(10000)

Prüfung

Konfiguration des Roboters

img

Runnen Sie den Roboter, laufen Sie den Markt-Zitat-Sammler.

img

Öffnen Sie eine Teststrategie für Backtest.

function main() {
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords().length)
}

Konfigurieren Sie die Option Backtest, setzen Sie die Börse auf Binance, weil die temporäre benutzerdefinierte Datenquelle noch keinen Austauschnamen selbst formulieren kann, Sie können nur eine der Austauschkonfigurationen in der Liste ausleihen, der Backtest zeigt, dass Binance, die tatsächliche Es sind die Daten des Simulationsmarktes von WexApp.

img

Vergleichen Sie, ob das vom Backtest-System generierte Diagramm, das auf dem Marktnoten-Sammler als benutzerdefinierte Datenquelle basiert, mit dem 1-Stunden-K-Liniendiagramm auf der WexApp-Börsenseite identisch ist.

img img

Auf diese Weise kann der Roboter auf dem VPS K-Liniendaten selbst sammeln, und wir können die gesammelten Daten jederzeit erhalten und direkt im Backtest-System backtesten.

Sie können weiterhin erweitern, zum Beispiel versuchen, die real-level-Backtest benutzerdefinierte Datenquellen, und Multi-Variety, Multi-Markt Datenerhebung und andere Funktionen.


Verwandt

Mehr