4
fokus pada
13
Pengikut

Buat robot peringatan harga yang mudah

Dicipta dalam: 2022-03-27 15:44:13, dikemas kini pada: 2022-03-27 15:59:46
comments   5
hits   1330

Sebelum ini, saya telah menulis untuk orang lain dan mendapati ramai kawan-kawan saya mempunyai strategi untuk memantau keadaan dan memberi amaran segera apabila terdapat keadaan yang tidak biasa, tetapi saya tidak begitu berpuas hati dengan pemacu tetap; jadi hari ini saya ingin berkongsi demo peringatan harga yang mudah untuk rujukan anda. ps: bahasa yang digunakan adalah python, dan pelayan tidak diarahkan untuk membuat konfigurasi di sini


I. Persediaan

  1. Panel Kawalan Di sini digunakan panel menara, bertujuan untuk memuat naik fail ke pelayan, tentu saja anda juga boleh menggunakan cara lain, ingat jalan boleh ((kerana pelayan pengarang adalah sistem Ubuntu, jadi semua arahan berikut adalah untuk ini, perintah sistem lain boleh dibaca sendiri)
  • muat turun
wget -O install.sh http://download.bt.cn/install/install-ubuntu_6.0.sh && sudo bash install.sh

Setelah selesai, anda akan mendapat alamat dan log masuk (jika alamat tidak dapat dibuka, anda perlu membuka port 8888)

  • Mendapatkan keizinan Buka keselamatan - shh pengurusan keselamatan - buka shh kunci log masuk Buat robot peringatan harga yang mudah Buat robot peringatan harga yang mudah

  • Muat naik fail Buat robot peringatan harga yang mudah

  1. Screen screen adalah program pengurusan penggunaan semula tetingkap di bawah Linux, yang bertujuan untuk menjalankan program selepas sambungan terminal shh.
  • muat turun
sudo apt-get install screen
  • Perintah biasa Ia boleh dibuka selepas anda membuka file jalan jalan (Run screen, open path Run file)[[name] label boleh disesuaikan sendiri)
screen -S [name]

Keluar dari skrin

ctrl+a+d

Lihat skrin berjalan di latar belakang

screen -ls

Menamatkan proses ((pid boleh dilihat melalui proses)

kill -9 [pid号]

Bersihkan maklumat proses yang telah mati di skrin

screen -wipe
  1. Pengaturan Pin Di sini merujuk kepada artikel oleh Hukybo, yang tidak lagi menerangkan prinsip pelaksanaan, hanya menunjukkan kod yang mudah dibungkus, untuk rujukan sahaja https://www.fmz.com/digest-topic/5840
# 钉钉输出接口
class DING:
    # 向钉钉群输出信息
    def msg(self,text):
        token ="***"
        headers = {'Content-Type': 'application/json;charset=utf-8'}  # 请求头
        api_url = f"***={token}"
        json_text = {
            "msgtype": "text",  # 信息格式
            "text": {
                "content": text
            }
        }
        # 发送并打印信息
        requests.post(api_url, json.dumps(json_text), headers=headers).content
    
    #拼接输出信息
    def dd(self,logging):
        bj_dt = str(datetime.datetime.now())
        bj_dt = bj_dt.split('.')[0]  # 处理时间
        text = f'{bj_dt}\n'  # 定义信息内容
        text = text + logging # 处理信息内容
        log.msg(text)  # 调用msg函数,输出信息

2. Kode pelaksanaan

Proses ini diambil melalui Bitmain api, menggunakan antara muka Fapi kontrak U-set, kod berikut mudah dibungkus Bitmain api, hanya untuk rujukan

import requests,json,time,hmac,hashlib,datetime

# APIKEY填写位置
apikey = '***'
Secret_KEY = '***'

#币安接口
class bian:
    #拼接请求参数
    def param2string(self,param):
        s = ''
        for k in param.keys():
            s += k
            s += '='
            s += str(param[k])
            s += '&'
        return s[:-1]
    
    # 参数为get,post请求方式,路径,body
    def IO(self,method,request_path,body):
        header = {
        'X-MBX-APIKEY': apikey,
        }
        #选择请求方式
        if body != '':
            #签名
            body['signature'] = hmac.new(Secret_KEY.encode('utf-8'), self.param2string(body).encode('utf-8'), hashlib.sha256).hexdigest()
            if method == 'GET':
                body = self.param2string(body)
                tell = 'https://fapi.binance.com{0}?{1}'.format(request_path,body)
                response = requests.get(url=tell, headers=header).json()
                return response
            elif method == 'POST':
                response = requests.post(url='https://fapi.binance.com'+str(request_path), headers=header, data=body).json()
                return response
        else:
            response = requests.get(url='https://fapi.binance.com'+str(request_path), headers=header).json()
            return response

Oleh kerana strategi hanya digunakan untuk mendapatkan harga antara muka, hanya demo ringkas yang dilakukan di sini, dan yang lain adalah sama

#封装获取价格接口
def price(self,Name):
    body = {"symbol":str(Name)}
    info = self.IO("GET","/fapi/v1/ticker/price",body)
    for i in info:
        if i == "code":
            #设计一个接口错误容错功能
            time.sleep(0.5)
            letgo = '调用price函数接口返回错误,再次尝试 返回错误代码:{0}'.format(str(info))
            log.dd(str(letgo))
            exchange.price(Name)
    return info["price"]

Berikut adalah penerapan kod pemantauan.

# 监控币种&&监控价格一一对应
ccy = ["BTCUSDT","ETHUSDT","LTCUSDT"]
PriceTIME = ["100000;28000","500000000;1200","500;100"]

#行情监控逻辑
def pricewarm():
    #轮询获取当前价格
    for i in range(len(PriceTIME)):
        info = exchange.price(str(PriceTIME[i]))
        priceindex = PriceTIME[i].find(";") #提取价格区间
        #价格上限
        priceup = PriceTIME[i][:priceindex]
        #价格下限
        pricedown = PriceTIME[i][priceindex+1:]
        if float(info) >= float(priceup): #钉钉接口输出
            letgo = f'当前价格{info}USDT大于所设定上限{priceup}USDT'
            log.dd(letgo)
        elif float(info) <= float(pricedown):
            letgo = f'当前价格{info}USDT小于等于设定下限{pricedown}USDT'
            log.dd(letgo)
        time.sleep(0.2)

# 主函数
def main():
    global exchange,log
    log = DING
    exchange = bian
    while True:
        try:
            pricewarm()
            time.sleep(1)
        except:
            time.sleep(1)

if __name__ == "__main__":
    main()

3. Operasi

Apabila kod siap siap ingat laluan, buka skrin terminal berjalan

screen -S [名称]
cd [路径]
python3 [文件名]

Anda boleh keluar selepas proses pengesahan dijalankan.

Alamat:Robot Amaran Harga Mudah