avatar of ianzeng123 ianzeng123
关注 私信
2
关注
319
关注者

发明者平台淘金记:高灵活性Python趋势交易框架实战解析

创建于: 2025-08-22 17:25:52, 更新于: 2025-08-26 09:56:27
comments   0
hits   468

发明者平台淘金记:高灵活性Python趋势交易框架实战解析

经常在发明者平台闲逛,总会找到宝贝,今天找到了一篇21年的趋势策略,感叹于原作者的精妙和完善的代码架构,并且具有高度的灵活性。原始策略是JS版本的,为方便Python朋友,进行了改写。

说实话,刚接触量化交易的时候,不少新手朋友会走很多弯路。经常遇到下单失败、风控没做好亏钱、策略重启后数据丢失等各种问题。后来慢慢意识到,有一个好的框架真的很重要,可以帮我们避开很多坑。这个趋势策略框架就是这样一个好东西。它不只是一个简单的交易策略,更像是一个工具箱,把下单、止损、数据管理这些基础但重要的功能都给你做好了。你只需要专心考虑”什么时候买”、”什么时候卖”这些核心问题就行。而且这个框架的设计很开放,你可以很容易地把EMA换成MACD、RSI或者其他你喜欢的指标。想做趋势跟踪?没问题。想试试均值回归?也行。甚至想组合多个指标一起用?完全可以。这种灵活性让我觉得很实用,一套代码可以改来改去试验不同的想法。

今天把这个框架分享出来,希望能帮到在量化路上摸索的朋友们。下面来详细介绍一下这个框架的各个部分,相信看完你也会觉得很有用。

框架结构与功能

相对于多品种交易框架中使用的多个独立函数,本框架尝试使用类(Class)的格式来组织和管理策略的各个部分。这种面向对象的设计方式不仅提高了代码的可维护性和可扩展性,还使得策略的各个组件更加模块化,便于后续的调整与优化。该框架主要由以下几个板块组成,每个板块中的函数都具有特定的功能,确保策略的灵活性和实用性。

初始化与设置

init函数

  • 功能__init__ 函数是策略类的初始化方法,负责设置策略的基本配置、初始化变量以及获取市场信息。该函数确保策略在运行之前已配置好所需的参数,确保后续的交易操作能够顺利执行。
  • 步骤
    1. 基础配置:设置交易货币、合约类型、止盈止损规则等。
    2. 市场信息:获取合约的价格精度、数量精度等,确保订单合法。
    3. 初始化变量:包括趋势判断、止盈止损参数、统计变量等,帮助策略根据市场情况进行决策。
    4. 交易所设置:根据市场信息配置交易所API接口,如设置保证金、精度等。

initDatas 函数

  • 功能:初始化策略运行时的数据,包括账户资产、收益统计等。
  • 步骤
    1. 保存策略运行时间。
    2. 读取本地用户数据。
    3. 初始化账户资产、收益统计等数据。
    4. 检查止盈和回调止盈是否同时启用。

数据管理与保存

saveStrategyRunTime 函数

  • 功能:保存策略的起始运行时间,用于后续的统计和监控。
  • 步骤
    1. 检查本地是否已保存运行时间。
    2. 若未保存,则记录当前时间并保存到本地。
    3. 若已保存,则读取本地保存的时间。

setStrategyRunTime 函数

  • 功能:设置策略的起始运行时间,并保存到本地存储中。
  • 步骤
    1. 使用平台的_G函数将传入的时间戳保存到本地。
    2. 更新策略数据中的起始运行时间。

getDaysFromTimeStamp 函数

  • 功能:计算两个时间戳之间的天数差,用于统计策略运行时长。
  • 步骤
    1. 检查结束时间是否早于开始时间,若是则返回0。
    2. 计算两个时间戳之间的秒数差,并转换为天数。
    3. 返回天数差。

saveUserDatasLocal 函数

  • 功能:将策略运行中的关键数据保存到本地,以便在策略重启时恢复。
  • 步骤
    1. 将账户资产、收益统计等数据打包。
    2. 使用平台的_G函数将数据保存到本地。

readUserDataLocal 函数

  • 功能:读取本地保存的用户数据,用于策略重启时的数据恢复。
  • 步骤
    1. 检查本地是否有保存的数据。
    2. 若没有,则初始化数据并保存到本地。
    3. 若有,则读取并加载到策略中。

clearUserDataLocal 函数

  • 功能:清除本地保存的用户数据,通常用于策略重置或调试。
  • 步骤
    1. 使用平台的_G函数清除本地数据。
    2. 记录清除操作。

策略交互与命令处理

runCmd 函数

  • 功能:处理用户通过交互界面发送的命令,如清除本地数据、修改下单数量等。
  • 步骤
    1. 获取用户发送的命令。
    2. 根据命令类型执行相应操作,如清除本地数据、修改下单数量等。
    3. 记录命令执行结果。

交易与订单管理

orderDirectly 函数

  • 功能:根据方向和价格直接下单,支持开仓、平仓操作。
  • 步骤
    1. 根据方向选择交易函数(买入或卖出)。
    2. 设置交易方向。
    3. 执行下单操作并返回结果。

openLong 函数

  • 功能:开多仓,根据价格和数量下单。
  • 步骤
    1. 计算实际下单数量。
    2. 调用orderDirectly函数执行买入操作。

openShort 函数

  • 功能:开空仓,根据价格和数量下单。
  • 步骤
    1. 计算实际下单数量。
    2. 调用orderDirectly函数执行卖出操作。

coverLong 函数

  • 功能:平多仓,根据价格和数量下单。
  • 步骤
    1. 调用orderDirectly函数执行卖出操作。

coverShort 函数

  • 功能:平空仓,根据价格和数量下单。
  • 步骤
    1. 调用orderDirectly函数执行买入操作。

getRealOrderSize 函数

  • 功能:根据价格和数量重新计算实际下单数量,支持按保证金比例下单。
  • 步骤
    1. 根据是否按保证金比例下单,计算实际下单数量。
    2. 返回计算后的下单数量。

风险管理与收益统计

getSinglePositionMargin 函数

  • 功能:计算单个持仓占用的保证金。
  • 步骤
    1. 根据持仓方向和数量计算保证金。
    2. 返回计算结果。

getSinglePositionProfit 函数

  • 功能:计算单个持仓的收益和收益率。
  • 步骤
    1. 根据持仓方向和当前价格计算收益。
    2. 返回收益和收益率。

calculateForcedPrice 函数

  • 功能:计算持仓的强平价格。
  • 步骤
    1. 根据持仓方向和账户余额计算强平价格。
    2. 返回计算结果。

getMaxOrderSize 函数

  • 功能:计算最大可下单数量。
  • 步骤
    1. 根据账户余额和杠杆计算最大可下单数量。
    2. 返回计算结果。

getAccountAsset 函数

  • 功能:计算账户总资产,包括持仓和可用余额。
  • 步骤
    1. 根据持仓和账户余额计算总资产。
    2. 返回计算结果。

calculateProfit 函数

  • 功能:计算并记录策略的收益情况。
  • 步骤
    1. 计算当前总收益与初始资产的差值。
    2. 记录收益并更新统计变量。
    3. 保存收益数据到本地。

isEnoughAssetToOrder 函数

  • 功能:检查账户资金是否足够下单。
  • 步骤
    1. 获取账户余额信息。
    2. 根据交易货币类型(USDT本位或币本位)计算所需资金。
    3. 检查账户余额是否满足下单需求。
    4. 返回布尔值,表示资金是否足够。

趋势判断与交易逻辑

runInKLinePeriod 函数

  • 功能:根据K线周期判断是否执行策略逻辑。
  • 步骤
    1. 检查当前K线是否已处理。
    2. 若未处理,则标记为已处理并返回True,否则返回False

trendJudgment 函数(核心趋势判断模块)

  • 功能:根据技术指标判断当前趋势。这是整个框架最具灵活性的模块,用户可以根据自己的需求替换不同的技术指标来进行趋势判断。
  • 当前实现:使用EMA(指数移动平均线)结合标准差来判断趋势
  • 可扩展性:该函数设计为可插拔式模块,用户可以轻松替换为其他技术指标,如:
    • RSI(相对强弱指数):判断超买超卖状态
    • MACD(移动平均收敛发散):识别趋势转换点
    • 布林带(Bollinger Bands):基于价格波动性的趋势判断
    • KDJ指标:结合动量和趋势的综合判断
    • 多指标组合:可以组合多个指标进行更精确的趋势判断
  • 步骤
    1. 计算EMA指标并判断价格是否穿过EMA。
    2. 根据标准差判断是否处于趋势中。
    3. 返回当前趋势(多、空或震荡)。

stopLoss 函数

  • 功能:根据止损规则执行止损操作。
  • 步骤
    1. 检查持仓是否达到止损条件。
    2. 若达到,则执行平仓操作并记录止损信息。

takeProfit 函数

  • 功能:根据止盈规则执行止盈操作。
  • 步骤
    1. 检查持仓是否达到止盈条件。
    2. 若达到,则执行平仓操作并记录止盈信息。

trackingTakeProfit 函数

  • 功能:根据回调止盈规则执行止盈操作。
  • 步骤
    1. 检查持仓是否达到回调止盈触发条件。
    2. 若达到,则执行平仓操作并记录止盈信息。

order 函数

  • 功能:根据趋势判断结果执行下单操作。
  • 步骤
    1. 检查当前持仓情况。
    2. 根据趋势判断结果执行开仓或平仓操作。

策略核心逻辑

trendStrategy 函数

  • 功能:策略的核心逻辑函数,负责执行趋势判断、止损止盈、回调止盈以及下单操作。
  • 步骤
    1. 获取市场数据:获取当前行情、持仓信息、账户信息和K线数据。
    2. 检查持仓:确保没有同时持有多空仓位,否则抛出异常。
    3. 策略交互:处理用户通过交互界面发送的命令。
    4. 状态栏信息打印:更新并打印策略运行状态、账户信息和持仓情况。
    5. 止损:根据止损规则检查并执行止损操作。
    6. 止盈:根据止盈规则检查并执行止盈操作。
    7. 回调止盈:根据回调止盈规则检查并执行止盈操作。
    8. K线周期检查:确保策略逻辑按照K线周期执行。
    9. 趋势判断:根据技术指标判断当前趋势(多、空或震荡)。
    10. 下单:根据趋势判断结果执行开仓或平仓操作。

状态监控与日志输出

printLogStatus 函数

  • 功能:打印策略运行状态、账户信息和持仓情况。
  • 步骤
    1. 构建策略总览、账户资金和持仓情况的表格数据。
    2. 使用LogStatus函数将表格数据输出到状态栏。

主函数与策略执行

main 函数

  • 功能:策略的主函数,负责初始化策略并循环执行策略逻辑。
  • 步骤
    1. 初始化交易所模拟环境。
    2. 创建策略实例并初始化数据。
    3. 循环执行策略逻辑,每隔一定时间间隔检查市场情况并执行交易操作。

框架特性

  1. 灵活的趋势判断:通过EMA指标和标准差,策略能够灵活判断市场趋势,适用于不同的市场环境。该函数只是一个示例,用户可以根据需要使用不同的技术指标(如RSI、MACD、布林带等)进行趋势判断。
  2. 多样的止损止盈机制:支持固定百分比止损止盈和回调止盈,满足不同风险偏好的交易者需求。
  3. 本地数据管理:策略运行数据和用户数据均保存在本地,确保策略重启后能够恢复到之前的状态。
  4. 交互式命令:支持通过命令行与策略进行交互,方便用户调整策略参数或执行特定操作。

可适用性

该框架不仅适用于数字货币市场,还可以通过添加不同的技术指标(如RSI、MACD等)在trendJudgment函数中进行扩展,以适应不同的交易策略需求。此外,该框架还可以针对现货市场或多品种合约进行针对性的修改,具有较高的灵活性和可扩展性。

  1. 现货市场支持:目前该框架主要针对合约市场,未来可以扩展支持现货市场的交易策略。
  2. 多品种合约:通过添加多品种合约的支持,策略可以同时监控和交易多个数字货币,提高资金利用率。
  3. 机器学习集成:结合机器学习算法,进一步提升趋势判断的准确性和策略的智能化水平。
  4. 风险管理优化:进一步优化风险管理机制,如动态调整杠杆倍数、多层次止损止盈等,以提高策略的稳健性。

总结

作为一个功能全面、灵活性高的自动化交易系统,该框架适用于数字货币市场的趋势交易。通过不断优化和扩展,它有望在未来成为数字货币交易者的得力工具,帮助用户更好的开发属于自己的量化策略。”数字货币趋势策略交易框架”结构完善,代码量虽然较多,但从实盘交易的角度来看,它基本涵盖了趋势交易中所需的核心功能模块。因此,无论是从学习交易策略的角度,还是从实际应用的角度,该框架都具有重要的参考价值和实践意义。其功能全面性和灵活性使其能够适应不同的市场环境,为大家提供强大的支持。

发明者平台作为一个量化交易的宝库,包含了数字货币量化交易的大量知识和策略,每一个策略都凝聚着开发者的智慧和经验。欢迎大家来这里淘金,探索更多有价值的交易策略和技术分享。感谢各位富有创新精神和乐于分享的用户,正是因为大家的贡献,才让这个平台成为了量化交易学习和交流的重要场所,共同帮助大家提升量化交易的技能和水平。

'''backtest
start: 2024-11-26 00:00:00
end: 2024-12-03 00:00:00
period: 1d
basePeriod: 1d
exchanges: [{"eid":"Futures_Binance","currency":"BTC_USDT"}]
'''

import json, talib
import numpy as np

# 参数面板设置
# 参考策略:https://www.fmz.com/strategy/473950


class TrendStrategy:
    def __init__(self):
        # 基本设置
        self._Currency = TradeCurrency
        self._Interval = Interval
        self._UseQuarter = UseQuarter
        self._UseContract = TradeCurrency + ('.swap' if self._UseQuarter else '.quarter')
        self._OnlyTrendJudgment = OnlyTrendJudgment
        self._EnableMessageSend = EnableMessageSend
        # 趋势判断
        self._RunInKLinePeriod = RunInKLinePeriod
        self._KLinePeriod = KLinePeriod
        self._EmaLength = EmaLength
        self._EmaCoefficient = EmaCoefficient
        self._UseStddev = UseStddev
        self._UseRecordsMiddleValue = UseRecordsMiddleValue
        self._StddevLength = StddevLength
        self._StddevDeviations = StddevDeviations
        # 下单设置
        self._MarginLevel = MarginLevel
        self._OrderSize = OrderSize
        self._OrderByMargin = OrderByMargin
        self._OrderMarginPercent = OrderMarginPercent
        self._PricePrecision = None
        self._AmountPrecision = None
        self._OneSizeInCurrentCoin = None
        self._QuarterOneSizeValue = None
        # 止盈止损
        self._UseStopLoss = UseStopLoss
        self._StopLossPercent = StopLossPercent
        self._UseTakeProfit = UseTakeProfit
        self._TakeProfitPercent = TakeProfitPercent
        self._UseTrackingTakeProfit = UseTrackingTakeProfit
        self._UsePositionRetracement = UsePositionRetracement
        self._TakeProfitTriggerPercent = TakeProfitTriggerPercent
        self._CallBakcPercent = CallBakcPercent

        # 策略变量
        self._LastBarTime = 0
        self._TrendWhenTakeProfitOrStopLoss = 0
        self._HadStopLoss = False
        self._TriggeredTakeProfit = False
        self._PeakPriceInPosition = 0
        self._HadTakeProfit = False
        self._PriceCrossEMAStatus = 0

        # 统计变量
        self._InitAsset = 0
        self._ProfitLocal = 0
        self._TakeProfitCount = 0
        self._TradeCount = 0
        self.StrategyRunTimeStampString = "strategy_run_time"
        self._StrategyDatas = {"start_run_timestamp": 0, "others": ""}
        self._UserDatas = None

        # 相对固定参数
        self._MaintenanceMarginRate = 0.004
        self._TakerFee = 0.0005
        self._IsUsdtStandard = False

        # 获取合约信息
        ticker = _C(exchange.GetTicker, self._UseContract)
        marketInfo = exchange.GetMarkets()[self._UseContract]
        Log('获取市场信息:', marketInfo)
        self._PricePrecision = marketInfo['PricePrecision']
        self._AmountPrecision = marketInfo['AmountPrecision']
        self._OneSizeInCurrentCoin = marketInfo['CtVal']
        self._QuarterOneSizeValue = marketInfo['CtVal']

        exchange.SetCurrency(self._Currency)
        exchange.SetMarginLevel(self._UseContract, self._MarginLevel)
        exchange.SetPrecision(self._PricePrecision, self._AmountPrecision)

        # 初始化数据
    def initDatas(self):

        self.saveStrategyRunTime()
        self.readUserDataLocal()

        self._InitAsset = self._UserDatas["init_assets"]
        self._ProfitLocal = self._UserDatas["profit_local"]
        self._TakeProfitCount = self._UserDatas["take_profit_count"]
        self._TradeCount = self._UserDatas["trade_count"]

        if self._OrderByMargin:
            self.getRealOrderSize(-1, self._OrderSize)
            Log("已经重新计算下单张数:", self._OrderSize)
        if self._UseTakeProfit and self._UseTrackingTakeProfit:
            raise Exception("止盈和回调止盈不能同时使用!")

    # 设置合约
    def setContract(self):
        self._IsUsdtStandard = "USDT" in self._Currency

        exchange.SetCurrency(self._Currency)
        if self._UseQuarter:
            exchange.SetContractType("quarter")
        else:
            exchange.SetContractType("swap")

    # 保存程序起始运行时间 秒级时间戳
    def saveStrategyRunTime(self):
        local_data_strategy_run_time = _G(self.StrategyRunTimeStampString)

        if local_data_strategy_run_time is None:
            self._StrategyDatas["start_run_timestamp"] = Unix()
            _G(self.StrategyRunTimeStampString, self._StrategyDatas["start_run_timestamp"])
        else:
            self._StrategyDatas["start_run_timestamp"] = local_data_strategy_run_time

    # 设置程序起始运行时间 秒级时间戳
    def setStrategyRunTime(self, timestamp):
        _G(self.StrategyRunTimeStampString, timestamp)
        self._StrategyDatas["start_run_timestamp"] = timestamp

    # 计算两个时间戳之间的天数,参数是秒级时间戳
    def getDaysFromTimeStamp(self, start_time, end_time):
        if end_time < start_time:
            return 0

        return (end_time - start_time) // (60 * 60 * 24)

    # 保存数据到本地
    def saveUserDatasLocal(self):
        self._UserDatas = {
            "init_assets": self._InitAsset,
            "profit_local": self._ProfitLocal,
            "take_profit_count": self._TakeProfitCount,
            "trade_count": self._TradeCount
        }
        # 存储到本地
        _G(exchange.GetLabel(), self._UserDatas)
        Log("已把所有数据保存到本地.")

    # 读取用户本地数据,程序启动时候运行一次
    def readUserDataLocal(self):
        user_data = _G(exchange.GetLabel())
        if user_data is None:
            self._InitAsset = self.getAccountAsset(_C(exchange.GetPosition), _C(exchange.GetAccount), _C(exchange.GetTicker))
            self._UserDatas = {
                "init_assets": self._InitAsset,
                "profit_local": 0,
                "take_profit_count": 0,
                "trade_count": 0
            }
        else:
            self._UserDatas = user_data

    # 清除用户本地数据,交互按钮点击运行
    def clearUserDataLocal(self):
        _G(exchange.GetLabel(), None)
        Log(exchange.GetLabel(), ":已清除本地数据.")

    # 策略交互
    def runCmd(self):
        cmd = GetCommand()

        if cmd:
            # 检测交互命令
            Log("接收到的命令:", cmd, "#FF1CAE")
            if cmd.startswith("ClearLocalData:"):
                # 清除本地数据
                self.clearUserDataLocal()
            elif cmd.startswith("SaveLocalData:"):
                # 保存数据到本地
                self.saveUserDatasLocal()
            elif cmd.startswith("ClearLog:"):
                # 清除日志
                log_reserve = cmd.replace("ClearLog:", "")
                LogReset(int(log_reserve))
            elif cmd.startswith("OrderSize:"):
                # 修改下单张数
                if self._OrderByMargin:
                    Log("已经使用保证金数量来下单,无法直接修改下单数量!")
                else:
                    order_size = int(cmd.replace("OrderSize:", ""))
                    self._OrderSize = order_size
                    Log("下单张数已经修改为:", self._OrderSize)
            elif cmd.startswith("OrderMarginPercent:"):
                # 修改下单保证金百分比
                if self._OrderByMargin:
                    order_margin_percent = float(cmd.replace("OrderMarginPercent:", ""))
                    self._OrderMarginPercent = order_margin_percent
                    Log("下单保证金百分比:", self._OrderMarginPercent, "%")
                else:
                    Log("没有打开根据保证金数量下单,无法修改下单保证金百分比!")

    # 交易函数
    def orderDirectly(self, distance, price, amount):
        tradeFunc = None

        if amount <= 0:
            raise Exception("设置的参数有误,下单数量已经小于0!")

        if distance == "buy":
            tradeFunc = exchange.Buy
        elif distance == "sell":
            tradeFunc = exchange.Sell
        elif distance == "closebuy":
            tradeFunc = exchange.Sell
        else:
            tradeFunc = exchange.Buy

        exchange.SetDirection(distance)
        return tradeFunc(price, amount)

    def openLong(self, price, amount):
        real_amount = self.getRealOrderSize(price, amount)
        return self.orderDirectly("buy", price, real_amount)

    def openShort(self, price, amount):
        real_amount = self.getRealOrderSize(price, amount)
        return self.orderDirectly("sell", price, real_amount)

    def coverLong(self, price, amount):
        return self.orderDirectly("closebuy", price, amount)

    def coverShort(self, price, amount):
        return self.orderDirectly("closesell", price, amount)

    # 重新计算下单数量
    def getRealOrderSize(self, price, amount):
        real_price = price if price != -1 else _C(exchange.GetTicker).Last
        if self._OrderByMargin:
            if self._IsUsdtStandard:
                
                self._OrderSize = _N(self._InitAsset * (self._OrderMarginPercent / 100) / real_price * self._MarginLevel / self._OneSizeInCurrentCoin, self._AmountPrecision)  # u本位数量(杠杆放大数量)
                
            else:
                self._OrderSize = _N(self._InitAsset * (self._OrderMarginPercent / 100) * self._MarginLevel * real_price / self._QuarterOneSizeValue, self._AmountPrecision)  # 币本位数量(杠杆放大数量)
        else:
            self._OrderSize = amount
        return self._OrderSize

    # 获取单个持仓占用保证金
    def getSinglePositionMargin(self, position, ticker):
        position_margin = 0

        if len(position) > 0:
            if self._IsUsdtStandard:
                position_margin = position[0].Amount * self._OneSizeInCurrentCoin * ticker.Last / self._MarginLevel
            else:
                position_margin = position[0].Amount * self._QuarterOneSizeValue / ticker.Last / self._MarginLevel

        return position_margin

    # 获取单向持仓的收益和收益%
    def getSinglePositionProfit(self, position, ticker):
        if len(position) == 0:
            return [0, 0]

        price = ticker.Last
        position_margin = self.getSinglePositionMargin(position, ticker)

        position_profit_percent = (price - position[0].Price) / position[0].Price * self._MarginLevel if position[0].Type == PD_LONG else (position[0].Price - price) / position[0].Price * self._MarginLevel
        position_profit = position_margin * position_profit_percent

        return [position_profit, position_profit_percent]

    # 计算强平价格
    def calculateForcedPrice(self, account, position, ticker):
        position_profit = 0
        total_avail_balance = 0
        forced_price = 0

        position_margin = self.getSinglePositionMargin(position, ticker)
        [position_profit, position_profit_percent] = self.getSinglePositionProfit(position, ticker)

        if self._IsUsdtStandard:
            total_avail_balance = account.Balance + position_margin + account.FrozenBalance - position_profit if position_profit > 0 else account.Balance + position_margin + account.FrozenBalance
            if position[0].Type == PD_LONG:
                forced_price = ((self._MaintenanceMarginRate + self._TakerFee) * self._MarginLevel * account.FrozenBalance - total_avail_balance) / self._OneSizeInCurrentCoin + (position[0].Amount * position[0].Price) / (position[0].Amount - (self._MaintenanceMarginRate + self._TakerFee) * position[0].Amount)
            else:
                forced_price = ((self._MaintenanceMarginRate + self._TakerFee) * self._MarginLevel * account.FrozenBalance - total_avail_balance) / self._OneSizeInCurrentCoin - (position[0].Amount * position[0].Price) / (-1 * position[0].Amount - (self._MaintenanceMarginRate + self._TakerFee) * position[0].Amount)
        else:
            total_avail_balance = account.Stocks + position_margin + account.FrozenStocks - position_profit if position_profit > 0 else account.Stocks + position_margin + account.FrozenStocks
            if position[0].Type == PD_LONG:
                forced_price = (self._MaintenanceMarginRate * position[0].Amount + position[0].Amount) / (total_avail_balance / self._QuarterOneSizeValue + position[0].Amount / position[0].Price)
            else:
                forced_price = (self._MaintenanceMarginRate * position[0].Amount - position[0].Amount) / (total_avail_balance / self._QuarterOneSizeValue - position[0].Amount / position[0].Price)

        if forced_price < 0:
            forced_price = 0

        return forced_price

    # 计算最大可下单张数
    def getMaxOrderSize(self, margin_level, ticker, account):
        max_order_size = 0

        if self._IsUsdtStandard:
            max_order_size = account.Balance * margin_level / (self._OneSizeInCurrentCoin * ticker.Last)
        else:
            max_order_size = account.Stocks * ticker.Last / self._QuarterOneSizeValue * margin_level

        return _N(max_order_size, self._AmountPrecision)

    # 获取账户资产
    def getAccountAsset(self, position, account, ticker):
        # 计算不同情况下的账户初始资产
        account_asset = 0
        position_margin = self.getSinglePositionMargin(position, ticker)

        if self._IsUsdtStandard:
            if len(position) > 0:
                account_asset = account.Balance + account.FrozenBalance + position_margin
            else:
                account_asset = account.Balance + account.FrozenBalance
        else:
            if len(position) > 0:
                account_asset = account.Stocks + account.FrozenStocks + position_margin
            else:
                account_asset = account.Stocks + account.FrozenStocks

        return account_asset

    # 收益统计
    def calculateProfit(self, ticker):
        # 重新获取一下账户持仓与资产
        position = _C(exchange.GetPosition)
        account = _C(exchange.GetAccount)
        # 当前总收益 - 上一次总收益 = 本次的收益
        current_profit = (self.getAccountAsset(position, account, ticker) - self._InitAsset) - self._ProfitLocal
        self._ProfitLocal += current_profit

        if current_profit > 0:
            self._TakeProfitCount += 1
        self._TradeCount += 1

        LogProfit(_N(self._ProfitLocal, 4), "        本次收益:", _N(current_profit, 6))
        self.saveUserDatasLocal()

    # 是否还够资金下单
    def isEnoughAssetToOrder(self, order_size, ticker):
        is_enough = True
        account = _C(exchange.GetAccount)

        if self._IsUsdtStandard:
            if account.Balance < order_size * ticker.Last * self._OneSizeInCurrentCoin / self._MarginLevel:
                is_enough = False
        else:
            if account.Stocks < order_size * self._QuarterOneSizeValue / ticker.Last / self._MarginLevel:
                is_enough = False

        return is_enough

    # 按照K线周期运行策略核心
    def runInKLinePeriod(self, records):
        bar_time = records[-1].Time
        if self._RunInKLinePeriod and self._LastBarTime == bar_time:
            return False

        self._LastBarTime = bar_time
        return True

    # 趋势判断模块(可编辑具体指标)
    def trendJudgment(self, records):
        # 检查价格是否穿过均线
        def checkPriceCrossEma(price, ema_value):
            if self._PriceCrossEMAStatus == 0:
                if price <= ema_value:
                    self._PriceCrossEMAStatus = -1
                else:
                    self._PriceCrossEMAStatus = 1
            elif (self._PriceCrossEMAStatus == -1 and price >= ema_value) or (self._PriceCrossEMAStatus == 1 and price <= ema_value):
                self._PriceCrossEMAStatus = 2  # 完成穿过

        # EMA的多空判断
        ema_long = False
        ema_short = False
        price = records[-2].Close  # 已经收盘的K线的收盘价格
        ema = TA.EMA(records, self._EmaLength)
        ema_value = ema[-2]  # 收盘K线对应ema值
        ema_upper = ema_value * (1 + self._EmaCoefficient)
        ema_lower = ema_value * (1 - self._EmaCoefficient)

        checkPriceCrossEma(price, ema_value)
        if price > ema_upper:
            ema_long = True
        elif price < ema_lower:
            ema_short = True

        # 标准差判断
        in_trend = False
        if self._UseStddev:
            records_data = []
            for i in range(len(records)):
                records_data.append((records[i].High + records[i].Low) / 2 if self._UseRecordsMiddleValue else records[i].Close)

            records_data = np.array(records_data)  # 将 list 转换为 np.array
            stddev = np.std(records_data, ddof=1)  # 使用 numpy 计算标准差
            if stddev > self._StddevDeviations:
                in_trend = True
        else:
            in_trend = True

        # 趋势判断
        long = in_trend and ema_long 
        short = in_trend and ema_short

        if long:
            Log("当前趋势为:多", self._EnableMessageSend and "@" or "#00FF7F")
        elif short:
            Log("当前趋势为:空", self._EnableMessageSend and "@" or "#FF0000")
        else:
            Log("当前趋势为:震荡", self._EnableMessageSend and "@" or "#007FFF")

        return [long, short]

    # 止损
    def stopLoss(self, position, ticker):
        stop_loss_price = 0
        price = ticker.Last

        if len(position) == 1 and self._UseStopLoss:
            if position[0].Type == PD_LONG:
                stop_loss_price = position[0].Price * (1 - self._StopLossPercent / 100)
                if price < stop_loss_price:
                    self.coverLong(-1, position[0].Amount)
                    self.calculateProfit(ticker)
                    self._TrendWhenTakeProfitOrStopLoss = 1
                    self._HadStopLoss = True
                    Log("多单止损。止损价格:", _N(stop_loss_price, 6), ", 持仓价格:", _N(position[0].Price), self._EnableMessageSend and "@" or "#FF1CAE")
            elif position[0].Type == PD_SHORT:
                stop_loss_price = position[0].Price * (1 + self._StopLossPercent / 100)
                if price > stop_loss_price:
                    self.coverShort(-1, position[0].Amount)
                    self.calculateProfit(ticker)
                    self._TrendWhenTakeProfitOrStopLoss = -1
                    self._HadStopLoss = True
                    Log("空单止损。止损价格:", _N(stop_loss_price, 6), ", 持仓价格:", _N(position[0].Price), self._EnableMessageSend and "@" or "#FF1CAE")

    # 止盈
    def takeProfit(self, position, ticker):
        take_profit_price = 0
        price = ticker.Last

        if len(position) == 1 and self._UseTakeProfit:
            if position[0].Type == PD_LONG:
                take_profit_price = position[0].Price * (1 + self._TakeProfitPercent / 100)
                if price > take_profit_price:
                    self.coverLong(-1, position[0].Amount)
                    self.calculateProfit(ticker)
                    self._TrendWhenTakeProfitOrStopLoss = 1
                    self._HadTakeProfit = True
                    Log("多单止盈。止盈价格:", _N(take_profit_price, 6), ", 持仓价格:", _N(position[0].Price), self._EnableMessageSend and "@" or "#FF1CAE")
            elif position[0].Type == PD_SHORT:
                take_profit_price = position[0].Price * (1 - self._TakeProfitPercent / 100)
                if price < take_profit_price:
                    self.coverShort(-1, position[0].Amount)
                    self.calculateProfit(ticker)
                    self._TrendWhenTakeProfitOrStopLoss = -1
                    self._HadTakeProfit = True
                    Log("空单止盈。止盈价格:", _N(take_profit_price, 6), ", 持仓价格:", _N(position[0].Price), self._EnableMessageSend and "@" or "#FF1CAE")

    # 回调止盈
    def trackingTakeProfit(self, position, ticker):
        take_profit_price = 0
        trigger_price = 0
        price = ticker.Last

        if len(position) > 0 and self._UseTrackingTakeProfit:
            if position[0].Type == PD_LONG:
                # 多单持仓
                if self._TriggeredTakeProfit:
                    # 已达到触发价格,监控是否止盈
                    self._PeakPriceInPosition = price if price > self._PeakPriceInPosition else self._PeakPriceInPosition  # 更新价格高点
                    if self._UsePositionRetracement:
                        take_profit_price = self._PeakPriceInPosition - (self._PeakPriceInPosition - position[0].Price) * (self._CallBakcPercent / 100)  # 计算回调的止盈价格
                    else:
                        take_profit_price = self._PeakPriceInPosition * (1 - self._CallBakcPercent / 100)  # 计算回调的止盈价格
                    if price < take_profit_price:
                        self.coverLong(-1, position[0].Amount)  # 平多
                        self.calculateProfit(ticker)
                        self._TriggeredTakeProfit = False  # 复位触发标记
                        self._TrendWhenTakeProfitOrStopLoss = 1  # 记录止盈时候的趋势
                        self._HadTakeProfit = True  # 记录发生了止盈
                        Log("多单回调止盈:持仓中价格高点:", _N(self._PeakPriceInPosition, 6), ", 止盈价格:", _N(take_profit_price, 6), ", 当前价格:", _N(price, 6),
                            ", 持仓价格:", _N(position[0].Price, 6), self._EnableMessageSend and "@" or "#FF1CAE")
                else:
                    # 监控是否达到回调止盈的触发价格
                    trigger_price = position[0].Price * (1 + self._TakeProfitTriggerPercent / 100)
                    if price > trigger_price:
                        self._TriggeredTakeProfit = True  # 触发回调止盈
                        self._PeakPriceInPosition = price  # 记录价格高点
                        Log("多单已达到回调止盈的触发价格:", _N(trigger_price, 6), ", 当前价格:", _N(price, 6), ", 持仓价格:", _N(position[0].Price, 6))
            elif position[0].Type == PD_SHORT:
                # 空单持仓
                if self._TriggeredTakeProfit:
                    # 已达到触发价格,监控是否止盈
                    self._PeakPriceInPosition = price if price < self._PeakPriceInPosition else self._PeakPriceInPosition  # 更新价格低点
                    if self._UsePositionRetracement:
                        take_profit_price = self._PeakPriceInPosition + (position[0].Price - self._PeakPriceInPosition) * (self._CallBakcPercent / 100)  # 计算回调的止盈价格
                    else:
                        take_profit_price = self._PeakPriceInPosition * (1 + self._CallBakcPercent / 100)  # 计算回调的止盈价格
                    if price > take_profit_price:
                        self.coverShort(-1, position[0].Amount)  # 平空
                        self.calculateProfit(ticker)
                        self._TriggeredTakeProfit = False  # 复位触发标记
                        self._TrendWhenTakeProfitOrStopLoss = -1  # 记录止盈时候的趋势
                        self._HadTakeProfit = True  # 记录发生了止盈
                        Log("空单回调止盈:持仓中价格低点:", _N(self._PeakPriceInPosition, 6), ", 止盈价格:", _N(take_profit_price, 6), ", 当前价格:", _N(price, 6),
                            ", 持仓价格:", _N(position[0].Price, 6), self._EnableMessageSend and "@" or "#FF1CAE")
                else:
                    # 监控是否达到回调止盈的触发价格
                    trigger_price = position[0].Price * (1 - self._TakeProfitTriggerPercent / 100)
                    if price < trigger_price:
                        self._TriggeredTakeProfit = True  # 触发回调止盈
                        self._PeakPriceInPosition = price  # 记录价格低点
                        Log("空单已达到回调止盈的触发价格:", _N(trigger_price, 6), ", 当前价格:", _N(price, 6), ", 持仓价格:", _N(position[0].Price, 6))

    # 下单
    def order(self, long, short, position, ticker):
        position_size = position[0].Amount if len(position) > 0 else 0
        position_type = position[0].Type if len(position) > 0 else None
        
        if long:
            # 趋势多
            if (self._HadStopLoss or self._HadTakeProfit) and self._TrendWhenTakeProfitOrStopLoss == 1:
                # 发生了止盈止损,并且止盈止损时候趋势为多,不再做多
                return
            if position_size > 0 and position_type == PD_SHORT:
                self.coverShort(-1, position_size)
                self.calculateProfit(ticker)
            elif position_size > 0 and position_type == PD_LONG:
                # 多单持仓,不重复下单
                return
            else:
                # 没有持仓,如果是首次运行或者策略重启,需要等待价格穿过一次EMA均线才下单
                if self._PriceCrossEMAStatus != 2:
                    return
            if self.isEnoughAssetToOrder(self._OrderSize, ticker):
                self.openLong(-1, self._OrderSize)
                self._HadStopLoss = False
                self._HadTakeProfit = False
            else:
                raise Exception("下单金额数量不足!")
        elif short:
            # 趋势空
            
            if (self._HadStopLoss or self._HadTakeProfit) and self._TrendWhenTakeProfitOrStopLoss == -1:
                # 发生了止盈止损,并且止盈止损时候趋势为空,不再做空
                return
            
            if position_size > 0 and position_type == PD_LONG:
                self.coverLong(-1, position_size)
                self.calculateProfit(ticker)
            elif position_size > 0 and position_type == PD_SHORT:
                # 空单持仓,不重复下单
                return
            else:
                # 没有持仓,如果是首次运行或者策略重启,需要等待价格穿过一次EMA均线才下单
                if self._PriceCrossEMAStatus != 2:
                    return
            
            if self.isEnoughAssetToOrder(self._OrderSize, ticker):
                self.openShort(-1, self._OrderSize)
                self._HadStopLoss = False
                self._HadTakeProfit = False
            else:
                raise Exception("下单金额数量不足!")

    # 趋势策略
    def trendStrategy(self):
        ticker = _C(exchange.GetTicker)
        position = _C(exchange.GetPosition)
        account = _C(exchange.GetAccount)
        records = _C(exchange.GetRecords, self._KLinePeriod * 60)
        if len(position) > 1:
            Log(position)
            raise Exception("同时有多空持仓!")
        # 策略交互
        self.runCmd()
        # 状态栏信息打印
        self.printLogStatus(ticker, account, position)
        # 止损
        self.stopLoss(position, ticker)
        # 止盈
        self.takeProfit(position, ticker)
        # 回调止盈
        self.trackingTakeProfit(position, ticker)

        # 按照K线周期运行策略
        if not self.runInKLinePeriod(records):
            return
        # 趋势判断和下单
        long = False
        short = False
        [long, short] = self.trendJudgment(records)
        if not self._OnlyTrendJudgment:
            self.order(long, short, position, ticker)

    # 状态栏信息打印
    def printLogStatus(self, ticker, account, position):
        table_overview = {
            "type": "table",
            "title": "策略总览",
            "cols": ["开始时间", "已运行天数", "交易次数", "胜率", "预估月化%", "预估年化%"],
            "rows": []
        }
        table_account = {
            "type": "table",
            "title": "账户资金",
            "cols": ["当前资产", "初始资产", "可用余额", "冻结余额", "可下单张数", "收益", "收益%"],
            "rows": []
        }
        table_position = {
            "type": "table",
            "title": "持仓情况",
            "cols": ["交易币种", "杠杆倍数", "持仓均价", "方向", "数量", "保证金", "预估强平价格", "浮动盈亏", "浮动盈亏%"],
            "rows": []
        }
        i = 0

        # 策略总览
        the_running_days = self.getDaysFromTimeStamp(self._StrategyDatas["start_run_timestamp"], Unix())
        monthly_rate_of_profit = 0
        if the_running_days > 1:
            monthly_rate_of_profit = self._ProfitLocal / self._InitAsset / the_running_days * 30
        table_overview["rows"].append([_D(self._StrategyDatas["start_run_timestamp"]), the_running_days, self._TradeCount,
                                       0 if self._TradeCount == 0 else (str(_N(self._TakeProfitCount / self._TradeCount * 100, 2)) + "%"),
                                       str(_N(monthly_rate_of_profit * 100, 2)) + "%", str(_N(monthly_rate_of_profit * 12 * 100, 2)) + "%"])
        # 账户资金
        current_asset = self.getAccountAsset(position, account, ticker)
        max_order_size = self.getMaxOrderSize(self._MarginLevel, ticker, account)
        asset_profit = current_asset - self._InitAsset
        asset_profit_percent = asset_profit / self._InitAsset
        table_account["rows"].append([_N(current_asset, 4), _N(self._InitAsset, 4), _N(account.Balance if self._IsUsdtStandard else account.Stocks, 4),
                                      _N(account.FrozenBalance if self._IsUsdtStandard else account.FrozenStocks, 4), max_order_size, _N(asset_profit, 4),
                                      str(_N(asset_profit_percent * 100, 2)) + "%"])
        # 持仓情况
        position_direction = ""
        forced_cover_up_price = 0
        position_profit_percent = 0
        position_profit = 0
        position_margin = 0
        if len(position) == 0:
            table_position["rows"].append(["无持仓", "-", "-", "-", "-", "-", "-", "-", "-"])
        else:
            position_direction = "多单" if position[0].Type == PD_LONG else "空单"
            [position_profit, position_profit_percent] = self.getSinglePositionProfit(position, ticker)
            position_margin = self.getSinglePositionMargin(position, ticker)
            forced_cover_up_price = self.calculateForcedPrice(account, position, ticker)
            table_position["rows"].append([exchange.GetCurrency(), self._MarginLevel, _N(position[0].Price, 4), position_direction, position[0].Amount,
                                           _N(position_margin, 4), _N(forced_cover_up_price, 4), _N(position_profit, 4), str(_N((position_profit_percent * 100), 2)) + "%"])
        # 打印表格
        LogStatus('`' + json.dumps(table_overview) + '`\n'
                  + '`' + json.dumps(table_account) + '`\n'
                  + '`' + json.dumps(table_position) + '`\n')

# main
def main():
    exchange.IO('simulate', True)

    strategy = TrendStrategy()
    
    strategy.setContract()
    
    strategy.initDatas()
    
    while True:
        
        strategy.trendStrategy()
        Sleep(strategy._Interval)
相关推荐