4
Suivre
13
Abonnés

Créer un robot simple de rappel de prix

Créé le: 2022-03-27 15:44:13, Mis à jour le: 2022-03-27 15:59:46
comments   5
hits   1330

J’ai déjà fait des copies pour d’autres personnes et j’ai trouvé que beaucoup de mes amis avaient des stratégies de surveillance et de signalement instantané en cas de situation exceptionnelle, mais je n’étais pas très satisfait du fait que le disque dur était toujours en marche; c’est pourquoi je partage aujourd’hui une simple démo de rappel de prix pour votre information. ps: le langage utilisé est python, et les alertes sont envoyées via une interface de crochet, la configuration du serveur n’est pas présentée ici


A. La préparation

  1. Panneau de commande Ici, le panneau de la tour est utilisé pour télécharger des fichiers vers le serveur, mais d’autres méthodes peuvent également être utilisées, en gardant à l’esprit le chemin (parce que le serveur de l’auteur est un système Ubuntu, toutes les commandes suivantes sont basées sur ce dernier, les autres commandes du système peuvent être consultées par eux-mêmes).
  • télécharger
wget -O install.sh http://download.bt.cn/install/install-ubuntu_6.0.sh && sudo bash install.sh

Une fois terminé, vous obtenez l’adresse et vous vous connectez (si l’adresse ne s’ouvre pas, vous devez ouvrir le port 8888)

  • Obtenir les autorisations Activer la sécurisation - SHH Sécurisation - Activer la connexion par clé SHH Créer un robot simple de rappel de prix Créer un robot simple de rappel de prix

  • Téléchargement de fichiers Créer un robot simple de rappel de prix

  1. Screen screen est un programme de gestion de réutilisabilité de Windows sous Linux qui permet d’exécuter des programmes après une connexion à un terminal shh.
  • télécharger
sudo apt-get install screen
  • Les commandes courantes L’écran d’exécution est ouvert et le chemin d’exécution est affiché.[[Nom] est une balise qui peut être configurée par vous-même.]
screen -S [name]

Sortie de la touche d’accès

ctrl+a+d

Voir le processus de l’écran en arrière-plan

screen -ls

Terminer le processus (le numéro de pid est affiché par le processus)

kill -9 [pid号]

Nettoyer l’écran des informations de processus morts

screen -wipe
  1. Réglages des clous Il n’y a plus d’explications sur la mise en œuvre du principe, mais seulement un code d’emballage simple, à titre de référence. https://www.fmz.com/digest-topic/5840
# 钉钉输出接口
class DING:
    # 向钉钉群输出信息
    def msg(self,text):
        token ="***"
        headers = {'Content-Type': 'application/json;charset=utf-8'}  # 请求头
        api_url = f"***={token}"
        json_text = {
            "msgtype": "text",  # 信息格式
            "text": {
                "content": text
            }
        }
        # 发送并打印信息
        requests.post(api_url, json.dumps(json_text), headers=headers).content
    
    #拼接输出信息
    def dd(self,logging):
        bj_dt = str(datetime.datetime.now())
        bj_dt = bj_dt.split('.')[0]  # 处理时间
        text = f'{bj_dt}\n'  # 定义信息内容
        text = text + logging # 处理信息内容
        log.msg(text)  # 调用msg函数,输出信息

2. mise en œuvre du code

La transaction a été réalisée via le protocole API, et utilise l’interface fapi des contrats de type U. Le code suivant est simplement inclus dans le protocole API, pour référence seulement.

import requests,json,time,hmac,hashlib,datetime

# APIKEY填写位置
apikey = '***'
Secret_KEY = '***'

#币安接口
class bian:
    #拼接请求参数
    def param2string(self,param):
        s = ''
        for k in param.keys():
            s += k
            s += '='
            s += str(param[k])
            s += '&'
        return s[:-1]
    
    # 参数为get,post请求方式,路径,body
    def IO(self,method,request_path,body):
        header = {
        'X-MBX-APIKEY': apikey,
        }
        #选择请求方式
        if body != '':
            #签名
            body['signature'] = hmac.new(Secret_KEY.encode('utf-8'), self.param2string(body).encode('utf-8'), hashlib.sha256).hexdigest()
            if method == 'GET':
                body = self.param2string(body)
                tell = 'https://fapi.binance.com{0}?{1}'.format(request_path,body)
                response = requests.get(url=tell, headers=header).json()
                return response
            elif method == 'POST':
                response = requests.post(url='https://fapi.binance.com'+str(request_path), headers=header, data=body).json()
                return response
        else:
            response = requests.get(url='https://fapi.binance.com'+str(request_path), headers=header).json()
            return response

Comme la stratégie n’utilise que l’interface pour obtenir le prix, il n’y a qu’une simple démonstration ici, les autres interfaces sont les mêmes

#封装获取价格接口
def price(self,Name):
    body = {"symbol":str(Name)}
    info = self.IO("GET","/fapi/v1/ticker/price",body)
    for i in info:
        if i == "code":
            #设计一个接口错误容错功能
            time.sleep(0.5)
            letgo = '调用price函数接口返回错误,再次尝试 返回错误代码:{0}'.format(str(info))
            log.dd(str(letgo))
            exchange.price(Name)
    return info["price"]

Voici la mise en œuvre du code de surveillance

# 监控币种&&监控价格一一对应
ccy = ["BTCUSDT","ETHUSDT","LTCUSDT"]
PriceTIME = ["100000;28000","500000000;1200","500;100"]

#行情监控逻辑
def pricewarm():
    #轮询获取当前价格
    for i in range(len(PriceTIME)):
        info = exchange.price(str(PriceTIME[i]))
        priceindex = PriceTIME[i].find(";") #提取价格区间
        #价格上限
        priceup = PriceTIME[i][:priceindex]
        #价格下限
        pricedown = PriceTIME[i][priceindex+1:]
        if float(info) >= float(priceup): #钉钉接口输出
            letgo = f'当前价格{info}USDT大于所设定上限{priceup}USDT'
            log.dd(letgo)
        elif float(info) <= float(pricedown):
            letgo = f'当前价格{info}USDT小于等于设定下限{pricedown}USDT'
            log.dd(letgo)
        time.sleep(0.2)

# 主函数
def main():
    global exchange,log
    log = DING
    exchange = bian
    while True:
        try:
            pricewarm()
            time.sleep(1)
        except:
            time.sleep(1)

if __name__ == "__main__":
    main()

3. Le fonctionnement

Lorsque le code est prêt, souvenez-vous du chemin et ouvrez l’écran d’exécution du terminal

screen -S [名称]
cd [路径]
python3 [文件名]

Vous pouvez vous désinscrire après l’exécution de la procédure de confirmation.

Le site est hébergé à:Un robot de remémoration de prix simple