Tăng cấp bộ thu hành - hỗ trợ nhập file định dạng CSV để cung cấp nguồn dữ liệu tùy chỉnh

Tác giả:Giấc mơ nhỏ, Tạo: 2020-05-23 15:44:47, Cập nhật: 2023-10-08 19:48:40

img

Lưu tập dữ liệu được nâng cấp để hỗ trợ nhập file định dạng CSV và cung cấp nguồn dữ liệu tùy chỉnh

Người dùng mới nhất cần có file CSV của mình làm nguồn dữ liệu, cho phép nhà phát minh sử dụng hệ thống kiểm tra lại của nền tảng giao dịch định lượng. Các hệ thống kiểm tra lại của nền tảng giao dịch định lượng của nhà phát minh có nhiều tính năng, sử dụng đơn giản và hiệu quả, do đó, miễn là họ có dữ liệu, họ có thể kiểm tra lại, không còn giới hạn trong các sàn giao dịch, giống hỗ trợ trung tâm dữ liệu nền tảng.

Ý tưởng thiết kế

Ý tưởng thiết kế thực sự rất đơn giản, chúng ta chỉ cần thay đổi một chút trên cơ sở của bộ thu thập thị trường trước đây, chúng ta thêm một tham số vào bộ thu thập thị trường.isOnlySupportCSVĐể kiểm soát liệu chỉ sử dụng tệp CSV làm nguồn dữ liệu được cung cấp cho hệ thống tìm kiếm, thêm một tham sốfilePathForCSV, được sử dụng để thiết lập đường dẫn để đặt các tập tin dữ liệu CSV trên máy chủ được chạy bởi robot thu thập dữ liệu.isOnlySupportCSVNếu các tham số được đặt làTrueĐể quyết định sử dụng nguồn dữ liệu đó (dữ liệu được thu thập bởi chính mình, dữ liệu trong tệp CSV), thay đổi này chủ yếu là để xác định nguồn dữ liệu mà bạn muốn sử dụng.ProviderCác loạido_GETTrong hàm.

CSV là gì?

Các giá trị tách dấu ngoặc (CSV, đôi khi còn được gọi là giá trị tách dấu ngoặc, vì các ký tự tách cũng có thể không phải là dấu ngoặc) là các tập tin lưu trữ dữ liệu bảng dưới dạng văn bản thuần túy (số và văn bản); văn bản thuần túy có nghĩa là tập tin là một chuỗi ký tự, không chứa dữ liệu phải được giải thích như các số nhị phân. Các tập tin CSV được tạo thành từ các bản ghi có mục đích bất kỳ số lượng nào, được tách bằng một số dấu ngoặc; mỗi bản ghi được tạo thành từ các trường, dấu ngoặc của đoạn là các ký tự hoặc chuỗi khác, phổ biến nhất là ký tự dấu ngoặc hoặc ký tự bảng. Thông thường, tất cả các bản ghi đều có chuỗi đoạn hoàn toàn giống nhau.

Không có tiêu chuẩn chung cho định dạng tệp CSV, nhưng có một quy tắc, thường là một dòng ghi lại, đầu tiên là hành động; dữ liệu trong mỗi dòng được đánh dấu bằng dấu chấm.

Ví dụ, một tập tin CSV chúng tôi dùng để kiểm tra mở bằng sổ tay như thế này:img

Nhìn vào, dòng đầu tiên của một tập tin CSV là đầu bảng.

,open,high,low,close,vol

Chúng ta chỉ cần phân tích dữ liệu như vậy và xây dựng định dạng yêu cầu nguồn dữ liệu tùy chỉnh của hệ thống truy vấn, điều này đã được xử lý trong mã trong bài viết trước đó của chúng tôi, chỉ cần sửa đổi một chút.

Mã đã được sửa đổi

import _thread
import pymongo
import json
import math
import csv
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse

def url2Dict(url):
    query = urlparse(url).query  
    params = parse_qs(query)  
    result = {key: params[key][0] for key in params}  
    return result

class Provider(BaseHTTPRequestHandler):
    def do_GET(self):
        global isOnlySupportCSV, filePathForCSV
        try:
            self.send_response(200)
            self.send_header("Content-type", "application/json")
            self.end_headers()

            dictParam = url2Dict(self.path)
            Log("自定义数据源服务接收到请求,self.path:", self.path, "query 参数:", dictParam)
            
            # 目前回测系统只能从列表中选择交易所名称,在添加自定义数据源时,设置为币安,即:Binance
            exName = exchange.GetName()                                     
            # 注意,period为底层K线周期
            tabName = "%s_%s" % ("records", int(int(dictParam["period"]) / 1000))  
            priceRatio = math.pow(10, int(dictParam["round"]))
            amountRatio = math.pow(10, int(dictParam["vround"]))
            fromTS = int(dictParam["from"]) * int(1000)
            toTS = int(dictParam["to"]) * int(1000)

            # 要求应答的数据
            data = {
                "schema" : ["time", "open", "high", "low", "close", "vol"],
                "data" : []
            }
            
            if isOnlySupportCSV:
                # 处理CSV读取,filePathForCSV路径
                listDataSequence = []
                with open(filePathForCSV, "r") as f:
                    reader = csv.reader(f)
                    # 获取表头
                    header = next(reader)
                    headerIsNoneCount = 0
                    if len(header) != len(data["schema"]):
                        Log("CSV文件格式有误,列数不同,请检查!", "#FF0000")
                        return 
                    for ele in header:
                        for i in range(len(data["schema"])):
                            if data["schema"][i] == ele or ele == "":
                                if ele == "":
                                    headerIsNoneCount += 1
                                if headerIsNoneCount > 1:
                                    Log("CSV文件格式有误,请检查!", "#FF0000")
                                    return 
                                listDataSequence.append(i)
                                break
                    
                    # 读取内容
                    while True:
                        record = next(reader, -1)
                        if record == -1:
                            break
                        index = 0
                        arr = [0, 0, 0, 0, 0, 0]
                        for ele in record:
                            arr[listDataSequence[index]] = int(ele) if listDataSequence[index] == 0 else (int(float(ele) * amountRatio) if listDataSequence[index] == 5 else int(float(ele) * priceRatio))
                            index += 1
                        data["data"].append(arr)
                
                Log("数据:", data, "响应回测系统请求。")
                self.wfile.write(json.dumps(data).encode())
                return 
            
            # 连接数据库
            Log("连接数据库服务,获取数据,数据库:", exName, "表:", tabName)
            myDBClient = pymongo.MongoClient("mongodb://localhost:27017")
            ex_DB = myDBClient[exName]
            exRecords = ex_DB[tabName]
            
            # 构造查询条件:大于某个值{'age': {'$gt': 20}} 小于某个值{'age': {'$lt': 20}}
            dbQuery = {"$and":[{'Time': {'$gt': fromTS}}, {'Time': {'$lt': toTS}}]}
            Log("查询条件:", dbQuery, "查询条数:", exRecords.find(dbQuery).count(), "数据库总条数:", exRecords.find().count())
            
            for x in exRecords.find(dbQuery).sort("Time"):
                # 需要根据请求参数round和vround,处理数据精度
                bar = [x["Time"], int(x["Open"] * priceRatio), int(x["High"] * priceRatio), int(x["Low"] * priceRatio), int(x["Close"] * priceRatio), int(x["Volume"] * amountRatio)]
                data["data"].append(bar)
            
            Log("数据:", data, "响应回测系统请求。")
            # 写入数据应答
            self.wfile.write(json.dumps(data).encode())
        except BaseException as e:
            Log("Provider do_GET error, e:", e)


def createServer(host):
    try:
        server = HTTPServer(host, Provider)
        Log("Starting server, listen at: %s:%s" % host)
        server.serve_forever()
    except BaseException as e:
        Log("createServer error, e:", e)
        raise Exception("stop")

def main():
    LogReset(1)
    if (isOnlySupportCSV):
        try:
        # _thread.start_new_thread(createServer, (("localhost", 9090), ))         # 本机测试
            _thread.start_new_thread(createServer, (("0.0.0.0", 9090), ))         # VPS服务器上测试
            Log("开启自定义数据源服务线程,数据由CSV文件提供。", "#FF0000")
        except BaseException as e:
            Log("启动自定义数据源服务失败!")
            Log("错误信息:", e)
            raise Exception("stop")
        while True:
            LogStatus(_D(), "只启动自定义数据源服务,不收集数据!")
            Sleep(2000)
    
    exName = exchange.GetName()
    period = exchange.GetPeriod()
    Log("收集", exName, "交易所的K线数据,", "K线周期:", period, "秒")
    
    # 连接数据库服务,服务地址 mongodb://127.0.0.1:27017 具体看服务器上安装的mongodb设置
    Log("连接托管者所在设备mongodb服务,mongodb://localhost:27017")
    myDBClient = pymongo.MongoClient("mongodb://localhost:27017")   
    # 创建数据库
    ex_DB = myDBClient[exName]
    
    # 打印目前数据库表
    collist = ex_DB.list_collection_names()
    Log("mongodb ", exName, " collist:", collist)
    
    # 检测是否删除表
    arrDropNames = json.loads(dropNames)
    if isinstance(arrDropNames, list):
        for i in range(len(arrDropNames)):
            dropName = arrDropNames[i]
            if isinstance(dropName, str):
                if not dropName in collist:
                    continue
                tab = ex_DB[dropName]
                Log("dropName:", dropName, "删除:", dropName)
                ret = tab.drop()
                collist = ex_DB.list_collection_names()
                if dropName in collist:
                    Log(dropName, "删除失败")
                else :
                    Log(dropName, "删除成功")
    
    # 开启一个线程,提供自定义数据源服务
    try:
        # _thread.start_new_thread(createServer, (("localhost", 9090), ))     # 本机测试
        _thread.start_new_thread(createServer, (("0.0.0.0", 9090), ))         # VPS服务器上测试
        Log("开启自定义数据源服务线程", "#FF0000")
    except BaseException as e:
        Log("启动自定义数据源服务失败!")
        Log("错误信息:", e)
        raise Exception("stop")
    
    # 创建records表
    ex_DB_Records = ex_DB["%s_%d" % ("records", period)]
    Log("开始收集", exName, "K线数据", "周期:", period, "打开(创建)数据库表:", "%s_%d" % ("records", period), "#FF0000")
    preBarTime = 0
    index = 1
    while True:
        r = _C(exchange.GetRecords)
        if len(r) < 2:
            Sleep(1000)
            continue
        if preBarTime == 0:
            # 首次写入所有BAR数据
            for i in range(len(r) - 1):
                bar = r[i]
                # 逐根写入,需要判断当前数据库表中是否已经有该条数据,基于时间戳检测,如果有该条数据,则跳过,没有则写入
                retQuery = ex_DB_Records.find({"Time": bar["Time"]})
                if retQuery.count() > 0:
                    continue
                
                # 写入bar到数据库表
                ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})                
                index += 1
            preBarTime = r[-1]["Time"]
        elif preBarTime != r[-1]["Time"]:
            bar = r[-2]
            # 写入数据前检测,数据是否已经存在,基于时间戳检测
            retQuery = ex_DB_Records.find({"Time": bar["Time"]})
            if retQuery.count() > 0:
                continue
            
            ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})
            index += 1
            preBarTime = r[-1]["Time"]
        LogStatus(_D(), "preBarTime:", preBarTime, "_D(preBarTime):", _D(preBarTime/1000), "index:", index)
        # 增加画图展示
        ext.PlotRecords(r, "%s_%d" % ("records", period))
        Sleep(10000)
        

Thử nghiệm chạy

Trước tiên, chúng tôi khởi động robot thu thập thị trường, chúng tôi thêm một sàn giao dịch vào robot để robot hoạt động. Cài đặt tham số:img

img

Sau đó, chúng tôi tạo ra một chiến lược thử nghiệm:

function main() {
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
}

Phương pháp rất đơn giản, chỉ cần lấy và in ba lần dữ liệu K-line.

Trang truy cập, nguồn dữ liệu của hệ thống truy cập được thiết lập là nguồn dữ liệu tùy chỉnh, và địa chỉ chứa địa chỉ máy chủ được chạy bởi robot thu thập thị trường. Vì dữ liệu trong tệp CSV của chúng tôi là 1 phút K-line. Vì vậy, khi truy cập, chúng tôi đặt chu kỳ K-line là 1 phút.

img

Khi nhấp chuột bắt đầu kiểm tra lại, robot thu thập dữ liệu nhận được yêu cầu:img

Sau khi hoàn thành các chính sách thực hiện hệ thống kiểm tra, tạo biểu đồ K-line dựa trên dữ liệu đường K trong nguồn dữ liệu.img

Các tài liệu so sánh:img

img

RecordsCollector (cải tiến để cung cấp tính năng nguồn dữ liệu tùy chỉnh, hỗ trợ nguồn dữ liệu cho tệp dữ liệu CSV)

Bạn có thể tham khảo bài viết này tại đây:


Có liên quan

Thêm nữa

Anh trai tôiCó cần phải cài đặt python trên máy chủ quản trị không?

Sparta chơi theo định lượngDreamcatcher, bây giờ nguồn dữ liệu tùy chỉnh này được kiểm tra lại trên trình duyệt, dữ liệu có vấn đề về độ chính xác, bạn hãy thử xem.

AiKPM-/upload/asset/19cfcf5244f5e2cd73173.png /upload/asset/19c100ceb1eb25a38a970.png Tôi đã gắn máy tính, tôi không thể điền vào địa chỉ, tôi đã điền vào địa chỉ máy chủ, cổng 9090 và máy thu thập không phản hồi.

WeixxXin hỏi tại sao tôi đã thiết lập nguồn dữ liệu CSV tùy chỉnh trên máy chủ lưu trữ, trả về dữ liệu với yêu cầu trang, và sau đó không trả về dữ liệu trong truy cập lại, khi chỉ đặt dữ liệu trực tiếp thành hai dữ liệu, máy chủ HTTP có thể nhận được yêu cầu trong /upload/asset/1691b2d9549fcec81c12a.png /upload/asset/168f8050b3db4a84d7e2f.png /upload/asset/16a67eaa598e95b11edb9.png /upload/asset/169c6c4c3d286587b3e.ng /upload/asset/169e8dcdbf9c0c544pbac8.png

WeixxXin hỏi tại sao tôi đã thiết lập nguồn dữ liệu CSV tùy chỉnh trên máy chủ lưu trữ, trả về dữ liệu với yêu cầu trang, sau đó không trả về dữ liệu trong lần kiểm tra lại và không yêu cầu /upload/asset/1691b2d9549fcec81c12a.png /upload/asset/168f8050b3db4a84d7e2f.png /upload/asset/16a67eaa598e95b11edb9.png /upload/asset/169c6c4c3d28d658795b3e.png /upload/asset/169e8dcdbf9c0c544png

qq89520Làm thế nào để cài đặt các tham số?

bài giảngMột số người cho rằng, nếu họ có thể đánh giá bất kỳ đồng nào, có lẽ cả cổ phiếu cũng có thể.

dsaidasi 666

Giấc mơ nhỏBạn cần phải có Python.

Sparta chơi theo định lượngLà một lỗi hệ thống kiểm tra lại, đã được khắc phục.

Giấc mơ nhỏCác thông tin về độ chính xác trong tài liệu API có thể được xem dưới đây.

Giấc mơ nhỏBạn cần phải hiểu bài viết, mã. Đây là về việc sử dụng tệp CSV làm nguồn dữ liệu để cung cấp dữ liệu cho hệ thống kiểm tra lại.

Giấc mơ nhỏXem mô tả trong tài liệu API.

WeixxDữ liệu tùy chỉnh sử dụng phương thức exchange.GetData (()) để kiểm tra lại có thể biến K-string thành dữ liệu tùy chỉnh không?

Giấc mơ nhỏDịch vụ cung cấp nguồn dữ liệu tùy chỉnh phải được đặt trên máy chủ, phải là IP mạng công cộng.

WeixxXin hỏi làm thế nào để xây dựng dữ liệu truy cập địa phương trên máy chủ http, liệu truy cập địa phương không hỗ trợ truy cập nguồn dữ liệu tùy chỉnh? Tôi đã thêm exchanges vào truy cập địa phương: [{ "eid":"Huobi","currency":"ETH_USDT","feeder":"http://127.0.0.1:9090"}] tham số này, và thay đổi IP của robot cũng không yêu cầu máy chủ

Giấc mơ nhỏDữ liệu quá lớn. Trang web không thể tải được, ngoài ra DEMO bạn đã nghiên cứu, nên không có vấn đề, tôi đoán bạn đã đặt sai.

WeixxTôi là dữ liệu csv một phút K dòng là dữ liệu của các loại tiền tệ khác, sau đó vì giao dịch không thể chọn tùy tiện khi quay lại, robot và sàn giao dịch chọn quay lại được thiết lập là huobi, giao dịch là BTC-USDT, dữ liệu yêu cầu này tôi đôi khi có thể nhận được yêu cầu từ phía robot, nhưng quay lại không thể lấy dữ liệu, và tôi đã thay đổi khung thời gian của csv từ giây thành millisecond cũng không thể lấy dữ liệu.

Giấc mơ nhỏCó yêu cầu nào về dữ liệu định nghĩa này không? Ví dụ như một phần thời gian và một phần giây có thể được xem không?

Giấc mơ nhỏCó thể sử dụng dữ liệu lớn, và tôi đã thử nghiệm điều này.

WeixxMột số lượng nhỏ dữ liệu có thể được lấy, nhưng khi tôi chỉ định một tập tin CSV với dữ liệu hơn một phút trong một năm, nó không thể được lấy, có quá nhiều dữ liệu có ảnh hưởng không?

WeixxTôi đang cấu hình trên robot của mình là một sàn giao dịch HUOBI, sau đó cặp giao dịch cũng được đặt là BTC-USDT, và cũng được cấu hình như vậy khi kiểm tra lại, sau đó mã được kiểm tra lại là sử dụng một chức năng exchange.GetRecords ((), liệu dữ liệu được định nghĩa này có yêu cầu không? ví dụ như thời gian phần millisecond và giây có thể được xem?

Giấc mơ nhỏBạn có thể ở cuối trình duyệt vì bạn đã chỉ định các tham số truy vấn, hệ thống trả lời không thể kích hoạt bot trả lời, cho biết robot không chấp nhận yêu cầu, giải thích rằng nơi đó đã được cấu hình sai khi trả lời, kiểm tra, debugging sẽ tìm thấy vấn đề.

Giấc mơ nhỏNếu bạn muốn đọc file CSV của riêng bạn, bạn có thể đặt đường dẫn cho file này.