avatar of 发明者量化-小小梦 发明者量化-小小梦
집중하다 사신
4
집중하다
1271
수행원

랜덤 마켓 제너레이터 기반 전략 테스트 방법에 대한 논의

만든 날짜: 2024-11-29 16:35:44, 업데이트 날짜: 2024-12-02 09:12:43
comments   0
hits   783

[TOC]

랜덤 마켓 제너레이터 기반 전략 테스트 방법에 대한 논의

머리말

Inventor Quantitative Trading Platform의 백테스팅 시스템은 끊임없이 반복, 업데이트 및 업그레이드되는 백테스팅 시스템입니다. 초기 기본 백테스팅 기능에서 점진적으로 기능을 추가하고 성능을 최적화합니다. 플랫폼이 개발됨에 따라 백테스팅 시스템은 계속해서 최적화되고 업그레이드될 것입니다. 오늘은 백테스팅 시스템을 기반으로 한 주제인 “무작위 시장 상황에 기반한 전략 테스트”에 대해 논의하겠습니다.

필요

양적 거래 분야에서 전략 개발과 최적화는 실제 시장 데이터 검증과 분리될 수 없습니다. 그러나 실제 적용에서는 복잡하고 변화하는 시장 환경으로 인해 백테스팅을 위해 과거 데이터에 의존하는 데에는 극단적인 시장 상황이나 특수 시나리오를 다루기 어렵다는 단점이 있습니다. 따라서 효율적인 무작위 시장 생성기를 설계하는 것은 양적 전략 개발자에게 효과적인 도구가 됩니다.

특정 거래소나 통화에 대한 전략을 과거 데이터를 사용하여 백테스트해야 하는 경우 FMZ 플랫폼의 공식 데이터 소스를 백테스트에 사용할 수 있습니다. 때때로 우리는 완전히 “익숙하지 않은” 시장에서 전략이 어떻게 수행되는지 보고 싶어합니다. 이때 우리는 전략을 테스트하기 위해 일부 데이터를 “조작”할 수 있습니다.

무작위 시장 데이터를 사용하는 중요성은 다음과 같습니다.

  • 1. 전략의 견고성 평가 무작위 시장 생성기는 극심한 변동성, 낮은 변동성, 추세 시장, 변동성 있는 시장을 포함한 다양한 시장 시나리오를 만들어낼 수 있습니다. 이러한 시뮬레이션 환경에서 전략을 테스트하면 다양한 시장 상황에서도 전략의 성과가 안정적인지 평가하는 데 도움이 될 수 있습니다. 예를 들어:

해당 전략이 추세와 충격 전환에 적응할 수 있는가? 이 전략은 극단적인 시장 상황에서 상당한 손실을 초래할 수 있습니까?

  • 2. 전략의 잠재적인 약점을 파악하세요 비정상적인 시장 상황(가상의 블랙스완 사건 등)을 시뮬레이션하면 전략의 잠재적인 약점을 발견하고 개선할 수 있습니다. 예를 들어:

해당 전략이 특정 시장 구조에 지나치게 의존하고 있지는 않은가? 매개변수에 과도한 적합이 발생할 위험이 있습니까?

    1. 전략 매개변수 최적화 무작위로 생성된 데이터는 과거 데이터에만 전적으로 의존하지 않고도 전략 매개변수 조정을 위한 더 다양한 테스트 환경을 제공합니다. 이를 통해 보다 포괄적인 범위의 전략 매개변수가 가능해지고 과거 데이터의 특정 시장 패턴에 국한되는 것을 피할 수 있습니다.
    1. 역사적 데이터의 격차를 메우다 일부 시장(신흥 시장이나 소규모 통화로 거래되는 시장 등)에서는 과거 데이터만으로는 모든 가능한 시장 상황을 포괄하기에 충분하지 않을 수 있습니다. 무작위화 기법은 더욱 포괄적인 테스트를 용이하게 하기 위해 많은 양의 보충 데이터를 제공할 수 있습니다.
    1. 빠른 반복적 개발 빠른 테스트를 위해 무작위 데이터를 활용하면 실시간 시장 상황이나 시간이 많이 걸리는 데이터 정리 및 구성에 의존하지 않고도 전략 개발 반복 작업을 가속화할 수 있습니다.

그러나 전략을 합리적으로 평가하는 것도 필요합니다. 무작위로 생성된 시장 데이터의 경우 다음을 참고하세요.

  • 1. 무작위 시장 생성기는 유용하지만 그 중요성은 생성된 데이터의 품질과 대상 시나리오의 설계에 따라 달라집니다.
  • 2. 생성 논리는 실제 시장과 가까워야 합니다. 무작위로 생성된 시장 조건이 현실과 전혀 맞지 않으면 테스트 결과에 참조 가치가 부족할 수 있습니다. 예를 들어, 생성기는 실제 시장 통계적 특성(예: 변동성 분포, 추세 비율)과 결합하여 설계될 수 있습니다.
  • 3. 실제 데이터 테스트를 완전히 대체할 수 없습니다. 무작위 데이터는 전략의 개발 및 최적화를 보완할 수 있을 뿐입니다. 최종 전략은 여전히 ​​실제 시장 데이터에서 효과를 검증해야 합니다.

이 모든 것을 말한 후에, 우리는 어떻게 데이터를 “조작”할 수 있을까요? 백테스팅 시스템에서 사용할 데이터를 편리하고 빠르고 쉽게 “조작”하려면 어떻게 해야 합니까?

디자인 아이디어

이 글은 논의의 시작점을 제공하고 비교적 간단한 무작위 시장 생성 계산을 제공하기 위해 고안되었습니다. 사실, 적용할 수 있는 다양한 시뮬레이션 알고리즘, 데이터 모델 및 기타 기술이 있습니다. 논의의 제한된 공간으로 인해 우리는 특별히 복잡한 데이터 시뮬레이션 방법을 사용하지 않을 것입니다.

플랫폼 백테스팅 시스템의 사용자 정의 데이터 소스 기능을 결합하여 Python으로 프로그램을 작성했습니다.

  • 1. K-라인 데이터 세트를 무작위로 생성하여 지속적인 기록을 위해 CSV 파일에 기록한 후, 생성된 데이터를 저장할 수 있습니다.
  • 2. 그런 다음 백테스팅 시스템에 대한 데이터 소스 지원을 제공하는 서비스를 만듭니다.
  • 3. 생성된 K-라인 데이터를 차트에 표시합니다.

일부 생성 표준 및 K-라인 데이터 파일 저장의 경우 다음과 같은 매개변수 제어를 정의할 수 있습니다.

랜덤 마켓 제너레이터 기반 전략 테스트 방법에 대한 논의

  • 무작위로 생성된 데이터 패턴 K-라인 데이터의 변동 유형을 시뮬레이션하기 위해 양수와 음수 난수의 다른 확률을 사용하여 간단한 설계를 수행합니다. 생성된 데이터가 크지 않으면 필요한 시장 패턴이 반영되지 않을 수 있습니다. 더 나은 방법이 있다면 코드의 이 부분을 바꿀 수 있습니다. 이러한 간단한 설계에 기초하여 난수 생성 범위와 코드의 일부 계수를 조정하면 생성되는 데이터의 효과에 영향을 미칠 수 있습니다.

  • 데이터 검증 생성된 K-라인 데이터의 합리성도 검토해야 하며, 최고 시가와 최저 종가가 정의에 위배되는지 확인하고 K-라인 데이터의 연속성을 검토해야 합니다.

백테스팅 시스템 랜덤 견적 생성기

import _thread
import json
import math
import csv
import random
import os
import datetime as dt
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse

arrTrendType = ["down", "slow_up", "sharp_down", "sharp_up", "narrow_range", "wide_range", "neutral_random"]

def url2Dict(url):
    query = urlparse(url).query  
    params = parse_qs(query)  
    result = {key: params[key][0] for key in params}  
    return result

class Provider(BaseHTTPRequestHandler):
    def do_GET(self):
        global filePathForCSV, pround, vround, ct
        try:
            self.send_response(200)
            self.send_header("Content-type", "application/json")
            self.end_headers()

            dictParam = url2Dict(self.path)
            Log("自定义数据源服务接收到请求,self.path:", self.path, "query 参数:", dictParam)            
            
            eid = dictParam["eid"]
            symbol = dictParam["symbol"]
            arrCurrency = symbol.split(".")[0].split("_")
            baseCurrency = arrCurrency[0]
            quoteCurrency = arrCurrency[1]
            fromTS = int(dictParam["from"]) * int(1000)
            toTS = int(dictParam["to"]) * int(1000)
            priceRatio = math.pow(10, int(pround))
            amountRatio = math.pow(10, int(vround))

            data = {
                "detail": {
                    "eid": eid,
                    "symbol": symbol,
                    "alias": symbol,
                    "baseCurrency": baseCurrency,
                    "quoteCurrency": quoteCurrency,
                    "marginCurrency": quoteCurrency,
                    "basePrecision": vround,
                    "quotePrecision": pround,
                    "minQty": 0.00001,
                    "maxQty": 9000,
                    "minNotional": 5,
                    "maxNotional": 9000000,
                    "priceTick": 10 ** -pround,
                    "volumeTick": 10 ** -vround,
                    "marginLevel": 10,
                    "contractType": ct
                },
                "schema" : ["time", "open", "high", "low", "close", "vol"],
                "data" : []
            }
            
            listDataSequence = []
            with open(filePathForCSV, "r") as f:
                reader = csv.reader(f)
                header = next(reader)
                headerIsNoneCount = 0
                if len(header) != len(data["schema"]):
                    Log("CSV文件格式有误,列数不同,请检查!", "#FF0000")
                    return 
                for ele in header:
                    for i in range(len(data["schema"])):
                        if data["schema"][i] == ele or ele == "":
                            if ele == "":
                                headerIsNoneCount += 1
                            if headerIsNoneCount > 1:
                                Log("CSV文件格式有误,请检查!", "#FF0000")
                                return 
                            listDataSequence.append(i)
                            break
                
                while True:
                    record = next(reader, -1)
                    if record == -1:
                        break
                    index = 0
                    arr = [0, 0, 0, 0, 0, 0]
                    for ele in record:
                        arr[listDataSequence[index]] = int(ele) if listDataSequence[index] == 0 else (int(float(ele) * amountRatio) if listDataSequence[index] == 5 else int(float(ele) * priceRatio))
                        index += 1
                    data["data"].append(arr)            
            Log("数据data.detail:", data["detail"], "响应回测系统请求。")
            self.wfile.write(json.dumps(data).encode())
        except BaseException as e:
            Log("Provider do_GET error, e:", e)
        return 

def createServer(host):
    try:
        server = HTTPServer(host, Provider)
        Log("Starting server, listen at: %s:%s" % host)
        server.serve_forever()
    except BaseException as e:
        Log("createServer error, e:", e)
        raise Exception("stop")

class KlineGenerator:
    def __init__(self, start_time, end_time, interval):
        self.start_time = dt.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
        self.end_time = dt.datetime.strptime(end_time, "%Y-%m-%d %H:%M:%S")
        self.interval = self._parse_interval(interval)
        self.timestamps = self._generate_time_series()

    def _parse_interval(self, interval):
        unit = interval[-1]
        value = int(interval[:-1])

        if unit == "m":
            return value * 60
        elif unit == "h":
            return value * 3600
        elif unit == "d":
            return value * 86400
        else:
            raise ValueError("不支持的K线周期,请使用 'm', 'h', 或 'd'.")

    def _generate_time_series(self):
        timestamps = []
        current_time = self.start_time
        while current_time <= self.end_time:
            timestamps.append(int(current_time.timestamp() * 1000))
            current_time += dt.timedelta(seconds=self.interval)
        return timestamps

    def generate(self, initPrice, trend_type="neutral", volatility=1):
        data = []
        current_price = initPrice
        angle = 0
        for timestamp in self.timestamps:
            angle_radians = math.radians(angle % 360)
            cos_value = math.cos(angle_radians)

            if trend_type == "down":
                upFactor = random.uniform(0, 0.5)
                change = random.uniform(-0.5, 0.5 * upFactor) * volatility * random.uniform(1, 3)
            elif trend_type == "slow_up":
                downFactor = random.uniform(0, 0.5)
                change = random.uniform(-0.5 * downFactor, 0.5) * volatility * random.uniform(1, 3)
            elif trend_type == "sharp_down":
                upFactor = random.uniform(0, 0.5)
                change = random.uniform(-10, 0.5 * upFactor) * volatility * random.uniform(1, 3)
            elif trend_type == "sharp_up":
                downFactor = random.uniform(0, 0.5)
                change = random.uniform(-0.5 * downFactor, 10) * volatility * random.uniform(1, 3)
            elif trend_type == "narrow_range":
                change = random.uniform(-0.2, 0.2) * volatility * random.uniform(1, 3)
            elif trend_type == "wide_range":
                change = random.uniform(-3, 3) * volatility * random.uniform(1, 3)
            else:
                change = random.uniform(-0.5, 0.5) * volatility * random.uniform(1, 3)

            change = change + cos_value * random.uniform(-0.2, 0.2) * volatility
            open_price = current_price
            high_price = open_price + random.uniform(0, abs(change))
            low_price = max(open_price - random.uniform(0, abs(change)), random.uniform(0, open_price))
            close_price = open_price + change if open_price + change < high_price and open_price + change > low_price else random.uniform(low_price, high_price)

            if (high_price >= open_price and open_price >= close_price and close_price >= low_price) or (high_price >= close_price and close_price >= open_price and open_price >= low_price):
                pass
            else:
                Log("异常数据:", high_price, open_price, low_price, close_price, "#FF0000")

            high_price = max(high_price, open_price, close_price)
            low_price = min(low_price, open_price, close_price)

            base_volume = random.uniform(1000, 5000)
            volume = base_volume * (1 + abs(change) * 0.2)

            kline = {
                "Time": timestamp,
                "Open": round(open_price, 2),
                "High": round(high_price, 2),
                "Low": round(low_price, 2),
                "Close": round(close_price, 2),
                "Volume": round(volume, 2),
            }
            data.append(kline)
            current_price = close_price
            angle += 1
        return data

    def save_to_csv(self, filename, data):
        with open(filename, mode="w", newline="") as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(["", "open", "high", "low", "close", "vol"])
            for idx, kline in enumerate(data):
                writer.writerow(
                    [kline["Time"], kline["Open"], kline["High"], kline["Low"], kline["Close"], kline["Volume"]]
                )
        
        Log("当前路径:", os.getcwd())
        with open("data.csv", "r") as file:
            lines = file.readlines()
            if len(lines) > 1:
                Log("文件写入成功,以下是文件内容的一部分:")
                Log("".join(lines[:5]))
            else:
                Log("文件写入失败,文件为空!")

def main():
    Chart({})
    LogReset(1)
    
    try:
        # _thread.start_new_thread(createServer, (("localhost", 9090), ))
        _thread.start_new_thread(createServer, (("0.0.0.0", 9090), ))
        Log("开启自定义数据源服务线程,数据由CSV文件提供。", ", 地址/端口:0.0.0.0:9090", "#FF0000")
    except BaseException as e:
        Log("启动自定义数据源服务失败!")
        Log("错误信息:", e)
        raise Exception("stop")
    
    while True:
        cmd = GetCommand()
        if cmd:
            if cmd == "createRecords":
                Log("生成器参数:", "起始时间:", startTime, "结束时间:", endTime, "K线周期:", KLinePeriod, "初始价格:", firstPrice, "波动类型:", arrTrendType[trendType], "波动性系数:", ratio)
                generator = KlineGenerator(
                    start_time=startTime,
                    end_time=endTime,
                    interval=KLinePeriod,
                )
                kline_data = generator.generate(firstPrice, trend_type=arrTrendType[trendType], volatility=ratio)
                generator.save_to_csv("data.csv", kline_data)
                ext.PlotRecords(kline_data, "%s_%s" % ("records", KLinePeriod))
        LogStatus(_D())
        Sleep(2000)

백테스팅 시스템 연습

  1. 위의 정책 인스턴스를 생성하고 매개변수를 구성한 후 실행합니다.
  2. 실제 시장(전략 인스턴스)은 백테스팅 시스템이 액세스하고 데이터를 얻으려면 공용 IP가 필요하기 때문에 서버에 배포된 호스트에서 실행해야 합니다.
  3. 대화형 버튼을 클릭하면 전략이 자동으로 무작위 시장 데이터를 생성하기 시작합니다.

랜덤 마켓 제너레이터 기반 전략 테스트 방법에 대한 논의 랜덤 마켓 제너레이터 기반 전략 테스트 방법에 대한 논의

  1. 생성된 데이터는 쉽게 관찰할 수 있도록 차트에 표시되며, 데이터는 로컬 data.csv 파일에 기록됩니다.

랜덤 마켓 제너레이터 기반 전략 테스트 방법에 대한 논의

  1. 이제 우리는 이 무작위로 생성된 데이터를 사용하고 백테스팅을 위한 모든 전략을 사용할 수 있습니다.

랜덤 마켓 제너레이터 기반 전략 테스트 방법에 대한 논의

/*backtest
start: 2024-10-01 08:00:00
end: 2024-10-31 08:55:00
period: 1h
basePeriod: 1h
exchanges: [{"eid":"Futures_Binance","currency":"BTC_USDT","feeder":"http://xxx.xxx.xxx.xxx:9090"}]
args: [["ContractType","quarter",358374]]
*/

위 정보에 따라 구성하고 구체적으로 조정하세요.http://xxx.xxx.xxx.xxx:9090이는 랜덤 마켓 생성 전략 실제 디스크의 서버 IP 주소와 오픈 포트입니다. 이것은 사용자 정의 데이터 소스입니다. 자세한 내용은 플랫폼 API 설명서의 사용자 정의 데이터 소스 섹션을 참조할 수 있습니다.

  1. 백테스트 시스템이 데이터 소스를 설정한 후 무작위 시장 데이터를 테스트할 수 있습니다.

랜덤 마켓 제너레이터 기반 전략 테스트 방법에 대한 논의

랜덤 마켓 제너레이터 기반 전략 테스트 방법에 대한 논의

이 시점에서 백테스팅 시스템은 “조작된” 시뮬레이션 데이터를 사용하여 테스트됩니다. 백테스트 중 시장 차트의 데이터에 따르면, 무작위 시장 조건에서 생성된 실시간 차트의 데이터를 비교합니다. 시간은 2024년 10월 16일 17:00입니다. 데이터는 동일합니다.

  1. 오, 맞아요. 거의 말하는 걸 깜빡했어요! 이 무작위 시장 생성기 Python 프로그램이 실제 시장을 생성하는 이유는 생성된 K-라인 데이터의 시연, 작동 및 표시를 용이하게 하기 위해서입니다. 실제 응용 프로그램에서는 독립적인 Python 스크립트를 작성할 수 있으므로 실제 디스크를 실행할 필요가 없습니다.

전략 소스 코드:백테스팅 시스템 랜덤 견적 생성기

여러분의 지지와 독서에 감사드립니다.