4
konzentrieren Sie sich auf
13
Anhänger

Erstellen Sie einen einfachen Preiserinnerungsroboter

Erstellt in: 2022-03-27 15:44:13, aktualisiert am: 2022-03-27 15:59:46
comments   5
hits   1330

Ich habe schon früher für andere Leute geschrieben und festgestellt, dass viele meiner Freunde eine Überwachungsstrategie anwenden müssen, um bei besonderen Situationen sofort Alarm zu schalten, aber ich war nicht sehr zufrieden mit der ständigen Betriebszeit der Festplatte; deshalb möchte ich heute eine einfache Preis-Erinnerungs-Demo für alle mit euch teilen. ps: Die Sprache ist Python, die Alarm durch die Schrauben-Schrauben-Schnittstelle, die Konfiguration des Servers wird hier nicht vorgestellt


1. Vorbereitung

  1. Steuerung Hier wird die Tor-Panel verwendet, um die Dateien auf den Server hochzuladen. Natürlich kann man auch andere Methoden verwenden, um den Pfad zu merken.
  • herunterladen
wget -O install.sh http://download.bt.cn/install/install-ubuntu_6.0.sh && sudo bash install.sh

Wenn Sie fertig sind, erhalten Sie die URL und melden sich an ((Wenn die URL nicht geöffnet werden kann, müssen Sie den Port 8888 öffnen))

  • Berechtigung Öffnen Sie die Sicherheits-shh-Sicherheitsverwaltung - Öffnen Sie die Shh-Schlüssel-Anmeldung Erstellen Sie einen einfachen Preiserinnerungsroboter Erstellen Sie einen einfachen Preiserinnerungsroboter

  • Dateien hochladen Erstellen Sie einen einfachen Preiserinnerungsroboter

  1. Screen screen ist ein Windows-Multiplikationsmanagementprogramm unter Linux, dessen Ziel es ist, die Programme nach einer SHH-Verbindung mit dem Endgerät laufen zu lassen.
  • herunterladen
sudo apt-get install screen
  • Allgemeine Befehle Der Startbildschirm wird nach dem Öffnen der Pfaddatei ausgeführt[[name] kann selbstständig eingestellt werden.]
screen -S [name]

Schaltfläche aus dem Bildschirm

ctrl+a+d

Anzeigen des Bildschirms im Hintergrund

screen -ls

Ende des Prozesses (die PID-Nummer kann durch den Prozess eingesehen werden)

kill -9 [pid号]

Löschen Sie den Bildschirm von toten Prozessen

screen -wipe
  1. Schrauben-Einstellung Hier ist ein Hinweis auf den Artikel von Hukybo Daiki, der die Implementierung des Prinzips nicht mehr beschreibt, sondern nur den einfachen Packaging-Code zeigt, nur als Referenz. https://www.fmz.com/digest-topic/5840
# 钉钉输出接口
class DING:
    # 向钉钉群输出信息
    def msg(self,text):
        token ="***"
        headers = {'Content-Type': 'application/json;charset=utf-8'}  # 请求头
        api_url = f"***={token}"
        json_text = {
            "msgtype": "text",  # 信息格式
            "text": {
                "content": text
            }
        }
        # 发送并打印信息
        requests.post(api_url, json.dumps(json_text), headers=headers).content
    
    #拼接输出信息
    def dd(self,logging):
        bj_dt = str(datetime.datetime.now())
        bj_dt = bj_dt.split('.')[0]  # 处理时间
        text = f'{bj_dt}\n'  # 定义信息内容
        text = text + logging # 处理信息内容
        log.msg(text)  # 调用msg函数,输出信息

2. Code-Implementierung

Die Transaktionen werden über die Binance API erfasst, wobei die U-Standard-Kontrakte über die Fapi-Schnittstelle verwendet werden. Der folgende Code ist nur als Referenz für die Binance API enthalten.

import requests,json,time,hmac,hashlib,datetime

# APIKEY填写位置
apikey = '***'
Secret_KEY = '***'

#币安接口
class bian:
    #拼接请求参数
    def param2string(self,param):
        s = ''
        for k in param.keys():
            s += k
            s += '='
            s += str(param[k])
            s += '&'
        return s[:-1]
    
    # 参数为get,post请求方式,路径,body
    def IO(self,method,request_path,body):
        header = {
        'X-MBX-APIKEY': apikey,
        }
        #选择请求方式
        if body != '':
            #签名
            body['signature'] = hmac.new(Secret_KEY.encode('utf-8'), self.param2string(body).encode('utf-8'), hashlib.sha256).hexdigest()
            if method == 'GET':
                body = self.param2string(body)
                tell = 'https://fapi.binance.com{0}?{1}'.format(request_path,body)
                response = requests.get(url=tell, headers=header).json()
                return response
            elif method == 'POST':
                response = requests.post(url='https://fapi.binance.com'+str(request_path), headers=header, data=body).json()
                return response
        else:
            response = requests.get(url='https://fapi.binance.com'+str(request_path), headers=header).json()
            return response

Da die Strategie nur die Preis-Schnittstelle verwendet, wird hier nur eine einfache Vorführung durchgeführt.

#封装获取价格接口
def price(self,Name):
    body = {"symbol":str(Name)}
    info = self.IO("GET","/fapi/v1/ticker/price",body)
    for i in info:
        if i == "code":
            #设计一个接口错误容错功能
            time.sleep(0.5)
            letgo = '调用price函数接口返回错误,再次尝试 返回错误代码:{0}'.format(str(info))
            log.dd(str(letgo))
            exchange.price(Name)
    return info["price"]

Im Folgenden sehen Sie die Implementierung des Code für die Überwachung der Geschäftslage.

# 监控币种&&监控价格一一对应
ccy = ["BTCUSDT","ETHUSDT","LTCUSDT"]
PriceTIME = ["100000;28000","500000000;1200","500;100"]

#行情监控逻辑
def pricewarm():
    #轮询获取当前价格
    for i in range(len(PriceTIME)):
        info = exchange.price(str(PriceTIME[i]))
        priceindex = PriceTIME[i].find(";") #提取价格区间
        #价格上限
        priceup = PriceTIME[i][:priceindex]
        #价格下限
        pricedown = PriceTIME[i][priceindex+1:]
        if float(info) >= float(priceup): #钉钉接口输出
            letgo = f'当前价格{info}USDT大于所设定上限{priceup}USDT'
            log.dd(letgo)
        elif float(info) <= float(pricedown):
            letgo = f'当前价格{info}USDT小于等于设定下限{pricedown}USDT'
            log.dd(letgo)
        time.sleep(0.2)

# 主函数
def main():
    global exchange,log
    log = DING
    exchange = bian
    while True:
        try:
            pricewarm()
            time.sleep(1)
        except:
            time.sleep(1)

if __name__ == "__main__":
    main()

3. Betrieb

Wenn der Code fertig ist, merken Sie den Pfad und öffnen Sie den Terminallaufbildschirm

screen -S [名称]
cd [路径]
python3 [文件名]

Das Programm kann abgeladen werden, sobald die Bestätigung durchgeführt wurde.

Das ist die Adresse der Politik:Ein einfacher Preis-Reminder-Roboter