
Bài viết trướcDạy bạn từng bước để thực hiện một nhà sưu tập thị trườngChúng tôi đã cùng nhau triển khai một chương trình robot để thu thập dữ liệu thị trường. Chúng tôi sử dụng dữ liệu thị trường đã thu thập như thế nào? Tất nhiên, nó được sử dụng cho hệ thống kiểm tra ngược. Ở đây, dựa vào chức năng nguồn dữ liệu tùy chỉnh của hệ thống kiểm tra ngược của nền tảng giao dịch định lượng của nhà phát minh, chúng ta có thể trực tiếp sử dụng dữ liệu đã thu thập được làm nguồn dữ liệu của hệ thống kiểm tra ngược, để chúng ta có thể tạo hệ thống kiểm tra ngược Hệ thống kiểm tra có thể được áp dụng cho bất kỳ thị trường nào mà chúng ta muốn kiểm tra ngược dữ liệu lịch sử.
Vì vậy, chúng ta có thể nâng cấp “Market Collector”! Công cụ thu thập thị trường cũng có thể được sử dụng như một nguồn dữ liệu tùy chỉnh để cung cấp dữ liệu cho hệ thống kiểm tra ngược.
Nếu bạn có nhu cầu, hãy hành động!
Các bước chuẩn bị khác với bài viết trước. Lần trước, tôi chạy chương trình host trên máy tính MAC cục bộ của mình, cài đặt cơ sở dữ liệu mongodb và khởi động dịch vụ cơ sở dữ liệu. Lần này chúng tôi thay đổi môi trường hoạt động thành VPS và sử dụng máy chủ Alibaba Cloud Linux để chạy chương trình của mình.
Như trong bài viết trước, bạn cần cài đặt cơ sở dữ liệu MongoDB trên thiết bị nơi chương trình thu thập thị trường chạy và khởi động dịch vụ. Về cơ bản thì nó giống như cài đặt mongodb trên máy tính MAC. Có rất nhiều hướng dẫn trên Internet. Bạn có thể tìm kiếm chúng. Rất đơn giản.
Cài đặt Python 3 Chương trình sử dụng Python 3. Lưu ý rằng một số thư viện được sử dụng và cần phải cài đặt nếu không có sẵn.
Chủ nhà Chỉ cần quản lý nền tảng giao dịch định lượng Inventor.
Người thu gom thị trườngRecordsCollecter (Hướng dẫn)Chiến lược này. Chúng ta hãy thực hiện một số thay đổi sau: Trước khi chương trình vào vòng lặp while để thu thập dữ liệu, một thư viện đa luồng được sử dụng để đồng thời khởi động một dịch vụ nhằm lắng nghe các yêu cầu dữ liệu từ hệ thống kiểm tra ngược Inventor’s Quantitative Trading Platform. (Một số chi tiết khác có thể bỏ qua)
RecordsCollecter (được nâng cấp để cung cấp chức năng nguồn dữ liệu tùy chỉnh)
import _thread
import pymongo
import json
import math
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse
def url2Dict(url):
query = urlparse(url).query
params = parse_qs(query)
result = {key: params[key][0] for key in params}
return result
class Provider(BaseHTTPRequestHandler):
def do_GET(self):
try:
self.send_response(200)
self.send_header("Content-type", "application/json")
self.end_headers()
dictParam = url2Dict(self.path)
Log("自定义数据源服务接收到请求,self.path:", self.path, "query 参数:", dictParam)
# 目前回测系统只能从列表中选择交易所名称,在添加自定义数据源时,设置为币安,即:Binance
exName = exchange.GetName()
# 注意,period为底层K线周期
tabName = "%s_%s" % ("records", int(int(dictParam["period"]) / 1000))
priceRatio = math.pow(10, int(dictParam["round"]))
amountRatio = math.pow(10, int(dictParam["vround"]))
fromTS = int(dictParam["from"]) * int(1000)
toTS = int(dictParam["to"]) * int(1000)
# 连接数据库
Log("连接数据库服务,获取数据,数据库:", exName, "表:", tabName)
myDBClient = pymongo.MongoClient("mongodb://localhost:27017")
ex_DB = myDBClient[exName]
exRecords = ex_DB[tabName]
# 要求应答的数据
data = {
"schema" : ["time", "open", "high", "low", "close", "vol"],
"data" : []
}
# 构造查询条件:大于某个值{'age': {'$gt': 20}} 小于某个值{'age': {'$lt': 20}}
dbQuery = {"$and":[{'Time': {'$gt': fromTS}}, {'Time': {'$lt': toTS}}]}
Log("查询条件:", dbQuery, "查询条数:", exRecords.find(dbQuery).count(), "数据库总条数:", exRecords.find().count())
for x in exRecords.find(dbQuery).sort("Time"):
# 需要根据请求参数round和vround,处理数据精度
bar = [x["Time"], int(x["Open"] * priceRatio), int(x["High"] * priceRatio), int(x["Low"] * priceRatio), int(x["Close"] * priceRatio), int(x["Volume"] * amountRatio)]
data["data"].append(bar)
Log("数据:", data, "响应回测系统请求。")
# 写入数据应答
self.wfile.write(json.dumps(data).encode())
except BaseException as e:
Log("Provider do_GET error, e:", e)
def createServer(host):
try:
server = HTTPServer(host, Provider)
Log("Starting server, listen at: %s:%s" % host)
server.serve_forever()
except BaseException as e:
Log("createServer error, e:", e)
raise Exception("stop")
def main():
LogReset(1)
exName = exchange.GetName()
period = exchange.GetPeriod()
Log("收集", exName, "交易所的K线数据,", "K线周期:", period, "秒")
# 连接数据库服务,服务地址 mongodb://127.0.0.1:27017 具体看服务器上安装的mongodb设置
Log("连接托管者所在设备mongodb服务,mongodb://localhost:27017")
myDBClient = pymongo.MongoClient("mongodb://localhost:27017")
# 创建数据库
ex_DB = myDBClient[exName]
# 打印目前数据库表
collist = ex_DB.list_collection_names()
Log("mongodb ", exName, " collist:", collist)
# 检测是否删除表
arrDropNames = json.loads(dropNames)
if isinstance(arrDropNames, list):
for i in range(len(arrDropNames)):
dropName = arrDropNames[i]
if isinstance(dropName, str):
if not dropName in collist:
continue
tab = ex_DB[dropName]
Log("dropName:", dropName, "删除:", dropName)
ret = tab.drop()
collist = ex_DB.list_collection_names()
if dropName in collist:
Log(dropName, "删除失败")
else :
Log(dropName, "删除成功")
# 开启一个线程,提供自定义数据源服务
try:
# _thread.start_new_thread(createServer, (("localhost", 9090), )) # 本机测试
_thread.start_new_thread(createServer, (("0.0.0.0", 9090), )) # VPS服务器上测试
Log("开启自定义数据源服务线程", "#FF0000")
except BaseException as e:
Log("启动自定义数据源服务失败!")
Log("错误信息:", e)
raise Exception("stop")
# 创建records表
ex_DB_Records = ex_DB["%s_%d" % ("records", period)]
Log("开始收集", exName, "K线数据", "周期:", period, "打开(创建)数据库表:", "%s_%d" % ("records", period), "#FF0000")
preBarTime = 0
index = 1
while True:
r = _C(exchange.GetRecords)
if len(r) < 2:
Sleep(1000)
continue
if preBarTime == 0:
# 首次写入所有BAR数据
for i in range(len(r) - 1):
bar = r[i]
# 逐根写入,需要判断当前数据库表中是否已经有该条数据,基于时间戳检测,如果有该条数据,则跳过,没有则写入
retQuery = ex_DB_Records.find({"Time": bar["Time"]})
if retQuery.count() > 0:
continue
# 写入bar到数据库表
ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})
index += 1
preBarTime = r[-1]["Time"]
elif preBarTime != r[-1]["Time"]:
bar = r[-2]
# 写入数据前检测,数据是否已经存在,基于时间戳检测
retQuery = ex_DB_Records.find({"Time": bar["Time"]})
if retQuery.count() > 0:
continue
ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})
index += 1
preBarTime = r[-1]["Time"]
LogStatus(_D(), "preBarTime:", preBarTime, "_D(preBarTime):", _D(preBarTime/1000), "index:", index)
# 增加画图展示
ext.PlotRecords(r, "%s_%d" % ("records", period))
Sleep(10000)
Cấu hình robot

Chạy robot và chạy máy thu tiền trên thị trường.

Mở một chiến lược kiểm tra và thực hiện kiểm tra ngược, chẳng hạn như chiến lược kiểm tra ngược này và kiểm tra nó.
function main() {
Log(exchange.GetRecords())
Log(exchange.GetRecords())
Log(exchange.GetRecords())
Log(exchange.GetRecords())
Log(exchange.GetRecords())
Log(exchange.GetRecords())
Log(exchange.GetRecords().length)
}
Cấu hình các tùy chọn kiểm tra ngược và đặt sàn giao dịch thành Binance vì nguồn dữ liệu tùy chỉnh không thể tự đặt tên sàn giao dịch trong thời điểm hiện tại. Bạn chỉ có thể sử dụng một sàn giao dịch trong danh sách để cấu hình. Khi kiểm tra ngược, Binance sẽ được hiển thị. Đó là dữ liệu của đĩa mô phỏng wexApp.

So sánh biểu đồ được tạo bởi hệ thống kiểm tra ngược dựa trên bộ thu thập thị trường dưới dạng nguồn dữ liệu tùy chỉnh và biểu đồ nến 1 giờ trên trang trao đổi wexApp để xem chúng có giống nhau không.


Theo cách này, robot trên VPS có thể tự thu thập dữ liệu K-line và chúng ta có thể lấy dữ liệu đã thu thập bất kỳ lúc nào và kiểm tra ngược trực tiếp trong hệ thống kiểm tra ngược. Đây chỉ là một giọt nước trong đại dương. Các chuyên gia có thể tiếp tục mở rộng điều này, ví dụ, hỗ trợ các nguồn dữ liệu tùy chỉnh để kiểm tra ngược thời gian thực, hỗ trợ thu thập dữ liệu đa dạng và đa thị trường và các chức năng khác.
Rất vui lòng để lại tin nhắn.