Tôi thường xuyên lên diễn đàn phát minh và luôn tìm thấy kho báu. Hôm nay tôi tìm thấy một người 21 tuổi.Chiến lược xu hướngTôi rất ngưỡng mộ cấu trúc mã nguồn tinh tế và hoàn hảo của tác giả gốc, và tính linh hoạt cao. Chiến lược ban đầu là phiên bản JS, đã được viết lại để thuận tiện cho những người dùng Python.
Thành thật mà nói, nhiều người mới bắt đầu thường đi chệch hướng khi mới bước vào giao dịch định lượng. Họ thường gặp phải các vấn đề như lệnh thất bại, thua lỗ do quản lý rủi ro kém và mất dữ liệu sau khi khởi động lại chiến lược. Sau này, tôi dần nhận ra tầm quan trọng của một khuôn khổ tốt, điều này có thể giúp chúng ta tránh được nhiều cạm bẫy. Khuôn khổ chiến lược xu hướng này là một công cụ vô cùng hữu ích. Nó không chỉ là một chiến lược giao dịch đơn giản; nó giống như một hộp công cụ, cung cấp cho bạn các chức năng cơ bản nhưng thiết yếu như đặt lệnh, lệnh cắt lỗ và quản lý dữ liệu. Bạn chỉ cần tập trung vào các câu hỏi cốt lõi là “khi nào nên mua” và “khi nào nên bán”. Hơn nữa, khuôn khổ này rất mở, cho phép bạn dễ dàng hoán đổi đường EMA với MACD, RSI hoặc bất kỳ chỉ báo nào khác mà bạn thích. Muốn theo dõi xu hướng? Không vấn đề gì. Muốn thử nghiệm hồi quy trung bình? Thậm chí muốn kết hợp nhiều chỉ báo? Hoàn toàn có thể. Tính linh hoạt này cực kỳ hữu ích; bạn có thể sửa đổi cùng một mã để thử nghiệm các ý tưởng khác nhau.
Hôm nay tôi chia sẻ khuôn khổ này, hy vọng nó sẽ hữu ích cho những ai đang tìm hiểu về đầu tư định lượng. Dưới đây là phần giới thiệu chi tiết về từng thành phần của khuôn khổ này, mà tôi tin rằng bạn sẽ thấy hữu ích.
So với nhiều chức năng độc lập được sử dụng trong các khuôn khổ giao dịch đa hàng hóa, khuôn khổ này cố gắng tổ chức và quản lý các phần khác nhau của chiến lược bằng định dạng lớp. Thiết kế hướng đối tượng này không chỉ cải thiện khả năng bảo trì và khả năng mở rộng mã, mà còn làm cho các thành phần chiến lược trở nên mô-đun hơn, tạo điều kiện thuận lợi cho việc điều chỉnh và tối ưu hóa sau này. Khuôn khổ chủ yếu bao gồm các phần sau, mỗi phần có chức năng riêng, đảm bảo tính linh hoạt và tính thực tiễn của chiến lược.
hàm init
__init__Hàm này là phương thức khởi tạo của lớp chiến lược, chịu trách nhiệm thiết lập cấu hình cơ bản của chiến lược, khởi tạo các biến và thu thập thông tin thị trường. Hàm này đảm bảo các tham số cần thiết được cấu hình trước khi chiến lược được chạy, đảm bảo các hoạt động giao dịch tiếp theo được thực hiện suôn sẻ.hàm initDatas
hàm saveStrategyRunTime
hàm setStrategyRunTime
_GHàm này lưu dấu thời gian đã trôi qua cục bộ.hàm getDaysFromTimeStamp
hàm saveUserDatasLocal
_GChức năng này lưu dữ liệu cục bộ.hàm readUserDataLocal
hàm clearUserDataLocal
_GChức năng xóa dữ liệu cục bộ.Hàm runCmd
hàm orderDirectly
hàm openLong
orderDirectlyHàm này thực hiện thao tác mua.hàm openShort
orderDirectlyHàm này thực hiện thao tác bán.hàm coverLong
orderDirectlyHàm này thực hiện thao tác bán.hàm coverShort
orderDirectlyHàm này thực hiện thao tác mua.hàm getRealOrderSize
hàm getSinglePositionMargin
hàm getSinglePositionProfit
hàm calculateForcedPrice
hàm getMaxOrderSize
hàm getAccountAsset
hàm calculateProfit
hàm isEnoughAssetToOrder
hàm runInKLinePeriod
TrueNếu không, hãy quay lạiFalse。Chức năng phán đoán xu hướng (mô-đun phán đoán xu hướng cốt lõi)
hàm stopLoss
hàm takeProfit
chức năng theo dõi TakeProfit
chức năng đặt hàng
Hàm trendStrategy
hàm printLogStatus
LogStatusHàm này xuất dữ liệu bảng ra thanh trạng thái.chức năng chính
Khung này không chỉ áp dụng cho thị trường tiền kỹ thuật số mà còn có thể được sử dụng trongtrendJudgmentKhung này có thể được mở rộng chức năng để thích ứng với các yêu cầu chiến lược giao dịch khác nhau. Ngoài ra, khung này cũng có thể được điều chỉnh cụ thể cho thị trường giao ngay hoặc hợp đồng đa dạng, với tính linh hoạt và khả năng mở rộng cao.
Là một hệ thống giao dịch tự động toàn diện và linh hoạt cao, khuôn khổ này phù hợp cho giao dịch theo xu hướng trên thị trường tiền điện tử. Thông qua việc liên tục tối ưu hóa và mở rộng, nó được kỳ vọng sẽ trở thành một công cụ hữu ích cho các nhà giao dịch tiền điện tử trong tương lai, giúp họ phát triển tốt hơn các chiến lược định lượng của riêng mình. “Khung Giao dịch Chiến lược Xu hướng Tiền điện tử” tự hào có cấu trúc toàn diện. Mặc dù mã nguồn khá lớn, nhưng về cơ bản nó bao gồm các mô-đun chức năng cốt lõi cần thiết cho giao dịch theo xu hướng từ góc độ giao dịch thực tế. Do đó, khuôn khổ này có giá trị tham khảo và ý nghĩa thực tiễn đáng kể, cả trong việc học các chiến lược giao dịch lẫn ứng dụng thực tế. Chức năng toàn diện và tính linh hoạt của nó cho phép nó thích ứng với các môi trường thị trường đa dạng, mang lại sự hỗ trợ mạnh mẽ.
Nền tảng Inventor là kho tàng kiến thức và chiến lược giao dịch định lượng, mỗi nền tảng đều thể hiện trí tuệ và kinh nghiệm của các nhà phát triển. Chúng tôi hoan nghênh mọi người khám phá các chiến lược và kỹ thuật giao dịch giá trị tại đây. Xin chân thành cảm ơn tất cả những người dùng sáng tạo và chia sẻ của chúng tôi. Nhờ những đóng góp của bạn, nền tảng này đã trở thành một địa điểm quan trọng để học hỏi và trao đổi về giao dịch định lượng, giúp mọi người nâng cao kỹ năng và chuyên môn.
”`python “‘backtest start: 2024-11-26 00:00:00 end: 2024-12-03 00:00:00 period: 1d basePeriod: 1d exchanges: [{“eid”:“Futures_Binance”,“currency”:“BTC_USDT”}] “’
import json, talib import numpy as np
class TrendStrategy: def init(self): # 基本设置 self._Currency = TradeCurrency self._Interval = Interval self._UseQuarter = UseQuarter self._UseContract = TradeCurrency + (‘.swap’ if self._UseQuarter else ‘.quarter’) self._OnlyTrendJudgment = OnlyTrendJudgment self._EnableMessageSend = EnableMessageSend # 趋势判断 self._RunInKLinePeriod = RunInKLinePeriod self._KLinePeriod = KLinePeriod self._EmaLength = EmaLength self._EmaCoefficient = EmaCoefficient self._UseStddev = UseStddev self._UseRecordsMiddleValue = UseRecordsMiddleValue self._StddevLength = StddevLength self._StddevDeviations = StddevDeviations # 下单设置 self._MarginLevel = MarginLevel self._OrderSize = OrderSize self._OrderByMargin = OrderByMargin self._OrderMarginPercent = OrderMarginPercent self._PricePrecision = None self._AmountPrecision = None self._OneSizeInCurrentCoin = None self._QuarterOneSizeValue = None # 止盈止损 self._UseStopLoss = UseStopLoss self._StopLossPercent = StopLossPercent self._UseTakeProfit = UseTakeProfit self._TakeProfitPercent = TakeProfitPercent self._UseTrackingTakeProfit = UseTrackingTakeProfit self._UsePositionRetracement = UsePositionRetracement self._TakeProfitTriggerPercent = TakeProfitTriggerPercent self._CallBakcPercent = CallBakcPercent
# 策略变量
self._LastBarTime = 0
self._TrendWhenTakeProfitOrStopLoss = 0
self._HadStopLoss = False
self._TriggeredTakeProfit = False
self._PeakPriceInPosition = 0
self._HadTakeProfit = False
self._PriceCrossEMAStatus = 0
# 统计变量
self._InitAsset = 0
self._ProfitLocal = 0
self._TakeProfitCount = 0
self._TradeCount = 0
self.StrategyRunTimeStampString = "strategy_run_time"
self._StrategyDatas = {"start_run_timestamp": 0, "others": ""}
self._UserDatas = None
# 相对固定参数
self._MaintenanceMarginRate = 0.004
self._TakerFee = 0.0005
self._IsUsdtStandard = False
# 获取合约信息
ticker = _C(exchange.GetTicker, self._UseContract)
marketInfo = exchange.GetMarkets()[self._UseContract]
Log('获取市场信息:', marketInfo)
self._PricePrecision = marketInfo['PricePrecision']
self._AmountPrecision = marketInfo['AmountPrecision']
self._OneSizeInCurrentCoin = marketInfo['CtVal']
self._QuarterOneSizeValue = marketInfo['CtVal']
exchange.SetCurrency(self._Currency)
exchange.SetMarginLevel(self._UseContract, self._MarginLevel)
exchange.SetPrecision(self._PricePrecision, self._AmountPrecision)
# 初始化数据
def initDatas(self):
self.saveStrategyRunTime()
self.readUserDataLocal()
self._InitAsset = self._UserDatas["init_assets"]
self._ProfitLocal = self._UserDatas["profit_local"]
self._TakeProfitCount = self._UserDatas["take_profit_count"]
self._TradeCount = self._UserDatas["trade_count"]
if self._OrderByMargin:
self.getRealOrderSize(-1, self._OrderSize)
Log("已经重新计算下单张数:", self._OrderSize)
if self._UseTakeProfit and self._UseTrackingTakeProfit:
raise Exception("止盈和回调止盈不能同时使用!")
# 设置合约
def setContract(self):
self._IsUsdtStandard = "USDT" in self._Currency
exchange.SetCurrency(self._Currency)
if self._UseQuarter:
exchange.SetContractType("quarter")
else:
exchange.SetContractType("swap")
# 保存程序起始运行时间 秒级时间戳
def saveStrategyRunTime(self):
local_data_strategy_run_time = _G(self.StrategyRunTimeStampString)
if local_data_strategy_run_time is None:
self._StrategyDatas["start_run_timestamp"] = Unix()
_G(self.StrategyRunTimeStampString, self._StrategyDatas["start_run_timestamp"])
else:
self._StrategyDatas["start_run_timestamp"] = local_data_strategy_run_time
# 设置程序起始运行时间 秒级时间戳
def setStrategyRunTime(self, timestamp):
_G(self.StrategyRunTimeStampString, timestamp)
self._StrategyDatas["start_run_timestamp"] = timestamp
# 计算两个时间戳之间的天数,参数是秒级时间戳
def getDaysFromTimeStamp(self, start_time, end_time):
if end_time < start_time:
return 0
return (end_time - start_time) // (60 * 60 * 24)
# 保存数据到本地
def saveUserDatasLocal(self):
self._UserDatas = {
"init_assets": self._InitAsset,
"profit_local": self._ProfitLocal,
"take_profit_count": self._TakeProfitCount,
"trade_count": self._TradeCount
}
# 存储到本地
_G(exchange.GetLabel(), self._UserDatas)
Log("已把所有数据保存到本地.")
# 读取用户本地数据,程序启动时候运行一次
def readUserDataLocal(self):
user_data = _G(exchange.GetLabel())
if user_data is None:
self._InitAsset = self.getAccountAsset(_C(exchange.GetPosition), _C(exchange.GetAccount), _C(exchange.GetTicker))
self._UserDatas = {
"init_assets": self._InitAsset,
"profit_local": 0,
"take_profit_count": 0,
"trade_count": 0
}
else:
self._UserDatas = user_data
# 清除用户本地数据,交互按钮点击运行
def clearUserDataLocal(self):
_G(exchange.GetLabel(), None)
Log(exchange.GetLabel(), ":已清除本地数据.")
# 策略交互
def runCmd(self):
cmd = GetCommand()
if cmd:
# 检测交互命令
Log("接收到的命令:", cmd, "#FF1CAE")
if cmd.startswith("ClearLocalData:"):
# 清除本地数据
self.clearUserDataLocal()
elif cmd.startswith("SaveLocalData:"):
# 保存数据到本地
self.saveUserDatasLocal()
elif cmd.startswith("ClearLog:"):
# 清除日志
log_reserve = cmd.replace("ClearLog:", "")
LogReset(int(log_reserve))
elif cmd.startswith("OrderSize:"):
# 修改下单张数
if self._OrderByMargin:
Log("已经使用保证金数量来下单,无法直接修改下单数量!")
else:
order_size = int(cmd.replace("OrderSize:", ""))
self._OrderSize = order_size
Log("下单张数已经修改为:", self._OrderSize)
elif cmd.startswith("OrderMarginPercent:"):
# 修改下单保证金百分比
if self._OrderByMargin:
order_margin_percent = float(cmd.replace("OrderMarginPercent:", ""))
self._OrderMarginPercent = order_margin_percent
Log("下单保证金百分比:", self._OrderMarginPercent, "%")
else:
Log("没有打开根据保证金数量下单,无法修改下单保证金百分比!")
# 交易函数
def orderDirectly(self, distance, price, amount):
tradeFunc = None
if amount <= 0:
raise Exception("设置的参数有误,下单数量已经小于0!")
if distance == "buy":
tradeFunc = exchange.Buy
elif distance == "sell":
tradeFunc = exchange.Sell
elif distance == "closebuy":
tradeFunc = exchange.Sell
else:
tradeFunc = exchange.Buy
exchange.SetDirection(distance)
return tradeFunc(price, amount)
def openLong(self, price, amount):
real_amount = self.getRealOrderSize(price, amount)
return self.orderDirectly("buy", price, real_amount)
def openShort(self, price, amount):
real_amount = self.getRealOrderSize(price, amount)
return self.orderDirectly("sell", price, real_amount)
def coverLong(self, price, amount):
return self.orderDirectly("closebuy", price, amount)
def coverShort(self, price, amount):
return self.orderDirectly("closesell", price, amount)
# 重新计算下单数量
def getRealOrderSize(self, price, amount):
real_price = price if price != -1 else _C(exchange.GetTicker).Last
if self._OrderByMargin:
if self._IsUsdtStandard:
self._OrderSize = _N(self._InitAsset * (self._OrderMarginPercent / 100) / real_price * self._MarginLevel / self._OneSizeInCurrentCoin, self._AmountPrecision) # u本位数量(杠杆放大数量)
else:
self._OrderSize = _N(self._InitAsset * (self._OrderMarginPercent / 100) * self._MarginLevel * real_price / self._QuarterOneSizeValue, self._AmountPrecision) # 币本位数量(杠杆放大数量)
else:
self._OrderSize = amount
return self._OrderSize
# 获取单个持仓占用保证金
def getSinglePositionMargin(self, position, ticker):
position_margin = 0
if len(position) > 0:
if self._IsUsdtStandard:
position_margin = position[0].Amount * self._OneSizeInCurrentCoin * ticker.Last / self._MarginLevel
else:
position_margin = position[0].Amount * self._QuarterOneSizeValue / ticker.Last / self._MarginLevel
return position_margin
# 获取单向持仓的收益和收益%
def getSinglePositionProfit(self, position, ticker):
if len(position) == 0:
return [0, 0]
price = ticker.Last
position_margin = self.getSinglePositionMargin(position, ticker)
position_profit_percent = (price - position[0].Price) / position[0].Price * self._MarginLevel if position[0].Type == PD_LONG else (position[0].Price - price) / position[0].Price * self._MarginLevel
position_profit = position_margin * position_profit_percent
return [position_profit, position_profit_percent]
# 计算强平价格
def calculateForcedPrice(self, account, position, ticker):
position_profit = 0
total_avail_balance = 0
forced_price = 0
position_margin = self.getSinglePositionMargin(position, ticker)
[position_profit, position_profit_percent] = self.getSinglePositionProfit(position, ticker)
if self._IsUsdtStandard:
total_avail_balance = account.Balance + position_margin + account.FrozenBalance - position_profit if position_profit > 0 else account.Balance + position_margin + account.FrozenBalance
if position[0].Type == PD_LONG:
forced_price = ((self._MaintenanceMarginRate + self._TakerFee) * self._MarginLevel * account.FrozenBalance - total_avail_balance) / self._OneSizeInCurrentCoin + (position[0].Amount * position[0].Price) / (position[0].Amount - (self._MaintenanceMarginRate + self._TakerFee) * position[0].Amount)
else:
forced_price = ((self._MaintenanceMarginRate + self._TakerFee) * self._MarginLevel * account.FrozenBalance - total_avail_balance) / self._OneSizeInCurrentCoin - (position[0].Amount * position[0].Price) / (-1 * position[0].Amount - (self._MaintenanceMarginRate + self._TakerFee) * position[0].Amount)
else:
total_avail_balance = account.Stocks + position_margin + account.FrozenStocks - position_profit if position_profit > 0 else account.Stocks + position_margin + account.FrozenStocks
if position[0].Type == PD_LONG:
forced_price = (self._MaintenanceMarginRate * position[0].Amount + position[0].Amount) / (total_avail_balance / self._QuarterOneSizeValue + position[0].Amount / position[0].Price)
else:
forced_price = (self._MaintenanceMarginRate * position[0].Amount - position[0].Amount) / (total_avail_balance / self._QuarterOneSizeValue - position[0].Amount / position[0].Price)
if forced_price < 0:
forced_price = 0
return forced_price
# 计算最大可下单张数
def getMaxOrderSize(self, margin_level, ticker, account):
max_order_size = 0
if self._IsUsdtStandard:
max_order_size = account.Balance * margin_level / (self._OneSizeInCurrentCoin * ticker.Last)
else:
max_order_size = account.Stocks * ticker.Last / self._QuarterOneSizeValue * margin_level
return _N(max_order_size, self._AmountPrecision)
# 获取账户资产
def getAccountAsset(self, position, account, ticker):
# 计算不同情况下的账户初始资产
account_asset = 0
position_margin = self.getSinglePositionMargin(position, ticker)
if self._IsUsdtStandard:
if len(position) > 0:
account_asset = account.Balance + account.FrozenBalance + position_margin
else:
account_asset = account.Balance + account.FrozenBalance
else:
if len(position) > 0:
account_asset = account.Stocks + account.FrozenStocks + position_margin
else:
account_asset = account.Stocks + account.FrozenStocks
return account_asset
# 收益统计
def calculateProfit(self, ticker):
# 重新获取一下账户持仓与资产
position = _C(exchange.GetPosition)
account = _C(exchange.GetAccount)
# 当前总收益 - 上一次总收益 = 本次的收益
current_profit = (self.getAccountAsset(position, account, ticker) - self._InitAsset) - self._ProfitLocal
self._ProfitLocal += current_profit
if current_profit > 0:
self._TakeProfitCount += 1
self._TradeCount += 1
LogProfit(_N(self._ProfitLocal, 4), " 本次收益:", _N(current_profit, 6))
self.saveUserDatasLocal()
# 是否还够资金下单
def isEnoughAssetToOrder(self, order_size, ticker):
is_enough = True
account = _C(exchange.GetAccount)
if self._IsUsdtStandard:
if account.Balance < order_size * ticker.Last * self._OneSizeInCurrentCoin / self._MarginLevel:
is_enough = False
else:
if account.Stocks < order_size * self._QuarterOneSizeValue / ticker.Last / self._MarginLevel:
is_enough = False
return is_enough
# 按照K线周期运行策略核心
def runInKLinePeriod(self, records):
bar_time = records[-1].Time
if self._RunInKLinePeriod and self._LastBarTime == bar_time:
return False
self._LastBarTime = bar_time
return True
# 趋势判断模块(可编辑具体指标)
def trendJudgment(self, records):
# 检查价格是否穿过均线
def checkPriceCrossEma(price, ema_value):
if self._PriceCrossEMAStatus == 0:
if price <= ema_value:
self._PriceCrossEMAStatus = -1
else:
self._PriceCrossEMAStatus = 1
elif (self._PriceCrossEMAStatus == -1 and price >= ema_value) or (self._PriceCrossEMAStatus == 1 and price <= ema_value):
self._PriceCrossEMAStatus = 2 # 完成穿过
# EMA的多空判断
ema_long = False
ema_short = False
price = records[-2].Close # 已经收盘的K线的收盘价格
ema = TA.EMA(records, self._EmaLength)
ema_value = ema[-2] # 收盘K线对应ema值
ema_upper = ema_value * (1 + self._EmaCoefficient)
ema_lower = ema_value * (1 - self._EmaCoefficient)
checkPriceCrossEma(price, ema_value)
if price > ema_upper:
ema_long = True
elif price < ema_lower:
ema_short = True
# 标准差判断
in_trend = False
if self._UseStddev:
records_data = []
for i in range(len(records)):
records_data.append((records[i].High + records[i].Low) / 2 if self._UseRecordsMiddleValue else records[i].Close)
records_data = np.array(records_data) # 将 list 转换为 np.array
stddev = np.std(records_data, ddof=1) # 使用 numpy 计算标准差
if stddev > self._StddevDeviations:
in_trend = True
else:
in_trend = True
# 趋势判断
long = in_trend and ema_long
short = in_trend and ema_short
if long:
Log("当前趋势为:多", self._EnableMessageSend and "@" or "#00FF7F")
elif short:
Log("当前趋势为:空", self._EnableMessageSend and "@" or "#FF0000")
else:
Log("当前趋势为:震荡", self._EnableMessageSend and "@" or "#007FFF")
return [long, short]
# 止损
def stopLoss(self, position, ticker):
stop_loss_price = 0
price = ticker.Last
if len(position) == 1 and self._UseStopLoss:
if position[0].Type == PD_LONG:
stop_loss_price = position[0].Price * (1 - self._StopLossPercent / 100)
if price < stop_loss_price:
self.coverLong(-1, position[0].Amount)
self.calculateProfit(ticker)
self._TrendWhenTakeProfitOrStopLoss = 1
self._HadStopLoss = True
Log("多单止损。止损价格:", _N(stop_loss_price, 6), ", 持仓价格:", _N(position[0].Price), self._EnableMessageSend and "@" or "#FF1CAE")
elif position[0].Type == PD_SHORT:
stop_loss_price = position[0].Price * (1 + self._StopLossPercent / 100)
if price > stop_loss_price:
self.coverShort(-1, position[0].Amount)
self.calculateProfit(ticker)
self._TrendWhenTakeProfitOrStopLoss = -1
self._HadStopLoss = True
Log("空单止损。止损价格:", _N(stop_loss_price, 6), ", 持仓价格:", _N(position[0].Price), self._EnableMessageSend and "@" or "#FF1CAE")
# 止盈
def takeProfit(self, position, ticker):
take_profit_price = 0
price = ticker.Last
if len(position) == 1 and self._UseTakeProfit:
if position[0].Type == PD_LONG:
take_profit_price = position[0].Price * (1 + self._TakeProfitPercent / 100)
if price > take_profit_price:
self.coverLong(-1, position[0].Amount)
self.calculateProfit(ticker)
self._TrendWhenTakeProfitOrStopLoss = 1
self._HadTakeProfit = True
Log("多单止盈。止盈价格:", _N(take_profit_price, 6), ", 持仓价格:", _N(position[0].Price), self._EnableMessageSend and "@" or "#FF1CAE")
elif position[0].Type == PD_SHORT:
take_profit_price = position[0].Price * (1 - self._TakeProfitPercent / 100)
if price < take_profit_price:
self.coverShort(-1, position[0].Amount)
self.calculateProfit(ticker)
self._TrendWhenTakeProfitOrStopLoss = -1
self._HadTakeProfit = True
Log("空单止盈。止盈价格:", _N(take_profit_price, 6), ", 持仓价格:", _N(position[0].Price), self._EnableMessageSend and "@" or "#FF1CAE")
# 回调止盈
def trackingTakeProfit(self, position, ticker):
take_profit_price = 0
trigger_price = 0
price = ticker.Last
if len(position) > 0 and self._UseTrackingTakeProfit:
if position[0].Type == PD_LONG:
# 多单持仓
if self._TriggeredTakeProfit:
# 已达到触发价格,监控是否止盈
self._PeakPriceInPosition = price if price > self._PeakPriceInPosition else self._PeakPriceInPosition # 更新价格高点
if self._UsePositionRetracement:
take_profit_price = self._PeakPriceInPosition - (self._PeakPriceInPosition - position[0].Price) * (self._CallBakcPercent / 100) # 计算回调的止盈价格
else:
take_profit_price = self._PeakPriceInPosition * (1 - self._CallBakcPercent / 100) # 计算回调的止盈价格
if price < take_profit_price:
self.coverLong(-1, position[0].Amount) # 平多
self.calculateProfit(ticker)
self._TriggeredTakeProfit = False # 复位触发标记
self._TrendWhenTakeProfitOrStopLoss = 1 # 记录止盈时候的趋势
self._HadTakeProfit = True # 记录发生了止盈
Log("多单回调止盈:持仓中价格高点:", _N(self._PeakPriceInPosition, 6), ", 止盈价格:", _N(take_profit_price, 6), ", 当前价格:", _N(price, 6),
", 持仓价格:", _N(position[0].Price, 6), self._EnableMessageSend and "@" or "#FF1CAE")
else:
# 监控是否达到回调止盈的触发价格
trigger_price = position[0].Price * (1 + self._TakeProfitTriggerPercent / 100)
if price > trigger_price:
self._TriggeredTakeProfit = True # 触发回调止盈
self._PeakPriceInPosition = price # 记录价格高点
Log("多单已达到回调止盈的触发价格:", _N(trigger_price, 6), ", 当前价格:", _N(price, 6), ", 持仓价格:", _N(position[0].Price, 6))
elif position[0].Type == PD_SHORT:
# 空单持仓
if self._TriggeredTakeProfit:
# 已达到触发价格,监控是否止盈
self._PeakPriceInPosition = price if price < self._PeakPriceInPosition else self._PeakPriceInPosition # 更新价格低点
if self._UsePositionRetracement:
take_profit_price = self._PeakPriceInPosition + (position[0].Price - self._PeakPriceInPosition) * (self._CallBakcPercent / 100) # 计算回调的止盈价格
else:
take_profit_price = self._PeakPriceInPosition * (1 + self._CallBakcPercent / 100) # 计算回调的止盈价格
if price > take_profit_price:
self.coverShort(-1, position[0].Amount) # 平空
self.calculateProfit(ticker)
self._TriggeredTakeProfit = False # 复位触发标记
self._TrendWhenTakeProfitOrStopLoss = -1 # 记录止盈时候的趋势
self._HadTakeProfit = True # 记录发生了止盈
Log("空单回调止盈:持仓中价格低点:", _N(self._PeakPriceInPosition, 6), ", 止盈价格:", _N(take_profit_price, 6), ", 当前价格:", _N(price, 6),
", 持仓价格:", _N(position[0].Price, 6), self._EnableMessageSend and "@" or "#FF1CAE")
else:
# 监控是否达到回调止盈的触发价格
trigger_price = position[0].Price * (1 - self._TakeProfitTriggerPercent / 100)
if price < trigger_price:
self._TriggeredTakeProfit = True # 触发回调止盈
self._PeakPriceInPosition = price # 记录价格低点
Log("空单已达到回调止盈的触发价格:", _N(trigger_price, 6), ", 当前价格:", _N(price, 6), ", 持仓价格:", _N(position[0].Price, 6))
# 下单
def order(self, long, short, position, ticker):
position_size = position[0].Amount if len(position) > 0 else 0
position_type = position[0].Type if len(position) > 0 else None
if long:
# 趋势多
if (self._HadStopLoss or self._HadTakeProfit) and self._TrendWhenTakeProfitOrStopLoss == 1:
# 发生了止盈止损,并且止盈止损时候趋势为多,不再做多
return
if position_size > 0 and position_type == PD_SHORT:
self.coverShort(-1, position_size)
self.calculateProfit(ticker)
elif position_size > 0 and position_type == PD_LONG:
# 多单持仓,不重复下单
return
else:
# 没有持仓,如果是首次运行或者策略重启,需要等待价格穿过一次EMA均线才下单
if self._PriceCrossEMAStatus != 2:
return
if self.isEnoughAssetToOrder(self._OrderSize, ticker):
self.openLong(-1, self._OrderSize)
self._HadStopLoss = False
self._HadTakeProfit = False
else:
raise Exception("下单金额数量不足!")
elif short:
# 趋势空
if (self._HadStopLoss or self._HadTakeProfit) and self._TrendWhenTakeProfitOrStopLoss == -1:
# 发生了止盈止损,并且止盈止损时候趋势为空,不再做空
return
if position_size > 0 and position_type == PD_LONG:
self.coverLong(-1, position_size)
self.calculateProfit(ticker)
elif position_size > 0 and position_type == PD_SHORT:
# 空单持仓,不重复下单
return
else:
# 没有持仓,如果是首次运行或者策略重启,需要等待价格穿过一次EMA均线才下单
if self._PriceCrossEMAStatus != 2:
return
if self.isEnoughAssetToOrder(self._OrderSize, ticker):
self.openShort(-1, self._OrderSize)
self._HadStopLoss = False
self._HadTakeProfit = False
else:
raise Exception("下单金额数量不足!")
# 趋势策略
def trendStrategy(self):
ticker = _C(exchange.GetTicker)
position = _C(exchange.GetPosition)
account = _C(exchange.GetAccount)
records = _C(exchange.GetRecords, self._KLinePeriod * 60)
if len(position) > 1:
Log(position)
raise Exception("同时有多空持仓!")
# 策略交互
self.runCmd()
# 状态栏信息打印
self.printLogStatus(ticker, account, position)
# 止损
self.stopLoss(position, ticker)
# 止盈
self.takeProfit(position, ticker)
# 回调止盈
self.trackingTakeProfit(position, ticker)
# 按照K线周期运行策略
if not self.runInKLinePeriod(records):
return
# 趋势判断和下单
long = False
short = False
[long, short] = self.trendJudgment(records)
if not self._OnlyTrendJudgment:
self.order(long, short, position, ticker)
# 状态栏信息打印
def printLogStatus(self, ticker, account, position):
table_overview = {
"type": "table",
"title": "策略总览",
"cols": ["开始时间", "已运行天数", "交易次数", "胜率", "预估月化%", "预估年化%"],
"rows": []
}
table_account = {
"type": "table",
"title": "账户资金",
"cols": ["当前资产", "初始资产", "可用余额", "冻结余额", "可下单张数", "收益", "收益%"],
"rows": []
}
table_position = {
"type": "table",
"title": "持仓情况",
"cols": ["交易币种", "杠杆倍数", "持仓均价", "方向", "数量", "保证金", "预估强平价格", "浮动盈亏", "浮动盈亏%"],
"rows": []
}
i = 0
# 策略总览
the_running_days = self.getDaysFromTimeStamp(self._StrategyDatas["start_run_timestamp"], Unix())