♻️重构, 实现基础框架-日志 AUTO_MAA配置 用户配置基类
This commit is contained in:
@@ -1,37 +0,0 @@
|
||||
# AUTO_MAA:A MAA Multi Account Management and Automation Tool
|
||||
# Copyright © 2024-2025 DLmaster361
|
||||
|
||||
# This file is part of AUTO_MAA.
|
||||
|
||||
# AUTO_MAA is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License,
|
||||
# or (at your option) any later version.
|
||||
|
||||
# AUTO_MAA is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty
|
||||
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
|
||||
# the GNU General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with AUTO_MAA. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
# Contact: DLmaster_361@163.com
|
||||
|
||||
"""
|
||||
AUTO_MAA
|
||||
AUTO_MAA服务包
|
||||
v4.4
|
||||
作者:DLmaster_361
|
||||
"""
|
||||
|
||||
__version__ = "4.2.0"
|
||||
__author__ = "DLmaster361 <DLmaster_361@163.com>"
|
||||
__license__ = "GPL-3.0 license"
|
||||
|
||||
from .notification import Notify
|
||||
from .security import Crypto
|
||||
from .system import System
|
||||
from .skland import skland_sign_in
|
||||
|
||||
__all__ = ["Notify", "Crypto", "System", "skland_sign_in"]
|
||||
@@ -1,484 +0,0 @@
|
||||
# AUTO_MAA:A MAA Multi Account Management and Automation Tool
|
||||
# Copyright © 2024-2025 DLmaster361
|
||||
|
||||
# This file is part of AUTO_MAA.
|
||||
|
||||
# AUTO_MAA is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License,
|
||||
# or (at your option) any later version.
|
||||
|
||||
# AUTO_MAA is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty
|
||||
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
|
||||
# the GNU General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with AUTO_MAA. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
# Contact: DLmaster_361@163.com
|
||||
|
||||
"""
|
||||
AUTO_MAA
|
||||
AUTO_MAA通知服务
|
||||
v4.4
|
||||
作者:DLmaster_361
|
||||
"""
|
||||
|
||||
import re
|
||||
import smtplib
|
||||
import time
|
||||
from email.header import Header
|
||||
from email.mime.multipart import MIMEMultipart
|
||||
from email.mime.text import MIMEText
|
||||
from email.utils import formataddr
|
||||
from pathlib import Path
|
||||
from typing import Union
|
||||
|
||||
import requests
|
||||
from PySide6.QtCore import QObject, Signal
|
||||
|
||||
from plyer import notification
|
||||
|
||||
from app.core import Config, logger
|
||||
from app.services.security import Crypto
|
||||
from app.utils.ImageUtils import ImageUtils
|
||||
|
||||
|
||||
class Notification(QObject):
|
||||
|
||||
push_info_bar = Signal(str, str, str, int)
|
||||
|
||||
def __init__(self, parent=None):
|
||||
super().__init__(parent)
|
||||
|
||||
def push_plyer(self, title, message, ticker, t) -> bool:
|
||||
"""
|
||||
推送系统通知
|
||||
|
||||
:param title: 通知标题
|
||||
:param message: 通知内容
|
||||
:param ticker: 通知横幅
|
||||
:param t: 通知持续时间
|
||||
:return: bool
|
||||
"""
|
||||
|
||||
if Config.get(Config.notify_IfPushPlyer):
|
||||
|
||||
logger.info(f"推送系统通知:{title}", module="通知服务")
|
||||
|
||||
notification.notify(
|
||||
title=title,
|
||||
message=message,
|
||||
app_name="AUTO_MAA",
|
||||
app_icon=str(Config.app_path / "resources/icons/AUTO_MAA.ico"),
|
||||
timeout=t,
|
||||
ticker=ticker,
|
||||
toast=True,
|
||||
)
|
||||
|
||||
return True
|
||||
|
||||
def send_mail(self, mode, title, content, to_address) -> None:
|
||||
"""
|
||||
推送邮件通知
|
||||
|
||||
:param mode: 邮件内容模式,支持 "文本" 和 "网页"
|
||||
:param title: 邮件标题
|
||||
:param content: 邮件内容
|
||||
:param to_address: 收件人地址
|
||||
"""
|
||||
|
||||
if (
|
||||
Config.get(Config.notify_SMTPServerAddress) == ""
|
||||
or Config.get(Config.notify_AuthorizationCode) == ""
|
||||
or not bool(
|
||||
re.match(
|
||||
r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$",
|
||||
Config.get(Config.notify_FromAddress),
|
||||
)
|
||||
)
|
||||
or not bool(
|
||||
re.match(
|
||||
r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$",
|
||||
to_address,
|
||||
)
|
||||
)
|
||||
):
|
||||
logger.error(
|
||||
"请正确设置邮件通知的SMTP服务器地址、授权码、发件人地址和收件人地址",
|
||||
module="通知服务",
|
||||
)
|
||||
self.push_info_bar.emit(
|
||||
"error",
|
||||
"邮件通知推送异常",
|
||||
"请正确设置邮件通知的SMTP服务器地址、授权码、发件人地址和收件人地址",
|
||||
-1,
|
||||
)
|
||||
return None
|
||||
|
||||
try:
|
||||
# 定义邮件正文
|
||||
if mode == "文本":
|
||||
message = MIMEText(content, "plain", "utf-8")
|
||||
elif mode == "网页":
|
||||
message = MIMEMultipart("alternative")
|
||||
message["From"] = formataddr(
|
||||
(
|
||||
Header("AUTO_MAA通知服务", "utf-8").encode(),
|
||||
Config.get(Config.notify_FromAddress),
|
||||
)
|
||||
) # 发件人显示的名字
|
||||
message["To"] = formataddr(
|
||||
(Header("AUTO_MAA用户", "utf-8").encode(), to_address)
|
||||
) # 收件人显示的名字
|
||||
message["Subject"] = Header(title, "utf-8")
|
||||
|
||||
if mode == "网页":
|
||||
message.attach(MIMEText(content, "html", "utf-8"))
|
||||
|
||||
smtpObj = smtplib.SMTP_SSL(Config.get(Config.notify_SMTPServerAddress), 465)
|
||||
smtpObj.login(
|
||||
Config.get(Config.notify_FromAddress),
|
||||
Crypto.win_decryptor(Config.get(Config.notify_AuthorizationCode)),
|
||||
)
|
||||
smtpObj.sendmail(
|
||||
Config.get(Config.notify_FromAddress), to_address, message.as_string()
|
||||
)
|
||||
smtpObj.quit()
|
||||
logger.success(f"邮件发送成功:{title}", module="通知服务")
|
||||
except Exception as e:
|
||||
logger.exception(f"发送邮件时出错:{e}", module="通知服务")
|
||||
self.push_info_bar.emit("error", "发送邮件时出错", f"{e}", -1)
|
||||
|
||||
def ServerChanPush(
|
||||
self, title, content, send_key, tag, channel
|
||||
) -> Union[bool, str]:
|
||||
"""
|
||||
使用Server酱推送通知
|
||||
|
||||
:param title: 通知标题
|
||||
:param content: 通知内容
|
||||
:param send_key: Server酱的SendKey
|
||||
:param tag: 通知标签
|
||||
:param channel: 通知频道
|
||||
:return: bool or str
|
||||
"""
|
||||
|
||||
if not send_key:
|
||||
logger.error("请正确设置Server酱的SendKey", module="通知服务")
|
||||
self.push_info_bar.emit(
|
||||
"error", "Server酱通知推送异常", "请正确设置Server酱的SendKey", -1
|
||||
)
|
||||
return None
|
||||
|
||||
try:
|
||||
# 构造 URL
|
||||
if send_key.startswith("sctp"):
|
||||
match = re.match(r"^sctp(\d+)t", send_key)
|
||||
if match:
|
||||
url = f"https://{match.group(1)}.push.ft07.com/send/{send_key}.send"
|
||||
else:
|
||||
raise ValueError("SendKey 格式错误(sctp)")
|
||||
else:
|
||||
url = f"https://sctapi.ftqq.com/{send_key}.send"
|
||||
|
||||
# 构建 tags 和 channel
|
||||
def is_valid(s):
|
||||
return s == "" or (
|
||||
s == "|".join(s.split("|"))
|
||||
and (s.count("|") == 0 or all(s.split("|")))
|
||||
)
|
||||
|
||||
tags = "|".join(_.strip() for _ in tag.split("|"))
|
||||
channels = "|".join(_.strip() for _ in channel.split("|"))
|
||||
|
||||
options = {}
|
||||
if is_valid(tags):
|
||||
options["tags"] = tags
|
||||
else:
|
||||
logger.warning("Server酱 Tag 配置不正确,将被忽略", module="通知服务")
|
||||
self.push_info_bar.emit(
|
||||
"warning",
|
||||
"Server酱通知推送异常",
|
||||
"请正确设置 ServerChan 的 Tag",
|
||||
-1,
|
||||
)
|
||||
|
||||
if is_valid(channels):
|
||||
options["channel"] = channels
|
||||
else:
|
||||
logger.warning(
|
||||
"Server酱 Channel 配置不正确,将被忽略", module="通知服务"
|
||||
)
|
||||
self.push_info_bar.emit(
|
||||
"warning",
|
||||
"Server酱通知推送异常",
|
||||
"请正确设置 ServerChan 的 Channel",
|
||||
-1,
|
||||
)
|
||||
|
||||
# 请求发送
|
||||
params = {"title": title, "desp": content, **options}
|
||||
headers = {"Content-Type": "application/json;charset=utf-8"}
|
||||
|
||||
response = requests.post(
|
||||
url,
|
||||
json=params,
|
||||
headers=headers,
|
||||
timeout=10,
|
||||
proxies={
|
||||
"http": Config.get(Config.update_ProxyAddress),
|
||||
"https": Config.get(Config.update_ProxyAddress),
|
||||
},
|
||||
)
|
||||
result = response.json()
|
||||
|
||||
if result.get("code") == 0:
|
||||
logger.success(f"Server酱推送通知成功:{title}", module="通知服务")
|
||||
return True
|
||||
else:
|
||||
error_code = result.get("code", "-1")
|
||||
logger.exception(
|
||||
f"Server酱通知推送失败:响应码:{error_code}", module="通知服务"
|
||||
)
|
||||
self.push_info_bar.emit(
|
||||
"error", "Server酱通知推送失败", f"响应码:{error_code}", -1
|
||||
)
|
||||
return f"Server酱通知推送失败:{error_code}"
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Server酱通知推送异常:{e}", module="通知服务")
|
||||
self.push_info_bar.emit(
|
||||
"error",
|
||||
"Server酱通知推送异常",
|
||||
"请检查相关设置和网络连接。如全部配置正确,请稍后再试。",
|
||||
-1,
|
||||
)
|
||||
return f"Server酱通知推送异常:{str(e)}"
|
||||
|
||||
def CompanyWebHookBotPush(self, title, content, webhook_url) -> Union[bool, str]:
|
||||
"""
|
||||
使用企业微信群机器人推送通知
|
||||
|
||||
:param title: 通知标题
|
||||
:param content: 通知内容
|
||||
:param webhook_url: 企业微信群机器人的WebHook地址
|
||||
:return: bool or str
|
||||
"""
|
||||
|
||||
if webhook_url == "":
|
||||
logger.error("请正确设置企业微信群机器人的WebHook地址", module="通知服务")
|
||||
self.push_info_bar.emit(
|
||||
"error",
|
||||
"企业微信群机器人通知推送异常",
|
||||
"请正确设置企业微信群机器人的WebHook地址",
|
||||
-1,
|
||||
)
|
||||
return None
|
||||
|
||||
content = f"{title}\n{content}"
|
||||
data = {"msgtype": "text", "text": {"content": content}}
|
||||
|
||||
for _ in range(3):
|
||||
try:
|
||||
response = requests.post(
|
||||
url=webhook_url,
|
||||
json=data,
|
||||
timeout=10,
|
||||
proxies={
|
||||
"http": Config.get(Config.update_ProxyAddress),
|
||||
"https": Config.get(Config.update_ProxyAddress),
|
||||
},
|
||||
)
|
||||
info = response.json()
|
||||
break
|
||||
except Exception as e:
|
||||
err = e
|
||||
time.sleep(0.1)
|
||||
else:
|
||||
logger.error(f"推送企业微信群机器人时出错:{err}", module="通知服务")
|
||||
self.push_info_bar.emit(
|
||||
"error",
|
||||
"企业微信群机器人通知推送失败",
|
||||
f"使用企业微信群机器人推送通知时出错:{err}",
|
||||
-1,
|
||||
)
|
||||
return None
|
||||
|
||||
if info["errcode"] == 0:
|
||||
logger.success(f"企业微信群机器人推送通知成功:{title}", module="通知服务")
|
||||
return True
|
||||
else:
|
||||
logger.error(f"企业微信群机器人推送通知失败:{info}", module="通知服务")
|
||||
self.push_info_bar.emit(
|
||||
"error",
|
||||
"企业微信群机器人通知推送失败",
|
||||
f"使用企业微信群机器人推送通知时出错:{err}",
|
||||
-1,
|
||||
)
|
||||
return f"使用企业微信群机器人推送通知时出错:{err}"
|
||||
|
||||
def CompanyWebHookBotPushImage(self, image_path: Path, webhook_url: str) -> bool:
|
||||
"""
|
||||
使用企业微信群机器人推送图片通知
|
||||
|
||||
:param image_path: 图片文件路径
|
||||
:param webhook_url: 企业微信群机器人的WebHook地址
|
||||
:return: bool
|
||||
"""
|
||||
|
||||
try:
|
||||
# 压缩图片
|
||||
ImageUtils.compress_image_if_needed(image_path)
|
||||
|
||||
# 检查图片是否存在
|
||||
if not image_path.exists():
|
||||
logger.error(
|
||||
"图片推送异常 | 图片不存在或者压缩失败,请检查图片路径是否正确",
|
||||
module="通知服务",
|
||||
)
|
||||
self.push_info_bar.emit(
|
||||
"error",
|
||||
"企业微信群机器人通知推送异常",
|
||||
"图片不存在或者压缩失败,请检查图片路径是否正确",
|
||||
-1,
|
||||
)
|
||||
return False
|
||||
|
||||
if not webhook_url:
|
||||
logger.error(
|
||||
"请正确设置企业微信群机器人的WebHook地址", module="通知服务"
|
||||
)
|
||||
self.push_info_bar.emit(
|
||||
"error",
|
||||
"企业微信群机器人通知推送异常",
|
||||
"请正确设置企业微信群机器人的WebHook地址",
|
||||
-1,
|
||||
)
|
||||
return False
|
||||
|
||||
# 获取图片base64和md5
|
||||
try:
|
||||
image_base64 = ImageUtils.get_base64_from_file(str(image_path))
|
||||
image_md5 = ImageUtils.calculate_md5_from_file(str(image_path))
|
||||
except Exception as e:
|
||||
logger.exception(f"图片编码或MD5计算失败:{e}", module="通知服务")
|
||||
self.push_info_bar.emit(
|
||||
"error",
|
||||
"企业微信群机器人通知推送异常",
|
||||
f"图片编码或MD5计算失败:{e}",
|
||||
-1,
|
||||
)
|
||||
return False
|
||||
|
||||
data = {
|
||||
"msgtype": "image",
|
||||
"image": {"base64": image_base64, "md5": image_md5},
|
||||
}
|
||||
|
||||
for _ in range(3):
|
||||
try:
|
||||
response = requests.post(
|
||||
url=webhook_url,
|
||||
json=data,
|
||||
timeout=10,
|
||||
proxies={
|
||||
"http": Config.get(Config.update_ProxyAddress),
|
||||
"https": Config.get(Config.update_ProxyAddress),
|
||||
},
|
||||
)
|
||||
info = response.json()
|
||||
break
|
||||
except requests.RequestException as e:
|
||||
err = e
|
||||
logger.exception(
|
||||
f"推送企业微信群机器人图片第{_+1}次失败:{e}", module="通知服务"
|
||||
)
|
||||
time.sleep(0.1)
|
||||
else:
|
||||
logger.error("推送企业微信群机器人图片时出错", module="通知服务")
|
||||
self.push_info_bar.emit(
|
||||
"error",
|
||||
"企业微信群机器人图片推送失败",
|
||||
f"使用企业微信群机器人推送图片时出错:{err}",
|
||||
-1,
|
||||
)
|
||||
return False
|
||||
|
||||
if info.get("errcode") == 0:
|
||||
logger.success(
|
||||
f"企业微信群机器人推送图片成功:{image_path.name}",
|
||||
module="通知服务",
|
||||
)
|
||||
return True
|
||||
else:
|
||||
logger.error(f"企业微信群机器人推送图片失败:{info}", module="通知服务")
|
||||
self.push_info_bar.emit(
|
||||
"error",
|
||||
"企业微信群机器人图片推送失败",
|
||||
f"使用企业微信群机器人推送图片时出错:{info}",
|
||||
-1,
|
||||
)
|
||||
return False
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"推送企业微信群机器人图片时发生未知异常:{e}")
|
||||
self.push_info_bar.emit(
|
||||
"error",
|
||||
"企业微信群机器人图片推送失败",
|
||||
f"发生未知异常:{e}",
|
||||
-1,
|
||||
)
|
||||
return False
|
||||
|
||||
def send_test_notification(self):
|
||||
"""发送测试通知到所有已启用的通知渠道"""
|
||||
|
||||
logger.info("发送测试通知到所有已启用的通知渠道", module="通知服务")
|
||||
|
||||
# 发送系统通知
|
||||
self.push_plyer(
|
||||
"测试通知",
|
||||
"这是 AUTO_MAA 外部通知测试信息。如果你看到了这段内容,说明 AUTO_MAA 的通知功能已经正确配置且可以正常工作!",
|
||||
"测试通知",
|
||||
3,
|
||||
)
|
||||
|
||||
# 发送邮件通知
|
||||
if Config.get(Config.notify_IfSendMail):
|
||||
self.send_mail(
|
||||
"文本",
|
||||
"AUTO_MAA测试通知",
|
||||
"这是 AUTO_MAA 外部通知测试信息。如果你看到了这段内容,说明 AUTO_MAA 的通知功能已经正确配置且可以正常工作!",
|
||||
Config.get(Config.notify_ToAddress),
|
||||
)
|
||||
|
||||
# 发送Server酱通知
|
||||
if Config.get(Config.notify_IfServerChan):
|
||||
self.ServerChanPush(
|
||||
"AUTO_MAA测试通知",
|
||||
"这是 AUTO_MAA 外部通知测试信息。如果你看到了这段内容,说明 AUTO_MAA 的通知功能已经正确配置且可以正常工作!",
|
||||
Config.get(Config.notify_ServerChanKey),
|
||||
Config.get(Config.notify_ServerChanTag),
|
||||
Config.get(Config.notify_ServerChanChannel),
|
||||
)
|
||||
|
||||
# 发送企业微信机器人通知
|
||||
if Config.get(Config.notify_IfCompanyWebHookBot):
|
||||
self.CompanyWebHookBotPush(
|
||||
"AUTO_MAA测试通知",
|
||||
"这是 AUTO_MAA 外部通知测试信息。如果你看到了这段内容,说明 AUTO_MAA 的通知功能已经正确配置且可以正常工作!",
|
||||
Config.get(Config.notify_CompanyWebHookBotUrl),
|
||||
)
|
||||
Notify.CompanyWebHookBotPushImage(
|
||||
Config.app_path / "resources/images/notification/test_notify.png",
|
||||
Config.get(Config.notify_CompanyWebHookBotUrl),
|
||||
)
|
||||
|
||||
logger.info("测试通知发送完成", module="通知服务")
|
||||
|
||||
return True
|
||||
|
||||
|
||||
Notify = Notification()
|
||||
@@ -1,270 +0,0 @@
|
||||
# AUTO_MAA:A MAA Multi Account Management and Automation Tool
|
||||
# Copyright © 2024-2025 DLmaster361
|
||||
|
||||
# This file is part of AUTO_MAA.
|
||||
|
||||
# AUTO_MAA is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License,
|
||||
# or (at your option) any later version.
|
||||
|
||||
# AUTO_MAA is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty
|
||||
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
|
||||
# the GNU General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with AUTO_MAA. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
# Contact: DLmaster_361@163.com
|
||||
|
||||
"""
|
||||
AUTO_MAA
|
||||
AUTO_MAA安全服务
|
||||
v4.4
|
||||
作者:DLmaster_361
|
||||
"""
|
||||
|
||||
import hashlib
|
||||
import random
|
||||
import secrets
|
||||
import base64
|
||||
import win32crypt
|
||||
from Crypto.Cipher import AES
|
||||
from Crypto.PublicKey import RSA
|
||||
from Crypto.Cipher import PKCS1_OAEP
|
||||
from Crypto.Util.Padding import pad, unpad
|
||||
|
||||
from app.core import Config
|
||||
|
||||
|
||||
class CryptoHandler:
|
||||
|
||||
def get_PASSWORD(self, PASSWORD: str) -> None:
|
||||
"""
|
||||
配置管理密钥
|
||||
|
||||
:param PASSWORD: 管理密钥
|
||||
:type PASSWORD: str
|
||||
"""
|
||||
|
||||
# 生成目录
|
||||
Config.key_path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# 生成RSA密钥对
|
||||
key = RSA.generate(2048)
|
||||
public_key_local = key.publickey()
|
||||
private_key = key
|
||||
# 保存RSA公钥
|
||||
(Config.app_path / "data/key/public_key.pem").write_bytes(
|
||||
public_key_local.exportKey()
|
||||
)
|
||||
# 生成密钥转换与校验随机盐
|
||||
PASSWORD_salt = secrets.token_hex(random.randint(32, 1024))
|
||||
(Config.app_path / "data/key/PASSWORDsalt.txt").write_text(
|
||||
PASSWORD_salt,
|
||||
encoding="utf-8",
|
||||
)
|
||||
verify_salt = secrets.token_hex(random.randint(32, 1024))
|
||||
(Config.app_path / "data/key/verifysalt.txt").write_text(
|
||||
verify_salt,
|
||||
encoding="utf-8",
|
||||
)
|
||||
# 将管理密钥转化为AES-256密钥
|
||||
AES_password = hashlib.sha256(
|
||||
(PASSWORD + PASSWORD_salt).encode("utf-8")
|
||||
).digest()
|
||||
# 生成AES-256密钥校验哈希值并保存
|
||||
AES_password_verify = hashlib.sha256(
|
||||
AES_password + verify_salt.encode("utf-8")
|
||||
).digest()
|
||||
(Config.app_path / "data/key/AES_password_verify.bin").write_bytes(
|
||||
AES_password_verify
|
||||
)
|
||||
# AES-256加密RSA私钥并保存密文
|
||||
AES_key = AES.new(AES_password, AES.MODE_ECB)
|
||||
private_key_local = AES_key.encrypt(pad(private_key.exportKey(), 32))
|
||||
(Config.app_path / "data/key/private_key.bin").write_bytes(private_key_local)
|
||||
|
||||
def AUTO_encryptor(self, note: str) -> str:
|
||||
"""
|
||||
使用AUTO_MAA的算法加密数据
|
||||
|
||||
:param note: 数据明文
|
||||
:type note: str
|
||||
"""
|
||||
|
||||
if note == "":
|
||||
return ""
|
||||
|
||||
# 读取RSA公钥
|
||||
public_key_local = RSA.import_key(
|
||||
(Config.app_path / "data/key/public_key.pem").read_bytes()
|
||||
)
|
||||
# 使用RSA公钥对数据进行加密
|
||||
cipher = PKCS1_OAEP.new(public_key_local)
|
||||
encrypted = cipher.encrypt(note.encode("utf-8"))
|
||||
return base64.b64encode(encrypted).decode("utf-8")
|
||||
|
||||
def AUTO_decryptor(self, note: str, PASSWORD: str) -> str:
|
||||
"""
|
||||
使用AUTO_MAA的算法解密数据
|
||||
|
||||
:param note: 数据密文
|
||||
:type note: str
|
||||
:param PASSWORD: 管理密钥
|
||||
:type PASSWORD: str
|
||||
:return: 解密后的明文
|
||||
:rtype: str
|
||||
"""
|
||||
|
||||
if note == "":
|
||||
return ""
|
||||
|
||||
# 读入RSA私钥密文、盐与校验哈希值
|
||||
private_key_local = (
|
||||
(Config.app_path / "data/key/private_key.bin").read_bytes().strip()
|
||||
)
|
||||
PASSWORD_salt = (
|
||||
(Config.app_path / "data/key/PASSWORDsalt.txt")
|
||||
.read_text(encoding="utf-8")
|
||||
.strip()
|
||||
)
|
||||
verify_salt = (
|
||||
(Config.app_path / "data/key/verifysalt.txt")
|
||||
.read_text(encoding="utf-8")
|
||||
.strip()
|
||||
)
|
||||
AES_password_verify = (
|
||||
(Config.app_path / "data/key/AES_password_verify.bin").read_bytes().strip()
|
||||
)
|
||||
# 将管理密钥转化为AES-256密钥并验证
|
||||
AES_password = hashlib.sha256(
|
||||
(PASSWORD + PASSWORD_salt).encode("utf-8")
|
||||
).digest()
|
||||
AES_password_SHA = hashlib.sha256(
|
||||
AES_password + verify_salt.encode("utf-8")
|
||||
).digest()
|
||||
if AES_password_SHA != AES_password_verify:
|
||||
return "管理密钥错误"
|
||||
else:
|
||||
# AES解密RSA私钥
|
||||
AES_key = AES.new(AES_password, AES.MODE_ECB)
|
||||
private_key_pem = unpad(AES_key.decrypt(private_key_local), 32)
|
||||
private_key = RSA.import_key(private_key_pem)
|
||||
# 使用RSA私钥解密数据
|
||||
decrypter = PKCS1_OAEP.new(private_key)
|
||||
note = decrypter.decrypt(base64.b64decode(note)).decode("utf-8")
|
||||
return note
|
||||
|
||||
def change_PASSWORD(self, PASSWORD_old: str, PASSWORD_new: str) -> None:
|
||||
"""
|
||||
修改管理密钥
|
||||
|
||||
:param PASSWORD_old: 旧管理密钥
|
||||
:type PASSWORD_old: str
|
||||
:param PASSWORD_new: 新管理密钥
|
||||
:type PASSWORD_new: str
|
||||
"""
|
||||
|
||||
for script in Config.script_dict.values():
|
||||
|
||||
# 使用旧管理密钥解密
|
||||
if script["Type"] == "Maa":
|
||||
for user in script["UserData"].values():
|
||||
user["Password"] = self.AUTO_decryptor(
|
||||
user["Config"].get(user["Config"].Info_Password), PASSWORD_old
|
||||
)
|
||||
|
||||
self.get_PASSWORD(PASSWORD_new)
|
||||
|
||||
for script in Config.script_dict.values():
|
||||
|
||||
# 使用新管理密钥重新加密
|
||||
if script["Type"] == "Maa":
|
||||
for user in script["UserData"].values():
|
||||
user["Config"].set(
|
||||
user["Config"].Info_Password,
|
||||
self.AUTO_encryptor(user["Password"]),
|
||||
)
|
||||
user["Password"] = None
|
||||
del user["Password"]
|
||||
|
||||
def reset_PASSWORD(self, PASSWORD_new: str) -> None:
|
||||
"""
|
||||
重置管理密钥
|
||||
|
||||
:param PASSWORD_new: 新管理密钥
|
||||
:type PASSWORD_new: str
|
||||
"""
|
||||
|
||||
self.get_PASSWORD(PASSWORD_new)
|
||||
|
||||
for script in Config.script_dict.values():
|
||||
|
||||
if script["Type"] == "Maa":
|
||||
for user in script["UserData"].values():
|
||||
user["Config"].set(
|
||||
user["Config"].Info_Password, self.AUTO_encryptor("数据已重置")
|
||||
)
|
||||
|
||||
def win_encryptor(
|
||||
self, note: str, description: str = None, entropy: bytes = None
|
||||
) -> str:
|
||||
"""
|
||||
使用Windows DPAPI加密数据
|
||||
|
||||
:param note: 数据明文
|
||||
:type note: str
|
||||
:param description: 描述信息
|
||||
:type description: str
|
||||
:param entropy: 随机熵
|
||||
:type entropy: bytes
|
||||
:return: 加密后的数据
|
||||
:rtype: str
|
||||
"""
|
||||
|
||||
if note == "":
|
||||
return ""
|
||||
|
||||
encrypted = win32crypt.CryptProtectData(
|
||||
note.encode("utf-8"), description, entropy, None, None, 0
|
||||
)
|
||||
return base64.b64encode(encrypted).decode("utf-8")
|
||||
|
||||
def win_decryptor(self, note: str, entropy: bytes = None) -> str:
|
||||
"""
|
||||
使用Windows DPAPI解密数据
|
||||
|
||||
:param note: 数据密文
|
||||
:type note: str
|
||||
:param entropy: 随机熵
|
||||
:type entropy: bytes
|
||||
:return: 解密后的明文
|
||||
:rtype: str
|
||||
"""
|
||||
|
||||
if note == "":
|
||||
return ""
|
||||
|
||||
decrypted = win32crypt.CryptUnprotectData(
|
||||
base64.b64decode(note), entropy, None, None, 0
|
||||
)
|
||||
return decrypted[1].decode("utf-8")
|
||||
|
||||
def check_PASSWORD(self, PASSWORD: str) -> bool:
|
||||
"""
|
||||
验证管理密钥
|
||||
|
||||
:param PASSWORD: 管理密钥
|
||||
:type PASSWORD: str
|
||||
:return: 是否验证通过
|
||||
:rtype: bool
|
||||
"""
|
||||
|
||||
return bool(
|
||||
self.AUTO_decryptor(self.AUTO_encryptor("-"), PASSWORD) != "管理密钥错误"
|
||||
)
|
||||
|
||||
|
||||
Crypto = CryptoHandler()
|
||||
@@ -1,285 +0,0 @@
|
||||
# AUTO_MAA:A MAA Multi Account Management and Automation Tool
|
||||
# Copyright © 2024-2025 DLmaster361
|
||||
|
||||
# This file incorporates work covered by the following copyright and
|
||||
# permission notice:
|
||||
#
|
||||
# skland-checkin-ghaction Copyright © 2023 Yanstory
|
||||
# https://github.com/Yanstory/skland-checkin-ghaction
|
||||
|
||||
# This file is part of AUTO_MAA.
|
||||
|
||||
# AUTO_MAA is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License,
|
||||
# or (at your option) any later version.
|
||||
|
||||
# AUTO_MAA is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty
|
||||
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
|
||||
# the GNU General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with AUTO_MAA. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
# Contact: DLmaster_361@163.com
|
||||
|
||||
|
||||
"""
|
||||
AUTO_MAA
|
||||
AUTO_MAA森空岛服务
|
||||
v4.4
|
||||
作者:DLmaster_361、ClozyA
|
||||
"""
|
||||
|
||||
import time
|
||||
import json
|
||||
import hmac
|
||||
import hashlib
|
||||
import requests
|
||||
from urllib import parse
|
||||
|
||||
from app.core import Config, logger
|
||||
|
||||
|
||||
def skland_sign_in(token) -> dict:
|
||||
"""森空岛签到"""
|
||||
|
||||
app_code = "4ca99fa6b56cc2ba"
|
||||
# 用于获取grant code
|
||||
grant_code_url = "https://as.hypergryph.com/user/oauth2/v2/grant"
|
||||
# 用于获取cred
|
||||
cred_code_url = "https://zonai.skland.com/api/v1/user/auth/generate_cred_by_code"
|
||||
# 查询角色绑定
|
||||
binding_url = "https://zonai.skland.com/api/v1/game/player/binding"
|
||||
# 签到接口
|
||||
sign_url = "https://zonai.skland.com/api/v1/game/attendance"
|
||||
|
||||
# 基础请求头
|
||||
header = {
|
||||
"cred": "",
|
||||
"User-Agent": "Skland/1.5.1 (com.hypergryph.skland; build:100501001; Android 34;) Okhttp/4.11.0",
|
||||
"Accept-Encoding": "gzip",
|
||||
"Connection": "close",
|
||||
}
|
||||
header_login = header.copy()
|
||||
header_for_sign = {
|
||||
"platform": "1",
|
||||
"timestamp": "",
|
||||
"dId": "",
|
||||
"vName": "1.5.1",
|
||||
}
|
||||
|
||||
def generate_signature(token_for_sign: str, path, body_or_query):
|
||||
"""
|
||||
生成请求签名
|
||||
|
||||
:param token_for_sign: 用于加密的token
|
||||
:param path: 请求路径(如 /api/v1/game/player/binding)
|
||||
:param body_or_query: GET用query字符串,POST用body字符串
|
||||
:return: (sign, 新的header_for_sign字典)
|
||||
"""
|
||||
|
||||
t = str(int(time.time()) - 2) # 时间戳,-2秒以防服务器时间不一致
|
||||
token_bytes = token_for_sign.encode("utf-8")
|
||||
header_ca = dict(header_for_sign)
|
||||
header_ca["timestamp"] = t
|
||||
header_ca_str = json.dumps(header_ca, separators=(",", ":"))
|
||||
s = path + body_or_query + t + header_ca_str # 拼接原始字符串
|
||||
# HMAC-SHA256 + MD5得到最终sign
|
||||
hex_s = hmac.new(token_bytes, s.encode("utf-8"), hashlib.sha256).hexdigest()
|
||||
md5 = hashlib.md5(hex_s.encode("utf-8")).hexdigest()
|
||||
return md5, header_ca
|
||||
|
||||
def get_sign_header(url: str, method, body, old_header, sign_token):
|
||||
"""
|
||||
获取带签名的请求头
|
||||
|
||||
:param url: 请求完整url
|
||||
:param method: 请求方式 GET/POST
|
||||
:param body: POST请求体或GET时为None
|
||||
:param old_header: 原始请求头
|
||||
:param sign_token: 当前会话的签名token
|
||||
:return: 新请求头
|
||||
"""
|
||||
|
||||
h = json.loads(json.dumps(old_header))
|
||||
p = parse.urlparse(url)
|
||||
if method.lower() == "get":
|
||||
sign, header_ca = generate_signature(sign_token, p.path, p.query)
|
||||
else:
|
||||
sign, header_ca = generate_signature(
|
||||
sign_token, p.path, json.dumps(body) if body else ""
|
||||
)
|
||||
h["sign"] = sign
|
||||
for i in header_ca:
|
||||
h[i] = header_ca[i]
|
||||
return h
|
||||
|
||||
def copy_header(cred):
|
||||
"""
|
||||
复制请求头并添加cred
|
||||
|
||||
:param cred: 当前会话的cred
|
||||
:return: 新的请求头
|
||||
"""
|
||||
v = json.loads(json.dumps(header))
|
||||
v["cred"] = cred
|
||||
return v
|
||||
|
||||
def login_by_token(token_code):
|
||||
"""
|
||||
使用token一步步拿到cred和sign_token
|
||||
|
||||
:param token_code: 你的skyland token
|
||||
:return: (cred, sign_token)
|
||||
"""
|
||||
try:
|
||||
# token为json对象时提取data.content
|
||||
t = json.loads(token_code)
|
||||
token_code = t["data"]["content"]
|
||||
except:
|
||||
pass
|
||||
grant_code = get_grant_code(token_code)
|
||||
return get_cred(grant_code)
|
||||
|
||||
def get_cred(grant):
|
||||
"""
|
||||
通过grant code获取cred和sign_token
|
||||
|
||||
:param grant: grant code
|
||||
:return: (cred, sign_token)
|
||||
"""
|
||||
|
||||
rsp = requests.post(
|
||||
cred_code_url,
|
||||
json={"code": grant, "kind": 1},
|
||||
headers=header_login,
|
||||
proxies={
|
||||
"http": Config.get(Config.update_ProxyAddress),
|
||||
"https": Config.get(Config.update_ProxyAddress),
|
||||
},
|
||||
).json()
|
||||
if rsp["code"] != 0:
|
||||
raise Exception(f'获得cred失败:{rsp.get("messgae")}')
|
||||
sign_token = rsp["data"]["token"]
|
||||
cred = rsp["data"]["cred"]
|
||||
return cred, sign_token
|
||||
|
||||
def get_grant_code(token):
|
||||
"""
|
||||
通过token获取grant code
|
||||
|
||||
:param token: 你的skyland token
|
||||
:return: grant code
|
||||
"""
|
||||
rsp = requests.post(
|
||||
grant_code_url,
|
||||
json={"appCode": app_code, "token": token, "type": 0},
|
||||
headers=header_login,
|
||||
proxies={
|
||||
"http": Config.get(Config.update_ProxyAddress),
|
||||
"https": Config.get(Config.update_ProxyAddress),
|
||||
},
|
||||
).json()
|
||||
if rsp["status"] != 0:
|
||||
raise Exception(
|
||||
f'使用token: {token[:3]}******{token[-3:]} 获得认证代码失败:{rsp.get("msg")}'
|
||||
)
|
||||
return rsp["data"]["code"]
|
||||
|
||||
def get_binding_list(cred, sign_token):
|
||||
"""
|
||||
查询已绑定的角色列表
|
||||
|
||||
:param cred: 当前cred
|
||||
:param sign_token: 当前sign_token
|
||||
:return: 角色列表
|
||||
"""
|
||||
v = []
|
||||
rsp = requests.get(
|
||||
binding_url,
|
||||
headers=get_sign_header(
|
||||
binding_url, "get", None, copy_header(cred), sign_token
|
||||
),
|
||||
proxies={
|
||||
"http": Config.get(Config.update_ProxyAddress),
|
||||
"https": Config.get(Config.update_ProxyAddress),
|
||||
},
|
||||
).json()
|
||||
if rsp["code"] != 0:
|
||||
logger.error(
|
||||
f"森空岛服务 | 请求角色列表出现问题:{rsp['message']}",
|
||||
module="森空岛签到",
|
||||
)
|
||||
if rsp.get("message") == "用户未登录":
|
||||
logger.error(
|
||||
f"森空岛服务 | 用户登录可能失效了,请重新登录!",
|
||||
module="森空岛签到",
|
||||
)
|
||||
return v
|
||||
# 只取明日方舟(arknights)的绑定账号
|
||||
for i in rsp["data"]["list"]:
|
||||
if i.get("appCode") != "arknights":
|
||||
continue
|
||||
v.extend(i.get("bindingList"))
|
||||
return v
|
||||
|
||||
def do_sign(cred, sign_token) -> dict:
|
||||
"""
|
||||
对所有绑定的角色进行签到
|
||||
|
||||
:param cred: 当前cred
|
||||
:param sign_token: 当前sign_token
|
||||
:return: 签到结果字典
|
||||
"""
|
||||
|
||||
characters = get_binding_list(cred, sign_token)
|
||||
result = {"成功": [], "重复": [], "失败": [], "总计": len(characters)}
|
||||
|
||||
for character in characters:
|
||||
|
||||
body = {
|
||||
"uid": character.get("uid"),
|
||||
"gameId": character.get("channelMasterId"),
|
||||
}
|
||||
rsp = requests.post(
|
||||
sign_url,
|
||||
headers=get_sign_header(
|
||||
sign_url, "post", body, copy_header(cred), sign_token
|
||||
),
|
||||
json=body,
|
||||
proxies={
|
||||
"http": Config.get(Config.update_ProxyAddress),
|
||||
"https": Config.get(Config.update_ProxyAddress),
|
||||
},
|
||||
).json()
|
||||
|
||||
if rsp["code"] != 0:
|
||||
|
||||
result[
|
||||
"重复" if rsp.get("message") == "请勿重复签到!" else "失败"
|
||||
].append(
|
||||
f"{character.get("nickName")}({character.get("channelName")})"
|
||||
)
|
||||
|
||||
else:
|
||||
|
||||
result["成功"].append(
|
||||
f"{character.get("nickName")}({character.get("channelName")})"
|
||||
)
|
||||
|
||||
time.sleep(3)
|
||||
|
||||
return result
|
||||
|
||||
# 主流程
|
||||
try:
|
||||
# 拿到cred和sign_token
|
||||
cred, sign_token = login_by_token(token)
|
||||
time.sleep(1)
|
||||
# 依次签到
|
||||
return do_sign(cred, sign_token)
|
||||
except Exception as e:
|
||||
logger.exception(f"森空岛服务 | 森空岛签到失败: {e}", module="森空岛签到")
|
||||
return {"成功": [], "重复": [], "失败": [], "总计": 0}
|
||||
@@ -1,352 +0,0 @@
|
||||
# AUTO_MAA:A MAA Multi Account Management and Automation Tool
|
||||
# Copyright © 2024-2025 DLmaster361
|
||||
|
||||
# This file is part of AUTO_MAA.
|
||||
|
||||
# AUTO_MAA is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License,
|
||||
# or (at your option) any later version.
|
||||
|
||||
# AUTO_MAA is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty
|
||||
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
|
||||
# the GNU General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with AUTO_MAA. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
# Contact: DLmaster_361@163.com
|
||||
|
||||
"""
|
||||
AUTO_MAA
|
||||
AUTO_MAA系统服务
|
||||
v4.4
|
||||
作者:DLmaster_361
|
||||
"""
|
||||
|
||||
from PySide6.QtWidgets import QApplication
|
||||
import sys
|
||||
import ctypes
|
||||
import win32gui
|
||||
import win32process
|
||||
import psutil
|
||||
import subprocess
|
||||
import tempfile
|
||||
import getpass
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
from app.core import Config, logger
|
||||
|
||||
|
||||
class _SystemHandler:
|
||||
|
||||
ES_CONTINUOUS = 0x80000000
|
||||
ES_SYSTEM_REQUIRED = 0x00000001
|
||||
|
||||
def __init__(self):
|
||||
|
||||
self.set_Sleep()
|
||||
self.set_SelfStart()
|
||||
|
||||
def set_Sleep(self) -> None:
|
||||
"""同步系统休眠状态"""
|
||||
|
||||
if Config.get(Config.function_IfAllowSleep):
|
||||
# 设置系统电源状态
|
||||
ctypes.windll.kernel32.SetThreadExecutionState(
|
||||
self.ES_CONTINUOUS | self.ES_SYSTEM_REQUIRED
|
||||
)
|
||||
else:
|
||||
# 恢复系统电源状态
|
||||
ctypes.windll.kernel32.SetThreadExecutionState(self.ES_CONTINUOUS)
|
||||
|
||||
def set_SelfStart(self) -> None:
|
||||
"""同步开机自启"""
|
||||
|
||||
if Config.get(Config.start_IfSelfStart) and not self.is_startup():
|
||||
|
||||
# 创建任务计划
|
||||
try:
|
||||
|
||||
# 获取当前用户和时间
|
||||
current_user = getpass.getuser()
|
||||
current_time = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
|
||||
|
||||
# XML 模板
|
||||
xml_content = f"""<?xml version="1.0" encoding="UTF-16"?>
|
||||
<Task version="1.2" xmlns="http://schemas.microsoft.com/windows/2004/02/mit/task">
|
||||
<RegistrationInfo>
|
||||
<Date>{current_time}</Date>
|
||||
<Author>{current_user}</Author>
|
||||
<Description>AUTO_MAA自启动服务</Description>
|
||||
<URI>\\AUTO_MAA_AutoStart</URI>
|
||||
</RegistrationInfo>
|
||||
<Triggers>
|
||||
<LogonTrigger>
|
||||
<StartBoundary>{current_time}</StartBoundary>
|
||||
<Enabled>true</Enabled>
|
||||
</LogonTrigger>
|
||||
</Triggers>
|
||||
<Principals>
|
||||
<Principal id="Author">
|
||||
<LogonType>InteractiveToken</LogonType>
|
||||
<RunLevel>HighestAvailable</RunLevel>
|
||||
</Principal>
|
||||
</Principals>
|
||||
<Settings>
|
||||
<MultipleInstancesPolicy>IgnoreNew</MultipleInstancesPolicy>
|
||||
<DisallowStartIfOnBatteries>false</DisallowStartIfOnBatteries>
|
||||
<StopIfGoingOnBatteries>false</StopIfGoingOnBatteries>
|
||||
<AllowHardTerminate>false</AllowHardTerminate>
|
||||
<StartWhenAvailable>true</StartWhenAvailable>
|
||||
<RunOnlyIfNetworkAvailable>false</RunOnlyIfNetworkAvailable>
|
||||
<IdleSettings>
|
||||
<StopOnIdleEnd>false</StopOnIdleEnd>
|
||||
<RestartOnIdle>false</RestartOnIdle>
|
||||
</IdleSettings>
|
||||
<AllowStartOnDemand>true</AllowStartOnDemand>
|
||||
<Enabled>true</Enabled>
|
||||
<Hidden>false</Hidden>
|
||||
<RunOnlyIfIdle>false</RunOnlyIfIdle>
|
||||
<WakeToRun>false</WakeToRun>
|
||||
<ExecutionTimeLimit>PT0S</ExecutionTimeLimit>
|
||||
<Priority>7</Priority>
|
||||
</Settings>
|
||||
<Actions Context="Author">
|
||||
<Exec>
|
||||
<Command>"{Config.app_path_sys}"</Command>
|
||||
</Exec>
|
||||
</Actions>
|
||||
</Task>"""
|
||||
|
||||
# 创建临时 XML 文件并执行
|
||||
with tempfile.NamedTemporaryFile(
|
||||
mode="w", suffix=".xml", delete=False, encoding="utf-16"
|
||||
) as f:
|
||||
f.write(xml_content)
|
||||
xml_file = f.name
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
[
|
||||
"schtasks",
|
||||
"/create",
|
||||
"/tn",
|
||||
"AUTO_MAA_AutoStart",
|
||||
"/xml",
|
||||
xml_file,
|
||||
"/f",
|
||||
],
|
||||
creationflags=subprocess.CREATE_NO_WINDOW,
|
||||
stdin=subprocess.DEVNULL,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
|
||||
if result.returncode == 0:
|
||||
logger.success(
|
||||
f"程序自启动任务计划已创建: {Config.app_path_sys}",
|
||||
module="系统服务",
|
||||
)
|
||||
else:
|
||||
logger.error(
|
||||
f"程序自启动任务计划创建失败: {result.stderr}",
|
||||
module="系统服务",
|
||||
)
|
||||
|
||||
finally:
|
||||
# 删除临时文件
|
||||
try:
|
||||
Path(xml_file).unlink()
|
||||
except:
|
||||
pass
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"程序自启动任务计划创建失败: {e}", module="系统服务")
|
||||
|
||||
elif not Config.get(Config.start_IfSelfStart) and self.is_startup():
|
||||
|
||||
try:
|
||||
|
||||
result = subprocess.run(
|
||||
["schtasks", "/delete", "/tn", "AUTO_MAA_AutoStart", "/f"],
|
||||
creationflags=subprocess.CREATE_NO_WINDOW,
|
||||
stdin=subprocess.DEVNULL,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
|
||||
if result.returncode == 0:
|
||||
logger.success("程序自启动任务计划已删除", module="系统服务")
|
||||
else:
|
||||
logger.error(
|
||||
f"程序自启动任务计划删除失败: {result.stderr}",
|
||||
module="系统服务",
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"程序自启动任务计划删除失败: {e}", module="系统服务")
|
||||
|
||||
def set_power(self, mode) -> None:
|
||||
"""
|
||||
执行系统电源操作
|
||||
|
||||
:param mode: 电源操作模式,支持 "NoAction", "Shutdown", "Hibernate", "Sleep", "KillSelf", "ShutdownForce"
|
||||
"""
|
||||
|
||||
if sys.platform.startswith("win"):
|
||||
|
||||
if mode == "NoAction":
|
||||
|
||||
logger.info("不执行系统电源操作", module="系统服务")
|
||||
|
||||
elif mode == "Shutdown":
|
||||
|
||||
self.kill_emulator_processes()
|
||||
logger.info("执行关机操作", module="系统服务")
|
||||
subprocess.run(["shutdown", "/s", "/t", "0"])
|
||||
|
||||
elif mode == "ShutdownForce":
|
||||
logger.info("执行强制关机操作", module="系统服务")
|
||||
subprocess.run(["shutdown", "/s", "/t", "0", "/f"])
|
||||
|
||||
elif mode == "Hibernate":
|
||||
|
||||
logger.info("执行休眠操作", module="系统服务")
|
||||
subprocess.run(["shutdown", "/h"])
|
||||
|
||||
elif mode == "Sleep":
|
||||
|
||||
logger.info("执行睡眠操作", module="系统服务")
|
||||
subprocess.run(
|
||||
["rundll32.exe", "powrprof.dll,SetSuspendState", "0,1,0"]
|
||||
)
|
||||
|
||||
elif mode == "KillSelf":
|
||||
|
||||
logger.info("执行退出主程序操作", module="系统服务")
|
||||
Config.main_window.close()
|
||||
QApplication.quit()
|
||||
sys.exit(0)
|
||||
|
||||
elif sys.platform.startswith("linux"):
|
||||
|
||||
if mode == "NoAction":
|
||||
|
||||
logger.info("不执行系统电源操作", module="系统服务")
|
||||
|
||||
elif mode == "Shutdown":
|
||||
|
||||
logger.info("执行关机操作", module="系统服务")
|
||||
subprocess.run(["shutdown", "-h", "now"])
|
||||
|
||||
elif mode == "Hibernate":
|
||||
|
||||
logger.info("执行休眠操作", module="系统服务")
|
||||
subprocess.run(["systemctl", "hibernate"])
|
||||
|
||||
elif mode == "Sleep":
|
||||
|
||||
logger.info("执行睡眠操作", module="系统服务")
|
||||
subprocess.run(["systemctl", "suspend"])
|
||||
|
||||
elif mode == "KillSelf":
|
||||
|
||||
logger.info("执行退出主程序操作", module="系统服务")
|
||||
Config.main_window.close()
|
||||
QApplication.quit()
|
||||
sys.exit(0)
|
||||
|
||||
def kill_emulator_processes(self):
|
||||
"""这里暂时仅支持 MuMu 模拟器"""
|
||||
|
||||
logger.info("正在清除模拟器进程", module="系统服务")
|
||||
|
||||
keywords = ["Nemu", "nemu", "emulator", "MuMu"]
|
||||
for proc in psutil.process_iter(["pid", "name"]):
|
||||
try:
|
||||
pname = proc.info["name"].lower()
|
||||
if any(keyword.lower() in pname for keyword in keywords):
|
||||
proc.kill()
|
||||
logger.info(
|
||||
f"已关闭 MuMu 模拟器进程: {proc.info['name']}",
|
||||
module="系统服务",
|
||||
)
|
||||
except (psutil.NoSuchProcess, psutil.AccessDenied):
|
||||
continue
|
||||
|
||||
logger.success("模拟器进程清除完成", module="系统服务")
|
||||
|
||||
def is_startup(self) -> bool:
|
||||
"""判断程序是否已经开机自启"""
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["schtasks", "/query", "/tn", "AUTO_MAA_AutoStart"],
|
||||
creationflags=subprocess.CREATE_NO_WINDOW,
|
||||
stdin=subprocess.DEVNULL,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
return result.returncode == 0
|
||||
except Exception as e:
|
||||
logger.exception(f"检查任务计划程序失败: {e}", module="系统服务")
|
||||
return False
|
||||
|
||||
def get_window_info(self) -> list:
|
||||
"""获取当前前台窗口信息"""
|
||||
|
||||
def callback(hwnd, window_info):
|
||||
if win32gui.IsWindowVisible(hwnd) and win32gui.GetWindowText(hwnd):
|
||||
_, pid = win32process.GetWindowThreadProcessId(hwnd)
|
||||
process = psutil.Process(pid)
|
||||
window_info.append((win32gui.GetWindowText(hwnd), process.exe()))
|
||||
return True
|
||||
|
||||
window_info = []
|
||||
win32gui.EnumWindows(callback, window_info)
|
||||
return window_info
|
||||
|
||||
def kill_process(self, path: Path) -> None:
|
||||
"""
|
||||
根据路径中止进程
|
||||
|
||||
:param path: 进程路径
|
||||
"""
|
||||
|
||||
logger.info(f"开始中止进程: {path}", module="系统服务")
|
||||
|
||||
for pid in self.search_pids(path):
|
||||
killprocess = subprocess.Popen(
|
||||
f"taskkill /F /T /PID {pid}",
|
||||
shell=True,
|
||||
creationflags=subprocess.CREATE_NO_WINDOW,
|
||||
)
|
||||
killprocess.wait()
|
||||
|
||||
logger.success(f"进程已中止: {path}", module="系统服务")
|
||||
|
||||
def search_pids(self, path: Path) -> list:
|
||||
"""
|
||||
根据路径查找进程PID
|
||||
|
||||
:param path: 进程路径
|
||||
:return: 匹配的进程PID列表
|
||||
"""
|
||||
|
||||
logger.info(f"开始查找进程 PID: {path}", module="系统服务")
|
||||
|
||||
pids = []
|
||||
for proc in psutil.process_iter(["pid", "exe"]):
|
||||
try:
|
||||
if proc.info["exe"] and proc.info["exe"].lower() == str(path).lower():
|
||||
pids.append(proc.info["pid"])
|
||||
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
|
||||
# 进程可能在此期间已结束或无法访问,忽略这些异常
|
||||
pass
|
||||
return pids
|
||||
|
||||
|
||||
System = _SystemHandler()
|
||||
Reference in New Issue
Block a user