|
|
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
|
"""
|
|
|
|
|
|
===================================
|
|
|
|
|
|
钉钉 Stream 模式适配器
|
|
|
|
|
|
===================================
|
|
|
|
|
|
|
|
|
|
|
|
使用钉钉官方 Stream SDK 接入机器人,无需公网 IP 和 Webhook 配置。
|
|
|
|
|
|
|
|
|
|
|
|
优势:
|
|
|
|
|
|
- 不需要公网 IP 或域名
|
|
|
|
|
|
- 不需要配置 Webhook URL
|
|
|
|
|
|
- 通过 WebSocket 长连接接收消息
|
|
|
|
|
|
- 更简单的接入方式
|
|
|
|
|
|
|
|
|
|
|
|
依赖:
|
|
|
|
|
|
pip install dingtalk-stream
|
|
|
|
|
|
|
|
|
|
|
|
钉钉 Stream SDK:
|
|
|
|
|
|
https://github.com/open-dingtalk/dingtalk-stream-sdk-python
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
import logging
|
|
|
|
|
|
import asyncio
|
|
|
|
|
|
import threading
|
|
|
|
|
|
from datetime import datetime
|
|
|
|
|
|
from typing import Optional, Callable, Any
|
|
|
|
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
# 尝试导入钉钉 Stream SDK
|
|
|
|
|
|
try:
|
|
|
|
|
|
import dingtalk_stream
|
|
|
|
|
|
from dingtalk_stream import AckMessage
|
|
|
|
|
|
|
|
|
|
|
|
DINGTALK_STREAM_AVAILABLE = True
|
|
|
|
|
|
except ImportError:
|
|
|
|
|
|
DINGTALK_STREAM_AVAILABLE = False
|
|
|
|
|
|
logger.warning("[DingTalk Stream] dingtalk-stream SDK 未安装,Stream 模式不可用")
|
|
|
|
|
|
logger.warning("[DingTalk Stream] 请运行: pip install dingtalk-stream")
|
|
|
|
|
|
|
|
|
|
|
|
from bot.models import BotMessage, BotResponse, ChatType
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class DingtalkStreamHandler:
|
|
|
|
|
|
"""
|
|
|
|
|
|
钉钉 Stream 模式消息处理器
|
|
|
|
|
|
|
|
|
|
|
|
将 Stream SDK 的回调转换为统一的 BotMessage 格式,
|
|
|
|
|
|
并调用命令分发器处理。
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
def __init__(self, on_message: Callable[[BotMessage], BotResponse]):
|
|
|
|
|
|
"""
|
|
|
|
|
|
Args:
|
|
|
|
|
|
on_message: 消息处理回调函数,接收 BotMessage 返回 BotResponse
|
|
|
|
|
|
"""
|
|
|
|
|
|
self._on_message = on_message
|
|
|
|
|
|
self._logger = logger
|
|
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
|
def _truncate_log_content(text: str, max_len: int = 200) -> str:
|
|
|
|
|
|
cleaned = text.replace("\n", " ").strip()
|
|
|
|
|
|
if len(cleaned) > max_len:
|
|
|
|
|
|
return f"{cleaned[:max_len]}..."
|
|
|
|
|
|
return cleaned
|
|
|
|
|
|
|
|
|
|
|
|
def _log_incoming_message(self, message: BotMessage) -> None:
|
|
|
|
|
|
content = message.raw_content or message.content or ""
|
|
|
|
|
|
summary = self._truncate_log_content(content)
|
|
|
|
|
|
self._logger.info(
|
|
|
|
|
|
"[DingTalk Stream] Incoming message: msg_id=%s user_id=%s chat_id=%s chat_type=%s content=%s",
|
|
|
|
|
|
message.message_id,
|
|
|
|
|
|
message.user_id,
|
|
|
|
|
|
message.chat_id,
|
|
|
|
|
|
getattr(message.chat_type, "value", message.chat_type),
|
|
|
|
|
|
summary,
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
if DINGTALK_STREAM_AVAILABLE:
|
|
|
|
|
|
class _ChatbotHandler(dingtalk_stream.ChatbotHandler):
|
|
|
|
|
|
"""内部消息处理器"""
|
|
|
|
|
|
|
|
|
|
|
|
def __init__(self, parent: 'DingtalkStreamHandler'):
|
|
|
|
|
|
super().__init__()
|
|
|
|
|
|
self._parent = parent
|
|
|
|
|
|
self.logger = logger
|
|
|
|
|
|
|
|
|
|
|
|
async def process(self, callback: dingtalk_stream.CallbackMessage):
|
|
|
|
|
|
"""处理收到的消息"""
|
|
|
|
|
|
try:
|
|
|
|
|
|
# 解析消息
|
|
|
|
|
|
incoming = dingtalk_stream.ChatbotMessage.from_dict(callback.data)
|
|
|
|
|
|
|
|
|
|
|
|
# 转换为统一格式
|
|
|
|
|
|
bot_message = self._parent._parse_stream_message(incoming, callback.data)
|
|
|
|
|
|
|
|
|
|
|
|
if bot_message:
|
|
|
|
|
|
self._parent._log_incoming_message(bot_message)
|
|
|
|
|
|
# 调用消息处理回调
|
|
|
|
|
|
response = self._parent._on_message(bot_message)
|
|
|
|
|
|
|
|
|
|
|
|
# 发送回复
|
|
|
|
|
|
if response and response.text:
|
|
|
|
|
|
# 构建 @用户 前缀(群聊场景下需要在文本中包含 @用户名)
|
|
|
|
|
|
if response.at_user and incoming.sender_nick:
|
|
|
|
|
|
if response.markdown:
|
|
|
|
|
|
self.reply_markdown(
|
|
|
|
|
|
title="股票分析助手",
|
|
|
|
|
|
text=f"@{incoming.sender_nick} " + response.text,
|
|
|
|
|
|
incoming_message=incoming
|
|
|
|
|
|
)
|
|
|
|
|
|
else:
|
|
|
|
|
|
self.reply_text(response.text, incoming)
|
|
|
|
|
|
|
|
|
|
|
|
return AckMessage.STATUS_OK, 'OK'
|
|
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
self.logger.error(f"[DingTalk Stream] 处理消息失败: {e}")
|
|
|
|
|
|
self.logger.exception(e)
|
|
|
|
|
|
return AckMessage.STATUS_SYSTEM_EXCEPTION, str(e)
|
|
|
|
|
|
|
|
|
|
|
|
def create_handler(self) -> '_ChatbotHandler':
|
|
|
|
|
|
"""创建 SDK 需要的处理器实例"""
|
|
|
|
|
|
return self._ChatbotHandler(self)
|
|
|
|
|
|
|
|
|
|
|
|
def _parse_stream_message(self, incoming: Any, raw_data: dict) -> Optional[BotMessage]:
|
|
|
|
|
|
"""
|
|
|
|
|
|
解析 Stream 消息为统一格式
|
|
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
|
incoming: ChatbotMessage 对象
|
|
|
|
|
|
raw_data: 原始回调数据
|
|
|
|
|
|
"""
|
|
|
|
|
|
try:
|
|
|
|
|
|
raw_data = dict(raw_data or {})
|
|
|
|
|
|
|
|
|
|
|
|
# 获取消息内容
|
|
|
|
|
|
raw_content = incoming.text.content if incoming.text else ''
|
|
|
|
|
|
|
|
|
|
|
|
# 提取命令(去除 @机器人)
|
|
|
|
|
|
content = self._extract_command(raw_content)
|
|
|
|
|
|
|
|
|
|
|
|
# 会话类型
|
|
|
|
|
|
conversation_type = getattr(incoming, 'conversation_type', None)
|
|
|
|
|
|
if conversation_type == '1':
|
|
|
|
|
|
chat_type = ChatType.PRIVATE
|
|
|
|
|
|
elif conversation_type == '2':
|
|
|
|
|
|
chat_type = ChatType.GROUP
|
|
|
|
|
|
else:
|
|
|
|
|
|
chat_type = ChatType.UNKNOWN
|
|
|
|
|
|
|
|
|
|
|
|
# 是否 @了机器人(Stream 模式下收到的消息一般都是 @机器人的)
|
|
|
|
|
|
mentioned = True
|
|
|
|
|
|
|
|
|
|
|
|
# 提取 sessionWebhook,便于异步推送
|
|
|
|
|
|
session_webhook = (
|
|
|
|
|
|
getattr(incoming, 'session_webhook', None)
|
|
|
|
|
|
or raw_data.get('sessionWebhook')
|
|
|
|
|
|
or raw_data.get('session_webhook')
|
|
|
|
|
|
)
|
|
|
|
|
|
if session_webhook:
|
|
|
|
|
|
raw_data['_session_webhook'] = session_webhook
|
|
|
|
|
|
|
|
|
|
|
|
return BotMessage(
|
|
|
|
|
|
platform='dingtalk',
|
|
|
|
|
|
message_id=getattr(incoming, 'msg_id', '') or '',
|
|
|
|
|
|
user_id=getattr(incoming, 'sender_id', '') or '',
|
|
|
|
|
|
user_name=getattr(incoming, 'sender_nick', '') or '',
|
|
|
|
|
|
chat_id=getattr(incoming, 'conversation_id', '') or '',
|
|
|
|
|
|
chat_type=chat_type,
|
|
|
|
|
|
content=content,
|
|
|
|
|
|
raw_content=raw_content,
|
|
|
|
|
|
mentioned=mentioned,
|
|
|
|
|
|
mentions=[],
|
|
|
|
|
|
timestamp=datetime.now(),
|
|
|
|
|
|
raw_data=raw_data,
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
logger.error(f"[DingTalk Stream] 解析消息失败: {e}")
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
def _extract_command(self, text: str) -> str:
|
|
|
|
|
|
"""提取命令内容(去除 @机器人)"""
|
|
|
|
|
|
import re
|
|
|
|
|
|
text = re.sub(r'^@[\S]+\s*', '', text.strip())
|
|
|
|
|
|
return text.strip()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class DingtalkStreamClient:
|
|
|
|
|
|
"""
|
|
|
|
|
|
钉钉 Stream 模式客户端
|
|
|
|
|
|
|
|
|
|
|
|
封装 dingtalk-stream SDK,提供简单的启动接口。
|
|
|
|
|
|
|
|
|
|
|
|
使用方式:
|
|
|
|
|
|
client = DingtalkStreamClient()
|
|
|
|
|
|
client.start() # 阻塞运行
|
|
|
|
|
|
|
|
|
|
|
|
# 或者在后台运行
|
|
|
|
|
|
client.start_background()
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
def __init__(
|
|
|
|
|
|
self,
|
|
|
|
|
|
client_id: Optional[str] = None,
|
|
|
|
|
|
client_secret: Optional[str] = None
|
|
|
|
|
|
):
|
|
|
|
|
|
"""
|
|
|
|
|
|
Args:
|
|
|
|
|
|
client_id: 应用 AppKey(不传则从配置读取)
|
|
|
|
|
|
client_secret: 应用 AppSecret(不传则从配置读取)
|
|
|
|
|
|
"""
|
|
|
|
|
|
if not DINGTALK_STREAM_AVAILABLE:
|
|
|
|
|
|
raise ImportError(
|
|
|
|
|
|
"dingtalk-stream SDK 未安装。\n"
|
|
|
|
|
|
"请运行: pip install dingtalk-stream"
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
from src.config import get_config
|
|
|
|
|
|
config = get_config()
|
|
|
|
|
|
|
|
|
|
|
|
self._client_id = client_id or getattr(config, 'dingtalk_app_key', None)
|
|
|
|
|
|
self._client_secret = client_secret or getattr(config, 'dingtalk_app_secret', None)
|
|
|
|
|
|
|
|
|
|
|
|
if not self._client_id or not self._client_secret:
|
|
|
|
|
|
raise ValueError(
|
|
|
|
|
|
"钉钉 Stream 模式需要配置 DINGTALK_APP_KEY 和 DINGTALK_APP_SECRET"
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
self._client: Optional[dingtalk_stream.DingTalkStreamClient] = None
|
|
|
|
|
|
self._background_thread: Optional[threading.Thread] = None
|
|
|
|
|
|
self._running = False
|
|
|
|
|
|
|
|
|
|
|
|
def _create_message_handler(self) -> Callable[[BotMessage], BotResponse]:
|
|
|
|
|
|
"""创建消息处理函数"""
|
|
|
|
|
|
|
|
|
|
|
|
def handle_message(message: BotMessage) -> BotResponse:
|
|
|
|
|
|
from bot.dispatcher import get_dispatcher
|
|
|
|
|
|
dispatcher = get_dispatcher()
|
|
|
|
|
|
return dispatcher.dispatch(message)
|
|
|
|
|
|
|
|
|
|
|
|
return handle_message
|
|
|
|
|
|
|
|
|
|
|
|
def start(self) -> None:
|
|
|
|
|
|
"""
|
|
|
|
|
|
启动 Stream 客户端(阻塞)
|
|
|
|
|
|
|
|
|
|
|
|
此方法会阻塞当前线程,直到客户端停止。
|
|
|
|
|
|
"""
|
|
|
|
|
|
logger.info("[DingTalk Stream] 正在启动...")
|
|
|
|
|
|
|
|
|
|
|
|
# 创建凭证
|
|
|
|
|
|
credential = dingtalk_stream.Credential(
|
|
|
|
|
|
self._client_id,
|
|
|
|
|
|
self._client_secret
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
# 创建客户端
|
|
|
|
|
|
self._client = dingtalk_stream.DingTalkStreamClient(credential)
|
|
|
|
|
|
|
|
|
|
|
|
# 注册消息处理器
|
|
|
|
|
|
handler = DingtalkStreamHandler(self._create_message_handler())
|
|
|
|
|
|
self._client.register_callback_handler(
|
|
|
|
|
|
dingtalk_stream.chatbot.ChatbotMessage.TOPIC,
|
|
|
|
|
|
handler.create_handler()
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
self._running = True
|
|
|
|
|
|
logger.info("[DingTalk Stream] 客户端已启动,等待消息...")
|
|
|
|
|
|
|
|
|
|
|
|
# 启动(阻塞)
|
|
|
|
|
|
self._client.start_forever()
|
|
|
|
|
|
|
|
|
|
|
|
def start_background(self) -> None:
|
|
|
|
|
|
"""
|
|
|
|
|
|
在后台线程启动 Stream 客户端(非阻塞)
|
|
|
|
|
|
|
|
|
|
|
|
适用于与其他服务(如 WebUI)同时运行的场景。
|
|
|
|
|
|
"""
|
|
|
|
|
|
if self._background_thread and self._background_thread.is_alive():
|
|
|
|
|
|
logger.warning("[DingTalk Stream] 客户端已在运行")
|
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
self._running = True
|
|
|
|
|
|
self._background_thread = threading.Thread(
|
|
|
|
|
|
target=self._run_in_background,
|
|
|
|
|
|
daemon=True,
|
|
|
|
|
|
name="DingtalkStreamClient"
|
|
|
|
|
|
)
|
|
|
|
|
|
self._background_thread.start()
|
|
|
|
|
|
logger.info("[DingTalk Stream] 后台客户端已启动")
|
|
|
|
|
|
|
|
|
|
|
|
def _run_in_background(self) -> None:
|
|
|
|
|
|
"""后台运行(处理异常和重连)"""
|
|
|
|
|
|
while self._running:
|
|
|
|
|
|
try:
|
|
|
|
|
|
self.start()
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
logger.error(f"[DingTalk Stream] 运行异常: {e}")
|
|
|
|
|
|
if self._running:
|
|
|
|
|
|
logger.info("[DingTalk Stream] 5 秒后重连...")
|
|
|
|
|
|
import time
|
|
|
|
|
|
time.sleep(5)
|
|
|
|
|
|
|
|
|
|
|
|
def stop(self) -> None:
|
|
|
|
|
|
"""停止客户端"""
|
|
|
|
|
|
self._running = False
|
|
|
|
|
|
logger.info("[DingTalk Stream] 客户端已停止")
|
|
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
|
def is_running(self) -> bool:
|
|
|
|
|
|
"""是否正在运行"""
|
|
|
|
|
|
return self._running
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 全局客户端实例
|
|
|
|
|
|
_stream_client: Optional[DingtalkStreamClient] = None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_dingtalk_stream_client() -> Optional[DingtalkStreamClient]:
|
|
|
|
|
|
"""获取全局 Stream 客户端实例"""
|
|
|
|
|
|
global _stream_client
|
|
|
|
|
|
|
|
|
|
|
|
if _stream_client is None and DINGTALK_STREAM_AVAILABLE:
|
|
|
|
|
|
try:
|
|
|
|
|
|
_stream_client = DingtalkStreamClient()
|
|
|
|
|
|
except (ImportError, ValueError) as e:
|
|
|
|
|
|
logger.warning(f"[DingTalk Stream] 无法创建客户端: {e}")
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
return _stream_client
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def start_dingtalk_stream_background() -> bool:
|
|
|
|
|
|
"""
|
|
|
|
|
|
在后台启动钉钉 Stream 客户端
|
|
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
|
是否成功启动
|
|
|
|
|
|
"""
|
|
|
|
|
|
client = get_dingtalk_stream_client()
|
|
|
|
|
|
if client:
|
|
|
|
|
|
client.start_background()
|
|
|
|
|
|
return True
|
|
|
|
|
|
return False
|