Hands-on: upgrading the Market Collector so it can serve the backtesting system as a custom data source

Author: The Little Dream, Created: 2020-05-07 17:43:54, Updated: 2023-10-09 22:47:43


In the previous article, "Hands-on teaches you how to implement a data collector", we implemented a robot program that collects market data. What is the collected data for? For the backtesting system, of course. Relying on the custom data source feature of the backtesting system on the FMZ Quant trading platform (the "inventor's quantitative trading platform"), we can use the collected data directly as the data source of a backtest. That lets us apply the backtesting system to any market whose historical data we want to backtest.

So let's upgrade the Market Collector so that it can also serve data to the backtesting system as a custom data source.

Where there is a need, let's get to work!

Get ready

The preparation differs from the previous article. Last time I installed the MongoDB database and started the database service; this time we move the runtime environment to a VPS and run the whole program on an Alibaba Cloud Linux server.

  • The MongoDB database

    As in the previous article, MongoDB must be installed on the device that runs the Market Collector program, and the database service must be started. Installation tutorials are easy to find online.

  • Python 3

    The program is written in Python 3. It uses the libraries below; install any that are missing. Note that http and urllib are part of the Python 3 standard library, so normally only pymongo needs to be installed.

    • pymongo
    • http
    • urllib

  • The docker (custodian program)

    A docker of the FMZ Quant trading platform must be running.
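
A quick way to confirm the Python prerequisites above is to ask the interpreter which of the listed modules it can locate. The following is a minimal sketch; the helper name missing_modules is hypothetical and not part of the original program.

```python
import importlib.util


def missing_modules(names):
    """Return the names from `names` that the current interpreter cannot find."""
    missing = []
    for name in names:
        try:
            found = importlib.util.find_spec(name) is not None
        except ModuleNotFoundError:
            # Raised for dotted names whose parent package is absent.
            found = False
        if not found:
            missing.append(name)
    return missing


if __name__ == "__main__":
    # http and urllib ship with Python 3; pymongo is a third-party package.
    print("missing:", missing_modules(["pymongo", "http.server", "urllib.parse"]))
```

If pymongo shows up in the list, install it with the package manager of your Python 3 environment before starting the robot.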

Upgrading the Market Collector

The Market Collector is the strategy "RecordsCollector (teaching)". Let's make some changes to it: before the program enters the while loop that collects data, it uses a multi-threading library to start a concurrent service that listens for data requests from the backtesting system of the FMZ Quant trading platform. (Other details can be ignored.)
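
The idea of serving HTTP requests from a background thread while the main thread keeps working can be sketched on its own, outside the platform. This is only an illustration under assumptions: it uses the standard threading module rather than _thread, the handler EchoHandler and the helpers start_background_server and fetch are hypothetical names, and port 0 (any free port) is chosen for the demo.

```python
import http.client
import json
import threading
from http.server import HTTPServer, BaseHTTPRequestHandler


class EchoHandler(BaseHTTPRequestHandler):
    """Hypothetical stand-in handler: answers every GET with a small JSON body."""

    def do_GET(self):
        body = json.dumps({"path": self.path}).encode()
        self.send_response(200)
        self.send_header("Content-type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, fmt, *args):
        pass  # silence the default per-request logging to stderr


def start_background_server(host="127.0.0.1", port=0):
    """Start the server in a daemon thread and return it.

    port=0 lets the OS pick a free port (an assumption for this demo).
    """
    server = HTTPServer((host, port), EchoHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server


def fetch(server, path):
    """Send one GET request to the running server and decode the JSON reply."""
    conn = http.client.HTTPConnection("127.0.0.1", server.server_address[1], timeout=5)
    try:
        conn.request("GET", path)
        return json.loads(conn.getresponse().read().decode())
    finally:
        conn.close()


if __name__ == "__main__":
    srv = start_background_server()
    # The main thread stays free for other work, e.g. a data collection loop.
    print(fetch(srv, "/data?period=3600000"))
    srv.shutdown()
    srv.server_close()
```

Because the server thread is a daemon thread, it does not keep the process alive once the main loop ends.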

RecordsCollector (upgraded to provide custom data source functionality)

import _thread
import pymongo
import json
import math
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse

def url2Dict(url):
    query = urlparse(url).query  
    params = parse_qs(query)  
    result = {key: params[key][0] for key in params}  
    return result

class Provider(BaseHTTPRequestHandler):
    def do_GET(self):
        try:
            self.send_response(200)
            self.send_header("Content-type", "application/json")
            self.end_headers()

            dictParam = url2Dict(self.path)
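            Log("Custom data source service received a request, self.path:", self.path, "query parameters:", dictParam)
            
            # At present the backtesting system can only select an exchange name from its list; when adding the custom data source, set it to Binance
            exName = exchange.GetName()                                     
            # Note: period is the underlying K-line period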
            tabName = "%s_%s" % ("records", int(int(dictParam["period"]) / 1000))  
            priceRatio = math.pow(10, int(dictParam["round"]))
            amountRatio = math.pow(10, int(dictParam["vround"]))
            fromTS = int(dictParam["from"]) * int(1000)
            toTS = int(dictParam["to"]) * int(1000)
            
            
            # Connect to the database
            Log("Connecting to the database service to fetch data, database:", exName, "table:", tabName)
            myDBClient = pymongo.MongoClient("mongodb://localhost:27017")
            ex_DB = myDBClient[exName]
            exRecords = ex_DB[tabName]
            
            
            # Data to send in the response
            data = {
                "schema" : ["time", "open", "high", "low", "close", "vol"],
                "data" : []
            }
            
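            # Build the query condition: greater than a value {'age': {'$gt': 20}}, less than a value {'age': {'$lt': 20}}
            dbQuery = {"$and":[{'Time': {'$gt': fromTS}}, {'Time': {'$lt': toTS}}]}
            # count_documents replaces Cursor.count(), which was removed in PyMongo 4
            Log("Query condition:", dbQuery, "matching records:", exRecords.count_documents(dbQuery), "total records in the table:", exRecords.count_documents({}))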
            
            for x in exRecords.find(dbQuery).sort("Time"):
                # Adjust the data precision according to the request parameters round and vround
                bar = [x["Time"], int(x["Open"] * priceRatio), int(x["High"] * priceRatio), int(x["Low"] * priceRatio), int(x["Close"] * priceRatio), int(x["Volume"] * amountRatio)]
                data["data"].append(bar)
            
            Log("Data:", data, "Responding to the backtesting system request.")
            # Write the response data
            self.wfile.write(json.dumps(data).encode())
        except BaseException as e:
            Log("Provider do_GET error, e:", e)


def createServer(host):
    try:
        server = HTTPServer(host, Provider)
        Log("Starting server, listen at: %s:%s" % host)
        server.serve_forever()
    except BaseException as e:
        Log("createServer error, e:", e)
        raise Exception("stop")

def main():
    LogReset(1)
    exName = exchange.GetName()
    period = exchange.GetPeriod()
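    Log("Collecting K-line data from exchange", exName, ", K-line period:", period, "seconds")
    
    # Connect to the database service at mongodb://127.0.0.1:27017; adjust to the MongoDB settings on your server
    Log("Connecting to the mongodb service on the docker's device, mongodb://localhost:27017")
    myDBClient = pymongo.MongoClient("mongodb://localhost:27017")   
    # Create the database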
    ex_DB = myDBClient[exName]
    
    # Print the tables (collections) currently in the database
    collist = ex_DB.list_collection_names()
    Log("mongodb ", exName, " collist:", collist)
    
    # Check whether any tables should be dropped
    arrDropNames = json.loads(dropNames)
    if isinstance(arrDropNames, list):
        for i in range(len(arrDropNames)):
            dropName = arrDropNames[i]
            if isinstance(dropName, str):
                if not dropName in collist:
                    continue
                tab = ex_DB[dropName]
                Log("dropName:", dropName, "dropping:", dropName)
                ret = tab.drop()
                collist = ex_DB.list_collection_names()
                if dropName in collist:
                    Log(dropName, "drop failed")
                else :
                    Log(dropName, "dropped successfully")
    
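    # Start a thread that provides the custom data source service
    try:
        # _thread.start_new_thread(createServer, (("localhost", 9090), ))     # testing on the local machine
        _thread.start_new_thread(createServer, (("0.0.0.0", 9090), ))         # testing on a VPS server
        Log("Custom data source service thread started", "#FF0000")
    except BaseException as e:
        Log("Failed to start the custom data source service!")
        Log("Error message:", e)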
        raise Exception("stop")
    
    # Create the records table
    ex_DB_Records = ex_DB["%s_%d" % ("records", period)]
    Log("Start collecting", exName, "K-line data,", "period:", period, "opening (creating) database table:", "%s_%d" % ("records", period), "#FF0000")
    preBarTime = 0
    index = 1
    while True:
        r = _C(exchange.GetRecords)
        if len(r) < 2:
            Sleep(1000)
            continue
        if preBarTime == 0:
            # On the first pass, write all completed bars (the last, still-forming bar is skipped)
            for i in range(len(r) - 1):
                bar = r[i]
                # Write bar by bar; check by timestamp whether the table already holds this bar: skip it if so, otherwise write it
                if ex_DB_Records.count_documents({"Time": bar["Time"]}) > 0:
                    continue
                
                # Write the bar to the database table
                ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})                
                index += 1
            preBarTime = r[-1]["Time"]
        elif preBarTime != r[-1]["Time"]:
            bar = r[-2]
            # Before writing, check by timestamp whether the bar already exists.
            # Only the insert is skipped, so preBarTime still advances and the loop still sleeps.
            if ex_DB_Records.count_documents({"Time": bar["Time"]}) == 0:
                ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})
                index += 1
            preBarTime = r[-1]["Time"]
        LogStatus(_D(), "preBarTime:", preBarTime, "_D(preBarTime):", _D(preBarTime/1000), "index:", index)
        # Also plot the K-lines on a chart
        ext.PlotRecords(r, "%s_%d" % ("records", period))
        Sleep(10000)
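
The request handling in Provider.do_GET boils down to two small transformations: the query parameters period, round, vround, from and to become a table name, two scaling ratios and a millisecond time range, and each stored bar becomes a row of integers. The sketch below isolates that logic so it can be tried without the platform or MongoDB. The function names parse_request and scale_bar and the sample values are hypothetical; only the parameter names and the schema come from the listing above.

```python
from urllib.parse import parse_qs, urlparse


def parse_request(path):
    """Turn the query string of a data request into the values the handler uses."""
    params = {key: values[0] for key, values in parse_qs(urlparse(path).query).items()}
    return {
        "table": "records_%d" % (int(params["period"]) // 1000),  # ms -> s
        "price_ratio": 10 ** int(params["round"]),
        "amount_ratio": 10 ** int(params["vround"]),
        "from_ms": int(params["from"]) * 1000,  # s -> ms
        "to_ms": int(params["to"]) * 1000,      # s -> ms
    }


def scale_bar(bar, price_ratio, amount_ratio):
    """Convert one stored bar into a row matching the response schema
    ["time", "open", "high", "low", "close", "vol"]."""
    prices = [int(bar[key] * price_ratio) for key in ("Open", "High", "Low", "Close")]
    return [bar["Time"]] + prices + [int(bar["Volume"] * amount_ratio)]


if __name__ == "__main__":
    # Sample request path and bar: illustrative values only.
    req = parse_request("/data?period=3600000&round=2&vround=1&from=1588780800&to=1588867200")
    sample = {"Time": 1588780800000, "Open": 100.5, "High": 101.25,
              "Low": 99.75, "Close": 100.0, "Volume": 12.5}
    print(req["table"], scale_bar(sample, req["price_ratio"], req["amount_ratio"]))
```

Like the listing, this sketch uses int(), which truncates toward zero. Floating-point representation error can therefore cost the last unit of a scaled value, so round() may be the safer choice if exact prices matter.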
        

Testing

Configure the robot.

Run the robot to start the Market Collector.

Open a test strategy and run a backtest, for example with the following test strategy:

function main() {
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords().length)
}

Configure the backtest options and set the exchange to Binance. For now, a custom data source cannot define its own exchange name and has to borrow the configuration of an exchange in the list. The backtest shows Binance, but the data actually comes from the wexApp simulation exchange.


Compare the chart that the backtesting system generates from the custom data source with the one-hour K-line chart on the wexApp exchange page to check whether they are the same.


This way the robot on the VPS collects the K-line data by itself, and we can use the collected data directly in the backtesting system at any time. You can keep extending this example, for instance by supporting custom data sources for real-market-level backtesting, or by collecting data for multiple symbols and multiple markets.

Comments are welcome.
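
Before pointing the backtesting system at the service, it can help to query it by hand and look at what comes back. The sketch below builds a request URL with the parameters the service reads and converts a response back into floating-point bars. Treat it as an assumption-laden illustration: the helper names build_request_url and decode_bars, the /data path, the host, and the sample values are hypothetical; port 9090 and the schema come from the program above.

```python
from urllib.parse import urlencode


def build_request_url(base_url, period_s, price_digits, amount_digits, from_s, to_s):
    """Build a request URL carrying the parameters the service reads."""
    query = urlencode({
        "period": period_s * 1000,  # the service expects milliseconds
        "round": price_digits,
        "vround": amount_digits,
        "from": from_s,             # seconds
        "to": to_s,                 # seconds
    })
    return "%s?%s" % (base_url, query)


def decode_bars(payload, price_digits, amount_digits):
    """Convert the integer-scaled rows of a response back into float bars."""
    bars = []
    for row in payload["data"]:
        bar = dict(zip(payload["schema"], row))
        for key in ("open", "high", "low", "close"):
            bar[key] = bar[key] / 10 ** price_digits
        bar["vol"] = bar["vol"] / 10 ** amount_digits
        bars.append(bar)
    return bars


if __name__ == "__main__":
    # Replace 127.0.0.1 with the public IP address of your VPS.
    print(build_request_url("http://127.0.0.1:9090/data", 3600, 2, 1, 1588780800, 1588867200))
    sample = {"schema": ["time", "open", "high", "low", "close", "vol"],
              "data": [[1588780800000, 10050, 10125, 9975, 10000, 125]]}
    print(decode_bars(sample, 2, 1))
```

Opening the printed URL in a browser or with urllib.request.urlopen should return JSON in the same shape as the sample payload, provided the collector has already stored bars for that period and time range.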



wuzhentao: Also, the custom data source service never logs that it received a request.

wuzhentao: When I backtest with custom data selected, the backtest still starts from the data of the trading pair chosen below. How should I handle this?

lcgs006: Suppose I collect data for a trading pair that is not in the list and want to backtest a minor pair such as DOT_USDT, which cannot be selected when backtesting. How do I do that?

zltim: Bump.

NoworNever: If the backtest uses a custom data source, does it support only one trading pair?

The Little Dream: The "market collector" described in this post has to run on a server. It serves data to the FMZ backtesting system as a custom data source.

wuzhentao: Everything is filled in, including the custom data source address, but the backtest data below shows nothing new. What is the VPS service?

The Little Dream: After enabling the custom data source feature, you also need to fill in the service address of the custom data source in the panel on the right.

The Little Dream: The robot must run on a server with a public IP address so that the backtesting system page can reach it.

The Little Dream: You misunderstood me. I mean that the data your custom data source provides may actually be EOS_USDT, for example, while on FMZ you can only select a trading pair such as BTC_USDT. You serve the actual EOS_USDT data to the FMZ backtesting system as if it were BTC_USDT data.

lcgs006: Where does it need to be replaced? Is there a tutorial?

The Little Dream: You don't replace the name; you replace the data with the data you collected.

The Little Dream: The service program can serve data for several different trading pairs as the data source, and the backtesting system requests them itself.