Tangan mengajar anda untuk menaik taraf pengumpul data untuk mengesan semula fungsi sumber data tersuai

Penulis:Mimpi kecil, Dicipta: 2020-05-07 17:43:54, Dikemas kini: 2023-10-09 22:47:43

img

Tangan mengajar anda untuk menaik taraf pengumpul data untuk mengesan semula fungsi sumber data tersuai

Artikel TerdahuluTangan akan mengajar anda untuk mewujudkan pengumpul perdagangan.Bersama-sama kami melaksanakan program bot yang mengumpul pasaran, mengumpul data pasaran dan bagaimana ia akan digunakan seterusnya? Sudah tentu untuk sistem retargeting, di sini bergantung pada fungsi sumber data tersuai untuk sistem retargeting platform perdagangan kuantitatif pencipta, kita boleh menggunakan data yang dikumpulkan secara langsung sebagai sumber data sistem retargeting, supaya kita boleh menggunakan sistem retargeting untuk mana-mana pasaran yang kita ingin retargeting data sejarah.

Oleh itu, kita boleh meningkatkan "Pengumpul Perdagangan" untuk membolehkan pengumpul Perdagangan juga menyediakan data kepada sistem pengukuran semula sebagai sumber data tersuai.

Jika ada permintaan, mulakan!

Bersedia

Perancangan yang berbeza daripada artikel sebelum ini, kali terakhir adalah program pentadbir yang berjalan di komputer MAC tempatan saya, memasang pangkalan data mongodb untuk memulakan perkhidmatan pangkalan data. Kali ini kami menukar persekitaran operasi ke VPS dan menggunakan pelayan Ali Cloud Linux untuk menjalankan program kami.

  • Pangkalan data mongodb

    Seperti artikel sebelumnya, anda perlu memasang pangkalan data mongodb pada peranti yang menjalankan program pengumpul pasaran, dan membuka perkhidmatan tersebut. Pada asasnya, terdapat banyak tutorial di internet yang boleh dicari dan dilihat, sangat mudah.

  • Pemasangan Python 3 Program ini menggunakan bahasa python3 dan perhatikan bahawa beberapa pustaka digunakan, jika tidak perlu dipasang.

    • Pymongo
    • http
    • urllib
  • Pengurus Pengurus yang menjalankan platform perdagangan kuantiti pencipta.

Mengubah "Pengumpul Perdagangan"

Pengumpul dataRecordsCollecter (Pengajaran)Taktik ini. Kita akan buat beberapa perubahan: Sebelum program memasuki loop sementara untuk mengumpul data, perpustakaan pelbagai utas digunakan untuk menjalankan secara serentak untuk memulakan perkhidmatan yang digunakan untuk memantau permintaan data yang dibuat oleh inventor untuk mengutip sistem platform perdagangan kuantitatif. (Sesuatu perubahan lain boleh diabaikan)

RecordsCollector (peningkatan untuk menyediakan ciri sumber data tersuai)

import _thread
import pymongo
import json
import math
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse

def url2Dict(url):
    query = urlparse(url).query  
    params = parse_qs(query)  
    result = {key: params[key][0] for key in params}  
    return result

class Provider(BaseHTTPRequestHandler):
    def do_GET(self):
        try:
            self.send_response(200)
            self.send_header("Content-type", "application/json")
            self.end_headers()

            dictParam = url2Dict(self.path)
            Log("自定义数据源服务接收到请求,self.path:", self.path, "query 参数:", dictParam)
            
            # 目前回测系统只能从列表中选择交易所名称,在添加自定义数据源时,设置为币安,即:Binance
            exName = exchange.GetName()                                     
            # 注意,period为底层K线周期
            tabName = "%s_%s" % ("records", int(int(dictParam["period"]) / 1000))  
            priceRatio = math.pow(10, int(dictParam["round"]))
            amountRatio = math.pow(10, int(dictParam["vround"]))
            fromTS = int(dictParam["from"]) * int(1000)
            toTS = int(dictParam["to"]) * int(1000)
            
            
            # 连接数据库
            Log("连接数据库服务,获取数据,数据库:", exName, "表:", tabName)
            myDBClient = pymongo.MongoClient("mongodb://localhost:27017")
            ex_DB = myDBClient[exName]
            exRecords = ex_DB[tabName]
            
            
            # 要求应答的数据
            data = {
                "schema" : ["time", "open", "high", "low", "close", "vol"],
                "data" : []
            }
            
            # 构造查询条件:大于某个值{'age': {'$gt': 20}} 小于某个值{'age': {'$lt': 20}}
            dbQuery = {"$and":[{'Time': {'$gt': fromTS}}, {'Time': {'$lt': toTS}}]}
            Log("查询条件:", dbQuery, "查询条数:", exRecords.find(dbQuery).count(), "数据库总条数:", exRecords.find().count())
            
            for x in exRecords.find(dbQuery).sort("Time"):
                # 需要根据请求参数round和vround,处理数据精度
                bar = [x["Time"], int(x["Open"] * priceRatio), int(x["High"] * priceRatio), int(x["Low"] * priceRatio), int(x["Close"] * priceRatio), int(x["Volume"] * amountRatio)]
                data["data"].append(bar)
            
            Log("数据:", data, "响应回测系统请求。")
            # 写入数据应答
            self.wfile.write(json.dumps(data).encode())
        except BaseException as e:
            Log("Provider do_GET error, e:", e)


def createServer(host):
    try:
        server = HTTPServer(host, Provider)
        Log("Starting server, listen at: %s:%s" % host)
        server.serve_forever()
    except BaseException as e:
        Log("createServer error, e:", e)
        raise Exception("stop")

def main():
    LogReset(1)
    exName = exchange.GetName()
    period = exchange.GetPeriod()
    Log("收集", exName, "交易所的K线数据,", "K线周期:", period, "秒")
    
    # 连接数据库服务,服务地址 mongodb://127.0.0.1:27017 具体看服务器上安装的mongodb设置
    Log("连接托管者所在设备mongodb服务,mongodb://localhost:27017")
    myDBClient = pymongo.MongoClient("mongodb://localhost:27017")   
    # 创建数据库
    ex_DB = myDBClient[exName]
    
    # 打印目前数据库表
    collist = ex_DB.list_collection_names()
    Log("mongodb ", exName, " collist:", collist)
    
    # 检测是否删除表
    arrDropNames = json.loads(dropNames)
    if isinstance(arrDropNames, list):
        for i in range(len(arrDropNames)):
            dropName = arrDropNames[i]
            if isinstance(dropName, str):
                if not dropName in collist:
                    continue
                tab = ex_DB[dropName]
                Log("dropName:", dropName, "删除:", dropName)
                ret = tab.drop()
                collist = ex_DB.list_collection_names()
                if dropName in collist:
                    Log(dropName, "删除失败")
                else :
                    Log(dropName, "删除成功")
    
    # 开启一个线程,提供自定义数据源服务
    try:
        # _thread.start_new_thread(createServer, (("localhost", 9090), ))     # 本机测试
        _thread.start_new_thread(createServer, (("0.0.0.0", 9090), ))         # VPS服务器上测试
        Log("开启自定义数据源服务线程", "#FF0000")
    except BaseException as e:
        Log("启动自定义数据源服务失败!")
        Log("错误信息:", e)
        raise Exception("stop")
    
    # 创建records表
    ex_DB_Records = ex_DB["%s_%d" % ("records", period)]
    Log("开始收集", exName, "K线数据", "周期:", period, "打开(创建)数据库表:", "%s_%d" % ("records", period), "#FF0000")
    preBarTime = 0
    index = 1
    while True:
        r = _C(exchange.GetRecords)
        if len(r) < 2:
            Sleep(1000)
            continue
        if preBarTime == 0:
            # 首次写入所有BAR数据
            for i in range(len(r) - 1):
                bar = r[i]
                # 逐根写入,需要判断当前数据库表中是否已经有该条数据,基于时间戳检测,如果有该条数据,则跳过,没有则写入
                retQuery = ex_DB_Records.find({"Time": bar["Time"]})
                if retQuery.count() > 0:
                    continue
                
                # 写入bar到数据库表
                ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})                
                index += 1
            preBarTime = r[-1]["Time"]
        elif preBarTime != r[-1]["Time"]:
            bar = r[-2]
            # 写入数据前检测,数据是否已经存在,基于时间戳检测
            retQuery = ex_DB_Records.find({"Time": bar["Time"]})
            if retQuery.count() > 0:
                continue
            
            ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})
            index += 1
            preBarTime = r[-1]["Time"]
        LogStatus(_D(), "preBarTime:", preBarTime, "_D(preBarTime):", _D(preBarTime/1000), "index:", index)
        # 增加画图展示
        ext.PlotRecords(r, "%s_%d" % ("records", period))
        Sleep(10000)
        

Ujian

Mengkonfigurasi robotimg

Mengendalikan robot, mengendalikan pengumpul duit.img

Buka satu strategi ujian, lakukan ujian semula, contohnya, strategi ujian semula seperti ini, uji semula.

function main() {
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords().length)
}

Memasang pilihan penyesuaian semula, menetapkan bursa sebagai Binance kerana sumber data khusus sementara belum dapat membuat nama bursa sendiri, hanya boleh meminjam konfigurasi bursa dalam senarai, ketika penyesuaian semula menunjukkan Binance, sebenarnya data wexApp analog.

img

Adakah carta yang dihasilkan oleh sistem pengumpul pasaran sebagai sumber data yang disesuaikan dengan grafik garis K 1 jam di halaman bursa wexApp?

img

img

Ini membolehkan bot di VPS mengumpul data K-line sendiri, dan kita boleh mendapatkan data yang dikumpulkan secara langsung dalam sistem retesting pada bila-bila masa. Selain itu, anda juga boleh terus mengembangkan, seperti menyokong pengesanan semula sumber data tersuai pada tahap cakera sebenar, menyokong pelbagai jenis, pengumpulan data pelbagai pasaran, dan sebagainya.

Selamat datang untuk komen.


Berkaitan

Lebih lanjut

wuzhentaoDan tidak ada perkhidmatan sumber data khusus yang menerima cetakan log yang diminta.

wuzhentaoApabila anda mengulas semula, pilih data tersuai, dan mulakan dengan transaksi terhadap data yang dipilih di bawah.

lcgs006Jika data pasangan mata wang yang tidak disediakan di atas dikumpulkan, bagaimana ia boleh dilakukan untuk mengulangi beberapa pasangan mata wang yang lebih kecil, seperti DOT_USDT, yang tidak dapat disesuaikan ketika diulangi?

ZltimPuncak

Tidak pernahJika anda memilih sumber data yang disesuaikan, adakah anda hanya menyokong satu pasangan transaksi?

Mimpi kecil"Pengumpul Peristiwa" yang disebut dalam catatan ini perlu dijalankan di pelayan; sebagai fungsi sumber data tersuai untuk FMZ Retesting System, berikan data; boleh dilakukan seperti yang dinyatakan dalam catatan ini, yang telah diuji.

wuzhentaoSemua, sekarang setelah alamat sumber data tersuai diisi, data semula di bawah tidak menunjukkan data baru. Apakah perkhidmatan yang diperlukan oleh vps?

Mimpi kecilSelepas menggunakan fungsi sumber data tersuai, anda juga perlu mengisi alamat perkhidmatan sumber data tersuai di sebelah kanan kawalan.

Mimpi kecilIni mesti berjalan pada pelayan yang mempunyai IP luar untuk halaman sistem penyesuaian dapat diakses.

Mimpi kecilJika anda tidak faham apa yang saya maksudkan, saya maksudkan data yang diberikan oleh sumber data tersuai anda seperti sebenarnya EOS_USDT, tetapi pada FMZ anda hanya boleh memilih pasangan dagangan seperti BTC_USDT, anda mengambil data sebenar EOS_USDT sebagai data yang diberikan oleh BTC_USDT kepada FMZ.

lcgs006Di mana ia perlu diganti, atau ada tutorial yang berkaitan?

Mimpi kecilTidak perlu berdagang dengan nama, anda boleh menggunakan data yang anda kumpulkan sebagai harga untuk menggantikannya.

Mimpi kecilProgram perkhidmatan ini boleh menulis beberapa transaksi yang berbeza untuk sumber data, yang akan dipanggil oleh sistem pengukuran sendiri.