Saya sering nongkrong di platform penemu dan selalu menemukan harta karun. Hari ini saya menemukan seorang penemu berusia 21 tahunStrategi TrenSaya mengagumi struktur kode penulis aslinya yang indah dan sempurna, serta fleksibilitasnya yang tinggi. Strategi aslinya adalah versi JS, yang telah ditulis ulang demi kenyamanan teman-teman Python.
Sejujurnya, banyak pemula mengambil banyak jalan memutar saat pertama kali terjun ke perdagangan kuantitatif. Mereka sering menghadapi masalah seperti order yang gagal, kerugian akibat manajemen risiko yang buruk, dan kehilangan data setelah memulai kembali strategi. Kemudian, saya perlahan menyadari pentingnya kerangka kerja yang baik, yang dapat membantu kita menghindari banyak jebakan. Kerangka kerja strategi tren ini adalah alat yang sangat berharga. Ini lebih dari sekadar strategi perdagangan sederhana; lebih seperti kotak peralatan, menyediakan fungsi-fungsi dasar namun krusial seperti penempatan order, order stop-loss, dan manajemen data. Anda hanya perlu fokus pada pertanyaan inti “kapan harus membeli” dan “kapan harus menjual.” Lebih lanjut, kerangka kerja ini sangat terbuka, memungkinkan Anda untuk dengan mudah mengganti EMA dengan MACD, RSI, atau indikator lain yang Anda sukai. Ingin mengikuti tren? Tidak masalah. Ingin mencoba mean reversion? Bahkan ingin menggabungkan beberapa indikator? Tentu saja. Fleksibilitas ini sangat berguna; Anda dapat memodifikasi kode yang sama untuk bereksperimen dengan berbagai ide.
Saya membagikan kerangka kerja ini hari ini, berharap dapat bermanfaat bagi mereka yang sedang mempelajari investasi kuantitatif. Berikut adalah pengantar mendetail untuk setiap komponen kerangka kerja ini, yang saya yakin akan bermanfaat bagi Anda.
Dibandingkan dengan beberapa fungsi independen yang digunakan dalam kerangka kerja perdagangan multi-komoditas, kerangka kerja ini berupaya mengatur dan mengelola berbagai bagian strategi menggunakan format kelas. Desain berorientasi objek ini tidak hanya meningkatkan kemudahan pemeliharaan dan skalabilitas kode, tetapi juga membuat komponen strategi lebih modular, sehingga memudahkan penyesuaian dan optimasi selanjutnya. Kerangka kerja ini terutama terdiri dari bagian-bagian berikut, masing-masing dengan fungsi spesifiknya sendiri, yang menjamin fleksibilitas dan kepraktisan strategi.
fungsi init
__init__Fungsi ini merupakan metode inisialisasi kelas strategi, yang bertanggung jawab untuk menetapkan konfigurasi dasar strategi, menginisialisasi variabel, dan memperoleh informasi pasar. Fungsi ini memastikan bahwa parameter yang diperlukan telah dikonfigurasi sebelum strategi dijalankan, sehingga operasi perdagangan selanjutnya dapat dijalankan dengan lancar.fungsi initDatas
fungsi saveStrategyRunTime
fungsi setStrategyRunTime
_GFungsi ini menyimpan stempel waktu yang dilewatkan secara lokal.fungsi getDaysFromTimeStamp
fungsi saveUserDatasLocal
_GFungsi ini menyimpan data secara lokal.fungsi readUserDataLocal
fungsi clearUserDataLocal
_GFungsi menghapus data lokal.Fungsi runCmd
fungsi orderDirectly
fungsi openLong
orderDirectlyFungsi ini melakukan operasi pembelian.fungsi openShort
orderDirectlyFungsi ini melakukan operasi penjualan.fungsi coverLong
orderDirectlyFungsi ini melakukan operasi penjualan.fungsi coverShort
orderDirectlyFungsi ini melakukan operasi pembelian.fungsi getRealOrderSize
fungsi getSinglePositionMargin
fungsi getSinglePositionProfit
fungsi calculateForcedPrice
fungsi getMaxOrderSize
fungsi getAccountAsset
fungsi hitungKeuntungan
fungsi isEnoughAssetToOrder
fungsi runInKLinePeriod
TrueJika tidak, kembalikanFalse。fungsi trendJudgment (modul inti penilaian tren)
fungsi stopLoss
fungsi takeProfit
fungsi pelacakanTakeProfit
fungsi pesanan
Fungsi TrendStrategy
fungsi printLogStatus
LogStatusFungsi ini mengeluarkan data tabel ke bilah status.fungsi utama
Kerangka kerja ini tidak hanya berlaku untuk pasar mata uang digital, tetapi juga dapat digunakan ditrendJudgmentKerangka kerja ini dapat diperluas fungsinya untuk beradaptasi dengan berbagai kebutuhan strategi perdagangan. Selain itu, kerangka kerja ini juga dapat dimodifikasi khusus untuk pasar spot atau kontrak multi-variasi, dengan fleksibilitas dan skalabilitas yang tinggi.
Sebagai sistem perdagangan otomatis yang komprehensif dan sangat fleksibel, kerangka kerja ini cocok untuk perdagangan tren di pasar mata uang kripto. Melalui optimalisasi dan pengembangan yang berkelanjutan, kerangka kerja ini diharapkan menjadi alat yang berharga bagi para pedagang mata uang kripto di masa mendatang, membantu mereka mengembangkan strategi kuantitatif mereka sendiri dengan lebih baik. “Kerangka Kerja Perdagangan Strategi Tren Mata Uang Kripto” menawarkan struktur yang komprehensif. Meskipun kodenya relatif besar, kerangka kerja ini pada dasarnya mencakup modul-modul fungsional inti yang diperlukan untuk perdagangan tren dari perspektif perdagangan dunia nyata. Oleh karena itu, kerangka kerja ini memiliki nilai referensi dan signifikansi praktis yang signifikan, baik untuk mempelajari strategi perdagangan maupun untuk aplikasi praktis. Fungsionalitas dan fleksibilitasnya yang komprehensif memungkinkannya untuk beradaptasi dengan beragam lingkungan pasar, memberikan dukungan yang kuat.
Platform Inventor adalah gudang pengetahuan dan strategi perdagangan kuantitatif, yang masing-masing mewujudkan kebijaksanaan dan pengalaman para pengembangnya. Kami menyambut semua orang untuk mengeksplorasi strategi dan teknik perdagangan yang berharga di sini. Terima kasih kepada semua pengguna kami yang inovatif dan suka berbagi. Berkat kontribusi Anda, platform ini telah menjadi wadah penting untuk belajar dan bertukar informasi dalam perdagangan kuantitatif, membantu semua orang meningkatkan keterampilan dan keahlian mereka.
'''backtest
start: 2024-11-26 00:00:00
end: 2024-12-03 00:00:00
period: 1d
basePeriod: 1d
exchanges: [{"eid":"Futures_Binance","currency":"BTC_USDT"}]
'''
import json, talib
import numpy as np
class TrendStrategy:
def __init__(self):
# 基本设置
self._Currency = TradeCurrency
self._Interval = Interval
self._UseQuarter = UseQuarter
self._UseContract = TradeCurrency + ('.swap' if self._UseQuarter else '.quarter')
self._OnlyTrendJudgment = OnlyTrendJudgment
self._EnableMessageSend = EnableMessageSend
# 趋势判断
self._RunInKLinePeriod = RunInKLinePeriod
self._KLinePeriod = KLinePeriod
self._EmaLength = EmaLength
self._EmaCoefficient = EmaCoefficient
self._UseStddev = UseStddev
self._UseRecordsMiddleValue = UseRecordsMiddleValue
self._StddevLength = StddevLength
self._StddevDeviations = StddevDeviations
# 下单设置
self._MarginLevel = MarginLevel
self._OrderSize = OrderSize
self._OrderByMargin = OrderByMargin
self._OrderMarginPercent = OrderMarginPercent
self._PricePrecision = None
self._AmountPrecision = None
self._OneSizeInCurrentCoin = None
self._QuarterOneSizeValue = None
# 止盈止损
self._UseStopLoss = UseStopLoss
self._StopLossPercent = StopLossPercent
self._UseTakeProfit = UseTakeProfit
self._TakeProfitPercent = TakeProfitPercent
self._UseTrackingTakeProfit = UseTrackingTakeProfit
self._UsePositionRetracement = UsePositionRetracement
self._TakeProfitTriggerPercent = TakeProfitTriggerPercent
self._CallBakcPercent = CallBakcPercent
# 策略变量
self._LastBarTime = 0
self._TrendWhenTakeProfitOrStopLoss = 0
self._HadStopLoss = False
self._TriggeredTakeProfit = False
self._PeakPriceInPosition = 0
self._HadTakeProfit = False
self._PriceCrossEMAStatus = 0
# 统计变量
self._InitAsset = 0
self._ProfitLocal = 0
self._TakeProfitCount = 0
self._TradeCount = 0
self.StrategyRunTimeStampString = "strategy_run_time"
self._StrategyDatas = {"start_run_timestamp": 0, "others": ""}
self._UserDatas = None
# 相对固定参数
self._MaintenanceMarginRate = 0.004
self._TakerFee = 0.0005
self._IsUsdtStandard = False
# 获取合约信息
ticker = _C(exchange.GetTicker, self._UseContract)
marketInfo = exchange.GetMarkets()[self._UseContract]
Log('获取市场信息:', marketInfo)
self._PricePrecision = marketInfo['PricePrecision']
self._AmountPrecision = marketInfo['AmountPrecision']
self._OneSizeInCurrentCoin = marketInfo['CtVal']
self._QuarterOneSizeValue = marketInfo['CtVal']
exchange.SetCurrency(self._Currency)
exchange.SetMarginLevel(self._UseContract, self._MarginLevel)
exchange.SetPrecision(self._PricePrecision, self._AmountPrecision)
# 初始化数据
def initDatas(self):
self.saveStrategyRunTime()
self.readUserDataLocal()
self._InitAsset = self._UserDatas["init_assets"]
self._ProfitLocal = self._UserDatas["profit_local"]
self._TakeProfitCount = self._UserDatas["take_profit_count"]
self._TradeCount = self._UserDatas["trade_count"]
if self._OrderByMargin:
self.getRealOrderSize(-1, self._OrderSize)
Log("已经重新计算下单张数:", self._OrderSize)
if self._UseTakeProfit and self._UseTrackingTakeProfit:
raise Exception("止盈和回调止盈不能同时使用!")
# 设置合约
def setContract(self):
self._IsUsdtStandard = "USDT" in self._Currency
exchange.SetCurrency(self._Currency)
if self._UseQuarter:
exchange.SetContractType("quarter")
else:
exchange.SetContractType("swap")
# 保存程序起始运行时间 秒级时间戳
def saveStrategyRunTime(self):
local_data_strategy_run_time = _G(self.StrategyRunTimeStampString)
if local_data_strategy_run_time is None:
self._StrategyDatas["start_run_timestamp"] = Unix()
_G(self.StrategyRunTimeStampString, self._StrategyDatas["start_run_timestamp"])
else:
self._StrategyDatas["start_run_timestamp"] = local_data_strategy_run_time
# 设置程序起始运行时间 秒级时间戳
def setStrategyRunTime(self, timestamp):
_G(self.StrategyRunTimeStampString, timestamp)
self._StrategyDatas["start_run_timestamp"] = timestamp
# 计算两个时间戳之间的天数,参数是秒级时间戳
def getDaysFromTimeStamp(self, start_time, end_time):
if end_time < start_time:
return 0
return (end_time - start_time) // (60 * 60 * 24)
# 保存数据到本地
def saveUserDatasLocal(self):
self._UserDatas = {
"init_assets": self._InitAsset,
"profit_local": self._ProfitLocal,
"take_profit_count": self._TakeProfitCount,
"trade_count": self._TradeCount
}
# 存储到本地
_G(exchange.GetLabel(), self._UserDatas)
Log("已把所有数据保存到本地.")
# 读取用户本地数据,程序启动时候运行一次
def readUserDataLocal(self):
user_data = _G(exchange.GetLabel())
if user_data is None:
self._InitAsset = self.getAccountAsset(_C(exchange.GetPosition), _C(exchange.GetAccount), _C(exchange.GetTicker))
self._UserDatas = {
"init_assets": self._InitAsset,
"profit_local": 0,
"take_profit_count": 0,
"trade_count": 0
}
else:
self._UserDatas = user_data
# 清除用户本地数据,交互按钮点击运行
def clearUserDataLocal(self):
_G(exchange.GetLabel(), None)
Log(exchange.GetLabel(), ":已清除本地数据.")
# 策略交互
def runCmd(self):
cmd = GetCommand()
if cmd:
# 检测交互命令
Log("接收到的命令:", cmd, "#FF1CAE")
if cmd.startswith("ClearLocalData:"):
# 清除本地数据
self.clearUserDataLocal()
elif cmd.startswith("SaveLocalData:"):
# 保存数据到本地
self.saveUserDatasLocal()
elif cmd.startswith("ClearLog:"):
# 清除日志
log_reserve = cmd.replace("ClearLog:", "")
LogReset(int(log_reserve))
elif cmd.startswith("OrderSize:"):
# 修改下单张数
if self._OrderByMargin:
Log("已经使用保证金数量来下单,无法直接修改下单数量!")
else:
order_size = int(cmd.replace("OrderSize:", ""))
self._OrderSize = order_size
Log("下单张数已经修改为:", self._OrderSize)
elif cmd.startswith("OrderMarginPercent:"):
# 修改下单保证金百分比
if self._OrderByMargin:
order_margin_percent = float(cmd.replace("OrderMarginPercent:", ""))
self._OrderMarginPercent = order_margin_percent
Log("下单保证金百分比:", self._OrderMarginPercent, "%")
else:
Log("没有打开根据保证金数量下单,无法修改下单保证金百分比!")
# 交易函数
def orderDirectly(self, distance, price, amount):
tradeFunc = None
if amount <= 0:
raise Exception("设置的参数有误,下单数量已经小于0!")
if distance == "buy":
tradeFunc = exchange.Buy
elif distance == "sell":
tradeFunc = exchange.Sell
elif distance == "closebuy":
tradeFunc = exchange.Sell
else:
tradeFunc = exchange.Buy
exchange.SetDirection(distance)
return tradeFunc(price, amount)
def openLong(self, price, amount):
real_amount = self.getRealOrderSize(price, amount)
return self.orderDirectly("buy", price, real_amount)
def openShort(self, price, amount):
real_amount = self.getRealOrderSize(price, amount)
return self.orderDirectly("sell", price, real_amount)
def coverLong(self, price, amount):
return self.orderDirectly("closebuy", price, amount)
def coverShort(self, price, amount):
return self.orderDirectly("closesell", price, amount)
# 重新计算下单数量
def getRealOrderSize(self, price, amount):
real_price = price if price != -1 else _C(exchange.GetTicker).Last
if self._OrderByMargin:
if self._IsUsdtStandard:
self._OrderSize = _N(self._InitAsset * (self._OrderMarginPercent / 100) / real_price * self._MarginLevel / self._OneSizeInCurrentCoin, self._AmountPrecision) # u本位数量(杠杆放大数量)
else:
self._OrderSize = _N(self._InitAsset * (self._OrderMarginPercent / 100) * self._MarginLevel * real_price / self._QuarterOneSizeValue, self._AmountPrecision) # 币本位数量(杠杆放大数量)
else:
self._OrderSize = amount
return self._OrderSize
# 获取单个持仓占用保证金
def getSinglePositionMargin(self, position, ticker):
position_margin = 0
if len(position) > 0:
if self._IsUsdtStandard:
position_margin = position[0].Amount * self._OneSizeInCurrentCoin * ticker.Last / self._MarginLevel
else:
position_margin = position[0].Amount * self._QuarterOneSizeValue / ticker.Last / self._MarginLevel
return position_margin
# 获取单向持仓的收益和收益%
def getSinglePositionProfit(self, position, ticker):
if len(position) == 0:
return [0, 0]
price = ticker.Last
position_margin = self.getSinglePositionMargin(position, ticker)
position_profit_percent = (price - position[0].Price) / position[0].Price * self._MarginLevel if position[0].Type == PD_LONG else (position[0].Price - price) / position[0].Price * self._MarginLevel
position_profit = position_margin * position_profit_percent
return [position_profit, position_profit_percent]
# 计算强平价格
def calculateForcedPrice(self, account, position, ticker):
position_profit = 0
total_avail_balance = 0
forced_price = 0
position_margin = self.getSinglePositionMargin(position, ticker)
[position_profit, position_profit_percent] = self.getSinglePositionProfit(position, ticker)
if self._IsUsdtStandard:
total_avail_balance = account.Balance + position_margin + account.FrozenBalance - position_profit if position_profit > 0 else account.Balance + position_margin + account.FrozenBalance
if position[0].Type == PD_LONG:
forced_price = ((self._MaintenanceMarginRate + self._TakerFee) * self._MarginLevel * account.FrozenBalance - total_avail_balance) / self._OneSizeInCurrentCoin + (position[0].Amount * position[0].Price) / (position[0].Amount - (self._MaintenanceMarginRate + self._TakerFee) * position[0].Amount)
else:
forced_price = ((self._MaintenanceMarginRate + self._TakerFee) * self._MarginLevel * account.FrozenBalance - total_avail_balance) / self._OneSizeInCurrentCoin - (position[0].Amount * position[0].Price) / (-1 * position[0].Amount - (self._MaintenanceMarginRate + self._TakerFee) * position[0].Amount)
else:
total_avail_balance = account.Stocks + position_margin + account.FrozenStocks - position_profit if position_profit > 0 else account.Stocks + position_margin + account.FrozenStocks
if position[0].Type == PD_LONG:
forced_price = (self._MaintenanceMarginRate * position[0].Amount + position[0].Amount) / (total_avail_balance / self._QuarterOneSizeValue + position[0].Amount / position[0].Price)
else:
forced_price = (self._MaintenanceMarginRate * position[0].Amount - position[0].Amount) / (total_avail_balance / self._QuarterOneSizeValue - position[0].Amount / position[0].Price)
if forced_price < 0:
forced_price = 0
return forced_price
# 计算最大可下单张数
def getMaxOrderSize(self, margin_level, ticker, account):
max_order_size = 0
if self._IsUsdtStandard:
max_order_size = account.Balance * margin_level / (self._OneSizeInCurrentCoin * ticker.Last)
else:
max_order_size = account.Stocks * ticker.Last / self._QuarterOneSizeValue * margin_level
return _N(max_order_size, self._AmountPrecision)
# 获取账户资产
def getAccountAsset(self, position, account, ticker):
# 计算不同情况下的账户初始资产
account_asset = 0
position_margin = self.getSinglePositionMargin(position, ticker)
if self._IsUsdtStandard:
if len(position) > 0:
account_asset = account.Balance + account.FrozenBalance + position_margin
else:
account_asset = account.Balance + account.FrozenBalance
else:
if len(position) > 0:
account_asset = account.Stocks + account.FrozenStocks + position_margin
else:
account_asset = account.Stocks + account.FrozenStocks
return account_asset
# 收益统计
def calculateProfit(self, ticker):
# 重新获取一下账户持仓与资产
position = _C(exchange.GetPosition)
account = _C(exchange.GetAccount)
# 当前总收益 - 上一次总收益 = 本次的收益
current_profit = (self.getAccountAsset(position, account, ticker) - self._InitAsset) - self._ProfitLocal
self._ProfitLocal += current_profit
if current_profit > 0:
self._TakeProfitCount += 1
self._TradeCount += 1
LogProfit(_N(self._ProfitLocal, 4), " 本次收益:", _N(current_profit, 6))
self.saveUserDatasLocal()
# 是否还够资金下单
def isEnoughAssetToOrder(self, order_size, ticker):
is_enough = True
account = _C(exchange.GetAccount)
if self._IsUsdtStandard:
if account.Balance < order_size * ticker.Last * self._OneSizeInCurrentCoin / self._MarginLevel:
is_enough = False
else:
if account.Stocks < order_size * self._QuarterOneSizeValue / ticker.Last / self._MarginLevel:
is_enough = False
return is_enough
# 按照K线周期运行策略核心
def runInKLinePeriod(self, records):
bar_time = records[-1].Time
if self._RunInKLinePeriod and self._LastBarTime == bar_time:
return False
self._LastBarTime = bar_time
return True
# 趋势判断模块(可编辑具体指标)
def trendJudgment(self, records):
# 检查价格是否穿过均线
def checkPriceCrossEma(price, ema_value):
if self._PriceCrossEMAStatus == 0:
if price <= ema_value:
self._PriceCrossEMAStatus = -1
else:
self._PriceCrossEMAStatus = 1
elif (self._PriceCrossEMAStatus == -1 and price >= ema_value) or (self._PriceCrossEMAStatus == 1 and price <= ema_value):
self._PriceCrossEMAStatus = 2 # 完成穿过
# EMA的多空判断
ema_long = False
ema_short = False
price = records[-2].Close # 已经收盘的K线的收盘价格
ema = TA.EMA(records, self._EmaLength)
ema_value = ema[-2] # 收盘K线对应ema值
ema_upper = ema_value * (1 + self._EmaCoefficient)
ema_lower = ema_value * (1 - self._EmaCoefficient)
checkPriceCrossEma(price, ema_value)
if price > ema_upper:
ema_long = True
elif price < ema_lower:
ema_short = True
# 标准差判断
in_trend = False
if self._UseStddev:
records_data = []
for i in range(len(records)):
records_data.append((records[i].High + records[i].Low) / 2 if self._UseRecordsMiddleValue else records[i].Close)
records_data = np.array(records_data) # 将 list 转换为 np.array
stddev = np.std(records_data, ddof=1) # 使用 numpy 计算标准差
if stddev > self._StddevDeviations:
in_trend = True
else:
in_trend = True
# 趋势判断
long = in_trend and ema_long
short = in_trend and ema_short
if long:
Log("当前趋势为:多", self._EnableMessageSend and "@" or "#00FF7F")
elif short:
Log("当前趋势为:空", self._EnableMessageSend and "@" or "#FF0000")
else:
Log("当前趋势为:震荡", self._EnableMessageSend and "@" or "#007FFF")
return [long, short]
# 止损
def stopLoss(self, position, ticker):
stop_loss_price = 0
price = ticker.Last
if len(position) == 1 and self._UseStopLoss:
if position[0].Type == PD_LONG:
stop_loss_price = position[0].Price * (1 - self._StopLossPercent / 100)
if price < stop_loss_price:
self.coverLong(-1, position[0].Amount)
self.calculateProfit(ticker)
self._TrendWhenTakeProfitOrStopLoss = 1
self._HadStopLoss = True
Log("多单止损。止损价格:", _N(stop_loss_price, 6), ", 持仓价格:", _N(position[0].Price), self._EnableMessageSend and "@" or "#FF1CAE")
elif position[0].Type == PD_SHORT:
stop_loss_price = position[0].Price * (1 + self._StopLossPercent / 100)
if price > stop_loss_price:
self.coverShort(-1, position[0].Amount)
self.calculateProfit(ticker)
self._TrendWhenTakeProfitOrStopLoss = -1
self._HadStopLoss = True
Log("空单止损。止损价格:", _N(stop_loss_price, 6), ", 持仓价格:", _N(position[0].Price), self._EnableMessageSend and "@" or "#FF1CAE")
# 止盈
def takeProfit(self, position, ticker):
take_profit_price = 0
price = ticker.Last
if len(position) == 1 and self._UseTakeProfit:
if position[0].Type == PD_LONG:
take_profit_price = position[0].Price * (1 + self._TakeProfitPercent / 100)
if price > take_profit_price:
self.coverLong(-1, position[0].Amount)
self.calculateProfit(ticker)
self._TrendWhenTakeProfitOrStopLoss = 1
self._HadTakeProfit = True
Log("多单止盈。止盈价格:", _N(take_profit_price, 6), ", 持仓价格:", _N(position[0].Price), self._EnableMessageSend and "@" or "#FF1CAE")
elif position[0].Type == PD_SHORT:
take_profit_price = position[0].Price * (1 - self._TakeProfitPercent / 100)
if price < take_profit_price:
self.coverShort(-1, position[0].Amount)
self.calculateProfit(ticker)
self._TrendWhenTakeProfitOrStopLoss = -1
self._HadTakeProfit = True
Log("空单止盈。止盈价格:", _N(take_profit_price, 6), ", 持仓价格:", _N(position[0].Price), self._EnableMessageSend and "@" or "#FF1CAE")
# 回调止盈
def trackingTakeProfit(self, position, ticker):
take_profit_price = 0
trigger_price = 0
price = ticker.Last
if len(position) > 0 and self._UseTrackingTakeProfit:
if position[0].Type == PD_LONG:
# 多单持仓
if self._TriggeredTakeProfit:
# 已达到触发价格,监控是否止盈
self._PeakPriceInPosition = price if price > self._PeakPriceInPosition else self._PeakPriceInPosition # 更新价格高点
if self._UsePositionRetracement:
take_profit_price = self._PeakPriceInPosition - (self._PeakPriceInPosition - position[0].Price) * (self._CallBakcPercent / 100) # 计算回调的止盈价格
else:
take_profit_price = self._PeakPriceInPosition * (1 - self._CallBakcPercent / 100) # 计算回调的止盈价格
if price < take_profit_price:
self.coverLong(-1, position[0].Amount) # 平多
self.calculateProfit(ticker)
self._TriggeredTakeProfit = False # 复位触发标记
self._TrendWhenTakeProfitOrStopLoss = 1 # 记录止盈时候的趋势
self._HadTakeProfit = True # 记录发生了止盈
Log("多单回调止盈:持仓中价格高点:", _N(self._PeakPriceInPosition, 6), ", 止盈价格:", _N(take_profit_price, 6), ", 当前价格:", _N(price, 6),
", 持仓价格:", _N(position[0].Price, 6), self._EnableMessageSend and "@" or "#FF1CAE")
else:
# 监控是否达到回调止盈的触发价格
trigger_price = position[0].Price * (1 + self._TakeProfitTriggerPercent / 100)
if price > trigger_price:
self._TriggeredTakeProfit = True # 触发回调止盈
self._PeakPriceInPosition = price # 记录价格高点
Log("多单已达到回调止盈的触发价格:", _N(trigger_price, 6), ", 当前价格:", _N(price, 6), ", 持仓价格:", _N(position[0].Price, 6))
elif position[0].Type == PD_SHORT:
# 空单持仓
if self._TriggeredTakeProfit:
# 已达到触发价格,监控是否止盈
self._PeakPriceInPosition = price if price < self._PeakPriceInPosition else self._PeakPriceInPosition # 更新价格低点
if self._UsePositionRetracement:
take_profit_price = self._PeakPriceInPosition + (position[0].Price - self._PeakPriceInPosition) * (self._CallBakcPercent / 100) # 计算回调的止盈价格
else:
take_profit_price = self._PeakPriceInPosition * (1 + self._CallBakcPercent / 100) # 计算回调的止盈价格
if price > take_profit_price:
self.coverShort(-1, position[0].Amount) # 平空
self.calculateProfit(ticker)
self._TriggeredTakeProfit = False # 复位触发标记
self._TrendWhenTakeProfitOrStopLoss = -1 # 记录止盈时候的趋势
self._HadTakeProfit = True # 记录发生了止盈
Log("空单回调止盈:持仓中价格低点:", _N(self._PeakPriceInPosition, 6), ", 止盈价格:", _N(take_profit_price, 6), ", 当前价格:", _N(price, 6),
", 持仓价格:", _N(position[0].Price, 6), self._EnableMessageSend and "@" or "#FF1CAE")
else:
# 监控是否达到回调止盈的触发价格
trigger_price = position[0].Price * (1 - self._TakeProfitTriggerPercent / 100)
if price < trigger_price:
self._TriggeredTakeProfit = True # 触发回调止盈
self._PeakPriceInPosition = price # 记录价格低点
Log("空单已达到回调止盈的触发价格:", _N(trigger_price, 6), ", 当前价格:", _N(price, 6), ", 持仓价格:", _N(position[0].Price, 6))
# 下单
def order(self, long, short, position, ticker):
position_size = position[0].Amount if len(position) > 0 else 0
position_type = position[0].Type if len(position) > 0 else None
if long:
# 趋势多
if (self._HadStopLoss or self._HadTakeProfit) and self._TrendWhenTakeProfitOrStopLoss == 1:
# 发生了止盈止损,并且止盈止损时候趋势为多,不再做多
return
if position_size > 0 and position_type == PD_SHORT:
self.coverShort(-1, position_size)
self.calculateProfit(ticker)
elif position_size > 0 and position_type == PD_LONG:
# 多单持仓,不重复下单
return
else:
# 没有持仓,如果是首次运行或者策略重启,需要等待价格穿过一次EMA均线才下单
if self._PriceCrossEMAStatus != 2:
return
if self.isEnoughAssetToOrder(self._OrderSize, ticker):
self.openLong(-1, self._OrderSize)
self._HadStopLoss = False
self._HadTakeProfit = False
else:
raise Exception("下单金额数量不足!")
elif short:
# 趋势空
if (self._HadStopLoss or self._HadTakeProfit) and self._TrendWhenTakeProfitOrStopLoss == -1:
# 发生了止盈止损,并且止盈止损时候趋势为空,不再做空
return
if position_size > 0 and position_type == PD_LONG:
self.coverLong(-1, position_size)
self.calculateProfit(ticker)
elif position_size > 0 and position_type == PD_SHORT:
# 空单持仓,不重复下单
return
else:
# 没有持仓,如果是首次运行或者策略重启,需要等待价格穿过一次EMA均线才下单
if self._PriceCrossEMAStatus != 2:
return
if self.isEnoughAssetToOrder(self._OrderSize, ticker):
self.openShort(-1, self._OrderSize)
self._HadStopLoss = False
self._HadTakeProfit = False
else:
raise Exception("下单金额数量不足!")
# 趋势策略
def trendStrategy(self):
ticker = _C(exchange.GetTicker)
position = _C(exchange.GetPosition)
account = _C(exchange.GetAccount)
records = _C(exchange.GetRecords, self._KLinePeriod * 60)
if len(position) > 1:
Log(position)
raise Exception("同时有多空持仓!")
# 策略交互
self.runCmd()
# 状态栏信息打印
self.printLogStatus(ticker, account, position)
# 止损
self.stopLoss(position, ticker)
# 止盈
self.takeProfit(position, ticker)
# 回调止盈
self.trackingTakeProfit(position, ticker)
# 按照K线周期运行策略
if not self.runInKLinePeriod(records):
return
# 趋势判断和下单
long = False
short = False
[long, short] = self.trendJudgment(records)
if not self._OnlyTrendJudgment:
self.order(long, short, position, ticker)
# 状态栏信息打印
def printLogStatus(self, ticker, account, position):
table_overview = {
"type": "table",
"title": "策略总览",
"cols": ["开始时间", "已运行天数", "交易次数", "胜率", "预估月化%", "预估年化%"],
"rows": []
}
table_account = {
"type": "table",
"title": "账户资金",
"cols": ["当前资产", "初始资产", "可用余额", "冻结余额", "可下单张数", "收益", "收益%"],
"rows": []
}
table_position = {
"type": "table",
"title": "持仓情况",
"cols": ["交易币种", "杠杆倍数", "持仓均价", "方向", "数量", "保证金", "预估强平价格", "浮动盈亏", "浮动盈亏%"],
"rows": []
}
i = 0
# 策略总览
the_running_days = self.getDaysFromTimeStamp(self._StrategyDatas["start_run_timestamp"], Unix())
monthly_rate_of_profit = 0
if the_running_days > 1:
monthly_rate_of_profit = self._ProfitLocal / self._InitAsset / the_running_days * 30
table_overview["rows"].append([_D(self._StrategyDatas["start_run_timestamp"]), the_running_days, self._TradeCount,
0 if self._TradeCount == 0 else (str(_N(self._TakeProfitCount / self._TradeCount * 100, 2)) + "%"),
str(_N(monthly_rate_of_profit * 100, 2)) + "%", str(_N(monthly_rate_of_profit * 12 * 100, 2)) + "%"])
# 账户资金
current_asset = self.getAccountAsset(position, account, ticker)
max_order_size = self.getMaxOrderSize(self._MarginLevel, ticker, account)
asset_profit = current_asset - self._InitAsset
asset_profit_percent = asset_profit / self._InitAsset
table_account["rows"].append([_N(current_asset, 4), _N(self._InitAsset, 4), _N(account.Balance if self._IsUsdtStandard else account.Stocks, 4),
_N(account.FrozenBalance if self._IsUsdtStandard else account.FrozenStocks, 4), max_order_size, _N(asset_profit, 4),
str(_N(asset_profit_percent * 100, 2)) + "%"])
# 持仓情况
position_direction = ""
forced_cover_up_price = 0
position_profit_percent = 0
position_profit = 0
position_margin = 0
if len(position) == 0:
table_position["rows"].append(["无持仓", "-", "-", "-", "-", "-", "-", "-", "-"])
else:
position_direction = "多单" if position[0].Type == PD_LONG else "空单"
[position_profit, position_profit_percent] = self.getSinglePositionProfit(position, ticker)
position_margin = self.getSinglePositionMargin(position, ticker)
forced_cover_up_price = self.calculateForcedPrice(account, position, ticker)
table_position["rows"].append([exchange.GetCurrency(), self._MarginLevel, _N(position[0].Price, 4), position_direction, position[0].Amount,
_N(position_margin, 4), _N(forced_cover_up_price, 4), _N(position_profit, 4), str(_N((position_profit_percent * 100), 2)) + "%"])
# 打印表格
LogStatus('`' + json.dumps(table_overview) + '`\n'
+ '`' + json.dumps(table_account) + '`\n'
+ '`' + json.dumps(table_position) + '`\n')
# main
def main():
exchange.IO('simulate', True)
strategy = TrendStrategy()
strategy.setContract()
strategy.initDatas()
while True:
strategy.trendStrategy()
Sleep(strategy._Interval)