4
tập trung vào
13
Người theo dõi

Tạo một robot nhắc nhở giá đơn giản

Được tạo ra trong: 2022-03-27 15:44:13, cập nhật trên: 2022-03-27 15:59:46
comments   5
hits   1330

Trước đây, tôi đã làm phiên bản cho người khác và thấy rằng nhiều người bạn phải thực hiện chiến lược giám sát tình hình, có thể báo động ngay lập tức khi có tình huống đặc biệt, nhưng không hài lòng với việc luôn mở ổ cứng; vì vậy hôm nay tôi sẽ chia sẻ một bản demo nhắc nhở giá rẻ cho mọi người tham khảo. ps: Ngôn ngữ được sử dụng là Python, thông qua giao diện thông báo, cấu hình máy chủ không được giới thiệu ở đây


A. Chuẩn bị

  1. Bảng điều khiển Ở đây sử dụng bảng Baotou, mục đích là để tải tập tin lên máy chủ, tất nhiên cũng có thể sử dụng các cách khác, nhớ đường đi là ((vì máy chủ của tác giả là hệ thống Ubuntu, vì vậy tất cả các lệnh sau đây được chuẩn bị cho điều này, các lệnh hệ thống khác có thể tự đọc)
  • tải về
wget -O install.sh http://download.bt.cn/install/install-ubuntu_6.0.sh && sudo bash install.sh

Sau khi hoàn thành, nhận được địa chỉ và đăng nhập ((Nếu địa chỉ không thể mở được, bạn cần mở cổng 8888)

  • Nhận quyền Khởi động an toàn - shh quản lý an toàn - Khởi động khóa shh đăng nhập Tạo một robot nhắc nhở giá đơn giản Tạo một robot nhắc nhở giá đơn giản

  • Tải tập tin Tạo một robot nhắc nhở giá đơn giản

  1. Screen screen là một chương trình quản lý đa dạng của Windows dưới Linux, nhằm mục đích cho phép chương trình vẫn có thể chạy sau khi kết nối với thiết bị đầu cuối shh.
  • tải về
sudo apt-get install screen
  • Lệnh thông thường Chạy màn hình, mở đường dẫn chạy tệp[[name] có thể tự mình thiết lập.
screen -S [name]

Khóa tắt thoát màn hình

ctrl+a+d

Xem màn hình đang chạy trong nền

screen -ls

Quá trình kết thúc ((pid có thể xem qua quá trình)

kill -9 [pid号]

Làm sạch thông tin quá trình đã chết trên màn hình

screen -wipe
  1. Cài đặt đinh Trong bài viết của Hukybo, không còn đề cập đến nguyên tắc thực hiện, chỉ hiển thị mã gói đơn giản, chỉ để tham khảo https://www.fmz.com/digest-topic/5840
# 钉钉输出接口
class DING:
    # 向钉钉群输出信息
    def msg(self,text):
        token ="***"
        headers = {'Content-Type': 'application/json;charset=utf-8'}  # 请求头
        api_url = f"***={token}"
        json_text = {
            "msgtype": "text",  # 信息格式
            "text": {
                "content": text
            }
        }
        # 发送并打印信息
        requests.post(api_url, json.dumps(json_text), headers=headers).content
    
    #拼接输出信息
    def dd(self,logging):
        bj_dt = str(datetime.datetime.now())
        bj_dt = bj_dt.split('.')[0]  # 处理时间
        text = f'{bj_dt}\n'  # 定义信息内容
        text = text + logging # 处理信息内容
        log.msg(text)  # 调用msg函数,输出信息

2. Thực hiện mã

Các giao dịch được lấy thông qua Binance API, sử dụng giao diện fapi hợp đồng U-local, mã sau đây được gói gọn đơn giản trong Binance API, chỉ để tham khảo

import requests,json,time,hmac,hashlib,datetime

# APIKEY填写位置
apikey = '***'
Secret_KEY = '***'

#币安接口
class bian:
    #拼接请求参数
    def param2string(self,param):
        s = ''
        for k in param.keys():
            s += k
            s += '='
            s += str(param[k])
            s += '&'
        return s[:-1]
    
    # 参数为get,post请求方式,路径,body
    def IO(self,method,request_path,body):
        header = {
        'X-MBX-APIKEY': apikey,
        }
        #选择请求方式
        if body != '':
            #签名
            body['signature'] = hmac.new(Secret_KEY.encode('utf-8'), self.param2string(body).encode('utf-8'), hashlib.sha256).hexdigest()
            if method == 'GET':
                body = self.param2string(body)
                tell = 'https://fapi.binance.com{0}?{1}'.format(request_path,body)
                response = requests.get(url=tell, headers=header).json()
                return response
            elif method == 'POST':
                response = requests.post(url='https://fapi.binance.com'+str(request_path), headers=header, data=body).json()
                return response
        else:
            response = requests.get(url='https://fapi.binance.com'+str(request_path), headers=header).json()
            return response

Vì chiến lược chỉ sử dụng để lấy giao diện giá, nên chỉ có một bản trình diễn đơn giản ở đây, các giao diện khác cũng vậy

#封装获取价格接口
def price(self,Name):
    body = {"symbol":str(Name)}
    info = self.IO("GET","/fapi/v1/ticker/price",body)
    for i in info:
        if i == "code":
            #设计一个接口错误容错功能
            time.sleep(0.5)
            letgo = '调用price函数接口返回错误,再次尝试 返回错误代码:{0}'.format(str(info))
            log.dd(str(letgo))
            exchange.price(Name)
    return info["price"]

Dưới đây là cách thực hiện mã điều khiển.

# 监控币种&&监控价格一一对应
ccy = ["BTCUSDT","ETHUSDT","LTCUSDT"]
PriceTIME = ["100000;28000","500000000;1200","500;100"]

#行情监控逻辑
def pricewarm():
    #轮询获取当前价格
    for i in range(len(PriceTIME)):
        info = exchange.price(str(PriceTIME[i]))
        priceindex = PriceTIME[i].find(";") #提取价格区间
        #价格上限
        priceup = PriceTIME[i][:priceindex]
        #价格下限
        pricedown = PriceTIME[i][priceindex+1:]
        if float(info) >= float(priceup): #钉钉接口输出
            letgo = f'当前价格{info}USDT大于所设定上限{priceup}USDT'
            log.dd(letgo)
        elif float(info) <= float(pricedown):
            letgo = f'当前价格{info}USDT小于等于设定下限{pricedown}USDT'
            log.dd(letgo)
        time.sleep(0.2)

# 主函数
def main():
    global exchange,log
    log = DING
    exchange = bian
    while True:
        try:
            pricewarm()
            time.sleep(1)
        except:
            time.sleep(1)

if __name__ == "__main__":
    main()

3. Hoạt động

Sau khi code đã sẵn sàng, hãy nhớ đường dẫn và mở màn hình chạy đầu cuối

screen -S [名称]
cd [路径]
python3 [文件名]

Bạn có thể rút ra sau khi xác nhận.

Địa chỉ chính sách:Robot nhắc nhở giá cả dễ dàng