You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

347 lines
11 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# -*- coding: utf-8 -*-
"""
===================================
钉钉 Stream 模式适配器
===================================
使用钉钉官方 Stream SDK 接入机器人,无需公网 IP 和 Webhook 配置。
优势:
- 不需要公网 IP 或域名
- 不需要配置 Webhook URL
- 通过 WebSocket 长连接接收消息
- 更简单的接入方式
依赖:
pip install dingtalk-stream
钉钉 Stream SDK
https://github.com/open-dingtalk/dingtalk-stream-sdk-python
"""
import logging
import asyncio
import threading
from datetime import datetime
from typing import Optional, Callable, Any
logger = logging.getLogger(__name__)
# 尝试导入钉钉 Stream SDK
try:
import dingtalk_stream
from dingtalk_stream import AckMessage
DINGTALK_STREAM_AVAILABLE = True
except ImportError:
DINGTALK_STREAM_AVAILABLE = False
logger.warning("[DingTalk Stream] dingtalk-stream SDK 未安装Stream 模式不可用")
logger.warning("[DingTalk Stream] 请运行: pip install dingtalk-stream")
from bot.models import BotMessage, BotResponse, ChatType
class DingtalkStreamHandler:
    """Message handler for DingTalk Stream mode.

    Converts callbacks delivered by the Stream SDK into the unified
    ``BotMessage`` format and hands them to the ``on_message`` callback.
    """

    def __init__(self, on_message: Callable[[BotMessage], BotResponse]):
        """
        Args:
            on_message: Message callback; receives a ``BotMessage`` and
                returns a ``BotResponse``.
        """
        self._on_message = on_message
        self._logger = logger

    @staticmethod
    def _truncate_log_content(text: str, max_len: int = 200) -> str:
        """Flatten newlines to spaces and cap *text* at *max_len* characters.

        Text longer than *max_len* is cut and suffixed with ``...``.
        """
        cleaned = text.replace("\n", " ").strip()
        if len(cleaned) > max_len:
            return f"{cleaned[:max_len]}..."
        return cleaned

    def _log_incoming_message(self, message: BotMessage) -> None:
        """Log a one-line summary (ids, chat type, truncated content) of *message*."""
        content = message.raw_content or message.content or ""
        summary = self._truncate_log_content(content)
        self._logger.info(
            "[DingTalk Stream] Incoming message: msg_id=%s user_id=%s chat_id=%s chat_type=%s content=%s",
            message.message_id,
            message.user_id,
            message.chat_id,
            # chat_type may be an enum (log its value) or already a plain value
            getattr(message.chat_type, "value", message.chat_type),
            summary,
        )

    if DINGTALK_STREAM_AVAILABLE:
        class _ChatbotHandler(dingtalk_stream.ChatbotHandler):
            """Internal SDK handler that forwards messages to the parent handler."""

            def __init__(self, parent: 'DingtalkStreamHandler'):
                super().__init__()
                self._parent = parent
                self.logger = logger

            async def process(self, callback: dingtalk_stream.CallbackMessage):
                """Handle one incoming callback.

                Returns:
                    ``(AckMessage.STATUS_OK, 'OK')`` on success, or
                    ``(AckMessage.STATUS_SYSTEM_EXCEPTION, <error text>)``
                    when anything raised while handling the message.
                """
                # NOTE(review): on_message is called synchronously inside this
                # coroutine; a slow callback presumably holds up the SDK's
                # event loop - confirm whether it should be offloaded.
                try:
                    # Parse the SDK payload
                    incoming = dingtalk_stream.ChatbotMessage.from_dict(callback.data)
                    # Convert to the unified format
                    bot_message = self._parent._parse_stream_message(incoming, callback.data)
                    if bot_message:
                        self._parent._log_incoming_message(bot_message)
                        # Invoke the message callback
                        response = self._parent._on_message(bot_message)
                        # Send the reply, if there is anything to send
                        if response and response.text:
                            self._send_reply(response, incoming)
                    return AckMessage.STATUS_OK, 'OK'
                except Exception as e:
                    self.logger.error(f"[DingTalk Stream] 处理消息失败: {e}")
                    self.logger.exception(e)
                    return AckMessage.STATUS_SYSTEM_EXCEPTION, str(e)

            def _send_reply(self, response: BotResponse, incoming: Any) -> None:
                """Send *response* back to the conversation *incoming* came from.

                Every non-empty response is sent: as markdown when
                ``response.markdown`` is set, otherwise as plain text.
                Previously one at_user/markdown combination fell through
                without sending anything.
                """
                if response.markdown:
                    text = response.text
                    # In group chats the @user name has to be part of the text
                    if response.at_user and incoming.sender_nick:
                        text = f"@{incoming.sender_nick} " + text
                    self.reply_markdown(
                        title="股票分析助手",
                        text=text,
                        incoming_message=incoming
                    )
                else:
                    # Plain-text replies are sent unprefixed, as before
                    self.reply_text(response.text, incoming)

    def create_handler(self) -> '_ChatbotHandler':
        """Create the handler instance that the SDK expects.

        Raises:
            ImportError: If the dingtalk-stream SDK is not installed (the
                nested handler class is only defined when it is).
        """
        if not DINGTALK_STREAM_AVAILABLE:
            raise ImportError(
                "dingtalk-stream SDK 未安装。\n"
                "请运行: pip install dingtalk-stream"
            )
        return self._ChatbotHandler(self)

    def _parse_stream_message(self, incoming: Any, raw_data: dict) -> Optional[BotMessage]:
        """Convert a Stream message into the unified ``BotMessage`` format.

        Args:
            incoming: The SDK ``ChatbotMessage`` object.
            raw_data: The raw callback payload.

        Returns:
            The converted ``BotMessage``, or ``None`` if conversion raised
            (the error is logged).
        """
        try:
            # Work on a copy so the caller's payload is never mutated
            raw_data = dict(raw_data or {})
            # Message text; a missing text part or a None content becomes ''
            raw_content = (incoming.text.content or '') if incoming.text else ''
            # Extract the command (strip the leading @bot mention)
            content = self._extract_command(raw_content)
            # Conversation type: '1' maps to private, '2' to group
            conversation_type = getattr(incoming, 'conversation_type', None)
            if conversation_type == '1':
                chat_type = ChatType.PRIVATE
            elif conversation_type == '2':
                chat_type = ChatType.GROUP
            else:
                chat_type = ChatType.UNKNOWN
            # Whether the bot was @-mentioned; always True here (per the
            # original author, Stream-mode messages are generally @bot ones)
            mentioned = True
            # Keep the session webhook so replies can be pushed asynchronously
            session_webhook = (
                getattr(incoming, 'session_webhook', None)
                or raw_data.get('sessionWebhook')
                or raw_data.get('session_webhook')
            )
            if session_webhook:
                raw_data['_session_webhook'] = session_webhook
            return BotMessage(
                platform='dingtalk',
                message_id=getattr(incoming, 'msg_id', '') or '',
                user_id=getattr(incoming, 'sender_id', '') or '',
                user_name=getattr(incoming, 'sender_nick', '') or '',
                chat_id=getattr(incoming, 'conversation_id', '') or '',
                chat_type=chat_type,
                content=content,
                raw_content=raw_content,
                mentioned=mentioned,
                mentions=[],
                timestamp=datetime.now(),
                raw_data=raw_data,
            )
        except Exception as e:
            logger.error(f"[DingTalk Stream] 解析消息失败: {e}")
            return None

    def _extract_command(self, text: str) -> str:
        """Return *text* with one leading ``@mention`` token and outer whitespace removed.

        ``None`` or empty input yields ``''``.
        """
        import re
        text = re.sub(r'^@[\S]+\s*', '', (text or '').strip())
        return text.strip()
class DingtalkStreamClient:
    """Client for DingTalk Stream mode.

    Wraps the dingtalk-stream SDK behind a small start/stop interface.

    Usage:
        client = DingtalkStreamClient()
        client.start()  # blocks the calling thread
        # or run it in the background
        client.start_background()
    """

    def __init__(
        self,
        client_id: Optional[str] = None,
        client_secret: Optional[str] = None
    ):
        """
        Args:
            client_id: Application AppKey; read from the config
                (``dingtalk_app_key``) when not given.
            client_secret: Application AppSecret; read from the config
                (``dingtalk_app_secret``) when not given.

        Raises:
            ImportError: If the dingtalk-stream SDK is not installed.
            ValueError: If either credential is still missing after the
                config lookup.
        """
        if not DINGTALK_STREAM_AVAILABLE:
            raise ImportError(
                "dingtalk-stream SDK 未安装。\n"
                "请运行: pip install dingtalk-stream"
            )
        from src.config import get_config
        config = get_config()
        self._client_id = client_id or getattr(config, 'dingtalk_app_key', None)
        self._client_secret = client_secret or getattr(config, 'dingtalk_app_secret', None)
        if not self._client_id or not self._client_secret:
            raise ValueError(
                "钉钉 Stream 模式需要配置 DINGTALK_APP_KEY 和 DINGTALK_APP_SECRET"
            )
        self._client: Optional[dingtalk_stream.DingTalkStreamClient] = None
        self._background_thread: Optional[threading.Thread] = None
        self._running = False

    def _create_message_handler(self) -> Callable[[BotMessage], BotResponse]:
        """Build the callback that routes each message through the dispatcher."""
        def handle_message(message: BotMessage) -> BotResponse:
            # Imported at call time rather than at module import time
            from bot.dispatcher import get_dispatcher
            dispatcher = get_dispatcher()
            return dispatcher.dispatch(message)
        return handle_message

    def _connect_and_serve(self) -> None:
        """Build a fresh SDK client, register the chatbot handler and serve.

        Blocks inside ``start_forever()``. Does not touch ``_running``; the
        public entry points own that flag.
        """
        logger.info("[DingTalk Stream] 正在启动...")
        # Credentials
        credential = dingtalk_stream.Credential(
            self._client_id,
            self._client_secret
        )
        # SDK client
        self._client = dingtalk_stream.DingTalkStreamClient(credential)
        # Register the message handler for the chatbot topic
        handler = DingtalkStreamHandler(self._create_message_handler())
        self._client.register_callback_handler(
            dingtalk_stream.chatbot.ChatbotMessage.TOPIC,
            handler.create_handler()
        )
        logger.info("[DingTalk Stream] 客户端已启动,等待消息...")
        # Serve (blocking)
        self._client.start_forever()

    def start(self) -> None:
        """Start the Stream client and block the calling thread.

        ``is_running`` is True while this call is active and is reset once
        it returns or raises, so the flag no longer stays True after the
        client has exited.
        """
        self._running = True
        try:
            self._connect_and_serve()
        finally:
            self._running = False

    def start_background(self) -> None:
        """Start the Stream client in a daemon thread (non-blocking).

        Intended for running alongside other services (e.g. a WebUI).
        Logs a warning and does nothing if the background thread is
        already alive.
        """
        if self._background_thread and self._background_thread.is_alive():
            logger.warning("[DingTalk Stream] 客户端已在运行")
            return
        self._running = True
        self._background_thread = threading.Thread(
            target=self._run_in_background,
            daemon=True,
            name="DingtalkStreamClient"
        )
        self._background_thread.start()
        logger.info("[DingTalk Stream] 后台客户端已启动")

    def _run_in_background(self) -> None:
        """Background loop: serve, log failures, and reconnect until stopped."""
        import time
        while self._running:
            try:
                # Uses the helper, not start(): start() would clear
                # ``_running`` on exit and end this reconnect loop, and the
                # helper never re-sets the flag, so a concurrent stop() holds.
                self._connect_and_serve()
            except Exception as e:
                logger.error(f"[DingTalk Stream] 运行异常: {e}")
            # Back off after every exit (error or plain return) so a client
            # that exits immediately cannot spin this loop.
            if self._running:
                logger.info("[DingTalk Stream] 5 秒后重连...")
                time.sleep(5)

    def stop(self) -> None:
        """Request the client to stop.

        NOTE: this only clears the running flag, which ends the background
        reconnect loop. Nothing here interrupts a ``start_forever()`` call
        that is already blocking - presumably the current connection stays
        up until it ends on its own; confirm against the SDK.
        """
        self._running = False
        logger.info("[DingTalk Stream] 客户端已停止")

    @property
    def is_running(self) -> bool:
        """Whether the client is currently flagged as running."""
        return self._running
# Module-wide client instance, created lazily on first request
_stream_client: Optional[DingtalkStreamClient] = None


def get_dingtalk_stream_client() -> Optional[DingtalkStreamClient]:
    """Return the shared Stream client, creating it on first use.

    Returns:
        The cached client, or ``None`` when the SDK is unavailable or the
        client could not be constructed (a warning is logged in that case).
    """
    global _stream_client
    # Nothing to build: either already cached, or the SDK is missing
    if _stream_client is not None or not DINGTALK_STREAM_AVAILABLE:
        return _stream_client
    try:
        _stream_client = DingtalkStreamClient()
    except (ImportError, ValueError) as e:
        logger.warning(f"[DingTalk Stream] 无法创建客户端: {e}")
        return None
    return _stream_client
def start_dingtalk_stream_background() -> bool:
    """Start the DingTalk Stream client in the background.

    Returns:
        ``True`` when a client was obtained and ``start_background()`` was
        called on it, ``False`` when no client is available.
    """
    stream_client = get_dingtalk_stream_client()
    if not stream_client:
        return False
    stream_client.start_background()
    return True