1. 完整代码
import base64
import hashlib
import hmac
import json
import random
import time
from datetime import datetime, timedelta
import requests
from flask import Flask, jsonify, request
tenant_access_token = ""
expire_time = datetime.now() - timedelta(seconds=6)
app = Flask(__name__)
deal_msg_time = {}
key_words_to_reply = ["PP","pp"]
reply_sentences = [
"我去",
"不",
"要得",
"哈哈哈哈哈哈哈",
]
reply_emoji = [
"v3_0061_a751e648-72f6-4fe8-9e6a-5850d4d34dfg",
"v2_dc539e42-fbd0-4ad9-b57b-37339d11e9dg"
]
@app.route("/webhook", methods=["GET", "POST"])
def webhook():
global deal_msg_time
print("content:{}".format(str(request.json)))
if (
request.json.get("type") != None
and request.json.get("type") == "url_verification"
):
print("Do url_verification")
print("challenge value: {}".format(request.json["challenge"]))
challenge = request.json.get("challenge", "")
rsp = {"challenge": challenge}
return json.dumps(rsp)
if (
request.json.get("event") != None
and request.json.get("event").get("sender") != None
and request.json.get("event").get("sender").get("sender_id") != None
and request.json["event"]["sender"]["sender_id"]["open_id"]
== "cli_a5d05c6f8cf8100e"
):
return "200"
message_time = request.json["event"]["message"]["create_time"]
if deal_msg_time.get(message_time) == None:
deal_msg_time[message_time] = ""
else:
return "200"
message_text = str(request.json["event"]["message"]["content"])
print("text is {}".format(message_text))
# 判断是否被@
if '@' in message_text:
if request.json.get("event").get("message").get("mentions") != None:
for a_mention in request.json.get("event").get("message").get("mentions"):
if "X" in a_mention.get("name"):
if random.choice([i for i in range(0, 10)]) < 3:
emoji_id = random.choice(reply_emoji)
send_emoji(emoji_id, request)
return "200"
else:
reply_text = random.choice(reply_sentences)
send_msg(reply_text, request)
return "200"
for kw in key_words_to_reply:
if kw in message_text:
if random.choice([i for i in range(0, 10)]) < 3:
emoji_id = random.choice(reply_emoji)
send_emoji(emoji_id, request)
return "200"
else:
reply_text = random.choice(reply_sentences)
send_msg(reply_text, request)
return "200"
# send_reply("test", request)
return "200"
# https://open.feishu.cn/document/server-docs/authentication-management/access-token/tenant_access_token_internal
def renew_tenant_access_token():
global tenant_access_token
global expire_time
data = {
"app_id": "your_app_id",
"app_secret": "your_app_secret",
}
headers = {
"Content-Type": "application/json; charset=utf-8",
}
url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
response = requests.post(url, json=data, headers=headers)
print("send get_tenant_access_token post")
# Print response content as text
print("response text: {}".format(response.text))
data = json.loads(response.text)
tenant_access_token = data["tenant_access_token"]
expire_time = datetime.now() + timedelta(seconds=(data["expire"] - 10))
return
def send_emoji(emoji_id, request):
global expire_time
global tenant_access_token
if expire_time < datetime.now():
renew_tenant_access_token()
url = "https://open.feishu.cn/open-apis/im/v1/messages"
params = {"receive_id_type": "chat_id"}
msgContent = {
"file_key": emoji_id,
}
req = {
"receive_id": request.json["event"]["message"]["chat_id"], # chat id
"msg_type": "sticker",
"content": json.dumps(msgContent),
}
payload = json.dumps(req)
headers = {
"Authorization": "Bearer {}".format(tenant_access_token), # your access token
"Content-Type": "application/json",
}
response = requests.request(
"POST", url, params=params, headers=headers, data=payload
)
print(response.headers["X-Tt-Logid"]) # for debug or oncall
print(response.content) # Print Response
return None
def send_msg(text, request):
global expire_time
global tenant_access_token
if expire_time < datetime.now():
renew_tenant_access_token()
url = "https://open.feishu.cn/open-apis/im/v1/messages"
params = {"receive_id_type": "chat_id"}
msgContent = {
"text": text,
}
req = {
"receive_id": request.json["event"]["message"]["chat_id"], # chat id
"msg_type": "text",
"content": json.dumps(msgContent),
}
payload = json.dumps(req)
headers = {
"Authorization": "Bearer {}".format(tenant_access_token), # your access token
"Content-Type": "application/json",
}
response = requests.request(
"POST", url, params=params, headers=headers, data=payload
)
print(response.headers["X-Tt-Logid"]) # for debug or oncall
print(response.content) # Print Response
return None
def send_reply(text, request):
global expire_time
global tenant_access_token
if expire_time < datetime.now():
renew_tenant_access_token()
data = {"msg_type": "text", "content": {"text": text}}
headers = {
"Authorization": "Bearer {}".format(tenant_access_token),
"Content-Type": "application/json; charset=utf-8",
}
url = "https://open.feishu.cn/open-apis/im/v1/messages/:{}/reply".format(
request.json["event"]["message"]["message_id"]
)
requests.post(url, json=data, headers=headers)
if __name__ == "__main__":
app.run(host="0.0.0.0", port=8891)
2. 功能简述
弄了个飞书机器人试试玩
飞书的机器人支持还是蛮好的。不像微信那么封闭,还得去付费搞个 wechaty 啥的。
功能简述
- 可以读取群聊信息
- 发现消息内容是X被@,或者聊天内容包含 [“PP”,”pp”] 以后就触发回复逻辑
- 随机回复一句话或者表情(配置在 reply_sentences 和 reply_emoji)
- 群消息会以print的形式展示在控制台,想提取新表情啥的可以随意复制其飞书文件代码
这是最简单的回复方式,可以把某某人的常用口头禅和表情放进去,一旦识别到关键词就随机回复一个,经常还蛮有趣的。
3. 基础准备
注:更详细的可以查看飞书的机器人文档:https://open.feishu.cn/document/client-docs/bot-v3/bot-overview
这里针对python记几个主要的
- 一个有域名或者公网ip的服务器
- 去创建一个企业自建应用:https://open.feishu.cn/app
- 创建后就能获得
- App ID
- “cli_”开头
- 需要填进
- App Secret
- “oTXhyCemc”之类的
- App ID
- 需要填进代码里这个地方
- data = { “app_id”: “your_app_id”, “app_secret”: “your_app_secret”, }
- 创建后就能获得
- 应用能力中添加“机器人”
- 申请到一堆读群聊、发信息等权限
- 找一个群建个这个 app 机器人
4. 代码逻辑简述
4.1 基本服务器
if __name__ == "__main__":
app.run(host="0.0.0.0", port=8891)
靠 python 的 flask 库创建一个服务器 ,监听 8891 端口。
之后这个服务就通过以下链接访问
- http://你的服务器ip或域名地址:8891
但具体的访问内容还需要进一步编写,靠重载这个函数:
@app.route("/webhook", methods=["GET", "POST"])
def webhook():
return "200"
如上写了以后,就可以通过这个链接
- http://你的服务器ip或域名地址:8891/webhook
用如果请求是 GET POST 就会走 def webhook() 函数处理请求。比如上面就是返回”200″
我们需要把这个链接填到飞书应用,机器人的 “消息卡片请求网址” 下,之后飞书就知道去访问这个服务了。
4.2 飞书机器人验证
填写了地址以后,飞书会要求验证服务器的有效性,要求1秒内回复其请求中的challenge值。为此要修改代码
@app.route("/webhook", methods=["GET", "POST"])
def webhook():
global deal_msg_time
print("content:{}".format(str(request.json)))
if (
request.json.get("type") != None
and request.json.get("type") == "url_verification"
):
print("Do url_verification")
print("challenge value: {}".format(request.json["challenge"]))
challenge = request.json.get("challenge", "")
rsp = {"challenge": challenge}
return json.dumps(rsp)
如上
- 如果发现请求的type是url_verification
- 则解析出challenge值并打包成json发回去
正常这样就能在飞书平台上验证通过了
4.3 获取 tenant_access_token
机器人发消息是需要一个 tenant_access_token 的
这个 token 有时效性,如果过期了需要申请
申请方式如下(会保存在 global tenant_access_token 中)
# https://open.feishu.cn/document/server-docs/authentication-management/access-token/tenant_access_token_internal
def renew_tenant_access_token():
global tenant_access_token
global expire_time
data = {
"app_id": "your_app_id",
"app_secret": "your_app_secret",
}
headers = {
"Content-Type": "application/json; charset=utf-8",
}
url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
response = requests.post(url, json=data, headers=headers)
print("send get_tenant_access_token post")
# Print response content as text
print("response text: {}".format(response.text))
data = json.loads(response.text)
tenant_access_token = data["tenant_access_token"]
expire_time = datetime.now() + timedelta(seconds=(data["expire"] - 10))
return
所以每次要回复消息之前检测一下当前的 token 是否过期,如果过期就重新申请一下再发,如代码中的片段:
def send_msg(text, request):
global expire_time
global tenant_access_token
if expire_time < datetime.now():
renew_tenant_access_token()
4.4 避免重复回复
飞书机器人有个机制,对每个向服务器的消息
- 服务器如果没有2秒内回复,就会6秒后再发一次,如果再没回复就好像1分钟还是多少再发一次,非常保险
但我们在处理消息的时候,如想去申请 token 或者想接一个 LLM 的 API,都可能导致 处理+传送 的时间超过 2 秒,进而导致多收到一次消息,多做一次处理。
最优雅的做法是无论什么消息都先 return “200” 再进行处理,之后正式回复。但好像 flask 不支持对一个 request 进行两次回复?
所以我暂用的笨办法:用一个 dict deal_msg_time 记录收到的请求的消息(实际飞书对话中消息的创建时间),如果重复的就不再处理,不重复的就添加进去记着。