异步的问题,要疯掉了

Author: lzhqlj, Created: 2023-03-15 10:43:20, Updated:

import pandas as pd from binance.client import AsyncClient from datetime import datetime, timedelta import aiohttp import json def utc_to_local(utc_dt):#转换为北京时间 local_tz = datetime.timezone(datetime.timedelta(hours=8)) # 东八区时差 local_dt = utc_dt.replace(tzinfo=datetime.timezone.utc).astimezone(local_tz) return local_dt

异步初始化 Binance 客户端

async def init_client(): client = await AsyncClient.create(api_key=api_key, api_secret=api_secret)

return client

异步获取K线数据

async def get_klines(client, symbol, start_time, end_time, interval): klines = await client.futures_klines(symbol=symbol, interval=interval, startTime=start_time.timestamp()*1000, endTime=end_time.timestamp()*1000) df = pd.DataFrame(klines, columns=[‘timestamp’, ‘open’, ‘high’, ‘low’, ‘close’, ‘volume’, ‘close_time’, ‘quote_asset_volume’, ‘number_of_trades’, ‘taker_buy_base_asset_volume’, ‘taker_buy_quote_asset_volume’, ‘ignore’]) df[‘timestamp’] = pd.to_datetime(df[‘timestamp’], unit=‘ms’) df[‘close_time’] = pd.to_datetime(df[‘close_time’], unit=‘ms’) df.set_index(‘timestamp’, inplace=True) df.drop(columns=[‘close_time’, ‘ignore’], inplace=True) df = df.astype(‘float’) return df

异步关闭 Binance 客户端

async def close_client(client): await client.close_connection()

异步执行主函数

async def main(): client = await init_client() # 获取所有 USDT 永续合约的交易对 exchange_info = await client.futures_exchange_info() symbols = [symbol_info[‘symbol’] for symbol_info in exchange_info[‘symbols’] if symbol_info[‘contractType’] == ‘PERPETUAL’ and symbol_info[‘quoteAsset’] == ‘USDT’] # 创建空DataFrame df = pd.DataFrame(columns=[‘Symbol’, ‘Open’, ‘High’, ‘Low’, ‘Close’, ‘Change’, ‘Volume’]) df.set_index(‘Symbol’, inplace=True)

# 将所有交易对添加到 DataFrame 中
for symbol in symbols:
    df.loc[symbol] = [None] * len(df.columns)
# 遍历所有交易对
for symbol in symbols:
    # 设置起止时间
    start_time = datetime.utcnow() - timedelta(minutes=16)#开始时间为16分钟前
    end_time = datetime.utcnow()
    try:
   # 获取M15k线
        current_klines = await get_klines(client, symbol, end_time - timedelta(minutes=15), end_time, '15m')
    except Exception as e:
        Log(f"An error occurred: {e}")
        current_klines = []
    # 将数据存入 DataFrame
   # 更新对应的行
    df.loc[symbol, 'Open'] = current_klines['open'].iloc[-1]
    df.loc[symbol, 'High'] = current_klines['high'].iloc[-1]
    df.loc[symbol, 'Low'] = current_klines['low'].iloc[-1]
    df.loc[symbol, 'Close'] = current_klines['close'].iloc[-1]
    df.loc[symbol, 'Change'] = current_change
    df.loc[symbol, 'Volume'] = current_klines['volume'].iloc[-1]

# 关闭客户端
Log(df)
await close_client(client)

运行异步程序

if name == ‘main’: import asyncio asyncio.run(main())

目的是取得所有合约货币对M15的涨跌幅数据

Log(df)一次之后 就出错了;就是说更新一次所有symbol的价格等数据后出错:Traceback (most recent call last): File “<string>”, line 1246, in init_ctx File “<string>”, line 147, in <module> TypeError: Object of type coroutine is not JSON serializable sys:1: RuntimeWarning: coroutine ‘main’ was never awaited RuntimeWarning: Enable tracemalloc to get the object allocation traceback ChatGpt也没有搞定,求大佬帮忙


More

没有如果 可以通过多线程去收集,将每个线程收集的到数据放在一个 公共的对象里;然后判断 所有线程有没有结束

lzhqlj 关于聚合函数 梦总能提点一两个关键字吗

lzhqlj 关于聚合函数 梦总能提点一两个关键字吗

小草 没用过这个库,不过最好一步一步来调试。另外直接并发上百个交易对也不太好。用聚合行情接口自己记录下更简单

小草 /fapi/v1/ticker/price 币安文档有