python多币种策略框架(期货)


创建日期: 2024-10-16 15:33:37 最后修改: 2025-08-29 13:42:47
复制: 0 点击次数: 220
avatar of ianzeng123 ianzeng123
2
关注
319
关注者
策略源码
import time
import json 

SYMBOLS = 'LINK,ETH,TRB'
QUOTO = 'USDT'
INTERVAL = 5

# 全局变量,储存数据
# SYMBOLS代表要交易的币种,格式如"BTC,ETH,LTC"
# QUOTO为基础货币,永续合约常见的有USDT,USDC
# INTERVAL代表循环的间隔
Info = {
    'trade_symbols': SYMBOLS.split(','),  # 交易币种
    'base_coin': QUOTO,                   # 基础货币
    'ticker': {},                         # 行情数据
    'order': {},                          # 订单信息
    'account': {},                        # 账户信息
    'precision': {},                      # 精度信息
    'position': {},                       # 仓位信息
    'time': {},                           # 时间相关数据
    'count': {},                          # 计数器
    'interval': INTERVAL                  # 循环的间隔时间
}

# 初始化策略
def InitInfo():

    # 初始化账户信息,初始余额为0
    Info['account'] = {
        'init_balance': 0,  # 初始余额
        'wallet_balance': 0,  # 钱包余额
        'margin_balance': 0,  # 保证金余额
        'margin_used': 0,  # 已用保证金
        'margin_free': 0,  # 可用保证金
        'profit': 0,  # 总收益
        'profit_rate': 0,  # 收益率
        'unrealised_profit': 0,  # 未实现收益
    }

    # 初始化时间数据,控制更新的时间
    Info['time'] = {
        'update_ticker_time': 0,    # 更新行情的时间
        'update_pos_time': 0,       # 更新仓位的时间
        'update_profit_time': 0,    # 更新利润的时间
        'update_account_time': 0,   # 更新账户信息的时间
        'update_status_time': 0,    # 更新状态的时间
        'last_loop_time': 0,        # 上一次主循环的时间
        'loop_delay': 0,             # 循环延迟
        'start_time': time.time()
    }

    # 初始化每个交易币种的数据
    for symbol in Info['trade_symbols']:
        Info['ticker'][symbol] = {'last': 0, 'ask': 0, 'bid': 0}  # 行情
        Info['order'][symbol] = {                                 # 订单信息
            'buy': {'id': 0, 'price': 0, 'amount': 0},
            'sell': {'id': 0, 'price': 0, 'amount': 0}
        }
        Info['position'][symbol] = {                              # 仓位信息
            'amount': 0, 'hold_price': 0, 'unrealised_profit': 0, 'open_time': 0, 'value': 0
        }
        Info['precision'][symbol] = {}  # 精度信息初始化为空
        Info['position'][symbol] = {                              # 仓位信息
            'amount': 0, 'hold_price': 0, 'unrealised_profit': 0, 'open_time': 0, 'value': 0
        }

# 获取精度信息
def GetPrecision():
    # 获取交易所的市场信息
    for presym in Info['trade_symbols']:
        curcontract = presym + '_USDT.swap'
        exchange.GetTicker(curcontract)
    exchange_info = exchange.GetMarkets()
    
    # 遍历市场中的所有交易对
    for pair, data in exchange_info.items():
        symbol = pair.split('_')[0]  # 永续合约交易对的格式为 BTC_USDT.swap
        
        # 检查该交易对是否为我们要交易的币种,基础货币是否匹配,且是永续合约
        if symbol in Info['trade_symbols'] and pair.split('.')[0].endswith(Info['base_coin']) and pair.endswith('swap'):
            # 获取该交易对的精度信息
            Info['precision'][symbol] = {
                'tick_size': data['TickSize'],                  # 价格精度
                'amount_size': data['AmountSize'],              # 数量精度
                'price_precision': data['PricePrecision'],      # 价格小数位精度
                'amount_precision': data['AmountPrecision'],    # 数量小数位精度
                'min_qty': data['MinQty'],                      # 最小下单数量
                'max_qty': data['MaxQty'],                      # 最大下单数量
                'min_notional': data['MinNotional'],            # 最小交易额
                'ctVal': data['CtVal']                          # 合约价值,如1张代表0.01个币
            }
            
            # 检查合约价值的计价货币是否为symbol,避免币本位情况
            if data['CtValCcy'] != symbol:
                raise Exception("不支持币本位")


# 更新行情信息
def UpdateTicker():
    
    # 记录当前更新时间戳
    Info['time']['update_ticker_time'] = time.time() * 1000 # 使用time.time()获取当前时间的时间戳
    
    # 遍历获取到的行情数据
    for ticpre in Info['trade_symbols']:
        
        curcontract = ticpre + '_' + QUOTO + '.swap'
        data = exchange.GetTicker(curcontract)
        
        if not data:
            Log("获取行情失败", GetLastError())
            return
        # 提取交易币种,永续合约格式如 'BTC_USDT.swap',这里取币种名 'BTC'
        symbol = data['Symbol'].split('_')[0]
        
        # 过滤掉不是基础货币(Info['base_coin'])的交易对或不是永续合约的交易对
        if not data['Symbol'].split('.')[0].endswith(Info['base_coin']) or symbol not in Info['trade_symbols'] or not data['Symbol'].endswith('swap'):
            continue
        
        # 更新行情的卖出价、买入价和最后成交价
        Info['ticker'][symbol]['ask'] = float(data['Sell'])  # 卖出价
        Info['ticker'][symbol]['bid'] = float(data['Buy'])   # 买入价
        Info['ticker'][symbol]['last'] = float(data['Last']) # 最后成交价


# 更新账户信息
def UpdateAccount():
   
    # 如果上次更新时间距现在不到1分钟,直接返回
    if time.time() - Info['time']['update_account_time'] < 60:
        return
    
    # 记录账户信息更新时间
    Info['time']['update_account_time'] = time.time() * 1000
    
    # 获取账户信息
    account = exchange.GetAccount()
    
    # 如果账户信息获取失败,记录日志并返回
    if account is None:
        Log("更新账户失败")
        return
    
    # 计算账户信息
    Info['account']['margin_used'] = round(account['Equity'] - account['Balance'], 2)  # 使用的保证金
    Info['account']['margin_balance'] = round(account['Equity'], 2)  # 当前账户余额
    Info['account']['margin_free'] = round(account['Balance'], 2)  # 可用余额
    Info['account']['wallet_balance'] = round(account['Equity'] - account['UPnL'], 2)  # 钱包余额
    Info['account']['unrealised_profit'] = round(account['UPnL'], 2)  # 未实现盈亏
    

    # 初始化账户初始余额
    if not Info['account']['init_balance']:
        if _G("init_balance") and _G("init_balance") > 0:
            Info['account']['init_balance'] = round(_G("init_balance"), 2)
        else:
            Info['account']['init_balance'] = Info['account']['margin_balance']
            _G("init_balance", Info['account']['init_balance'])
    
    # 计算账户利润及利润率
    Info['account']['profit'] = round(Info['account']['margin_balance'] - Info['account']['init_balance'], 2)
    Info['account']['profit_rate'] = round((100 * Info['account']['profit']) / Info['account']['init_balance'], 2)

    # 计算仓位总价值和杠杆率
    Info['count']['total'] = round(Info['count']['long'] + Info['count']['short'], 2)
    Info['count']['leverage'] = round(Info['count']['total'] / Info['account']['margin_balance'], 2)


# 更新仓位信息
def UpdatePosition():
    
    # 获取永续合约的仓位信息
    pos = exchange.GetPositions(Info['base_coin'] + ".swap")
    
    # 记录仓位信息更新时间
    Info['time']['update_pos_time'] = time.time() * 1000

    # 初始化仓位信息
    position_info = {symbol: {'amount': 0, 'hold_price': 0, 'unrealised_profit': 0} for symbol in Info['trade_symbols']}

    # 遍历仓位信息,更新相应币种的仓位
    for data in pos:
        symbol = data['Symbol'].split("_")[0]
        
        # 过滤掉不符合条件的币种
        if not data['Symbol'].split(".")[0].endswith(Info['base_coin']) or symbol not in Info['trade_symbols']:
            continue
        
        # 如果仓位不为零,则需要单向持仓
        if position_info[symbol]['amount'] != 0:
            raise Exception("需要单向持仓")
        
        # 更新仓位信息
        position_info[symbol] = {
            'amount': data['Amount'] * Info['precision'][symbol]['ctVal'] if data['Type'] == 0 else -data['Amount'] * Info['precision'][symbol]['ctVal'],
            'hold_price': data['Price'],
            'unrealised_profit': data['Profit']
        }

    # 初始化仓位统计数据
    Info['count'] = {'long': 0, 'short': 0, 'total': 0, 'leverage': 0}

    # 遍历更新后的仓位信息
    for symbol, info in position_info.items():
        deal_volume = abs(info['amount'] - Info['position'][symbol]['amount'])
        direction = 1 if info['amount'] - Info['position'][symbol]['amount'] > 0 else -1
        
        # 如果仓位发生变化,记录成交信息
        if deal_volume:
            deal_price = Info['order'][symbol]['buy']['price'] if direction == 1 else Info['order'][symbol]['sell']['price']
            Log(
                symbol,
                "仓位更新:",
                round(Info['position'][symbol]['value'], 1),
                " -> ",
                round(info['amount'] * Info['ticker'][symbol]['last'], 1),
                ", 买" if direction == 1 else ", 卖",
                ", 成交价:",
                deal_price,
                ", 成本价:",
                round(Info['position'][symbol]['hold_price'], Info['precision'][symbol]['price_precision']),
            )

        # 更新仓位信息
        Info['position'][symbol]['amount'] = info['amount']
        Info['position'][symbol]['hold_price'] = info['hold_price']
        Info['position'][symbol]['value'] = round(Info['position'][symbol]['amount'] * Info['ticker'][symbol]['last'], 2)
        Info['position'][symbol]['unrealised_profit'] = info['unrealised_profit']

        # 统计多头和空头仓位价值
        if Info['position'][symbol]['amount'] > 0:
            Info['count']['long'] += abs(Info['position'][symbol]['value'])
        else:
            Info['count']['short'] += abs(Info['position'][symbol]['value'])

# 订单函数
def Order(symbol, direction, price, amount, msg):
    pair = f"{symbol}_{Info['base_coin']}.swap"  # 构造交易对名称
    ret = exchange.CreateOrder(pair, direction, price, amount, msg)  # 执行下单操作
    
    # 判断是否下单成功
    if ret:
        Info['order'][symbol][direction]['id'] = ret  # 记录订单ID
        Info['order'][symbol][direction]['price'] = price  # 记录下单价格
    else:
        Log(f"{symbol} {direction} {price} {amount} 下单异常")  # 输出异常信息

# 交易函数
def Trade(symbol, direction, price, amount, msg):
    
    # 根据最小价格变动调整价格
    price = round(price - (price % Info['precision'][symbol]['tick_size']), Info['precision'][symbol]['price_precision'])
    
    # 计算调整后的交易数量
    amount = amount / Info['precision'][symbol]['ctVal']  # 计算真实合约数量
    amount = round(amount - (amount % Info['precision'][symbol]['amount_size']), Info['precision'][symbol]['amount_precision'])

    # 限制最大交易数量
    if Info['precision'][symbol]['max_qty'] > 0:
        amount = min(amount, Info['precision'][symbol]['max_qty'])
    
    new_order = False
    
    # 如果新价格与之前的订单价格差异大于0.0001,则重新下单

    if Info['order'][symbol][direction]['price'] > 0 and abs(price - Info['order'][symbol][direction]['price']) / price > 0.0001:
        Log('已持订单,订单价格发生变化')
        new_order = True
    
    # 如果交易数量为0或者当前订单ID为0,则撤单
    if amount <= 0 or Info['order'][symbol][direction]['id'] == 0:
        Log('新订单产生')
        new_order = True
    
    # 如果需要新订单
    if new_order:
        # 如果有原有订单,撤销它
        if Info['order'][symbol][direction]['id'] != 0:
            exchange.CancelOrder(Info['order'][symbol][direction]['id'])
            Info['order'][symbol][direction]['id'] = 0
            Info['order'][symbol][direction]['price'] = 0
            Log('撤单成功:', symbol)
        
        
        # 如果更新仓位或ticker的延迟太高,则不下单
        if (time.time() * 1000 - Info['time']['update_pos_time'] > 2 * Info['interval'] * 1000 or 
            time.time() * 1000 - Info['time']['update_ticker_time'] > 2 * Info['interval'] * 1000):
            Log(time.time() * 1000, Info['time']['update_pos_time'], time.time() * 1000 - Info['time']['update_pos_time'])
            Log(time.time() * 1000, Info['time']['update_ticker_time'], time.time() * 1000 - Info['time']['update_ticker_time'])
            Log('延迟过高')
            return
        
        # 如果订单金额或数量过低,不执行下单操作
        if price * amount <= Info['precision'][symbol]['min_notional'] or amount < Info['precision'][symbol]['min_qty']:
            Log(f"{symbol} 下单量太低", price * amount)
            return
        
        # 执行下单操作
        Log('order下单:', symbol)
        Order(symbol, direction, price, amount, msg)


# 更新状态函数
def UpdateStatus():
    
    # 如果距离上次更新的时间小于4秒,则直接返回
    if time.time() * 1000 - Info['time']['update_status_time'] < 4000:
        return
    
    # 更新状态时间
    Info['time']['update_status_time'] = time.time() * 1000

    # 账户信息表格
    table1 = {
        "type": "table",
        "title": "账户信息",
        "cols": [
            "初始余额", "钱包余额", "保证金余额", "已用保证金", "可用保证金",
            "总收益", "收益率", "未实现收益", "总持仓", "已用杠杆", "循环延时"
        ],
        "rows": [
            [
                Info['account']['init_balance'],  # 初始余额
                Info['account']['wallet_balance'],  # 钱包余额
                Info['account']['margin_balance'],  # 保证金余额
                Info['account']['margin_used'],  # 已用保证金
                Info['account']['margin_free'],  # 可用保证金
                Info['account']['profit'],  # 总收益
                str(Info['account']['profit_rate']) + "%",  # 收益率
                round(Info['account']['unrealised_profit'], 2),  # 未实现收益
                round(Info['count']['total'], 2),  # 总持仓
                Info['count']['leverage'],  # 已用杠杆
                str(Info['time']['loop_delay']) + "ms",  # 循环延时
            ],
        ],
    }
    
    # 交易对信息表格
    table2 = {
        "type": "table",
        "title": "交易对信息",
        "cols": [
            "币种", "方向", "数量", "持仓价格", "持仓价值", 
            "现价", "挂单买价", "挂单卖价", "未实现盈亏"
        ],
        "rows": [],
    }

    # 遍历每个交易对,填充交易对信息
    for symbol in Info['trade_symbols']:
        table2['rows'].append([
            symbol,  # 币种
            "LONG" if Info['position'][symbol]['amount'] > 0 else "SHORT",  # 方向
            round(Info['position'][symbol]['amount'], Info['precision'][symbol]['amount_precision'] + 2),  # 数量
            round(Info['position'][symbol]['hold_price'], Info['precision'][symbol]['price_precision']),  # 持仓价格
            round(Info['position'][symbol]['value'], 2),  # 持仓价值
            round(Info['ticker'][symbol]['last'], Info['precision'][symbol]['price_precision']),  # 现价
            Info['order'][symbol]['buy']['price'],  # 挂单买价
            Info['order'][symbol]['sell']['price'],  # 挂单卖价
            round(Info['position'][symbol]['unrealised_profit'], 2),  # 未实现盈亏
        ])

    # 输出状态日志
    LogStatus(
        f"初始化时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(Info['time']['start_time']))}\n",
        f"`{json.dumps(table1)}`\n" + f"`{json.dumps(table2)}`\n",
        f"最后执行时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}\n"
    )

    # 每10秒钟更新一次账户信息
    if time.time() * 1000 - Info['time']['update_profit_time'] > 10 * 1000:
        UpdateAccount()  # 更新账户信息
        LogProfit(round(Info['account']['profit'], 3), '&')  # 输出收益日志
        Info['time']['update_profit_time'] = time.time() * 1000  # 更新收益时间


def MakeOrder():
    
    # 遍历所有交易对
    for symbol in Info['trade_symbols']:
        
        # 获取买价(挂单买价)
        buy_price = Info['ticker'][symbol]['bid']
        
        # 计算买入数量,根据买入金额除以买价
        buy_amount = 50 / buy_price

        # 获取卖价(挂单卖价)
        sell_price = Info['ticker'][symbol]['ask']
        
        # 计算买入数量,根据买入金额除以买价
        sell_amount = 50 / sell_price

        
        # 如果当前持仓的总价值小于2000,则进行买入操作
        if Info['position'][symbol]['value'] < 2000:
            Log('进入交易')
            Log('设定价格:', Info['ticker'][symbol]['bid'])
            Trade(symbol, "buy", buy_price, buy_amount, symbol)  # 执行买入操作
        
        #if Info['position'][symbol]['value'] < 3000:
        #    Trade(symbol, "sell", sell_price, sell_amount, symbol)  # 执行买入操作

def OnTick():
    try:
        # 更新市场行情信息
        UpdateTicker()
        # 更新持仓信息
        UpdatePosition()
        # 执行下单操作
        MakeOrder()
        # 更新状态信息
        UpdateStatus()
    except Exception as error:
        # 记录循环中发生的错误
        Log("循环出错: " + str(error))

def main():
    LogReset(0)  
    apiBase = "https://testnet.binancefuture.com" # 币安期货仿真交易所        
    exchange.SetBase(apiBase) # 设置仿真交易所基站
    # 初始化信息
    exchange.IO('dual', False) # 单向持仓
    InitInfo()
    GetPrecision()
    
    while True:  # 无限循环
        loop_start_time = time.time() * 1000  # 获取当前时间(毫秒)
        
        # 检查上次循环时间与设定的间隔时间是否已过
        if time.time() * 1000 - Info['time']['last_loop_time'] > Info['interval'] * 1000:
            OnTick()  # 调用 OnTick 函数
            
            # 更新最后一次循环时间
            Info['time']['last_loop_time'] = time.time() * 1000
            # 计算当前循环的延迟时间
            Info['time']['loop_delay'] = time.time() * 1000 - loop_start_time
        
        # 暂停5毫秒,避免过度消耗CPU资源
        Sleep(5000)