手がかりは,行事収集器にリクエストをアップグレードするために,カスタマイズされたデータソースの機能を教えます

作者: リン・ハーン小さな夢, 作成日:2020-05-07 17:43:54, 更新日:2023-10-09 22:47:43

img

手がかりは,行事収集器にリクエストをアップグレードするために,カスタマイズされたデータソースの機能を教えます

前回の記事手をつなぐと 市場収集機を 実現する方法を教える市場データを収集し,次にどう使うのかという市場データを収集するロボットプログラムを実装しました. もちろん,リトークシステムには,発明者の量化取引プラットフォームリトークシステムのカスタムデータソース機能に依存しています. 収集したデータをリトークシステムのデータソースとして直接使って,リトークシステムをリトークしたい市場に適用することができます.

市場収集器は,自作データ源として,反測システムにデータを提供することもできます.

需要があるなら,行動!

準備する

前回の記事の準備とは違って,最後に私の本来のMAC上で実行されている管理者プログラムで,mongodbデータベースをインストールしてデータベースサービスを起動しました.今回は,実行環境をVPSに切り替えて,Ali Cloud Linuxサーバーを使用して,私たちのプログラムを実行しました.

  • mongodbデータベース

    前回の記事と同様に,市場収集プログラムを実行しているデバイスにmongodbデータベースをインストールし,サービスを開く必要があります. マックコンピュータにmongodbをインストールする基本的には,オンラインで多くのチュートリアルがあり,検索して見ることができます. とても簡単です.

  • Python3をインストールする Python3言語を使用しているため,いくつかのライブラリを使用していることに注意してください.

    • パイモンゴ
    • http
    • urllib
  • 管理者 発明者による量化取引プラットフォームの管理者.

市場収集機を改造する

行動収集器はRecordsCollecter (教育)この戦略は. このページは,このページのページのページのページのページです. プログラムがデータ収集のwhileループに入る前に,複数のスレッドバリエーを使用し,同時に実行して,発明者の量化取引プラットフォームの復習システムからのデータ要求を監視するためのサービスを起動する. (他の細かい変更は無視できます)

RecordsCollector (アップグレードによりカスタマイズされたデータソース機能)

import _thread
import pymongo
import json
import math
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse

def url2Dict(url):
    query = urlparse(url).query  
    params = parse_qs(query)  
    result = {key: params[key][0] for key in params}  
    return result

class Provider(BaseHTTPRequestHandler):
    def do_GET(self):
        try:
            self.send_response(200)
            self.send_header("Content-type", "application/json")
            self.end_headers()

            dictParam = url2Dict(self.path)
            Log("自定义数据源服务接收到请求,self.path:", self.path, "query 参数:", dictParam)
            
            # 目前回测系统只能从列表中选择交易所名称,在添加自定义数据源时,设置为币安,即:Binance
            exName = exchange.GetName()                                     
            # 注意,period为底层K线周期
            tabName = "%s_%s" % ("records", int(int(dictParam["period"]) / 1000))  
            priceRatio = math.pow(10, int(dictParam["round"]))
            amountRatio = math.pow(10, int(dictParam["vround"]))
            fromTS = int(dictParam["from"]) * int(1000)
            toTS = int(dictParam["to"]) * int(1000)
            
            
            # 连接数据库
            Log("连接数据库服务,获取数据,数据库:", exName, "表:", tabName)
            myDBClient = pymongo.MongoClient("mongodb://localhost:27017")
            ex_DB = myDBClient[exName]
            exRecords = ex_DB[tabName]
            
            
            # 要求应答的数据
            data = {
                "schema" : ["time", "open", "high", "low", "close", "vol"],
                "data" : []
            }
            
            # 构造查询条件:大于某个值{'age': {'$gt': 20}} 小于某个值{'age': {'$lt': 20}}
            dbQuery = {"$and":[{'Time': {'$gt': fromTS}}, {'Time': {'$lt': toTS}}]}
            Log("查询条件:", dbQuery, "查询条数:", exRecords.find(dbQuery).count(), "数据库总条数:", exRecords.find().count())
            
            for x in exRecords.find(dbQuery).sort("Time"):
                # 需要根据请求参数round和vround,处理数据精度
                bar = [x["Time"], int(x["Open"] * priceRatio), int(x["High"] * priceRatio), int(x["Low"] * priceRatio), int(x["Close"] * priceRatio), int(x["Volume"] * amountRatio)]
                data["data"].append(bar)
            
            Log("数据:", data, "响应回测系统请求。")
            # 写入数据应答
            self.wfile.write(json.dumps(data).encode())
        except BaseException as e:
            Log("Provider do_GET error, e:", e)


def createServer(host):
    try:
        server = HTTPServer(host, Provider)
        Log("Starting server, listen at: %s:%s" % host)
        server.serve_forever()
    except BaseException as e:
        Log("createServer error, e:", e)
        raise Exception("stop")

def main():
    LogReset(1)
    exName = exchange.GetName()
    period = exchange.GetPeriod()
    Log("收集", exName, "交易所的K线数据,", "K线周期:", period, "秒")
    
    # 连接数据库服务,服务地址 mongodb://127.0.0.1:27017 具体看服务器上安装的mongodb设置
    Log("连接托管者所在设备mongodb服务,mongodb://localhost:27017")
    myDBClient = pymongo.MongoClient("mongodb://localhost:27017")   
    # 创建数据库
    ex_DB = myDBClient[exName]
    
    # 打印目前数据库表
    collist = ex_DB.list_collection_names()
    Log("mongodb ", exName, " collist:", collist)
    
    # 检测是否删除表
    arrDropNames = json.loads(dropNames)
    if isinstance(arrDropNames, list):
        for i in range(len(arrDropNames)):
            dropName = arrDropNames[i]
            if isinstance(dropName, str):
                if not dropName in collist:
                    continue
                tab = ex_DB[dropName]
                Log("dropName:", dropName, "删除:", dropName)
                ret = tab.drop()
                collist = ex_DB.list_collection_names()
                if dropName in collist:
                    Log(dropName, "删除失败")
                else :
                    Log(dropName, "删除成功")
    
    # 开启一个线程,提供自定义数据源服务
    try:
        # _thread.start_new_thread(createServer, (("localhost", 9090), ))     # 本机测试
        _thread.start_new_thread(createServer, (("0.0.0.0", 9090), ))         # VPS服务器上测试
        Log("开启自定义数据源服务线程", "#FF0000")
    except BaseException as e:
        Log("启动自定义数据源服务失败!")
        Log("错误信息:", e)
        raise Exception("stop")
    
    # 创建records表
    ex_DB_Records = ex_DB["%s_%d" % ("records", period)]
    Log("开始收集", exName, "K线数据", "周期:", period, "打开(创建)数据库表:", "%s_%d" % ("records", period), "#FF0000")
    preBarTime = 0
    index = 1
    while True:
        r = _C(exchange.GetRecords)
        if len(r) < 2:
            Sleep(1000)
            continue
        if preBarTime == 0:
            # 首次写入所有BAR数据
            for i in range(len(r) - 1):
                bar = r[i]
                # 逐根写入,需要判断当前数据库表中是否已经有该条数据,基于时间戳检测,如果有该条数据,则跳过,没有则写入
                retQuery = ex_DB_Records.find({"Time": bar["Time"]})
                if retQuery.count() > 0:
                    continue
                
                # 写入bar到数据库表
                ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})                
                index += 1
            preBarTime = r[-1]["Time"]
        elif preBarTime != r[-1]["Time"]:
            bar = r[-2]
            # 写入数据前检测,数据是否已经存在,基于时间戳检测
            retQuery = ex_DB_Records.find({"Time": bar["Time"]})
            if retQuery.count() > 0:
                continue
            
            ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})
            index += 1
            preBarTime = r[-1]["Time"]
        LogStatus(_D(), "preBarTime:", preBarTime, "_D(preBarTime):", _D(preBarTime/1000), "index:", index)
        # 增加画图展示
        ext.PlotRecords(r, "%s_%d" % ("records", period))
        Sleep(10000)
        

テスト

ロボットの配置img

ロボットや行動収集機を操作する.img

試行錯誤を繰り返します. 試行錯誤を繰り返します. 試行錯誤を繰り返します.

function main() {
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords().length)
}

設定リセットオプション,交換所をBinanに設定する. 暫定的なカスタマイズデータソースが自社で取引所名を作成できないため,リスト内のある取引所の設定を借りることができる. リセット時に表示されるのはBinanであり,実際にはwexApp模擬盤のデータである.

img

比較回線システムは,市場収集機によって,カスタムデータソース回線として生成されたチャートと,wexApp取引所のページ上の1時間K線チャートの同一かどうかを比較する.

img

img

この方法により,VPS上のロボットがK線データを自力で収集し,私たちがいつでも収集したデータを直接復習システムで復習することができます. フォローアップは,Forex へのアクセス,Forex へのアクセス,Forex へのアクセス,Forex へのアクセス,Forex へのアクセス,Forex へのアクセス,Forex へのアクセス,Forex へのアクセス,Forex へのアクセス,Forex へのアクセス,Forex へのアクセス,Forex へのアクセス,Forex へのアクセス,Forex へのアクセス,Forex へのアクセス,Forex へのアクセス,Forex へのアクセス,Forex へのアクセス,Forex へのアクセス,Forex へのアクセス,Forex へのアクセス,Forex へのアクセス,Forex へのアクセス,Forex へのアクセス,Forex へのアクセス,Forex へのアクセス,Forex へのアクセス,Forex へのアクセス,Forex へのアクセス,Forex へのアクセス,Forex へのアクセス,Forex へのアクセス,Forex へのアクセス,Forex へのアクセス,Forex へのアクセス,Forex

投稿を歓迎します.


関連性

もっと

武蔵野このログのプリントを頼むのは のカスタマイズされたデータソースサービスが 受け取っていないことです

武蔵野復習時に,カスタマイズされたデータを選択します. 選択したデータに対しての取引から始めます.

lcgs006上記に提供されていない通貨対のデータを収集した場合は,DOT_USDTのような小さな種類の通貨対を回計する場合は,回計時に通貨がカスタマイズできない場合は,どのように実現しますか?

スルティムトップ

今も決して復習がカスタマイズされたデータ源を選択すると,トランザクションペアのみがサポートされますか?

小さな夢この投稿で言及した"行事収集器"をサーバー上で実行する必要があります. FMZ回線システムにデータ源をカスタマイズする機能として,データを提供します.

武蔵野VPS は,どのサービスが必要なのか? VPS は,どのサービスが必要なのか?

小さな夢カスタムデータソースの機能を使用した後,右側のコントロールにカスタムデータソースのサービスアドレスを入力する必要があります.

小さな夢このディスクは,サーバーに外部IPが搭載されている状態で実行されなければ,復習システムページにアクセスできない.

小さな夢このEOS_USDTの実際のデータを,BTC_USDTがFMZへの返信システムとして提供します. 返信時にBTC_USDTが表示されますが,それは問題ではありません. データ価格はEOSです.

lcgs006代替手段はどこにあるのか? 関連教科書はあるのか?

小さな夢名前で取引するのではなく,データで代わって,あなたが収集したデータで価格を設定できます.

小さな夢このサービスプログラムは,データソースに提供されるデータに対して,いくつかの異なるトランザクションを書き込み,回測システムが必要に応じて呼び出すことができます.