# Strategy source code
'''backtest
start: 2024-11-26 00:00:00
end: 2024-12-03 00:00:00
period: 1d
basePeriod: 1d
exchanges: [{"eid":"Futures_Binance","currency":"BTC_USDT"}]
'''
import json, talib
import numpy as np
class TrendStrategy:
def __init__(self):
    """Copy the strategy parameters into instance state and configure the exchange.

    Parameters are read from module-level names (``TradeCurrency``,
    ``Interval``, ...) that are not defined in this file; presumably the
    trading platform injects them -- confirm. The constructor then queries
    market metadata for the chosen contract and applies currency, leverage
    and precision settings on ``exchange``.
    """
    # Basic settings
    self._Currency = TradeCurrency
    self._Interval = Interval
    self._UseQuarter = UseQuarter
    # The suffix must agree with setContract(): quarterly contract when
    # _UseQuarter is set, perpetual swap otherwise.
    self._UseContract = TradeCurrency + ('.quarter' if self._UseQuarter else '.swap')
    self._OnlyTrendJudgment = OnlyTrendJudgment
    self._EnableMessageSend = EnableMessageSend
    # Trend judgment
    self._RunInKLinePeriod = RunInKLinePeriod
    self._KLinePeriod = KLinePeriod
    self._EmaLength = EmaLength
    self._EmaCoefficient = EmaCoefficient
    self._UseStddev = UseStddev
    self._UseRecordsMiddleValue = UseRecordsMiddleValue
    self._StddevLength = StddevLength
    self._StddevDeviations = StddevDeviations
    # Order settings
    self._MarginLevel = MarginLevel
    self._OrderSize = OrderSize
    self._OrderByMargin = OrderByMargin
    self._OrderMarginPercent = OrderMarginPercent
    self._PricePrecision = None
    self._AmountPrecision = None
    self._OneSizeInCurrentCoin = None
    self._QuarterOneSizeValue = None
    # Take profit and stop loss
    self._UseStopLoss = UseStopLoss
    self._StopLossPercent = StopLossPercent
    self._UseTakeProfit = UseTakeProfit
    self._TakeProfitPercent = TakeProfitPercent
    self._UseTrackingTakeProfit = UseTrackingTakeProfit
    self._UsePositionRetracement = UsePositionRetracement
    self._TakeProfitTriggerPercent = TakeProfitTriggerPercent
    self._CallBakcPercent = CallBakcPercent
    # Strategy variables
    self._LastBarTime = 0
    self._TrendWhenTakeProfitOrStopLoss = 0
    self._HadStopLoss = False
    self._TriggeredTakeProfit = False
    self._PeakPriceInPosition = 0
    self._HadTakeProfit = False
    self._PriceCrossEMAStatus = 0
    # Statistical variables
    self._InitAsset = 0
    self._ProfitLocal = 0
    self._TakeProfitCount = 0
    self._TradeCount = 0
    self.StrategyRunTimeStampString = "strategy_run_time"
    self._StrategyDatas = {"start_run_timestamp": 0, "others": ""}
    self._UserDatas = None
    # Relatively fixed parameters
    self._MaintenanceMarginRate = 0.004
    self._TakerFee = 0.0005
    self._IsUsdtStandard = False
    # Get contract information.
    # The ticker result itself is unused. NOTE(review): presumably a market
    # data call is required before GetMarkets() returns data in backtests --
    # confirm before removing this call.
    _C(exchange.GetTicker, self._UseContract)
    marketInfo = exchange.GetMarkets()[self._UseContract]
    Log('Get market information:', marketInfo)
    self._PricePrecision = marketInfo['PricePrecision']
    self._AmountPrecision = marketInfo['AmountPrecision']
    # Both contract-size fields are filled from the same 'CtVal' entry.
    self._OneSizeInCurrentCoin = marketInfo['CtVal']
    self._QuarterOneSizeValue = marketInfo['CtVal']
    exchange.SetCurrency(self._Currency)
    exchange.SetMarginLevel(self._UseContract, self._MarginLevel)
    exchange.SetPrecision(self._PricePrecision, self._AmountPrecision)
# Initialize data
def initDatas(self):
    """Load persisted run time and statistics, then validate the settings.

    Raises:
        Exception: if fixed take profit and trailing take profit are both
            enabled.
    """
    self.saveStrategyRunTime()
    self.readUserDataLocal()

    stored = self._UserDatas
    self._InitAsset = stored["init_assets"]
    self._ProfitLocal = stored["profit_local"]
    self._TakeProfitCount = stored["take_profit_count"]
    self._TradeCount = stored["trade_count"]

    if self._OrderByMargin:
        # Margin-based sizing recomputes _OrderSize from the current price.
        self.getRealOrderSize(-1, self._OrderSize)
        Log("Order quantity has been recalculated:", self._OrderSize)

    conflicting_modes = self._UseTakeProfit and self._UseTrackingTakeProfit
    if conflicting_modes:
        raise Exception("Take profit and trailing take profit cannot be used simultaneously!")
# Set contract
def setContract(self):
    """Select the trading currency and contract type on the exchange.

    Also records whether the pair is USDT-margined, decided by whether
    "USDT" occurs in the currency string.
    """
    currency = self._Currency
    self._IsUsdtStandard = "USDT" in currency
    exchange.SetCurrency(currency)
    contract_type = "quarter" if self._UseQuarter else "swap"
    exchange.SetContractType(contract_type)
# Save program start run time (second-level timestamp)
def saveStrategyRunTime(self):
    """Restore the persisted start timestamp, or create and persist one.

    The timestamp is stored via ``_G`` under ``StrategyRunTimeStampString``.
    """
    storage_key = self.StrategyRunTimeStampString
    stored_start = _G(storage_key)
    if stored_start is not None:
        self._StrategyDatas["start_run_timestamp"] = stored_start
        return
    # First run: stamp the current time and persist it.
    self._StrategyDatas["start_run_timestamp"] = Unix()
    _G(storage_key, self._StrategyDatas["start_run_timestamp"])
# Set program start run time (second-level timestamp)
def setStrategyRunTime(self, timestamp):
    """Persist ``timestamp`` as the start time and mirror it in memory."""
    storage_key = self.StrategyRunTimeStampString
    _G(storage_key, timestamp)
    self._StrategyDatas.update(start_run_timestamp=timestamp)
# Calculate days between two timestamps (parameters are second-level timestamps)
def getDaysFromTimeStamp(self, start_time, end_time):
    """Return the number of whole days between two second-level timestamps.

    Returns 0 when ``end_time`` is earlier than ``start_time``.
    """
    seconds_per_day = 24 * 60 * 60
    if start_time > end_time:
        return 0
    return (end_time - start_time) // seconds_per_day
# Save data locally
def saveUserDatasLocal(self):
    """Snapshot the running statistics and persist them under the exchange label."""
    snapshot = dict(
        init_assets=self._InitAsset,
        profit_local=self._ProfitLocal,
        take_profit_count=self._TakeProfitCount,
        trade_count=self._TradeCount,
    )
    self._UserDatas = snapshot
    # Store locally
    _G(exchange.GetLabel(), self._UserDatas)
    Log("All data has been saved locally.")
# Read user local data, run once when program starts
def readUserDataLocal(self):
    """Load persisted statistics, or initialise them from the live account.

    When nothing is stored under the exchange label, the current account
    asset becomes the initial asset and all counters start at zero.
    """
    stored = _G(exchange.GetLabel())
    if stored is not None:
        self._UserDatas = stored
        return

    position = _C(exchange.GetPosition)
    account = _C(exchange.GetAccount)
    ticker = _C(exchange.GetTicker)
    self._InitAsset = self.getAccountAsset(position, account, ticker)
    self._UserDatas = dict(
        init_assets=self._InitAsset,
        profit_local=0,
        take_profit_count=0,
        trade_count=0,
    )
# Clear user local data, run when interactive button is clicked
def clearUserDataLocal(self):
    """Erase the statistics persisted under the exchange label and log it."""
    storage_key = exchange.GetLabel()
    _G(storage_key, None)
    notice = ": Local data has been cleared."
    Log(exchange.GetLabel(), notice)
# Strategy interaction
def runCmd(self):
    """Fetch one pending interactive command and apply it.

    Recognised prefixes: ``ClearLocalData:``, ``SaveLocalData:``,
    ``ClearLog:<n>``, ``OrderSize:<amount>`` and
    ``OrderMarginPercent:<percent>``. A malformed numeric payload is logged
    and ignored instead of raising out of the strategy loop. ``OrderSize``
    accepts fractional amounts and must be a positive finite number, because
    ``orderDirectly`` raises on non-positive amounts.
    """
    cmd = GetCommand()
    if not cmd:
        return
    # Detect interactive commands
    Log("Received command:", cmd, "#FF1CAE")

    def payload(prefix):
        # Text of the command with its prefix removed.
        return cmd.replace(prefix, "")

    if cmd.startswith("ClearLocalData:"):
        # Clear local data
        self.clearUserDataLocal()
    elif cmd.startswith("SaveLocalData:"):
        # Save data locally
        self.saveUserDatasLocal()
    elif cmd.startswith("ClearLog:"):
        # Clear logs
        try:
            log_reserve = int(payload("ClearLog:"))
        except ValueError:
            Log("Invalid ClearLog value, command ignored:", cmd)
            return
        LogReset(log_reserve)
    elif cmd.startswith("OrderSize:"):
        # Modify order quantity
        if self._OrderByMargin:
            Log("Order by margin is already enabled, cannot modify order quantity directly!")
            return
        try:
            order_size = float(payload("OrderSize:"))
        except ValueError:
            Log("Invalid OrderSize value, command ignored:", cmd)
            return
        # The chained comparison also rejects NaN and infinity.
        if not 0 < order_size < float("inf"):
            Log("Order quantity must be greater than 0, command ignored:", cmd)
            return
        # Keep whole numbers as int so existing integer inputs behave as before.
        self._OrderSize = int(order_size) if order_size.is_integer() else order_size
        Log("Order quantity has been modified to:", self._OrderSize)
    elif cmd.startswith("OrderMarginPercent:"):
        # Modify order margin percentage
        if not self._OrderByMargin:
            Log("Order by margin is not enabled, cannot modify order margin percentage!")
            return
        try:
            order_margin_percent = float(payload("OrderMarginPercent:"))
        except ValueError:
            Log("Invalid OrderMarginPercent value, command ignored:", cmd)
            return
        self._OrderMarginPercent = order_margin_percent
        Log("Order margin percentage:", self._OrderMarginPercent, "%")
# Trading functions
def orderDirectly(self, distance, price, amount):
    """Set the trade direction and submit an order through the exchange.

    ``"sell"`` and ``"closebuy"`` are sent with ``exchange.Sell``; ``"buy"``
    and every other direction value are sent with ``exchange.Buy``.

    Raises:
        Exception: if ``amount`` is not positive.
    """
    if amount <= 0:
        raise Exception("Parameter error, order quantity is already less than 0!")

    if distance in ("sell", "closebuy"):
        submit = exchange.Sell
    else:
        submit = exchange.Buy

    exchange.SetDirection(distance)
    return submit(price, amount)
def openLong(self, price, amount):
    """Open a long position, sizing the order through getRealOrderSize."""
    return self.orderDirectly("buy", price, self.getRealOrderSize(price, amount))
def openShort(self, price, amount):
    """Open a short position, sizing the order through getRealOrderSize."""
    return self.orderDirectly("sell", price, self.getRealOrderSize(price, amount))
def coverLong(self, price, amount):
    """Close ``amount`` of the long position; the amount is passed through unchanged."""
    direction = "closebuy"
    return self.orderDirectly(direction, price, amount)
def coverShort(self, price, amount):
    """Close ``amount`` of the short position; the amount is passed through unchanged."""
    direction = "closesell"
    return self.orderDirectly(direction, price, amount)
# Recalculate order quantity
def getRealOrderSize(self, price, amount):
    """Compute the order size, store it in ``_OrderSize`` and return it.

    When margin-based sizing is off, ``amount`` is used as-is. Otherwise the
    size is derived from the initial asset, the margin percentage, the
    leverage and the price. A ``price`` of -1 means "use the latest ticker
    price".
    """
    if price != -1:
        real_price = price
    else:
        real_price = _C(exchange.GetTicker).Last

    if not self._OrderByMargin:
        self._OrderSize = amount
        return self._OrderSize

    margin_budget = self._InitAsset * (self._OrderMarginPercent / 100)
    if self._IsUsdtStandard:
        # USDT-margined quantity (leveraged quantity)
        raw_size = margin_budget / real_price * self._MarginLevel / self._OneSizeInCurrentCoin
    else:
        # Coin-margined quantity (leveraged quantity)
        raw_size = margin_budget * self._MarginLevel * real_price / self._QuarterOneSizeValue
    self._OrderSize = _N(raw_size, self._AmountPrecision)
    return self._OrderSize
# Get margin occupied by single position
def getSinglePositionMargin(self, position, ticker):
    """Return the margin occupied by the first position, or 0 with no position.

    USDT-margined: amount * contract size * last price / leverage.
    Otherwise: amount * contract value / last price / leverage.
    """
    if len(position) == 0:
        return 0
    held_amount = position[0].Amount
    if self._IsUsdtStandard:
        return held_amount * self._OneSizeInCurrentCoin * ticker.Last / self._MarginLevel
    return held_amount * self._QuarterOneSizeValue / ticker.Last / self._MarginLevel
# Get profit and profit percentage of unidirectional position
def getSinglePositionProfit(self, position, ticker):
    """Return ``[profit, profit_ratio]`` for the first position.

    The ratio is the price move relative to the entry price multiplied by
    the leverage; the profit is that ratio applied to the position margin.
    Returns ``[0, 0]`` when there is no position.
    """
    if len(position) == 0:
        return [0, 0]

    last_price = ticker.Last
    margin = self.getSinglePositionMargin(position, ticker)
    entry_price = position[0].Price
    if position[0].Type == PD_LONG:
        price_move = last_price - entry_price
    else:
        price_move = entry_price - last_price
    profit_ratio = price_move / entry_price * self._MarginLevel
    return [margin * profit_ratio, profit_ratio]
# Calculate liquidation price
def calculateForcedPrice(self, account, position, ticker):
    """Estimate the liquidation price of the first position.

    Uses one formula pair (long/short) for USDT-margined accounts and another
    for the remaining case, built from account balances, position margin,
    ``_MaintenanceMarginRate`` and ``_TakerFee``. A negative result is clamped
    to 0.

    Indexes ``position[0]`` without a length check, so ``position`` must be
    non-empty; the only caller visible in this file, ``printLogStatus``, checks
    that first.

    NOTE(review): the formulas presumably follow the exchange's
    liquidation-price model -- confirm against the exchange documentation.
    """
    position_profit = 0
    total_avail_balance = 0
    forced_price = 0
    position_margin = self.getSinglePositionMargin(position, ticker)
    [position_profit, position_profit_percent] = self.getSinglePositionProfit(position, ticker)
    if self._IsUsdtStandard:
        # Unrealised profit is subtracted from the available total only when it is positive.
        total_avail_balance = account.Balance + position_margin + account.FrozenBalance - position_profit if position_profit > 0 else account.Balance + position_margin + account.FrozenBalance
        if position[0].Type == PD_LONG:
            forced_price = ((self._MaintenanceMarginRate + self._TakerFee) * self._MarginLevel * account.FrozenBalance - total_avail_balance) / self._OneSizeInCurrentCoin + (position[0].Amount * position[0].Price) / (position[0].Amount - (self._MaintenanceMarginRate + self._TakerFee) * position[0].Amount)
        else:
            forced_price = ((self._MaintenanceMarginRate + self._TakerFee) * self._MarginLevel * account.FrozenBalance - total_avail_balance) / self._OneSizeInCurrentCoin - (position[0].Amount * position[0].Price) / (-1 * position[0].Amount - (self._MaintenanceMarginRate + self._TakerFee) * position[0].Amount)
    else:
        # Same rule as above, using the Stocks / FrozenStocks fields of the account.
        total_avail_balance = account.Stocks + position_margin + account.FrozenStocks - position_profit if position_profit > 0 else account.Stocks + position_margin + account.FrozenStocks
        if position[0].Type == PD_LONG:
            forced_price = (self._MaintenanceMarginRate * position[0].Amount + position[0].Amount) / (total_avail_balance / self._QuarterOneSizeValue + position[0].Amount / position[0].Price)
        else:
            forced_price = (self._MaintenanceMarginRate * position[0].Amount - position[0].Amount) / (total_avail_balance / self._QuarterOneSizeValue - position[0].Amount / position[0].Price)
    # A negative estimate is reported as 0.
    if forced_price < 0:
        forced_price = 0
    return forced_price
# Calculate maximum order quantity
def getMaxOrderSize(self, margin_level, ticker, account):
    """Return the largest order size the free balance supports at ``margin_level``.

    The result is rounded with ``_N`` to ``_AmountPrecision``.
    """
    if self._IsUsdtStandard:
        raw_size = account.Balance * margin_level / (self._OneSizeInCurrentCoin * ticker.Last)
    else:
        raw_size = account.Stocks * ticker.Last / self._QuarterOneSizeValue * margin_level
    return _N(raw_size, self._AmountPrecision)
# Get account assets
def getAccountAsset(self, position, account, ticker):
    """Return total account assets: free + frozen funds (+ position margin).

    USDT-margined accounts use ``Balance``/``FrozenBalance``; otherwise
    ``Stocks``/``FrozenStocks`` are used. The position margin is added only
    when a position exists.
    """
    margin = self.getSinglePositionMargin(position, ticker)
    if self._IsUsdtStandard:
        free, frozen = account.Balance, account.FrozenBalance
    else:
        free, frozen = account.Stocks, account.FrozenStocks

    total = free + frozen
    if len(position) > 0:
        total = total + margin
    return total
# Profit statistics
def calculateProfit(self, ticker):
    """Book the profit of the trade that just closed and persist the statistics.

    The trade's profit is the change in total profit since the last booking.
    Every call counts as one trade; a positive result also counts as a win.
    """
    # Re-obtain account position and assets
    position = _C(exchange.GetPosition)
    account = _C(exchange.GetAccount)

    # Current total profit - previous total profit = current profit
    total_profit = self.getAccountAsset(position, account, ticker) - self._InitAsset
    trade_profit = total_profit - self._ProfitLocal

    self._ProfitLocal += trade_profit
    self._TradeCount += 1
    if trade_profit > 0:
        self._TakeProfitCount += 1

    LogProfit(_N(self._ProfitLocal, 4), " Current profit:", _N(trade_profit, 6))
    self.saveUserDatasLocal()
# Check if there are enough funds to place order
def isEnoughAssetToOrder(self, order_size, ticker):
    """Return True when the free balance covers the margin for ``order_size``."""
    account = _C(exchange.GetAccount)
    if self._IsUsdtStandard:
        required = order_size * ticker.Last * self._OneSizeInCurrentCoin / self._MarginLevel
        return not (account.Balance < required)
    required = order_size * self._QuarterOneSizeValue / ticker.Last / self._MarginLevel
    return not (account.Stocks < required)
# Run strategy core according to K-line period
def runInKLinePeriod(self, records):
    """Decide whether the strategy core should run for the newest bar.

    Returns False only when per-bar mode is on and the newest bar's time was
    already seen; otherwise remembers that time and returns True.
    """
    newest_time = records[-1].Time
    already_seen = self._LastBarTime == newest_time
    if self._RunInKLinePeriod and already_seen:
        return False
    self._LastBarTime = newest_time
    return True
# Trend judgment module (editable specific indicators)
def trendJudgment(self, records):
    """Classify the market as long, short or sideways.

    The close of the last *closed* bar (``records[-2]``) is compared with an
    EMA band of ``ema * (1 +/- _EmaCoefficient)``. When ``_UseStddev`` is on,
    a trend additionally requires the sample standard deviation of the most
    recent ``_StddevLength`` bars to exceed ``_StddevDeviations``; mid prices
    are used when ``_UseRecordsMiddleValue`` is set, closes otherwise.

    Also advances ``_PriceCrossEMAStatus``: 0 = unknown, -1/1 = price
    below/above the EMA, 2 = price has crossed the EMA since start-up.

    Returns:
        ``[long, short]`` booleans. ``[False, False]`` is returned without
        logging when there is not enough data for a closed bar or its EMA.
    """
    # records[-2] needs a closed bar besides the one still forming.
    if len(records) < 2:
        return [False, False]

    price = records[-2].Close  # Close price of already closed K-line
    ema = TA.EMA(records, self._EmaLength)
    ema_value = ema[-2]  # EMA value corresponding to closed K-line
    if ema_value is None:
        # NOTE(review): assumes warm-up entries of TA.EMA are None -- confirm.
        return [False, False]

    # Check if price crosses EMA
    status = self._PriceCrossEMAStatus
    if status == 0:
        self._PriceCrossEMAStatus = -1 if price <= ema_value else 1
    elif (status == -1 and price >= ema_value) or (status == 1 and price <= ema_value):
        self._PriceCrossEMAStatus = 2  # Completed crossing

    # EMA long/short judgment
    ema_long = price > ema_value * (1 + self._EmaCoefficient)
    ema_short = (not ema_long) and price < ema_value * (1 - self._EmaCoefficient)

    # Standard deviation judgment
    in_trend = True
    if self._UseStddev:
        # Honour the configured window; previously every returned bar was
        # used, which made the threshold depend on the exchange's history depth.
        window = records
        if self._StddevLength and self._StddevLength > 0:
            window = records[-int(self._StddevLength):]
        if self._UseRecordsMiddleValue:
            samples = [(bar.High + bar.Low) / 2 for bar in window]
        else:
            samples = [bar.Close for bar in window]
        stddev = np.std(np.array(samples), ddof=1)
        in_trend = bool(stddev > self._StddevDeviations)

    # Trend judgment
    is_long = in_trend and ema_long
    is_short = in_trend and ema_short
    if is_long:
        Log("Current trend is: Long", "@" if self._EnableMessageSend else "#00FF7F")
    elif is_short:
        Log("Current trend is: Short", "@" if self._EnableMessageSend else "#FF0000")
    else:
        Log("Current trend is: Sideways", "@" if self._EnableMessageSend else "#007FFF")
    return [is_long, is_short]
# Stop loss
def stopLoss(self, position, ticker):
    """Close the single open position once the price breaches its stop level.

    Active only when exactly one position exists and ``_UseStopLoss`` is set.
    The stop sits ``_StopLossPercent`` percent below the entry for longs and
    above it for shorts. After closing, the profit is booked and the trend
    direction at exit (1 long, -1 short) and the stop-loss flag are recorded.
    """
    last_price = ticker.Last
    if len(position) != 1 or not self._UseStopLoss:
        return

    held = position[0]
    if held.Type == PD_LONG:
        stop_loss_price = held.Price * (1 - self._StopLossPercent / 100)
        if last_price < stop_loss_price:
            self.coverLong(-1, held.Amount)
            self.calculateProfit(ticker)
            self._TrendWhenTakeProfitOrStopLoss = 1
            self._HadStopLoss = True
            Log("Long position stop loss. Stop loss price:", _N(stop_loss_price, 6), ", Position price:", _N(held.Price), "@" if self._EnableMessageSend else "#FF1CAE")
    elif held.Type == PD_SHORT:
        stop_loss_price = held.Price * (1 + self._StopLossPercent / 100)
        if last_price > stop_loss_price:
            self.coverShort(-1, held.Amount)
            self.calculateProfit(ticker)
            self._TrendWhenTakeProfitOrStopLoss = -1
            self._HadStopLoss = True
            Log("Short position stop loss. Stop loss price:", _N(stop_loss_price, 6), ", Position price:", _N(held.Price), "@" if self._EnableMessageSend else "#FF1CAE")
# Take profit
def takeProfit(self, position, ticker):
    """Close the single open position once the price reaches its profit target.

    Active only when exactly one position exists and ``_UseTakeProfit`` is
    set. The target sits ``_TakeProfitPercent`` percent above the entry for
    longs and below it for shorts. After closing, the profit is booked and the
    trend direction at exit (1 long, -1 short) and the take-profit flag are
    recorded.
    """
    last_price = ticker.Last
    if len(position) != 1 or not self._UseTakeProfit:
        return

    held = position[0]
    if held.Type == PD_LONG:
        take_profit_price = held.Price * (1 + self._TakeProfitPercent / 100)
        if last_price > take_profit_price:
            self.coverLong(-1, held.Amount)
            self.calculateProfit(ticker)
            self._TrendWhenTakeProfitOrStopLoss = 1
            self._HadTakeProfit = True
            Log("Long position take profit. Take profit price:", _N(take_profit_price, 6), ", Position price:", _N(held.Price), "@" if self._EnableMessageSend else "#FF1CAE")
    elif held.Type == PD_SHORT:
        take_profit_price = held.Price * (1 - self._TakeProfitPercent / 100)
        if last_price < take_profit_price:
            self.coverShort(-1, held.Amount)
            self.calculateProfit(ticker)
            self._TrendWhenTakeProfitOrStopLoss = -1
            self._HadTakeProfit = True
            Log("Short position take profit. Take profit price:", _N(take_profit_price, 6), ", Position price:", _N(held.Price), "@" if self._EnableMessageSend else "#FF1CAE")
# Trailing take profit
def trackingTakeProfit(self, position, ticker):
    """Run the trailing take-profit state machine for the first position.

    Does nothing unless a position exists and ``_UseTrackingTakeProfit`` is set.

    Not yet armed: arm once the price moves ``_TakeProfitTriggerPercent``
    percent beyond the entry price in the profitable direction, and record
    that price as the extreme.

    Armed: keep the most favourable price seen (highest for longs, lowest for
    shorts) in ``_PeakPriceInPosition`` and derive the exit level from it.
    With ``_UsePositionRetracement`` the level gives back ``_CallBakcPercent``
    percent of the distance between the extreme and the entry price; otherwise
    it is ``_CallBakcPercent`` percent away from the extreme itself. When the
    price passes that level the position is closed with price -1 (presumably
    a market order on this platform -- confirm), the profit is booked, the
    armed flag is cleared and the exit trend (1 long, -1 short) plus the
    take-profit flag are recorded.
    """
    take_profit_price = 0
    trigger_price = 0
    price = ticker.Last
    if len(position) > 0 and self._UseTrackingTakeProfit:
        if position[0].Type == PD_LONG:
            # Long position holding
            if self._TriggeredTakeProfit:
                # Trigger price reached, monitor for take profit
                self._PeakPriceInPosition = price if price > self._PeakPriceInPosition else self._PeakPriceInPosition # Update price peak
                if self._UsePositionRetracement:
                    take_profit_price = self._PeakPriceInPosition - (self._PeakPriceInPosition - position[0].Price) * (self._CallBakcPercent / 100) # Calculate retracement take profit price
                else:
                    take_profit_price = self._PeakPriceInPosition * (1 - self._CallBakcPercent / 100) # Calculate retracement take profit price
                if price < take_profit_price:
                    self.coverLong(-1, position[0].Amount) # Close long
                    self.calculateProfit(ticker)
                    self._TriggeredTakeProfit = False # Reset trigger flag
                    self._TrendWhenTakeProfitOrStopLoss = 1 # Record trend when taking profit
                    self._HadTakeProfit = True # Record that take profit occurred
                    Log("Long position trailing take profit: Position price peak:", _N(self._PeakPriceInPosition, 6), ", Take profit price:", _N(take_profit_price, 6), ", Current price:", _N(price, 6),
                        ", Position price:", _N(position[0].Price, 6), self._EnableMessageSend and "@" or "#FF1CAE")
            else:
                # Monitor if trailing take profit trigger price is reached
                trigger_price = position[0].Price * (1 + self._TakeProfitTriggerPercent / 100)
                if price > trigger_price:
                    self._TriggeredTakeProfit = True # Trigger trailing take profit
                    self._PeakPriceInPosition = price # Record price peak
                    Log("Long position reached trailing take profit trigger price:", _N(trigger_price, 6), ", Current price:", _N(price, 6), ", Position price:", _N(position[0].Price, 6))
        elif position[0].Type == PD_SHORT:
            # Short position holding
            if self._TriggeredTakeProfit:
                # Trigger price reached, monitor for take profit
                self._PeakPriceInPosition = price if price < self._PeakPriceInPosition else self._PeakPriceInPosition # Update price low
                if self._UsePositionRetracement:
                    take_profit_price = self._PeakPriceInPosition + (position[0].Price - self._PeakPriceInPosition) * (self._CallBakcPercent / 100) # Calculate retracement take profit price
                else:
                    take_profit_price = self._PeakPriceInPosition * (1 + self._CallBakcPercent / 100) # Calculate retracement take profit price
                if price > take_profit_price:
                    self.coverShort(-1, position[0].Amount) # Close short
                    self.calculateProfit(ticker)
                    self._TriggeredTakeProfit = False # Reset trigger flag
                    self._TrendWhenTakeProfitOrStopLoss = -1 # Record trend when taking profit
                    self._HadTakeProfit = True # Record that take profit occurred
                    Log("Short position trailing take profit: Position price low:", _N(self._PeakPriceInPosition, 6), ", Take profit price:", _N(take_profit_price, 6), ", Current price:", _N(price, 6),
                        ", Position price:", _N(position[0].Price, 6), self._EnableMessageSend and "@" or "#FF1CAE")
            else:
                # Monitor if trailing take profit trigger price is reached
                trigger_price = position[0].Price * (1 - self._TakeProfitTriggerPercent / 100)
                if price < trigger_price:
                    self._TriggeredTakeProfit = True # Trigger trailing take profit
                    self._PeakPriceInPosition = price # Record price low
                    Log("Short position reached trailing take profit trigger price:", _N(trigger_price, 6), ", Current price:", _N(price, 6), ", Position price:", _N(position[0].Price, 6))
# Place order
def order(self, long, short, position, ticker):
    """Act on the trend signal: reverse, hold, or open a position.

    For a long signal (short is mirrored):
      * skip if the last stop-loss/take-profit exit happened in a long trend;
      * close an existing short and book its profit;
      * do nothing if a long is already held;
      * with no position, wait until the price has crossed the EMA once
        (``_PriceCrossEMAStatus == 2``);
      * then open a long of ``_OrderSize`` and clear the exit flags.

    Opening a position also resets the trailing-take-profit state. Without
    that, a position opened after a stop-loss or a reversal would inherit an
    already-armed trailing stop and the previous position's price extreme.

    NOTE(review): the source's indentation was lost. The open step is placed
    after the close/hold/no-position chain, as the early ``return`` guards
    imply -- confirm against the upstream strategy.

    Raises:
        Exception: if the account cannot fund the order.
    """
    position_size = position[0].Amount if len(position) > 0 else 0
    position_type = position[0].Type if len(position) > 0 else None
    if long:
        # Long trend
        if (self._HadStopLoss or self._HadTakeProfit) and self._TrendWhenTakeProfitOrStopLoss == 1:
            # Stop loss or take profit occurred, and trend was long when it happened, do not go long again
            return
        if position_size > 0 and position_type == PD_SHORT:
            self.coverShort(-1, position_size)
            self.calculateProfit(ticker)
        elif position_size > 0 and position_type == PD_LONG:
            # Long position holding, do not place duplicate orders
            return
        else:
            # No position, if first run or strategy restart, need to wait for price to cross EMA once before placing order
            if self._PriceCrossEMAStatus != 2:
                return
        if self.isEnoughAssetToOrder(self._OrderSize, ticker):
            # A new position must start with a disarmed trailing stop.
            self._TriggeredTakeProfit = False
            self._PeakPriceInPosition = 0
            self.openLong(-1, self._OrderSize)
            self._HadStopLoss = False
            self._HadTakeProfit = False
        else:
            raise Exception("Insufficient order amount!")
    elif short:
        # Short trend
        if (self._HadStopLoss or self._HadTakeProfit) and self._TrendWhenTakeProfitOrStopLoss == -1:
            # Stop loss or take profit occurred, and trend was short when it happened, do not go short again
            return
        if position_size > 0 and position_type == PD_LONG:
            self.coverLong(-1, position_size)
            self.calculateProfit(ticker)
        elif position_size > 0 and position_type == PD_SHORT:
            # Short position holding, do not place duplicate orders
            return
        else:
            # No position, if first run or strategy restart, need to wait for price to cross EMA once before placing order
            if self._PriceCrossEMAStatus != 2:
                return
        if self.isEnoughAssetToOrder(self._OrderSize, ticker):
            # A new position must start with a disarmed trailing stop.
            self._TriggeredTakeProfit = False
            self._PeakPriceInPosition = 0
            self.openShort(-1, self._OrderSize)
            self._HadStopLoss = False
            self._HadTakeProfit = False
        else:
            raise Exception("Insufficient order amount!")
# Trend strategy
def trendStrategy(self):
    """Run one iteration of the strategy loop.

    Fetches ticker, position, account and K-line data, handles interactive
    commands, refreshes the status bar, runs the exit checks (stop loss, take
    profit, trailing take profit) and, once per bar when configured, judges
    the trend and places orders.

    The position snapshot is re-fetched whenever an exit check closes the
    position. A close is detected through ``_TradeCount``, which
    ``calculateProfit`` increments. Without the refresh, later checks and
    ``order()`` would act on the stale snapshot, closing the same position
    again and double-counting the trade.

    Raises:
        Exception: if more than one position is reported at the same time.
    """
    ticker = _C(exchange.GetTicker)
    position = _C(exchange.GetPosition)
    account = _C(exchange.GetAccount)
    records = _C(exchange.GetRecords, self._KLinePeriod * 60)
    if len(position) > 1:
        Log(position)
        raise Exception("Simultaneous long and short positions!")
    # Strategy interaction
    self.runCmd()
    # Status bar information printing
    self.printLogStatus(ticker, account, position)
    # Stop loss, take profit, trailing take profit -- in that order
    for exit_check in (self.stopLoss, self.takeProfit, self.trackingTakeProfit):
        trades_before = self._TradeCount
        exit_check(position, ticker)
        if self._TradeCount != trades_before:
            # The position was just closed; drop the stale snapshot.
            position = _C(exchange.GetPosition)
    # Run strategy according to K-line period
    if not self.runInKLinePeriod(records):
        return
    # Trend judgment and order placement
    long, short = self.trendJudgment(records)
    if not self._OnlyTrendJudgment:
        self.order(long, short, position, ticker)
# Status bar information printing
def printLogStatus(self, ticker, account, position):
    """Render the overview, account and position tables in the status bar.

    Ratios based on the initial asset are reported as 0 when the initial
    asset is 0, instead of raising ZeroDivisionError on every loop iteration.
    """
    table_overview = {
        "type": "table",
        "title": "Strategy Overview",
        "cols": ["Start Time", "Running Days", "Trade Count", "Win Rate", "Est. Monthly %", "Est. Annual %"],
        "rows": []
    }
    table_account = {
        "type": "table",
        "title": "Account Funds",
        "cols": ["Current Asset", "Initial Asset", "Available Balance", "Frozen Balance", "Max Order Size", "Profit", "Profit %"],
        "rows": []
    }
    table_position = {
        "type": "table",
        "title": "Position Info",
        "cols": ["Symbol", "Leverage", "Avg Entry Price", "Direction", "Size", "Margin", "Est. Liq. Price", "Unrealized PnL", "Unrealized PnL %"],
        "rows": []
    }
    init_asset = self._InitAsset

    # Strategy Overview
    the_running_days = self.getDaysFromTimeStamp(self._StrategyDatas["start_run_timestamp"], Unix())
    monthly_rate_of_profit = 0
    if the_running_days > 1 and init_asset:
        monthly_rate_of_profit = self._ProfitLocal / init_asset / the_running_days * 30
    win_rate = 0 if self._TradeCount == 0 else (str(_N(self._TakeProfitCount / self._TradeCount * 100, 2)) + "%")
    table_overview["rows"].append([_D(self._StrategyDatas["start_run_timestamp"]), the_running_days, self._TradeCount,
                                   win_rate,
                                   str(_N(monthly_rate_of_profit * 100, 2)) + "%", str(_N(monthly_rate_of_profit * 12 * 100, 2)) + "%"])

    # Account Funds
    current_asset = self.getAccountAsset(position, account, ticker)
    max_order_size = self.getMaxOrderSize(self._MarginLevel, ticker, account)
    asset_profit = current_asset - init_asset
    asset_profit_percent = asset_profit / init_asset if init_asset else 0
    table_account["rows"].append([_N(current_asset, 4), _N(init_asset, 4), _N(account.Balance if self._IsUsdtStandard else account.Stocks, 4),
                                  _N(account.FrozenBalance if self._IsUsdtStandard else account.FrozenStocks, 4), max_order_size, _N(asset_profit, 4),
                                  str(_N(asset_profit_percent * 100, 2)) + "%"])

    # Position Info
    if len(position) == 0:
        table_position["rows"].append(["No Position", "-", "-", "-", "-", "-", "-", "-", "-"])
    else:
        position_direction = "Long" if position[0].Type == PD_LONG else "Short"
        [position_profit, position_profit_percent] = self.getSinglePositionProfit(position, ticker)
        position_margin = self.getSinglePositionMargin(position, ticker)
        forced_cover_up_price = self.calculateForcedPrice(account, position, ticker)
        table_position["rows"].append([exchange.GetCurrency(), self._MarginLevel, _N(position[0].Price, 4), position_direction, position[0].Amount,
                                       _N(position_margin, 4), _N(forced_cover_up_price, 4), _N(position_profit, 4), str(_N((position_profit_percent * 100), 2)) + "%"])

    # Print tables: each table is a JSON object wrapped in backticks.
    LogStatus('`' + json.dumps(table_overview) + '`\n'
              + '`' + json.dumps(table_account) + '`\n'
              + '`' + json.dumps(table_position) + '`\n')
# main
def main():
    """Entry point: build the strategy, initialise it and loop forever.

    Each iteration runs one strategy pass and then sleeps for the configured
    interval.
    """
    exchange.IO('simulate', True)
    bot = TrendStrategy()
    bot.setContract()
    bot.initDatas()
    while True:
        bot.trendStrategy()
        Sleep(bot._Interval)