Научить вас обновлять рынок коллектор backtest пользовательский источник данных

Автор:Доброта, Создано: 2020-06-06 08:53:02, Обновлено: 2023-11-01 20:28:58

img

Предыдущая статьяНаучить вас внедрять рыночные котировки коллекторМы внедрили робот-программу, которая собирает рыночные котировки вместе.

Как использовать рыночные данные после того, как мы их собрали? он будет использовать для системы бэкстеста. Опираясь на пользовательскую функцию источника данных системы бэкстеста платформы FMZ, мы можем напрямую использовать собранные данные в качестве источника данных системы бэкстеста, чтобы мы могли позволить системе бэкстеста использовать на любом рынке, где мы хотим бэкстестировать исторические данные.

Поэтому мы можем дать Market Quote Collector обновление! пусть рыночный коллектор также может служить в качестве пользовательского источника данных для предоставления данных в системе backtest.

Приготовьтесь.

Это отличается от подготовки работы в предыдущей статье. Последний раз была программа докера, работающая на моем локальном компьютере MAC, устанавливая базу данных mongodb для запуска службы базы данных. На этот раз мы изменили операционную среду на VPS и использовали сервер Alibaba Cloud Linux для запуска нашего набора программ.

  • База данных Mongodb

Как и в предыдущей статье, нам нужно установить базу данных mongodb на устройстве, на котором работает программа market collector, и запустить услугу. Это в основном то же самое, что установить mongodb на компьютере MAC. В Интернете есть много учебников, вы можете поискать их в Google, это очень просто.

  • Установка Python 3

Программа использует python3, обратите внимание на использование некоторых библиотек python, если они не установлены, вам нужно сначала установить их.

пимонго http Urllib

  • Докер

Достаточно запуска FMZ докера.

Преобразуйте Сборщик рыночных котировок

Сборщик рыночных котировок:https://www.fmz.com/strategy/199120(Рекордсколлектор) стратегия.

Давайте сделаем некоторые изменения:

Перед тем, как программа входит в петлю while для сбора данных, используется многопоточная библиотека, и одновременное исполнение запускает службу для мониторинга запроса данных системы бэкстеста платформы FMZ. (Другие детали могут быть проигнорированы)

RecordsCollector (обновление для предоставления пользовательской функции источника данных)

import _thread
import pymongo
import json
import math
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse

def url2Dict(url):
    query = urlparse(url).query  
    params = parse_qs(query)  
    result = {key: params[key][0] for key in params}  
    return result

class Provider(BaseHTTPRequestHandler):
    def do_GET(self):
        try:
            self.send_response(200)
            self.send_header("Content-type", "application/json")
            self.end_headers()

            dictParam = url2Dict(self.path)
            Log("The custom data source service receives the request, self.path:", self.path, "query parameter:", dictParam)
            
            # At present, the backtesting system can only select the exchange name from the list. When adding a custom data source, set it to Binance, that is: Binance
            exName = exchange.GetName()                                     
            # Note that period is the bottom K-line period
            tabName = "%s_%s" % ("records", int(int(dictParam["period"]) / 1000))  
            priceRatio = math.pow(10, int(dictParam["round"]))
            amountRatio = math.pow(10, int(dictParam["vround"]))
            fromTS = int(dictParam["from"]) * int(1000)
            toTS = int(dictParam["to"]) * int(1000)
            
            
            # Connect to the database
            Log("Connect to the database service to obtain data, the database:", exName, "table:", tabName)
            myDBClient = pymongo.MongoClient("mongodb://localhost:27017")
            ex_DB = myDBClient[exName]
            exRecords = ex_DB[tabName]
            
            
            # Request data
            data = {
                "schema" : ["time", "open", "high", "low", "close", "vol"],
                "data" : []
            }
            
            # Construct query condition: greater than a certain value{'age': {'$gt': 20}} Less than a certain value{'age': {'$lt': 20}}
            dbQuery = {"$and":[{'Time': {'$gt': fromTS}}, {'Time': {'$lt': toTS}}]}
            Log("Query conditions:", dbQuery, "Number of inquiries:", exRecords.find(dbQuery).count(), "Total number of databases:", exRecords.find().count())
            
            for x in exRecords.find(dbQuery).sort("Time"):
                # Need to process data accuracy according to request parameters round and vround
                bar = [x["Time"], int(x["Open"] * priceRatio), int(x["High"] * priceRatio), int(x["Low"] * priceRatio), int(x["Close"] * priceRatio), int(x["Volume"] * amountRatio)]
                data["data"].append(bar)
            
            Log("data:", data, "Respond to backtest system requests.")
            # Write data reply
            self.wfile.write(json.dumps(data).encode())
        except BaseException as e:
            Log("Provider do_GET error, e:", e)


def createServer(host):
    try:
        server = HTTPServer(host, Provider)
        Log("Starting server, listen at: %s:%s" % host)
        server.serve_forever()
    except BaseException as e:
        Log("createServer error, e:", e)
        raise Exception("stop")

def main():
    LogReset(1)
    exName = exchange.GetName()
    period = exchange.GetPeriod()
    Log("collect", exName, "Exchange K-line data,", "K line cycle:", period, "second")
    
    # Connect to the database service, service address mongodb://127.0.0.1:27017 See the settings of mongodb installed on the server
    Log("Connect to the mongodb service of the hosting device, mongodb://localhost:27017")
    myDBClient = pymongo.MongoClient("mongodb://localhost:27017")   
    # Create a database
    ex_DB = myDBClient[exName]
    
    # Print the current database table
    collist = ex_DB.list_collection_names()
    Log("mongodb ", exName, " collist:", collist)
    
    # Check if the table is deleted
    arrDropNames = json.loads(dropNames)
    if isinstance(arrDropNames, list):
        for i in range(len(arrDropNames)):
            dropName = arrDropNames[i]
            if isinstance(dropName, str):
                if not dropName in collist:
                    continue
                tab = ex_DB[dropName]
                Log("dropName:", dropName, "delete:", dropName)
                ret = tab.drop()
                collist = ex_DB.list_collection_names()
                if dropName in collist:
                    Log(dropName, "failed to delete")
                else :
                    Log(dropName, "successfully deleted")
    
    # Start a thread to provide a custom data source service
    try:
        # _thread.start_new_thread(createServer, (("localhost", 9090), ))     # local computer test
        _thread.start_new_thread(createServer, (("0.0.0.0", 9090), ))         # Test on VPS server
        Log("Open the custom data source service thread", "#FF0000")
    except BaseException as e:
        Log("Failed to start the custom data source service!")
        Log("Error message:", e)
        raise Exception("stop")
    
    # Create the records table
    ex_DB_Records = ex_DB["%s_%d" % ("records", period)]
    Log("Start collecting", exName, "K-line data", "cycle:", period, "Open (create) the database table:", "%s_%d" % ("records", period), "#FF0000")
    preBarTime = 0
    index = 1
    while True:
        r = _C(exchange.GetRecords)
        if len(r) < 2:
            Sleep(1000)
            continue
        if preBarTime == 0:
            # Write all BAR data for the first time
            for i in range(len(r) - 1):
                bar = r[i]
                # Write line by line, you need to determine whether the data already exists in the current database table, based on timestamp detection, if there is the data, then skip, if not write in
                retQuery = ex_DB_Records.find({"Time": bar["Time"]})
                if retQuery.count() > 0:
                    continue
                
                # Write bar to the database table
                ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})                
                index += 1
            preBarTime = r[-1]["Time"]
        elif preBarTime != r[-1]["Time"]:
            bar = r[-2]
            # Check before writing data, whether the data already exists, based on time stamp detection
            retQuery = ex_DB_Records.find({"Time": bar["Time"]})
            if retQuery.count() > 0:
                continue
            
            ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})
            index += 1
            preBarTime = r[-1]["Time"]
        LogStatus(_D(), "preBarTime:", preBarTime, "_D(preBarTime):", _D(preBarTime/1000), "index:", index)
        # adding drawing display
        ext.PlotRecords(r, "%s_%d" % ("records", period))
        Sleep(10000)

Испытание

Конфигурировать робота

img

Запустите робота, запустите коллектор котировок.

img

Откройте стратегию тестирования для обратного тестирования. Например:

function main() {
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords().length)
}

Конфигурируйте опцию обратного теста, установите биржу на Binance, потому что временный источник пользовательских данных еще не может сформулировать имя биржи самостоятельно, вы можете взять только одну из конфигураций биржи в списке, обратный тест показывает, что Binance, фактический Это данные симуляционного рынка WexApp.

img

Сравните, совпадает ли диаграмма, созданная системой обратного теста, основанной на коллекторе котировок рынка в качестве пользовательского источника данных, с графиком 1-часовой K-линии на странице обмена wexApp.

img img

Таким образом, робот на VPS может собирать данные K-линии самостоятельно, и мы можем получить собранные данные в любое время и backtest прямо в системе backtest.

Вы можете продолжать расширять, например, попробовать реальный уровень backtest пользовательские источники данных, и многообразный, многорыночный сбор данных и другие функции.


Связанные

Больше