Calling a pin-and-pick interface to enable bot push messages

Author: , Created: 2020-07-02 12:03:23, Updated: 2023-09-28 21:08:12

img

A summary

In real-world transactions, in order to know the inventor's quantitative robot trading status in a timely manner, sometimes we need to send the results of the robot's transactions to WeChat, e-mail, text messages, etc. But hundreds of messages a day make all sorts of information insensitive to this information, which leads to important information not being collected in a timely manner.

Two, the robot with the nails.

The robot is a high-end extension that can be used as long as there is a tag account. It can aggregate third-party information into the tag, enabling automatic synchronization of information. It supports custom access to the Webhook protocol, through the inventor's quantified bot, which aggregates information such as alerts, alerts, and more into the tag. It supports text, link, markdown, three message formats, five message types. Here is the official link:https://ding-doc.dingtalk.com/doc#/serverapi2/ye8tup

Three, create a robot

Step 1: Create a group of nails imgEach time a custom robot is created, a unique Hook address is generated, which we call the WebHook address, and the Hook group receives the message by pushing a message to the WebHook address. We use the PC-side example of the Hook group, first clicking the button + moniker in the upper left to start the group chat, if you just want to receive the message yourself, you can simply pull two people out and fill in the group name:

Step 2: Adding a robot to the group.Click the header, select Bot Management, and then select Custom, click Add. Custom bot name: FMZ, add to newly created group of pins. The bot supports three security settings:img

  • Custom keyword: Only messages containing this keyword are synchronized.
  • Add tags: This is equivalent to setting a password.
  • IP address: IP address segment that is fixed to third-party information.img imgIf you are only using an alert or alarm, you can select a custom keyword. Here we define the keyword as "hook:hook", which means that when the inventor quantifies the information sent by the robot that contains hook:hook, the message is pushed to the pinhole. Then click on "agree agreement completed". Lastly, copy the Webhook address back up.

Four, the code implementation

Once we have access to the Webhook address, we can initiate an HTTP POST request to this address in the inventor quantification policy, which can send information to this pinhole. Note that the character set encoding must be set to UTF-8 when the POST request is initiated.

import requests
import json
from datetime import datetime, timedelta, timezone


# 向钉钉群输出信息
def msg(text):
    token ="0303627a118e739e628bcde104e19cf5463f61a4a127e4f2376e6a8aa1156ef1"
    headers = {'Content-Type': 'application/json;charset=utf-8'}  # 请求头
    api_url = f"https://oapi.dingtalk.com/robot/send?access_token={token}"
    json_text = {
        "msgtype": "text",  # 信息格式
        "text": {
            "content": text
        }
    }
    # 发送并打印信息
    Log(requests.post(api_url, json.dumps(json_text), headers=headers).content)

    
# 测试函数
def onTick():
    arr = ['BTC', 'ETH', 'XRP', 'BCH', 'LTC']  # 主流数字货币
    # 获取东八区时间
    bj_dt = str(datetime.now().astimezone(timezone(timedelta(hours=8))))
    bj_dt = bj_dt.split('.')[0]  # 处理时间
    text = f'{bj_dt}\n'  # 定义信息内容
    for i in arr:  # 循环主流数字货币数组
        exchange.IO("currency", f"{i}_USDT")  # 切换交易对
        ticker = exchange.GetTicker().Last  # 获取最新价格
        if i == 'LTC':
            full = ' :'
        else:
            full = ':'
        text = text + f"{i}/USDT{full}${ticker}\n"  # 处理信息内容
    msg(text)  # 调用msg函数,输出信息
    

# 策略入口
def main():
    while True:  # 进入无线循环
        onTick()  # 执行onTick函数
        Sleep(1000 * 60)  # 休眠一分钟

By default, a bot can synchronize messages by setting a mobile phone number to @multiple group members. A @message alert will be sent when the @group member receives the message, and the alert will still be notified even if the session is set to No Disruption.

# 向钉钉群输出信息
def msg(text):
    token = "0303627a118e739e628bcde104e19cf5463f61a4a127e4f2376e6a8aa1156ef1"
    headers = {'Content-Type': 'application/json;charset=utf-8'}  # 请求头
    api_url = f"https://oapi.dingtalk.com/robot/send?access_token={token}"
    json_text = {
        "msgtype": "text",  # 信息格式
        "text": {
            "content": text
        },
        "at": {
            "atMobiles": [
                "16666666666",  # 被@的手机号码
                "18888888888"  # 被@的手机号码
            ],
            "isAtAll": False  # 不@所有人
        }
    }
    # 发送并打印信息
    Log(requests.post(api_url, json.dumps(json_text), headers=headers).content)

5 Testing the robot

In the code above, we wrote a case where we get the price of the mainstream digital currency every minute and push that information to the cluster:img img


Related

More

AllyI've been trying to find a way to get a webhook for the company's WeChat.

The Leaping WavePlease, now that WeChat pushing is restricted, is it still possible to push the pins?