4
focar em
13
Seguidores

Crie um robô simples de lembrete de preços

Criado em: 2022-03-27 15:44:13, atualizado em: 2022-03-27 15:59:46
comments   5
hits   1330

Eu já tinha feito copias para outras pessoas e descobri que muitos dos meus amigos tinham que ter uma estratégia de monitoramento de situações, e que eles poderiam alertar imediatamente quando houvesse uma situação especial, mas não estava muito satisfeito com o disco rígido que estava sempre aberto; então, hoje eu compartilho uma demo com um simples aviso de preço para todos vocês. ps: a linguagem usada é o python, e o alarme é feito através de uma interface de pinceladas, a configuração do servidor não é apresentada aqui


I. Preparação

  1. Painel de controlo Aqui é usado o painel da torre, com o objetivo de fazer o upload de arquivos para o servidor, é claro que também pode ser usado de outras maneiras, lembre-se do caminho disponível ((Como o servidor do autor é o sistema Ubuntu, então todos os comandos a seguir são baseados nisso, os outros comandos do sistema podem ser consultados sozinhos))
  • download
wget -O install.sh http://download.bt.cn/install/install-ubuntu_6.0.sh && sudo bash install.sh

Depois de concluído, obtenha o endereço e faça login ((Se o endereço não puder ser aberto, é necessário abrir a porta 8888)

  • Obter permissões Ativar segurança - shh gerenciamento de segurança - Ativar login com chave shh Crie um robô simples de lembrete de preços Crie um robô simples de lembrete de preços

  • Carregamento de ficheiros Crie um robô simples de lembrete de preços

  1. Screen screen é um programa de gerenciamento de reutilização do Windows no Linux, que permite que o programa continue a ser executado após a conexão do terminal shh.
  • download
sudo apt-get install screen
  • Ordens comuns O caminho para a execução de um arquivo é aberto após a exibição do ecrã.[[nome] pode ser configurado automaticamente.
screen -S [name]

Sair da tela

ctrl+a+d

Ver o que está acontecendo no fundo do ecrã

screen -ls

Terminação do processo (podendo ser visualizado através do pid)

kill -9 [pid号]

Limpe a tela com informações de processos mortos

screen -wipe
  1. Configuração de pinos Referência ao artigo de Hukybo Daiji, onde a implementação de princípios não é mais mencionada, apenas o código de encapsulamento simples é mostrado, apenas para referência https://www.fmz.com/digest-topic/5840
# 钉钉输出接口
class DING:
    # 向钉钉群输出信息
    def msg(self,text):
        token ="***"
        headers = {'Content-Type': 'application/json;charset=utf-8'}  # 请求头
        api_url = f"***={token}"
        json_text = {
            "msgtype": "text",  # 信息格式
            "text": {
                "content": text
            }
        }
        # 发送并打印信息
        requests.post(api_url, json.dumps(json_text), headers=headers).content
    
    #拼接输出信息
    def dd(self,logging):
        bj_dt = str(datetime.datetime.now())
        bj_dt = bj_dt.split('.')[0]  # 处理时间
        text = f'{bj_dt}\n'  # 定义信息内容
        text = text + logging # 处理信息内容
        log.msg(text)  # 调用msg函数,输出信息

Implementação do código

A transação foi obtida através do Binance API, usando o U-base Contract fapi interface, o código a seguir é simplesmente encapsulado no Binance API, apenas para referência

import requests,json,time,hmac,hashlib,datetime

# APIKEY填写位置
apikey = '***'
Secret_KEY = '***'

#币安接口
class bian:
    #拼接请求参数
    def param2string(self,param):
        s = ''
        for k in param.keys():
            s += k
            s += '='
            s += str(param[k])
            s += '&'
        return s[:-1]
    
    # 参数为get,post请求方式,路径,body
    def IO(self,method,request_path,body):
        header = {
        'X-MBX-APIKEY': apikey,
        }
        #选择请求方式
        if body != '':
            #签名
            body['signature'] = hmac.new(Secret_KEY.encode('utf-8'), self.param2string(body).encode('utf-8'), hashlib.sha256).hexdigest()
            if method == 'GET':
                body = self.param2string(body)
                tell = 'https://fapi.binance.com{0}?{1}'.format(request_path,body)
                response = requests.get(url=tell, headers=header).json()
                return response
            elif method == 'POST':
                response = requests.post(url='https://fapi.binance.com'+str(request_path), headers=header, data=body).json()
                return response
        else:
            response = requests.get(url='https://fapi.binance.com'+str(request_path), headers=header).json()
            return response

Uma vez que a estratégia é usada apenas para obter interfaces de preços, aqui é apenas uma simples demonstração.

#封装获取价格接口
def price(self,Name):
    body = {"symbol":str(Name)}
    info = self.IO("GET","/fapi/v1/ticker/price",body)
    for i in info:
        if i == "code":
            #设计一个接口错误容错功能
            time.sleep(0.5)
            letgo = '调用price函数接口返回错误,再次尝试 返回错误代码:{0}'.format(str(info))
            log.dd(str(letgo))
            exchange.price(Name)
    return info["price"]

Aqui está a implementação do código de monitorização do mercado.

# 监控币种&&监控价格一一对应
ccy = ["BTCUSDT","ETHUSDT","LTCUSDT"]
PriceTIME = ["100000;28000","500000000;1200","500;100"]

#行情监控逻辑
def pricewarm():
    #轮询获取当前价格
    for i in range(len(PriceTIME)):
        info = exchange.price(str(PriceTIME[i]))
        priceindex = PriceTIME[i].find(";") #提取价格区间
        #价格上限
        priceup = PriceTIME[i][:priceindex]
        #价格下限
        pricedown = PriceTIME[i][priceindex+1:]
        if float(info) >= float(priceup): #钉钉接口输出
            letgo = f'当前价格{info}USDT大于所设定上限{priceup}USDT'
            log.dd(letgo)
        elif float(info) <= float(pricedown):
            letgo = f'当前价格{info}USDT小于等于设定下限{pricedown}USDT'
            log.dd(letgo)
        time.sleep(0.2)

# 主函数
def main():
    global exchange,log
    log = DING
    exchange = bian
    while True:
        try:
            pricewarm()
            time.sleep(1)
        except:
            time.sleep(1)

if __name__ == "__main__":
    main()

3. Funcionamento

Quando o código estiver pronto, lembre-se do caminho e abra a tela de execução do terminal

screen -S [名称]
cd [路径]
python3 [文件名]

O processo de confirmação é executado e você pode sair.

Endereço da política:Um robô de alerta de preços simples