[TOC]

Inventor Quantitative Trading Platform의 백테스팅 시스템은 끊임없이 반복, 업데이트 및 업그레이드되는 백테스팅 시스템입니다. 초기 기본 백테스팅 기능에서 점진적으로 기능을 추가하고 성능을 최적화합니다. 플랫폼이 개발됨에 따라 백테스팅 시스템은 계속해서 최적화되고 업그레이드될 것입니다. 오늘은 백테스팅 시스템을 기반으로 한 주제인 “무작위 시장 상황에 기반한 전략 테스트”에 대해 논의하겠습니다.
양적 거래 분야에서 전략 개발과 최적화는 실제 시장 데이터 검증과 분리될 수 없습니다. 그러나 실제 적용에서는 복잡하고 변화하는 시장 환경으로 인해 백테스팅을 위해 과거 데이터에 의존하는 데에는 극단적인 시장 상황이나 특수 시나리오를 다루기 어렵다는 단점이 있습니다. 따라서 효율적인 무작위 시장 생성기를 설계하는 것은 양적 전략 개발자에게 효과적인 도구가 됩니다.
특정 거래소나 통화에 대한 전략을 과거 데이터를 사용하여 백테스트해야 하는 경우 FMZ 플랫폼의 공식 데이터 소스를 백테스트에 사용할 수 있습니다. 때때로 우리는 완전히 “익숙하지 않은” 시장에서 전략이 어떻게 수행되는지 보고 싶어합니다. 이때 우리는 전략을 테스트하기 위해 일부 데이터를 “조작”할 수 있습니다.
무작위 시장 데이터를 사용하는 중요성은 다음과 같습니다.
해당 전략이 추세와 충격 전환에 적응할 수 있는가? 이 전략은 극단적인 시장 상황에서 상당한 손실을 초래할 수 있습니까?
해당 전략이 특정 시장 구조에 지나치게 의존하고 있지는 않은가? 매개변수에 과도한 적합이 발생할 위험이 있습니까?
그러나 전략을 합리적으로 평가하는 것도 필요합니다. 무작위로 생성된 시장 데이터의 경우 다음을 참고하세요.
이 모든 것을 말한 후에, 우리는 어떻게 데이터를 “조작”할 수 있을까요? 백테스팅 시스템에서 사용할 데이터를 편리하고 빠르고 쉽게 “조작”하려면 어떻게 해야 합니까?
이 글은 논의의 시작점을 제공하고 비교적 간단한 무작위 시장 생성 계산을 제공하기 위해 고안되었습니다. 사실, 적용할 수 있는 다양한 시뮬레이션 알고리즘, 데이터 모델 및 기타 기술이 있습니다. 논의의 제한된 공간으로 인해 우리는 특별히 복잡한 데이터 시뮬레이션 방법을 사용하지 않을 것입니다.
플랫폼 백테스팅 시스템의 사용자 정의 데이터 소스 기능을 결합하여 Python으로 프로그램을 작성했습니다.
일부 생성 표준 및 K-라인 데이터 파일 저장의 경우 다음과 같은 매개변수 제어를 정의할 수 있습니다.

무작위로 생성된 데이터 패턴 K-라인 데이터의 변동 유형을 시뮬레이션하기 위해 양수와 음수 난수의 다른 확률을 사용하여 간단한 설계를 수행합니다. 생성된 데이터가 크지 않으면 필요한 시장 패턴이 반영되지 않을 수 있습니다. 더 나은 방법이 있다면 코드의 이 부분을 바꿀 수 있습니다. 이러한 간단한 설계에 기초하여 난수 생성 범위와 코드의 일부 계수를 조정하면 생성되는 데이터의 효과에 영향을 미칠 수 있습니다.
데이터 검증 생성된 K-라인 데이터의 합리성도 검토해야 하며, 최고 시가와 최저 종가가 정의에 위배되는지 확인하고 K-라인 데이터의 연속성을 검토해야 합니다.
import _thread
import json
import math
import csv
import random
import os
import datetime as dt
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse
arrTrendType = ["down", "slow_up", "sharp_down", "sharp_up", "narrow_range", "wide_range", "neutral_random"]
def url2Dict(url):
query = urlparse(url).query
params = parse_qs(query)
result = {key: params[key][0] for key in params}
return result
class Provider(BaseHTTPRequestHandler):
def do_GET(self):
global filePathForCSV, pround, vround, ct
try:
self.send_response(200)
self.send_header("Content-type", "application/json")
self.end_headers()
dictParam = url2Dict(self.path)
Log("自定义数据源服务接收到请求,self.path:", self.path, "query 参数:", dictParam)
eid = dictParam["eid"]
symbol = dictParam["symbol"]
arrCurrency = symbol.split(".")[0].split("_")
baseCurrency = arrCurrency[0]
quoteCurrency = arrCurrency[1]
fromTS = int(dictParam["from"]) * int(1000)
toTS = int(dictParam["to"]) * int(1000)
priceRatio = math.pow(10, int(pround))
amountRatio = math.pow(10, int(vround))
data = {
"detail": {
"eid": eid,
"symbol": symbol,
"alias": symbol,
"baseCurrency": baseCurrency,
"quoteCurrency": quoteCurrency,
"marginCurrency": quoteCurrency,
"basePrecision": vround,
"quotePrecision": pround,
"minQty": 0.00001,
"maxQty": 9000,
"minNotional": 5,
"maxNotional": 9000000,
"priceTick": 10 ** -pround,
"volumeTick": 10 ** -vround,
"marginLevel": 10,
"contractType": ct
},
"schema" : ["time", "open", "high", "low", "close", "vol"],
"data" : []
}
listDataSequence = []
with open(filePathForCSV, "r") as f:
reader = csv.reader(f)
header = next(reader)
headerIsNoneCount = 0
if len(header) != len(data["schema"]):
Log("CSV文件格式有误,列数不同,请检查!", "#FF0000")
return
for ele in header:
for i in range(len(data["schema"])):
if data["schema"][i] == ele or ele == "":
if ele == "":
headerIsNoneCount += 1
if headerIsNoneCount > 1:
Log("CSV文件格式有误,请检查!", "#FF0000")
return
listDataSequence.append(i)
break
while True:
record = next(reader, -1)
if record == -1:
break
index = 0
arr = [0, 0, 0, 0, 0, 0]
for ele in record:
arr[listDataSequence[index]] = int(ele) if listDataSequence[index] == 0 else (int(float(ele) * amountRatio) if listDataSequence[index] == 5 else int(float(ele) * priceRatio))
index += 1
data["data"].append(arr)
Log("数据data.detail:", data["detail"], "响应回测系统请求。")
self.wfile.write(json.dumps(data).encode())
except BaseException as e:
Log("Provider do_GET error, e:", e)
return
def createServer(host):
try:
server = HTTPServer(host, Provider)
Log("Starting server, listen at: %s:%s" % host)
server.serve_forever()
except BaseException as e:
Log("createServer error, e:", e)
raise Exception("stop")
class KlineGenerator:
def __init__(self, start_time, end_time, interval):
self.start_time = dt.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
self.end_time = dt.datetime.strptime(end_time, "%Y-%m-%d %H:%M:%S")
self.interval = self._parse_interval(interval)
self.timestamps = self._generate_time_series()
def _parse_interval(self, interval):
unit = interval[-1]
value = int(interval[:-1])
if unit == "m":
return value * 60
elif unit == "h":
return value * 3600
elif unit == "d":
return value * 86400
else:
raise ValueError("不支持的K线周期,请使用 'm', 'h', 或 'd'.")
def _generate_time_series(self):
timestamps = []
current_time = self.start_time
while current_time <= self.end_time:
timestamps.append(int(current_time.timestamp() * 1000))
current_time += dt.timedelta(seconds=self.interval)
return timestamps
def generate(self, initPrice, trend_type="neutral", volatility=1):
data = []
current_price = initPrice
angle = 0
for timestamp in self.timestamps:
angle_radians = math.radians(angle % 360)
cos_value = math.cos(angle_radians)
if trend_type == "down":
upFactor = random.uniform(0, 0.5)
change = random.uniform(-0.5, 0.5 * upFactor) * volatility * random.uniform(1, 3)
elif trend_type == "slow_up":
downFactor = random.uniform(0, 0.5)
change = random.uniform(-0.5 * downFactor, 0.5) * volatility * random.uniform(1, 3)
elif trend_type == "sharp_down":
upFactor = random.uniform(0, 0.5)
change = random.uniform(-10, 0.5 * upFactor) * volatility * random.uniform(1, 3)
elif trend_type == "sharp_up":
downFactor = random.uniform(0, 0.5)
change = random.uniform(-0.5 * downFactor, 10) * volatility * random.uniform(1, 3)
elif trend_type == "narrow_range":
change = random.uniform(-0.2, 0.2) * volatility * random.uniform(1, 3)
elif trend_type == "wide_range":
change = random.uniform(-3, 3) * volatility * random.uniform(1, 3)
else:
change = random.uniform(-0.5, 0.5) * volatility * random.uniform(1, 3)
change = change + cos_value * random.uniform(-0.2, 0.2) * volatility
open_price = current_price
high_price = open_price + random.uniform(0, abs(change))
low_price = max(open_price - random.uniform(0, abs(change)), random.uniform(0, open_price))
close_price = open_price + change if open_price + change < high_price and open_price + change > low_price else random.uniform(low_price, high_price)
if (high_price >= open_price and open_price >= close_price and close_price >= low_price) or (high_price >= close_price and close_price >= open_price and open_price >= low_price):
pass
else:
Log("异常数据:", high_price, open_price, low_price, close_price, "#FF0000")
high_price = max(high_price, open_price, close_price)
low_price = min(low_price, open_price, close_price)
base_volume = random.uniform(1000, 5000)
volume = base_volume * (1 + abs(change) * 0.2)
kline = {
"Time": timestamp,
"Open": round(open_price, 2),
"High": round(high_price, 2),
"Low": round(low_price, 2),
"Close": round(close_price, 2),
"Volume": round(volume, 2),
}
data.append(kline)
current_price = close_price
angle += 1
return data
def save_to_csv(self, filename, data):
with open(filename, mode="w", newline="") as csvfile:
writer = csv.writer(csvfile)
writer.writerow(["", "open", "high", "low", "close", "vol"])
for idx, kline in enumerate(data):
writer.writerow(
[kline["Time"], kline["Open"], kline["High"], kline["Low"], kline["Close"], kline["Volume"]]
)
Log("当前路径:", os.getcwd())
with open("data.csv", "r") as file:
lines = file.readlines()
if len(lines) > 1:
Log("文件写入成功,以下是文件内容的一部分:")
Log("".join(lines[:5]))
else:
Log("文件写入失败,文件为空!")
def main():
Chart({})
LogReset(1)
try:
# _thread.start_new_thread(createServer, (("localhost", 9090), ))
_thread.start_new_thread(createServer, (("0.0.0.0", 9090), ))
Log("开启自定义数据源服务线程,数据由CSV文件提供。", ", 地址/端口:0.0.0.0:9090", "#FF0000")
except BaseException as e:
Log("启动自定义数据源服务失败!")
Log("错误信息:", e)
raise Exception("stop")
while True:
cmd = GetCommand()
if cmd:
if cmd == "createRecords":
Log("生成器参数:", "起始时间:", startTime, "结束时间:", endTime, "K线周期:", KLinePeriod, "初始价格:", firstPrice, "波动类型:", arrTrendType[trendType], "波动性系数:", ratio)
generator = KlineGenerator(
start_time=startTime,
end_time=endTime,
interval=KLinePeriod,
)
kline_data = generator.generate(firstPrice, trend_type=arrTrendType[trendType], volatility=ratio)
generator.save_to_csv("data.csv", kline_data)
ext.PlotRecords(kline_data, "%s_%s" % ("records", KLinePeriod))
LogStatus(_D())
Sleep(2000)



/*backtest
start: 2024-10-01 08:00:00
end: 2024-10-31 08:55:00
period: 1h
basePeriod: 1h
exchanges: [{"eid":"Futures_Binance","currency":"BTC_USDT","feeder":"http://xxx.xxx.xxx.xxx:9090"}]
args: [["ContractType","quarter",358374]]
*/
위 정보에 따라 구성하고 구체적으로 조정하세요.http://xxx.xxx.xxx.xxx:9090이는 랜덤 마켓 생성 전략 실제 디스크의 서버 IP 주소와 오픈 포트입니다.
이것은 사용자 정의 데이터 소스입니다. 자세한 내용은 플랫폼 API 설명서의 사용자 정의 데이터 소스 섹션을 참조할 수 있습니다.


이 시점에서 백테스팅 시스템은 “조작된” 시뮬레이션 데이터를 사용하여 테스트됩니다. 백테스트 중 시장 차트의 데이터에 따르면, 무작위 시장 조건에서 생성된 실시간 차트의 데이터를 비교합니다. 시간은 2024년 10월 16일 17:00입니다. 데이터는 동일합니다.
전략 소스 코드:백테스팅 시스템 랜덤 견적 생성기
여러분의 지지와 독서에 감사드립니다.