python实现飞书机器人-大模型版

1. 完整代码

import base64
import hashlib
import hmac
import json
import random
import time
from datetime import datetime, timedelta
from http import HTTPStatus

import dashscope
import requests
from flask import Flask, jsonify, request

tenant_access_token = ""
expire_time = datetime.now() - timedelta(seconds=6)
app = Flask(__name__)

deal_msg_time = {}

key_words_to_reply = ["PP","pp"]

reply_sentences = [
    "我去",
    "不",
    "要得",
    "哈哈哈哈哈哈哈",

]

reply_sentences_dirty = [
    "?!",
    "我日!!!",

]

reply_emoji = [

    "v2_9cf38100-89b9-4e07-8660-f8fb6605d27g",
    "v2_1df4f781-d2cb-445d-8891-357223184dfg",
]


dashscope.api_key = "sk-XXX" # 填你的阿里云dashscope的api_key

prompy_text_1 = "你来扮演一个角色:* 你的名字叫pp \n  * 你是一个四川人 \n * 你非常搞笑 \n * 只要满足角色风格,你可以随便使用脏话 \n * 请尽量提炼并参考我提供给你的说话的风格 \n 下面给pp说过的一些话,我把每句话都用括号括起来:{} \n"
prompt_text_2 = "现在有人对这个角色说“{}” \n"
prompt_text_3 = "请你模拟这个角色的说话风格进行一句话简短回复,尽量选一句角色说过的话"
text_cave = ""
for text in reply_sentences:
    text_cave = text_cave + "({})".format(text)


def ask_qwen(aim_test):
    global text_cave, prompy_text_1, prompy_text_2, prompy_text_3
    prompt_text = (
        prompy_text_1.format(text_cave) + prompt_text_2.format(aim_test) + prompt_text_3
    )
    response = dashscope.Generation.call(
        prompt=prompt_text,
        model="qwen-14b-chat",
        seed=1234,
        top_p=0.8,
        result_format="message",
        enable_search=False,
        max_tokens=1500,
        temperature=1.0,
        repetition_penalty=1.0,
    )
    return response


def ai_reply(request):
    message_text = str(request.json["event"]["message"]["content"])
    response = ask_qwen(message_text)
    if response.status_code == HTTPStatus.OK:
        ai_reply_text = json.loads(str(ask_qwen(message_text)))["output"]["choices"][0][
            "message"
        ]["content"]
        print("ai_reply_text:{}".format(ai_reply_text))
        send_msg(ai_reply_text.replace('"', ""), request)
        return "200"
    else:
        print(
            "Request id: %s, Status code: %s, error code: %s, error message: %s"
            % (
                response.request_id,
                response.status_code,
                response.code,
                response.message,
            )
        )
        send_msg(random.choice(reply_sentences_dirty), request)
        return "200"


@app.route("/webhook", methods=["GET", "POST"])
def webhook():
    global deal_msg_time
    print("content:{}".format(str(request.json)))

    if (
        request.json.get("type") != None
        and request.json.get("type") == "url_verification"
    ):
        print("Do url_verification")
        print("challenge value: {}".format(request.json["challenge"]))
        challenge = request.json.get("challenge", "")
        rsp = {"challenge": challenge}
        return json.dumps(rsp)

    if (
        request.json.get("event") != None
        and request.json.get("event").get("sender") != None
        and request.json.get("event").get("sender").get("sender_id") != None
        and request.json["event"]["sender"]["sender_id"]["open_id"]
        == "cli_a5d05c6f8cf8100e"
    ):
        return "200"
    
    message_time = request.json["event"]["message"]["create_time"]
    if deal_msg_time.get(message_time) == None:
        deal_msg_time[message_time] = ""
    else:
        return "200"

    message_text = str(request.json["event"]["message"]["content"])
    print("text is {}".format(message_text))

    # 判断是否被@
    if "@" in message_text:
        if request.json.get("event").get("message").get("mentions") != None:
            for a_mention in request.json.get("event").get("message").get("mentions"):
                if "X" in a_mention.get("name"):
                    ai_reply(request)
                    return "200"

    for kw in key_words_to_reply:
        if kw in message_text:
            ai_reply(request)
    #     send_reply("test", request)
    return "200"


# https://open.feishu.cn/document/server-docs/authentication-management/access-token/tenant_access_token_internal
def renew_tenant_access_token():
    global tenant_access_token
    global expire_time
    data = {
        "app_id": "cli_a5d05c6f8cf8100e",
        "app_secret": "oTXhyCemcTuuvypQjGnhDfvbwXz1bKPx",
    }
    headers = {
        "Content-Type": "application/json; charset=utf-8",
    }
    url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
    response = requests.post(url, json=data, headers=headers)
    print("send get_tenant_access_token post")

    # Print response content as text
    print("response text: {}".format(response.text))

    data = json.loads(response.text)
    tenant_access_token = data["tenant_access_token"]
    expire_time = datetime.now() + timedelta(seconds=(data["expire"] - 10))
    return


def send_emoji(emoji_id, request):
    global expire_time
    global tenant_access_token

    if expire_time < datetime.now():
        renew_tenant_access_token()

    url = "https://open.feishu.cn/open-apis/im/v1/messages"
    params = {"receive_id_type": "chat_id"}
    msgContent = {
        "file_key": emoji_id,
    }
    req = {
        "receive_id": request.json["event"]["message"]["chat_id"],  # chat id
        "msg_type": "sticker",
        "content": json.dumps(msgContent),
    }
    payload = json.dumps(req)
    headers = {
        "Authorization": "Bearer {}".format(tenant_access_token),  # your access token
        "Content-Type": "application/json",
    }
    response = requests.request(
        "POST", url, params=params, headers=headers, data=payload
    )
    print(response.headers["X-Tt-Logid"])  # for debug or oncall
    print(response.content)  # Print Response

    return None


def send_msg(text, request):
    global expire_time
    global tenant_access_token

    if expire_time < datetime.now():
        renew_tenant_access_token()

    url = "https://open.feishu.cn/open-apis/im/v1/messages"
    params = {"receive_id_type": "chat_id"}
    msgContent = {
        "text": text,
    }
    req = {
        "receive_id": request.json["event"]["message"]["chat_id"],  # chat id
        "msg_type": "text",
        "content": json.dumps(msgContent),
    }
    payload = json.dumps(req)
    headers = {
        "Authorization": "Bearer {}".format(tenant_access_token),  # your access token
        "Content-Type": "application/json",
    }
    response = requests.request(
        "POST", url, params=params, headers=headers, data=payload
    )
    print(response.headers["X-Tt-Logid"])  # for debug or oncall
    print(response.content)  # Print Response

    return None


def send_reply(text, request):
    global expire_time
    global tenant_access_token
    if expire_time < datetime.now():
        renew_tenant_access_token()
    data = {"msg_type": "text", "content": {"text": text}}
    headers = {
        "Authorization": "Bearer {}".format(tenant_access_token),
        "Content-Type": "application/json; charset=utf-8",
    }
    url = "https://open.feishu.cn/open-apis/im/v1/messages/:{}/reply".format(
        request.json["event"]["message"]["message_id"]
    )
    requests.post(url, json=data, headers=headers)


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8891)

# print(json.loads(str(ask_qwen("干妈")))["output"]["choices"][0]["message"]['content'])

2. 功能简述

一个飞书群聊机器人

  • 检测群聊中的信息特征
    • 如果包含关键词(这个数组key_words_to_reply 中的关键词)
    • 或被@的人的名字包含X则进行回复
  • 向阿里千问API发一个prompt,要求其扮演pp进行回复(需要用reply_sentences去记一些pp的特征言语)
  • 将阿里千问返回的内容发送到群聊

3. 基础准备

3.1 一个基础的飞书机器人,实现详见:

https://pangruitao.com:1180/post/4501(在新窗口中打开)

3.2 阿里云平台去申请一个API KEY

也可以用其他模型的API,我暂用的阿里云千问玩一玩

https://dashscope.console.aliyun.com/overview

把获得的 api key 填入代码,即可

dashscope.api_key = "sk-XXX" # 填你的阿里云dashscope的api_key

4. 大模型API调用代码

def ask_qwen(aim_test):
    global text_cave, prompy_text_1, prompy_text_2, prompy_text_3
    prompt_text = (
        prompy_text_1.format(text_cave) + prompt_text_2.format(aim_test) + prompt_text_3
    )
    response = dashscope.Generation.call(
        prompt=prompt_text,
        model="qwen-14b-chat", # 选择模型,这个是最贵的,但相对最强的
        seed=1234,
        top_p=0.8, # 选择温度
        result_format="message",
        enable_search=False,
        max_tokens=1500,
        temperature=1.0,
        repetition_penalty=1.0,
    )
    return response


def ai_reply(request):
    message_text = str(request.json["event"]["message"]["content"])
    response = ask_qwen(message_text)
    if response.status_code == HTTPStatus.OK:
        ai_reply_text = json.loads(str(ask_qwen(message_text)))["output"]["choices"][0][
            "message"
        ]["content"]
        print("ai_reply_text:{}".format(ai_reply_text))
        send_msg(ai_reply_text.replace('"', ""), request)
        return "200"
    else:
        print(
            "Request id: %s, Status code: %s, error code: %s, error message: %s"
            % (
                response.request_id,
                response.status_code,
                response.code,
                response.message,
            )
        )
        send_msg(random.choice(reply_sentences_dirty), request)
        return "200"

发表评论