refactor(notification): 重构通知模块
This commit is contained in:
@@ -40,7 +40,6 @@ from typing import Union, List, Dict
|
|||||||
|
|
||||||
from app.core import Config, MaaConfig, MaaUserConfig
|
from app.core import Config, MaaConfig, MaaUserConfig
|
||||||
from app.services import Notify, System
|
from app.services import Notify, System
|
||||||
from app.services.notification import Notification, UserNotification
|
|
||||||
|
|
||||||
|
|
||||||
class MaaManager(QObject):
|
class MaaManager(QObject):
|
||||||
@@ -1773,21 +1772,22 @@ class MaaManager(QObject):
|
|||||||
message_html = template.render(message)
|
message_html = template.render(message)
|
||||||
|
|
||||||
# 发送全局通知
|
# 发送全局通知
|
||||||
Notify.send_mail("网页", title, message_html)
|
Notify.send_mail("网页", title, message_html,Config.get(Config.notify_ToAddress))
|
||||||
Notify.ServerChanPush(title, f"{message_text}\n\nAUTO_MAA 敬上")
|
serverchan_message = message_text.replace("\n", "\n\n")
|
||||||
Notify.CompanyWebHookBotPush(title, f"{message_text}\n\nAUTO_MAA 敬上")
|
Notify.ServerChanPush(title, f"{serverchan_message}\n\nAUTO_MAA 敬上",Config.get(Config.notify_ServerChanKey),Config.get(Config.notify_ServerChanTag),Config.get(Config.notify_ServerChanChannel))
|
||||||
|
Notify.CompanyWebHookBotPush(title, f"{message_text}\n\nAUTO_MAA 敬上",Config.get(Config.notify_CompanyWebHookBotUrl))
|
||||||
|
|
||||||
# 发送用户单独通知
|
# # 发送用户单独通知
|
||||||
for user_name in message["failed_user"].split("、") + message["waiting_user"].split("、"):
|
# for user_name in message["failed_user"].split("、") + message["waiting_user"].split("、"):
|
||||||
if not user_name: # 跳过空字符串
|
# if not user_name: # 跳过空字符串
|
||||||
continue
|
# continue
|
||||||
user_config = Config.member_dict.get(user_name)
|
# user_config = Config.member_dict.get(user_name)
|
||||||
if user_config and user_config.get(user_config.Notify_Enable):
|
# if user_config and user_config.get(user_config.Notify_Enable):
|
||||||
user_notify = UserNotification(user_config)
|
# user_notify = UserNotification(user_config)
|
||||||
user_notify.send_notification(
|
# user_notify.send_notification(
|
||||||
f"{self.mode[2:4]}任务未完成通知",
|
# f"{self.mode[2:4]}任务未完成通知",
|
||||||
f"您的{self.mode[2:4]}任务未完成,请检查相关设置。\n\n{message_text}"
|
# f"您的{self.mode[2:4]}任务未完成,请检查相关设置。\n\n{message_text}"
|
||||||
)
|
# )
|
||||||
|
|
||||||
return message_text
|
return message_text
|
||||||
|
|
||||||
@@ -1818,22 +1818,22 @@ class MaaManager(QObject):
|
|||||||
message_html = template.render(message)
|
message_html = template.render(message)
|
||||||
|
|
||||||
# 发送全局通知
|
# 发送全局通知
|
||||||
Notify.send_mail("网页", title, message_html)
|
Notify.send_mail("网页", title, message_html,Config.get(Config.notify_ToAddress))
|
||||||
# ServerChan的换行是两个换行符。故而将\n替换为\n\n
|
# ServerChan的换行是两个换行符。故而将\n替换为\n\n
|
||||||
serverchan_message = message_text.replace("\n", "\n\n")
|
serverchan_message = message_text.replace("\n", "\n\n")
|
||||||
Notify.ServerChanPush(title, f"{serverchan_message}\n\nAUTO_MAA 敬上")
|
Notify.ServerChanPush(title, f"{serverchan_message}\n\nAUTO_MAA 敬上",Config.get(Config.notify_ServerChanKey),Config.get(Config.notify_ServerChanTag),Config.get(Config.notify_ServerChanChannel))
|
||||||
Notify.CompanyWebHookBotPush(title, f"{message_text}\n\nAUTO_MAA 敬上")
|
Notify.CompanyWebHookBotPush(title, f"{message_text}\n\nAUTO_MAA 敬上",Config.get(Config.notify_CompanyWebHookBotUrl))
|
||||||
|
|
||||||
# 发送用户单独通知
|
# # 发送用户单独通知
|
||||||
user_name = message.get("user_info")
|
# user_name = message.get("user_info")
|
||||||
if user_name:
|
# if user_name:
|
||||||
user_config = Config.member_dict.get(user_name)
|
# user_config = Config.member_dict.get(user_name)
|
||||||
if user_config and user_config.get(user_config.Notify_Enable):
|
# if user_config and user_config.get(user_config.Notify_Enable):
|
||||||
user_notify = UserNotification(user_config)
|
# user_notify = UserNotification(user_config)
|
||||||
user_notify.send_notification(
|
# user_notify.send_notification(
|
||||||
f"{self.mode[2:4]}任务统计报告",
|
# f"{self.mode[2:4]}任务统计报告",
|
||||||
f"您的{self.mode[2:4]}任务统计报告如下:\n\n{message_text}"
|
# f"您的{self.mode[2:4]}任务统计报告如下:\n\n{message_text}"
|
||||||
)
|
# )
|
||||||
|
|
||||||
elif mode == "公招六星":
|
elif mode == "公招六星":
|
||||||
# 生成HTML通知内容
|
# 生成HTML通知内容
|
||||||
@@ -1841,17 +1841,17 @@ class MaaManager(QObject):
|
|||||||
message_html = template.render(message)
|
message_html = template.render(message)
|
||||||
|
|
||||||
# 发送全局通知
|
# 发送全局通知
|
||||||
Notify.send_mail("网页", title, message_html)
|
Notify.send_mail("网页", title, message_html,Config.get(Config.notify_ToAddress))
|
||||||
Notify.ServerChanPush(title, "好羡慕~\n\nAUTO_MAA 敬上")
|
Notify.ServerChanPush(title, "好羡慕~\n\nAUTO_MAA 敬上",Config.get(Config.notify_ServerChanKey),Config.get(Config.notify_ServerChanTag),Config.get(Config.notify_ServerChanChannel))
|
||||||
Notify.CompanyWebHookBotPush(title, "好羡慕~\n\nAUTO_MAA 敬上")
|
Notify.CompanyWebHookBotPush(title, "好羡慕~\n\nAUTO_MAA 敬上",Config.get(Config.notify_CompanyWebHookBotUrl))
|
||||||
|
|
||||||
# 发送用户单独通知
|
# # 发送用户单独通知
|
||||||
user_name = message.get("user_name")
|
# user_name = message.get("user_name")
|
||||||
if user_name:
|
# if user_name:
|
||||||
user_config = Config.member_dict.get(user_name)
|
# user_config = Config.member_dict.get(user_name)
|
||||||
if user_config and user_config.get(user_config.Notify_Enable):
|
# if user_config and user_config.get(user_config.Notify_Enable):
|
||||||
user_notify = UserNotification(user_config)
|
# user_notify = UserNotification(user_config)
|
||||||
user_notify.send_notification(
|
# user_notify.send_notification(
|
||||||
"公招六星通知",
|
# "公招六星通知",
|
||||||
"恭喜您在公招中获得了六星干员!"
|
# "恭喜您在公招中获得了六星干员!"
|
||||||
)
|
# )
|
||||||
|
|||||||
@@ -67,7 +67,7 @@ class Notification(QWidget):
|
|||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def send_mail(self, mode, title, content) -> None:
|
def send_mail(self, mode, title, content, to_address) -> None:
|
||||||
"""推送邮件通知"""
|
"""推送邮件通知"""
|
||||||
|
|
||||||
if Config.get(Config.notify_IfSendMail):
|
if Config.get(Config.notify_IfSendMail):
|
||||||
@@ -84,7 +84,7 @@ class Notification(QWidget):
|
|||||||
or not bool(
|
or not bool(
|
||||||
re.match(
|
re.match(
|
||||||
r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$",
|
r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$",
|
||||||
Config.get(Config.notify_ToAddress),
|
to_address,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
):
|
):
|
||||||
@@ -114,7 +114,7 @@ class Notification(QWidget):
|
|||||||
message["To"] = formataddr(
|
message["To"] = formataddr(
|
||||||
(
|
(
|
||||||
Header("AUTO_MAA用户", "utf-8").encode(),
|
Header("AUTO_MAA用户", "utf-8").encode(),
|
||||||
Config.get(Config.notify_ToAddress),
|
to_address,
|
||||||
)
|
)
|
||||||
) # 收件人显示的名字
|
) # 收件人显示的名字
|
||||||
message["Subject"] = Header(title, "utf-8")
|
message["Subject"] = Header(title, "utf-8")
|
||||||
@@ -132,20 +132,21 @@ class Notification(QWidget):
|
|||||||
)
|
)
|
||||||
smtpObj.sendmail(
|
smtpObj.sendmail(
|
||||||
Config.get(Config.notify_FromAddress),
|
Config.get(Config.notify_FromAddress),
|
||||||
Config.get(Config.notify_ToAddress),
|
to_address,
|
||||||
message.as_string(),
|
message.as_string(),
|
||||||
)
|
)
|
||||||
smtpObj.quit()
|
smtpObj.quit()
|
||||||
logger.success("邮件发送成功")
|
logger.success("邮件发送成功")
|
||||||
|
return None
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"发送邮件时出错:\n{e}")
|
logger.error(f"发送邮件时出错:\n{e}")
|
||||||
self.push_info_bar.emit("error", "发送邮件时出错", f"{e}", -1)
|
self.push_info_bar.emit("error", "发送邮件时出错", f"{e}", -1)
|
||||||
|
return None
|
||||||
|
return None
|
||||||
|
|
||||||
def ServerChanPush(self, title, content):
|
def ServerChanPush(self, title, content, send_key, tag, channel):
|
||||||
"""使用Server酱推送通知(支持 tag 和 channel,避免使用SDK)"""
|
"""使用Server酱推送通知"""
|
||||||
if Config.get(Config.notify_IfServerChan):
|
if Config.get(Config.notify_IfServerChan):
|
||||||
send_key = Config.get(Config.notify_ServerChanKey)
|
|
||||||
|
|
||||||
if not send_key:
|
if not send_key:
|
||||||
logger.error("请正确设置Server酱的SendKey")
|
logger.error("请正确设置Server酱的SendKey")
|
||||||
self.push_info_bar.emit(
|
self.push_info_bar.emit(
|
||||||
@@ -173,11 +174,11 @@ class Notification(QWidget):
|
|||||||
|
|
||||||
tags = "|".join(
|
tags = "|".join(
|
||||||
_.strip()
|
_.strip()
|
||||||
for _ in Config.get(Config.notify_ServerChanTag).split("|")
|
for _ in tag.split("|")
|
||||||
)
|
)
|
||||||
channels = "|".join(
|
channels = "|".join(
|
||||||
_.strip()
|
_.strip()
|
||||||
for _ in Config.get(Config.notify_ServerChanChannel).split("|")
|
for _ in channel.split("|")
|
||||||
)
|
)
|
||||||
|
|
||||||
options = {}
|
options = {}
|
||||||
@@ -227,12 +228,13 @@ class Notification(QWidget):
|
|||||||
"error", "Server酱通知推送异常", f"请检查相关设置,如还有问题可联系开发者", -1
|
"error", "Server酱通知推送异常", f"请检查相关设置,如还有问题可联系开发者", -1
|
||||||
)
|
)
|
||||||
return f"Server酱通知推送异常:{str(e)}"
|
return f"Server酱通知推送异常:{str(e)}"
|
||||||
|
return None
|
||||||
|
|
||||||
def CompanyWebHookBotPush(self, title, content):
|
def CompanyWebHookBotPush(self, title, content,webhook_url):
|
||||||
"""使用企业微信群机器人推送通知"""
|
"""使用企业微信群机器人推送通知"""
|
||||||
if Config.get(Config.notify_IfCompanyWebHookBot):
|
if Config.get(Config.notify_IfCompanyWebHookBot):
|
||||||
|
|
||||||
if Config.get(Config.notify_CompanyWebHookBotUrl) == "":
|
if webhook_url == "":
|
||||||
logger.error("请正确设置企业微信群机器人的WebHook地址")
|
logger.error("请正确设置企业微信群机器人的WebHook地址")
|
||||||
self.push_info_bar.emit(
|
self.push_info_bar.emit(
|
||||||
"error",
|
"error",
|
||||||
@@ -244,11 +246,11 @@ class Notification(QWidget):
|
|||||||
|
|
||||||
content = f"{title}\n{content}"
|
content = f"{title}\n{content}"
|
||||||
data = {"msgtype": "text", "text": {"content": content}}
|
data = {"msgtype": "text", "text": {"content": content}}
|
||||||
# 从远程服务器获取最新主题图像
|
|
||||||
for _ in range(3):
|
for _ in range(3):
|
||||||
try:
|
try:
|
||||||
response = requests.post(
|
response = requests.post(
|
||||||
url=Config.get(Config.notify_CompanyWebHookBotUrl),
|
url=webhook_url,
|
||||||
json=data,
|
json=data,
|
||||||
timeout=10,
|
timeout=10,
|
||||||
)
|
)
|
||||||
@@ -279,6 +281,7 @@ class Notification(QWidget):
|
|||||||
-1,
|
-1,
|
||||||
)
|
)
|
||||||
return f'使用企业微信群机器人推送通知时出错:{info["errmsg"]}'
|
return f'使用企业微信群机器人推送通知时出错:{info["errmsg"]}'
|
||||||
|
return None
|
||||||
|
|
||||||
def send_test_notification(self):
|
def send_test_notification(self):
|
||||||
"""发送测试通知到所有已启用的通知渠道"""
|
"""发送测试通知到所有已启用的通知渠道"""
|
||||||
@@ -296,6 +299,7 @@ class Notification(QWidget):
|
|||||||
"文本",
|
"文本",
|
||||||
"AUTO_MAA测试通知",
|
"AUTO_MAA测试通知",
|
||||||
"这是 AUTO_MAA 外部通知测试信息。如果你看到了这段内容,说明 AUTO_MAA 的通知功能已经正确配置且可以正常工作!",
|
"这是 AUTO_MAA 外部通知测试信息。如果你看到了这段内容,说明 AUTO_MAA 的通知功能已经正确配置且可以正常工作!",
|
||||||
|
Config.get(Config.notify_ToAddress),
|
||||||
)
|
)
|
||||||
|
|
||||||
# 发送Server酱通知
|
# 发送Server酱通知
|
||||||
@@ -303,6 +307,9 @@ class Notification(QWidget):
|
|||||||
self.ServerChanPush(
|
self.ServerChanPush(
|
||||||
"AUTO_MAA测试通知",
|
"AUTO_MAA测试通知",
|
||||||
"这是 AUTO_MAA 外部通知测试信息。如果你看到了这段内容,说明 AUTO_MAA 的通知功能已经正确配置且可以正常工作!",
|
"这是 AUTO_MAA 外部通知测试信息。如果你看到了这段内容,说明 AUTO_MAA 的通知功能已经正确配置且可以正常工作!",
|
||||||
|
Config.get(Config.notify_ServerChanKey),
|
||||||
|
Config.get(Config.notify_ServerChanTag),
|
||||||
|
Config.get(Config.notify_ServerChanChannel),
|
||||||
)
|
)
|
||||||
|
|
||||||
# 发送企业微信机器人通知
|
# 发送企业微信机器人通知
|
||||||
@@ -310,184 +317,10 @@ class Notification(QWidget):
|
|||||||
self.CompanyWebHookBotPush(
|
self.CompanyWebHookBotPush(
|
||||||
"AUTO_MAA测试通知",
|
"AUTO_MAA测试通知",
|
||||||
"这是 AUTO_MAA 外部通知测试信息。如果你看到了这段内容,说明 AUTO_MAA 的通知功能已经正确配置且可以正常工作!",
|
"这是 AUTO_MAA 外部通知测试信息。如果你看到了这段内容,说明 AUTO_MAA 的通知功能已经正确配置且可以正常工作!",
|
||||||
|
Config.get(Config.notify_CompanyWebHookBotUrl),
|
||||||
)
|
)
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
class UserNotification:
|
|
||||||
"""用户单独通知服务"""
|
|
||||||
|
|
||||||
def __init__(self, user_config):
|
|
||||||
self.config = user_config
|
|
||||||
|
|
||||||
def send_notification(self, title: str, content: str) -> bool:
|
|
||||||
"""发送用户通知"""
|
|
||||||
logger.info(f"单独通知-准备发送用户通知,标题: {title}")
|
|
||||||
|
|
||||||
if not self.config.get(self.config.Notify_Enable):
|
|
||||||
logger.warning("单独通知-用户通知功能未启用,跳过发送")
|
|
||||||
return False
|
|
||||||
|
|
||||||
success = False
|
|
||||||
|
|
||||||
# 邮件通知
|
|
||||||
if self._check_smtp_config():
|
|
||||||
try:
|
|
||||||
self._send_email(title, content)
|
|
||||||
success = True
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"单独通知-发送邮件通知失败: {str(e)}")
|
|
||||||
|
|
||||||
# Server酱通知
|
|
||||||
if (
|
|
||||||
self.config.get(self.config.Notify_IfServerChan)
|
|
||||||
and self._check_serverchan_config()
|
|
||||||
):
|
|
||||||
try:
|
|
||||||
self._send_serverchan(title, content)
|
|
||||||
success = True
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"单独通知-发送 Server酱 通知失败: {str(e)}")
|
|
||||||
|
|
||||||
# 企业微信群机器人
|
|
||||||
if (
|
|
||||||
self.config.get(self.config.Notify_IfCompanyWebHookBot)
|
|
||||||
and self._check_webhook_config()
|
|
||||||
):
|
|
||||||
try:
|
|
||||||
self._send_webhook(title, content)
|
|
||||||
success = True
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"单独通知-发送企业微信机器人通知失败: {str(e)}")
|
|
||||||
|
|
||||||
if success:
|
|
||||||
logger.info("单独通知-用户通知发送完成")
|
|
||||||
else:
|
|
||||||
logger.warning("单独通知-所有通知方式均发送失败")
|
|
||||||
|
|
||||||
return success
|
|
||||||
|
|
||||||
def _check_smtp_config(self) -> bool:
|
|
||||||
"""检查SMTP配置是否完整"""
|
|
||||||
return all([
|
|
||||||
self.config.get(self.config.Notify_IfSMTP),
|
|
||||||
self.config.get(self.config.Notify_SMTPServerAddress),
|
|
||||||
self.config.get(self.config.Notify_AuthorizationCode),
|
|
||||||
self.config.get(self.config.Notify_FromAddress),
|
|
||||||
self.config.get(self.config.Notify_ToAddress)
|
|
||||||
])
|
|
||||||
|
|
||||||
def _check_serverchan_config(self) -> bool:
|
|
||||||
"""检查ServerChan配置是否完整"""
|
|
||||||
return bool(self.config.get(self.config.Notify_ServerChanKey))
|
|
||||||
|
|
||||||
def _check_webhook_config(self) -> bool:
|
|
||||||
"""检查企业微信机器人配置是否完整"""
|
|
||||||
return bool(self.config.get(self.config.Notify_CompanyWebHookBotUrl))
|
|
||||||
|
|
||||||
def _send_email(self, title: str, content: str):
|
|
||||||
"""发送邮件通知"""
|
|
||||||
logger.debug("单独通知-开始发送邮件通知")
|
|
||||||
import smtplib
|
|
||||||
from email.mime.text import MIMEText
|
|
||||||
from email.header import Header
|
|
||||||
|
|
||||||
msg = MIMEText(content, 'plain', 'utf-8')
|
|
||||||
msg['Subject'] = Header(title, 'utf-8')
|
|
||||||
msg['From'] = self.config.get(self.config.Notify_FromAddress)
|
|
||||||
msg['To'] = self.config.get(self.config.Notify_ToAddress)
|
|
||||||
|
|
||||||
server = smtplib.SMTP_SSL(self.config.get(self.config.Notify_SMTPServerAddress))
|
|
||||||
server.login(
|
|
||||||
self.config.get(self.config.Notify_FromAddress),
|
|
||||||
self.config.get(self.config.Notify_AuthorizationCode)
|
|
||||||
)
|
|
||||||
server.send_message(msg)
|
|
||||||
server.quit()
|
|
||||||
logger.success("单独通知-邮件通知发送成功")
|
|
||||||
|
|
||||||
def _send_serverchan(self, title: str, content: str):
|
|
||||||
"""发送 ServerChan 通知,支持 SCT、SC3、自定义域名等"""
|
|
||||||
logger.debug("单独通知-开始发送 ServerChan 通知")
|
|
||||||
import requests
|
|
||||||
import re
|
|
||||||
|
|
||||||
key = self.config.get(self.config.Notify_ServerChanKey)
|
|
||||||
tag = self.config.get(self.config.Notify_ServerChanTag)
|
|
||||||
channel = self.config.get(self.config.Notify_ServerChanChannel)
|
|
||||||
|
|
||||||
if not key:
|
|
||||||
raise Exception("ServerChan SendKey 未设置")
|
|
||||||
|
|
||||||
# 1. 构造 URL(支持 sctpN 和 sct 开头的)
|
|
||||||
if key.startswith("sctp"):
|
|
||||||
match = re.match(r"^sctp(\d+)t", key)
|
|
||||||
if match:
|
|
||||||
url = f"https://{match.group(1)}.push.ft07.com/send/{key}.send"
|
|
||||||
else:
|
|
||||||
raise ValueError("SendKey 格式错误,sctp 开头但不符合规范")
|
|
||||||
else:
|
|
||||||
url = f"https://sctapi.ftqq.com/{key}.send"
|
|
||||||
|
|
||||||
logger.debug(f"单独通知-Server酱推送URL: {url}")
|
|
||||||
|
|
||||||
# 2. 校验 tag 和 channel 格式
|
|
||||||
def is_valid(s):
|
|
||||||
return s == "" or (
|
|
||||||
s == "|".join(s.split("|")) and (s.count("|") == 0 or all(s.split("|")))
|
|
||||||
)
|
|
||||||
|
|
||||||
tags = "|".join([_.strip() for _ in tag.split("|")]) if tag else ""
|
|
||||||
channels = "|".join([_.strip() for _ in channel.split("|")]) if channel else ""
|
|
||||||
|
|
||||||
options = {}
|
|
||||||
if is_valid(tags):
|
|
||||||
options["tags"] = tags
|
|
||||||
else:
|
|
||||||
logger.warning("单独通知-ServerChan Tag 格式不正确,已忽略")
|
|
||||||
|
|
||||||
if is_valid(channels):
|
|
||||||
options["channel"] = channels
|
|
||||||
else:
|
|
||||||
logger.warning("单独通知-ServerChan Channel 格式不正确,已忽略")
|
|
||||||
|
|
||||||
# 3. 构造 payload
|
|
||||||
payload = {"title": title, "desp": content, **options}
|
|
||||||
|
|
||||||
headers = {"Content-Type": "application/json;charset=utf-8"}
|
|
||||||
logger.info(f"单独通知-发送 Server酱通知: {payload}")
|
|
||||||
|
|
||||||
try:
|
|
||||||
response = requests.post(url, json=payload, headers=headers, timeout=10)
|
|
||||||
result = response.json()
|
|
||||||
|
|
||||||
if result.get("code") == 0:
|
|
||||||
logger.success("Server酱通知推送成功")
|
|
||||||
else:
|
|
||||||
raise Exception(
|
|
||||||
f"推送失败,响应码:{result.get('code')}, 信息:{result}"
|
|
||||||
)
|
|
||||||
except Exception as e:
|
|
||||||
raise Exception(f"Server酱推送失败: {e}")
|
|
||||||
|
|
||||||
def _send_webhook(self, title: str, content: str):
|
|
||||||
"""发送企业微信机器人通知"""
|
|
||||||
logger.debug("单独通知-开始发送企业微信机器人通知")
|
|
||||||
import requests
|
|
||||||
|
|
||||||
url = self.config.get(self.config.Notify_CompanyWebHookBotUrl)
|
|
||||||
data = {
|
|
||||||
"msgtype": "markdown",
|
|
||||||
"markdown": {
|
|
||||||
"content": f"### {title}\n{content}"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
response = requests.post(url, json=data)
|
|
||||||
if response.status_code != 200:
|
|
||||||
raise Exception(f"企业微信机器人API返回错误: {response.text}")
|
|
||||||
logger.success("单独通知-企业微信机器人通知发送成功")
|
|
||||||
|
|
||||||
|
|
||||||
Notify = Notification()
|
Notify = Notification()
|
||||||
|
|||||||
Reference in New Issue
Block a user