feat: 载入各种服务

This commit is contained in:
DLmaster361
2025-08-05 22:50:07 +08:00
parent 4ca7f9053f
commit 6898e548a5
9 changed files with 290 additions and 527 deletions

View File

@@ -18,39 +18,28 @@
# Contact: DLmaster_361@163.com
"""
AUTO_MAA
AUTO_MAA通知服务
v4.4
作者DLmaster_361
"""
import re
import smtplib
import time
import requests
from email.header import Header
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formataddr
from pathlib import Path
from typing import Union
import requests
from PySide6.QtCore import QObject, Signal
from plyer import notification
from app.core import Config, logger
from app.utils.security import Crypto
from app.utils.ImageUtils import ImageUtils
from core import Config
from utils import get_logger, ImageUtils
logger = get_logger("通知服务")
class Notification(QObject):
class Notification:
push_info_bar = Signal(str, str, str, int)
def __init__(self, parent=None):
super().__init__(parent)
def __init__(self):
super().__init__()
def push_plyer(self, title, message, ticker, t) -> bool:
"""
@@ -63,19 +52,22 @@ class Notification(QObject):
:return: bool
"""
if Config.get(Config.notify_IfPushPlyer):
if Config.get("Notify", "IfPushPlyer"):
logger.info(f"推送系统通知:{title}", module="通知服务")
logger.info(f"推送系统通知:{title}")
notification.notify(
title=title,
message=message,
app_name="AUTO_MAA",
app_icon=str(Config.app_path / "resources/icons/AUTO_MAA.ico"),
timeout=t,
ticker=ticker,
toast=True,
)
if notification.notify is not None:
notification.notify(
title=title,
message=message,
app_name="AUTO_MAA",
app_icon=(Path.cwd() / "resources/icons/AUTO_MAA.ico").as_posix(),
timeout=t,
ticker=ticker,
toast=True,
)
else:
logger.error("plyer.notification 未正确导入,无法推送系统通知")
return True
@@ -90,12 +82,12 @@ class Notification(QObject):
"""
if (
Config.get(Config.notify_SMTPServerAddress) == ""
or Config.get(Config.notify_AuthorizationCode) == ""
Config.get("Notify", "SMTPServerAddress") == ""
or Config.get("Notify", "AuthorizationCode") == ""
or not bool(
re.match(
r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$",
Config.get(Config.notify_FromAddress),
Config.get("Notify", "FromAddress"),
)
)
or not bool(
@@ -106,336 +98,146 @@ class Notification(QObject):
)
):
logger.error(
"请正确设置邮件通知的SMTP服务器地址、授权码、发件人地址和收件人地址",
module="通知服务",
"请正确设置邮件通知的SMTP服务器地址、授权码、发件人地址和收件人地址"
)
self.push_info_bar.emit(
"error",
"邮件通知推送异常",
"请正确设置邮件通知的SMTP服务器地址、授权码、发件人地址和收件人地址",
-1,
raise ValueError(
"The SMTP server address, authorization code, sender address, or recipient address is not set correctly."
)
return None
try:
# 定义邮件正文
if mode == "文本":
message = MIMEText(content, "plain", "utf-8")
elif mode == "网页":
message = MIMEMultipart("alternative")
message["From"] = formataddr(
(
Header("AUTO_MAA通知服务", "utf-8").encode(),
Config.get(Config.notify_FromAddress),
)
) # 发件人显示的名字
message["To"] = formataddr(
(Header("AUTO_MAA用户", "utf-8").encode(), to_address)
) # 收件人显示的名字
message["Subject"] = Header(title, "utf-8")
if mode == "网页":
message.attach(MIMEText(content, "html", "utf-8"))
smtpObj = smtplib.SMTP_SSL(Config.get(Config.notify_SMTPServerAddress), 465)
smtpObj.login(
Config.get(Config.notify_FromAddress),
Crypto.win_decryptor(Config.get(Config.notify_AuthorizationCode)),
# 定义邮件正文
if mode == "文本":
message = MIMEText(content, "plain", "utf-8")
elif mode == "网页":
message = MIMEMultipart("alternative")
message["From"] = formataddr(
(
Header("AUTO_MAA通知服务", "utf-8").encode(),
Config.get("Notify", "FromAddress"),
)
smtpObj.sendmail(
Config.get(Config.notify_FromAddress), to_address, message.as_string()
)
smtpObj.quit()
logger.success(f"邮件发送成功:{title}", module="通知服务")
except Exception as e:
logger.exception(f"发送邮件时出错:{e}", module="通知服务")
self.push_info_bar.emit("error", "发送邮件时出错", f"{e}", -1)
) # 发件人显示的名字
message["To"] = formataddr(
(Header("AUTO_MAA用户", "utf-8").encode(), to_address)
) # 收件人显示的名字
message["Subject"] = str(Header(title, "utf-8"))
def ServerChanPush(
self, title, content, send_key, tag, channel
) -> Union[bool, str]:
if mode == "网页":
message.attach(MIMEText(content, "html", "utf-8"))
smtpObj = smtplib.SMTP_SSL(Config.get("Notify", "SMTPServerAddress"), 465)
smtpObj.login(
Config.get("Notify", "FromAddress"),
Config.get("Notify", "AuthorizationCode"),
)
smtpObj.sendmail(
Config.get("Notify", "FromAddress"), to_address, message.as_string()
)
smtpObj.quit()
logger.success(f"邮件发送成功:{title}")
def ServerChanPush(self, title, content, send_key) -> None:
"""
使用Server酱推送通知
:param title: 通知标题
:param content: 通知内容
:param send_key: Server酱的SendKey
:param tag: 通知标签
:param channel: 通知频道
:return: bool or str
"""
if not send_key:
logger.error("请正确设置Server酱的SendKey", module="通知服务")
self.push_info_bar.emit(
"error", "Server酱通知推送异常", "请正确设置Server酱的SendKey", -1
)
return None
raise ValueError("The ServerChan SendKey can not be empty.")
try:
# 构造 URL
if send_key.startswith("sctp"):
match = re.match(r"^sctp(\d+)t", send_key)
if match:
url = f"https://{match.group(1)}.push.ft07.com/send/{send_key}.send"
else:
raise ValueError("SendKey 格式错误sctp")
# 构造 URL
if send_key.startswith("sctp"):
match = re.match(r"^sctp(\d+)t", send_key)
if match:
url = f"https://{match.group(1)}.push.ft07.com/send/{send_key}.send"
else:
url = f"https://sctapi.ftqq.com/{send_key}.send"
raise ValueError("SendKey format is incorrect (sctp<int>).")
else:
url = f"https://sctapi.ftqq.com/{send_key}.send"
# 构建 tags 和 channel
def is_valid(s):
return s == "" or (
s == "|".join(s.split("|"))
and (s.count("|") == 0 or all(s.split("|")))
)
# 请求发送
params = {"title": title, "desp": content}
headers = {"Content-Type": "application/json;charset=utf-8"}
tags = "|".join(_.strip() for _ in tag.split("|"))
channels = "|".join(_.strip() for _ in channel.split("|"))
response = requests.post(
url, json=params, headers=headers, timeout=10, proxies=Config.get_proxies()
)
result = response.json()
options = {}
if is_valid(tags):
options["tags"] = tags
else:
logger.warning("Server酱 Tag 配置不正确,将被忽略", module="通知服务")
self.push_info_bar.emit(
"warning",
"Server酱通知推送异常",
"请正确设置 ServerChan 的 Tag",
-1,
)
if result.get("code") == 0:
logger.success(f"Server酱推送通知成功{title}")
else:
raise Exception(f"ServerChan failed to send notification: {response.text}")
if is_valid(channels):
options["channel"] = channels
else:
logger.warning(
"Server酱 Channel 配置不正确,将被忽略", module="通知服务"
)
self.push_info_bar.emit(
"warning",
"Server酱通知推送异常",
"请正确设置 ServerChan 的 Channel",
-1,
)
# 请求发送
params = {"title": title, "desp": content, **options}
headers = {"Content-Type": "application/json;charset=utf-8"}
response = requests.post(
url,
json=params,
headers=headers,
timeout=10,
proxies={
"http": Config.get(Config.update_ProxyAddress),
"https": Config.get(Config.update_ProxyAddress),
},
)
result = response.json()
if result.get("code") == 0:
logger.success(f"Server酱推送通知成功{title}", module="通知服务")
return True
else:
error_code = result.get("code", "-1")
logger.exception(
f"Server酱通知推送失败响应码{error_code}", module="通知服务"
)
self.push_info_bar.emit(
"error", "Server酱通知推送失败", f"响应码:{error_code}", -1
)
return f"Server酱通知推送失败{error_code}"
except Exception as e:
logger.exception(f"Server酱通知推送异常{e}", module="通知服务")
self.push_info_bar.emit(
"error",
"Server酱通知推送异常",
"请检查相关设置和网络连接。如全部配置正确,请稍后再试。",
-1,
)
return f"Server酱通知推送异常{str(e)}"
def CompanyWebHookBotPush(self, title, content, webhook_url) -> Union[bool, str]:
def WebHookPush(self, title, content, webhook_url) -> None:
"""
使用企业微信群机器人推送通知
WebHook 推送通知
:param title: 通知标题
:param content: 通知内容
:param webhook_url: 企业微信群机器人的WebHook地址
:return: bool or str
:param webhook_url: WebHook地址
"""
if webhook_url == "":
logger.error("请正确设置企业微信群机器人的WebHook地址", module="通知服务")
self.push_info_bar.emit(
"error",
"企业微信群机器人通知推送异常",
"请正确设置企业微信群机器人的WebHook地址",
-1,
)
return None
if not webhook_url:
raise ValueError("The webhook URL can not be empty.")
content = f"{title}\n{content}"
data = {"msgtype": "text", "text": {"content": content}}
for _ in range(3):
try:
response = requests.post(
url=webhook_url,
json=data,
timeout=10,
proxies={
"http": Config.get(Config.update_ProxyAddress),
"https": Config.get(Config.update_ProxyAddress),
},
)
info = response.json()
break
except Exception as e:
err = e
time.sleep(0.1)
else:
logger.error(f"推送企业微信群机器人时出错:{err}", module="通知服务")
self.push_info_bar.emit(
"error",
"企业微信群机器人通知推送失败",
f"使用企业微信群机器人推送通知时出错:{err}",
-1,
)
return None
response = requests.post(
url=webhook_url, json=data, timeout=10, proxies=Config.get_proxies()
)
info = response.json()
if info["errcode"] == 0:
logger.success(f"企业微信群机器人推送通知成功:{title}", module="通知服务")
return True
logger.success(f"WebHook 推送通知成功:{title}")
else:
logger.error(f"企业微信群机器人推送通知失败:{info}", module="通知服务")
self.push_info_bar.emit(
"error",
"企业微信群机器人通知推送失败",
f"使用企业微信群机器人推送通知时出错:{err}",
-1,
)
return f"使用企业微信群机器人推送通知时出错:{err}"
raise Exception(f"WebHook failed to send notification: {response.text}")
def CompanyWebHookBotPushImage(self, image_path: Path, webhook_url: str) -> bool:
def CompanyWebHookBotPushImage(self, image_path: Path, webhook_url: str) -> None:
"""
使用企业微信群机器人推送图片通知
:param image_path: 图片文件路径
:param webhook_url: 企业微信群机器人的WebHook地址
:return: bool
"""
try:
# 压缩图片
ImageUtils.compress_image_if_needed(image_path)
if not webhook_url:
raise ValueError("The webhook URL can not be empty.")
# 检查图片是否存在
if not image_path.exists():
logger.error(
"图片推送异常 | 图片不存在或者压缩失败,请检查图片路径是否正确",
module="通知服务",
)
self.push_info_bar.emit(
"error",
"企业微信群机器人通知推送异常",
"图片不存在或者压缩失败,请检查图片路径是否正确",
-1,
)
return False
# 压缩图片
ImageUtils.compress_image_if_needed(image_path)
if not webhook_url:
logger.error(
"请正确设置企业微信群机器人的WebHook地址", module="通知服务"
)
self.push_info_bar.emit(
"error",
"企业微信群机器人通知推送异常",
"请正确设置企业微信群机器人的WebHook地址",
-1,
)
return False
# 检查图片是否存在
if not image_path.exists():
raise FileNotFoundError(f"File not found: {image_path}")
# 获取图片base64和md5
try:
image_base64 = ImageUtils.get_base64_from_file(str(image_path))
image_md5 = ImageUtils.calculate_md5_from_file(str(image_path))
except Exception as e:
logger.exception(f"图片编码或MD5计算失败{e}", module="通知服务")
self.push_info_bar.emit(
"error",
"企业微信群机器人通知推送异常",
f"图片编码或MD5计算失败{e}",
-1,
)
return False
# 获取图片base64和md5
image_base64 = ImageUtils.get_base64_from_file(str(image_path))
image_md5 = ImageUtils.calculate_md5_from_file(str(image_path))
data = {
"msgtype": "image",
"image": {"base64": image_base64, "md5": image_md5},
}
data = {
"msgtype": "image",
"image": {"base64": image_base64, "md5": image_md5},
}
for _ in range(3):
try:
response = requests.post(
url=webhook_url,
json=data,
timeout=10,
proxies={
"http": Config.get(Config.update_ProxyAddress),
"https": Config.get(Config.update_ProxyAddress),
},
)
info = response.json()
break
except requests.RequestException as e:
err = e
logger.exception(
f"推送企业微信群机器人图片第{_+1}次失败:{e}", module="通知服务"
)
time.sleep(0.1)
else:
logger.error("推送企业微信群机器人图片时出错", module="通知服务")
self.push_info_bar.emit(
"error",
"企业微信群机器人图片推送失败",
f"使用企业微信群机器人推送图片时出错:{err}",
-1,
)
return False
response = requests.post(
url=webhook_url, json=data, timeout=10, proxies=Config.get_proxies()
)
info = response.json()
if info.get("errcode") == 0:
logger.success(
f"企业微信群机器人推送图片成功:{image_path.name}",
module="通知服务",
)
return True
else:
logger.error(f"企业微信群机器人推送图片失败:{info}", module="通知服务")
self.push_info_bar.emit(
"error",
"企业微信群机器人图片推送失败",
f"使用企业微信群机器人推送图片时出错:{info}",
-1,
)
return False
except Exception as e:
logger.error(f"推送企业微信群机器人图片时发生未知异常:{e}")
self.push_info_bar.emit(
"error",
"企业微信群机器人图片推送失败",
f"发生未知异常:{e}",
-1,
if info.get("errcode") == 0:
logger.success(f"企业微信群机器人推送图片成功:{image_path.name}")
else:
raise Exception(
f"Company WebHook Bot failed to send image: {response.text}"
)
return False
def send_test_notification(self):
def send_test_notification(self) -> None:
"""发送测试通知到所有已启用的通知渠道"""
logger.info("发送测试通知到所有已启用的通知渠道", module="通知服务")
logger.info("发送测试通知到所有已启用的通知渠道")
# 发送系统通知
self.push_plyer(
@@ -446,39 +248,35 @@ class Notification(QObject):
)
# 发送邮件通知
if Config.get(Config.notify_IfSendMail):
if Config.get("Notify", "IfSendMail"):
self.send_mail(
"文本",
"AUTO_MAA测试通知",
"这是 AUTO_MAA 外部通知测试信息。如果你看到了这段内容,说明 AUTO_MAA 的通知功能已经正确配置且可以正常工作!",
Config.get(Config.notify_ToAddress),
Config.get("Notify", "ToAddress"),
)
# 发送Server酱通知
if Config.get(Config.notify_IfServerChan):
if Config.get("Notify", "IfServerChan"):
self.ServerChanPush(
"AUTO_MAA测试通知",
"这是 AUTO_MAA 外部通知测试信息。如果你看到了这段内容,说明 AUTO_MAA 的通知功能已经正确配置且可以正常工作!",
Config.get(Config.notify_ServerChanKey),
Config.get(Config.notify_ServerChanTag),
Config.get(Config.notify_ServerChanChannel),
Config.get("Notify", "ServerChanKey"),
)
# 发送企业微信机器人通知
if Config.get(Config.notify_IfCompanyWebHookBot):
self.CompanyWebHookBotPush(
# 发送WebHook通知
if Config.get("Notify", "IfCompanyWebHookBot"):
self.WebHookPush(
"AUTO_MAA测试通知",
"这是 AUTO_MAA 外部通知测试信息。如果你看到了这段内容,说明 AUTO_MAA 的通知功能已经正确配置且可以正常工作!",
Config.get(Config.notify_CompanyWebHookBotUrl),
Config.get("Notify", "CompanyWebHookBotUrl"),
)
Notify.CompanyWebHookBotPushImage(
Config.app_path / "resources/images/notification/test_notify.png",
Config.get(Config.notify_CompanyWebHookBotUrl),
Path.cwd() / "resources/images/notification/test_notify.png",
Config.get("Notify", "CompanyWebHookBotUrl"),
)
logger.info("测试通知发送完成", module="通知服务")
return True
logger.success("测试通知发送完成")
Notify = Notification()

View File

@@ -18,14 +18,7 @@
# Contact: DLmaster_361@163.com
"""
AUTO_MAA
AUTO_MAA系统服务
v4.4
作者DLmaster_361
"""
from PySide6.QtWidgets import QApplication
import sys
import ctypes
import win32gui
@@ -37,7 +30,10 @@ import getpass
from datetime import datetime
from pathlib import Path
from app.core import Config
from core import Config
from utils.logger import get_logger
logger = get_logger("系统服务")
class _SystemHandler:
@@ -53,7 +49,7 @@ class _SystemHandler:
def set_Sleep(self) -> None:
"""同步系统休眠状态"""
if Config.get(Config.function_IfAllowSleep):
if Config.get("Function", "IfAllowSleep"):
# 设置系统电源状态
ctypes.windll.kernel32.SetThreadExecutionState(
self.ES_CONTINUOUS | self.ES_SYSTEM_REQUIRED
@@ -65,7 +61,9 @@ class _SystemHandler:
def set_SelfStart(self) -> None:
"""同步开机自启"""
if Config.get(Config.start_IfSelfStart) and not self.is_startup():
return None # 目前不支持开机自启
if Config.get("Function", "IfSelfStart") and not self.is_startup():
# 创建任务计划
try:
@@ -164,7 +162,7 @@ class _SystemHandler:
pass
except Exception as e:
logger.exception(f"程序自启动任务计划创建失败: {e}", module="系统服务")
logger.exception(f"程序自启动任务计划创建失败: {e}")
elif not Config.get(Config.start_IfSelfStart) and self.is_startup():
@@ -179,7 +177,7 @@ class _SystemHandler:
)
if result.returncode == 0:
logger.success("程序自启动任务计划已删除", module="系统服务")
logger.success("程序自启动任务计划已删除")
else:
logger.error(
f"程序自启动任务计划删除失败: {result.stderr}",
@@ -187,7 +185,7 @@ class _SystemHandler:
)
except Exception as e:
logger.exception(f"程序自启动任务计划删除失败: {e}", module="系统服务")
logger.exception(f"程序自启动任务计划删除失败: {e}")
def set_power(self, mode) -> None:
"""
@@ -200,69 +198,65 @@ class _SystemHandler:
if mode == "NoAction":
logger.info("不执行系统电源操作", module="系统服务")
logger.info("不执行系统电源操作")
elif mode == "Shutdown":
self.kill_emulator_processes()
logger.info("执行关机操作", module="系统服务")
logger.info("执行关机操作")
subprocess.run(["shutdown", "/s", "/t", "0"])
elif mode == "ShutdownForce":
logger.info("执行强制关机操作", module="系统服务")
logger.info("执行强制关机操作")
subprocess.run(["shutdown", "/s", "/t", "0", "/f"])
elif mode == "Hibernate":
logger.info("执行休眠操作", module="系统服务")
logger.info("执行休眠操作")
subprocess.run(["shutdown", "/h"])
elif mode == "Sleep":
logger.info("执行睡眠操作", module="系统服务")
logger.info("执行睡眠操作")
subprocess.run(
["rundll32.exe", "powrprof.dll,SetSuspendState", "0,1,0"]
)
elif mode == "KillSelf":
logger.info("执行退出主程序操作", module="系统服务")
Config.main_window.close()
QApplication.quit()
logger.info("执行退出主程序操作")
sys.exit(0)
elif sys.platform.startswith("linux"):
if mode == "NoAction":
logger.info("不执行系统电源操作", module="系统服务")
logger.info("不执行系统电源操作")
elif mode == "Shutdown":
logger.info("执行关机操作", module="系统服务")
logger.info("执行关机操作")
subprocess.run(["shutdown", "-h", "now"])
elif mode == "Hibernate":
logger.info("执行休眠操作", module="系统服务")
logger.info("执行休眠操作")
subprocess.run(["systemctl", "hibernate"])
elif mode == "Sleep":
logger.info("执行睡眠操作", module="系统服务")
logger.info("执行睡眠操作")
subprocess.run(["systemctl", "suspend"])
elif mode == "KillSelf":
logger.info("执行退出主程序操作", module="系统服务")
Config.main_window.close()
QApplication.quit()
logger.info("执行退出主程序操作")
sys.exit(0)
def kill_emulator_processes(self):
"""这里暂时仅支持 MuMu 模拟器"""
logger.info("正在清除模拟器进程", module="系统服务")
logger.info("正在清除模拟器进程")
keywords = ["Nemu", "nemu", "emulator", "MuMu"]
for proc in psutil.process_iter(["pid", "name"]):
@@ -277,7 +271,7 @@ class _SystemHandler:
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
logger.success("模拟器进程清除完成", module="系统服务")
logger.success("模拟器进程清除完成")
def is_startup(self) -> bool:
"""判断程序是否已经开机自启"""
@@ -292,7 +286,7 @@ class _SystemHandler:
)
return result.returncode == 0
except Exception as e:
logger.exception(f"检查任务计划程序失败: {e}", module="系统服务")
logger.exception(f"检查任务计划程序失败: {e}")
return False
def get_window_info(self) -> list:
@@ -316,7 +310,7 @@ class _SystemHandler:
:param path: 进程路径
"""
logger.info(f"开始中止进程: {path}", module="系统服务")
logger.info(f"开始中止进程: {path}")
for pid in self.search_pids(path):
killprocess = subprocess.Popen(
@@ -326,7 +320,7 @@ class _SystemHandler:
)
killprocess.wait()
logger.success(f"进程已中止: {path}", module="系统服务")
logger.success(f"进程已中止: {path}")
def search_pids(self, path: Path) -> list:
"""
@@ -336,7 +330,7 @@ class _SystemHandler:
:return: 匹配的进程PID列表
"""
logger.info(f"开始查找进程 PID: {path}", module="系统服务")
logger.info(f"开始查找进程 PID: {path}")
pids = []
for proc in psutil.process_iter(["pid", "exe"]):