feat(core): 初步完成新增用户单独通知功能

This commit is contained in:
2025-05-18 01:57:48 +08:00
parent c0f887ff9d
commit 0d904b229e
4 changed files with 381 additions and 3 deletions

View File

@@ -313,4 +313,126 @@ class Notification(QWidget):
return True
class UserNotification:
"""用户单独通知服务"""
def __init__(self, user_config):
self.config = user_config
def send_notification(self, title: str, content: str) -> bool:
"""发送用户通知
Args:
title: 通知标题
content: 通知内容
Returns:
bool: 是否发送成功
"""
if not self.config.get(self.config.Notify_Enable):
return False
success = False
# 发送邮件通知
if self._check_smtp_config():
try:
self._send_email(title, content)
success = True
except Exception as e:
logger.error(f"发送邮件通知失败: {str(e)}")
# 发送ServerChan通知
if self.config.get(self.config.Notify_IfServerChan) and self._check_serverchan_config():
try:
self._send_serverchan(title, content)
success = True
except Exception as e:
logger.error(f"发送ServerChan通知失败: {str(e)}")
# 发送企业微信机器人通知
if self.config.get(self.config.Notify_IfCompanyWebHookBot) and self._check_webhook_config():
try:
self._send_webhook(title, content)
success = True
except Exception as e:
logger.error(f"发送企业微信机器人通知失败: {str(e)}")
return success
def _check_smtp_config(self) -> bool:
"""检查SMTP配置是否完整"""
return all([
self.config.get(self.config.Notify_IfSMTP),
self.config.get(self.config.Notify_SMTPServerAddress),
self.config.get(self.config.Notify_AuthorizationCode),
self.config.get(self.config.Notify_FromAddress),
self.config.get(self.config.Notify_ToAddress)
])
def _check_serverchan_config(self) -> bool:
"""检查ServerChan配置是否完整"""
return bool(self.config.get(self.config.Notify_ServerChanKey))
def _check_webhook_config(self) -> bool:
"""检查企业微信机器人配置是否完整"""
return bool(self.config.get(self.config.Notify_CompanyWebHookBotUrl))
def _send_email(self, title: str, content: str):
"""发送邮件通知"""
import smtplib
from email.mime.text import MIMEText
from email.header import Header
msg = MIMEText(content, 'plain', 'utf-8')
msg['Subject'] = Header(title, 'utf-8')
msg['From'] = self.config.get(self.config.Notify_FromAddress)
msg['To'] = self.config.get(self.config.Notify_ToAddress)
server = smtplib.SMTP_SSL(self.config.get(self.config.Notify_SMTPServerAddress))
server.login(
self.config.get(self.config.Notify_FromAddress),
self.config.get(self.config.Notify_AuthorizationCode)
)
server.send_message(msg)
server.quit()
def _send_serverchan(self, title: str, content: str):
"""发送ServerChan通知"""
import requests
key = self.config.get(self.config.Notify_ServerChanKey)
channel = self.config.get(self.config.Notify_ServerChanChannel)
tag = self.config.get(self.config.Notify_ServerChanTag)
url = f"https://sctapi.ftqq.com/{key}.send"
data = {
"title": title,
"desp": content,
"channel": channel if channel else 9, # 默认使用企业微信通道
"tag": tag if tag else ""
}
response = requests.post(url, data=data)
if response.status_code != 200:
raise Exception(f"ServerChan API返回错误: {response.text}")
def _send_webhook(self, title: str, content: str):
"""发送企业微信机器人通知"""
import requests
import json
url = self.config.get(self.config.Notify_CompanyWebHookBotUrl)
data = {
"msgtype": "markdown",
"markdown": {
"content": f"### {title}\n{content}"
}
}
response = requests.post(url, json=data)
if response.status_code != 200:
raise Exception(f"企业微信机器人API返回错误: {response.text}")
Notify = Notification()