
Teach you to upgrade the market quotes collector to backtest with a custom data source

Author: , Created: 2020-06-06 08:53:02, Updated:

The previous article, "Teach you to implement a market quotes collector", showed how to build a robot program that collects market quotes.

How can the market data be used once it has been collected? It can feed the backtest system. With the custom data source function of the FMZ platform backtest system, the collected data can be used directly as the backtest data source, so the backtest system can be applied to any market for which we have historical data.

Therefore, we can give the “Market Quotes Collector” an upgrade: let it also serve as a custom data source that provides data to the backtest system.

Get Ready

The preparation differs from the last article. Last time the docker program ran on my local Mac, where the MongoDB database was installed and its service started. This time the operating environment is a VPS: an Alibaba Cloud Linux server runs the whole set of programs.

  • Mongodb Database

As in the previous article, MongoDB must be installed on the device where the market collector program runs, and its service must be started. The procedure is basically the same as installing MongoDB on a Mac. Plenty of tutorials are available online, and the process is simple.

  • Install Python 3

The program uses Python 3 and depends on the following libraries; install any that are missing. Of these, only pymongo is a third-party package, while http and urllib are part of the Python 3 standard library.

pymongo http urllib
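
A quick way to confirm the environment is ready is to check that each module can be located before starting the robot. The sketch below is only an illustration: the helper name `missing_modules` is hypothetical, and the module list is taken from the import statements of the collector program.

```python
import importlib.util

# Modules imported by the collector program (taken from its import statements).
REQUIRED = ["pymongo", "http.server", "urllib.parse", "json", "math", "_thread"]

def missing_modules(names):
    """Return the names in `names` that cannot be found in this Python environment."""
    missing = []
    for name in names:
        try:
            found = importlib.util.find_spec(name) is not None
        except ModuleNotFoundError:
            # Raised when the parent package of a dotted name is itself absent.
            found = False
        if not found:
            missing.append(name)
    return missing

if __name__ == "__main__":
    print("Missing modules:", missing_modules(REQUIRED))
```

If pymongo is reported as missing, it can be installed with `pip3 install pymongo`.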

  • Docker

A running FMZ docker is enough.

Transform the “Market Quotes Collector”

The market quotes collector is the RecordsCollecter strategy: https://www.fmz.com/strategy/199120

Let’s make some modifications to it:

Before the program enters the while loop that collects data, it uses a threading library to start a concurrent service that listens for data requests from the FMZ platform backtest system. (Other details can be ignored.)
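
The pattern described here, a background thread serving HTTP requests while the main thread keeps working, can be sketched with the standard library alone. This is a simplified illustration, not the collector itself: `EchoHandler`, `start_server` and the echoed payload are hypothetical, and it uses `threading` rather than the lower-level `_thread` module that appears in the article's code.

```python
import http.client
import json
import threading
from http.server import HTTPServer, BaseHTTPRequestHandler

class EchoHandler(BaseHTTPRequestHandler):
    """Hypothetical handler: replies to every GET with the requested path as JSON."""
    def do_GET(self):
        body = json.dumps({"path": self.path}).encode()
        self.send_response(200)
        self.send_header("Content-type", "application/json")
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, fmt, *args):
        pass  # silence the default per-request logging

def start_server(host="127.0.0.1", port=0):
    """Start the HTTP server in a daemon thread; port 0 lets the OS pick a free port."""
    server = HTTPServer((host, port), EchoHandler)
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()
    return server

if __name__ == "__main__":
    server = start_server()
    port = server.server_address[1]
    # The main thread stays free to do other work (here: a single test request).
    conn = http.client.HTTPConnection("127.0.0.1", port, timeout=5)
    conn.request("GET", "/?period=3600000")
    print(json.loads(conn.getresponse().read()))
    conn.close()
    server.shutdown()
```

Running the server in its own thread is what allows the collecting loop and the data service to live in the same robot.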

RecordsCollecter (upgrade to provide custom data source function)

import _thread
import pymongo
import json
import math
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse

def url2Dict(url):
    query = urlparse(url).query  
    params = parse_qs(query)  
    result = {key: params[key][0] for key in params}  
    return result

class Provider(BaseHTTPRequestHandler):
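    def do_GET(self):
        try:
            # Send the status line and headers before writing the body
            self.send_response(200)
            self.send_header("Content-type", "application/json")
            self.end_headers()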

            dictParam = url2Dict(self.path)
            Log("The custom data source service receives the request, self.path:", self.path, "query parameter:", dictParam)
            # At present, the backtest system can only select an exchange name from its list; when adding the custom data source, set the exchange to Binance
            exName = exchange.GetName()                                     
            # Note that period is the underlying K-line period, in milliseconds; the table name uses seconds
            tabName = "%s_%s" % ("records", int(int(dictParam["period"]) / 1000))  
            priceRatio = math.pow(10, int(dictParam["round"]))
            amountRatio = math.pow(10, int(dictParam["vround"]))
            fromTS = int(dictParam["from"]) * int(1000)
            toTS = int(dictParam["to"]) * int(1000)
            # Connect to the database
            Log("Connect to the database service to obtain data, the database:", exName, "table:", tabName)
            myDBClient = pymongo.MongoClient("mongodb://localhost:27017")
            ex_DB = myDBClient[exName]
            exRecords = ex_DB[tabName]
            # Request data
            data = {
                "schema" : ["time", "open", "high", "low", "close", "vol"],
                "data" : []
            }
            # Build the query condition; for example {'age': {'$gt': 20}} matches values greater than 20 and {'age': {'$lt': 20}} matches values less than 20
            dbQuery = {"$and":[{'Time': {'$gt': fromTS}}, {'Time': {'$lt': toTS}}]}
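            # count_documents is used because Cursor.count() was removed in PyMongo 4
            Log("Query conditions:", dbQuery, "Number of matching records:", exRecords.count_documents(dbQuery), "Total number of records in the table:", exRecords.count_documents({}))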
            for x in exRecords.find(dbQuery).sort("Time"):
                # Scale prices and volumes to integers according to the request parameters round and vround
                bar = [x["Time"], int(x["Open"] * priceRatio), int(x["High"] * priceRatio), int(x["Low"] * priceRatio), int(x["Close"] * priceRatio), int(x["Volume"] * amountRatio)]
                data["data"].append(bar)
            Log("data:", data, "Respond to backtest system requests.")
            # Write data reply
            self.wfile.write(json.dumps(data).encode())
        except BaseException as e:
            Log("Provider do_GET error, e:", e)

def createServer(host):
    try:
        server = HTTPServer(host, Provider)
        Log("Starting server, listen at: %s:%s" % host)
        # Block this thread and handle requests until the program stops
        server.serve_forever()
    except BaseException as e:
        Log("createServer error, e:", e)
        raise Exception("stop")

def main():
    exName = exchange.GetName()
    period = exchange.GetPeriod()
    Log("collect", exName, "Exchange K-line data,", "K line cycle:", period, "second")
    # Connect to the database service, service address mongodb:// See the settings of mongodb installed on the server
    Log("Connect to the mongodb service of the hosting device, mongodb://localhost:27017")
    myDBClient = pymongo.MongoClient("mongodb://localhost:27017")   
    # Create a database
    ex_DB = myDBClient[exName]
    # Print the current database table
    collist = ex_DB.list_collection_names()
    Log("mongodb ", exName, " collist:", collist)
    # Drop the tables named in dropNames (expected to be a strategy parameter holding a JSON array of table names)
    arrDropNames = json.loads(dropNames)
    if isinstance(arrDropNames, list):
        for i in range(len(arrDropNames)):
            dropName = arrDropNames[i]
            if isinstance(dropName, str):
                if dropName not in collist:
                    continue
                tab = ex_DB[dropName]
                Log("dropName:", dropName, "delete:", dropName)
                ret = tab.drop()
                collist = ex_DB.list_collection_names()
                if dropName in collist:
                    Log(dropName, "failed to delete")
                else :
                    Log(dropName, "successfully deleted")
    # Start a thread to provide a custom data source service
    try:
        # _thread.start_new_thread(createServer, (("localhost", 9090), ))     # local computer test
        _thread.start_new_thread(createServer, (("", 9090), ))         # Test on VPS server
        Log("Open the custom data source service thread", "#FF0000")
    except BaseException as e:
        Log("Failed to start the custom data source service!")
        Log("Error message:", e)
        raise Exception("stop")
    # Create the records table
    ex_DB_Records = ex_DB["%s_%d" % ("records", period)]
    Log("Start collecting", exName, "K-line data", "cycle:", period, "Open (create) the database table:", "%s_%d" % ("records", period), "#FF0000")
    preBarTime = 0
    index = 1
    while True:
        r = _C(exchange.GetRecords)
        if len(r) < 2:
            # Not enough bars yet; wait one second and try again
            Sleep(1000)
            continue
        if preBarTime == 0:
            # Write all BAR data for the first time
            for i in range(len(r) - 1):
                bar = r[i]
                # Write bar by bar; skip any bar whose timestamp already exists in the table
                if ex_DB_Records.count_documents({"Time": bar["Time"]}) > 0:
                    continue
                # Write bar to the database table
                ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})                
                index += 1
            preBarTime = r[-1]["Time"]
        elif preBarTime != r[-1]["Time"]:
            bar = r[-2]
            # Before writing, check by timestamp whether the bar already exists; insert it only if it does not
            if ex_DB_Records.count_documents({"Time": bar["Time"]}) == 0:
                ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})
                index += 1
            preBarTime = r[-1]["Time"]
        LogStatus(_D(), "preBarTime:", preBarTime, "_D(preBarTime):", _D(preBarTime/1000), "index:", index)
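        # Plot the K-line data on a chart
        ext.PlotRecords(r, "%s_%d" % ("records", period))
        # Pause between polls; the 10-second interval is an assumed value, adjust as needed
        Sleep(10000)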
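
To make the reply format easier to see in isolation, the following sketch builds the same kind of structure (a schema list plus one integer row per bar) without touching the database. The function name `build_response` and the sample bar are made up for illustration. One deliberate difference from the listing above: it applies `round()` before converting to an integer, because truncating with `int()` alone can lose a unit when a product such as `9.95 * 100` falls just below the intended value in floating point.

```python
import json

def build_response(bars, price_digits, volume_digits):
    """Build the reply structure: a schema list plus one integer row per bar."""
    price_ratio = 10 ** price_digits
    volume_ratio = 10 ** volume_digits
    rows = []
    for bar in sorted(bars, key=lambda b: b["Time"]):
        rows.append([
            bar["Time"],
            int(round(bar["Open"] * price_ratio)),
            int(round(bar["High"] * price_ratio)),
            int(round(bar["Low"] * price_ratio)),
            int(round(bar["Close"] * price_ratio)),
            int(round(bar["Volume"] * volume_ratio)),
        ])
    return {"schema": ["time", "open", "high", "low", "close", "vol"], "data": rows}

if __name__ == "__main__":
    # Made-up sample bar; Time is a millisecond timestamp
    sample = [{"Time": 1591401600000, "Open": 9651.25, "High": 9700.5,
               "Low": 9600.0, "Close": 9688.75, "Volume": 12.345}]
    print(json.dumps(build_response(sample, 2, 3)))
```

Here `price_digits` and `volume_digits` play the role of the `round` and `vround` request parameters.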


Configure the robot


Run the robot to start the market quotes collector.
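
With the collector running, it can help to reproduce by hand the kind of request the handler expects. The sketch below builds a query string from the five parameters the code reads (`period`, `round`, `vround`, `from`, `to`) and parses it back the same way `url2Dict` does. The host `127.0.0.1`, the helper name `build_request_url` and the sample values are assumptions for illustration. The units follow from the code, where `period` is divided by 1000 to get seconds and `from`/`to` are multiplied by 1000 to get milliseconds.

```python
from urllib.parse import urlencode, urlparse, parse_qs

def build_request_url(host, port, period_ms, price_digits, volume_digits, from_s, to_s):
    """Assemble a GET URL carrying the parameters read by the Provider handler."""
    query = urlencode({
        "period": period_ms,      # K-line period in milliseconds
        "round": price_digits,    # decimal places kept for prices
        "vround": volume_digits,  # decimal places kept for volumes
        "from": from_s,           # range start, in seconds
        "to": to_s,               # range end, in seconds
    })
    return "http://%s:%d/?%s" % (host, port, query)

def url2Dict(url):
    """Same parsing as in the collector: keep the first value of every parameter."""
    params = parse_qs(urlparse(url).query)
    return {key: params[key][0] for key in params}

if __name__ == "__main__":
    url = build_request_url("127.0.0.1", 9090, 3600000, 2, 3, 1591401600, 1591488000)
    print(url)
    params = url2Dict(url)
    # Table name and time range as the handler would derive them
    print("records_%d" % (int(params["period"]) // 1000))
    print(int(params["from"]) * 1000, int(params["to"]) * 1000)
```

If the service is listening, requesting such a URL on the server should return JSON. The backtest system may send additional parameters that this sketch does not cover.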


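Open a test strategy for backtesting. For example:

function main() {
    // Minimal example body: print the K-line data served by the custom data source
    Log(exchange.GetRecords())
}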

Configure the backtest options and set the exchange to Binance. For now, a custom data source cannot define its own exchange name, so one of the exchange configurations in the list has to be borrowed. The backtest shows Binance, but the data actually comes from the WexApp simulation market.


Check whether the chart that the backtest system generates from the market quotes collector as a custom data source matches the 1-hour K-line chart on the WexApp exchange page.
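
Besides comparing the charts by eye, a simple numeric check can reveal missing bars in the collected data. The sketch below is an optional addition, not part of the original collector: the function `find_gaps` is hypothetical and assumes bar timestamps in milliseconds spaced exactly one period apart.

```python
def find_gaps(timestamps_ms, period_s):
    """Return (previous, current) timestamp pairs that are more than one period apart."""
    step = period_s * 1000
    ordered = sorted(timestamps_ms)
    gaps = []
    for prev, cur in zip(ordered, ordered[1:]):
        if cur - prev > step:
            gaps.append((prev, cur))
    return gaps

if __name__ == "__main__":
    start = 1591401600000
    hour_ms = 3600 * 1000
    # Four hourly bars with the third one missing
    times = [start, start + hour_ms, start + 3 * hour_ms, start + 4 * hour_ms]
    print(find_gaps(times, 3600))
```

An empty result means every consecutive pair of bars is exactly one period apart or closer.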


In this way, the robot on the VPS collects K-line data by itself, and we can fetch the collected data at any time and backtest on it directly in the backtest system.

You can continue to extend it, for example with custom data sources for real-market-level backtesting, and with multi-symbol, multi-market data collection.