[TOC]

Sistem pengujian ulang dari Inventor Quantitative Trading Platform adalah sistem pengujian ulang yang terus-menerus beriterasi, diperbarui, dan ditingkatkan. Dari fungsi pengujian ulang dasar awal, sistem ini secara bertahap menambahkan fungsi dan mengoptimalkan kinerja. Seiring dengan berkembangnya platform, sistem pengujian ulang akan terus dioptimalkan dan ditingkatkan. Hari ini kita akan membahas topik yang didasarkan pada sistem pengujian ulang: “Pengujian strategi berdasarkan kondisi pasar acak”.
Di bidang perdagangan kuantitatif, pengembangan dan optimalisasi strategi tidak dapat dipisahkan dari verifikasi data pasar nyata. Namun, dalam aplikasi sesungguhnya, karena lingkungan pasar yang kompleks dan berubah, mengandalkan data historis untuk pengujian ulang mungkin memiliki kekurangan, seperti kurangnya cakupan kondisi pasar yang ekstrem atau skenario khusus. Oleh karena itu, merancang generator pasar acak yang efisien menjadi alat yang efektif bagi pengembang strategi kuantitatif.
Saat kita perlu menguji ulang strategi di bursa atau mata uang tertentu menggunakan data historis, kita dapat menggunakan sumber data resmi platform FMZ untuk pengujian ulang. Terkadang kita juga ingin melihat bagaimana suatu strategi bekerja di pasar yang sama sekali “asing”. Pada saat ini, kita dapat “membuat” beberapa data untuk menguji strategi tersebut.
Pentingnya penggunaan data pasar acak adalah:
Bisakah strategi beradaptasi dengan tren dan peralihan guncangan? Akankah strategi ini mengakibatkan kerugian besar dalam kondisi pasar yang ekstrem?
Apakah strategi tersebut terlalu bergantung pada struktur pasar tertentu? Apakah ada risiko overfitting parameter?
Namun, strategi tersebut juga perlu dievaluasi secara rasional. Untuk data pasar yang dihasilkan secara acak, harap perhatikan:
Setelah mengatakan semua itu, bagaimana kita bisa “memalsukan” beberapa data? Bagaimana kita dapat dengan mudah, cepat dan nyaman “membuat” data untuk digunakan dalam sistem pengujian ulang?
Artikel ini dirancang untuk memberikan titik awal diskusi dan memberikan perhitungan pembangkitan pasar acak yang relatif sederhana. Faktanya, ada berbagai algoritma simulasi, model data, dan teknologi lain yang dapat diterapkan. Karena ruang diskusi yang terbatas , kami tidak akan menggunakan metode simulasi data yang sangat rumit.
Menggabungkan fungsi sumber data khusus dari sistem pengujian ulang platform, kami menulis sebuah program dalam Python.
Untuk beberapa standar pembangkitan dan penyimpanan file data K-line, kontrol parameter berikut dapat ditetapkan:

Pola data yang dihasilkan secara acak Untuk simulasi jenis fluktuasi data K-line, desain sederhana dilakukan dengan menggunakan probabilitas angka acak positif dan negatif yang berbeda. Jika data yang dihasilkan tidak besar, pola pasar yang dibutuhkan mungkin tidak tercermin. Jika ada cara yang lebih baik, Anda dapat mengganti bagian kode ini. Berdasarkan desain sederhana ini, penyesuaian rentang pembangkitan bilangan acak dan beberapa koefisien dalam kode dapat memengaruhi efek data yang dihasilkan.
Verifikasi data Data K-line yang dihasilkan juga perlu diperiksa rasionalitasnya, untuk memeriksa apakah harga pembukaan tinggi dan harga penutupan rendah melanggar definisi, untuk memeriksa kontinuitas data K-line, dll.
import _thread
import json
import math
import csv
import random
import os
import datetime as dt
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse
arrTrendType = ["down", "slow_up", "sharp_down", "sharp_up", "narrow_range", "wide_range", "neutral_random"]
def url2Dict(url):
query = urlparse(url).query
params = parse_qs(query)
result = {key: params[key][0] for key in params}
return result
class Provider(BaseHTTPRequestHandler):
def do_GET(self):
global filePathForCSV, pround, vround, ct
try:
self.send_response(200)
self.send_header("Content-type", "application/json")
self.end_headers()
dictParam = url2Dict(self.path)
Log("自定义数据源服务接收到请求,self.path:", self.path, "query 参数:", dictParam)
eid = dictParam["eid"]
symbol = dictParam["symbol"]
arrCurrency = symbol.split(".")[0].split("_")
baseCurrency = arrCurrency[0]
quoteCurrency = arrCurrency[1]
fromTS = int(dictParam["from"]) * int(1000)
toTS = int(dictParam["to"]) * int(1000)
priceRatio = math.pow(10, int(pround))
amountRatio = math.pow(10, int(vround))
data = {
"detail": {
"eid": eid,
"symbol": symbol,
"alias": symbol,
"baseCurrency": baseCurrency,
"quoteCurrency": quoteCurrency,
"marginCurrency": quoteCurrency,
"basePrecision": vround,
"quotePrecision": pround,
"minQty": 0.00001,
"maxQty": 9000,
"minNotional": 5,
"maxNotional": 9000000,
"priceTick": 10 ** -pround,
"volumeTick": 10 ** -vround,
"marginLevel": 10,
"contractType": ct
},
"schema" : ["time", "open", "high", "low", "close", "vol"],
"data" : []
}
listDataSequence = []
with open(filePathForCSV, "r") as f:
reader = csv.reader(f)
header = next(reader)
headerIsNoneCount = 0
if len(header) != len(data["schema"]):
Log("CSV文件格式有误,列数不同,请检查!", "#FF0000")
return
for ele in header:
for i in range(len(data["schema"])):
if data["schema"][i] == ele or ele == "":
if ele == "":
headerIsNoneCount += 1
if headerIsNoneCount > 1:
Log("CSV文件格式有误,请检查!", "#FF0000")
return
listDataSequence.append(i)
break
while True:
record = next(reader, -1)
if record == -1:
break
index = 0
arr = [0, 0, 0, 0, 0, 0]
for ele in record:
arr[listDataSequence[index]] = int(ele) if listDataSequence[index] == 0 else (int(float(ele) * amountRatio) if listDataSequence[index] == 5 else int(float(ele) * priceRatio))
index += 1
data["data"].append(arr)
Log("数据data.detail:", data["detail"], "响应回测系统请求。")
self.wfile.write(json.dumps(data).encode())
except BaseException as e:
Log("Provider do_GET error, e:", e)
return
def createServer(host):
try:
server = HTTPServer(host, Provider)
Log("Starting server, listen at: %s:%s" % host)
server.serve_forever()
except BaseException as e:
Log("createServer error, e:", e)
raise Exception("stop")
class KlineGenerator:
def __init__(self, start_time, end_time, interval):
self.start_time = dt.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
self.end_time = dt.datetime.strptime(end_time, "%Y-%m-%d %H:%M:%S")
self.interval = self._parse_interval(interval)
self.timestamps = self._generate_time_series()
def _parse_interval(self, interval):
unit = interval[-1]
value = int(interval[:-1])
if unit == "m":
return value * 60
elif unit == "h":
return value * 3600
elif unit == "d":
return value * 86400
else:
raise ValueError("不支持的K线周期,请使用 'm', 'h', 或 'd'.")
def _generate_time_series(self):
timestamps = []
current_time = self.start_time
while current_time <= self.end_time:
timestamps.append(int(current_time.timestamp() * 1000))
current_time += dt.timedelta(seconds=self.interval)
return timestamps
def generate(self, initPrice, trend_type="neutral", volatility=1):
data = []
current_price = initPrice
angle = 0
for timestamp in self.timestamps:
angle_radians = math.radians(angle % 360)
cos_value = math.cos(angle_radians)
if trend_type == "down":
upFactor = random.uniform(0, 0.5)
change = random.uniform(-0.5, 0.5 * upFactor) * volatility * random.uniform(1, 3)
elif trend_type == "slow_up":
downFactor = random.uniform(0, 0.5)
change = random.uniform(-0.5 * downFactor, 0.5) * volatility * random.uniform(1, 3)
elif trend_type == "sharp_down":
upFactor = random.uniform(0, 0.5)
change = random.uniform(-10, 0.5 * upFactor) * volatility * random.uniform(1, 3)
elif trend_type == "sharp_up":
downFactor = random.uniform(0, 0.5)
change = random.uniform(-0.5 * downFactor, 10) * volatility * random.uniform(1, 3)
elif trend_type == "narrow_range":
change = random.uniform(-0.2, 0.2) * volatility * random.uniform(1, 3)
elif trend_type == "wide_range":
change = random.uniform(-3, 3) * volatility * random.uniform(1, 3)
else:
change = random.uniform(-0.5, 0.5) * volatility * random.uniform(1, 3)
change = change + cos_value * random.uniform(-0.2, 0.2) * volatility
open_price = current_price
high_price = open_price + random.uniform(0, abs(change))
low_price = max(open_price - random.uniform(0, abs(change)), random.uniform(0, open_price))
close_price = open_price + change if open_price + change < high_price and open_price + change > low_price else random.uniform(low_price, high_price)
if (high_price >= open_price and open_price >= close_price and close_price >= low_price) or (high_price >= close_price and close_price >= open_price and open_price >= low_price):
pass
else:
Log("异常数据:", high_price, open_price, low_price, close_price, "#FF0000")
high_price = max(high_price, open_price, close_price)
low_price = min(low_price, open_price, close_price)
base_volume = random.uniform(1000, 5000)
volume = base_volume * (1 + abs(change) * 0.2)
kline = {
"Time": timestamp,
"Open": round(open_price, 2),
"High": round(high_price, 2),
"Low": round(low_price, 2),
"Close": round(close_price, 2),
"Volume": round(volume, 2),
}
data.append(kline)
current_price = close_price
angle += 1
return data
def save_to_csv(self, filename, data):
with open(filename, mode="w", newline="") as csvfile:
writer = csv.writer(csvfile)
writer.writerow(["", "open", "high", "low", "close", "vol"])
for idx, kline in enumerate(data):
writer.writerow(
[kline["Time"], kline["Open"], kline["High"], kline["Low"], kline["Close"], kline["Volume"]]
)
Log("当前路径:", os.getcwd())
with open("data.csv", "r") as file:
lines = file.readlines()
if len(lines) > 1:
Log("文件写入成功,以下是文件内容的一部分:")
Log("".join(lines[:5]))
else:
Log("文件写入失败,文件为空!")
def main():
Chart({})
LogReset(1)
try:
# _thread.start_new_thread(createServer, (("localhost", 9090), ))
_thread.start_new_thread(createServer, (("0.0.0.0", 9090), ))
Log("开启自定义数据源服务线程,数据由CSV文件提供。", ", 地址/端口:0.0.0.0:9090", "#FF0000")
except BaseException as e:
Log("启动自定义数据源服务失败!")
Log("错误信息:", e)
raise Exception("stop")
while True:
cmd = GetCommand()
if cmd:
if cmd == "createRecords":
Log("生成器参数:", "起始时间:", startTime, "结束时间:", endTime, "K线周期:", KLinePeriod, "初始价格:", firstPrice, "波动类型:", arrTrendType[trendType], "波动性系数:", ratio)
generator = KlineGenerator(
start_time=startTime,
end_time=endTime,
interval=KLinePeriod,
)
kline_data = generator.generate(firstPrice, trend_type=arrTrendType[trendType], volatility=ratio)
generator.save_to_csv("data.csv", kline_data)
ext.PlotRecords(kline_data, "%s_%s" % ("records", KLinePeriod))
LogStatus(_D())
Sleep(2000)



/*backtest
start: 2024-10-01 08:00:00
end: 2024-10-31 08:55:00
period: 1h
basePeriod: 1h
exchanges: [{"eid":"Futures_Binance","currency":"BTC_USDT","feeder":"http://xxx.xxx.xxx.xxx:9090"}]
args: [["ContractType","quarter",358374]]
*/
Konfigurasikan sesuai informasi di atas dan buat penyesuaian khusus.http://xxx.xxx.xxx.xxx:9090Ini adalah alamat IP server dan port terbuka dari strategi pembangkitan pasar acak pada disk nyata.
Ini adalah sumber data kustom. Anda dapat merujuk ke bagian sumber data kustom dalam dokumentasi API platform untuk informasi lebih lanjut.


Pada titik ini, sistem pengujian ulang diuji menggunakan data simulasi “buatan” kami. Berdasarkan data pada grafik pasar selama pengujian ulang, bandingkan data pada grafik waktu nyata yang dihasilkan oleh kondisi pasar acak. Waktu saat ini adalah pukul 17:00 pada tanggal 16 Oktober 2024. Datanya sama.
Kode sumber strategi:Generator Kutipan Acak Sistem Pengujian Ulang
Terima kasih atas dukungan dan bacaan Anda.