Harga pasaran pengumpul menaik taraf lagi

Penulis:Kebaikan, Dicipta: 2020-05-26 14:25:15, Dikemas kini: 2023-11-02 19:53:34

img

Menyokong import fail format CSV untuk menyediakan sumber data tersuai

Baru-baru ini, seorang peniaga perlu menggunakan fail format CSV sendiri sebagai sumber data untuk sistem backtest platform FMZ. sistem backtest platform kami mempunyai banyak fungsi dan mudah dan cekap digunakan, sehingga selagi pengguna mempunyai data mereka sendiri, mereka boleh melakukan backtesting mengikut data ini, yang tidak lagi terhad kepada bursa dan varieti yang disokong oleh pusat data platform kami.

Idea reka bentuk

Idea reka bentuk sebenarnya sangat mudah. kita hanya perlu mengubahnya sedikit berdasarkan pengumpul pasaran sebelumnya. kita menambah parameterisOnlySupportCSVkepada pengumpul pasaran untuk mengawal sama ada hanya fail CSV digunakan sebagai sumber data untuk sistem backtest.filePathForCSVdigunakan untuk menetapkan laluan fail data CSV yang diletakkan di pelayan di mana robot pengumpul pasaran berjalan.isOnlySupportCSVParameter ditetapkan kepadaTrueuntuk memutuskan sumber data yang akan digunakan (dikumpulkan oleh anda sendiri atau data dalam fail CSV), perubahan ini adalah terutamanya dalamdo_GETFungsiProvider class.

Apakah fail CSV?

Nilai yang dipisahkan dengan koma, juga dikenali sebagai CSV, kadang-kadang dirujuk sebagai nilai yang dipisahkan dengan aksara, kerana aksara pemisah juga tidak boleh menjadi koma. Failnya menyimpan data jadual (nombor dan teks) dalam teks biasa. Teks biasa bermaksud bahawa fail adalah urutan aksara dan tidak mengandungi data yang mesti ditafsirkan seperti nombor binari. Fail CSV terdiri daripada mana-mana bilangan rekod, dipisahkan oleh beberapa watak baris baru; setiap rekod terdiri daripada medan, dan pemisah antara medan adalah watak atau rentetan lain, dan yang paling biasa adalah koma atau tab. Secara umum, semua rekod mempunyai urutan medan yang sama persis. Mereka biasanya fail teks biasa.WORDPADatauExceluntuk membuka.

Standard umum format fail CSV tidak wujud, tetapi terdapat peraturan tertentu, biasanya satu rekod setiap baris, dan baris pertama adalah tajuk. Data di setiap baris dipisahkan dengan koma.

Sebagai contoh, fail CSV yang kami gunakan untuk ujian dibuka dengan Notepad seperti ini:

img

Perhatikan bahawa baris pertama fail CSV adalah tajuk jadual.

,open,high,low,close,vol

Kami hanya perlu menganalisis dan menyusun data ini, dan kemudian membina ke dalam format yang diperlukan oleh sumber data tersuai sistem backtest. kod ini dalam artikel kami sebelumnya telah diproses, dan hanya perlu diubah suai sedikit.

Kod yang diubahsuai

import _thread
import pymongo
import json
import math
import csv
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse

def url2Dict(url):
    query = urlparse(url).query  
    params = parse_qs(query)  
    result = {key: params[key][0] for key in params}  
    return result

class Provider(BaseHTTPRequestHandler):
    def do_GET(self):
        global isOnlySupportCSV, filePathForCSV
        try:
            self.send_response(200)
            self.send_header("Content-type", "application/json")
            self.end_headers()

            dictParam = url2Dict(self.path)
            Log("The custom data source service receives the request,self.path:", self.path, "query parameter:", dictParam)
            
            # At present, the backtest system can only select the exchange name from the list. When adding a custom data source, set it to Binance, that is: Binance
            exName = exchange.GetName()                                     
            # Note that period is the bottom K-line period
            tabName = "%s_%s" % ("records", int(int(dictParam["period"]) / 1000))  
            priceRatio = math.pow(10, int(dictParam["round"]))
            amountRatio = math.pow(10, int(dictParam["vround"]))
            fromTS = int(dictParam["from"]) * int(1000)
            toTS = int(dictParam["to"]) * int(1000)

            # Request data
            data = {
                "schema" : ["time", "open", "high", "low", "close", "vol"],
                "data" : []
            }
            
            if isOnlySupportCSV:
                # Handle CSV reading, filePathForCSV path
                listDataSequence = []
                with open(filePathForCSV, "r") as f:
                    reader = csv.reader(f)
                    # Get table header
                    header = next(reader)
                    headerIsNoneCount = 0
                    if len(header) != len(data["schema"]):
                        Log("The CSV file format is wrong, the number of columns is different, please check!", "#FF0000")
                        return 
                    for ele in header:
                        for i in range(len(data["schema"])):
                            if data["schema"][i] == ele or ele == "":
                                if ele == "":
                                    headerIsNoneCount += 1
                                if headerIsNoneCount > 1:
                                    Log("The CSV file format is incorrect, please check!", "#FF0000")
                                    return 
                                listDataSequence.append(i)
                                break
                    
                    # Read content
                    while True:
                        record = next(reader, -1)
                        if record == -1:
                            break
                        index = 0
                        arr = [0, 0, 0, 0, 0, 0]
                        for ele in record:
                            arr[listDataSequence[index]] = int(ele) if listDataSequence[index] == 0 else (int(float(ele) * amountRatio) if listDataSequence[index] == 5 else int(float(ele) * priceRatio))
                            index += 1
                        data["data"].append(arr)
                
                Log("data: ", data, "Respond to backtest system requests.")
                self.wfile.write(json.dumps(data).encode())
                return 
            
            # Connect to the database
            Log("Connect to the database service to obtain data, the database: ", exName, "table: ", tabName)
            myDBClient = pymongo.MongoClient("mongodb://localhost:27017")
            ex_DB = myDBClient[exName]
            exRecords = ex_DB[tabName]
            
            # Construct query conditions: greater than a certain value {'age': {'$ gt': 20}} less than a certain value {'age': {'$lt': 20}}
            dbQuery = {"$and":[{'Time': {'$gt': fromTS}}, {'Time': {'$lt': toTS}}]}
            Log("Query conditions: ", dbQuery, "Number of inquiries: ", exRecords.find(dbQuery).count(), "Total number of databases: ", exRecords.find().count())
            
            for x in exRecords.find(dbQuery).sort("Time"):
                # Need to process data accuracy according to request parameters round and vround
                bar = [x["Time"], int(x["Open"] * priceRatio), int(x["High"] * priceRatio), int(x["Low"] * priceRatio), int(x["Close"] * priceRatio), int(x["Volume"] * amountRatio)]
                data["data"].append(bar)
            
            Log("data: ", data, "Respond to backtest system requests.")
            # Write data response
            self.wfile.write(json.dumps(data).encode())
        except BaseException as e:
            Log("Provider do_GET error, e:", e)


def createServer(host):
    try:
        server = HTTPServer(host, Provider)
        Log("Starting server, listen at: %s:%s" % host)
        server.serve_forever()
    except BaseException as e:
        Log("createServer error, e:", e)
        raise Exception("stop")

def main():
    LogReset(1)
    if (isOnlySupportCSV):
        try:
        # _thread.start_new_thread(createServer, (("localhost", 9090), ))         # local test
            _thread.start_new_thread(createServer, (("0.0.0.0", 9090), ))         # Test on VPS server
            Log("Start the custom data source service thread, and the data is provided by the CSV file. ", "#FF0000")
        except BaseException as e:
            Log("Failed to start the custom data source service!")
            Log("Error message: ", e)
            raise Exception("stop")
        while True:
            LogStatus(_D(), "Only start the custom data source service, do not collect data!")
            Sleep(2000)
    
    exName = exchange.GetName()
    period = exchange.GetPeriod()
    Log("collect", exName, "Exchange K-line data,", "K line cycle:", period, "Second")
    
    # Connect to the database service, service address mongodb: //127.0.0.1: 27017 See the settings of mongodb installed on the server
    Log("Connect to the mongodb service of the hosting device, mongodb://localhost:27017")
    myDBClient = pymongo.MongoClient("mongodb://localhost:27017")   
    # Create a database
    ex_DB = myDBClient[exName]
    
    # Print the current database table
    collist = ex_DB.list_collection_names()
    Log("mongodb", exName, "collist:", collist)
    
    # Check if the table is deleted
    arrDropNames = json.loads(dropNames)
    if isinstance(arrDropNames, list):
        for i in range(len(arrDropNames)):
            dropName = arrDropNames[i]
            if isinstance(dropName, str):
                if not dropName in collist:
                    continue
                tab = ex_DB[dropName]
                Log("dropName:", dropName, "delete:", dropName)
                ret = tab.drop()
                collist = ex_DB.list_collection_names()
                if dropName in collist:
                    Log(dropName, "failed to delete")
                else :
                    Log(dropName, "successfully deleted")
    
    # Start a thread to provide a custom data source service
    try:
        # _thread.start_new_thread(createServer, (("localhost", 9090), ))     # local test
        _thread.start_new_thread(createServer, (("0.0.0.0", 9090), ))         # Test on VPS server
        Log("Open the custom data source service thread", "#FF0000")
    except BaseException as e:
        Log("Failed to start the custom data source service!")
        Log("Error message:", e)
        raise Exception("stop")
    
    # Create the records table
    ex_DB_Records = ex_DB["%s_%d" % ("records", period)]
    Log("Start collecting", exName, "K-line data", "cycle:", period, "Open (create) the database table:", "%s_%d" % ("records", period), "#FF0000")
    preBarTime = 0
    index = 1
    while True:
        r = _C(exchange.GetRecords)
        if len(r) < 2:
            Sleep(1000)
            continue
        if preBarTime == 0:
            # Write all BAR data for the first time
            for i in range(len(r) - 1):
                bar = r[i]
                # Write root by root, you need to determine whether the data already exists in the current database table, based on timestamp detection, if there is the data, then skip, if not write
                retQuery = ex_DB_Records.find({"Time": bar["Time"]})
                if retQuery.count() > 0:
                    continue
                
                # Write bar to the database table
                ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})                
                index += 1
            preBarTime = r[-1]["Time"]
        elif preBarTime != r[-1]["Time"]:
            bar = r[-2]
            # Check before writing data, whether the data already exists, based on time stamp detection
            retQuery = ex_DB_Records.find({"Time": bar["Time"]})
            if retQuery.count() > 0:
                continue
            
            ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})
            index += 1
            preBarTime = r[-1]["Time"]
        LogStatus(_D(), "preBarTime:", preBarTime, "_D(preBarTime):", _D(preBarTime/1000), "index:", index)
        # Increase drawing display
        ext.PlotRecords(r, "%s_%d" % ("records", period))
        Sleep(10000)

Ujian berjalan

Pertama, kita mulakan robot pengumpul pasaran, kita tambahkan pertukaran kepada robot dan biarkan robot berjalan.

Konfigurasi parameter:

img img

Kemudian kita mencipta strategi ujian:

function main() {
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
}

Strategi ini sangat mudah, hanya mendapatkan dan mencetak data K-line tiga kali.

Pada halaman backtest, tetapkan sumber data sistem backtest sebagai sumber data tersuai, dan isi alamat pelayan di mana robot pengumpul pasaran berjalan.

img

Klik untuk memulakan backtest, dan robot pengumpul pasaran menerima permintaan data:

img

Selepas strategi pelaksanaan sistem backtest selesai, carta garis K dihasilkan berdasarkan data garis K dalam sumber data.

img

Bandingkan data dalam fail:

img img

Harga pasaran pengumpul menaik taraf lagi


Berkaitan

Lebih lanjut