Ich treibe mich oft auf der Erfinderplattform herum und finde immer wieder Schätze. Heute habe ich einen 21-jährigen gefundenTrendstrategieIch bewundere die exquisite und perfekte Codestruktur des ursprünglichen Autors und seine hohe Flexibilität. Die ursprüngliche Strategie ist die JS-Version, die zur Vereinfachung für Python-Freunde neu geschrieben wurde.
Ehrlich gesagt, machen viele Anfänger beim Einstieg in den quantitativen Handel viele Umwege. Sie stoßen oft auf Probleme wie fehlgeschlagene Orders, Verluste durch schlechtes Risikomanagement und Datenverlust nach dem Neustart einer Strategie. Später erkannte ich allmählich, wie wichtig ein gutes Framework ist, das uns helfen kann, viele Fallstricke zu vermeiden. Dieses Trendstrategie-Framework ist ein äußerst wertvolles Werkzeug. Es ist mehr als nur eine einfache Handelsstrategie; es ist eher wie ein Werkzeugkasten, der Ihnen grundlegende, aber wichtige Funktionen wie Orderplatzierung, Stop-Loss-Orders und Datenmanagement bietet. Sie müssen sich nur auf die Kernfragen konzentrieren: „Wann kaufen?“ und „Wann verkaufen?“. Darüber hinaus ist das Framework sehr offen, sodass Sie den EMA problemlos durch MACD, RSI oder einen anderen Indikator Ihrer Wahl ersetzen können. Sie möchten Trends folgen? Kein Problem. Sie möchten eine Mean Reversion ausprobieren? Oder sogar mehrere Indikatoren kombinieren? Auf jeden Fall. Diese Flexibilität ist unglaublich nützlich; Sie können denselben Code anpassen, um mit verschiedenen Ideen zu experimentieren.
Ich teile dieses Framework heute mit Ihnen und hoffe, dass es für alle hilfreich ist, die sich mit quantitativen Investitionen beschäftigen. Nachfolgend finden Sie eine detaillierte Einführung in die einzelnen Komponenten dieses Frameworks, die Sie meiner Meinung nach nützlich finden werden.
Im Vergleich zu den zahlreichen unabhängigen Funktionen, die in Multi-Commodity-Trading-Frameworks verwendet werden, versucht dieses Framework, die verschiedenen Teile einer Strategie mithilfe eines Klassenformats zu organisieren und zu verwalten. Dieses objektorientierte Design verbessert nicht nur die Wartbarkeit und Skalierbarkeit des Codes, sondern macht die Strategiekomponenten auch modularer und erleichtert so spätere Anpassungen und Optimierungen. Das Framework besteht im Wesentlichen aus den folgenden Abschnitten mit jeweils eigener Funktion, die die Flexibilität und Praktikabilität der Strategie gewährleisten.
init-Funktion
__init__Diese Funktion ist die Initialisierungsmethode der Strategieklasse und ist für die Festlegung der Grundkonfiguration der Strategie, die Initialisierung von Variablen und das Abrufen von Marktinformationen verantwortlich. Diese Funktion stellt sicher, dass die erforderlichen Parameter vor der Ausführung der Strategie konfiguriert werden, um eine reibungslose Ausführung nachfolgender Handelsvorgänge zu gewährleisten.initDatas-Funktion
saveStrategyRunTime-Funktion
setStrategyRunTime-Funktion
_GDie Funktion speichert den übergebenen Zeitstempel lokal.getDaysFromTimeStamp-Funktion
saveUserDatasLocal-Funktion
_GDie Funktion speichert die Daten lokal.readUserDataLocal-Funktion
clearUserDataLocal-Funktion
_GFunktion löscht lokale Daten.runCmd-Funktion
orderDirectly-Funktion
openLong-Funktion
orderDirectlyDie Funktion führt eine Kaufoperation durch.openShort-Funktion
orderDirectlyDie Funktion führt eine Verkaufsoperation durch.coverLong-Funktion
orderDirectlyDie Funktion führt eine Verkaufsoperation durch.coverShort-Funktion
orderDirectlyDie Funktion führt eine Kaufoperation durch.getRealOrderSize-Funktion
getSinglePositionMargin-Funktion
getSinglePositionProfit-Funktion
Funktion „calculateForcedPrice“
getMaxOrderSize-Funktion
getAccountAsset-Funktion
Funktion „Berechnen des Gewinns“
isEnoughAssetToOrder-Funktion
runInKLinePeriod-Funktion
TrueAndernfalls zurückFalse。TrendJudgment-Funktion (Kernmodul zur Trendbeurteilung)
StopLoss-Funktion
TakeProfit-Funktion
TrackingTakeProfit-Funktion
Bestellfunktion
trendStrategy-Funktion
printLogStatus-Funktion
LogStatusDie Funktion gibt die Tabellendaten in die Statusleiste aus.Hauptfunktion
Dieser Rahmen ist nicht nur auf den digitalen Währungsmarkt anwendbar, sondern kann auch in dertrendJudgmentDas Framework kann funktional erweitert werden, um sich an unterschiedliche Anforderungen der Handelsstrategie anzupassen. Darüber hinaus kann das Framework auch speziell für den Spotmarkt oder für Multi-Varieté-Kontrakte modifiziert werden, mit hoher Flexibilität und Skalierbarkeit.
Als umfassendes und hochflexibles automatisiertes Handelssystem eignet sich dieses Framework für den Trendhandel im Kryptowährungsmarkt. Durch kontinuierliche Optimierung und Erweiterung wird es sich voraussichtlich in Zukunft zu einem wertvollen Werkzeug für Kryptowährungshändler entwickeln und ihnen helfen, ihre eigenen quantitativen Strategien besser zu entwickeln. Das „Cryptocurrency Trend Strategy Trading Framework“ zeichnet sich durch eine umfassende Struktur aus. Obwohl der Code relativ umfangreich ist, deckt es im Wesentlichen die wichtigsten Funktionsmodule ab, die für den Trendhandel aus realer Handelsperspektive erforderlich sind. Daher hat dieses Framework einen hohen Referenzwert und eine hohe praktische Bedeutung, sowohl für das Erlernen von Handelsstrategien als auch für die praktische Anwendung. Seine umfassende Funktionalität und Flexibilität ermöglichen die Anpassung an unterschiedliche Marktumgebungen und bieten so umfassende Unterstützung.
Die Inventor-Plattform ist eine wahre Fundgrube an quantitativem Handelswissen und -strategien, die jeweils die Weisheit und Erfahrung ihrer Entwickler widerspiegeln. Wir laden alle ein, hier wertvolle Handelsstrategien und -techniken zu entdecken. Vielen Dank an alle unsere innovativen und austauschfreudigen Nutzer. Dank Ihrer Beiträge ist diese Plattform zu einem wichtigen Ort für Lernen und Austausch im quantitativen Handel geworden und hilft jedem, seine Fähigkeiten und sein Fachwissen zu verbessern.
”`python “‘backtest start: 2024-11-26 00:00:00 end: 2024-12-03 00:00:00 period: 1d basePeriod: 1d exchanges: [{“eid”:“Futures_Binance”,“currency”:“BTC_USDT”}] “’
import json, talib import numpy as np
class TrendStrategy: def init(self): # 基本设置 self._Currency = TradeCurrency self._Interval = Interval self._UseQuarter = UseQuarter self._UseContract = TradeCurrency + (‘.swap’ if self._UseQuarter else ‘.quarter’) self._OnlyTrendJudgment = OnlyTrendJudgment self._EnableMessageSend = EnableMessageSend # 趋势判断 self._RunInKLinePeriod = RunInKLinePeriod self._KLinePeriod = KLinePeriod self._EmaLength = EmaLength self._EmaCoefficient = EmaCoefficient self._UseStddev = UseStddev self._UseRecordsMiddleValue = UseRecordsMiddleValue self._StddevLength = StddevLength self._StddevDeviations = StddevDeviations # 下单设置 self._MarginLevel = MarginLevel self._OrderSize = OrderSize self._OrderByMargin = OrderByMargin self._OrderMarginPercent = OrderMarginPercent self._PricePrecision = None self._AmountPrecision = None self._OneSizeInCurrentCoin = None self._QuarterOneSizeValue = None # 止盈止损 self._UseStopLoss = UseStopLoss self._StopLossPercent = StopLossPercent self._UseTakeProfit = UseTakeProfit self._TakeProfitPercent = TakeProfitPercent self._UseTrackingTakeProfit = UseTrackingTakeProfit self._UsePositionRetracement = UsePositionRetracement self._TakeProfitTriggerPercent = TakeProfitTriggerPercent self._CallBakcPercent = CallBakcPercent
# 策略变量
self._LastBarTime = 0
self._TrendWhenTakeProfitOrStopLoss = 0
self._HadStopLoss = False
self._TriggeredTakeProfit = False
self._PeakPriceInPosition = 0
self._HadTakeProfit = False
self._PriceCrossEMAStatus = 0
# 统计变量
self._InitAsset = 0
self._ProfitLocal = 0
self._TakeProfitCount = 0
self._TradeCount = 0
self.StrategyRunTimeStampString = "strategy_run_time"
self._StrategyDatas = {"start_run_timestamp": 0, "others": ""}
self._UserDatas = None
# 相对固定参数
self._MaintenanceMarginRate = 0.004
self._TakerFee = 0.0005
self._IsUsdtStandard = False
# 获取合约信息
ticker = _C(exchange.GetTicker, self._UseContract)
marketInfo = exchange.GetMarkets()[self._UseContract]
Log('获取市场信息:', marketInfo)
self._PricePrecision = marketInfo['PricePrecision']
self._AmountPrecision = marketInfo['AmountPrecision']
self._OneSizeInCurrentCoin = marketInfo['CtVal']
self._QuarterOneSizeValue = marketInfo['CtVal']
exchange.SetCurrency(self._Currency)
exchange.SetMarginLevel(self._UseContract, self._MarginLevel)
exchange.SetPrecision(self._PricePrecision, self._AmountPrecision)
# 初始化数据
def initDatas(self):
self.saveStrategyRunTime()
self.readUserDataLocal()
self._InitAsset = self._UserDatas["init_assets"]
self._ProfitLocal = self._UserDatas["profit_local"]
self._TakeProfitCount = self._UserDatas["take_profit_count"]
self._TradeCount = self._UserDatas["trade_count"]
if self._OrderByMargin:
self.getRealOrderSize(-1, self._OrderSize)
Log("已经重新计算下单张数:", self._OrderSize)
if self._UseTakeProfit and self._UseTrackingTakeProfit:
raise Exception("止盈和回调止盈不能同时使用!")
# 设置合约
def setContract(self):
self._IsUsdtStandard = "USDT" in self._Currency
exchange.SetCurrency(self._Currency)
if self._UseQuarter:
exchange.SetContractType("quarter")
else:
exchange.SetContractType("swap")
# 保存程序起始运行时间 秒级时间戳
def saveStrategyRunTime(self):
local_data_strategy_run_time = _G(self.StrategyRunTimeStampString)
if local_data_strategy_run_time is None:
self._StrategyDatas["start_run_timestamp"] = Unix()
_G(self.StrategyRunTimeStampString, self._StrategyDatas["start_run_timestamp"])
else:
self._StrategyDatas["start_run_timestamp"] = local_data_strategy_run_time
# 设置程序起始运行时间 秒级时间戳
def setStrategyRunTime(self, timestamp):
_G(self.StrategyRunTimeStampString, timestamp)
self._StrategyDatas["start_run_timestamp"] = timestamp
# 计算两个时间戳之间的天数,参数是秒级时间戳
def getDaysFromTimeStamp(self, start_time, end_time):
if end_time < start_time:
return 0
return (end_time - start_time) // (60 * 60 * 24)
# 保存数据到本地
def saveUserDatasLocal(self):
self._UserDatas = {
"init_assets": self._InitAsset,
"profit_local": self._ProfitLocal,
"take_profit_count": self._TakeProfitCount,
"trade_count": self._TradeCount
}
# 存储到本地
_G(exchange.GetLabel(), self._UserDatas)
Log("已把所有数据保存到本地.")
# 读取用户本地数据,程序启动时候运行一次
def readUserDataLocal(self):
user_data = _G(exchange.GetLabel())
if user_data is None:
self._InitAsset = self.getAccountAsset(_C(exchange.GetPosition), _C(exchange.GetAccount), _C(exchange.GetTicker))
self._UserDatas = {
"init_assets": self._InitAsset,
"profit_local": 0,
"take_profit_count": 0,
"trade_count": 0
}
else:
self._UserDatas = user_data
# 清除用户本地数据,交互按钮点击运行
def clearUserDataLocal(self):
_G(exchange.GetLabel(), None)
Log(exchange.GetLabel(), ":已清除本地数据.")
# 策略交互
def runCmd(self):
cmd = GetCommand()
if cmd:
# 检测交互命令
Log("接收到的命令:", cmd, "#FF1CAE")
if cmd.startswith("ClearLocalData:"):
# 清除本地数据
self.clearUserDataLocal()
elif cmd.startswith("SaveLocalData:"):
# 保存数据到本地
self.saveUserDatasLocal()
elif cmd.startswith("ClearLog:"):
# 清除日志
log_reserve = cmd.replace("ClearLog:", "")
LogReset(int(log_reserve))
elif cmd.startswith("OrderSize:"):
# 修改下单张数
if self._OrderByMargin:
Log("已经使用保证金数量来下单,无法直接修改下单数量!")
else:
order_size = int(cmd.replace("OrderSize:", ""))
self._OrderSize = order_size
Log("下单张数已经修改为:", self._OrderSize)
elif cmd.startswith("OrderMarginPercent:"):
# 修改下单保证金百分比
if self._OrderByMargin:
order_margin_percent = float(cmd.replace("OrderMarginPercent:", ""))
self._OrderMarginPercent = order_margin_percent
Log("下单保证金百分比:", self._OrderMarginPercent, "%")
else:
Log("没有打开根据保证金数量下单,无法修改下单保证金百分比!")
# 交易函数
def orderDirectly(self, distance, price, amount):
tradeFunc = None
if amount <= 0:
raise Exception("设置的参数有误,下单数量已经小于0!")
if distance == "buy":
tradeFunc = exchange.Buy
elif distance == "sell":
tradeFunc = exchange.Sell
elif distance == "closebuy":
tradeFunc = exchange.Sell
else:
tradeFunc = exchange.Buy
exchange.SetDirection(distance)
return tradeFunc(price, amount)
def openLong(self, price, amount):
real_amount = self.getRealOrderSize(price, amount)
return self.orderDirectly("buy", price, real_amount)
def openShort(self, price, amount):
real_amount = self.getRealOrderSize(price, amount)
return self.orderDirectly("sell", price, real_amount)
def coverLong(self, price, amount):
return self.orderDirectly("closebuy", price, amount)
def coverShort(self, price, amount):
return self.orderDirectly("closesell", price, amount)
# 重新计算下单数量
def getRealOrderSize(self, price, amount):
real_price = price if price != -1 else _C(exchange.GetTicker).Last
if self._OrderByMargin:
if self._IsUsdtStandard:
self._OrderSize = _N(self._InitAsset * (self._OrderMarginPercent / 100) / real_price * self._MarginLevel / self._OneSizeInCurrentCoin, self._AmountPrecision) # u本位数量(杠杆放大数量)
else:
self._OrderSize = _N(self._InitAsset * (self._OrderMarginPercent / 100) * self._MarginLevel * real_price / self._QuarterOneSizeValue, self._AmountPrecision) # 币本位数量(杠杆放大数量)
else:
self._OrderSize = amount
return self._OrderSize
# 获取单个持仓占用保证金
def getSinglePositionMargin(self, position, ticker):
position_margin = 0
if len(position) > 0:
if self._IsUsdtStandard:
position_margin = position[0].Amount * self._OneSizeInCurrentCoin * ticker.Last / self._MarginLevel
else:
position_margin = position[0].Amount * self._QuarterOneSizeValue / ticker.Last / self._MarginLevel
return position_margin
# 获取单向持仓的收益和收益%
def getSinglePositionProfit(self, position, ticker):
if len(position) == 0:
return [0, 0]
price = ticker.Last
position_margin = self.getSinglePositionMargin(position, ticker)
position_profit_percent = (price - position[0].Price) / position[0].Price * self._MarginLevel if position[0].Type == PD_LONG else (position[0].Price - price) / position[0].Price * self._MarginLevel
position_profit = position_margin * position_profit_percent
return [position_profit, position_profit_percent]
# 计算强平价格
def calculateForcedPrice(self, account, position, ticker):
position_profit = 0
total_avail_balance = 0
forced_price = 0
position_margin = self.getSinglePositionMargin(position, ticker)
[position_profit, position_profit_percent] = self.getSinglePositionProfit(position, ticker)
if self._IsUsdtStandard:
total_avail_balance = account.Balance + position_margin + account.FrozenBalance - position_profit if position_profit > 0 else account.Balance + position_margin + account.FrozenBalance
if position[0].Type == PD_LONG:
forced_price = ((self._MaintenanceMarginRate + self._TakerFee) * self._MarginLevel * account.FrozenBalance - total_avail_balance) / self._OneSizeInCurrentCoin + (position[0].Amount * position[0].Price) / (position[0].Amount - (self._MaintenanceMarginRate + self._TakerFee) * position[0].Amount)
else:
forced_price = ((self._MaintenanceMarginRate + self._TakerFee) * self._MarginLevel * account.FrozenBalance - total_avail_balance) / self._OneSizeInCurrentCoin - (position[0].Amount * position[0].Price) / (-1 * position[0].Amount - (self._MaintenanceMarginRate + self._TakerFee) * position[0].Amount)
else:
total_avail_balance = account.Stocks + position_margin + account.FrozenStocks - position_profit if position_profit > 0 else account.Stocks + position_margin + account.FrozenStocks
if position[0].Type == PD_LONG:
forced_price = (self._MaintenanceMarginRate * position[0].Amount + position[0].Amount) / (total_avail_balance / self._QuarterOneSizeValue + position[0].Amount / position[0].Price)
else:
forced_price = (self._MaintenanceMarginRate * position[0].Amount - position[0].Amount) / (total_avail_balance / self._QuarterOneSizeValue - position[0].Amount / position[0].Price)
if forced_price < 0:
forced_price = 0
return forced_price
# 计算最大可下单张数
def getMaxOrderSize(self, margin_level, ticker, account):
max_order_size = 0
if self._IsUsdtStandard:
max_order_size = account.Balance * margin_level / (self._OneSizeInCurrentCoin * ticker.Last)
else:
max_order_size = account.Stocks * ticker.Last / self._QuarterOneSizeValue * margin_level
return _N(max_order_size, self._AmountPrecision)
# 获取账户资产
def getAccountAsset(self, position, account, ticker):
# 计算不同情况下的账户初始资产
account_asset = 0
position_margin = self.getSinglePositionMargin(position, ticker)
if self._IsUsdtStandard:
if len(position) > 0:
account_asset = account.Balance + account.FrozenBalance + position_margin
else:
account_asset = account.Balance + account.FrozenBalance
else:
if len(position) > 0:
account_asset = account.Stocks + account.FrozenStocks + position_margin
else:
account_asset = account.Stocks + account.FrozenStocks
return account_asset
# 收益统计
def calculateProfit(self, ticker):
# 重新获取一下账户持仓与资产
position = _C(exchange.GetPosition)
account = _C(exchange.GetAccount)
# 当前总收益 - 上一次总收益 = 本次的收益
current_profit = (self.getAccountAsset(position, account, ticker) - self._InitAsset) - self._ProfitLocal
self._ProfitLocal += current_profit
if current_profit > 0:
self._TakeProfitCount += 1
self._TradeCount += 1
LogProfit(_N(self._ProfitLocal, 4), " 本次收益:", _N(current_profit, 6))
self.saveUserDatasLocal()
# 是否还够资金下单
def isEnoughAssetToOrder(self, order_size, ticker):
is_enough = True
account = _C(exchange.GetAccount)
if self._IsUsdtStandard:
if account.Balance < order_size * ticker.Last * self._OneSizeInCurrentCoin / self._MarginLevel:
is_enough = False
else:
if account.Stocks < order_size * self._QuarterOneSizeValue / ticker.Last / self._MarginLevel:
is_enough = False
return is_enough
# 按照K线周期运行策略核心
def runInKLinePeriod(self, records):
bar_time = records[-1].Time
if self._RunInKLinePeriod and self._LastBarTime == bar_time:
return False
self._LastBarTime = bar_time
return True
# 趋势判断模块(可编辑具体指标)
def trendJudgment(self, records):
# 检查价格是否穿过均线
def checkPriceCrossEma(price, ema_value):
if self._PriceCrossEMAStatus == 0:
if price <= ema_value:
self._PriceCrossEMAStatus = -1
else:
self._PriceCrossEMAStatus = 1
elif (self._PriceCrossEMAStatus == -1 and price >= ema_value) or (self._PriceCrossEMAStatus == 1 and price <= ema_value):
self._PriceCrossEMAStatus = 2 # 完成穿过
# EMA的多空判断
ema_long = False
ema_short = False
price = records[-2].Close # 已经收盘的K线的收盘价格
ema = TA.EMA(records, self._EmaLength)
ema_value = ema[-2] # 收盘K线对应ema值
ema_upper = ema_value * (1 + self._EmaCoefficient)
ema_lower = ema_value * (1 - self._EmaCoefficient)
checkPriceCrossEma(price, ema_value)
if price > ema_upper:
ema_long = True
elif price < ema_lower:
ema_short = True
# 标准差判断
in_trend = False
if self._UseStddev:
records_data = []
for i in range(len(records)):
records_data.append((records[i].High + records[i].Low) / 2 if self._UseRecordsMiddleValue else records[i].Close)
records_data = np.array(records_data) # 将 list 转换为 np.array
stddev = np.std(records_data, ddof=1) # 使用 numpy 计算标准差
if stddev > self._StddevDeviations:
in_trend = True
else:
in_trend = True
# 趋势判断
long = in_trend and ema_long
short = in_trend and ema_short
if long:
Log("当前趋势为:多", self._EnableMessageSend and "@" or "#00FF7F")
elif short:
Log("当前趋势为:空", self._EnableMessageSend and "@" or "#FF0000")
else:
Log("当前趋势为:震荡", self._EnableMessageSend and "@" or "#007FFF")
return [long, short]
# 止损
def stopLoss(self, position, ticker):
stop_loss_price = 0
price = ticker.Last
if len(position) == 1 and self._UseStopLoss:
if position[0].Type == PD_LONG:
stop_loss_price = position[0].Price * (1 - self._StopLossPercent / 100)
if price < stop_loss_price:
self.coverLong(-1, position[0].Amount)
self.calculateProfit(ticker)
self._TrendWhenTakeProfitOrStopLoss = 1
self._HadStopLoss = True
Log("多单止损。止损价格:", _N(stop_loss_price, 6), ", 持仓价格:", _N(position[0].Price), self._EnableMessageSend and "@" or "#FF1CAE")
elif position[0].Type == PD_SHORT:
stop_loss_price = position[0].Price * (1 + self._StopLossPercent / 100)
if price > stop_loss_price:
self.coverShort(-1, position[0].Amount)
self.calculateProfit(ticker)
self._TrendWhenTakeProfitOrStopLoss = -1
self._HadStopLoss = True
Log("空单止损。止损价格:", _N(stop_loss_price, 6), ", 持仓价格:", _N(position[0].Price), self._EnableMessageSend and "@" or "#FF1CAE")
# 止盈
def takeProfit(self, position, ticker):
take_profit_price = 0
price = ticker.Last
if len(position) == 1 and self._UseTakeProfit:
if position[0].Type == PD_LONG:
take_profit_price = position[0].Price * (1 + self._TakeProfitPercent / 100)
if price > take_profit_price:
self.coverLong(-1, position[0].Amount)
self.calculateProfit(ticker)
self._TrendWhenTakeProfitOrStopLoss = 1
self._HadTakeProfit = True
Log("多单止盈。止盈价格:", _N(take_profit_price, 6), ", 持仓价格:", _N(position[0].Price), self._EnableMessageSend and "@" or "#FF1CAE")
elif position[0].Type == PD_SHORT:
take_profit_price = position[0].Price * (1 - self._TakeProfitPercent / 100)
if price < take_profit_price:
self.coverShort(-1, position[0].Amount)
self.calculateProfit(ticker)
self._TrendWhenTakeProfitOrStopLoss = -1
self._HadTakeProfit = True
Log("空单止盈。止盈价格:", _N(take_profit_price, 6), ", 持仓价格:", _N(position[0].Price), self._EnableMessageSend and "@" or "#FF1CAE")
# 回调止盈
def trackingTakeProfit(self, position, ticker):
take_profit_price = 0
trigger_price = 0
price = ticker.Last
if len(position) > 0 and self._UseTrackingTakeProfit:
if position[0].Type == PD_LONG:
# 多单持仓
if self._TriggeredTakeProfit:
# 已达到触发价格,监控是否止盈
self._PeakPriceInPosition = price if price > self._PeakPriceInPosition else self._PeakPriceInPosition # 更新价格高点
if self._UsePositionRetracement:
take_profit_price = self._PeakPriceInPosition - (self._PeakPriceInPosition - position[0].Price) * (self._CallBakcPercent / 100) # 计算回调的止盈价格
else:
take_profit_price = self._PeakPriceInPosition * (1 - self._CallBakcPercent / 100) # 计算回调的止盈价格
if price < take_profit_price:
self.coverLong(-1, position[0].Amount) # 平多
self.calculateProfit(ticker)
self._TriggeredTakeProfit = False # 复位触发标记
self._TrendWhenTakeProfitOrStopLoss = 1 # 记录止盈时候的趋势
self._HadTakeProfit = True # 记录发生了止盈
Log("多单回调止盈:持仓中价格高点:", _N(self._PeakPriceInPosition, 6), ", 止盈价格:", _N(take_profit_price, 6), ", 当前价格:", _N(price, 6),
", 持仓价格:", _N(position[0].Price, 6), self._EnableMessageSend and "@" or "#FF1CAE")
else:
# 监控是否达到回调止盈的触发价格
trigger_price = position[0].Price * (1 + self._TakeProfitTriggerPercent / 100)
if price > trigger_price:
self._TriggeredTakeProfit = True # 触发回调止盈
self._PeakPriceInPosition = price # 记录价格高点
Log("多单已达到回调止盈的触发价格:", _N(trigger_price, 6), ", 当前价格:", _N(price, 6), ", 持仓价格:", _N(position[0].Price, 6))
elif position[0].Type == PD_SHORT:
# 空单持仓
if self._TriggeredTakeProfit:
# 已达到触发价格,监控是否止盈
self._PeakPriceInPosition = price if price < self._PeakPriceInPosition else self._PeakPriceInPosition # 更新价格低点
if self._UsePositionRetracement:
take_profit_price = self._PeakPriceInPosition + (position[0].Price - self._PeakPriceInPosition) * (self._CallBakcPercent / 100) # 计算回调的止盈价格
else:
take_profit_price = self._PeakPriceInPosition * (1 + self._CallBakcPercent / 100) # 计算回调的止盈价格
if price > take_profit_price:
self.coverShort(-1, position[0].Amount) # 平空
self.calculateProfit(ticker)
self._TriggeredTakeProfit = False # 复位触发标记
self._TrendWhenTakeProfitOrStopLoss = -1 # 记录止盈时候的趋势
self._HadTakeProfit = True # 记录发生了止盈
Log("空单回调止盈:持仓中价格低点:", _N(self._PeakPriceInPosition, 6), ", 止盈价格:", _N(take_profit_price, 6), ", 当前价格:", _N(price, 6),
", 持仓价格:", _N(position[0].Price, 6), self._EnableMessageSend and "@" or "#FF1CAE")
else:
# 监控是否达到回调止盈的触发价格
trigger_price = position[0].Price * (1 - self._TakeProfitTriggerPercent / 100)
if price < trigger_price:
self._TriggeredTakeProfit = True # 触发回调止盈
self._PeakPriceInPosition = price # 记录价格低点
Log("空单已达到回调止盈的触发价格:", _N(trigger_price, 6), ", 当前价格:", _N(price, 6), ", 持仓价格:", _N(position[0].Price, 6))
# 下单
def order(self, long, short, position, ticker):
position_size = position[0].Amount if len(position) > 0 else 0
position_type = position[0].Type if len(position) > 0 else None
if long:
# 趋势多
if (self._HadStopLoss or self._HadTakeProfit) and self._TrendWhenTakeProfitOrStopLoss == 1:
# 发生了止盈止损,并且止盈止损时候趋势为多,不再做多
return
if position_size > 0 and position_type == PD_SHORT:
self.coverShort(-1, position_size)
self.calculateProfit(ticker)
elif position_size > 0 and position_type == PD_LONG:
# 多单持仓,不重复下单
return
else:
# 没有持仓,如果是首次运行或者策略重启,需要等待价格穿过一次EMA均线才下单
if self._PriceCrossEMAStatus != 2:
return
if self.isEnoughAssetToOrder(self._OrderSize, ticker):
self.openLong(-1, self._OrderSize)
self._HadStopLoss = False
self._HadTakeProfit = False
else:
raise Exception("下单金额数量不足!")
elif short:
# 趋势空
if (self._HadStopLoss or self._HadTakeProfit) and self._TrendWhenTakeProfitOrStopLoss == -1:
# 发生了止盈止损,并且止盈止损时候趋势为空,不再做空
return
if position_size > 0 and position_type == PD_LONG:
self.coverLong(-1, position_size)
self.calculateProfit(ticker)
elif position_size > 0 and position_type == PD_SHORT:
# 空单持仓,不重复下单
return
else:
# 没有持仓,如果是首次运行或者策略重启,需要等待价格穿过一次EMA均线才下单
if self._PriceCrossEMAStatus != 2:
return
if self.isEnoughAssetToOrder(self._OrderSize, ticker):
self.openShort(-1, self._OrderSize)
self._HadStopLoss = False
self._HadTakeProfit = False
else:
raise Exception("下单金额数量不足!")
# 趋势策略
def trendStrategy(self):
ticker = _C(exchange.GetTicker)
position = _C(exchange.GetPosition)
account = _C(exchange.GetAccount)
records = _C(exchange.GetRecords, self._KLinePeriod * 60)
if len(position) > 1:
Log(position)
raise Exception("同时有多空持仓!")
# 策略交互
self.runCmd()
# 状态栏信息打印
self.printLogStatus(ticker, account, position)
# 止损
self.stopLoss(position, ticker)
# 止盈
self.takeProfit(position, ticker)
# 回调止盈
self.trackingTakeProfit(position, ticker)
# 按照K线周期运行策略
if not self.runInKLinePeriod(records):
return
# 趋势判断和下单
long = False
short = False
[long, short] = self.trendJudgment(records)
if not self._OnlyTrendJudgment:
self.order(long, short, position, ticker)
# 状态栏信息打印
def printLogStatus(self, ticker, account, position):
table_overview = {
"type": "table",
"title": "策略总览",
"cols": ["开始时间", "已运行天数", "交易次数", "胜率", "预估月化%", "预估年化%"],
"rows": []
}
table_account = {
"type": "table",
"title": "账户资金",
"cols": ["当前资产", "初始资产", "可用余额", "冻结余额", "可下单张数", "收益", "收益%"],
"rows": []
}
table_position = {
"type": "table",
"title": "持仓情况",
"cols": ["交易币种", "杠杆倍数", "持仓均价", "方向", "数量", "保证金", "预估强平价格", "浮动盈亏", "浮动盈亏%"],
"rows": []
}
i = 0
# 策略总览
the_running_days = self.getDaysFromTimeStamp(self._StrategyDatas["start_run_timestamp"], Unix())
monthly_rate_of_profit = 0
if the_running_days > 1:
monthly_rate_of_profit = self._ProfitLocal / self._InitAsset / the_running_days * 30
table_overview["rows"].append([_D(self._StrategyDatas["start_run_timestamp"]), the_running_days, self._TradeCount,
0 if self._TradeCount == 0 else (str(_N(self._TakeProfitCount / self._TradeCount * 100, 2)) + "%"),
str(_N(monthly_rate_of_profit * 100, 2)) + "%", str(_N(monthly_rate_of_profit * 12 * 100, 2)) + "%"])
# 账户资金
current_asset = self.getAccountAsset(position, account, ticker)
max_order_size = self.getMaxOrderSize(self._MarginLevel, ticker, account)
asset_profit = current_asset - self._InitAsset
asset_profit_percent = asset_profit / self._InitAsset
table_account["rows"].append([_N(current_asset, 4), _N(self._InitAsset, 4), _N(account.Balance if self._IsUsdtStandard else account.Stocks, 4),
_N(account.FrozenBalance if self._IsUsdtStandard else account.FrozenStocks, 4), max_order_size, _N(asset_profit, 4),
str(_N(asset_profit_percent * 100, 2)) + "%"])
# 持仓情况
position_direction = ""
forced_cover_up_price = 0
position_profit_percent = 0
position_profit = 0
position_margin = 0
if len(position) == 0:
table_position["rows"].append(["无持仓", "-", "-", "-", "-", "-", "-", "-", "-"])
else:
position_direction = "多单" if position[0].Type == PD_LONG else "空单"
[position_profit, position_profit_percent] = self.getSinglePositionProfit(position, ticker)
position_margin = self.getSinglePositionMargin(position, ticker)
forced_cover_up_price = self.calculateForcedPrice(account, position, ticker)
table_position["rows"].append([exchange.GetCurrency(), self._M