
Article précédentJe vous apprends étape par étape à mettre en place un collecteur de marchéEnsemble, nous avons mis en place un programme robotisé pour collecter des données de marché. Comment utilisons-nous les données de marché collectées ? Bien sûr, il est utilisé pour le système de backtesting. Ici, en s’appuyant sur la fonction de source de données personnalisée du système de backtesting de la plateforme de trading quantitative de l’inventeur, nous pouvons utiliser directement les données collectées comme source de données du système de backtesting, de sorte que nous peut créer le système de backtesting Le système de test peut être appliqué à n’importe quel marché où nous souhaitons tester les données historiques.
Nous pouvons donc améliorer le « Market Collector » ! Le collecteur de marché peut également être utilisé comme source de données personnalisée pour fournir des données au système de backtesting.
Si vous avez un besoin, agissez !
Les préparatifs sont différents de ceux du dernier article. La dernière fois, j’ai exécuté le programme hôte sur mon ordinateur MAC local, installé la base de données mongodb et démarré le service de base de données. Cette fois, nous avons changé l’environnement d’exploitation en VPS et utilisé le serveur Alibaba Cloud Linux pour exécuter notre programme.
Comme dans l’article précédent, vous devez installer la base de données MongoDB sur l’appareil sur lequel s’exécute le programme de collecte de marché et démarrer le service. C’est fondamentalement la même chose que d’installer mongodb sur un ordinateur MAC. Il existe de nombreux tutoriels sur Internet. Vous pouvez les rechercher. C’est très simple.
Installer Python 3 Le programme utilise Python 3. Notez que certaines bibliothèques sont utilisées et doivent être installées si elles ne sont pas disponibles.
Hôte Exécutez simplement un dépositaire de la plateforme de trading quantitative Inventor.
Collecteur de marchéRecordsCollecter (Tutoriel)Cette stratégie. Apportons-y quelques modifications : Avant que le programme n’entre dans la boucle while pour collecter des données, une bibliothèque multithread est utilisée pour démarrer simultanément un service afin d’écouter les demandes de données du système de backtesting de la plateforme de trading quantitative de l’inventeur. (Certains autres détails peuvent être ignorés)
RecordsCollecter (mis à niveau pour fournir une fonction de source de données personnalisée)
import _thread
import pymongo
import json
import math
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse
def url2Dict(url):
query = urlparse(url).query
params = parse_qs(query)
result = {key: params[key][0] for key in params}
return result
class Provider(BaseHTTPRequestHandler):
def do_GET(self):
try:
self.send_response(200)
self.send_header("Content-type", "application/json")
self.end_headers()
dictParam = url2Dict(self.path)
Log("自定义数据源服务接收到请求,self.path:", self.path, "query 参数:", dictParam)
# 目前回测系统只能从列表中选择交易所名称,在添加自定义数据源时,设置为币安,即:Binance
exName = exchange.GetName()
# 注意,period为底层K线周期
tabName = "%s_%s" % ("records", int(int(dictParam["period"]) / 1000))
priceRatio = math.pow(10, int(dictParam["round"]))
amountRatio = math.pow(10, int(dictParam["vround"]))
fromTS = int(dictParam["from"]) * int(1000)
toTS = int(dictParam["to"]) * int(1000)
# 连接数据库
Log("连接数据库服务,获取数据,数据库:", exName, "表:", tabName)
myDBClient = pymongo.MongoClient("mongodb://localhost:27017")
ex_DB = myDBClient[exName]
exRecords = ex_DB[tabName]
# 要求应答的数据
data = {
"schema" : ["time", "open", "high", "low", "close", "vol"],
"data" : []
}
# 构造查询条件:大于某个值{'age': {'$gt': 20}} 小于某个值{'age': {'$lt': 20}}
dbQuery = {"$and":[{'Time': {'$gt': fromTS}}, {'Time': {'$lt': toTS}}]}
Log("查询条件:", dbQuery, "查询条数:", exRecords.find(dbQuery).count(), "数据库总条数:", exRecords.find().count())
for x in exRecords.find(dbQuery).sort("Time"):
# 需要根据请求参数round和vround,处理数据精度
bar = [x["Time"], int(x["Open"] * priceRatio), int(x["High"] * priceRatio), int(x["Low"] * priceRatio), int(x["Close"] * priceRatio), int(x["Volume"] * amountRatio)]
data["data"].append(bar)
Log("数据:", data, "响应回测系统请求。")
# 写入数据应答
self.wfile.write(json.dumps(data).encode())
except BaseException as e:
Log("Provider do_GET error, e:", e)
def createServer(host):
try:
server = HTTPServer(host, Provider)
Log("Starting server, listen at: %s:%s" % host)
server.serve_forever()
except BaseException as e:
Log("createServer error, e:", e)
raise Exception("stop")
def main():
LogReset(1)
exName = exchange.GetName()
period = exchange.GetPeriod()
Log("收集", exName, "交易所的K线数据,", "K线周期:", period, "秒")
# 连接数据库服务,服务地址 mongodb://127.0.0.1:27017 具体看服务器上安装的mongodb设置
Log("连接托管者所在设备mongodb服务,mongodb://localhost:27017")
myDBClient = pymongo.MongoClient("mongodb://localhost:27017")
# 创建数据库
ex_DB = myDBClient[exName]
# 打印目前数据库表
collist = ex_DB.list_collection_names()
Log("mongodb ", exName, " collist:", collist)
# 检测是否删除表
arrDropNames = json.loads(dropNames)
if isinstance(arrDropNames, list):
for i in range(len(arrDropNames)):
dropName = arrDropNames[i]
if isinstance(dropName, str):
if not dropName in collist:
continue
tab = ex_DB[dropName]
Log("dropName:", dropName, "删除:", dropName)
ret = tab.drop()
collist = ex_DB.list_collection_names()
if dropName in collist:
Log(dropName, "删除失败")
else :
Log(dropName, "删除成功")
# 开启一个线程,提供自定义数据源服务
try:
# _thread.start_new_thread(createServer, (("localhost", 9090), )) # 本机测试
_thread.start_new_thread(createServer, (("0.0.0.0", 9090), )) # VPS服务器上测试
Log("开启自定义数据源服务线程", "#FF0000")
except BaseException as e:
Log("启动自定义数据源服务失败!")
Log("错误信息:", e)
raise Exception("stop")
# 创建records表
ex_DB_Records = ex_DB["%s_%d" % ("records", period)]
Log("开始收集", exName, "K线数据", "周期:", period, "打开(创建)数据库表:", "%s_%d" % ("records", period), "#FF0000")
preBarTime = 0
index = 1
while True:
r = _C(exchange.GetRecords)
if len(r) < 2:
Sleep(1000)
continue
if preBarTime == 0:
# 首次写入所有BAR数据
for i in range(len(r) - 1):
bar = r[i]
# 逐根写入,需要判断当前数据库表中是否已经有该条数据,基于时间戳检测,如果有该条数据,则跳过,没有则写入
retQuery = ex_DB_Records.find({"Time": bar["Time"]})
if retQuery.count() > 0:
continue
# 写入bar到数据库表
ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})
index += 1
preBarTime = r[-1]["Time"]
elif preBarTime != r[-1]["Time"]:
bar = r[-2]
# 写入数据前检测,数据是否已经存在,基于时间戳检测
retQuery = ex_DB_Records.find({"Time": bar["Time"]})
if retQuery.count() > 0:
continue
ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})
index += 1
preBarTime = r[-1]["Time"]
LogStatus(_D(), "preBarTime:", preBarTime, "_D(preBarTime):", _D(preBarTime/1000), "index:", index)
# 增加画图展示
ext.PlotRecords(r, "%s_%d" % ("records", period))
Sleep(10000)
Configurer le robot

Exécutez le robot et exécutez le collecteur de marché.

Ouvrez une stratégie de test et effectuez un backtest, comme cette stratégie de backtest, et testez-la.
function main() {
Log(exchange.GetRecords())
Log(exchange.GetRecords())
Log(exchange.GetRecords())
Log(exchange.GetRecords())
Log(exchange.GetRecords())
Log(exchange.GetRecords())
Log(exchange.GetRecords().length)
}
Configurez les options de backtesting et définissez l’échange sur Binance car la source de données personnalisée ne peut pas définir elle-même un nom d’échange pour le moment. Vous ne pouvez utiliser qu’un échange dans la liste pour la configuration. Binance est affiché pendant le backtesting. Il s’agit des données du disque de simulation wexApp.

Comparez le graphique généré par le système de backtesting basé sur le collecteur de marché comme source de données personnalisée et le graphique en chandelier d’une heure sur la page d’échange wexApp pour voir s’ils sont identiques.


De cette façon, le robot sur le VPS peut collecter lui-même les données de la ligne K, et nous pouvons obtenir les données collectées à tout moment et effectuer des backtests directement dans le système de backtesting. Ce n’est qu’une goutte d’eau dans l’océan. Les experts peuvent continuer à développer ce domaine, par exemple en prenant en charge des sources de données personnalisées pour les tests rétrospectifs en temps réel, en prenant en charge la collecte de données multivariétés et multimarchés et d’autres fonctions.
Bienvenue pour laisser un message.