손잡이 사용법을 통해 고객 데이터 소스 기능을 업그레이드하고 검색할 수 있습니다

저자:작은 꿈, 2020-05-07 17:43:54로 작성, 2023-10-09 22:47:43로 업데이트

img

손잡이 사용법을 통해 고객 데이터 소스 기능을 업그레이드하고 검색할 수 있습니다

지난 기사손잡이, 손잡이, 손잡이우리는 함께 시장을 수집하는 로봇 프로그램을 구현하고 시장을 수집한 데이터를 다음으로 어떻게 사용할 것인가? 물론 리코드 시스템에 사용, 여기에 발명자의 양적 거래 플랫폼 리코드 시스템의 사용자 지정 데이터 소스 기능에 의존하여, 우리는 수집된 데이터를 직접 리코드 시스템의 데이터 소스로 사용할 수 있습니다. 그래서 우리는 리코드 시스템을 우리가 역사 데이터를 리코드하려는 시장에 적용 할 수 있습니다.

따라서 우리는 "시장 수집기"를 업그레이드 할 수 있습니다! 시장 수집기가 사용자 정의 데이터 소스로 재검토 시스템에 데이터를 제공 할 수 있습니다.

이 글은 제주도 서귀포시 서귀포시 서귀포시 서귀포시 서귀포시

준비

이전 기사에서 준비하는 것과는 다르게, 마지막으로 내 내적 MAC 컴퓨터에서 실행되는 관리자 프로그램을 설치하고 mongodb 데이터베이스를 시작하여 데이터베이스 서비스를 시작했습니다. 이번에는 운영 환경을 VPS로 변경하여 알리 클라우드 리눅스 서버를 사용하여 프로그램을 실행했습니다.

  • mongodb 데이터베이스

    이전 기사와 마찬가지로, 시장 수집자 프로그램을 실행하는 장치에mongodb 데이터베이스를 설치하고 서비스를 열어야합니다. MAC 컴퓨터에mongodb를 설치하는 것과 기본적으로 매우 간단합니다.

  • Python 3를 설치합니다. 이 프로그램은 파이썬3 언어를 사용하며, 일부 라이브러리를 사용하지만 설치가 필요하지 않습니다.

    • 피몬고 (pymongo)
    • http
    • urllib
  • 관리자 이 모든 것은 발명가 양적 거래 플랫폼을 운영하는 관리자입니다.

"시장 수집기"를 개조합니다.

이 자료를 수집하는레코드 컬렉터 (교육)이 전략은.. 이 사진은 이 사진의 사진으로 찍혔습니다. 프로그램이 데이터를 수집하는 와일 루프에 들어가기 전에, 멀티 스레드 라이브러리를 사용하여 동시에 실행하여 서비스를 시작하여 발명자의 양적 거래 플랫폼의 응답 시스템에 대한 데이터 요청을 감시하는 데 사용됩니다. (다른 세부 변경 사항은 무시할 수 있습니다.)

RecordsCollector (개편 데이터 소스 기능을 제공하는 업그레이드)

import _thread
import pymongo
import json
import math
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse

def url2Dict(url):
    query = urlparse(url).query  
    params = parse_qs(query)  
    result = {key: params[key][0] for key in params}  
    return result

class Provider(BaseHTTPRequestHandler):
    def do_GET(self):
        try:
            self.send_response(200)
            self.send_header("Content-type", "application/json")
            self.end_headers()

            dictParam = url2Dict(self.path)
            Log("自定义数据源服务接收到请求,self.path:", self.path, "query 参数:", dictParam)
            
            # 目前回测系统只能从列表中选择交易所名称,在添加自定义数据源时,设置为币安,即:Binance
            exName = exchange.GetName()                                     
            # 注意,period为底层K线周期
            tabName = "%s_%s" % ("records", int(int(dictParam["period"]) / 1000))  
            priceRatio = math.pow(10, int(dictParam["round"]))
            amountRatio = math.pow(10, int(dictParam["vround"]))
            fromTS = int(dictParam["from"]) * int(1000)
            toTS = int(dictParam["to"]) * int(1000)
            
            
            # 连接数据库
            Log("连接数据库服务,获取数据,数据库:", exName, "表:", tabName)
            myDBClient = pymongo.MongoClient("mongodb://localhost:27017")
            ex_DB = myDBClient[exName]
            exRecords = ex_DB[tabName]
            
            
            # 要求应答的数据
            data = {
                "schema" : ["time", "open", "high", "low", "close", "vol"],
                "data" : []
            }
            
            # 构造查询条件:大于某个值{'age': {'$gt': 20}} 小于某个值{'age': {'$lt': 20}}
            dbQuery = {"$and":[{'Time': {'$gt': fromTS}}, {'Time': {'$lt': toTS}}]}
            Log("查询条件:", dbQuery, "查询条数:", exRecords.find(dbQuery).count(), "数据库总条数:", exRecords.find().count())
            
            for x in exRecords.find(dbQuery).sort("Time"):
                # 需要根据请求参数round和vround,处理数据精度
                bar = [x["Time"], int(x["Open"] * priceRatio), int(x["High"] * priceRatio), int(x["Low"] * priceRatio), int(x["Close"] * priceRatio), int(x["Volume"] * amountRatio)]
                data["data"].append(bar)
            
            Log("数据:", data, "响应回测系统请求。")
            # 写入数据应答
            self.wfile.write(json.dumps(data).encode())
        except BaseException as e:
            Log("Provider do_GET error, e:", e)


def createServer(host):
    try:
        server = HTTPServer(host, Provider)
        Log("Starting server, listen at: %s:%s" % host)
        server.serve_forever()
    except BaseException as e:
        Log("createServer error, e:", e)
        raise Exception("stop")

def main():
    LogReset(1)
    exName = exchange.GetName()
    period = exchange.GetPeriod()
    Log("收集", exName, "交易所的K线数据,", "K线周期:", period, "秒")
    
    # 连接数据库服务,服务地址 mongodb://127.0.0.1:27017 具体看服务器上安装的mongodb设置
    Log("连接托管者所在设备mongodb服务,mongodb://localhost:27017")
    myDBClient = pymongo.MongoClient("mongodb://localhost:27017")   
    # 创建数据库
    ex_DB = myDBClient[exName]
    
    # 打印目前数据库表
    collist = ex_DB.list_collection_names()
    Log("mongodb ", exName, " collist:", collist)
    
    # 检测是否删除表
    arrDropNames = json.loads(dropNames)
    if isinstance(arrDropNames, list):
        for i in range(len(arrDropNames)):
            dropName = arrDropNames[i]
            if isinstance(dropName, str):
                if not dropName in collist:
                    continue
                tab = ex_DB[dropName]
                Log("dropName:", dropName, "删除:", dropName)
                ret = tab.drop()
                collist = ex_DB.list_collection_names()
                if dropName in collist:
                    Log(dropName, "删除失败")
                else :
                    Log(dropName, "删除成功")
    
    # 开启一个线程,提供自定义数据源服务
    try:
        # _thread.start_new_thread(createServer, (("localhost", 9090), ))     # 本机测试
        _thread.start_new_thread(createServer, (("0.0.0.0", 9090), ))         # VPS服务器上测试
        Log("开启自定义数据源服务线程", "#FF0000")
    except BaseException as e:
        Log("启动自定义数据源服务失败!")
        Log("错误信息:", e)
        raise Exception("stop")
    
    # 创建records表
    ex_DB_Records = ex_DB["%s_%d" % ("records", period)]
    Log("开始收集", exName, "K线数据", "周期:", period, "打开(创建)数据库表:", "%s_%d" % ("records", period), "#FF0000")
    preBarTime = 0
    index = 1
    while True:
        r = _C(exchange.GetRecords)
        if len(r) < 2:
            Sleep(1000)
            continue
        if preBarTime == 0:
            # 首次写入所有BAR数据
            for i in range(len(r) - 1):
                bar = r[i]
                # 逐根写入,需要判断当前数据库表中是否已经有该条数据,基于时间戳检测,如果有该条数据,则跳过,没有则写入
                retQuery = ex_DB_Records.find({"Time": bar["Time"]})
                if retQuery.count() > 0:
                    continue
                
                # 写入bar到数据库表
                ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})                
                index += 1
            preBarTime = r[-1]["Time"]
        elif preBarTime != r[-1]["Time"]:
            bar = r[-2]
            # 写入数据前检测,数据是否已经存在,基于时间戳检测
            retQuery = ex_DB_Records.find({"Time": bar["Time"]})
            if retQuery.count() > 0:
                continue
            
            ex_DB_Records.insert_one({"High": bar["High"], "Low": bar["Low"], "Open": bar["Open"], "Close": bar["Close"], "Time": bar["Time"], "Volume": bar["Volume"]})
            index += 1
            preBarTime = r[-1]["Time"]
        LogStatus(_D(), "preBarTime:", preBarTime, "_D(preBarTime):", _D(preBarTime/1000), "index:", index)
        # 增加画图展示
        ext.PlotRecords(r, "%s_%d" % ("records", period))
        Sleep(10000)
        

테스트

로봇을 구성합니다img

이 모든 것은 로봇을 운영하고, 통행자료를 수집하는 것을 운영하는 것입니다.img

테스트 전략을 열고, 다시 테스트를 수행합니다. 예를 들어, 다시 테스트 전략을 열고, 다시 테스트하십시오.

function main() {
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords())
    Log(exchange.GetRecords().length)
}

설정 리모컨 옵션, 거래소를 비안으로 설정하는 것은 일시적으로 사용자 지정 데이터 소스가 자체적으로 거래소 이름을 만들 수 없기 때문에 목록의 거래소 설정을 빌릴 수 있기 때문에 리모컨을 설정하면 비안이 표시되며 실제로는 wexApp 모형 디스크의 데이터입니다.

img

비교 리코드 시스템은 트레이드 수집가에서 사용자 지정 데이터 소스 리코드로 생성된 차트와 wexApp 거래소 페이지의 1시간 K선 차트와 동일하는지 여부를 확인합니다.

img

img

이렇게 하면 VPS에 있는 로봇이 직접 K 라인 데이터를 수집할 수 있고, 우리는 언제든 수집된 데이터를 직접 검색 시스템에서 검색할 수 있습니다. 유추를 버리고, 당신은 또한 더 확장 할 수 있습니다. 예를 들어, 실제 디스크 수준에서 사용자 정의 데이터 소스를 검색하는 기능을 지원하고, 다양한 종류, 여러 시장 데이터 수집 등을 지원합니다.

이 글은 멘토와 함께 작성되었습니다.


관련

더 많은

타오이 로그를 인쇄해달라고 요청한 사용자 지정 데이터 소스 서비스가 수신되지 않았습니다.

타오다시 한번 확인해보면, 사용자 정의 데이터를 선택합니다. 아래에서 선택된 트랜잭션 대 데이터로 시작합니다.

lcgs006위와 같이 제공되지 않은 통화 쌍의 데이터를 수집한 경우, DOT_USDT와 같은 작은 종류의 통화 쌍을 리코딩하려면, 리코딩 시 통화가 커스터마이즈 될 수 없습니다.

zltim꼭대기

이제 절대다시 테스트를 통해 사용자 정의 데이터 소스를 선택하면 하나의 트랜잭션 쌍만을 지원합니까?

작은 꿈이 게시물에 언급된 "행위 수집기"를 서버에서 실행해야 합니다. FMZ 리코딩 시스템에 사용자 정의 데이터 소스 기능을 제공하여 데이터를 제공합니다. 게시물에 따라 수행 할 수 있습니다.

타오이 경우, 사용자 지정 데이터 소스 주소를 채운 후, 아래의 검색 데이터에는 새로운 데이터가 표시되지 않습니다.

작은 꿈사용자 지정 데이터 소스 기능을 사용 한 다음 오른쪽 컨트롤에서 사용자 지정 데이터 소스의 서비스 주소를 입력해야합니다.

작은 꿈이 디스크는 서버에서 외부 IP가 있어야 검색 시스템 페이지에 액세스 할 수 있습니다.

작은 꿈제가 무슨 말을 하는지 모르겠네요. 제가 말하는 것은 여러분의 사용자 지정 데이터 소스가 제공하는 데이터입니다. 예를 들어, 실제로는 EOS_USDT입니다. 하지만 FMZ에서는 BTC_USDT 같은 거래 쌍만 선택할 수 있습니다.

lcgs006이 글은 이 글에 대해 설명하고 있습니다.

작은 꿈이름과 같은 거래를 하지 않고, 데이터 대신 데이터를 수집할 수 있는 가격으로 사용할 수 있습니다.

작은 꿈데이터 소스에 제공되는 이 서비스 프로그램은 데이터에 제공되는 몇 가지 다른 트랜잭션을 여러 번 작성할 수 있으며, 회수 시스템은 필요한 것을 자체적으로 호출합니다.