4
Подписаться
1271
Подписчики

Научите вас шаг за шагом обновлять функцию пользовательского источника данных бэктестинга рыночного коллектора.

Создано: 2020-05-07 17:43:54, Обновлено: 2023-10-09 22:47:43
comments   15
hits   2523

Научите вас шаг за шагом обновлять функцию пользовательского источника данных бэктестинга рыночного коллектора.

Научите вас шаг за шагом обновлять функцию пользовательского источника данных бэктестинга рыночного коллектора.

Предыдущая статьяНаучить вас шаг за шагом внедрять рыночный коллекторВместе мы внедрили программу-робот для сбора рыночных данных. Как мы используем собранные рыночные данные? Конечно, он используется для системы бэктестинга. Здесь, опираясь на функцию пользовательского источника данных системы бэктестинга количественной торговой платформы изобретателя, мы можем напрямую использовать собранные данные в качестве источника данных системы бэктестинга, так что мы можно создать систему бэктестинга. Систему тестирования можно применить к любому рынку, где мы хотим провести бэктестинг исторических данных.

Поэтому мы можем усовершенствовать «Сборщик рынка»! Рыночный коллектор также можно использовать в качестве пользовательского источника данных для предоставления данных системе бэктестинга.

Если у вас есть потребность, действуйте!

Подготовить

Подготовка отличается от предыдущей статьи. В прошлый раз я запустил хост-программу на локальном компьютере MAC, установил базу данных mongodb и запустил службу базы данных. На этот раз мы изменили операционную среду на VPS и использовали сервер Alibaba Cloud Linux для запуска нашей программы.

  • база данных mongodb

Как и в предыдущей статье, вам необходимо установить базу данных MongoDB на устройство, где запущена программа-сборщик рынка, и запустить службу. По сути, это то же самое, что и установка mongodb на компьютер MAC. В Интернете есть много руководств. Можете поискать их. Это очень просто.

  • Установить Питон 3 Программа использует Python 3. Обратите внимание, что используются некоторые библиотеки, которые необходимо установить, если они недоступны.

    • pymongo
    • http
    • urllib
  • Хозяин Просто запустите кастодиана количественной торговой платформы Inventor.

Измените «Сборщик рыночной информации»

Коллекционер рынкаRecordsCollecter (Учебник)Эта стратегия. Давайте внесем в него некоторые изменения: Прежде чем программа войдет в цикл while для сбора данных, многопоточная библиотека используется для одновременного запуска службы, которая прослушивает запросы данных от системы бэктестинга количественной торговой платформы Inventor. (Некоторые другие детали можно проигнорировать)

RecordsCollector (обновлен для предоставления функции пользовательского источника данных)

import _thread
import pymongo
import json
import math
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse

def url2Dict(url):
    query = urlparse(url).query  
    params = parse_qs(query)  
    result = {key: params[key][0] for key in params}  
    return result

class Provider(BaseHTTPRequestHandler):
    def do_GET(self):
        try:
            self.send_response(200)
            self.send_header("Content-type", "application/json")
            self.end_headers()

            dictParam = url2Dict(self.path)
            Log("自定义数据源服务接收到请求,self.path:", self.path, "query 参数:", dictParam)
            
            # 目前回测系统只能从列表中选择交易所名称,在添加自定义数据源时,设置为币安,即:Binance
            exName = exchange.GetName()                                     
            # 注意,period为底层K线周期
            tabName = "%s_%s" % ("records", int(int(dictParam["period"]) / 1000))  
            priceRatio = math.pow(10, int(dictParam["round"]))
            amountRatio = math.pow(10, int(dictParam["vround"]))
            fromTS = int(dictParam["from"]) * int(1000)
            toTS = int(dictParam["to"]) * int(1000)
            
            
            # 连接数据库
            Log("连接数据库服务,获取数据,数据库:", exName, "表:", tabName)
            myDBClient = pymongo.MongoClient("mongodb://localhost:27017")
            ex_DB = myDBClient[exName]
            exRecords = ex_DB[tabName]
            
            
            # 要求应答的数据
            data = {
                "schema" : ["time", "open", "high", "low", "close", "vol"],
                "data" : []
            }
            
            # 构造查询条件:大于某个值{'age': {'$gt': 20}} 小于某个值{'age': {'$lt': 20}}
            dbQuery = {"$and":[{'Time': {'$gt': fromTS}}, {'Time': {'$lt': toTS}}]}
            Log("查询条件:", dbQuery, "查询条数:", exRecords.find(dbQuery).count(), "数据库总条数:", exRecords.find().count())
            
            for x in exRecords.find(dbQuery).sort("Time"):
                # 需要根据请求参数round和vround,处理数据精度
                bar = [x["Time"], int(x["Open"] * priceRatio), int(x["High"] * priceRatio), int(x["Low"] * priceRatio), int(x["Close"] * priceRatio), int(x["Volume"] * amountRatio)]
                data["data"].append(bar)
            
            Log("数据:", data, "响应回测系统请求。")
            # 写入数据应答
            self.wfile.write(json.dumps(data).encode())
        except BaseException as e:
            Log("Provider do_GET error, e:", e)


def createServer(host):
    try:
        server = HTTPServer(host, Provider)
        Log("Starting server, listen at: %s:%s" % host)
        server.serve_forever()
    except BaseException as e:
        Log("createServer error, e:", e)
        raise Exception("stop")

def main():
    LogReset(1)
    exName = exchange.GetName()
    period = exchange.GetPeriod()
    Log("收集", exName, "交易所的K线数据,", "K线周期:", period, "秒")
    
    # 连接数据库服务,服务地址 mongodb://127.0.0.1:27017 具体看服务器上安装的mongodb设置
    Log("连接托管者所在设备mongodb服务,mongodb://localhost:27017")
    myDBClient = pymongo.MongoClient("mongodb://localhost:27017")   
    # 创建数据库
    ex_DB = myDBClient[exName]
    
    # 打印目前数据库表
    collist = ex_DB.list_collection_names()
    Log("mongodb ", exName, " collist:", collist)
    
    # 检测是否删除表
    arrDropNames = json.loads(dropNames)
    if isinstance(arrDropNames, list):
        for i in range(len(arrDropNames)):
            dropName = arrDropNames[i]
            if isinstance(dropName, str):
                if not dropName in collist:
                    continue
                tab = ex_DB[dropName]
                Log("dropName:", dropName, "删除:", dropName)
                ret = tab.drop()
                collist = ex_DB.list_collection_names()
                if dropName in collist:
                    Log(dropName, "删除失败")
                else :
                    Log(dropName, "删除成功")
    
    # 开启一个线程,提供自定义数据源服务
    try:
        # _thread.start_new_thread(createServer, (("localhost", 9090), ))     # 本机测试
        _thread.start_new_thread(createServer, (("0.0.0.0", 9090), ))         # VPS服务器上测试
        Log("开启自定义数据源服务线程", "#FF0000")
    except BaseException as e:
        Log("启动自定义数据源服务失败!")
        Log("错误信息:", e)
        raise Exception("stop")
    
    # 创建records表
    ex_DB_Records = ex_DB["%s_%d" % ("records", period)]
    Log("开始收集", exName, "K线数据", "周期:", period, "打开(创建)数据库表:", "%s_%d" % ("records", period), "#FF0000")
    preBarTime = 0
    index = 1
    while True:
        r = _C(exchange.GetRecords)
        if len(r) < 2:
            Sleep(1000)
            continue
        if preBarTime == 0:
            # 首次写入所有BAR数据
            for i in range(len(r) - 1):
                bar = r[i]
                # 逐根写入,需要判断当前数据库表中是否已经有该条数据,基于时间戳检测,如果有该条数据,则跳过,没有则写入
                retQuery = ex_DB_Records.find({"Time": bar["Time"]})
                if retQuery.count() > 0:
                    continue
                
                # 写入bar到数据库表
                ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})                
                index += 1
            preBarTime = r[-1]["Time"]
        elif preBarTime != r[-1]["Time"]:
            bar = r[-2]
            # 写入数据前检测,数据是否已经存在,基于时间戳检测
            retQuery = ex_DB_Records.find({"Time": bar["Time"]})
            if retQuery.count() > 0:
                continue
            
            ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})
            index += 1
            preBarTime = r[-1]["Time"]
        LogStatus(_D(), "preBarTime:", preBarTime, "_D(preBarTime):", _D(preBarTime/1000), "index:", index)
        # 增加画图展示
        ext.PlotRecords(r, "%s_%d" % ("records", period))
        Sleep(10000)
        

тест

Настройте робота Научите вас шаг за шагом обновлять функцию пользовательского источника данных бэктестинга рыночного коллектора.

Запустите робота и запустите сборщик рыночных данных. Научите вас шаг за шагом обновлять функцию пользовательского источника данных бэктестинга рыночного коллектора.

Откройте тестовую стратегию и выполните бэктест, например, эту стратегию бэктеста, и протестируйте ее.

function main() {
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords().length)
}

Настройте параметры бэктестинга и установите биржу Binance, поскольку пользовательский источник данных не может задать имя биржи самостоятельно в настоящее время. Вы можете использовать только биржу в списке для настройки. При бэктестинге отображается Binance. Это данные диска моделирования wexApp.

Научите вас шаг за шагом обновлять функцию пользовательского источника данных бэктестинга рыночного коллектора.

Сравните график, созданный системой бэктестинга на основе рыночного коллектора в качестве пользовательского источника данных, и часовой график свечей на странице биржи wexApp, чтобы убедиться, что они одинаковы.

Научите вас шаг за шагом обновлять функцию пользовательского источника данных бэктестинга рыночного коллектора.

Научите вас шаг за шагом обновлять функцию пользовательского источника данных бэктестинга рыночного коллектора.

Таким образом, робот на VPS может самостоятельно собирать данные K-line, а мы можем получить собранные данные в любое время и провести бэктестинг непосредственно в системе бэктестинга. Это всего лишь капля в море. Эксперты могут продолжать расширять это, например, поддерживая пользовательские источники данных для бэктестинга в реальном времени, поддерживая сбор многовариантных и многорыночных данных и другие функции.

Вы можете оставить сообщение.