python 实现飞书机器人-随机回复版

1. 完整代码

import base64
import hashlib
import hmac
import json
import random
import time
from datetime import datetime, timedelta

import requests
from flask import Flask, jsonify, request

tenant_access_token = ""
expire_time = datetime.now() - timedelta(seconds=6)
app = Flask(__name__)

deal_msg_time = {}

key_words_to_reply = ["PP","pp"]

reply_sentences = [
    "我去",
    "不",
    "要得",
    "哈哈哈哈哈哈哈",

]

reply_emoji = [

    "v3_0061_a751e648-72f6-4fe8-9e6a-5850d4d34dfg",
    "v2_dc539e42-fbd0-4ad9-b57b-37339d11e9dg"
]


@app.route("/webhook", methods=["GET", "POST"])
def webhook():
    global deal_msg_time
    print("content:{}".format(str(request.json)))

    if (
        request.json.get("type") != None
        and request.json.get("type") == "url_verification"
    ):
        print("Do url_verification")
        print("challenge value: {}".format(request.json["challenge"]))
        challenge = request.json.get("challenge", "")
        rsp = {"challenge": challenge}
        return json.dumps(rsp)

    if (
        request.json.get("event") != None
        and request.json.get("event").get("sender") != None
        and request.json.get("event").get("sender").get("sender_id") != None
        and request.json["event"]["sender"]["sender_id"]["open_id"]
        == "cli_a5d05c6f8cf8100e"
    ):
        return "200"
    
    message_time = request.json["event"]["message"]["create_time"]
    if deal_msg_time.get(message_time) == None:
        deal_msg_time[message_time] = ""
    else:
        return "200"

    message_text = str(request.json["event"]["message"]["content"])
    print("text is {}".format(message_text))

    # 判断是否被@
    if '@' in message_text:
        if request.json.get("event").get("message").get("mentions") != None:
            for a_mention in request.json.get("event").get("message").get("mentions"):
                if "X" in a_mention.get("name"):
                    if random.choice([i for i in range(0, 10)]) < 3:
                        emoji_id = random.choice(reply_emoji)
                        send_emoji(emoji_id, request)
                        return "200"
                    else:
                        reply_text = random.choice(reply_sentences)
                        send_msg(reply_text, request)
                        return "200"
    
    
    for kw in key_words_to_reply:
        if kw in message_text:
            if random.choice([i for i in range(0, 10)]) < 3:
                emoji_id = random.choice(reply_emoji)
                send_emoji(emoji_id, request)
                return "200"
            else:
                reply_text = random.choice(reply_sentences)
                send_msg(reply_text, request)
                return "200"
    #     send_reply("test", request)
    return "200"


# https://open.feishu.cn/document/server-docs/authentication-management/access-token/tenant_access_token_internal
def renew_tenant_access_token():
    global tenant_access_token
    global expire_time
    data = {
        "app_id": "your_app_id",
        "app_secret": "your_app_secret",
    }
    headers = {
        "Content-Type": "application/json; charset=utf-8",
    }
    url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
    response = requests.post(url, json=data, headers=headers)
    print("send get_tenant_access_token post")

    # Print response content as text
    print("response text: {}".format(response.text))

    data = json.loads(response.text)
    tenant_access_token = data["tenant_access_token"]
    expire_time = datetime.now() + timedelta(seconds=(data["expire"] - 10))
    return


def send_emoji(emoji_id, request):
    global expire_time
    global tenant_access_token

    if expire_time < datetime.now():
        renew_tenant_access_token()

    url = "https://open.feishu.cn/open-apis/im/v1/messages"
    params = {"receive_id_type": "chat_id"}
    msgContent = {
        "file_key": emoji_id,
    }
    req = {
        "receive_id": request.json["event"]["message"]["chat_id"],  # chat id
        "msg_type": "sticker",
        "content": json.dumps(msgContent),
    }
    payload = json.dumps(req)
    headers = {
        "Authorization": "Bearer {}".format(tenant_access_token),  # your access token
        "Content-Type": "application/json",
    }
    response = requests.request(
        "POST", url, params=params, headers=headers, data=payload
    )
    print(response.headers["X-Tt-Logid"])  # for debug or oncall
    print(response.content)  # Print Response

    return None


def send_msg(text, request):
    global expire_time
    global tenant_access_token

    if expire_time < datetime.now():
        renew_tenant_access_token()

    url = "https://open.feishu.cn/open-apis/im/v1/messages"
    params = {"receive_id_type": "chat_id"}
    msgContent = {
        "text": text,
    }
    req = {
        "receive_id": request.json["event"]["message"]["chat_id"],  # chat id
        "msg_type": "text",
        "content": json.dumps(msgContent),
    }
    payload = json.dumps(req)
    headers = {
        "Authorization": "Bearer {}".format(tenant_access_token),  # your access token
        "Content-Type": "application/json",
    }
    response = requests.request(
        "POST", url, params=params, headers=headers, data=payload
    )
    print(response.headers["X-Tt-Logid"])  # for debug or oncall
    print(response.content)  # Print Response

    return None


def send_reply(text, request):
    global expire_time
    global tenant_access_token
    if expire_time < datetime.now():
        renew_tenant_access_token()
    data = {"msg_type": "text", "content": {"text": text}}
    headers = {
        "Authorization": "Bearer {}".format(tenant_access_token),
        "Content-Type": "application/json; charset=utf-8",
    }
    url = "https://open.feishu.cn/open-apis/im/v1/messages/:{}/reply".format(
        request.json["event"]["message"]["message_id"]
    )
    requests.post(url, json=data, headers=headers)


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8891)

2. 功能简述

弄了个飞书机器人试试玩

飞书的机器人支持还是蛮好的。不像微信那么封闭,还得去付费搞个 wechaty 啥的。

功能简述

  • 可以读取群聊信息
  • 发现消息内容是X被@,或者聊天内容包含 [“PP”,”pp”] 以后就触发回复逻辑
    • 随机回复一句话或者表情(配置在 reply_sentences 和 reply_emoji)
  • 群消息会以print的形式展示在控制台,想提取新表情啥的可以随意复制其飞书文件代码

这是最简单的回复方式,可以把某某人的常用口头禅和表情放进去,一旦识别到关键词就随机回复一个,经常还蛮有趣的。

3. 基础准备

注:更详细的可以查看飞书的机器人文档:https://open.feishu.cn/document/client-docs/bot-v3/bot-overview

这里针对python记几个主要的

  1. 一个有域名或者公网ip的服务器
  2. 去创建一个企业自建应用:https://open.feishu.cn/app
    • 创建后就能获得
      • App ID
        • “cli_”开头
        • 需要填进
      • App Secret
        • “oTXhyCemc”之类的
    • 需要填进代码里这个地方
      • data = { “app_id”: “your_app_id”, “app_secret”: “your_app_secret”, }
  3. 应用能力中添加“机器人”
  4. 申请到一堆读群聊、发信息等权限
  5. 找一个群建个这个 app 机器人

4. 代码逻辑简述

4.1 基本服务器

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8891)

靠 python 的 flask 库创建一个服务器 ,监听 8891 端口。

之后这个服务就通过以下链接访问

  • http://你的服务器ip或域名地址:8891

但具体的访问内容还需要进一步编写,靠重载这个函数:

@app.route("/webhook", methods=["GET", "POST"])
def webhook():
        return "200"

如上写了以后,就可以通过这个链接

  • http://你的服务器ip或域名地址:8891/webhook

用如果请求是 GET POST 就会走 def webhook() 函数处理请求。比如上面就是返回”200″

我们需要把这个链接填到飞书应用,机器人的 “消息卡片请求网址” 下,之后飞书就知道去访问这个服务了。

4.2 飞书机器人验证

填写了地址以后,飞书会要求验证服务器的有效性,要求1秒内回复其请求中的challenge值。为此要修改代码

@app.route("/webhook", methods=["GET", "POST"])
def webhook():
    global deal_msg_time
    print("content:{}".format(str(request.json)))

    if (
        request.json.get("type") != None
        and request.json.get("type") == "url_verification"
    ):
        print("Do url_verification")
        print("challenge value: {}".format(request.json["challenge"]))
        challenge = request.json.get("challenge", "")
        rsp = {"challenge": challenge}
        return json.dumps(rsp)

如上

  • 如果发现请求的type是url_verification
    • 则解析出challenge值并打包成json发回去

正常这样就能在飞书平台上验证通过了

4.3 获取 tenant_access_token

机器人发消息是需要一个 tenant_access_token 的

这个 token 有时效性,如果过期了需要申请

申请方式如下(会保存在 global tenant_access_token 中)

# https://open.feishu.cn/document/server-docs/authentication-management/access-token/tenant_access_token_internal
def renew_tenant_access_token():
    global tenant_access_token
    global expire_time
    data = {
        "app_id": "your_app_id",
        "app_secret": "your_app_secret",
    }
    headers = {
        "Content-Type": "application/json; charset=utf-8",
    }
    url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
    response = requests.post(url, json=data, headers=headers)
    print("send get_tenant_access_token post")

    # Print response content as text
    print("response text: {}".format(response.text))

    data = json.loads(response.text)
    tenant_access_token = data["tenant_access_token"]
    expire_time = datetime.now() + timedelta(seconds=(data["expire"] - 10))
    return

所以每次要回复消息之前检测一下当前的 token 是否过期,如果过期就重新申请一下再发,如代码中的片段:

def send_msg(text, request):
    global expire_time
    global tenant_access_token

    if expire_time < datetime.now():
        renew_tenant_access_token()

4.4 避免重复回复

飞书机器人有个机制,对每个向服务器的消息

  • 服务器如果没有2秒内回复,就会6秒后再发一次,如果再没回复就好像1分钟还是多少再发一次,非常保险

但我们在处理消息的时候,如想去申请 token 或者想接一个 LLM 的 API,都可能导致 处理+传送 的时间超过 2 秒,进而导致多收到一次消息,多做一次处理。

最优雅的做法是无论什么消息都先 return “200” 再进行处理,之后正式回复。但好像 flask 不支持对一个 request 进行两次回复?

所以我暂用的笨办法:用一个 dict deal_msg_time 记录收到的请求的消息(实际飞书对话中消息的创建时间),如果重复的就不再处理,不重复的就添加进去记着。

发表评论