4
Seguir
13
Seguidores

Crea un robot recordatorio de precios sencillo

Creado el: 2022-03-27 15:44:13, Actualizado el: 2022-03-27 15:59:46
comments   5
hits   1330

He hecho copias para otras personas y he descubierto que muchos de mis amigos tienen que seguir una estrategia de monitoreo de situaciones, que puede alertar de inmediato en caso de situaciones especiales, pero no estoy muy satisfecho con el disco duro siempre abierto; así que hoy comparto una demo de recordatorio de precios fácil para que todos la utilicen. ps: el lenguaje que se usa es python, alerta a través de la interfaz de pinceladas, la configuración del servidor no se presenta aquí


Preparación

  1. Panel de control Aquí se utiliza el panel de la torre de Babel, con el fin de subir los archivos al servidor, por supuesto, también se puede utilizar de otras maneras, recuerde que la ruta está disponible ((Debido a que el servidor de autor es el sistema Ubuntu, todos los siguientes comandos se basan en esto, los otros comandos del sistema pueden consultarse por sí mismos))
  • descargar
wget -O install.sh http://download.bt.cn/install/install-ubuntu_6.0.sh && sudo bash install.sh

Al finalizar, obtenga la dirección y inicie sesión (si la dirección no se puede abrir, debe abrir el puerto 8888)

  • Acceso a los permisos Activa la seguridad - shh Administración de seguridad - Activa el acceso con la clave shh Crea un robot recordatorio de precios sencillo Crea un robot recordatorio de precios sencillo

  • Carga de archivos Crea un robot recordatorio de precios sencillo

  1. Screen screen es un programa de administración de reutilización de Windows para Linux, cuyo objetivo es que el programa siga funcionando después de la conexión de la terminal shh.
  • descargar
sudo apt-get install screen
  • Orden común La pantalla de ejecución se puede abrir después de abrir el archivo de ejecución de la ruta.[La etiqueta [name] se puede configurar por sí misma.
screen -S [name]

Salga de la pantalla

ctrl+a+d

Ver el proceso de la pantalla en el fondo

screen -ls

Terminación del proceso (el número de pid se puede ver a través del proceso)

kill -9 [pid号]

Limpiar la pantalla de información de un proceso que ha muerto

screen -wipe
  1. Ajuste de los clavos Aquí se hace referencia al artículo de Hukybo Daiji, el principio de implementación ya no se describe, solo se muestra el código de embalaje simple, solo para referencia. https://www.fmz.com/digest-topic/5840
# 钉钉输出接口
class DING:
    # 向钉钉群输出信息
    def msg(self,text):
        token ="***"
        headers = {'Content-Type': 'application/json;charset=utf-8'}  # 请求头
        api_url = f"***={token}"
        json_text = {
            "msgtype": "text",  # 信息格式
            "text": {
                "content": text
            }
        }
        # 发送并打印信息
        requests.post(api_url, json.dumps(json_text), headers=headers).content
    
    #拼接输出信息
    def dd(self,logging):
        bj_dt = str(datetime.datetime.now())
        bj_dt = bj_dt.split('.')[0]  # 处理时间
        text = f'{bj_dt}\n'  # 定义信息内容
        text = text + logging # 处理信息内容
        log.msg(text)  # 调用msg函数,输出信息

Implementación del código

La transacción se obtiene a través de Binance API, y se utiliza la interfaz de contrato de U local fapi, el siguiente código se encierra fácilmente en Binance API, solo para referencia

import requests,json,time,hmac,hashlib,datetime

# APIKEY填写位置
apikey = '***'
Secret_KEY = '***'

#币安接口
class bian:
    #拼接请求参数
    def param2string(self,param):
        s = ''
        for k in param.keys():
            s += k
            s += '='
            s += str(param[k])
            s += '&'
        return s[:-1]
    
    # 参数为get,post请求方式,路径,body
    def IO(self,method,request_path,body):
        header = {
        'X-MBX-APIKEY': apikey,
        }
        #选择请求方式
        if body != '':
            #签名
            body['signature'] = hmac.new(Secret_KEY.encode('utf-8'), self.param2string(body).encode('utf-8'), hashlib.sha256).hexdigest()
            if method == 'GET':
                body = self.param2string(body)
                tell = 'https://fapi.binance.com{0}?{1}'.format(request_path,body)
                response = requests.get(url=tell, headers=header).json()
                return response
            elif method == 'POST':
                response = requests.post(url='https://fapi.binance.com'+str(request_path), headers=header, data=body).json()
                return response
        else:
            response = requests.get(url='https://fapi.binance.com'+str(request_path), headers=header).json()
            return response

Como las estrategias solo se usan para obtener interfaces de precios, solo se hace una simple demostración aquí, lo mismo con otras interfaces

#封装获取价格接口
def price(self,Name):
    body = {"symbol":str(Name)}
    info = self.IO("GET","/fapi/v1/ticker/price",body)
    for i in info:
        if i == "code":
            #设计一个接口错误容错功能
            time.sleep(0.5)
            letgo = '调用price函数接口返回错误,再次尝试 返回错误代码:{0}'.format(str(info))
            log.dd(str(letgo))
            exchange.price(Name)
    return info["price"]

A continuación se muestra la implementación del código de monitoreo de la situación.

# 监控币种&&监控价格一一对应
ccy = ["BTCUSDT","ETHUSDT","LTCUSDT"]
PriceTIME = ["100000;28000","500000000;1200","500;100"]

#行情监控逻辑
def pricewarm():
    #轮询获取当前价格
    for i in range(len(PriceTIME)):
        info = exchange.price(str(PriceTIME[i]))
        priceindex = PriceTIME[i].find(";") #提取价格区间
        #价格上限
        priceup = PriceTIME[i][:priceindex]
        #价格下限
        pricedown = PriceTIME[i][priceindex+1:]
        if float(info) >= float(priceup): #钉钉接口输出
            letgo = f'当前价格{info}USDT大于所设定上限{priceup}USDT'
            log.dd(letgo)
        elif float(info) <= float(pricedown):
            letgo = f'当前价格{info}USDT小于等于设定下限{pricedown}USDT'
            log.dd(letgo)
        time.sleep(0.2)

# 主函数
def main():
    global exchange,log
    log = DING
    exchange = bian
    while True:
        try:
            pricewarm()
            time.sleep(1)
        except:
            time.sleep(1)

if __name__ == "__main__":
    main()

3. El funcionamiento

Cuando el código esté listo, recuerde el camino y abra la pantalla de ejecución de terminal

screen -S [名称]
cd [路径]
python3 [文件名]

El proceso de confirmación se ejecuta y se puede cancelar.

La dirección de la política es:Un robot de advertencia de precios sencillo