Pemasangan semula pengumpul perkara - menyokong import fail dalam format CSV untuk menyediakan sumber data tersuai

Penulis:Mimpi kecil, Dicipta: 2020-05-23 15:44:47, Dikemas kini: 2023-10-08 19:48:40

img

Pengumpul perkara menaik taraf semula keran untuk menyokong import fail dalam format CSV untuk menyediakan sumber data tersuai

Pengguna terkini memerlukan fail CSV mereka sendiri sebagai sumber data, untuk membolehkan pencipta menggunakan sistem retest platform dagangan kuantitatif. Pencipta menggunakan pelbagai fungsi sistem retest platform dagangan kuantitatif, menggunakan ringkas dan cekap, sehingga mereka boleh melakukan retest selagi mereka mempunyai data, dan tidak lagi terhad kepada pertukaran yang disokong pusat data platform.

Idea Reka Bentuk

Dan idea reka bentuknya sangat mudah, hanya dengan sedikit perubahan pada pengumpul pasaran sebelum ini, kita boleh menambah parameter untuk pengumpul pasaran.isOnlySupportCSVUntuk mengawal sama ada hanya menggunakan fail CSV sebagai sumber data yang diberikan kepada sistem penjumlahan, tambah satu parameterfilePathForCSV, yang digunakan untuk menyediakan laluan untuk meletakkan fail data CSV di pelayan yang dikendalikan oleh bot pengumpul pasaran.isOnlySupportCSVAdakah parameter ditetapkan sebagaiTrueUntuk membuat keputusan mengenai sumber data yang akan digunakan (data yang dikumpulkan sendiri, data dalam fail CSV), perubahan ini adalah sebahagian besarnya disebabkan oleh faktor-faktor yang tidak diketahui.ProviderKelasdo_GETDalam fungsi tersebut.

Apakah fail CSV?

Comma-Separated Values (CSV, kadang-kadang juga dikenali sebagai nilai pemisahan karakter kerana pemisahan karakter juga boleh bukan koma) adalah fail yang menyimpan data jadual dalam bentuk teks murni (nombor dan teks). Teks tulen bermaksud fail itu adalah satu siri karakter, tidak mengandungi data yang mesti difahami seperti nombor binari. Fail CSV terdiri daripada rekod tujuan apa-apa, dengan beberapa tanda gantian yang dipisahkan; setiap rekod terdiri daripada bidang, dengan tanda pemisahan pertalian adalah watak atau rentetan lain, yang paling biasa adalah koma atau tanda pertalian.

Tidak ada standard umum untuk format fail CSV, tetapi terdapat satu peraturan, biasanya untuk satu baris rekod, tajuk tindakan pertama; data dalam setiap baris menggunakan jarak koma.

Sebagai contoh, fail CSV yang kami gunakan untuk ujian dibuka dengan buku catatan seperti ini:img

Perhatikan bahawa baris pertama fail CSV adalah tajuk borang.

,open,high,low,close,vol

Kami akan mengurutkan data ini dan membina format untuk membuat permintaan sumber data yang disesuaikan dengan sistem penyesuaian, yang telah diuruskan dalam kod kami dalam artikel sebelumnya, hanya dengan sedikit perubahan.

Kod yang diubahsuai

import _thread
import pymongo
import json
import math
import csv
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse

def url2Dict(url):
    query = urlparse(url).query  
    params = parse_qs(query)  
    result = {key: params[key][0] for key in params}  
    return result

class Provider(BaseHTTPRequestHandler):
    def do_GET(self):
        global isOnlySupportCSV, filePathForCSV
        try:
            self.send_response(200)
            self.send_header("Content-type", "application/json")
            self.end_headers()

            dictParam = url2Dict(self.path)
            Log("自定义数据源服务接收到请求,self.path:", self.path, "query 参数:", dictParam)
            
            # 目前回测系统只能从列表中选择交易所名称,在添加自定义数据源时,设置为币安,即:Binance
            exName = exchange.GetName()                                     
            # 注意,period为底层K线周期
            tabName = "%s_%s" % ("records", int(int(dictParam["period"]) / 1000))  
            priceRatio = math.pow(10, int(dictParam["round"]))
            amountRatio = math.pow(10, int(dictParam["vround"]))
            fromTS = int(dictParam["from"]) * int(1000)
            toTS = int(dictParam["to"]) * int(1000)

            # 要求应答的数据
            data = {
                "schema" : ["time", "open", "high", "low", "close", "vol"],
                "data" : []
            }
            
            if isOnlySupportCSV:
                # 处理CSV读取,filePathForCSV路径
                listDataSequence = []
                with open(filePathForCSV, "r") as f:
                    reader = csv.reader(f)
                    # 获取表头
                    header = next(reader)
                    headerIsNoneCount = 0
                    if len(header) != len(data["schema"]):
                        Log("CSV文件格式有误,列数不同,请检查!", "#FF0000")
                        return 
                    for ele in header:
                        for i in range(len(data["schema"])):
                            if data["schema"][i] == ele or ele == "":
                                if ele == "":
                                    headerIsNoneCount += 1
                                if headerIsNoneCount > 1:
                                    Log("CSV文件格式有误,请检查!", "#FF0000")
                                    return 
                                listDataSequence.append(i)
                                break
                    
                    # 读取内容
                    while True:
                        record = next(reader, -1)
                        if record == -1:
                            break
                        index = 0
                        arr = [0, 0, 0, 0, 0, 0]
                        for ele in record:
                            arr[listDataSequence[index]] = int(ele) if listDataSequence[index] == 0 else (int(float(ele) * amountRatio) if listDataSequence[index] == 5 else int(float(ele) * priceRatio))
                            index += 1
                        data["data"].append(arr)
                
                Log("数据:", data, "响应回测系统请求。")
                self.wfile.write(json.dumps(data).encode())
                return 
            
            # 连接数据库
            Log("连接数据库服务,获取数据,数据库:", exName, "表:", tabName)
            myDBClient = pymongo.MongoClient("mongodb://localhost:27017")
            ex_DB = myDBClient[exName]
            exRecords = ex_DB[tabName]
            
            # 构造查询条件:大于某个值{'age': {'$gt': 20}} 小于某个值{'age': {'$lt': 20}}
            dbQuery = {"$and":[{'Time': {'$gt': fromTS}}, {'Time': {'$lt': toTS}}]}
            Log("查询条件:", dbQuery, "查询条数:", exRecords.find(dbQuery).count(), "数据库总条数:", exRecords.find().count())
            
            for x in exRecords.find(dbQuery).sort("Time"):
                # 需要根据请求参数round和vround,处理数据精度
                bar = [x["Time"], int(x["Open"] * priceRatio), int(x["High"] * priceRatio), int(x["Low"] * priceRatio), int(x["Close"] * priceRatio), int(x["Volume"] * amountRatio)]
                data["data"].append(bar)
            
            Log("数据:", data, "响应回测系统请求。")
            # 写入数据应答
            self.wfile.write(json.dumps(data).encode())
        except BaseException as e:
            Log("Provider do_GET error, e:", e)


def createServer(host):
    try:
        server = HTTPServer(host, Provider)
        Log("Starting server, listen at: %s:%s" % host)
        server.serve_forever()
    except BaseException as e:
        Log("createServer error, e:", e)
        raise Exception("stop")

def main():
    LogReset(1)
    if (isOnlySupportCSV):
        try:
        # _thread.start_new_thread(createServer, (("localhost", 9090), ))         # 本机测试
            _thread.start_new_thread(createServer, (("0.0.0.0", 9090), ))         # VPS服务器上测试
            Log("开启自定义数据源服务线程,数据由CSV文件提供。", "#FF0000")
        except BaseException as e:
            Log("启动自定义数据源服务失败!")
            Log("错误信息:", e)
            raise Exception("stop")
        while True:
            LogStatus(_D(), "只启动自定义数据源服务,不收集数据!")
            Sleep(2000)
    
    exName = exchange.GetName()
    period = exchange.GetPeriod()
    Log("收集", exName, "交易所的K线数据,", "K线周期:", period, "秒")
    
    # 连接数据库服务,服务地址 mongodb://127.0.0.1:27017 具体看服务器上安装的mongodb设置
    Log("连接托管者所在设备mongodb服务,mongodb://localhost:27017")
    myDBClient = pymongo.MongoClient("mongodb://localhost:27017")   
    # 创建数据库
    ex_DB = myDBClient[exName]
    
    # 打印目前数据库表
    collist = ex_DB.list_collection_names()
    Log("mongodb ", exName, " collist:", collist)
    
    # 检测是否删除表
    arrDropNames = json.loads(dropNames)
    if isinstance(arrDropNames, list):
        for i in range(len(arrDropNames)):
            dropName = arrDropNames[i]
            if isinstance(dropName, str):
                if not dropName in collist:
                    continue
                tab = ex_DB[dropName]
                Log("dropName:", dropName, "删除:", dropName)
                ret = tab.drop()
                collist = ex_DB.list_collection_names()
                if dropName in collist:
                    Log(dropName, "删除失败")
                else :
                    Log(dropName, "删除成功")
    
    # 开启一个线程,提供自定义数据源服务
    try:
        # _thread.start_new_thread(createServer, (("localhost", 9090), ))     # 本机测试
        _thread.start_new_thread(createServer, (("0.0.0.0", 9090), ))         # VPS服务器上测试
        Log("开启自定义数据源服务线程", "#FF0000")
    except BaseException as e:
        Log("启动自定义数据源服务失败!")
        Log("错误信息:", e)
        raise Exception("stop")
    
    # 创建records表
    ex_DB_Records = ex_DB["%s_%d" % ("records", period)]
    Log("开始收集", exName, "K线数据", "周期:", period, "打开(创建)数据库表:", "%s_%d" % ("records", period), "#FF0000")
    preBarTime = 0
    index = 1
    while True:
        r = _C(exchange.GetRecords)
        if len(r) < 2:
            Sleep(1000)
            continue
        if preBarTime == 0:
            # 首次写入所有BAR数据
            for i in range(len(r) - 1):
                bar = r[i]
                # 逐根写入,需要判断当前数据库表中是否已经有该条数据,基于时间戳检测,如果有该条数据,则跳过,没有则写入
                retQuery = ex_DB_Records.find({"Time": bar["Time"]})
                if retQuery.count() > 0:
                    continue
                
                # 写入bar到数据库表
                ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})                
                index += 1
            preBarTime = r[-1]["Time"]
        elif preBarTime != r[-1]["Time"]:
            bar = r[-2]
            # 写入数据前检测,数据是否已经存在,基于时间戳检测
            retQuery = ex_DB_Records.find({"Time": bar["Time"]})
            if retQuery.count() > 0:
                continue
            
            ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})
            index += 1
            preBarTime = r[-1]["Time"]
        LogStatus(_D(), "preBarTime:", preBarTime, "_D(preBarTime):", _D(preBarTime/1000), "index:", index)
        # 增加画图展示
        ext.PlotRecords(r, "%s_%d" % ("records", period))
        Sleep(10000)
        

Percubaan berjalan

Pertama, kita mulakan robot pengumpul pasaran, kita tambah pertukaran kepada robot, dan robot itu berjalan. Perisian:img

img

Kemudian kami mencipta satu strategi ujian:

function main() {
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
}

Rancangan ini sangat mudah, hanya mengambil dan mencetak tiga baris data K.

Halaman retargeting, sumber data sistem retargeting ditetapkan sebagai sumber data tersuai, dan alamat mengisi alamat pelayan yang dikendalikan oleh bot pengumpul pasaran. Oleh kerana data dalam fail CSV kami adalah 1 minit K baris. Oleh itu, ketika retargeting, kita menetapkan kitaran K baris 1 minit.

img

Apabila anda mengklik untuk mula mengesan semula, robot pengumpul pasaran menerima permintaan data:img

Apabila strategi pelaksanaan sistem pemeriksa selesai, grafik baris K dihasilkan berdasarkan data baris K dalam sumber data.img

Perbandingan data dalam fail:img

img

RecordsCollector (dipertingkatkan untuk menyediakan ciri sumber data tersuai, menyokong sumber fail data CSV)

Pergilah ke sini dan terima kasih untuk komen anda.


Berkaitan

Lebih lanjut

Berjaya.Adakah Python perlu dipasang pada pelayan hos?

Spada bermain kuantitatifSekarang, sumber data tersuai ini diulang semula pada penyemak imbas, dan data yang tidak tepat, anda boleh cuba.

AiKPM-/upload/asset/19cfcf5244f5e2cd73173.png /upload/asset/19c100ceb1eb25a38a970.png Hubungi bot, bagaimana ia akan mengisi alamat di sana, alamat pelayan yang saya masukkan port 9090 pengumpul tidak bertindak balas.

weixxSila tanyakan mengapa saya telah menetapkan sumber data CSV tersuai di pelayan yang dihoskan, dengan permintaan halaman yang mempunyai pulangan data, dan kemudian tidak ada pulangan data dalam pemeriksaan semula, apabila data ditetapkan secara langsung sebagai hanya dua data, terminal pelayan https dapat menerima permintaan, /upload/asset/1691b2d9549fcec81c12a.png /upload/asset/168f8050b3db4a84d7e2f.png /upload/asset/16a67eaa598e95b11edb9.png /upload/asset/169c6c4c3d286587b3e.ng /upload/asset/169e8dcdbf9c0c544pbac8.png

weixxSila tanyakan mengapa saya telah menetapkan sumber data CSV tersuai di pelayan yang dihoskan, dengan permintaan halaman yang mempunyai pulangan data, dan kemudian tidak ada pulangan data dalam pemeriksaan semula, dan tidak ada permintaan ke terminal pelayan https://upload/asset/1691b2d9549fcec81c12a.png /upload/asset/168f8050b3db4a84d7e2f.png /upload/asset/16a67eaa598e95b11edb9.png /upload/asset/169c6c4c3d28d658795b3e.png /upload/asset/169e8ddbf9c0c544png

qq89520Saya ingin tahu bagaimana anda menetapkan parameter.

KhotbahDi sini, kita akan melihat bagaimana harga saham akan meningkat, dan bagaimana harga saham akan meningkat.

Dsaidasi 666

Mimpi kecilPerisian Python diperlukan.

Spada bermain kuantitatifIni adalah bug sistem yang telah diperbaiki.

Mimpi kecilUntuk maklumat mengenai ketepatan dalam dokumentasi API, anda boleh cuba lihat di bawah.

Mimpi kecilUntuk memahami artikel, kod. Ini adalah mengenai menggunakan fail CSV sebagai sumber data untuk memberikan data kepada sistem pengukuran semula.

Mimpi kecilLihat penerangan dalam dokumentasi API.

weixxData khusus menggunakan kaedah exchange.GetData ((), untuk mengulangi apakah K baris boleh menjadi data khusus?

Mimpi kecilPerkhidmatan yang menyediakan sumber data tersuai mesti berada di pelayan dan mestilah IP awam. Sistem pemindaian perkhidmatan tempatan tidak dapat diakses.

weixxSila tanyakan bagaimana saya boleh membina semula data tempatan di terminal HTTP, adakah semula tempatan tidak menyokong semula sumber data tersuai? Saya menambah pertukaran dalam pemulihan tempatan: [{ "eid":" Huobi","currency":"ETH_USDT","feeder":"http://127.0.0.1:9090"}] Parameter ini, serta IP bot yang diubah, juga tidak meminta kepada pelayan

Mimpi kecilData yang terlalu banyak. Halaman web tidak dapat dimuatkan, selain itu DEMO anda telah meneliti, seharusnya tidak ada masalah, saya kira anda telah menetapkannya dengan salah.

weixxSaya data csv adalah satu minit K baris adalah data mata wang lain, dan kemudian kerana pasangan dagangan tidak boleh dipilih secara rawak pada masa retargeting, bot dan pertukaran yang dipilih retargeting ditetapkan sebagai huobi, pasangan dagangan adalah BTC-USDT, data permintaan ini saya kadang-kadang dapat menerima permintaan di pihak bot, tetapi retargeting tidak dapat mendapatkan data, dan saya mengubah garis masa csv dari saat ke detik juga tidak dapat memperoleh data.

Mimpi kecilPerdagangan terhadap BTC_USDT, yang mana yang anda maksudkan secara khusus? Adakah data yang ditakrifkan ini diperlukan?

Mimpi kecilData yang besar juga boleh, saya telah menguji semasa ujian.

weixxData yang sedikit boleh diambil, tetapi apabila saya menetapkan fail CSV dengan data lebih dari satu minit setahun dan tidak dapat diambil, adakah data yang terlalu besar akan memberi kesan?

weixxPada bot yang saya buat sekarang adalah pertukaran HUOBI, maka pasangan dagangan yang juga ditetapkan adalah BTC-USDT, yang juga dikonfigurasi ketika diulang, dan kod yang diulang adalah menggunakan fungsi exchange.GetRecords ((), adakah data yang ditakrifkan ini diperlukan?

Mimpi kecilAnda boleh berada di hujung penyemak imbas kerana parameter pertanyaan yang anda tentukan, sistem tindak balas tidak dapat mencetuskan Robot menjawab, menunjukkan bahawa bot tidak menerima permintaan, menjelaskan bahawa tempat itu dikonfigurasi dengan salah semasa tindak balas, memeriksa, debugging dan mencari masalah.

Mimpi kecilJika anda ingin membaca fail CSV anda sendiri, anda boleh menetapkan laluan untuk fail ini, seperti yang ditunjukkan di bawah.