Costumo frequentar a plataforma do inventor e sempre encontro tesouros. Hoje encontrei um jovem de 21 anosEstratégia de TendênciasAdmiro a estrutura requintada e perfeita do código original do autor, que possui alta flexibilidade. A estratégia original é a versão JS, que foi reescrita para a conveniência dos amigos do Python.
Honestamente, muitos iniciantes fazem muitos desvios ao começar a negociar quantitativamente. Frequentemente, eles encontram problemas como ordens falhadas, perdas por má gestão de risco e perda de dados após reiniciar uma estratégia. Mais tarde, gradualmente percebi a importância de uma boa estrutura, que pode nos ajudar a evitar muitas armadilhas. Esta estrutura de estratégia de tendências é uma ferramenta muito valiosa. É mais do que uma simples estratégia de negociação; é mais como uma caixa de ferramentas, fornecendo funções básicas, porém cruciais, como colocação de ordens, ordens stop-loss e gerenciamento de dados. Você só precisa se concentrar nas questões centrais de “quando comprar” e “quando vender”. Além disso, a estrutura é bastante aberta, permitindo que você troque facilmente a MME pelo MACD, RSI ou qualquer outro indicador de sua preferência. Quer acompanhar tendências? Sem problemas. Quer tentar a reversão à média? Quer até mesmo combinar vários indicadores? Com certeza. Essa flexibilidade é incrivelmente útil; você pode modificar o mesmo código para experimentar ideias diferentes.
Compartilho esta estrutura hoje, esperando que seja útil para quem está explorando o investimento quantitativo. Abaixo, uma introdução detalhada a cada componente desta estrutura, que acredito que você achará útil.
Em comparação com as múltiplas funções independentes utilizadas em frameworks de negociação de múltiplas commodities, este framework busca organizar e gerenciar as diversas partes de uma estratégia utilizando um formato de classe. Este design orientado a objetos não apenas melhora a manutenibilidade e a escalabilidade do código, como também torna os componentes da estratégia mais modulares, facilitando ajustes e otimizações subsequentes. O framework consiste principalmente nas seguintes seções, cada uma com sua função específica, garantindo a flexibilidade e a praticidade da estratégia.
função init
__init__Esta função é o método de inicialização da classe de estratégia, responsável por definir a configuração básica da estratégia, inicializar variáveis e obter informações de mercado. Esta função garante que os parâmetros necessários sejam configurados antes da execução da estratégia, garantindo que as operações de negociação subsequentes possam ser executadas sem problemas.função initDatas
função saveStrategyRunTime
função setStrategyRunTime
_GA função salva o registro de data e hora passado localmente.função getDaysFromTimeStamp
função saveUserDatasLocal
_GA função salva os dados localmente.função readUserDataLocal
função clearUserDataLocal
_GA função limpa os dados locais.Função runCmd
função orderDirectly
função openLong
orderDirectlyA função executa uma operação de compra.função openShort
orderDirectlyA função executa uma operação de venda.função coverLong
orderDirectlyA função executa uma operação de venda.função coverShort
orderDirectlyA função executa uma operação de compra.função getRealOrderSize
função getSinglePositionMargin
função getSinglePositionProfit
função calculateForcedPrice
função getMaxOrderSize
função getAccountAsset
função calcularLucro
função isEnoughAssetToOrder
função runInKLinePeriod
TrueCaso contrário, retorneFalse。Função trendJudgment (módulo principal de julgamento de tendências)
função stopLoss
função takeProfit
função trackingTakeProfit
função de ordem
Função trendStrategy
função printLogStatus
LogStatusA função envia os dados da tabela para a barra de status.função principal
Esta estrutura não é aplicável apenas ao mercado de moeda digital, mas também pode ser usada notrendJudgmentA estrutura pode ser expandida na função para se adaptar a diferentes requisitos de estratégias de negociação. Além disso, a estrutura também pode ser modificada especificamente para o mercado à vista ou contratos multivariados, com alta flexibilidade e escalabilidade.
Como um sistema de negociação automatizado abrangente e altamente flexível, esta estrutura é adequada para negociação de tendências no mercado de criptomoedas. Por meio de otimização e expansão contínuas, espera-se que se torne uma ferramenta valiosa para traders de criptomoedas no futuro, ajudando-os a desenvolver melhor suas próprias estratégias quantitativas. A “Estrutura de Negociação de Estratégias de Tendências em Criptomoedas” apresenta uma estrutura abrangente. Embora relativamente extensa em código, ela abrange essencialmente os principais módulos funcionais necessários para a negociação de tendências de uma perspectiva de negociação do mundo real. Portanto, esta estrutura possui um valor de referência significativo e importância prática, tanto para o aprendizado de estratégias de negociação quanto para a aplicação prática. Sua funcionalidade abrangente e flexibilidade permitem que ela se adapte a diversos ambientes de mercado, fornecendo um suporte sólido.
A Plataforma Inventor é um tesouro de conhecimento e estratégias de negociação quantitativa, cada uma incorporando a sabedoria e a experiência de seus desenvolvedores. Convidamos todos a explorar estratégias e técnicas de negociação valiosas aqui. Agradecemos a todos os nossos usuários inovadores e dedicados. É graças às suas contribuições que esta plataforma se tornou um espaço vital para aprendizado e intercâmbio em negociação quantitativa, ajudando todos a aprimorar suas habilidades e expertise.
”`python “‘backtest start: 2024-11-26 00:00:00 end: 2024-12-03 00:00:00 period: 1d basePeriod: 1d exchanges: [{“eid”:“Futures_Binance”,“currency”:“BTC_USDT”}] “’
import json, talib import numpy as np
class TrendStrategy: def init(self): # 基本设置 self._Currency = TradeCurrency self._Interval = Interval self._UseQuarter = UseQuarter self._UseContract = TradeCurrency + (‘.swap’ if self._UseQuarter else ‘.quarter’) self._OnlyTrendJudgment = OnlyTrendJudgment self._EnableMessageSend = EnableMessageSend # 趋势判断 self._RunInKLinePeriod = RunInKLinePeriod self._KLinePeriod = KLinePeriod self._EmaLength = EmaLength self._EmaCoefficient = EmaCoefficient self._UseStddev = UseStddev self._UseRecordsMiddleValue = UseRecordsMiddleValue self._StddevLength = StddevLength self._StddevDeviations = StddevDeviations # 下单设置 self._MarginLevel = MarginLevel self._OrderSize = OrderSize self._OrderByMargin = OrderByMargin self._OrderMarginPercent = OrderMarginPercent self._PricePrecision = None self._AmountPrecision = None self._OneSizeInCurrentCoin = None self._QuarterOneSizeValue = None # 止盈止损 self._UseStopLoss = UseStopLoss self._StopLossPercent = StopLossPercent self._UseTakeProfit = UseTakeProfit self._TakeProfitPercent = TakeProfitPercent self._UseTrackingTakeProfit = UseTrackingTakeProfit self._UsePositionRetracement = UsePositionRetracement self._TakeProfitTriggerPercent = TakeProfitTriggerPercent self._CallBakcPercent = CallBakcPercent
# 策略变量
self._LastBarTime = 0
self._TrendWhenTakeProfitOrStopLoss = 0
self._HadStopLoss = False
self._TriggeredTakeProfit = False
self._PeakPriceInPosition = 0
self._HadTakeProfit = False
self._PriceCrossEMAStatus = 0
# 统计变量
self._InitAsset = 0
self._ProfitLocal = 0
self._TakeProfitCount = 0
self._TradeCount = 0
self.StrategyRunTimeStampString = "strategy_run_time"
self._StrategyDatas = {"start_run_timestamp": 0, "others": ""}
self._UserDatas = None
# 相对固定参数
self._MaintenanceMarginRate = 0.004
self._TakerFee = 0.0005
self._IsUsdtStandard = False
# 获取合约信息
ticker = _C(exchange.GetTicker, self._UseContract)
marketInfo = exchange.GetMarkets()[self._UseContract]
Log('获取市场信息:', marketInfo)
self._PricePrecision = marketInfo['PricePrecision']
self._AmountPrecision = marketInfo['AmountPrecision']
self._OneSizeInCurrentCoin = marketInfo['CtVal']
self._QuarterOneSizeValue = marketInfo['CtVal']
exchange.SetCurrency(self._Currency)
exchange.SetMarginLevel(self._UseContract, self._MarginLevel)
exchange.SetPrecision(self._PricePrecision, self._AmountPrecision)
# 初始化数据
def initDatas(self):
self.saveStrategyRunTime()
self.readUserDataLocal()
self._InitAsset = self._UserDatas["init_assets"]
self._ProfitLocal = self._UserDatas["profit_local"]
self._TakeProfitCount = self._UserDatas["take_profit_count"]
self._TradeCount = self._UserDatas["trade_count"]
if self._OrderByMargin:
self.getRealOrderSize(-1, self._OrderSize)
Log("已经重新计算下单张数:", self._OrderSize)
if self._UseTakeProfit and self._UseTrackingTakeProfit:
raise Exception("止盈和回调止盈不能同时使用!")
# 设置合约
def setContract(self):
self._IsUsdtStandard = "USDT" in self._Currency
exchange.SetCurrency(self._Currency)
if self._UseQuarter:
exchange.SetContractType("quarter")
else:
exchange.SetContractType("swap")
# 保存程序起始运行时间 秒级时间戳
def saveStrategyRunTime(self):
local_data_strategy_run_time = _G(self.StrategyRunTimeStampString)
if local_data_strategy_run_time is None:
self._StrategyDatas["start_run_timestamp"] = Unix()
_G(self.StrategyRunTimeStampString, self._StrategyDatas["start_run_timestamp"])
else:
self._StrategyDatas["start_run_timestamp"] = local_data_strategy_run_time
# 设置程序起始运行时间 秒级时间戳
def setStrategyRunTime(self, timestamp):
_G(self.StrategyRunTimeStampString, timestamp)
self._StrategyDatas["start_run_timestamp"] = timestamp
# 计算两个时间戳之间的天数,参数是秒级时间戳
def getDaysFromTimeStamp(self, start_time, end_time):
if end_time < start_time:
return 0
return (end_time - start_time) // (60 * 60 * 24)
# 保存数据到本地
def saveUserDatasLocal(self):
self._UserDatas = {
"init_assets": self._InitAsset,
"profit_local": self._ProfitLocal,
"take_profit_count": self._TakeProfitCount,
"trade_count": self._TradeCount
}
# 存储到本地
_G(exchange.GetLabel(), self._UserDatas)
Log("已把所有数据保存到本地.")
# 读取用户本地数据,程序启动时候运行一次
def readUserDataLocal(self):
user_data = _G(exchange.GetLabel())
if user_data is None:
self._InitAsset = self.getAccountAsset(_C(exchange.GetPosition), _C(exchange.GetAccount), _C(exchange.GetTicker))
self._UserDatas = {
"init_assets": self._InitAsset,
"profit_local": 0,
"take_profit_count": 0,
"trade_count": 0
}
else:
self._UserDatas = user_data
# 清除用户本地数据,交互按钮点击运行
def clearUserDataLocal(self):
_G(exchange.GetLabel(), None)
Log(exchange.GetLabel(), ":已清除本地数据.")
# 策略交互
def runCmd(self):
cmd = GetCommand()
if cmd:
# 检测交互命令
Log("接收到的命令:", cmd, "#FF1CAE")
if cmd.startswith("ClearLocalData:"):
# 清除本地数据
self.clearUserDataLocal()
elif cmd.startswith("SaveLocalData:"):
# 保存数据到本地
self.saveUserDatasLocal()
elif cmd.startswith("ClearLog:"):
# 清除日志
log_reserve = cmd.replace("ClearLog:", "")
LogReset(int(log_reserve))
elif cmd.startswith("OrderSize:"):
# 修改下单张数
if self._OrderByMargin:
Log("已经使用保证金数量来下单,无法直接修改下单数量!")
else:
order_size = int(cmd.replace("OrderSize:", ""))
self._OrderSize = order_size
Log("下单张数已经修改为:", self._OrderSize)
elif cmd.startswith("OrderMarginPercent:"):
# 修改下单保证金百分比
if self._OrderByMargin:
order_margin_percent = float(cmd.replace("OrderMarginPercent:", ""))
self._OrderMarginPercent = order_margin_percent
Log("下单保证金百分比:", self._OrderMarginPercent, "%")
else:
Log("没有打开根据保证金数量下单,无法修改下单保证金百分比!")
# 交易函数
def orderDirectly(self, distance, price, amount):
tradeFunc = None
if amount <= 0:
raise Exception("设置的参数有误,下单数量已经小于0!")
if distance == "buy":
tradeFunc = exchange.Buy
elif distance == "sell":
tradeFunc = exchange.Sell
elif distance == "closebuy":
tradeFunc = exchange.Sell
else:
tradeFunc = exchange.Buy
exchange.SetDirection(distance)
return tradeFunc(price, amount)
def openLong(self, price, amount):
real_amount = self.getRealOrderSize(price, amount)
return self.orderDirectly("buy", price, real_amount)
def openShort(self, price, amount):
real_amount = self.getRealOrderSize(price, amount)
return self.orderDirectly("sell", price, real_amount)
def coverLong(self, price, amount):
return self.orderDirectly("closebuy", price, amount)
def coverShort(self, price, amount):
return self.orderDirectly("closesell", price, amount)
# 重新计算下单数量
def getRealOrderSize(self, price, amount):
real_price = price if price != -1 else _C(exchange.GetTicker).Last
if self._OrderByMargin:
if self._IsUsdtStandard:
self._OrderSize = _N(self._InitAsset * (self._OrderMarginPercent / 100) / real_price * self._MarginLevel / self._OneSizeInCurrentCoin, self._AmountPrecision) # u本位数量(杠杆放大数量)
else:
self._OrderSize = _N(self._InitAsset * (self._OrderMarginPercent / 100) * self._MarginLevel * real_price / self._QuarterOneSizeValue, self._AmountPrecision) # 币本位数量(杠杆放大数量)
else:
self._OrderSize = amount
return self._OrderSize
# 获取单个持仓占用保证金
def getSinglePositionMargin(self, position, ticker):
position_margin = 0
if len(position) > 0:
if self._IsUsdtStandard:
position_margin = position[0].Amount * self._OneSizeInCurrentCoin * ticker.Last / self._MarginLevel
else:
position_margin = position[0].Amount * self._QuarterOneSizeValue / ticker.Last / self._MarginLevel
return position_margin
# 获取单向持仓的收益和收益%
def getSinglePositionProfit(self, position, ticker):
if len(position) == 0:
return [0, 0]
price = ticker.Last
position_margin = self.getSinglePositionMargin(position, ticker)
position_profit_percent = (price - position[0].Price) / position[0].Price * self._MarginLevel if position[0].Type == PD_LONG else (position[0].Price - price) / position[0].Price * self._MarginLevel
position_profit = position_margin * position_profit_percent
return [position_profit, position_profit_percent]
# 计算强平价格
def calculateForcedPrice(self, account, position, ticker):
position_profit = 0
total_avail_balance = 0
forced_price = 0
position_margin = self.getSinglePositionMargin(position, ticker)
[position_profit, position_profit_percent] = self.getSinglePositionProfit(position, ticker)
if self._IsUsdtStandard:
total_avail_balance = account.Balance + position_margin + account.FrozenBalance - position_profit if position_profit > 0 else account.Balance + position_margin + account.FrozenBalance
if position[0].Type == PD_LONG:
forced_price = ((self._MaintenanceMarginRate + self._TakerFee) * self._MarginLevel * account.FrozenBalance - total_avail_balance) / self._OneSizeInCurrentCoin + (position[0].Amount * position[0].Price) / (position[0].Amount - (self._MaintenanceMarginRate + self._TakerFee) * position[0].Amount)
else:
forced_price = ((self._MaintenanceMarginRate + self._TakerFee) * self._MarginLevel * account.FrozenBalance - total_avail_balance) / self._OneSizeInCurrentCoin - (position[0].Amount * position[0].Price) / (-1 * position[0].Amount - (self._MaintenanceMarginRate + self._TakerFee) * position[0].Amount)
else:
total_avail_balance = account.Stocks + position_margin + account.FrozenStocks - position_profit if position_profit > 0 else account.Stocks + position_margin + account.FrozenStocks
if position[0].Type == PD_LONG:
forced_price = (self._MaintenanceMarginRate * position[0].Amount + position[0].Amount) / (total_avail_balance / self._QuarterOneSizeValue + position[0].Amount / position[0].Price)
else:
forced_price = (self._MaintenanceMarginRate * position[0].Amount - position[0].Amount) / (total_avail_balance / self._QuarterOneSizeValue - position[0].Amount / position[0].Price)
if forced_price < 0:
forced_price = 0
return forced_price
# 计算最大可下单张数
def getMaxOrderSize(self, margin_level, ticker, account):
max_order_size = 0
if self._IsUsdtStandard:
max_order_size = account.Balance * margin_level / (self._OneSizeInCurrentCoin * ticker.Last)
else:
max_order_size = account.Stocks * ticker.Last / self._QuarterOneSizeValue * margin_level
return _N(max_order_size, self._AmountPrecision)
# 获取账户资产
def getAccountAsset(self, position, account, ticker):
# 计算不同情况下的账户初始资产
account_asset = 0
position_margin = self.getSinglePositionMargin(position, ticker)
if self._IsUsdtStandard:
if len(position) > 0:
account_asset = account.Balance + account.FrozenBalance + position_margin
else:
account_asset = account.Balance + account.FrozenBalance
else:
if len(position) > 0:
account_asset = account.Stocks + account.FrozenStocks + position_margin
else:
account_asset = account.Stocks + account.FrozenStocks
return account_asset
# 收益统计
def calculateProfit(self, ticker):
# 重新获取一下账户持仓与资产
position = _C(exchange.GetPosition)
account = _C(exchange.GetAccount)
# 当前总收益 - 上一次总收益 = 本次的收益
current_profit = (self.getAccountAsset(position, account, ticker) - self._InitAsset) - self._ProfitLocal
self._ProfitLocal += current_profit
if current_profit > 0:
self._TakeProfitCount += 1
self._TradeCount += 1
LogProfit(_N(self._ProfitLocal, 4), " 本次收益:", _N(current_profit, 6))
self.saveUserDatasLocal()
# 是否还够资金下单
def isEnoughAssetToOrder(self, order_size, ticker):
is_enough = True
account = _C(exchange.GetAccount)
if self._IsUsdtStandard:
if account.Balance < order_size * ticker.Last * self._OneSizeInCurrentCoin / self._MarginLevel:
is_enough = False
else:
if account.Stocks < order_size * self._QuarterOneSizeValue / ticker.Last / self._MarginLevel:
is_enough = False
return is_enough
# 按照K线周期运行策略核心
def runInKLinePeriod(self, records):
bar_time = records[-1].Time
if self._RunInKLinePeriod and self._LastBarTime == bar_time:
return False
self._LastBarTime = bar_time
return True
# 趋势判断模块(可编辑具体指标)
def trendJudgment(self, records):
# 检查价格是否穿过均线
def checkPriceCrossEma(price, ema_value):
if self._PriceCrossEMAStatus == 0:
if price <= ema_value:
self._PriceCrossEMAStatus = -1
else:
self._PriceCrossEMAStatus = 1
elif (self._PriceCrossEMAStatus == -1 and price >= ema_value) or (self._PriceCrossEMAStatus == 1 and price <= ema_value):
self._PriceCrossEMAStatus = 2 # 完成穿过
# EMA的多空判断
ema_long = False
ema_short = False
price = records[-2].Close # 已经收盘的K线的收盘价格
ema = TA.EMA(records, self._EmaLength)
ema_value = ema[-2] # 收盘K线对应ema值
ema_upper = ema_value * (1 + self._EmaCoefficient)
ema_lower = ema_value * (1 - self._EmaCoefficient)
checkPriceCrossEma(price, ema_value)
if price > ema_upper:
ema_long = True
elif price < ema_lower:
ema_short = True
# 标准差判断
in_trend = False
if self._UseStddev:
records_data = []
for i in range(len(records)):
records_data.append((records[i].High + records[i].Low) / 2 if self._UseRecordsMiddleValue else records[i].Close)
records_data = np.array(records_data) # 将 list 转换为 np.array
stddev = np.std(records_data, ddof=1) # 使用 numpy 计算标准差
if stddev > self._StddevDeviations:
in_trend = True
else:
in_trend = True
# 趋势判断
long = in_trend and ema_long
short = in_trend and ema_short
if long:
Log("当前趋势为:多", self._EnableMessageSend and "@" or "#00FF7F")
elif short:
Log("当前趋势为:空", self._EnableMessageSend and "@" or "#FF0000")
else:
Log("当前趋势为:震荡", self._EnableMessageSend and "@" or "#007FFF")
return [long, short]
# 止损
def stopLoss(self, position, ticker):
stop_loss_price = 0
price = ticker.Last
if len(position) == 1 and self._UseStopLoss:
if position[0].Type == PD_LONG:
stop_loss_price = position[0].Price * (1 - self._StopLossPercent / 100)
if price < stop_loss_price:
self.coverLong(-1, position[0].Amount)
self.calculateProfit(ticker)
self._TrendWhenTakeProfitOrStopLoss = 1
self._HadStopLoss = True
Log("多单止损。止损价格:", _N(stop_loss_price, 6), ", 持仓价格:", _N(position[0].Price), self._EnableMessageSend and "@" or "#FF1CAE")
elif position[0].Type == PD_SHORT:
stop_loss_price = position[0].Price * (1 + self._StopLossPercent / 100)
if price > stop_loss_price:
self.coverShort(-1, position[0].Amount)
self.calculateProfit(ticker)
self._TrendWhenTakeProfitOrStopLoss = -1
self._HadStopLoss = True
Log("空单止损。止损价格:", _N(stop_loss_price, 6), ", 持仓价格:", _N(position[0].Price), self._EnableMessageSend and "@" or "#FF1CAE")
# 止盈
def takeProfit(self, position, ticker):
take_profit_price = 0
price = ticker.Last
if len(position) == 1 and self._UseTakeProfit:
if position[0].Type == PD_LONG:
take_profit_price = position[0].Price * (1 + self._TakeProfitPercent / 100)
if price > take_profit_price:
self.coverLong(-1, position[0].Amount)
self.calculateProfit(ticker)
self._TrendWhenTakeProfitOrStopLoss = 1
self._HadTakeProfit = True
Log("多单止盈。止盈价格:", _N(take_profit_price, 6), ", 持仓价格:", _N(position[0].Price), self._EnableMessageSend and "@" or "#FF1CAE")
elif position[0].Type == PD_SHORT:
take_profit_price = position[0].Price * (1 - self._TakeProfitPercent / 100)
if price < take_profit_price:
self.coverShort(-1, position[0].Amount)
self.calculateProfit(ticker)
self._TrendWhenTakeProfitOrStopLoss = -1
self._HadTakeProfit = True
Log("空单止盈。止盈价格:", _N(take_profit_price, 6), ", 持仓价格:", _N(position[0].Price), self._EnableMessageSend and "@" or "#FF1CAE")
# 回调止盈
def trackingTakeProfit(self, position, ticker):
take_profit_price = 0
trigger_price = 0
price = ticker.Last
if len(position) > 0 and self._UseTrackingTakeProfit:
if position[0].Type == PD_LONG:
# 多单持仓
if self._TriggeredTakeProfit:
# 已达到触发价格,监控是否止盈
self._PeakPriceInPosition = price if price > self._PeakPriceInPosition else self._PeakPriceInPosition # 更新价格高点
if self._UsePositionRetracement:
take_profit_price = self._PeakPriceInPosition - (self._PeakPriceInPosition - position[0].Price) * (self._CallBakcPercent / 100) # 计算回调的止盈价格
else:
take_profit_price = self._PeakPriceInPosition * (1 - self._CallBakcPercent / 100) # 计算回调的止盈价格
if price < take_profit_price:
self.coverLong(-1, position[0].Amount) # 平多
self.calculateProfit(ticker)
self._TriggeredTakeProfit = False # 复位触发标记
self._TrendWhenTakeProfitOrStopLoss = 1 # 记录止盈时候的趋势
self._HadTakeProfit = True # 记录发生了止盈
Log("多单回调止盈:持仓中价格高点:", _N(self._PeakPriceInPosition, 6), ", 止盈价格:", _N(take_profit_price, 6), ", 当前价格:", _N(price, 6),
", 持仓价格:", _N(position[0].Price, 6), self._EnableMessageSend and "@" or "#FF1CAE")
else:
# 监控是否达到回调止盈的触发价格
trigger_price = position[0].Price * (1 + self._TakeProfitTriggerPercent / 100)
if price > trigger_price:
self._TriggeredTakeProfit = True # 触发回调止盈
self._PeakPriceInPosition = price # 记录价格高点
Log("多单已达到回调止盈的触发价格:", _N(trigger_price, 6), ", 当前价格:", _N(price, 6), ", 持仓价格:", _N(position[0].Price, 6))
elif position[0].Type == PD_SHORT:
# 空单持仓
if self._TriggeredTakeProfit:
# 已达到触发价格,监控是否止盈
self._PeakPriceInPosition = price if price < self._PeakPriceInPosition else self._PeakPriceInPosition # 更新价格低点
if self._UsePositionRetracement:
take_profit_price = self._PeakPriceInPosition + (position[0].Price - self._PeakPriceInPosition) * (self._CallBakcPercent / 100) # 计算回调的止盈价格
else:
take_profit_price = self._PeakPriceInPosition * (1 + self._CallBakcPercent / 100) # 计算回调的止盈价格
if price > take_profit_price:
self.coverShort(-1, position[0].Amount) # 平空
self.calculateProfit(ticker)
self._TriggeredTakeProfit = False # 复位触发标记
self._TrendWhenTakeProfitOrStopLoss = -1 # 记录止盈时候的趋势
self._HadTakeProfit = True # 记录发生了止盈
Log("空单回调止盈:持仓中价格低点:", _N(self._PeakPriceInPosition, 6), ", 止盈价格:", _N(take_profit_price, 6), ", 当前价格:", _N(price, 6),
", 持仓价格:", _N(position[0].Price, 6), self._EnableMessageSend and "@" or "#FF1CAE")
else:
# 监控是否达到回调止盈的触发价格
trigger_price = position[0].Price * (1 - self._TakeProfitTriggerPercent / 100)
if price < trigger_price:
self._TriggeredTakeProfit = True # 触发回调止盈
self._PeakPriceInPosition = price # 记录价格低点
Log("空单已达到回调止盈的触发价格:", _N(trigger_price, 6), ", 当前价格:", _N(price, 6), ", 持仓价格:", _N(position[0].Price, 6))
# 下单
def order(self, long, short, position, ticker):
position_size = position[0].Amount if len(position) > 0 else 0
position_type = position[0].Type if len(position) > 0 else None
if long:
# 趋势多
if (self._HadStopLoss or self._HadTakeProfit) and self._TrendWhenTakeProfitOrStopLoss == 1:
# 发生了止盈止损,并且止盈止损时候趋势为多,不再做多
return
if position_size > 0 and position_type == PD_SHORT:
self.coverShort(-1, position_size)
self.calculateProfit(ticker)
elif position_size > 0 and position_type == PD_LONG:
# 多单持仓,不重复下单
return
else:
# 没有持仓,如果是首次运行或者策略重启,需要等待价格穿过一次EMA均线才下单
if self._PriceCrossEMAStatus != 2:
return
if self.isEnoughAssetToOrder(self._OrderSize, ticker):
self.openLong(-1, self._OrderSize)
self._HadStopLoss = False
self._HadTakeProfit = False
else:
raise Exception("下单金额数量不足!")
elif short:
# 趋势空
if (self._HadStopLoss or self._HadTakeProfit) and self._TrendWhenTakeProfitOrStopLoss == -1:
# 发生了止盈止损,并且止盈止损时候趋势为空,不再做空
return
if position_size > 0 and position_type == PD_LONG:
self.coverLong(-1, position_size)
self.calculateProfit(ticker)
elif position_size > 0 and position_type == PD_SHORT:
# 空单持仓,不重复下单
return
else:
# 没有持仓,如果是首次运行或者策略重启,需要等待价格穿过一次EMA均线才下单
if self._PriceCrossEMAStatus != 2:
return
if self.isEnoughAssetToOrder(self._OrderSize, ticker):
self.openShort(-1, self._OrderSize)
self._HadStopLoss = False
self._HadTakeProfit = False
else:
raise Exception("下单金额数量不足!")
# 趋势策略
def trendStrategy(self):
ticker = _C(exchange.GetTicker)
position = _C(exchange.GetPosition)
account = _C(exchange.GetAccount)
records = _C(exchange.GetRecords, self._KLinePeriod * 60)
if len(position) > 1:
Log(position)
raise Exception("同时有多空持仓!")
# 策略交互
self.runCmd()
# 状态栏信息打印
self.printLogStatus(ticker, account, position)
# 止损
self.stopLoss(position, ticker)
# 止盈
self.takeProfit(position, ticker)
# 回调止盈
self.trackingTakeProfit(position, ticker)
# 按照K线周期运行策略
if not self.runInKLinePeriod(records):
return
# 趋势判断和下单
long = False
short = False
[long, short] = self.trendJudgment(records)
if not self._OnlyTrendJudgment:
self.order(long, short, position, ticker)
# 状态栏信息打印
def printLogStatus(self, ticker, account, position):
table_overview = {
"type": "table",
"title": "策略总览",
"cols": ["开始时间", "已运行天数", "交易次数", "胜率", "预估月化%", "预估年化%"],
"rows": []
}
table_account = {
"type": "table",
"title": "账户资金",
"cols": ["当前资产", "初始资产", "可用余额", "冻结余额", "可下单张数", "收益", "收益%"],
"rows": []
}
table_position = {
"type": "table",
"title": "持仓情况",
"cols": ["交易币种", "杠杆倍数", "持仓均价", "方向", "数量", "保证金", "预估强平价格", "浮动盈亏", "浮动盈亏%"],
"rows": []
}
i = 0
# 策略总览
the_running_days = self.getDaysFromTimeStamp(self._StrategyDatas["start_run_timestamp"], Unix())
monthly_rate_of_profit = 0
if the_running_days > 1:
monthly_rate_of_profit = self._ProfitLocal / self._InitAsset / the_running_days * 30
table_overview["rows"].append([_D(self._StrategyDatas["start_run_timestamp"]), the_running_days, self._TradeCount,
0 if self._TradeCount == 0 else (str(_N(self._TakeProfitCount / self._TradeCount * 100, 2)) + "%"),
str(_N(monthly_rate_of_profit * 100, 2)) + "%", str(_N(monthly_rate_of_profit * 12 * 100, 2)) + "%"])
# 账户资金
current_asset = self.getAccountAsset(position, account, ticker)
max_order_size = self.getMaxOrderSize(self._MarginLevel, ticker, account)
asset_profit = current_asset - self._InitAsset
asset_profit_percent = asset_profit / self._InitAsset
table_account["rows"].append([_N(current_asset, 4), _N(self._InitAsset, 4), _N(account.Balance if self._IsUsdtStandard else account.Stocks, 4),
_N(account.FrozenBalance if self._IsUsdtStandard else account.FrozenStocks, 4), max_order_size, _N(asset_profit, 4),
str(_N(asset_profit_percent * 100, 2)) + "%"])
# 持仓情况
position_direction = ""
forced_cover_up_price = 0
position_profit_percent = 0
position_profit = 0
position_margin = 0
if len(position) == 0:
table_position["rows"].append(["无持仓", "-", "-", "-", "-", "-", "-", "-", "-"])
else:
position_direction = "多单" if position[0].Type == PD_LONG else "空单"
[position_profit, position_profit_percent] = self.getSinglePositionProfit(position, ticker)
position_margin = self.getSinglePositionMargin(position, ticker)
forced_cover_up_price = self.calculateForcedPrice(account, position, ticker)
table_position["rows"].append([exchange.GetCurrency(), self._MarginLevel, _N(position[0].Price, 4), position_direction, position[0].Amount,
_N(position_margin, 4), _N(forced_cover_up_price, 4), _N(position_profit, 4), str(_N((position_profit_percent * 100), 2)) + "%"])
# 打印表格
LogStatus('`' + json.dumps(table_overview) + '`\n'
+ '`' + json.dumps(table_account) + '`\n'
+ '`' + json.dumps(table_position) + '`\n')
def main(): exchange.IO(‘simulate’, True)
strategy = TrendStrategy()
strategy.setContract()
strategy.initDatas()
while True:
strategy.trendStrategy()
Sl