
Baru-baru ini, seorang pengguna perlu menggunakan berkas format CSV miliknya sendiri sebagai sumber data untuk sistem pengujian ulang Platform Perdagangan Kuantitatif Inventor. Sistem pengujian ulang dari Inventor Quantitative Trading Platform memiliki banyak fungsi dan mudah digunakan. Selama Anda memiliki data, Anda dapat melakukan pengujian ulang, dan Anda tidak lagi terbatas pada bursa dan produk yang didukung oleh pusat data platform. .
Ide desainnya sebenarnya sangat sederhana. Kita hanya perlu membuat sedikit perubahan pada market collector sebelumnya. Kita menambahkan parameter pada market collector.isOnlySupportCSVDigunakan untuk mengontrol apakah hanya akan menggunakan file CSV sebagai sumber data untuk sistem pengujian ulang, dan menambahkan parameter lainfilePathForCSV, digunakan untuk menetapkan jalur ke berkas data CSV di server tempat robot pengumpul pasar berjalan. Akhirnya, menurutisOnlySupportCSVApakah parameternya diatur keTrueUntuk memutuskan sumber data mana yang akan digunakan (1. dikumpulkan sendiri, 2. data dalam file CSV), perubahan ini terutama terjadiProviderKelasdo_GETfungsi.
Nilai yang Dipisahkan Koma (CSV, terkadang juga disebut nilai yang dipisahkan karakter karena karakter pembatasnya bisa selain koma) adalah file yang menyimpan data tabular (angka dan teks) dalam teks biasa. Teks biasa berarti bahwa berkas tersebut merupakan rangkaian karakter dan tidak berisi data yang harus ditafsirkan seperti angka biner. Berkas CSV terdiri atas sejumlah catatan, dipisahkan oleh semacam karakter pemisah baris; setiap catatan terdiri atas bidang-bidang, dan pemisah antara bidang-bidang tersebut adalah karakter atau string lain, yang paling umum adalah koma atau tab. Biasanya, semua catatan memiliki urutan bidang yang persis sama. Ini biasanya berupa berkas teks biasa. Disarankan untuk menggunakan WORDPAD atau Notepad untuk membukanya. Cara lain adalah menyimpannya sebagai file baru lalu membukanya dengan EXCEL.
Tidak ada standar universal untuk format file CSV, tetapi ada aturan tertentu, umumnya satu baris per rekaman, dengan baris pertama menjadi tajuk. Data pada setiap baris dipisahkan dengan koma.
Misalnya, file CSV yang kami gunakan untuk pengujian terlihat seperti ini saat dibuka dengan Notepad:

Perhatikan bahwa baris pertama file CSV adalah tajuk tabel.
,open,high,low,close,vol
Kita perlu mengurai dan mengatur data ini, lalu menyusunnya ke dalam format yang dibutuhkan oleh sumber data khusus sistem pengujian ulang. Hal ini telah ditangani dalam kode di artikel kita sebelumnya dan hanya memerlukan sedikit modifikasi.
import _thread
import pymongo
import json
import math
import csv
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse
def url2Dict(url):
query = urlparse(url).query
params = parse_qs(query)
result = {key: params[key][0] for key in params}
return result
class Provider(BaseHTTPRequestHandler):
def do_GET(self):
global isOnlySupportCSV, filePathForCSV
try:
self.send_response(200)
self.send_header("Content-type", "application/json")
self.end_headers()
dictParam = url2Dict(self.path)
Log("自定义数据源服务接收到请求,self.path:", self.path, "query 参数:", dictParam)
# 目前回测系统只能从列表中选择交易所名称,在添加自定义数据源时,设置为币安,即:Binance
exName = exchange.GetName()
# 注意,period为底层K线周期
tabName = "%s_%s" % ("records", int(int(dictParam["period"]) / 1000))
priceRatio = math.pow(10, int(dictParam["round"]))
amountRatio = math.pow(10, int(dictParam["vround"]))
fromTS = int(dictParam["from"]) * int(1000)
toTS = int(dictParam["to"]) * int(1000)
# 要求应答的数据
data = {
"schema" : ["time", "open", "high", "low", "close", "vol"],
"data" : []
}
if isOnlySupportCSV:
# 处理CSV读取,filePathForCSV路径
listDataSequence = []
with open(filePathForCSV, "r") as f:
reader = csv.reader(f)
# 获取表头
header = next(reader)
headerIsNoneCount = 0
if len(header) != len(data["schema"]):
Log("CSV文件格式有误,列数不同,请检查!", "#FF0000")
return
for ele in header:
for i in range(len(data["schema"])):
if data["schema"][i] == ele or ele == "":
if ele == "":
headerIsNoneCount += 1
if headerIsNoneCount > 1:
Log("CSV文件格式有误,请检查!", "#FF0000")
return
listDataSequence.append(i)
break
# 读取内容
while True:
record = next(reader, -1)
if record == -1:
break
index = 0
arr = [0, 0, 0, 0, 0, 0]
for ele in record:
arr[listDataSequence[index]] = int(ele) if listDataSequence[index] == 0 else (int(float(ele) * amountRatio) if listDataSequence[index] == 5 else int(float(ele) * priceRatio))
index += 1
data["data"].append(arr)
Log("数据:", data, "响应回测系统请求。")
self.wfile.write(json.dumps(data).encode())
return
# 连接数据库
Log("连接数据库服务,获取数据,数据库:", exName, "表:", tabName)
myDBClient = pymongo.MongoClient("mongodb://localhost:27017")
ex_DB = myDBClient[exName]
exRecords = ex_DB[tabName]
# 构造查询条件:大于某个值{'age': {'$gt': 20}} 小于某个值{'age': {'$lt': 20}}
dbQuery = {"$and":[{'Time': {'$gt': fromTS}}, {'Time': {'$lt': toTS}}]}
Log("查询条件:", dbQuery, "查询条数:", exRecords.find(dbQuery).count(), "数据库总条数:", exRecords.find().count())
for x in exRecords.find(dbQuery).sort("Time"):
# 需要根据请求参数round和vround,处理数据精度
bar = [x["Time"], int(x["Open"] * priceRatio), int(x["High"] * priceRatio), int(x["Low"] * priceRatio), int(x["Close"] * priceRatio), int(x["Volume"] * amountRatio)]
data["data"].append(bar)
Log("数据:", data, "响应回测系统请求。")
# 写入数据应答
self.wfile.write(json.dumps(data).encode())
except BaseException as e:
Log("Provider do_GET error, e:", e)
def createServer(host):
try:
server = HTTPServer(host, Provider)
Log("Starting server, listen at: %s:%s" % host)
server.serve_forever()
except BaseException as e:
Log("createServer error, e:", e)
raise Exception("stop")
def main():
LogReset(1)
if (isOnlySupportCSV):
try:
# _thread.start_new_thread(createServer, (("localhost", 9090), )) # 本机测试
_thread.start_new_thread(createServer, (("0.0.0.0", 9090), )) # VPS服务器上测试
Log("开启自定义数据源服务线程,数据由CSV文件提供。", "#FF0000")
except BaseException as e:
Log("启动自定义数据源服务失败!")
Log("错误信息:", e)
raise Exception("stop")
while True:
LogStatus(_D(), "只启动自定义数据源服务,不收集数据!")
Sleep(2000)
exName = exchange.GetName()
period = exchange.GetPeriod()
Log("收集", exName, "交易所的K线数据,", "K线周期:", period, "秒")
# 连接数据库服务,服务地址 mongodb://127.0.0.1:27017 具体看服务器上安装的mongodb设置
Log("连接托管者所在设备mongodb服务,mongodb://localhost:27017")
myDBClient = pymongo.MongoClient("mongodb://localhost:27017")
# 创建数据库
ex_DB = myDBClient[exName]
# 打印目前数据库表
collist = ex_DB.list_collection_names()
Log("mongodb ", exName, " collist:", collist)
# 检测是否删除表
arrDropNames = json.loads(dropNames)
if isinstance(arrDropNames, list):
for i in range(len(arrDropNames)):
dropName = arrDropNames[i]
if isinstance(dropName, str):
if not dropName in collist:
continue
tab = ex_DB[dropName]
Log("dropName:", dropName, "删除:", dropName)
ret = tab.drop()
collist = ex_DB.list_collection_names()
if dropName in collist:
Log(dropName, "删除失败")
else :
Log(dropName, "删除成功")
# 开启一个线程,提供自定义数据源服务
try:
# _thread.start_new_thread(createServer, (("localhost", 9090), )) # 本机测试
_thread.start_new_thread(createServer, (("0.0.0.0", 9090), )) # VPS服务器上测试
Log("开启自定义数据源服务线程", "#FF0000")
except BaseException as e:
Log("启动自定义数据源服务失败!")
Log("错误信息:", e)
raise Exception("stop")
# 创建records表
ex_DB_Records = ex_DB["%s_%d" % ("records", period)]
Log("开始收集", exName, "K线数据", "周期:", period, "打开(创建)数据库表:", "%s_%d" % ("records", period), "#FF0000")
preBarTime = 0
index = 1
while True:
r = _C(exchange.GetRecords)
if len(r) < 2:
Sleep(1000)
continue
if preBarTime == 0:
# 首次写入所有BAR数据
for i in range(len(r) - 1):
bar = r[i]
# 逐根写入,需要判断当前数据库表中是否已经有该条数据,基于时间戳检测,如果有该条数据,则跳过,没有则写入
retQuery = ex_DB_Records.find({"Time": bar["Time"]})
if retQuery.count() > 0:
continue
# 写入bar到数据库表
ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})
index += 1
preBarTime = r[-1]["Time"]
elif preBarTime != r[-1]["Time"]:
bar = r[-2]
# 写入数据前检测,数据是否已经存在,基于时间戳检测
retQuery = ex_DB_Records.find({"Time": bar["Time"]})
if retQuery.count() > 0:
continue
ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})
index += 1
preBarTime = r[-1]["Time"]
LogStatus(_D(), "preBarTime:", preBarTime, "_D(preBarTime):", _D(preBarTime/1000), "index:", index)
# 增加画图展示
ext.PlotRecords(r, "%s_%d" % ("records", period))
Sleep(10000)
Pertama, kita jalankan robot pengumpul pasar, tambahkan bursa ke robot, lalu biarkan robot berjalan.
Konfigurasi parameter:


Kemudian kami membuat strategi pengujian:
function main() {
Log(exchange.GetRecords())
Log(exchange.GetRecords())
Log(exchange.GetRecords())
}
Strateginya sangat sederhana, hanya memperoleh dan mencetak tiga data K-line.
Pada halaman pengujian ulang, tetapkan sumber data sistem pengujian ulang ke sumber data khusus, dan isi alamat server tempat robot pengumpul pasar berjalan. Karena data pada berkas CSV kami adalah K-line 1 menit. Oleh karena itu, saat melakukan pengujian ulang, kami menetapkan periode K-line menjadi 1 menit.

Klik Mulai Pengujian Ulang, dan robot pengumpul pasar menerima permintaan data:

Setelah sistem pengujian ulang menyelesaikan eksekusi strategi, sistem akan menghasilkan grafik garis K berdasarkan data garis K dalam sumber data.

Bandingkan data dalam berkas:


Ini hanya titik awal, silakan meninggalkan pesan.