feat(core): 重构日志记录,载入更多日志记录项

This commit is contained in:
DLmaster361
2025-07-18 18:12:47 +08:00
parent 8427bd9f6b
commit 9b492b5e0d
26 changed files with 2217 additions and 800 deletions

View File

@@ -33,13 +33,14 @@ from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formataddr
from pathlib import Path
from typing import Union
import requests
from PySide6.QtCore import QObject, Signal
from loguru import logger
from plyer import notification
from app.core import Config
from app.core import Config, logger
from app.services.security import Crypto
from app.utils.ImageUtils import ImageUtils
@@ -51,11 +52,21 @@ class Notification(QObject):
def __init__(self, parent=None):
super().__init__(parent)
def push_plyer(self, title, message, ticker, t):
"""推送系统通知"""
def push_plyer(self, title, message, ticker, t) -> bool:
"""
推送系统通知
:param title: 通知标题
:param message: 通知内容
:param ticker: 通知横幅
:param t: 通知持续时间
:return: bool
"""
if Config.get(Config.notify_IfPushPlyer):
logger.info(f"推送系统通知:{title}", module="通知服务")
notification.notify(
title=title,
message=message,
@@ -69,7 +80,15 @@ class Notification(QObject):
return True
def send_mail(self, mode, title, content, to_address) -> None:
"""推送邮件通知"""
"""
推送邮件通知
:param mode: 邮件内容模式,支持 "文本""网页"
:param title: 邮件标题
:param content: 邮件内容
:param to_address: 收件人地址
"""
if (
Config.get(Config.notify_SMTPServerAddress) == ""
or Config.get(Config.notify_AuthorizationCode) == ""
@@ -87,7 +106,8 @@ class Notification(QObject):
)
):
logger.error(
"请正确设置邮件通知的SMTP服务器地址、授权码、发件人地址和收件人地址"
"请正确设置邮件通知的SMTP服务器地址、授权码、发件人地址和收件人地址",
module="通知服务",
)
self.push_info_bar.emit(
"error",
@@ -110,42 +130,43 @@ class Notification(QObject):
)
) # 发件人显示的名字
message["To"] = formataddr(
(
Header("AUTO_MAA用户", "utf-8").encode(),
to_address,
)
(Header("AUTO_MAA用户", "utf-8").encode(), to_address)
) # 收件人显示的名字
message["Subject"] = Header(title, "utf-8")
if mode == "网页":
message.attach(MIMEText(content, "html", "utf-8"))
smtpObj = smtplib.SMTP_SSL(
Config.get(Config.notify_SMTPServerAddress),
465,
)
smtpObj = smtplib.SMTP_SSL(Config.get(Config.notify_SMTPServerAddress), 465)
smtpObj.login(
Config.get(Config.notify_FromAddress),
Crypto.win_decryptor(Config.get(Config.notify_AuthorizationCode)),
)
smtpObj.sendmail(
Config.get(Config.notify_FromAddress),
to_address,
message.as_string(),
Config.get(Config.notify_FromAddress), to_address, message.as_string()
)
smtpObj.quit()
logger.success("邮件发送成功")
return None
logger.success(f"邮件发送成功{title}", module="通知服务")
except Exception as e:
logger.error(f"发送邮件时出错:\n{e}")
logger.exception(f"发送邮件时出错:{e}", module="通知服务")
self.push_info_bar.emit("error", "发送邮件时出错", f"{e}", -1)
return None
return None
def ServerChanPush(self, title, content, send_key, tag, channel):
"""使用Server酱推送通知"""
def ServerChanPush(
self, title, content, send_key, tag, channel
) -> Union[bool, str]:
"""
使用Server酱推送通知
:param title: 通知标题
:param content: 通知内容
:param send_key: Server酱的SendKey
:param tag: 通知标签
:param channel: 通知频道
:return: bool or str
"""
if not send_key:
logger.error("请正确设置Server酱的SendKey")
logger.error("请正确设置Server酱的SendKey", module="通知服务")
self.push_info_bar.emit(
"error", "Server酱通知推送异常", "请正确设置Server酱的SendKey", -1
)
@@ -176,7 +197,7 @@ class Notification(QObject):
if is_valid(tags):
options["tags"] = tags
else:
logger.warning("Server酱 Tag 配置不正确,将被忽略")
logger.warning("Server酱 Tag 配置不正确,将被忽略", module="通知服务")
self.push_info_bar.emit(
"warning",
"Server酱通知推送异常",
@@ -187,7 +208,9 @@ class Notification(QObject):
if is_valid(channels):
options["channel"] = channels
else:
logger.warning("Server酱 Channel 配置不正确,将被忽略")
logger.warning(
"Server酱 Channel 配置不正确,将被忽略", module="通知服务"
)
self.push_info_bar.emit(
"warning",
"Server酱通知推送异常",
@@ -212,18 +235,20 @@ class Notification(QObject):
result = response.json()
if result.get("code") == 0:
logger.info("Server酱推送通知成功")
logger.success(f"Server酱推送通知成功{title}", module="通知服务")
return True
else:
error_code = result.get("code", "-1")
logger.error(f"Server酱通知推送失败响应码{error_code}")
logger.exception(
f"Server酱通知推送失败响应码{error_code}", module="通知服务"
)
self.push_info_bar.emit(
"error", "Server酱通知推送失败", f"响应码:{error_code}", -1
)
return f"Server酱通知推送失败{error_code}"
except Exception as e:
logger.exception("Server酱通知推送异常")
logger.exception(f"Server酱通知推送异常{e}", module="通知服务")
self.push_info_bar.emit(
"error",
"Server酱通知推送异常",
@@ -232,10 +257,18 @@ class Notification(QObject):
)
return f"Server酱通知推送异常{str(e)}"
def CompanyWebHookBotPush(self, title, content, webhook_url):
"""使用企业微信群机器人推送通知"""
def CompanyWebHookBotPush(self, title, content, webhook_url) -> Union[bool, str]:
"""
使用企业微信群机器人推送通知
:param title: 通知标题
:param content: 通知内容
:param webhook_url: 企业微信群机器人的WebHook地址
:return: bool or str
"""
if webhook_url == "":
logger.error("请正确设置企业微信群机器人的WebHook地址")
logger.error("请正确设置企业微信群机器人的WebHook地址", module="通知服务")
self.push_info_bar.emit(
"error",
"企业微信群机器人通知推送异常",
@@ -264,7 +297,7 @@ class Notification(QObject):
err = e
time.sleep(0.1)
else:
logger.error(f"推送企业微信群机器人时出错:{err}")
logger.error(f"推送企业微信群机器人时出错:{err}", module="通知服务")
self.push_info_bar.emit(
"error",
"企业微信群机器人通知推送失败",
@@ -274,10 +307,10 @@ class Notification(QObject):
return None
if info["errcode"] == 0:
logger.info("企业微信群机器人推送通知成功")
logger.success(f"企业微信群机器人推送通知成功{title}", module="通知服务")
return True
else:
logger.error(f"企业微信群机器人推送通知失败:{info}")
logger.error(f"企业微信群机器人推送通知失败:{info}", module="通知服务")
self.push_info_bar.emit(
"error",
"企业微信群机器人通知推送失败",
@@ -287,7 +320,14 @@ class Notification(QObject):
return f"使用企业微信群机器人推送通知时出错:{err}"
def CompanyWebHookBotPushImage(self, image_path: Path, webhook_url: str) -> bool:
"""使用企业微信群机器人推送图片通知"""
"""
使用企业微信群机器人推送图片通知
:param image_path: 图片文件路径
:param webhook_url: 企业微信群机器人的WebHook地址
:return: bool
"""
try:
# 压缩图片
ImageUtils.compress_image_if_needed(image_path)
@@ -295,7 +335,8 @@ class Notification(QObject):
# 检查图片是否存在
if not image_path.exists():
logger.error(
"图片推送异常 | 图片不存在或者压缩失败,请检查图片路径是否正确"
"图片推送异常 | 图片不存在或者压缩失败,请检查图片路径是否正确",
module="通知服务",
)
self.push_info_bar.emit(
"error",
@@ -306,7 +347,9 @@ class Notification(QObject):
return False
if not webhook_url:
logger.error("请正确设置企业微信群机器人的WebHook地址")
logger.error(
"请正确设置企业微信群机器人的WebHook地址", module="通知服务"
)
self.push_info_bar.emit(
"error",
"企业微信群机器人通知推送异常",
@@ -320,7 +363,7 @@ class Notification(QObject):
image_base64 = ImageUtils.get_base64_from_file(str(image_path))
image_md5 = ImageUtils.calculate_md5_from_file(str(image_path))
except Exception as e:
logger.exception(f"图片编码或MD5计算失败{e}")
logger.exception(f"图片编码或MD5计算失败{e}", module="通知服务")
self.push_info_bar.emit(
"error",
"企业微信群机器人通知推送异常",
@@ -349,10 +392,12 @@ class Notification(QObject):
break
except requests.RequestException as e:
err = e
logger.warning(f"推送企业微信群机器人图片第{_+1}次失败:{e}")
logger.exception(
f"推送企业微信群机器人图片第{_+1}次失败:{e}", module="通知服务"
)
time.sleep(0.1)
else:
logger.error(f"推送企业微信群机器人图片时出错{err}")
logger.error("推送企业微信群机器人图片时出错", module="通知服务")
self.push_info_bar.emit(
"error",
"企业微信群机器人图片推送失败",
@@ -362,10 +407,13 @@ class Notification(QObject):
return False
if info.get("errcode") == 0:
logger.info("企业微信群机器人推送图片成功")
logger.success(
f"企业微信群机器人推送图片成功:{image_path.name}",
module="通知服务",
)
return True
else:
logger.error(f"企业微信群机器人推送图片失败:{info}")
logger.error(f"企业微信群机器人推送图片失败:{info}", module="通知服务")
self.push_info_bar.emit(
"error",
"企业微信群机器人图片推送失败",
@@ -386,6 +434,9 @@ class Notification(QObject):
def send_test_notification(self):
"""发送测试通知到所有已启用的通知渠道"""
logger.info("发送测试通知到所有已启用的通知渠道", module="通知服务")
# 发送系统通知
self.push_plyer(
"测试通知",
@@ -425,6 +476,8 @@ class Notification(QObject):
Config.get(Config.notify_CompanyWebHookBotUrl),
)
logger.info("测试通知发送完成", module="通知服务")
return True

View File

@@ -25,18 +25,15 @@ v4.4
作者DLmaster_361
"""
from loguru import logger
import hashlib
import random
import secrets
import base64
import win32crypt
from pathlib import Path
from Crypto.Cipher import AES
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_OAEP
from Crypto.Util.Padding import pad, unpad
from typing import List, Dict, Union
from app.core import Config
@@ -44,7 +41,12 @@ from app.core import Config
class CryptoHandler:
def get_PASSWORD(self, PASSWORD: str) -> None:
"""配置管理密钥"""
"""
配置管理密钥
:param PASSWORD: 管理密钥
:type PASSWORD: str
"""
# 生成目录
Config.key_path.mkdir(parents=True, exist_ok=True)
@@ -85,7 +87,12 @@ class CryptoHandler:
(Config.app_path / "data/key/private_key.bin").write_bytes(private_key_local)
def AUTO_encryptor(self, note: str) -> str:
"""使用AUTO_MAA的算法加密数据"""
"""
使用AUTO_MAA的算法加密数据
:param note: 数据明文
:type note: str
"""
if note == "":
return ""
@@ -100,7 +107,16 @@ class CryptoHandler:
return base64.b64encode(encrypted).decode("utf-8")
def AUTO_decryptor(self, note: str, PASSWORD: str) -> str:
"""使用AUTO_MAA的算法解密数据"""
"""
使用AUTO_MAA的算法解密数据
:param note: 数据密文
:type note: str
:param PASSWORD: 管理密钥
:type PASSWORD: str
:return: 解密后的明文
:rtype: str
"""
if note == "":
return ""
@@ -142,7 +158,14 @@ class CryptoHandler:
return note
def change_PASSWORD(self, PASSWORD_old: str, PASSWORD_new: str) -> None:
"""修改管理密钥"""
"""
修改管理密钥
:param PASSWORD_old: 旧管理密钥
:type PASSWORD_old: str
:param PASSWORD_new: 新管理密钥
:type PASSWORD_new: str
"""
for member in Config.member_dict.values():
@@ -168,7 +191,12 @@ class CryptoHandler:
del user["Password"]
def reset_PASSWORD(self, PASSWORD_new: str) -> None:
"""重置管理密钥"""
"""
重置管理密钥
:param PASSWORD_new: 新管理密钥
:type PASSWORD_new: str
"""
self.get_PASSWORD(PASSWORD_new)
@@ -176,12 +204,25 @@ class CryptoHandler:
if member["Type"] == "Maa":
for user in member["UserData"].values():
user["Config"].set(user["Config"].Info_Password, "")
user["Config"].set(
user["Config"].Info_Password, self.AUTO_encryptor("数据已重置")
)
def win_encryptor(
self, note: str, description: str = None, entropy: bytes = None
) -> str:
"""使用Windows DPAPI加密数据"""
"""
使用Windows DPAPI加密数据
:param note: 数据明文
:type note: str
:param description: 描述信息
:type description: str
:param entropy: 随机熵
:type entropy: bytes
:return: 加密后的数据
:rtype: str
"""
if note == "":
return ""
@@ -192,7 +233,16 @@ class CryptoHandler:
return base64.b64encode(encrypted).decode("utf-8")
def win_decryptor(self, note: str, entropy: bytes = None) -> str:
"""使用Windows DPAPI解密数据"""
"""
使用Windows DPAPI解密数据
:param note: 数据密文
:type note: str
:param entropy: 随机熵
:type entropy: bytes
:return: 解密后的明文
:rtype: str
"""
if note == "":
return ""
@@ -202,21 +252,15 @@ class CryptoHandler:
)
return decrypted[1].decode("utf-8")
def search_member(self) -> List[Dict[str, Union[Path, list]]]:
"""搜索所有脚本实例及其用户数据库路径"""
member_list = []
if (Config.app_path / "config/MaaConfig").exists():
for subdir in (Config.app_path / "config/MaaConfig").iterdir():
if subdir.is_dir():
member_list.append({"Path": subdir / "user_data.db"})
return member_list
def check_PASSWORD(self, PASSWORD: str) -> bool:
"""验证管理密钥"""
"""
验证管理密钥
:param PASSWORD: 管理密钥
:type PASSWORD: str
:return: 是否验证通过
:rtype: bool
"""
return bool(
self.AUTO_decryptor(self.AUTO_encryptor("-"), PASSWORD) != "管理密钥错误"

View File

@@ -32,7 +32,6 @@ v4.4
作者DLmaster_361、ClozyA
"""
from loguru import logger
import time
import json
import hmac
@@ -40,7 +39,7 @@ import hashlib
import requests
from urllib import parse
from app.core import Config
from app.core import Config, logger
def skland_sign_in(token) -> dict:
@@ -71,15 +70,16 @@ def skland_sign_in(token) -> dict:
"vName": "1.5.1",
}
# 生成签名
def generate_signature(token_for_sign: str, path, body_or_query):
"""
生成请求签名
:param token_for_sign: 用于加密的token
:param path: 请求路径(如 /api/v1/game/player/binding
:param body_or_query: GET用query字符串POST用body字符串
:return: (sign, 新的header_for_sign字典)
"""
t = str(int(time.time()) - 2) # 时间戳,-2秒以防服务器时间不一致
token_bytes = token_for_sign.encode("utf-8")
header_ca = dict(header_for_sign)
@@ -91,10 +91,10 @@ def skland_sign_in(token) -> dict:
md5 = hashlib.md5(hex_s.encode("utf-8")).hexdigest()
return md5, header_ca
# 获取带签名的header
def get_sign_header(url: str, method, body, old_header, sign_token):
"""
获取带签名的请求头
:param url: 请求完整url
:param method: 请求方式 GET/POST
:param body: POST请求体或GET时为None
@@ -102,6 +102,7 @@ def skland_sign_in(token) -> dict:
:param sign_token: 当前会话的签名token
:return: 新请求头
"""
h = json.loads(json.dumps(old_header))
p = parse.urlparse(url)
if method.lower() == "get":
@@ -115,15 +116,21 @@ def skland_sign_in(token) -> dict:
h[i] = header_ca[i]
return h
# 复制请求头并添加cred
def copy_header(cred):
"""
复制请求头并添加cred
:param cred: 当前会话的cred
:return: 新的请求头
"""
v = json.loads(json.dumps(header))
v["cred"] = cred
return v
# 使用token一步步拿到cred和sign_token
def login_by_token(token_code):
"""
使用token一步步拿到cred和sign_token
:param token_code: 你的skyland token
:return: (cred, sign_token)
"""
@@ -136,8 +143,14 @@ def skland_sign_in(token) -> dict:
grant_code = get_grant_code(token_code)
return get_cred(grant_code)
# 通过grant code换cred和sign_token
def get_cred(grant):
"""
通过grant code获取cred和sign_token
:param grant: grant code
:return: (cred, sign_token)
"""
rsp = requests.post(
cred_code_url,
json={"code": grant, "kind": 1},
@@ -153,8 +166,13 @@ def skland_sign_in(token) -> dict:
cred = rsp["data"]["cred"]
return cred, sign_token
# 通过token换grant code
def get_grant_code(token):
"""
通过token获取grant code
:param token: 你的skyland token
:return: grant code
"""
rsp = requests.post(
grant_code_url,
json={"appCode": app_code, "token": token, "type": 0},
@@ -170,10 +188,10 @@ def skland_sign_in(token) -> dict:
)
return rsp["data"]["code"]
# 获取已绑定的角色列表
def get_binding_list(cred, sign_token):
"""
查询绑定的角色
查询绑定的角色列表
:param cred: 当前cred
:param sign_token: 当前sign_token
:return: 角色列表
@@ -190,9 +208,15 @@ def skland_sign_in(token) -> dict:
},
).json()
if rsp["code"] != 0:
logger.error(f"森空岛服务 | 请求角色列表出现问题:{rsp['message']}")
logger.error(
f"森空岛服务 | 请求角色列表出现问题:{rsp['message']}",
module="森空岛签到",
)
if rsp.get("message") == "用户未登录":
logger.error(f"森空岛服务 | 用户登录可能失效了,请重新登录!")
logger.error(
f"森空岛服务 | 用户登录可能失效了,请重新登录!",
module="森空岛签到",
)
return v
# 只取明日方舟arknights的绑定账号
for i in rsp["data"]["list"]:
@@ -201,10 +225,10 @@ def skland_sign_in(token) -> dict:
v.extend(i.get("bindingList"))
return v
# 执行签到
def do_sign(cred, sign_token) -> dict:
"""
对所有绑定的角色进行签到
:param cred: 当前cred
:param sign_token: 当前sign_token
:return: 签到结果字典
@@ -257,5 +281,5 @@ def skland_sign_in(token) -> dict:
# 依次签到
return do_sign(cred, sign_token)
except Exception as e:
logger.error(f"森空岛服务 | 森空岛签到失败: {e}")
logger.exception(f"森空岛服务 | 森空岛签到失败: {e}", module="森空岛签到")
return {"成功": [], "重复": [], "失败": [], "总计": 0}

View File

@@ -25,7 +25,6 @@ v4.4
作者DLmaster_361
"""
from loguru import logger
from PySide6.QtWidgets import QApplication
import sys
import ctypes
@@ -38,7 +37,7 @@ import getpass
from datetime import datetime
from pathlib import Path
from app.core import Config
from app.core import Config, logger
class _SystemHandler:
@@ -147,9 +146,15 @@ class _SystemHandler:
)
if result.returncode == 0:
logger.info(f"任务计划程序自启动已创建: {Config.app_path_sys}")
logger.success(
f"程序自启动任务计划已创建: {Config.app_path_sys}",
module="系统服务",
)
else:
logger.error(f"创建任务计划失败: {result.stderr}")
logger.error(
f"程序自启动任务计划创建失败: {result.stderr}",
module="系统服务",
)
finally:
# 删除临时文件
@@ -159,7 +164,7 @@ class _SystemHandler:
pass
except Exception as e:
logger.exception(f"设置任务计划程序自启动失败: {e}")
logger.exception(f"程序自启动任务计划创建失败: {e}", module="系统服务")
elif not Config.get(Config.start_IfSelfStart) and self.is_startup():
@@ -174,40 +179,49 @@ class _SystemHandler:
)
if result.returncode == 0:
logger.info("任务计划程序自启动已删除")
logger.success("程序自启动任务计划已删除", module="系统服务")
else:
logger.error(f"删除任务计划失败: {result.stderr}")
logger.error(
f"程序自启动任务计划删除失败: {result.stderr}",
module="系统服务",
)
except Exception as e:
logger.exception(f"删除任务计划程序自启动失败: {e}")
logger.exception(f"程序自启动任务计划删除失败: {e}", module="系统服务")
def set_power(self, mode) -> None:
"""
执行系统电源操作
:param mode: 电源操作模式,支持 "NoAction", "Shutdown", "Hibernate", "Sleep", "KillSelf"
"""
if sys.platform.startswith("win"):
if mode == "NoAction":
logger.info("不执行系统电源操作")
logger.info("不执行系统电源操作", module="系统服务")
elif mode == "Shutdown":
logger.info("执行关机操作")
logger.info("执行关机操作", module="系统服务")
subprocess.run(["shutdown", "/s", "/t", "0"])
elif mode == "Hibernate":
logger.info("执行休眠操作")
logger.info("执行休眠操作", module="系统服务")
subprocess.run(["shutdown", "/h"])
elif mode == "Sleep":
logger.info("执行睡眠操作")
logger.info("执行睡眠操作", module="系统服务")
subprocess.run(
["rundll32.exe", "powrprof.dll,SetSuspendState", "0,1,0"]
)
elif mode == "KillSelf":
logger.info("执行退出主程序操作", module="系统服务")
Config.main_window.close()
QApplication.quit()
sys.exit(0)
@@ -216,25 +230,26 @@ class _SystemHandler:
if mode == "NoAction":
logger.info("不执行系统电源操作")
logger.info("不执行系统电源操作", module="系统服务")
elif mode == "Shutdown":
logger.info("执行关机操作")
logger.info("执行关机操作", module="系统服务")
subprocess.run(["shutdown", "-h", "now"])
elif mode == "Hibernate":
logger.info("执行休眠操作")
logger.info("执行休眠操作", module="系统服务")
subprocess.run(["systemctl", "hibernate"])
elif mode == "Sleep":
logger.info("执行睡眠操作")
logger.info("执行睡眠操作", module="系统服务")
subprocess.run(["systemctl", "suspend"])
elif mode == "KillSelf":
logger.info("执行退出主程序操作", module="系统服务")
Config.main_window.close()
QApplication.quit()
sys.exit(0)
@@ -252,11 +267,11 @@ class _SystemHandler:
)
return result.returncode == 0
except Exception as e:
logger.error(f"检查任务计划程序失败: {e}")
logger.exception(f"检查任务计划程序失败: {e}", module="系统服务")
return False
def get_window_info(self) -> list:
"""获取当前窗口信息"""
"""获取当前前台窗口信息"""
def callback(hwnd, window_info):
if win32gui.IsWindowVisible(hwnd) and win32gui.GetWindowText(hwnd):
@@ -270,7 +285,13 @@ class _SystemHandler:
return window_info
def kill_process(self, path: Path) -> None:
"""根据路径中止进程"""
"""
根据路径中止进程
:param path: 进程路径
"""
logger.info(f"开始中止进程: {path}", module="系统服务")
for pid in self.search_pids(path):
killprocess = subprocess.Popen(
@@ -280,8 +301,17 @@ class _SystemHandler:
)
killprocess.wait()
logger.success(f"进程已中止: {path}", module="系统服务")
def search_pids(self, path: Path) -> list:
"""根据路径查找进程PID"""
"""
根据路径查找进程PID
:param path: 进程路径
:return: 匹配的进程PID列表
"""
logger.info(f"开始查找进程 PID: {path}", module="系统服务")
pids = []
for proc in psutil.process_iter(["pid", "exe"]):