
Baru-baru ini, pengguna perlu menggunakan fail format CSV sendiri sebagai sumber data untuk sistem ujian belakang Platform Dagangan Kuantitatif Pencipta. Sistem ujian belakang Platform Dagangan Kuantitatif Inventor mempunyai banyak fungsi dan mudah dan cekap untuk digunakan Selagi anda mempunyai data, anda boleh melakukan ujian balik, dan anda tidak lagi terhad kepada pertukaran dan produk yang disokong oleh pusat data platform. .
Idea reka bentuk sebenarnya sangat mudah. Kami hanya perlu membuat sedikit perubahan kepada pengumpul pasaran sebelumnya.isOnlySupportCSVDigunakan untuk mengawal sama ada untuk menggunakan fail CSV sahaja sebagai sumber data untuk sistem ujian belakang dan menambah parameter lainfilePathForCSV, digunakan untuk menetapkan laluan ke fail data CSV pada pelayan tempat robot pengumpul pasaran dijalankan. Akhirnya, menurutisOnlySupportCSVAdakah parameter ditetapkan kepadaTrueUntuk menentukan sumber data yang hendak digunakan (1. dikumpul oleh anda sendiri, 2. data dalam fail CSV), perubahan ini terutamanya dalamProviderKelasdo_GETfungsi.
Nilai Dipisahkan Koma (CSV, kadangkala juga dipanggil nilai dipisahkan aksara kerana aksara pembatas boleh selain daripada koma) ialah fail yang menyimpan data jadual (nombor dan teks) dalam teks biasa. Teks biasa bermaksud bahawa fail adalah jujukan aksara dan tidak mengandungi data yang mesti ditafsirkan seperti nombor binari. Fail CSV terdiri daripada sebarang bilangan rekod, dipisahkan oleh sejenis aksara pemisah baris setiap rekod terdiri daripada medan, dan pemisah antara medan ialah aksara atau rentetan lain, yang paling biasa ialah koma atau tab. Biasanya, semua rekod mempunyai jujukan medan yang sama. Ini biasanya fail teks biasa. Adalah disyorkan untuk menggunakan WORDPAD atau Notepad untuk membukanya. Kaedah lain ialah menyimpannya sebagai fail baharu dan kemudian membukanya dengan EXCEL.
Tiada standard universal untuk format fail CSV, tetapi terdapat peraturan tertentu, biasanya satu baris setiap rekod, dengan baris pertama sebagai pengepala. Data dalam setiap baris dipisahkan dengan koma.
Sebagai contoh, fail CSV yang kami gunakan untuk ujian kelihatan seperti ini apabila dibuka dengan Notepad:

Perhatikan bahawa baris pertama fail CSV ialah pengepala jadual.
,open,high,low,close,vol
Kami perlu menghuraikan dan menyusun data ini, dan kemudian membinanya ke dalam format yang diperlukan oleh sumber data tersuai sistem ujian belakang Ini telah pun dikendalikan dalam kod dalam artikel kami sebelum ini dan hanya memerlukan sedikit pengubahsuaian.
import _thread
import pymongo
import json
import math
import csv
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse
def url2Dict(url):
query = urlparse(url).query
params = parse_qs(query)
result = {key: params[key][0] for key in params}
return result
class Provider(BaseHTTPRequestHandler):
def do_GET(self):
global isOnlySupportCSV, filePathForCSV
try:
self.send_response(200)
self.send_header("Content-type", "application/json")
self.end_headers()
dictParam = url2Dict(self.path)
Log("自定义数据源服务接收到请求,self.path:", self.path, "query 参数:", dictParam)
# 目前回测系统只能从列表中选择交易所名称,在添加自定义数据源时,设置为币安,即:Binance
exName = exchange.GetName()
# 注意,period为底层K线周期
tabName = "%s_%s" % ("records", int(int(dictParam["period"]) / 1000))
priceRatio = math.pow(10, int(dictParam["round"]))
amountRatio = math.pow(10, int(dictParam["vround"]))
fromTS = int(dictParam["from"]) * int(1000)
toTS = int(dictParam["to"]) * int(1000)
# 要求应答的数据
data = {
"schema" : ["time", "open", "high", "low", "close", "vol"],
"data" : []
}
if isOnlySupportCSV:
# 处理CSV读取,filePathForCSV路径
listDataSequence = []
with open(filePathForCSV, "r") as f:
reader = csv.reader(f)
# 获取表头
header = next(reader)
headerIsNoneCount = 0
if len(header) != len(data["schema"]):
Log("CSV文件格式有误,列数不同,请检查!", "#FF0000")
return
for ele in header:
for i in range(len(data["schema"])):
if data["schema"][i] == ele or ele == "":
if ele == "":
headerIsNoneCount += 1
if headerIsNoneCount > 1:
Log("CSV文件格式有误,请检查!", "#FF0000")
return
listDataSequence.append(i)
break
# 读取内容
while True:
record = next(reader, -1)
if record == -1:
break
index = 0
arr = [0, 0, 0, 0, 0, 0]
for ele in record:
arr[listDataSequence[index]] = int(ele) if listDataSequence[index] == 0 else (int(float(ele) * amountRatio) if listDataSequence[index] == 5 else int(float(ele) * priceRatio))
index += 1
data["data"].append(arr)
Log("数据:", data, "响应回测系统请求。")
self.wfile.write(json.dumps(data).encode())
return
# 连接数据库
Log("连接数据库服务,获取数据,数据库:", exName, "表:", tabName)
myDBClient = pymongo.MongoClient("mongodb://localhost:27017")
ex_DB = myDBClient[exName]
exRecords = ex_DB[tabName]
# 构造查询条件:大于某个值{'age': {'$gt': 20}} 小于某个值{'age': {'$lt': 20}}
dbQuery = {"$and":[{'Time': {'$gt': fromTS}}, {'Time': {'$lt': toTS}}]}
Log("查询条件:", dbQuery, "查询条数:", exRecords.find(dbQuery).count(), "数据库总条数:", exRecords.find().count())
for x in exRecords.find(dbQuery).sort("Time"):
# 需要根据请求参数round和vround,处理数据精度
bar = [x["Time"], int(x["Open"] * priceRatio), int(x["High"] * priceRatio), int(x["Low"] * priceRatio), int(x["Close"] * priceRatio), int(x["Volume"] * amountRatio)]
data["data"].append(bar)
Log("数据:", data, "响应回测系统请求。")
# 写入数据应答
self.wfile.write(json.dumps(data).encode())
except BaseException as e:
Log("Provider do_GET error, e:", e)
def createServer(host):
try:
server = HTTPServer(host, Provider)
Log("Starting server, listen at: %s:%s" % host)
server.serve_forever()
except BaseException as e:
Log("createServer error, e:", e)
raise Exception("stop")
def main():
LogReset(1)
if (isOnlySupportCSV):
try:
# _thread.start_new_thread(createServer, (("localhost", 9090), )) # 本机测试
_thread.start_new_thread(createServer, (("0.0.0.0", 9090), )) # VPS服务器上测试
Log("开启自定义数据源服务线程,数据由CSV文件提供。", "#FF0000")
except BaseException as e:
Log("启动自定义数据源服务失败!")
Log("错误信息:", e)
raise Exception("stop")
while True:
LogStatus(_D(), "只启动自定义数据源服务,不收集数据!")
Sleep(2000)
exName = exchange.GetName()
period = exchange.GetPeriod()
Log("收集", exName, "交易所的K线数据,", "K线周期:", period, "秒")
# 连接数据库服务,服务地址 mongodb://127.0.0.1:27017 具体看服务器上安装的mongodb设置
Log("连接托管者所在设备mongodb服务,mongodb://localhost:27017")
myDBClient = pymongo.MongoClient("mongodb://localhost:27017")
# 创建数据库
ex_DB = myDBClient[exName]
# 打印目前数据库表
collist = ex_DB.list_collection_names()
Log("mongodb ", exName, " collist:", collist)
# 检测是否删除表
arrDropNames = json.loads(dropNames)
if isinstance(arrDropNames, list):
for i in range(len(arrDropNames)):
dropName = arrDropNames[i]
if isinstance(dropName, str):
if not dropName in collist:
continue
tab = ex_DB[dropName]
Log("dropName:", dropName, "删除:", dropName)
ret = tab.drop()
collist = ex_DB.list_collection_names()
if dropName in collist:
Log(dropName, "删除失败")
else :
Log(dropName, "删除成功")
# 开启一个线程,提供自定义数据源服务
try:
# _thread.start_new_thread(createServer, (("localhost", 9090), )) # 本机测试
_thread.start_new_thread(createServer, (("0.0.0.0", 9090), )) # VPS服务器上测试
Log("开启自定义数据源服务线程", "#FF0000")
except BaseException as e:
Log("启动自定义数据源服务失败!")
Log("错误信息:", e)
raise Exception("stop")
# 创建records表
ex_DB_Records = ex_DB["%s_%d" % ("records", period)]
Log("开始收集", exName, "K线数据", "周期:", period, "打开(创建)数据库表:", "%s_%d" % ("records", period), "#FF0000")
preBarTime = 0
index = 1
while True:
r = _C(exchange.GetRecords)
if len(r) < 2:
Sleep(1000)
continue
if preBarTime == 0:
# 首次写入所有BAR数据
for i in range(len(r) - 1):
bar = r[i]
# 逐根写入,需要判断当前数据库表中是否已经有该条数据,基于时间戳检测,如果有该条数据,则跳过,没有则写入
retQuery = ex_DB_Records.find({"Time": bar["Time"]})
if retQuery.count() > 0:
continue
# 写入bar到数据库表
ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})
index += 1
preBarTime = r[-1]["Time"]
elif preBarTime != r[-1]["Time"]:
bar = r[-2]
# 写入数据前检测,数据是否已经存在,基于时间戳检测
retQuery = ex_DB_Records.find({"Time": bar["Time"]})
if retQuery.count() > 0:
continue
ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})
index += 1
preBarTime = r[-1]["Time"]
LogStatus(_D(), "preBarTime:", preBarTime, "_D(preBarTime):", _D(preBarTime/1000), "index:", index)
# 增加画图展示
ext.PlotRecords(r, "%s_%d" % ("records", period))
Sleep(10000)
Mula-mula, kita mulakan robot pengumpul pasaran, tambah pertukaran pada robot, dan biarkan robot berjalan.
Konfigurasi parameter:


Kemudian kami membuat strategi ujian:
function main() {
Log(exchange.GetRecords())
Log(exchange.GetRecords())
Log(exchange.GetRecords())
}
Strategi ini sangat mudah, hanya mendapatkan dan mencetak tiga data K-line.
Pada halaman ujian belakang, tetapkan sumber data sistem ujian belakang kepada sumber data tersuai dan isikan alamat pelayan tempat robot pengumpul pasaran dijalankan. Memandangkan data dalam fail CSV kami ialah 1 minit K-line. Oleh itu, apabila ujian belakang, kami menetapkan tempoh K-line kepada 1 minit.

Klik Mula Uji Balik, dan robot pengumpul pasaran menerima permintaan data:

Selepas sistem ujian belakang melengkapkan pelaksanaan strategi, ia menjana carta garis K berdasarkan data garis K dalam sumber data.

Bandingkan data dalam fail:


Ini hanyalah titik permulaan, dialu-alukan untuk meninggalkan mesej.