Sebelum ini, saya telah menulis untuk orang lain dan mendapati ramai kawan-kawan saya mempunyai strategi untuk memantau keadaan dan memberi amaran segera apabila terdapat keadaan yang tidak biasa, tetapi saya tidak begitu berpuas hati dengan pemacu tetap; jadi hari ini saya ingin berkongsi demo peringatan harga yang mudah untuk rujukan anda. ps: bahasa yang digunakan adalah python, dan pelayan tidak diarahkan untuk membuat konfigurasi di sini
wget -O install.sh http://download.bt.cn/install/install-ubuntu_6.0.sh && sudo bash install.sh
Setelah selesai, anda akan mendapat alamat dan log masuk (jika alamat tidak dapat dibuka, anda perlu membuka port 8888)
Mendapatkan keizinan
Buka keselamatan - shh pengurusan keselamatan - buka shh kunci log masuk

Muat naik fail

sudo apt-get install screen
screen -S [name]
Keluar dari skrin
ctrl+a+d
Lihat skrin berjalan di latar belakang
screen -ls
Menamatkan proses ((pid boleh dilihat melalui proses)
kill -9 [pid号]
Bersihkan maklumat proses yang telah mati di skrin
screen -wipe
# 钉钉输出接口
class DING:
# 向钉钉群输出信息
def msg(self,text):
token ="***"
headers = {'Content-Type': 'application/json;charset=utf-8'} # 请求头
api_url = f"***={token}"
json_text = {
"msgtype": "text", # 信息格式
"text": {
"content": text
}
}
# 发送并打印信息
requests.post(api_url, json.dumps(json_text), headers=headers).content
#拼接输出信息
def dd(self,logging):
bj_dt = str(datetime.datetime.now())
bj_dt = bj_dt.split('.')[0] # 处理时间
text = f'{bj_dt}\n' # 定义信息内容
text = text + logging # 处理信息内容
log.msg(text) # 调用msg函数,输出信息
Proses ini diambil melalui Bitmain api, menggunakan antara muka Fapi kontrak U-set, kod berikut mudah dibungkus Bitmain api, hanya untuk rujukan
import requests,json,time,hmac,hashlib,datetime
# APIKEY填写位置
apikey = '***'
Secret_KEY = '***'
#币安接口
class bian:
#拼接请求参数
def param2string(self,param):
s = ''
for k in param.keys():
s += k
s += '='
s += str(param[k])
s += '&'
return s[:-1]
# 参数为get,post请求方式,路径,body
def IO(self,method,request_path,body):
header = {
'X-MBX-APIKEY': apikey,
}
#选择请求方式
if body != '':
#签名
body['signature'] = hmac.new(Secret_KEY.encode('utf-8'), self.param2string(body).encode('utf-8'), hashlib.sha256).hexdigest()
if method == 'GET':
body = self.param2string(body)
tell = 'https://fapi.binance.com{0}?{1}'.format(request_path,body)
response = requests.get(url=tell, headers=header).json()
return response
elif method == 'POST':
response = requests.post(url='https://fapi.binance.com'+str(request_path), headers=header, data=body).json()
return response
else:
response = requests.get(url='https://fapi.binance.com'+str(request_path), headers=header).json()
return response
Oleh kerana strategi hanya digunakan untuk mendapatkan harga antara muka, hanya demo ringkas yang dilakukan di sini, dan yang lain adalah sama
#封装获取价格接口
def price(self,Name):
body = {"symbol":str(Name)}
info = self.IO("GET","/fapi/v1/ticker/price",body)
for i in info:
if i == "code":
#设计一个接口错误容错功能
time.sleep(0.5)
letgo = '调用price函数接口返回错误,再次尝试 返回错误代码:{0}'.format(str(info))
log.dd(str(letgo))
exchange.price(Name)
return info["price"]
Berikut adalah penerapan kod pemantauan.
# 监控币种&&监控价格一一对应
ccy = ["BTCUSDT","ETHUSDT","LTCUSDT"]
PriceTIME = ["100000;28000","500000000;1200","500;100"]
#行情监控逻辑
def pricewarm():
#轮询获取当前价格
for i in range(len(PriceTIME)):
info = exchange.price(str(PriceTIME[i]))
priceindex = PriceTIME[i].find(";") #提取价格区间
#价格上限
priceup = PriceTIME[i][:priceindex]
#价格下限
pricedown = PriceTIME[i][priceindex+1:]
if float(info) >= float(priceup): #钉钉接口输出
letgo = f'当前价格{info}USDT大于所设定上限{priceup}USDT'
log.dd(letgo)
elif float(info) <= float(pricedown):
letgo = f'当前价格{info}USDT小于等于设定下限{pricedown}USDT'
log.dd(letgo)
time.sleep(0.2)
# 主函数
def main():
global exchange,log
log = DING
exchange = bian
while True:
try:
pricewarm()
time.sleep(1)
except:
time.sleep(1)
if __name__ == "__main__":
main()
Apabila kod siap siap ingat laluan, buka skrin terminal berjalan
screen -S [名称]
cd [路径]
python3 [文件名]
Anda boleh keluar selepas proses pengesahan dijalankan.
Alamat:Robot Amaran Harga Mudah