
حال ہی میں، ایک صارف کو انوینٹر کوانٹیٹیو ٹریڈنگ پلیٹ فارم کے بیک ٹیسٹنگ سسٹم کے لیے ڈیٹا سورس کے طور پر اپنی CSV فارمیٹ فائل استعمال کرنے کی ضرورت تھی۔ انوینٹر کوانٹیٹیو ٹریڈنگ پلیٹ فارم کے بیک ٹیسٹنگ سسٹم میں بہت سے افعال ہیں اور جب تک آپ کے پاس ڈیٹا موجود ہے، آپ بیک ٹیسٹنگ کر سکتے ہیں، اور آپ پلیٹ فارم ڈیٹا سینٹر کے ذریعے تعاون یافتہ تبادلے اور مصنوعات تک محدود نہیں ہیں۔ .
ڈیزائن آئیڈیا دراصل بہت آسان ہے ہمیں پچھلے مارکیٹ کلیکٹر میں ایک پیرامیٹر شامل کرنے کی ضرورت ہے۔isOnlySupportCSVیہ کنٹرول کرنے کے لیے استعمال کیا جاتا ہے کہ آیا بیک ٹیسٹنگ سسٹم کے لیے صرف CSV فائلوں کو ڈیٹا کے ذرائع کے طور پر استعمال کیا جائے، اور دوسرا پیرامیٹر شامل کیا جائے۔filePathForCSV، سرور پر CSV ڈیٹا فائل کا راستہ سیٹ کرنے کے لیے استعمال کیا جاتا ہے جہاں مارکیٹ کلیکٹر روبوٹ چلتا ہے۔ آخر میں، کے مطابقisOnlySupportCSVکیا پیرامیٹر پر سیٹ ہے۔Trueیہ فیصلہ کرنے کے لیے کہ کون سا ڈیٹا سورس استعمال کرنا ہے (1. خود سے جمع کردہ، 2. CSV فائل میں ڈیٹا)، یہ تبدیلی بنیادی طور پرProviderکلاسdo_GETفنکشن
کوما سے الگ شدہ قدریں (CSV، جسے بعض اوقات کریکٹر سے الگ کردہ اقدار بھی کہا جاتا ہے کیونکہ حد بندی کرنے والے حروف کوما کے علاوہ بھی ہو سکتے ہیں) وہ فائلیں ہیں جو ٹیبلر ڈیٹا (نمبرز اور ٹیکسٹ) کو سادہ متن میں محفوظ کرتی ہیں۔ سادہ متن کا مطلب ہے کہ فائل حروف کی ایک ترتیب ہے اور اس میں کوئی ڈیٹا نہیں ہے جس کی بائنری نمبروں کی طرح تشریح کی جانی چاہیے۔ ایک CSV فائل کسی نہ کسی قسم کے لائن بریک کریکٹر سے الگ کیے گئے ریکارڈز پر مشتمل ہوتی ہے، اور ہر ریکارڈ فیلڈز پر مشتمل ہوتا ہے، اور فیلڈز کے درمیان الگ کرنے والا دوسرے حروف یا تار ہوتے ہیں، جن میں سے سب سے عام کوما یا ٹیبز ہوتے ہیں۔ عام طور پر، تمام ریکارڈز میں قطعات کی ترتیب بالکل یکساں ہوتی ہے۔ یہ عام طور پر سادہ ٹیکسٹ فائلیں ہیں۔ اسے کھولنے کے لیے WORDPAD یا Notepad استعمال کرنے کی سفارش کی جاتی ہے ایک اور طریقہ یہ ہے کہ اسے ایک نئی فائل کے طور پر محفوظ کیا جائے اور پھر اسے EXCEL کے ساتھ کھولیں۔
CSV فائل فارمیٹ کے لیے کوئی عالمگیر معیار نہیں ہے، لیکن کچھ اصول ہیں، عام طور پر ایک سطر فی ریکارڈ، پہلی سطر ہیڈر کے ساتھ۔ ہر قطار میں موجود ڈیٹا کوما سے الگ کیا جاتا ہے۔
مثال کے طور پر، ہم نے جانچ کے لیے جو CSV فائل استعمال کی ہے وہ نوٹ پیڈ کے ساتھ کھولنے پر اس طرح نظر آتی ہے:

مشاہدہ کریں کہ CSV فائل کی پہلی لائن ٹیبل ہیڈر ہے۔
,open,high,low,close,vol
ہمیں اس ڈیٹا کو پارس اور منظم کرنے کی ضرورت ہے، اور پھر اسے بیک ٹیسٹنگ سسٹم کے کسٹم ڈیٹا سورس کے لیے مطلوبہ فارمیٹ میں بنانا ہوگا، یہ ہمارے پچھلے آرٹیکل میں پہلے ہی کوڈ میں ہینڈل کیا جا چکا ہے اور اس میں صرف معمولی ترمیم کی ضرورت ہے۔
import _thread
import pymongo
import json
import math
import csv
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse
def url2Dict(url):
query = urlparse(url).query
params = parse_qs(query)
result = {key: params[key][0] for key in params}
return result
class Provider(BaseHTTPRequestHandler):
def do_GET(self):
global isOnlySupportCSV, filePathForCSV
try:
self.send_response(200)
self.send_header("Content-type", "application/json")
self.end_headers()
dictParam = url2Dict(self.path)
Log("自定义数据源服务接收到请求,self.path:", self.path, "query 参数:", dictParam)
# 目前回测系统只能从列表中选择交易所名称,在添加自定义数据源时,设置为币安,即:Binance
exName = exchange.GetName()
# 注意,period为底层K线周期
tabName = "%s_%s" % ("records", int(int(dictParam["period"]) / 1000))
priceRatio = math.pow(10, int(dictParam["round"]))
amountRatio = math.pow(10, int(dictParam["vround"]))
fromTS = int(dictParam["from"]) * int(1000)
toTS = int(dictParam["to"]) * int(1000)
# 要求应答的数据
data = {
"schema" : ["time", "open", "high", "low", "close", "vol"],
"data" : []
}
if isOnlySupportCSV:
# 处理CSV读取,filePathForCSV路径
listDataSequence = []
with open(filePathForCSV, "r") as f:
reader = csv.reader(f)
# 获取表头
header = next(reader)
headerIsNoneCount = 0
if len(header) != len(data["schema"]):
Log("CSV文件格式有误,列数不同,请检查!", "#FF0000")
return
for ele in header:
for i in range(len(data["schema"])):
if data["schema"][i] == ele or ele == "":
if ele == "":
headerIsNoneCount += 1
if headerIsNoneCount > 1:
Log("CSV文件格式有误,请检查!", "#FF0000")
return
listDataSequence.append(i)
break
# 读取内容
while True:
record = next(reader, -1)
if record == -1:
break
index = 0
arr = [0, 0, 0, 0, 0, 0]
for ele in record:
arr[listDataSequence[index]] = int(ele) if listDataSequence[index] == 0 else (int(float(ele) * amountRatio) if listDataSequence[index] == 5 else int(float(ele) * priceRatio))
index += 1
data["data"].append(arr)
Log("数据:", data, "响应回测系统请求。")
self.wfile.write(json.dumps(data).encode())
return
# 连接数据库
Log("连接数据库服务,获取数据,数据库:", exName, "表:", tabName)
myDBClient = pymongo.MongoClient("mongodb://localhost:27017")
ex_DB = myDBClient[exName]
exRecords = ex_DB[tabName]
# 构造查询条件:大于某个值{'age': {'$gt': 20}} 小于某个值{'age': {'$lt': 20}}
dbQuery = {"$and":[{'Time': {'$gt': fromTS}}, {'Time': {'$lt': toTS}}]}
Log("查询条件:", dbQuery, "查询条数:", exRecords.find(dbQuery).count(), "数据库总条数:", exRecords.find().count())
for x in exRecords.find(dbQuery).sort("Time"):
# 需要根据请求参数round和vround,处理数据精度
bar = [x["Time"], int(x["Open"] * priceRatio), int(x["High"] * priceRatio), int(x["Low"] * priceRatio), int(x["Close"] * priceRatio), int(x["Volume"] * amountRatio)]
data["data"].append(bar)
Log("数据:", data, "响应回测系统请求。")
# 写入数据应答
self.wfile.write(json.dumps(data).encode())
except BaseException as e:
Log("Provider do_GET error, e:", e)
def createServer(host):
try:
server = HTTPServer(host, Provider)
Log("Starting server, listen at: %s:%s" % host)
server.serve_forever()
except BaseException as e:
Log("createServer error, e:", e)
raise Exception("stop")
def main():
LogReset(1)
if (isOnlySupportCSV):
try:
# _thread.start_new_thread(createServer, (("localhost", 9090), )) # 本机测试
_thread.start_new_thread(createServer, (("0.0.0.0", 9090), )) # VPS服务器上测试
Log("开启自定义数据源服务线程,数据由CSV文件提供。", "#FF0000")
except BaseException as e:
Log("启动自定义数据源服务失败!")
Log("错误信息:", e)
raise Exception("stop")
while True:
LogStatus(_D(), "只启动自定义数据源服务,不收集数据!")
Sleep(2000)
exName = exchange.GetName()
period = exchange.GetPeriod()
Log("收集", exName, "交易所的K线数据,", "K线周期:", period, "秒")
# 连接数据库服务,服务地址 mongodb://127.0.0.1:27017 具体看服务器上安装的mongodb设置
Log("连接托管者所在设备mongodb服务,mongodb://localhost:27017")
myDBClient = pymongo.MongoClient("mongodb://localhost:27017")
# 创建数据库
ex_DB = myDBClient[exName]
# 打印目前数据库表
collist = ex_DB.list_collection_names()
Log("mongodb ", exName, " collist:", collist)
# 检测是否删除表
arrDropNames = json.loads(dropNames)
if isinstance(arrDropNames, list):
for i in range(len(arrDropNames)):
dropName = arrDropNames[i]
if isinstance(dropName, str):
if not dropName in collist:
continue
tab = ex_DB[dropName]
Log("dropName:", dropName, "删除:", dropName)
ret = tab.drop()
collist = ex_DB.list_collection_names()
if dropName in collist:
Log(dropName, "删除失败")
else :
Log(dropName, "删除成功")
# 开启一个线程,提供自定义数据源服务
try:
# _thread.start_new_thread(createServer, (("localhost", 9090), )) # 本机测试
_thread.start_new_thread(createServer, (("0.0.0.0", 9090), )) # VPS服务器上测试
Log("开启自定义数据源服务线程", "#FF0000")
except BaseException as e:
Log("启动自定义数据源服务失败!")
Log("错误信息:", e)
raise Exception("stop")
# 创建records表
ex_DB_Records = ex_DB["%s_%d" % ("records", period)]
Log("开始收集", exName, "K线数据", "周期:", period, "打开(创建)数据库表:", "%s_%d" % ("records", period), "#FF0000")
preBarTime = 0
index = 1
while True:
r = _C(exchange.GetRecords)
if len(r) < 2:
Sleep(1000)
continue
if preBarTime == 0:
# 首次写入所有BAR数据
for i in range(len(r) - 1):
bar = r[i]
# 逐根写入,需要判断当前数据库表中是否已经有该条数据,基于时间戳检测,如果有该条数据,则跳过,没有则写入
retQuery = ex_DB_Records.find({"Time": bar["Time"]})
if retQuery.count() > 0:
continue
# 写入bar到数据库表
ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})
index += 1
preBarTime = r[-1]["Time"]
elif preBarTime != r[-1]["Time"]:
bar = r[-2]
# 写入数据前检测,数据是否已经存在,基于时间戳检测
retQuery = ex_DB_Records.find({"Time": bar["Time"]})
if retQuery.count() > 0:
continue
ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})
index += 1
preBarTime = r[-1]["Time"]
LogStatus(_D(), "preBarTime:", preBarTime, "_D(preBarTime):", _D(preBarTime/1000), "index:", index)
# 增加画图展示
ext.PlotRecords(r, "%s_%d" % ("records", period))
Sleep(10000)
سب سے پہلے، ہم مارکیٹ کلیکٹر روبوٹ شروع کرتے ہیں، روبوٹ میں ایک تبادلہ شامل کرتے ہیں، اور روبوٹ کو چلنے دیتے ہیں۔
پیرامیٹر ترتیب:


پھر ہم ایک ٹیسٹ حکمت عملی بناتے ہیں:
function main() {
Log(exchange.GetRecords())
Log(exchange.GetRecords())
Log(exchange.GetRecords())
}
حکمت عملی بہت آسان ہے، صرف تین K-لائن ڈیٹا حاصل کرنا اور پرنٹ کرنا۔
بیک ٹیسٹنگ پیج پر، بیک ٹیسٹنگ سسٹم کے ڈیٹا سورس کو حسب ضرورت ڈیٹا سورس پر سیٹ کریں، اور سرور کا پتہ بھریں جہاں مارکیٹ کلیکٹر روبوٹ چلتا ہے۔ چونکہ ہماری CSV فائل میں ڈیٹا 1 منٹ کی K-line ہے۔ لہذا، بیک ٹیسٹنگ کرتے وقت، ہم K-line کی مدت 1 منٹ پر سیٹ کرتے ہیں۔

بیک ٹیسٹنگ شروع کریں پر کلک کریں، اور مارکیٹ کلیکٹر روبوٹ ڈیٹا کی درخواست وصول کرتا ہے:

بیک ٹیسٹنگ سسٹم حکمت عملی پر عمل درآمد مکمل کرنے کے بعد، یہ ڈیٹا سورس میں K-لائن ڈیٹا کی بنیاد پر K-line چارٹ تیار کرتا ہے۔

فائل میں ڈیٹا کا موازنہ کریں:


یہ صرف ایک نقطہ آغاز ہے، پیغام چھوڑنے میں خوش آمدید۔