カスタムデータソースをバックテストする

作者: リン・ハーン優しさ, 作成日: 2020-06-06 08:53:02, 更新日: 2023-11-01 20:28:58

img

前の記事市場 コート 収集器の実装を教えます市場 コートを集める ロボットプログラムも導入しました

市場データを収集した後にどのように使用する? それはバックテストシステムで使用されます. FMZプラットフォームバックテストシステムのカスタムデータソース機能に依存して,私たちは直接収集されたデータをバックテストシステムのデータソースとして使用することができ,バックテストシステムが過去のデータをバックテストしたい市場で使用することができます.

バックテストシステムにデータを提供するためのカスタムデータソースとしても機能できます.

準備を

前の記事の準備作業とは違います. 前回は,私のローカルMACコンピュータで実行されているdockerプログラムで,データベースサービスを起動するためにmongodbデータベースをインストールしました. 今回は,オペレーティング環境をVPSに変更し,私たちのプログラムセットを実行するためにAlibaba Cloud Linuxサーバを使用しました.

  • モンゴドブデータベース

前の記事と同様に, market collector プログラムが実行されているデバイスに mongodb データベースをインストールしてサービスを起動する必要があります. これは基本的に MAC コンピュータに mongodb をインストールするのと同じです. インターネットには多くのチュートリアルがあります.それをグーグルで検索できます.非常に簡単です.

  • Python 3 をインストールする

このプログラムは python3 を使っています.いくつかの python ライブラリの使用に注意してください.インストールされていない場合は,まずインストールする必要があります.

パイモンゴ http urllib

  • ドッカー

FMZのドーカーが稼働すれば十分だ

市場コートコレクターを変えて

市場コート収集者はこうですhttps://www.fmz.com/strategy/199120(レコードコレクター) 戦略

修正をいくつかします.

プログラムがデータ収集のための while ループに入れる前に,マルチスレッドライブラリが使用され,同時実行が FMZ プラットフォームバックテストシステムのデータ要求を監視するサービスを開始します. (その他の詳細は無視できます)

RecordsCollector (カスタムデータソース機能を提供するアップグレード)

import _thread
import pymongo
import json
import math
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse

def url2Dict(url):
    query = urlparse(url).query  
    params = parse_qs(query)  
    result = {key: params[key][0] for key in params}  
    return result

class Provider(BaseHTTPRequestHandler):
    def do_GET(self):
        try:
            self.send_response(200)
            self.send_header("Content-type", "application/json")
            self.end_headers()

            dictParam = url2Dict(self.path)
            Log("The custom data source service receives the request, self.path:", self.path, "query parameter:", dictParam)
            
            # At present, the backtesting system can only select the exchange name from the list. When adding a custom data source, set it to Binance, that is: Binance
            exName = exchange.GetName()                                     
            # Note that period is the bottom K-line period
            tabName = "%s_%s" % ("records", int(int(dictParam["period"]) / 1000))  
            priceRatio = math.pow(10, int(dictParam["round"]))
            amountRatio = math.pow(10, int(dictParam["vround"]))
            fromTS = int(dictParam["from"]) * int(1000)
            toTS = int(dictParam["to"]) * int(1000)
            
            
            # Connect to the database
            Log("Connect to the database service to obtain data, the database:", exName, "table:", tabName)
            myDBClient = pymongo.MongoClient("mongodb://localhost:27017")
            ex_DB = myDBClient[exName]
            exRecords = ex_DB[tabName]
            
            
            # Request data
            data = {
                "schema" : ["time", "open", "high", "low", "close", "vol"],
                "data" : []
            }
            
            # Construct query condition: greater than a certain value{'age': {'$gt': 20}} Less than a certain value{'age': {'$lt': 20}}
            dbQuery = {"$and":[{'Time': {'$gt': fromTS}}, {'Time': {'$lt': toTS}}]}
            Log("Query conditions:", dbQuery, "Number of inquiries:", exRecords.find(dbQuery).count(), "Total number of databases:", exRecords.find().count())
            
            for x in exRecords.find(dbQuery).sort("Time"):
                # Need to process data accuracy according to request parameters round and vround
                bar = [x["Time"], int(x["Open"] * priceRatio), int(x["High"] * priceRatio), int(x["Low"] * priceRatio), int(x["Close"] * priceRatio), int(x["Volume"] * amountRatio)]
                data["data"].append(bar)
            
            Log("data:", data, "Respond to backtest system requests.")
            # Write data reply
            self.wfile.write(json.dumps(data).encode())
        except BaseException as e:
            Log("Provider do_GET error, e:", e)


def createServer(host):
    try:
        server = HTTPServer(host, Provider)
        Log("Starting server, listen at: %s:%s" % host)
        server.serve_forever()
    except BaseException as e:
        Log("createServer error, e:", e)
        raise Exception("stop")

def main():
    LogReset(1)
    exName = exchange.GetName()
    period = exchange.GetPeriod()
    Log("collect", exName, "Exchange K-line data,", "K line cycle:", period, "second")
    
    # Connect to the database service, service address mongodb://127.0.0.1:27017 See the settings of mongodb installed on the server
    Log("Connect to the mongodb service of the hosting device, mongodb://localhost:27017")
    myDBClient = pymongo.MongoClient("mongodb://localhost:27017")   
    # Create a database
    ex_DB = myDBClient[exName]
    
    # Print the current database table
    collist = ex_DB.list_collection_names()
    Log("mongodb ", exName, " collist:", collist)
    
    # Check if the table is deleted
    arrDropNames = json.loads(dropNames)
    if isinstance(arrDropNames, list):
        for i in range(len(arrDropNames)):
            dropName = arrDropNames[i]
            if isinstance(dropName, str):
                if not dropName in collist:
                    continue
                tab = ex_DB[dropName]
                Log("dropName:", dropName, "delete:", dropName)
                ret = tab.drop()
                collist = ex_DB.list_collection_names()
                if dropName in collist:
                    Log(dropName, "failed to delete")
                else :
                    Log(dropName, "successfully deleted")
    
    # Start a thread to provide a custom data source service
    try:
        # _thread.start_new_thread(createServer, (("localhost", 9090), ))     # local computer test
        _thread.start_new_thread(createServer, (("0.0.0.0", 9090), ))         # Test on VPS server
        Log("Open the custom data source service thread", "#FF0000")
    except BaseException as e:
        Log("Failed to start the custom data source service!")
        Log("Error message:", e)
        raise Exception("stop")
    
    # Create the records table
    ex_DB_Records = ex_DB["%s_%d" % ("records", period)]
    Log("Start collecting", exName, "K-line data", "cycle:", period, "Open (create) the database table:", "%s_%d" % ("records", period), "#FF0000")
    preBarTime = 0
    index = 1
    while True:
        r = _C(exchange.GetRecords)
        if len(r) < 2:
            Sleep(1000)
            continue
        if preBarTime == 0:
            # Write all BAR data for the first time
            for i in range(len(r) - 1):
                bar = r[i]
                # Write line by line, you need to determine whether the data already exists in the current database table, based on timestamp detection, if there is the data, then skip, if not write in
                retQuery = ex_DB_Records.find({"Time": bar["Time"]})
                if retQuery.count() > 0:
                    continue
                
                # Write bar to the database table
                ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})                
                index += 1
            preBarTime = r[-1]["Time"]
        elif preBarTime != r[-1]["Time"]:
            bar = r[-2]
            # Check before writing data, whether the data already exists, based on time stamp detection
            retQuery = ex_DB_Records.find({"Time": bar["Time"]})
            if retQuery.count() > 0:
                continue
            
            ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})
            index += 1
            preBarTime = r[-1]["Time"]
        LogStatus(_D(), "preBarTime:", preBarTime, "_D(preBarTime):", _D(preBarTime/1000), "index:", index)
        # adding drawing display
        ext.PlotRecords(r, "%s_%d" % ("records", period))
        Sleep(10000)

テスト

ロボットを設定する

img

ロボットや市場価格収集機を 動かして

img

バックテストのテスト戦略を開きます.例えば:

function main() {
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords().length)
}

バックテストオプションを設定し,一時的なカスタムデータソースがまだ自力で交換名を構成できないため,バックテストはBinance,実際のWexAppのシミュレーション市場のデータを示します.

img

標準データソースとして市場報名集計器に基づいてバックテストシステムで生成されたチャートが wexApp 交換ページの1時間K線チャートと同じかどうかを比較する.

img img

この方法でVPS上のロボットは K線データを自力で収集し 収集されたデータをいつでも入手し バックテストシステムで直接バックテストすることができます

リアルレベルのバックテストのカスタムデータソースや 多種多種市場データ収集なども試すことができます


関連性

もっと