Saya sering melepak di platform pencipta dan sentiasa mencari harta karun. Hari ini saya menemui seorang lelaki berusia 21 tahunStrategi Trend, Saya mengagumi struktur kod pengarang asal yang indah dan sempurna, dan ia mempunyai fleksibiliti yang tinggi. Strategi asal ialah versi JS, yang telah ditulis semula untuk kemudahan rakan Python.
Secara jujur, ramai pemula mengambil banyak lencongan apabila mula-mula memasuki perdagangan kuantitatif. Mereka sering menghadapi masalah seperti pesanan yang gagal, kerugian daripada pengurusan risiko yang lemah dan kehilangan data selepas memulakan semula strategi. Kemudian, saya secara beransur-ansur menyedari kepentingan rangka kerja yang baik, yang boleh membantu kita mengelakkan banyak perangkap. Rangka kerja strategi trend ini adalah alat yang sangat berharga. Ia lebih daripada sekadar strategi dagangan mudah; ia lebih seperti kotak alat, memberikan anda fungsi asas lagi penting seperti penempatan pesanan, pesanan henti rugi dan pengurusan data. Anda hanya perlu memberi tumpuan kepada soalan teras “bila hendak membeli” dan “bila hendak menjual.” Tambahan pula, rangka kerja ini sangat terbuka, membolehkan anda menukar EMA dengan mudah untuk MACD, RSI atau mana-mana penunjuk lain yang anda suka. Ingin mengikuti trend? Tiada masalah. Ingin mencuba min reversion? Malah mahu menggabungkan berbilang penunjuk? betul-betul. Fleksibiliti ini amat berguna; anda boleh mengubah suai kod yang sama untuk mencuba idea yang berbeza.
Saya berkongsi rangka kerja ini hari ini, berharap ia dapat membantu mereka yang meneroka pelaburan kuantitatif. Di bawah ialah pengenalan terperinci kepada setiap komponen rangka kerja ini, yang saya percaya anda akan dapati berguna.
Berbanding dengan pelbagai fungsi bebas yang digunakan dalam rangka kerja dagangan berbilang komoditi, rangka kerja ini cuba mengatur dan mengurus pelbagai bahagian strategi menggunakan format kelas. Reka bentuk berorientasikan objek ini bukan sahaja meningkatkan kebolehselenggaraan dan kebolehskalaan kod, tetapi juga menjadikan komponen strategi lebih modular, memudahkan pelarasan dan pengoptimuman seterusnya. Rangka kerja terutamanya terdiri daripada bahagian berikut, masing-masing dengan fungsi khususnya sendiri, memastikan fleksibiliti dan kepraktisan strategi.
fungsi init
__init__Fungsi ini ialah kaedah permulaan kelas strategi, bertanggungjawab untuk menetapkan konfigurasi asas strategi, memulakan pembolehubah, dan mendapatkan maklumat pasaran. Fungsi ini memastikan bahawa parameter yang diperlukan dikonfigurasikan sebelum strategi dijalankan, memastikan operasi dagangan seterusnya dapat dilaksanakan dengan lancar.fungsi initDatas
fungsi saveStrategyRunTime
setStrategyRunTime fungsi
_GFungsi ini menyimpan cap masa yang diluluskan secara setempat.fungsi getDaysFromTimeStamp
fungsi saveUserDatasLocal
_GFungsi ini menyimpan data secara setempat.fungsi readUserDataLocal
fungsi clearUserDataLocal
_GFungsi mengosongkan data tempatan.Fungsi runCmd
orderDirectly berfungsi
fungsi openLong
orderDirectlyFungsi melakukan operasi beli.fungsi openShort
orderDirectlyFungsi ini menjalankan operasi jual.coverFungsi panjang
orderDirectlyFungsi ini menjalankan operasi jual.fungsi coverShort
orderDirectlyFungsi melakukan operasi beli.fungsi getRealOrderSize
getSinglePositionMargin fungsi
fungsi getSinglePositionProfit
fungsi hitungForcedPrice
fungsi getMaxOrderSize
fungsi getAccountAsset
fungsi mengira Untung
fungsi isEnoughAssetToOrder
fungsi runInKLinePeriod
TrueJika tidak, kembaliFalse。Fungsi trendJudgment (modul pertimbangan arah aliran teras)
fungsi stopLoss
fungsi takeProfit
fungsi trackingTakeProfit
fungsi pesanan
trendStrategi Fungsi
fungsi printLogStatus
LogStatusFungsi ini mengeluarkan data jadual ke bar status.fungsi utama
Rangka kerja ini bukan sahaja terpakai untuk pasaran mata wang digital, tetapi juga boleh digunakan dalamtrendJudgmentRangka kerja boleh dikembangkan dalam fungsi untuk menyesuaikan diri dengan keperluan strategi perdagangan yang berbeza. Selain itu, rangka kerja ini juga boleh diubah suai khusus untuk pasaran spot atau kontrak pelbagai variasi, dengan fleksibiliti dan skalabiliti yang tinggi.
Sebagai sistem perdagangan automatik yang komprehensif dan sangat fleksibel, rangka kerja ini sesuai untuk perdagangan trend dalam pasaran mata wang kripto. Melalui pengoptimuman dan pengembangan berterusan, ia dijangka menjadi alat yang berharga untuk pedagang mata wang kripto pada masa hadapan, membantu mereka membangunkan strategi kuantitatif mereka sendiri dengan lebih baik. “Rangka Kerja Perdagangan Strategi Arah Mata Wang Kripto” mempunyai struktur yang komprehensif. Walaupun agak besar dalam kod, ia pada asasnya merangkumi modul fungsi teras yang diperlukan untuk perdagangan trend dari perspektif perdagangan dunia sebenar. Oleh itu, rangka kerja ini mempunyai nilai rujukan yang signifikan dan kepentingan praktikal, kedua-duanya untuk mempelajari strategi perdagangan dan untuk aplikasi praktikal. Kefungsian dan fleksibiliti komprehensifnya membolehkannya menyesuaikan diri dengan persekitaran pasaran yang pelbagai, memberikan sokongan yang kukuh.
Platform Inventor ialah khazanah pengetahuan dan strategi perdagangan kuantitatif, masing-masing merangkumi kebijaksanaan dan pengalaman pembangunnya. Kami mengalu-alukan semua orang untuk menerokai strategi dan teknik dagangan yang berharga di sini. Terima kasih kepada semua pengguna kami yang inovatif dan berkongsi. Berkat sumbangan anda, platform ini telah menjadi tempat penting untuk pembelajaran dan pertukaran dalam perdagangan kuantitatif, membantu semua orang meningkatkan kemahiran dan kepakaran mereka.
'''backtest
start: 2024-11-26 00:00:00
end: 2024-12-03 00:00:00
period: 1d
basePeriod: 1d
exchanges: [{"eid":"Futures_Binance","currency":"BTC_USDT"}]
'''
import json, talib
import numpy as np
class TrendStrategy:
def __init__(self):
# 基本设置
self._Currency = TradeCurrency
self._Interval = Interval
self._UseQuarter = UseQuarter
self._UseContract = TradeCurrency + ('.swap' if self._UseQuarter else '.quarter')
self._OnlyTrendJudgment = OnlyTrendJudgment
self._EnableMessageSend = EnableMessageSend
# 趋势判断
self._RunInKLinePeriod = RunInKLinePeriod
self._KLinePeriod = KLinePeriod
self._EmaLength = EmaLength
self._EmaCoefficient = EmaCoefficient
self._UseStddev = UseStddev
self._UseRecordsMiddleValue = UseRecordsMiddleValue
self._StddevLength = StddevLength
self._StddevDeviations = StddevDeviations
# 下单设置
self._MarginLevel = MarginLevel
self._OrderSize = OrderSize
self._OrderByMargin = OrderByMargin
self._OrderMarginPercent = OrderMarginPercent
self._PricePrecision = None
self._AmountPrecision = None
self._OneSizeInCurrentCoin = None
self._QuarterOneSizeValue = None
# 止盈止损
self._UseStopLoss = UseStopLoss
self._StopLossPercent = StopLossPercent
self._UseTakeProfit = UseTakeProfit
self._TakeProfitPercent = TakeProfitPercent
self._UseTrackingTakeProfit = UseTrackingTakeProfit
self._UsePositionRetracement = UsePositionRetracement
self._TakeProfitTriggerPercent = TakeProfitTriggerPercent
self._CallBakcPercent = CallBakcPercent
# 策略变量
self._LastBarTime = 0
self._TrendWhenTakeProfitOrStopLoss = 0
self._HadStopLoss = False
self._TriggeredTakeProfit = False
self._PeakPriceInPosition = 0
self._HadTakeProfit = False
self._PriceCrossEMAStatus = 0
# 统计变量
self._InitAsset = 0
self._ProfitLocal = 0
self._TakeProfitCount = 0
self._TradeCount = 0
self.StrategyRunTimeStampString = "strategy_run_time"
self._StrategyDatas = {"start_run_timestamp": 0, "others": ""}
self._UserDatas = None
# 相对固定参数
self._MaintenanceMarginRate = 0.004
self._TakerFee = 0.0005
self._IsUsdtStandard = False
# 获取合约信息
ticker = _C(exchange.GetTicker, self._UseContract)
marketInfo = exchange.GetMarkets()[self._UseContract]
Log('获取市场信息:', marketInfo)
self._PricePrecision = marketInfo['PricePrecision']
self._AmountPrecision = marketInfo['AmountPrecision']
self._OneSizeInCurrentCoin = marketInfo['CtVal']
self._QuarterOneSizeValue = marketInfo['CtVal']
exchange.SetCurrency(self._Currency)
exchange.SetMarginLevel(self._UseContract, self._MarginLevel)
exchange.SetPrecision(self._PricePrecision, self._AmountPrecision)
# 初始化数据
def initDatas(self):
self.saveStrategyRunTime()
self.readUserDataLocal()
self._InitAsset = self._UserDatas["init_assets"]
self._ProfitLocal = self._UserDatas["profit_local"]
self._TakeProfitCount = self._UserDatas["take_profit_count"]
self._TradeCount = self._UserDatas["trade_count"]
if self._OrderByMargin:
self.getRealOrderSize(-1, self._OrderSize)
Log("已经重新计算下单张数:", self._OrderSize)
if self._UseTakeProfit and self._UseTrackingTakeProfit:
raise Exception("止盈和回调止盈不能同时使用!")
# 设置合约
def setContract(self):
self._IsUsdtStandard = "USDT" in self._Currency
exchange.SetCurrency(self._Currency)
if self._UseQuarter:
exchange.SetContractType("quarter")
else:
exchange.SetContractType("swap")
# 保存程序起始运行时间 秒级时间戳
def saveStrategyRunTime(self):
local_data_strategy_run_time = _G(self.StrategyRunTimeStampString)
if local_data_strategy_run_time is None:
self._StrategyDatas["start_run_timestamp"] = Unix()
_G(self.StrategyRunTimeStampString, self._StrategyDatas["start_run_timestamp"])
else:
self._StrategyDatas["start_run_timestamp"] = local_data_strategy_run_time
# 设置程序起始运行时间 秒级时间戳
def setStrategyRunTime(self, timestamp):
_G(self.StrategyRunTimeStampString, timestamp)
self._StrategyDatas["start_run_timestamp"] = timestamp
# 计算两个时间戳之间的天数,参数是秒级时间戳
def getDaysFromTimeStamp(self, start_time, end_time):
if end_time < start_time:
return 0
return (end_time - start_time) // (60 * 60 * 24)
# 保存数据到本地
def saveUserDatasLocal(self):
self._UserDatas = {
"init_assets": self._InitAsset,
"profit_local": self._ProfitLocal,
"take_profit_count": self._TakeProfitCount,
"trade_count": self._TradeCount
}
# 存储到本地
_G(exchange.GetLabel(), self._UserDatas)
Log("已把所有数据保存到本地.")
# 读取用户本地数据,程序启动时候运行一次
def readUserDataLocal(self):
user_data = _G(exchange.GetLabel())
if user_data is None:
self._InitAsset = self.getAccountAsset(_C(exchange.GetPosition), _C(exchange.GetAccount), _C(exchange.GetTicker))
self._UserDatas = {
"init_assets": self._InitAsset,
"profit_local": 0,
"take_profit_count": 0,
"trade_count": 0
}
else:
self._UserDatas = user_data
# 清除用户本地数据,交互按钮点击运行
def clearUserDataLocal(self):
_G(exchange.GetLabel(), None)
Log(exchange.GetLabel(), ":已清除本地数据.")
# 策略交互
def runCmd(self):
cmd = GetCommand()
if cmd:
# 检测交互命令
Log("接收到的命令:", cmd, "#FF1CAE")
if cmd.startswith("ClearLocalData:"):
# 清除本地数据
self.clearUserDataLocal()
elif cmd.startswith("SaveLocalData:"):
# 保存数据到本地
self.saveUserDatasLocal()
elif cmd.startswith("ClearLog:"):
# 清除日志
log_reserve = cmd.replace("ClearLog:", "")
LogReset(int(log_reserve))
elif cmd.startswith("OrderSize:"):
# 修改下单张数
if self._OrderByMargin:
Log("已经使用保证金数量来下单,无法直接修改下单数量!")
else:
order_size = int(cmd.replace("OrderSize:", ""))
self._OrderSize = order_size
Log("下单张数已经修改为:", self._OrderSize)
elif cmd.startswith("OrderMarginPercent:"):
# 修改下单保证金百分比
if self._OrderByMargin:
order_margin_percent = float(cmd.replace("OrderMarginPercent:", ""))
self._OrderMarginPercent = order_margin_percent
Log("下单保证金百分比:", self._OrderMarginPercent, "%")
else:
Log("没有打开根据保证金数量下单,无法修改下单保证金百分比!")
# 交易函数
def orderDirectly(self, distance, price, amount):
tradeFunc = None
if amount <= 0:
raise Exception("设置的参数有误,下单数量已经小于0!")
if distance == "buy":
tradeFunc = exchange.Buy
elif distance == "sell":
tradeFunc = exchange.Sell
elif distance == "closebuy":
tradeFunc = exchange.Sell
else:
tradeFunc = exchange.Buy
exchange.SetDirection(distance)
return tradeFunc(price, amount)
def openLong(self, price, amount):
real_amount = self.getRealOrderSize(price, amount)
return self.orderDirectly("buy", price, real_amount)
def openShort(self, price, amount):
real_amount = self.getRealOrderSize(price, amount)
return self.orderDirectly("sell", price, real_amount)
def coverLong(self, price, amount):
return self.orderDirectly("closebuy", price, amount)
def coverShort(self, price, amount):
return self.orderDirectly("closesell", price, amount)
# 重新计算下单数量
def getRealOrderSize(self, price, amount):
real_price = price if price != -1 else _C(exchange.GetTicker).Last
if self._OrderByMargin:
if self._IsUsdtStandard:
self._OrderSize = _N(self._InitAsset * (self._OrderMarginPercent / 100) / real_price * self._MarginLevel / self._OneSizeInCurrentCoin, self._AmountPrecision) # u本位数量(杠杆放大数量)
else:
self._OrderSize = _N(self._InitAsset * (self._OrderMarginPercent / 100) * self._MarginLevel * real_price / self._QuarterOneSizeValue, self._AmountPrecision) # 币本位数量(杠杆放大数量)
else:
self._OrderSize = amount
return self._OrderSize
# 获取单个持仓占用保证金
def getSinglePositionMargin(self, position, ticker):
position_margin = 0
if len(position) > 0:
if self._IsUsdtStandard:
position_margin = position[0].Amount * self._OneSizeInCurrentCoin * ticker.Last / self._MarginLevel
else:
position_margin = position[0].Amount * self._QuarterOneSizeValue / ticker.Last / self._MarginLevel
return position_margin
# 获取单向持仓的收益和收益%
def getSinglePositionProfit(self, position, ticker):
if len(position) == 0:
return [0, 0]
price = ticker.Last
position_margin = self.getSinglePositionMargin(position, ticker)
position_profit_percent = (price - position[0].Price) / position[0].Price * self._MarginLevel if position[0].Type == PD_LONG else (position[0].Price - price) / position[0].Price * self._MarginLevel
position_profit = position_margin * position_profit_percent
return [position_profit, position_profit_percent]
# 计算强平价格
def calculateForcedPrice(self, account, position, ticker):
position_profit = 0
total_avail_balance = 0
forced_price = 0
position_margin = self.getSinglePositionMargin(position, ticker)
[position_profit, position_profit_percent] = self.getSinglePositionProfit(position, ticker)
if self._IsUsdtStandard:
total_avail_balance = account.Balance + position_margin + account.FrozenBalance - position_profit if position_profit > 0 else account.Balance + position_margin + account.FrozenBalance
if position[0].Type == PD_LONG:
forced_price = ((self._MaintenanceMarginRate + self._TakerFee) * self._MarginLevel * account.FrozenBalance - total_avail_balance) / self._OneSizeInCurrentCoin + (position[0].Amount * position[0].Price) / (position[0].Amount - (self._MaintenanceMarginRate + self._TakerFee) * position[0].Amount)
else:
forced_price = ((self._MaintenanceMarginRate + self._TakerFee) * self._MarginLevel * account.FrozenBalance - total_avail_balance) / self._OneSizeInCurrentCoin - (position[0].Amount * position[0].Price) / (-1 * position[0].Amount - (self._MaintenanceMarginRate + self._TakerFee) * position[0].Amount)
else:
total_avail_balance = account.Stocks + position_margin + account.FrozenStocks - position_profit if position_profit > 0 else account.Stocks + position_margin + account.FrozenStocks
if position[0].Type == PD_LONG:
forced_price = (self._MaintenanceMarginRate * position[0].Amount + position[0].Amount) / (total_avail_balance / self._QuarterOneSizeValue + position[0].Amount / position[0].Price)
else:
forced_price = (self._MaintenanceMarginRate * position[0].Amount - position[0].Amount) / (total_avail_balance / self._QuarterOneSizeValue - position[0].Amount / position[0].Price)
if forced_price < 0:
forced_price = 0
return forced_price
# 计算最大可下单张数
def getMaxOrderSize(self, margin_level, ticker, account):
max_order_size = 0
if self._IsUsdtStandard:
max_order_size = account.Balance * margin_level / (self._OneSizeInCurrentCoin * ticker.Last)
else:
max_order_size = account.Stocks * ticker.Last / self._QuarterOneSizeValue * margin_level
return _N(max_order_size, self._AmountPrecision)
# 获取账户资产
def getAccountAsset(self, position, account, ticker):
# 计算不同情况下的账户初始资产
account_asset = 0
position_margin = self.getSinglePositionMargin(position, ticker)
if self._IsUsdtStandard:
if len(position) > 0:
account_asset = account.Balance + account.FrozenBalance + position_margin
else:
account_asset = account.Balance + account.FrozenBalance
else:
if len(position) > 0:
account_asset = account.Stocks + account.FrozenStocks + position_margin
else:
account_asset = account.Stocks + account.FrozenStocks
return account_asset
# 收益统计
def calculateProfit(self, ticker):
# 重新获取一下账户持仓与资产
position = _C(exchange.GetPosition)
account = _C(exchange.GetAccount)
# 当前总收益 - 上一次总收益 = 本次的收益
current_profit = (self.getAccountAsset(position, account, ticker) - self._InitAsset) - self._ProfitLocal
self._ProfitLocal += current_profit
if current_profit > 0:
self._TakeProfitCount += 1
self._TradeCount += 1
LogProfit(_N(self._ProfitLocal, 4), " 本次收益:", _N(current_profit, 6))
self.saveUserDatasLocal()
# 是否还够资金下单
def isEnoughAssetToOrder(self, order_size, ticker):
is_enough = True
account = _C(exchange.GetAccount)
if self._IsUsdtStandard:
if account.Balance < order_size * ticker.Last * self._OneSizeInCurrentCoin / self._MarginLevel:
is_enough = False
else:
if account.Stocks < order_size * self._QuarterOneSizeValue / ticker.Last / self._MarginLevel:
is_enough = False
return is_enough
# 按照K线周期运行策略核心
def runInKLinePeriod(self, records):
bar_time = records[-1].Time
if self._RunInKLinePeriod and self._LastBarTime == bar_time:
return False
self._LastBarTime = bar_time
return True
# 趋势判断模块(可编辑具体指标)
def trendJudgment(self, records):
# 检查价格是否穿过均线
def checkPriceCrossEma(price, ema_value):
if self._PriceCrossEMAStatus == 0:
if price <= ema_value:
self._PriceCrossEMAStatus = -1
else:
self._PriceCrossEMAStatus = 1
elif (self._PriceCrossEMAStatus == -1 and price >= ema_value) or (self._PriceCrossEMAStatus == 1 and price <= ema_value):
self._PriceCrossEMAStatus = 2 # 完成穿过
# EMA的多空判断
ema_long = False
ema_short = False
price = records[-2].Close # 已经收盘的K线的收盘价格
ema = TA.EMA(records, self._EmaLength)
ema_value = ema[-2] # 收盘K线对应ema值
ema_upper = ema_value * (1 + self._EmaCoefficient)
ema_lower = ema_value * (1 - self._EmaCoefficient)
checkPriceCrossEma(price, ema_value)
if price > ema_upper:
ema_long = True
elif price < ema_lower:
ema_short = True
# 标准差判断
in_trend = False
if self._UseStddev:
records_data = []
for i in range(len(records)):
records_data.append((records[i].High + records[i].Low) / 2 if self._UseRecordsMiddleValue else records[i].Close)
records_data = np.array(records_data) # 将 list 转换为 np.array
stddev = np.std(records_data, ddof=1) # 使用 numpy 计算标准差
if stddev > self._StddevDeviations:
in_trend = True
else:
in_trend = True
# 趋势判断
long = in_trend and ema_long
short = in_trend and ema_short
if long:
Log("当前趋势为:多", self._EnableMessageSend and "@" or "#00FF7F")
elif short:
Log("当前趋势为:空", self._EnableMessageSend and "@" or "#FF0000")
else:
Log("当前趋势为:震荡", self._EnableMessageSend and "@" or "#007FFF")
return [long, short]
# 止损
def stopLoss(self, position, ticker):
stop_loss_price = 0
price = ticker.Last
if len(position) == 1 and self._UseStopLoss:
if position[0].Type == PD_LONG:
stop_loss_price = position[0].Price * (1 - self._StopLossPercent / 100)
if price < stop_loss_price:
self.coverLong(-1, position[0].Amount)
self.calculateProfit(ticker)
self._TrendWhenTakeProfitOrStopLoss = 1
self._HadStopLoss = True
Log("多单止损。止损价格:", _N(stop_loss_price, 6), ", 持仓价格:", _N(position[0].Price), self._EnableMessageSend and "@" or "#FF1CAE")
elif position[0].Type == PD_SHORT:
stop_loss_price = position[0].Price * (1 + self._StopLossPercent / 100)
if price > stop_loss_price:
self.coverShort(-1, position[0].Amount)
self.calculateProfit(ticker)
self._TrendWhenTakeProfitOrStopLoss = -1
self._HadStopLoss = True
Log("空单止损。止损价格:", _N(stop_loss_price, 6), ", 持仓价格:", _N(position[0].Price), self._EnableMessageSend and "@" or "#FF1CAE")
# 止盈
def takeProfit(self, position, ticker):
take_profit_price = 0
price = ticker.Last
if len(position) == 1 and self._UseTakeProfit:
if position[0].Type == PD_LONG:
take_profit_price = position[0].Price * (1 + self._TakeProfitPercent / 100)
if price > take_profit_price:
self.coverLong(-1, position[0].Amount)
self.calculateProfit(ticker)
self._TrendWhenTakeProfitOrStopLoss = 1
self._HadTakeProfit = True
Log("多单止盈。止盈价格:", _N(take_profit_price, 6), ", 持仓价格:", _N(position[0].Price), self._EnableMessageSend and "@" or "#FF1CAE")
elif position[0].Type == PD_SHORT:
take_profit_price = position[0].Price * (1 - self._TakeProfitPercent / 100)
if price < take_profit_price:
self.coverShort(-1, position[0].Amount)
self.calculateProfit(ticker)
self._TrendWhenTakeProfitOrStopLoss = -1
self._HadTakeProfit = True
Log("空单止盈。止盈价格:", _N(take_profit_price, 6), ", 持仓价格:", _N(position[0].Price), self._EnableMessageSend and "@" or "#FF1CAE")
# 回调止盈
def trackingTakeProfit(self, position, ticker):
take_profit_price = 0
trigger_price = 0
price = ticker.Last
if len(position) > 0 and self._UseTrackingTakeProfit:
if position[0].Type == PD_LONG:
# 多单持仓
if self._TriggeredTakeProfit:
# 已达到触发价格,监控是否止盈
self._PeakPriceInPosition = price if price > self._PeakPriceInPosition else self._PeakPriceInPosition # 更新价格高点
if self._UsePositionRetracement:
take_profit_price = self._PeakPriceInPosition - (self._PeakPriceInPosition - position[0].Price) * (self._CallBakcPercent / 100) # 计算回调的止盈价格
else:
take_profit_price = self._PeakPriceInPosition * (1 - self._CallBakcPercent / 100) # 计算回调的止盈价格
if price < take_profit_price:
self.coverLong(-1, position[0].Amount) # 平多
self.calculateProfit(ticker)
self._TriggeredTakeProfit = False # 复位触发标记
self._TrendWhenTakeProfitOrStopLoss = 1 # 记录止盈时候的趋势
self._HadTakeProfit = True # 记录发生了止盈
Log("多单回调止盈:持仓中价格高点:", _N(self._PeakPriceInPosition, 6), ", 止盈价格:", _N(take_profit_price, 6), ", 当前价格:", _N(price, 6),
", 持仓价格:", _N(position[0].Price, 6), self._EnableMessageSend and "@" or "#FF1CAE")
else:
# 监控是否达到回调止盈的触发价格
trigger_price = position[0].Price * (1 + self._TakeProfitTriggerPercent / 100)
if price > trigger_price:
self._TriggeredTakeProfit = True # 触发回调止盈
self._PeakPriceInPosition = price # 记录价格高点
Log("多单已达到回调止盈的触发价格:", _N(trigger_price, 6), ", 当前价格:", _N(price, 6), ", 持仓价格:", _N(position[0].Price, 6))
elif position[0].Type == PD_SHORT:
# 空单持仓
if self._TriggeredTakeProfit:
# 已达到触发价格,监控是否止盈
self._PeakPriceInPosition = price if price < self._PeakPriceInPosition else self._PeakPriceInPosition # 更新价格低点
if self._UsePositionRetracement:
take_profit_price = self._PeakPriceInPosition + (position[0].Price - self._PeakPriceInPosition) * (self._CallBakcPercent / 100) # 计算回调的止盈价格
else:
take_profit_price = self._PeakPriceInPosition * (1 + self._CallBakcPercent / 100) # 计算回调的止盈价格
if price > take_profit_price:
self.coverShort(-1, position[0].Amount) # 平空
self.calculateProfit(ticker)
self._TriggeredTakeProfit = False # 复位触发标记
self._TrendWhenTakeProfitOrStopLoss = -1 # 记录止盈时候的趋势
self._HadTakeProfit = True # 记录发生了止盈
Log("空单回调止盈:持仓中价格低点:", _N(self._PeakPriceInPosition, 6), ", 止盈价格:", _N(take_profit_price, 6), ", 当前价格:", _N(price, 6),
", 持仓价格:", _N(position[0].Price, 6), self._EnableMessageSend and "@" or "#FF1CAE")
else:
# 监控是否达到回调止盈的触发价格
trigger_price = position[0].Price * (1 - self._TakeProfitTriggerPercent / 100)
if price < trigger_price:
self._TriggeredTakeProfit = True # 触发回调止盈
self._PeakPriceInPosition = price # 记录价格低点
Log("空单已达到回调止盈的触发价格:", _N(trigger_price, 6), ", 当前价格:", _N(price, 6), ", 持仓价格:", _N(position[0].Price, 6))
# 下单
def order(self, long, short, position, ticker):
position_size = position[0].Amount if len(position) > 0 else 0
position_type = position[0].Type if len(position) > 0 else None
if long:
# 趋势多
if (self._HadStopLoss or self._HadTakeProfit) and self._TrendWhenTakeProfitOrStopLoss == 1:
# 发生了止盈止损,并且止盈止损时候趋势为多,不再做多
return
if position_size > 0 and position_type == PD_SHORT:
self.coverShort(-1, position_size)
self.calculateProfit(ticker)
elif position_size > 0 and position_type == PD_LONG:
# 多单持仓,不重复下单
return
else:
# 没有持仓,如果是首次运行或者策略重启,需要等待价格穿过一次EMA均线才下单
if self._PriceCrossEMAStatus != 2:
return
if self.isEnoughAssetToOrder(self._OrderSize, ticker):
self.openLong(-1, self._OrderSize)
self._HadStopLoss = False
self._HadTakeProfit = False
else:
raise Exception("下单金额数量不足!")
elif short:
# 趋势空
if (self._HadStopLoss or self._HadTakeProfit) and self._TrendWhenTakeProfitOrStopLoss == -1:
# 发生了止盈止损,并且止盈止损时候趋势为空,不再做空
return
if position_size > 0 and position_type == PD_LONG:
self.coverLong(-1, position_size)
self.calculateProfit(ticker)
elif position_size > 0 and position_type == PD_SHORT:
# 空单持仓,不重复下单
return
else:
# 没有持仓,如果是首次运行或者策略重启,需要等待价格穿过一次EMA均线才下单
if self._PriceCrossEMAStatus != 2:
return
if self.isEnoughAssetToOrder(self._OrderSize, ticker):
self.openShort(-1, self._OrderSize)
self._HadStopLoss = False
self._HadTakeProfit = False
else:
raise Exception("下单金额数量不足!")
# 趋势策略
def trendStrategy(self):
ticker = _C(exchange.GetTicker)
position = _C(exchange.GetPosition)
account = _C(exchange.GetAccount)
records = _C(exchange.GetRecords, self._KLinePeriod * 60)
if len(position) > 1:
Log(position)
raise Exception("同时有多空持仓!")
# 策略交互
self.runCmd()
# 状态栏信息打印
self.printLogStatus(ticker, account, position)
# 止损
self.stopLoss(position, ticker)
# 止盈
self.takeProfit(position, ticker)
# 回调止盈
self.trackingTakeProfit(position, ticker)
# 按照K线周期运行策略
if not self.runInKLinePeriod(records):
return
# 趋势判断和下单
long = False
short = False
[long, short] = self.trendJudgment(records)
if not self._OnlyTrendJudgment:
self.order(long, short, position, ticker)
# 状态栏信息打印
def printLogStatus(self, ticker, account, position):
table_overview = {
"type": "table",
"title": "策略总览",
"cols": ["开始时间", "已运行天数", "交易次数", "胜率", "预估月化%", "预估年化%"],
"rows": []
}
table_account = {
"type": "table",
"title": "账户资金",
"cols": ["当前资产", "初始资产", "可用余额", "冻结余额", "可下单张数", "收益", "收益%"],
"rows": []
}
table_position = {
"type": "table",
"title": "持仓情况",
"cols": ["交易币种", "杠杆倍数", "持仓均价", "方向", "数量", "保证金", "预估强平价格", "浮动盈亏", "浮动盈亏%"],
"rows": []
}
i = 0
# 策略总览
the_running_days = self.getDaysFromTimeStamp(self._StrategyDatas["start_run_timestamp"], Unix())
monthly_rate_of_profit = 0
if the_running_days > 1:
monthly_rate_of_profit = self._ProfitLocal / self._InitAsset / the_running_days * 30
table_overview["rows"].append([_D(self._StrategyDatas["start_run_timestamp"]), the_running_days, self._TradeCount,
0 if self._TradeCount == 0 else (str(_N(self._TakeProfitCount / self._TradeCount * 100, 2)) + "%"),
str(_N(monthly_rate_of_profit * 100, 2)) + "%", str(_N(monthly_rate_of_profit * 12 * 100, 2)) + "%"])
# 账户资金
current_asset = self.getAccountAsset(position, account, ticker)
max_order_size = self.getMaxOrderSize(self._MarginLevel, ticker, account)
asset_profit = current_asset - self._InitAsset
asset_profit_percent = asset_profit / self._InitAsset
table_account["rows"].append([_N(current_asset, 4), _N(self._InitAsset, 4), _N(account.Balance if self._IsUsdtStandard else account.Stocks, 4),
_N(account.FrozenBalance if self._IsUsdtStandard else account.FrozenStocks, 4), max_order_size, _N(asset_profit, 4),
str(_N(asset_profit_percent * 100, 2)) + "%"])
# 持仓情况
position_direction = ""
forced_cover_up_price = 0
position_profit_percent = 0
position_profit = 0
position_margin = 0
if len(position) == 0:
table_position["rows"].append(["无持仓", "-", "-", "-", "-", "-", "-", "-", "-"])
else:
position_direction = "多单" if position[0].Type == PD_LONG else "空单"
[position_profit, position_profit_percent] = self.getSinglePositionProfit(position, ticker)
position_margin = self.getSinglePositionMargin(position, ticker)
forced_cover_up_price = self.calculateForcedPrice(account, position, ticker)
table_position["rows"].append([exchange.GetCurrency(), self._MarginLevel, _N(position[0].Price, 4), position_direction, position[0].Amount,
_N(position_margin, 4), _N(forced_cover_up_price, 4), _N(position_profit, 4), str(_N((position_profit_percent * 100), 2)) + "%"])
# 打印表格
LogStatus('`' + json.dumps(table_overview) + '`\n'
+ '`' + json.dumps(table_account) + '`\n'
+ '`' + json.dumps(table_position) + '`\n')
# main
def main():
exchange.IO('simulate', True)
strategy = TrendStrategy()
strategy.setContract()
strategy.initDatas()
while True:
strategy.trendStrategy()
Sleep(strategy._Interval)