Hướng dẫn cho bạn nâng cấp tính năng truy xuất dữ liệu tùy chỉnh cho bộ thu thập dữ liệu

Tác giả:Giấc mơ nhỏ, Tạo: 2020-05-07 17:43:54, Cập nhật: 2023-10-09 22:47:43

img

Hướng dẫn cho bạn nâng cấp tính năng truy xuất dữ liệu tùy chỉnh cho bộ thu thập dữ liệu

Bài trướcBàn tay hướng dẫn bạn thực hiện một bộ sưu tập thị trườngChúng tôi cùng nhau thực hiện một chương trình robot thu thập thị trường, thu thập dữ liệu thị trường và sử dụng nó như thế nào tiếp theo? Tất nhiên, cho hệ thống truy cập lại, nơi dựa trên tính năng nguồn dữ liệu tùy chỉnh của hệ thống truy cập lại của nhà phát minh, chúng tôi có thể trực tiếp đưa dữ liệu thu thập được thành nguồn dữ liệu của hệ thống truy cập lại, để chúng tôi có thể áp dụng hệ thống truy cập lại cho bất kỳ thị trường nào mà chúng tôi muốn truy cập lại dữ liệu lịch sử.

Do đó, chúng ta có thể nâng cấp cho "Công cụ thu thập dữ liệu thị trường" để các công cụ thu thập dữ liệu thị trường cũng có thể cung cấp dữ liệu cho hệ thống truy cập dữ liệu tùy chỉnh.

Nếu có nhu cầu, hãy bắt tay!

Sẵn sàng

Khác với bài viết trước, lần trước tôi đã cài đặt một chương trình quản trị chạy trên máy Mac của tôi và khởi động dịch vụ cơ sở dữ liệu mongodb.

  • Cơ sở dữ liệu mongodb

    Như bài viết trước, bạn cần cài đặt cơ sở dữ liệu mongodb trên thiết bị chạy chương trình thu thập dữ liệu và mở dịch vụ. Về cơ bản, cũng giống như cài đặt mongodb trên máy tính MAC, có rất nhiều hướng dẫn trực tuyến có thể tìm kiếm và xem, rất đơn giản.

  • Cài đặt Python 3 Các chương trình sử dụng ngôn ngữ python3, lưu ý rằng một số thư viện được sử dụng, nếu không cần cài đặt.

    • bạch tuộc
    • http
    • urllib
  • Người quản lý Một nhà phát minh có thể là người quản lý một nền tảng giao dịch định lượng.

Tái thiết kế bộ sưu tập thông tin

Người thu thập thông tinRecordsCollecter (giảng dạy)Chiến lược này. Chúng ta hãy thay đổi nó một chút: Trước khi chương trình đi vào vòng lặp trong khi thu thập dữ liệu, sử dụng thư viện nhiều luồng, đồng thời thực hiện khởi động một dịch vụ để nghe lén các yêu cầu dữ liệu của hệ thống truy vấn của nhà phát minh. (Một số thay đổi chi tiết khác có thể bị bỏ qua)

RecordsCollector (cải tiến để cung cấp tính năng nguồn dữ liệu tùy chỉnh)

import _thread
import pymongo
import json
import math
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse

def url2Dict(url):
    query = urlparse(url).query  
    params = parse_qs(query)  
    result = {key: params[key][0] for key in params}  
    return result

class Provider(BaseHTTPRequestHandler):
    def do_GET(self):
        try:
            self.send_response(200)
            self.send_header("Content-type", "application/json")
            self.end_headers()

            dictParam = url2Dict(self.path)
            Log("自定义数据源服务接收到请求,self.path:", self.path, "query 参数:", dictParam)
            
            # 目前回测系统只能从列表中选择交易所名称,在添加自定义数据源时,设置为币安,即:Binance
            exName = exchange.GetName()                                     
            # 注意,period为底层K线周期
            tabName = "%s_%s" % ("records", int(int(dictParam["period"]) / 1000))  
            priceRatio = math.pow(10, int(dictParam["round"]))
            amountRatio = math.pow(10, int(dictParam["vround"]))
            fromTS = int(dictParam["from"]) * int(1000)
            toTS = int(dictParam["to"]) * int(1000)
            
            
            # 连接数据库
            Log("连接数据库服务,获取数据,数据库:", exName, "表:", tabName)
            myDBClient = pymongo.MongoClient("mongodb://localhost:27017")
            ex_DB = myDBClient[exName]
            exRecords = ex_DB[tabName]
            
            
            # 要求应答的数据
            data = {
                "schema" : ["time", "open", "high", "low", "close", "vol"],
                "data" : []
            }
            
            # 构造查询条件:大于某个值{'age': {'$gt': 20}} 小于某个值{'age': {'$lt': 20}}
            dbQuery = {"$and":[{'Time': {'$gt': fromTS}}, {'Time': {'$lt': toTS}}]}
            Log("查询条件:", dbQuery, "查询条数:", exRecords.find(dbQuery).count(), "数据库总条数:", exRecords.find().count())
            
            for x in exRecords.find(dbQuery).sort("Time"):
                # 需要根据请求参数round和vround,处理数据精度
                bar = [x["Time"], int(x["Open"] * priceRatio), int(x["High"] * priceRatio), int(x["Low"] * priceRatio), int(x["Close"] * priceRatio), int(x["Volume"] * amountRatio)]
                data["data"].append(bar)
            
            Log("数据:", data, "响应回测系统请求。")
            # 写入数据应答
            self.wfile.write(json.dumps(data).encode())
        except BaseException as e:
            Log("Provider do_GET error, e:", e)


def createServer(host):
    try:
        server = HTTPServer(host, Provider)
        Log("Starting server, listen at: %s:%s" % host)
        server.serve_forever()
    except BaseException as e:
        Log("createServer error, e:", e)
        raise Exception("stop")

def main():
    LogReset(1)
    exName = exchange.GetName()
    period = exchange.GetPeriod()
    Log("收集", exName, "交易所的K线数据,", "K线周期:", period, "秒")
    
    # 连接数据库服务,服务地址 mongodb://127.0.0.1:27017 具体看服务器上安装的mongodb设置
    Log("连接托管者所在设备mongodb服务,mongodb://localhost:27017")
    myDBClient = pymongo.MongoClient("mongodb://localhost:27017")   
    # 创建数据库
    ex_DB = myDBClient[exName]
    
    # 打印目前数据库表
    collist = ex_DB.list_collection_names()
    Log("mongodb ", exName, " collist:", collist)
    
    # 检测是否删除表
    arrDropNames = json.loads(dropNames)
    if isinstance(arrDropNames, list):
        for i in range(len(arrDropNames)):
            dropName = arrDropNames[i]
            if isinstance(dropName, str):
                if not dropName in collist:
                    continue
                tab = ex_DB[dropName]
                Log("dropName:", dropName, "删除:", dropName)
                ret = tab.drop()
                collist = ex_DB.list_collection_names()
                if dropName in collist:
                    Log(dropName, "删除失败")
                else :
                    Log(dropName, "删除成功")
    
    # 开启一个线程,提供自定义数据源服务
    try:
        # _thread.start_new_thread(createServer, (("localhost", 9090), ))     # 本机测试
        _thread.start_new_thread(createServer, (("0.0.0.0", 9090), ))         # VPS服务器上测试
        Log("开启自定义数据源服务线程", "#FF0000")
    except BaseException as e:
        Log("启动自定义数据源服务失败!")
        Log("错误信息:", e)
        raise Exception("stop")
    
    # 创建records表
    ex_DB_Records = ex_DB["%s_%d" % ("records", period)]
    Log("开始收集", exName, "K线数据", "周期:", period, "打开(创建)数据库表:", "%s_%d" % ("records", period), "#FF0000")
    preBarTime = 0
    index = 1
    while True:
        r = _C(exchange.GetRecords)
        if len(r) < 2:
            Sleep(1000)
            continue
        if preBarTime == 0:
            # 首次写入所有BAR数据
            for i in range(len(r) - 1):
                bar = r[i]
                # 逐根写入,需要判断当前数据库表中是否已经有该条数据,基于时间戳检测,如果有该条数据,则跳过,没有则写入
                retQuery = ex_DB_Records.find({"Time": bar["Time"]})
                if retQuery.count() > 0:
                    continue
                
                # 写入bar到数据库表
                ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})                
                index += 1
            preBarTime = r[-1]["Time"]
        elif preBarTime != r[-1]["Time"]:
            bar = r[-2]
            # 写入数据前检测,数据是否已经存在,基于时间戳检测
            retQuery = ex_DB_Records.find({"Time": bar["Time"]})
            if retQuery.count() > 0:
                continue
            
            ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})
            index += 1
            preBarTime = r[-1]["Time"]
        LogStatus(_D(), "preBarTime:", preBarTime, "_D(preBarTime):", _D(preBarTime/1000), "index:", index)
        # 增加画图展示
        ext.PlotRecords(r, "%s_%d" % ("records", period))
        Sleep(10000)
        

Kiểm tra

Thiết lập robotimg

Những người tham gia vào cuộc họp này đã nói rằng họ sẽ không làm điều gì sai trái.img

Mở một chính sách kiểm tra, kiểm tra lại, ví dụ như chính sách kiểm tra lại như thế này, kiểm tra.

function main() {
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords().length)
}

Tùy chọn cấu hình kiểm tra lại, đặt sàn giao dịch là Binance bởi vì nguồn dữ liệu tùy chỉnh tạm thời chưa thể tự tạo ra một tên sàn giao dịch, chỉ có thể mượn cấu hình của một sàn giao dịch trong danh sách, khi kiểm tra lại, hiển thị là Binance, thực sự là dữ liệu của WexApp Analog Disc.

img

Hệ thống kiểm tra so sánh liệu biểu đồ được tạo ra bởi bộ thu thập thị trường như một nguồn dữ liệu tùy chỉnh có giống với biểu đồ đường K 1 giờ trên trang giao dịch wexApp hay không.

img

img

Điều này có thể cho phép robot trên VPS tự thu thập dữ liệu K-line, và chúng tôi có thể truy cập dữ liệu thu thập được trực tiếp trong hệ thống kiểm tra. Ngoài ra, bạn có thể tiếp tục mở rộng, ví dụ như hỗ trợ truy cập dữ liệu tùy chỉnh ở mức đĩa thực, hỗ trợ nhiều loại, thu thập dữ liệu đa thị trường, v.v.

Chào mừng bạn để ý nhé.


Có liên quan

Thêm nữa

wuzhentaoVà dịch vụ cung cấp dữ liệu tùy chỉnh không nhận được bản in yêu cầu đăng ký này.

wuzhentaoKhi kiểm tra lại, chọn dữ liệu tùy chỉnh, bắt đầu với giao dịch đối với dữ liệu chọn ở dưới đây, làm thế nào để xử lý tình huống này?

lcgs006Nếu thu thập dữ liệu về các cặp tiền không được cung cấp ở trên, làm thế nào để thực hiện tính toán lại một số cặp tiền nhỏ, chẳng hạn như DOT_USDT, và các loại tiền không thể được tùy chỉnh khi tính toán lại?

zltimđỉnh

Không bao giờNếu bạn chọn một nguồn dữ liệu tùy chỉnh, liệu bạn có hỗ trợ một cặp giao dịch?

Giấc mơ nhỏCác công cụ này được thử nghiệm để cung cấp dữ liệu cho các hệ thống FMZ.

wuzhentaoCó, bây giờ sau khi điền vào địa chỉ nguồn dữ liệu tùy chỉnh, dữ liệu truy cập sau không hiển thị dữ liệu mới.

Giấc mơ nhỏSau khi sử dụng tính năng nguồn dữ liệu tùy chỉnh, bạn cũng cần phải điền địa chỉ dịch vụ của nguồn dữ liệu tùy chỉnh ở bên phải của điều khiển.

Giấc mơ nhỏKhối này phải chạy trên một máy chủ có IP ngoại tuyến để trang hệ thống truy cập có thể truy cập.

Giấc mơ nhỏBạn chưa hiểu ý tôi, ý tôi là dữ liệu mà nguồn dữ liệu tùy chỉnh của bạn cung cấp, ví dụ như thực tế là EOS_USDT, nhưng trên FMZ chỉ có thể chọn giao dịch như BTC_USDT, bạn lấy dữ liệu thực tế của EOS_USDT như dữ liệu của BTC_USDT được cung cấp cho FMZ.

lcgs006Bạn có thể tìm hiểu thêm về cách sử dụng các phương pháp thay thế, hoặc có hướng dẫn nào cho việc này không?

Giấc mơ nhỏKhông cần giao dịch cho tên, thay vào đó bạn có thể sử dụng giá dữ liệu là dữ liệu bạn thu thập.

Giấc mơ nhỏCác ứng dụng dịch vụ này có thể viết nhiều giao dịch khác nhau cho dữ liệu được cung cấp cho nguồn dữ liệu, và hệ thống trả lời sẽ tự gọi những gì cần thiết.