diff --git a/app/api/api.py b/app/api/api.py
deleted file mode 100644
index 54c2e7f..0000000
--- a/app/api/api.py
+++ /dev/null
@@ -1,343 +0,0 @@
-import uuid
-
-from fastapi import FastAPI, HTTPException, Path, Body
-from pydantic import BaseModel, Field
-from typing import Dict, Any, List, Optional, Literal
-from datetime import datetime
-
-from fastapi.middleware.cors import CORSMiddleware
-
-from app.core import Config
-from app.utils import get_logger
-
-
-# 此文件由ai生成 返回值非最终版本
-
-
-# ======================
-# Data Models
-# ======================
-
-
-# Script Models
-
-
-class ScriptUser(BaseModel):
- userId: str
- config: Dict[str, Any] = {}
-
-
-# Plan Models
-class PlanDayConfig(BaseModel):
- 吃理智药: int
- 连战次数: str
- 关卡选择: str
- 备选_1: str
- 备选_2: str
- 备选_3: str
- 剩余理智: str
-
-
-class PlanDetails(BaseModel):
- 周一: PlanDayConfig
- 周二: PlanDayConfig
- 周三: PlanDayConfig
- 周四: PlanDayConfig
- 周五: PlanDayConfig
- 周六: PlanDayConfig
- 周日: PlanDayConfig
-
-
-class PlanCreate(BaseModel):
- name: str
- mode: str # "全局" or "周计划"
- details: PlanDetails
-
-
-class PlanUpdate(BaseModel):
- name: Optional[str] = None
- mode: Optional[str] = None
- details: Optional[PlanDetails] = None
-
-
-class PlanModeUpdate(BaseModel):
- mode: str # "全局" or "周计划"
-
-
-# Queue Models
-class QueueCreate(BaseModel):
- name: str
- scripts: List[str]
- schedule: str
- description: Optional[str] = None
-
-
-class QueueUpdate(BaseModel):
- name: Optional[str] = None
- scripts: Optional[List[str]] = None
- schedule: Optional[str] = None
- description: Optional[str] = None
-
-
-# Task Models
-class TaskCreate(BaseModel):
- name: str
- scriptId: str
- planId: str
- queueId: Optional[str] = None
- priority: int = 0
- parameters: Dict[str, Any] = {}
-
-
-# Settings Models
-class SettingsUpdate(BaseModel):
- key: str
- value: Any
-
-
-# ======================
-# API Endpoints
-# ======================
-
-
-# @app.get("/api/activity/latest", summary="获取最新活动内容")
-# async def get_latest_activity():
-# """
-# 获取最新活动内容
-# """
-# # 实现获取最新活动的逻辑
-# return {"status": "success", "data": {}}
-
-
-# @app.post("/api/scripts/{scriptId}/users", summary="为脚本添加用户")
-# async def add_script_user(
-# scriptId: str = Path(..., description="脚本ID"), user: ScriptUser = Body(...)
-# ):
-# """
-# 为脚本添加用户
-# """
-# # 实现为脚本添加用户的逻辑
-# return {"status": "success"}
-
-
-# @app.get("/api/scripts/{scriptId}/users", summary="查询脚本的所有下属用户")
-# async def get_script_users(scriptId: str = Path(..., description="脚本ID")):
-# """
-# 查询脚本的所有下属用户
-# """
-# # 实现查询脚本的所有下属用户的逻辑
-# return {"status": "success", "data": []}
-
-
-# @app.get("/api/scripts/{scriptId}/users/{userId}", summary="查询脚本下的单个下属用户")
-# async def get_script_user(
-# scriptId: str = Path(..., description="脚本ID"),
-# userId: str = Path(..., description="用户ID"),
-# ):
-# """
-# 查询脚本下的单个下属用户
-# """
-# # 实现查询脚本下的单个下属用户的逻辑
-# return {"status": "success", "data": {}}
-
-
-# @app.put("/api/scripts/{scriptId}/users/{userId}", summary="更新脚本下属用户的关联信息")
-# async def update_script_user(
-# scriptId: str = Path(..., description="脚本ID"),
-# userId: str = Path(..., description="用户ID"),
-# config: Dict[str, Any] = Body(...),
-# ):
-# """
-# 更新脚本下属用户的关联信息
-# """
-# # 实现更新脚本下属用户的关联信息的逻辑
-# return {"status": "success"}
-
-
-# @app.delete("/api/scripts/{scriptId}/users/{userId}", summary="从脚本移除用户")
-# async def remove_script_user(
-# scriptId: str = Path(..., description="脚本ID"),
-# userId: str = Path(..., description="用户ID"),
-# ):
-# """
-# 从脚本移除用户
-# """
-# # 实现从脚本移除用户的逻辑
-# return {"status": "success"}
-
-
-# @app.post("/api/add/plans", summary="创建计划")
-# async def add_plan(plan: PlanCreate = Body(...)):
-# """
-# 创建计划
-# {
-# "name": "计划 1",
-# "mode": "全局", // 或 "周计划"
-# "details": {
-# "周一": {
-# "吃理智药": 0,
-# "连战次数": "AUTO",
-# "关卡选择": "当前/上次",
-# "备选-1": "当前/上次",
-# "备选-2": "当前/上次",
-# "备选-3": "当前/上次",
-# "剩余理智": "不使用"
-# },
-# // 其他天数...
-# }
-# }
-# """
-# # 实现创建计划的逻辑
-# return {"status": "success", "planId": "new_plan_id"}
-
-
-# @app.post("/api/get/plans", summary="查询所有计划")
-# async def get_plans():
-# """
-# 查询所有计划
-# """
-# # 实现查询所有计划的逻辑
-# return {"status": "success", "data": []}
-
-
-# @app.post("/api/get/plans/{planId}", summary="查询单个计划")
-# async def get_plan(planId: str = Path(..., description="计划ID")):
-# """
-# 查询单个计划
-# """
-# # 实现查询单个计划的逻辑
-# return {"status": "success", "data": {}}
-
-
-# @app.post("/api/update/plans/{planId}", summary="更新计划")
-# async def update_plan(
-# planId: str = Path(..., description="计划ID"), update_data: PlanUpdate = Body(...)
-# ):
-# """
-# 更新计划
-# """
-# # 实现更新计划的逻辑
-# return {"status": "success"}
-
-
-# @app.post("/api/delete/plans/{planId}", summary="删除计划")
-# async def delete_plan(planId: str = Path(..., description="计划ID")):
-# """
-# 删除计划
-# """
-# # 实现删除计划的逻辑
-# return {"status": "success"}
-
-
-# @app.post("/api/update/plans/{planId}/mode", summary="切换计划模式")
-# async def update_plan_mode(
-# planId: str = Path(..., description="计划ID"), mode_data: PlanModeUpdate = Body(...)
-# ):
-# """
-# 切换计划模式
-# {
-# "mode": "周计划"
-# }
-# """
-# # 实现切换计划模式的逻辑
-# return {"status": "success"}
-
-
-# @app.post("/api/add/queues", summary="创建调度队列")
-# async def add_queue(queue: QueueCreate = Body(...)):
-# """
-# 创建调度队列
-# """
-# # 实现创建调度队列的逻辑
-# return {"status": "success", "queueId": "new_queue_id"}
-
-
-# @app.post("/api/get/queues", summary="查询所有调度队列")
-# async def get_queues():
-# """
-# 查询所有调度队列
-# """
-# # 实现查询所有调度队列的逻辑
-# return {"status": "success", "data": []}
-
-
-# @app.post("/api/get/queues/{queueId}", summary="查询单个调度队列详情")
-# async def get_queue(queueId: str = Path(..., description="调度队列ID")):
-# """
-# 查询单个调度队列详情
-# """
-# # 实现查询单个调度队列详情的逻辑
-# return {"status": "success", "data": {}}
-
-
-# @app.post("/api/update/queues/{queueId}", summary="更新调度队列")
-# async def update_queue(
-# queueId: str = Path(..., description="调度队列ID"),
-# update_data: QueueUpdate = Body(...),
-# ):
-# """
-# 更新调度队列
-# """
-# # 实现更新调度队列的逻辑
-# return {"status": "success"}
-
-
-# @app.post("/api/delete/queues/{queueId}", summary="删除调度队列")
-# async def delete_queue(queueId: str = Path(..., description="调度队列ID")):
-# """
-# 删除调度队列
-# """
-# # 实现删除调度队列的逻辑
-# return {"status": "success"}
-
-
-# @app.post("/api/add/tasks", summary="添加任务")
-# async def add_task(task: TaskCreate = Body(...)):
-# """
-# 添加任务
-# """
-# # 实现添加任务的逻辑
-# return {"status": "success", "taskId": "new_task_id"}
-
-
-# @app.post("/api/tasks/{taskId}/start", summary="开始任务")
-# async def start_task(taskId: str = Path(..., description="任务ID")):
-# """
-# 开始任务
-# """
-# # 实现开始任务的逻辑
-# return {"status": "success"}
-
-
-# @app.post("/api/get/history", summary="查询历史记录")
-# async def get_history():
-# """
-# 查询历史记录
-# """
-# # 实现查询历史记录的逻辑
-# return {"status": "success", "data": []}
-
-
-# @app.post("/api/update/settings", summary="更新部分设置")
-# async def update_settings(settings: SettingsUpdate = Body(...)):
-# """
-# 更新部分设置
-# """
-# # 实现更新部分设置的逻辑
-# return {"status": "success"}
-
-
-# # ======================
-# # Error Handlers
-# # ======================
-
-
-# @app.exception_handler(HTTPException)
-# async def http_exception_handler(request, exc):
-# return {"status": "error", "code": exc.status_code, "message": exc.detail}
-
-
-# if __name__ == "__main__":
-# import uvicorn
-
-# uvicorn.run(app, host="0.0.0.0", port=8000)
diff --git a/app/core/config.py b/app/core/config.py
index 3c6dae6..c695e0f 100644
--- a/app/core/config.py
+++ b/app/core/config.py
@@ -59,9 +59,6 @@ class GlobalConfig(ConfigBase):
)
Function_IfSilence = ConfigItem("Function", "IfSilence", False, BoolValidator())
Function_BossKey = ConfigItem("Function", "BossKey", "")
- Function_UnattendedMode = ConfigItem(
- "Function", "UnattendedMode", False, BoolValidator()
- )
Function_IfAgreeBilibili = ConfigItem(
"Function", "IfAgreeBilibili", False, BoolValidator()
)
@@ -95,7 +92,9 @@ class GlobalConfig(ConfigBase):
Notify_IfPushPlyer = ConfigItem("Notify", "IfPushPlyer", False, BoolValidator())
Notify_IfSendMail = ConfigItem("Notify", "IfSendMail", False, BoolValidator())
Notify_SMTPServerAddress = ConfigItem("Notify", "SMTPServerAddress", "")
- Notify_AuthorizationCode = ConfigItem("Notify", "AuthorizationCode", "")
+ Notify_AuthorizationCode = ConfigItem(
+ "Notify", "AuthorizationCode", "", EncryptValidator()
+ )
Notify_FromAddress = ConfigItem("Notify", "FromAddress", "")
Notify_ToAddress = ConfigItem("Notify", "ToAddress", "")
Notify_IfServerChan = ConfigItem("Notify", "IfServerChan", False, BoolValidator())
@@ -114,7 +113,9 @@ class GlobalConfig(ConfigBase):
Update_ThreadNumb = ConfigItem("Update", "ThreadNumb", 8, RangeValidator(1, 32))
Update_ProxyAddress = ConfigItem("Update", "ProxyAddress", "")
Update_ProxyUrlList = ConfigItem("Update", "ProxyUrlList", [])
- Update_MirrorChyanCDK = ConfigItem("Update", "MirrorChyanCDK", "")
+ Update_MirrorChyanCDK = ConfigItem(
+ "Update", "MirrorChyanCDK", "", EncryptValidator()
+ )
class QueueItem(ConfigBase):
@@ -123,7 +124,7 @@ class QueueItem(ConfigBase):
def __init__(self) -> None:
super().__init__()
- self.Info_ScriptId = ConfigItem("Info", "ScriptId", "", UidValidator())
+ self.Info_ScriptId = ConfigItem("Info", "ScriptId", None, UidValidator())
class TimeSet(ConfigBase):
@@ -221,7 +222,7 @@ class MaaUserConfig(ConfigBase):
"Normal",
OptionsValidator(["Normal", "Rotation", "Custom"]),
)
- self.Info_Password = ConfigItem("Info", "Password", "")
+ self.Info_Password = ConfigItem("Info", "Password", "", EncryptValidator())
self.Info_Notes = ConfigItem("Info", "Notes", "无")
self.Info_MedicineNumb = ConfigItem(
"Info", "MedicineNumb", 0, RangeValidator(0, 1024)
diff --git a/app/main.py b/app/main.py
index d7b5c6f..30a524b 100644
--- a/app/main.py
+++ b/app/main.py
@@ -37,7 +37,7 @@ def is_admin() -> bool:
return False
-@logger.catch
+# @logger.catch
def main():
if is_admin():
diff --git a/app/models/ConfigBase.py b/app/models/ConfigBase.py
index c79b977..6c8d363 100644
--- a/app/models/ConfigBase.py
+++ b/app/models/ConfigBase.py
@@ -22,11 +22,13 @@
import json
import uuid
+import win32com.client
from copy import deepcopy
-from enum import Enum
from pathlib import Path
-from typing import List, Callable, Any, Dict, Union
-from urllib.parse import urlparse
+from typing import List, Any, Dict, Union
+
+
+from utils import dpapi_encrypt, dpapi_decrypt
class ConfigValidator:
@@ -49,46 +51,66 @@ class RangeValidator(ConfigValidator):
self.max = max
self.range = (min, max)
- def validate(self, value: int | float) -> bool:
+ def validate(self, value: Any) -> bool:
+ if not isinstance(value, (int | float)):
+ return False
return self.min <= value <= self.max
- def correct(self, value: int | float) -> int | float:
+ def correct(self, value: Any) -> int | float:
+ if not isinstance(value, (int, float)):
+ try:
+ value = float(value)
+ except TypeError:
+ return self.min
return min(max(self.min, value), self.max)
class OptionsValidator(ConfigValidator):
"""选项验证器"""
- def __init__(self, options):
+ def __init__(self, options: list):
if not options:
raise ValueError("The `options` can't be empty.")
- if isinstance(options, Enum):
- options = options._member_map_.values()
-
- self.options = list(options)
+ self.options = options
def validate(self, value: Any) -> bool:
return value in self.options
- def correct(self, value):
+ def correct(self, value: Any) -> Any:
return value if self.validate(value) else self.options[0]
class UidValidator(ConfigValidator):
"""UID验证器"""
- def validate(self, value: str) -> bool:
- if value == "":
+ def validate(self, value: Any) -> bool:
+ if value is None:
return True
try:
uuid.UUID(value)
return True
- except ValueError:
+ except (TypeError, ValueError):
return False
- def correct(self, value: str) -> str:
- return ""
+ def correct(self, value: Any) -> Any:
+ return value if self.validate(value) else None
+
+
+class EncryptValidator(ConfigValidator):
+ """加数据验证器"""
+
+ def validate(self, value: Any) -> bool:
+ if not isinstance(value, str):
+ return False
+ try:
+ dpapi_decrypt(value)
+ return True
+ except:
+ return False
+
+ def correct(self, value: Any) -> Any:
+ return value if self.validate(value) else dpapi_encrypt("数据损坏,请重新设置")
class BoolValidator(OptionsValidator):
@@ -101,40 +123,44 @@ class BoolValidator(OptionsValidator):
class FileValidator(ConfigValidator):
"""文件路径验证器"""
- def validate(self, value):
- return Path(value).exists()
+ def validate(self, value: Any) -> bool:
+ if not isinstance(value, str):
+ return False
+ if not Path(value).is_absolute():
+ return False
+ if Path(value).suffix == ".lnk":
+ return False
+ return True
- def correct(self, value):
- path = Path(value)
- return str(path.absolute()).replace("\\", "/")
+ def correct(self, value: Any) -> str:
+ if not isinstance(value, str):
+ value = "."
+ if not Path(value).is_absolute():
+ value = Path(value).resolve().as_posix()
+ if Path(value).suffix == ".lnk":
+ try:
+ shell = win32com.client.Dispatch("WScript.Shell")
+ shortcut = shell.CreateShortcut(value)
+ value = shortcut.TargetPath
+ except:
+ pass
+ return Path(value).resolve().as_posix()
class FolderValidator(ConfigValidator):
"""文件夹路径验证器"""
- def validate(self, value):
- return Path(value).exists()
+ def validate(self, value: Any) -> bool:
+ if not isinstance(value, str):
+ return False
+ if not Path(value).is_absolute():
+ return False
+ return True
- def correct(self, value):
- path = Path(value)
- path.mkdir(exist_ok=True, parents=True)
- return str(path.absolute()).replace("\\", "/")
-
-
-class FolderListValidator(ConfigValidator):
- """文件夹列表验证器"""
-
- def validate(self, value):
- return all(Path(i).exists() for i in value)
-
- def correct(self, value: List[str]):
- folders = []
- for folder in value:
- path = Path(folder)
- if path.exists():
- folders.append(str(path.absolute()).replace("\\", "/"))
-
- return folders
+ def correct(self, value: Any) -> str:
+ if not isinstance(value, str):
+ value = "."
+ return Path(value).resolve().as_posix()
class ConfigItem:
@@ -144,7 +170,7 @@ class ConfigItem:
self,
group: str,
name: str,
- default: object,
+ default: Any,
validator: None | ConfigValidator = None,
):
"""
@@ -165,11 +191,9 @@ class ConfigItem:
super().__init__()
self.group = group
self.name = name
- self.value: Any = None
+ self.value: Any = default
self.validator = validator or ConfigValidator()
- self.setValue(default)
-
def setValue(self, value: Any):
"""
设置配置项值,将自动进行验证和修正
@@ -180,7 +204,11 @@ class ConfigItem:
要设置的值,可以是任何合法类型
"""
- if self.value == value:
+ if (
+ dpapi_decrypt(self.value)
+ if isinstance(self.validator, EncryptValidator)
+ else self.value
+ ) == value:
return
# deepcopy new value
@@ -189,9 +217,21 @@ class ConfigItem:
except:
self.value = value
+ if isinstance(self.validator, EncryptValidator):
+ self.value = dpapi_encrypt(self.value)
+
if not self.validator.validate(self.value):
self.value = self.validator.correct(self.value)
+ def getValue(self) -> Any:
+ """
+ 获取配置项值
+ """
+
+ if isinstance(self.validator, EncryptValidator):
+ return dpapi_decrypt(self.value)
+ return self.value
+
class ConfigBase:
"""
@@ -271,7 +311,9 @@ class ConfigBase:
if self.file:
await self.save()
- async def toDict(self, ignore_multi_config: bool = False) -> Dict[str, Any]:
+ async def toDict(
+ self, ignore_multi_config: bool = False, if_decrypt: bool = True
+ ) -> Dict[str, Any]:
"""将配置项转换为字典"""
data = {}
@@ -283,7 +325,9 @@ class ConfigBase:
if not data.get(item.group):
data[item.group] = {}
if item.name:
- data[item.group][item.name] = item.value
+ data[item.group][item.name] = (
+ item.getValue() if if_decrypt else item.value
+ )
elif not ignore_multi_config and isinstance(item, MultipleConfig):
@@ -301,7 +345,7 @@ class ConfigBase:
configItem = getattr(self, f"{group}_{name}")
if isinstance(configItem, ConfigItem):
- return configItem.value
+ return configItem.getValue()
else:
raise TypeError(
f"Config item '{group}_{name}' is not a ConfigItem instance."
@@ -345,7 +389,7 @@ class ConfigBase:
self.file.parent.mkdir(parents=True, exist_ok=True)
with self.file.open("w", encoding="utf-8") as f:
json.dump(
- await self.toDict(not self.if_save_multi_config),
+ await self.toDict(not self.if_save_multi_config, if_decrypt=False),
f,
ensure_ascii=False,
indent=4,
diff --git a/app/services/__init__.py b/app/services/__init__.py
new file mode 100644
index 0000000..546e9e9
--- /dev/null
+++ b/app/services/__init__.py
@@ -0,0 +1,37 @@
+# AUTO_MAA:A MAA Multi Account Management and Automation Tool
+# Copyright © 2024-2025 DLmaster361
+
+# This file is part of AUTO_MAA.
+
+# AUTO_MAA is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published
+# by the Free Software Foundation, either version 3 of the License,
+# or (at your option) any later version.
+
+# AUTO_MAA is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty
+# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
+# the GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with AUTO_MAA. If not, see .
+
+# Contact: DLmaster_361@163.com
+
+"""
+AUTO_MAA
+AUTO_MAA服务包
+v4.4
+作者:DLmaster_361
+"""
+
+__version__ = "4.2.0"
+__author__ = "DLmaster361 "
+__license__ = "GPL-3.0 license"
+
+from .notification import Notify
+from ..utils.security import Crypto
+from .system import System
+from ..task.skland import skland_sign_in
+
+__all__ = ["Notify", "Crypto", "System", "skland_sign_in"]
diff --git a/app/services/notification.py b/app/services/notification.py
new file mode 100644
index 0000000..0ce78fd
--- /dev/null
+++ b/app/services/notification.py
@@ -0,0 +1,484 @@
+# AUTO_MAA:A MAA Multi Account Management and Automation Tool
+# Copyright © 2024-2025 DLmaster361
+
+# This file is part of AUTO_MAA.
+
+# AUTO_MAA is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published
+# by the Free Software Foundation, either version 3 of the License,
+# or (at your option) any later version.
+
+# AUTO_MAA is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty
+# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
+# the GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with AUTO_MAA. If not, see .
+
+# Contact: DLmaster_361@163.com
+
+"""
+AUTO_MAA
+AUTO_MAA通知服务
+v4.4
+作者:DLmaster_361
+"""
+
+import re
+import smtplib
+import time
+from email.header import Header
+from email.mime.multipart import MIMEMultipart
+from email.mime.text import MIMEText
+from email.utils import formataddr
+from pathlib import Path
+from typing import Union
+
+import requests
+from PySide6.QtCore import QObject, Signal
+
+from plyer import notification
+
+from app.core import Config, logger
+from app.utils.security import Crypto
+from app.utils.ImageUtils import ImageUtils
+
+
+class Notification(QObject):
+
+ push_info_bar = Signal(str, str, str, int)
+
+ def __init__(self, parent=None):
+ super().__init__(parent)
+
+ def push_plyer(self, title, message, ticker, t) -> bool:
+ """
+ 推送系统通知
+
+ :param title: 通知标题
+ :param message: 通知内容
+ :param ticker: 通知横幅
+ :param t: 通知持续时间
+ :return: bool
+ """
+
+ if Config.get(Config.notify_IfPushPlyer):
+
+ logger.info(f"推送系统通知:{title}", module="通知服务")
+
+ notification.notify(
+ title=title,
+ message=message,
+ app_name="AUTO_MAA",
+ app_icon=str(Config.app_path / "resources/icons/AUTO_MAA.ico"),
+ timeout=t,
+ ticker=ticker,
+ toast=True,
+ )
+
+ return True
+
+ def send_mail(self, mode, title, content, to_address) -> None:
+ """
+ 推送邮件通知
+
+ :param mode: 邮件内容模式,支持 "文本" 和 "网页"
+ :param title: 邮件标题
+ :param content: 邮件内容
+ :param to_address: 收件人地址
+ """
+
+ if (
+ Config.get(Config.notify_SMTPServerAddress) == ""
+ or Config.get(Config.notify_AuthorizationCode) == ""
+ or not bool(
+ re.match(
+ r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$",
+ Config.get(Config.notify_FromAddress),
+ )
+ )
+ or not bool(
+ re.match(
+ r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$",
+ to_address,
+ )
+ )
+ ):
+ logger.error(
+ "请正确设置邮件通知的SMTP服务器地址、授权码、发件人地址和收件人地址",
+ module="通知服务",
+ )
+ self.push_info_bar.emit(
+ "error",
+ "邮件通知推送异常",
+ "请正确设置邮件通知的SMTP服务器地址、授权码、发件人地址和收件人地址",
+ -1,
+ )
+ return None
+
+ try:
+ # 定义邮件正文
+ if mode == "文本":
+ message = MIMEText(content, "plain", "utf-8")
+ elif mode == "网页":
+ message = MIMEMultipart("alternative")
+ message["From"] = formataddr(
+ (
+ Header("AUTO_MAA通知服务", "utf-8").encode(),
+ Config.get(Config.notify_FromAddress),
+ )
+ ) # 发件人显示的名字
+ message["To"] = formataddr(
+ (Header("AUTO_MAA用户", "utf-8").encode(), to_address)
+ ) # 收件人显示的名字
+ message["Subject"] = Header(title, "utf-8")
+
+ if mode == "网页":
+ message.attach(MIMEText(content, "html", "utf-8"))
+
+ smtpObj = smtplib.SMTP_SSL(Config.get(Config.notify_SMTPServerAddress), 465)
+ smtpObj.login(
+ Config.get(Config.notify_FromAddress),
+ Crypto.win_decryptor(Config.get(Config.notify_AuthorizationCode)),
+ )
+ smtpObj.sendmail(
+ Config.get(Config.notify_FromAddress), to_address, message.as_string()
+ )
+ smtpObj.quit()
+ logger.success(f"邮件发送成功:{title}", module="通知服务")
+ except Exception as e:
+ logger.exception(f"发送邮件时出错:{e}", module="通知服务")
+ self.push_info_bar.emit("error", "发送邮件时出错", f"{e}", -1)
+
+ def ServerChanPush(
+ self, title, content, send_key, tag, channel
+ ) -> Union[bool, str]:
+ """
+ 使用Server酱推送通知
+
+ :param title: 通知标题
+ :param content: 通知内容
+ :param send_key: Server酱的SendKey
+ :param tag: 通知标签
+ :param channel: 通知频道
+ :return: bool or str
+ """
+
+ if not send_key:
+ logger.error("请正确设置Server酱的SendKey", module="通知服务")
+ self.push_info_bar.emit(
+ "error", "Server酱通知推送异常", "请正确设置Server酱的SendKey", -1
+ )
+ return None
+
+ try:
+ # 构造 URL
+ if send_key.startswith("sctp"):
+ match = re.match(r"^sctp(\d+)t", send_key)
+ if match:
+ url = f"https://{match.group(1)}.push.ft07.com/send/{send_key}.send"
+ else:
+ raise ValueError("SendKey 格式错误(sctp)")
+ else:
+ url = f"https://sctapi.ftqq.com/{send_key}.send"
+
+ # 构建 tags 和 channel
+ def is_valid(s):
+ return s == "" or (
+ s == "|".join(s.split("|"))
+ and (s.count("|") == 0 or all(s.split("|")))
+ )
+
+ tags = "|".join(_.strip() for _ in tag.split("|"))
+ channels = "|".join(_.strip() for _ in channel.split("|"))
+
+ options = {}
+ if is_valid(tags):
+ options["tags"] = tags
+ else:
+ logger.warning("Server酱 Tag 配置不正确,将被忽略", module="通知服务")
+ self.push_info_bar.emit(
+ "warning",
+ "Server酱通知推送异常",
+ "请正确设置 ServerChan 的 Tag",
+ -1,
+ )
+
+ if is_valid(channels):
+ options["channel"] = channels
+ else:
+ logger.warning(
+ "Server酱 Channel 配置不正确,将被忽略", module="通知服务"
+ )
+ self.push_info_bar.emit(
+ "warning",
+ "Server酱通知推送异常",
+ "请正确设置 ServerChan 的 Channel",
+ -1,
+ )
+
+ # 请求发送
+ params = {"title": title, "desp": content, **options}
+ headers = {"Content-Type": "application/json;charset=utf-8"}
+
+ response = requests.post(
+ url,
+ json=params,
+ headers=headers,
+ timeout=10,
+ proxies={
+ "http": Config.get(Config.update_ProxyAddress),
+ "https": Config.get(Config.update_ProxyAddress),
+ },
+ )
+ result = response.json()
+
+ if result.get("code") == 0:
+ logger.success(f"Server酱推送通知成功:{title}", module="通知服务")
+ return True
+ else:
+ error_code = result.get("code", "-1")
+ logger.exception(
+ f"Server酱通知推送失败:响应码:{error_code}", module="通知服务"
+ )
+ self.push_info_bar.emit(
+ "error", "Server酱通知推送失败", f"响应码:{error_code}", -1
+ )
+ return f"Server酱通知推送失败:{error_code}"
+
+ except Exception as e:
+ logger.exception(f"Server酱通知推送异常:{e}", module="通知服务")
+ self.push_info_bar.emit(
+ "error",
+ "Server酱通知推送异常",
+ "请检查相关设置和网络连接。如全部配置正确,请稍后再试。",
+ -1,
+ )
+ return f"Server酱通知推送异常:{str(e)}"
+
+ def CompanyWebHookBotPush(self, title, content, webhook_url) -> Union[bool, str]:
+ """
+ 使用企业微信群机器人推送通知
+
+ :param title: 通知标题
+ :param content: 通知内容
+ :param webhook_url: 企业微信群机器人的WebHook地址
+ :return: bool or str
+ """
+
+ if webhook_url == "":
+ logger.error("请正确设置企业微信群机器人的WebHook地址", module="通知服务")
+ self.push_info_bar.emit(
+ "error",
+ "企业微信群机器人通知推送异常",
+ "请正确设置企业微信群机器人的WebHook地址",
+ -1,
+ )
+ return None
+
+ content = f"{title}\n{content}"
+ data = {"msgtype": "text", "text": {"content": content}}
+
+ for _ in range(3):
+ try:
+ response = requests.post(
+ url=webhook_url,
+ json=data,
+ timeout=10,
+ proxies={
+ "http": Config.get(Config.update_ProxyAddress),
+ "https": Config.get(Config.update_ProxyAddress),
+ },
+ )
+ info = response.json()
+ break
+ except Exception as e:
+ err = e
+ time.sleep(0.1)
+ else:
+ logger.error(f"推送企业微信群机器人时出错:{err}", module="通知服务")
+ self.push_info_bar.emit(
+ "error",
+ "企业微信群机器人通知推送失败",
+ f"使用企业微信群机器人推送通知时出错:{err}",
+ -1,
+ )
+ return None
+
+ if info["errcode"] == 0:
+ logger.success(f"企业微信群机器人推送通知成功:{title}", module="通知服务")
+ return True
+ else:
+ logger.error(f"企业微信群机器人推送通知失败:{info}", module="通知服务")
+ self.push_info_bar.emit(
+ "error",
+ "企业微信群机器人通知推送失败",
+ f"使用企业微信群机器人推送通知时出错:{err}",
+ -1,
+ )
+ return f"使用企业微信群机器人推送通知时出错:{err}"
+
+ def CompanyWebHookBotPushImage(self, image_path: Path, webhook_url: str) -> bool:
+ """
+ 使用企业微信群机器人推送图片通知
+
+ :param image_path: 图片文件路径
+ :param webhook_url: 企业微信群机器人的WebHook地址
+ :return: bool
+ """
+
+ try:
+ # 压缩图片
+ ImageUtils.compress_image_if_needed(image_path)
+
+ # 检查图片是否存在
+ if not image_path.exists():
+ logger.error(
+ "图片推送异常 | 图片不存在或者压缩失败,请检查图片路径是否正确",
+ module="通知服务",
+ )
+ self.push_info_bar.emit(
+ "error",
+ "企业微信群机器人通知推送异常",
+ "图片不存在或者压缩失败,请检查图片路径是否正确",
+ -1,
+ )
+ return False
+
+ if not webhook_url:
+ logger.error(
+ "请正确设置企业微信群机器人的WebHook地址", module="通知服务"
+ )
+ self.push_info_bar.emit(
+ "error",
+ "企业微信群机器人通知推送异常",
+ "请正确设置企业微信群机器人的WebHook地址",
+ -1,
+ )
+ return False
+
+ # 获取图片base64和md5
+ try:
+ image_base64 = ImageUtils.get_base64_from_file(str(image_path))
+ image_md5 = ImageUtils.calculate_md5_from_file(str(image_path))
+ except Exception as e:
+ logger.exception(f"图片编码或MD5计算失败:{e}", module="通知服务")
+ self.push_info_bar.emit(
+ "error",
+ "企业微信群机器人通知推送异常",
+ f"图片编码或MD5计算失败:{e}",
+ -1,
+ )
+ return False
+
+ data = {
+ "msgtype": "image",
+ "image": {"base64": image_base64, "md5": image_md5},
+ }
+
+ for _ in range(3):
+ try:
+ response = requests.post(
+ url=webhook_url,
+ json=data,
+ timeout=10,
+ proxies={
+ "http": Config.get(Config.update_ProxyAddress),
+ "https": Config.get(Config.update_ProxyAddress),
+ },
+ )
+ info = response.json()
+ break
+ except requests.RequestException as e:
+ err = e
+ logger.exception(
+ f"推送企业微信群机器人图片第{_+1}次失败:{e}", module="通知服务"
+ )
+ time.sleep(0.1)
+ else:
+ logger.error("推送企业微信群机器人图片时出错", module="通知服务")
+ self.push_info_bar.emit(
+ "error",
+ "企业微信群机器人图片推送失败",
+ f"使用企业微信群机器人推送图片时出错:{err}",
+ -1,
+ )
+ return False
+
+ if info.get("errcode") == 0:
+ logger.success(
+ f"企业微信群机器人推送图片成功:{image_path.name}",
+ module="通知服务",
+ )
+ return True
+ else:
+ logger.error(f"企业微信群机器人推送图片失败:{info}", module="通知服务")
+ self.push_info_bar.emit(
+ "error",
+ "企业微信群机器人图片推送失败",
+ f"使用企业微信群机器人推送图片时出错:{info}",
+ -1,
+ )
+ return False
+
+ except Exception as e:
+ logger.error(f"推送企业微信群机器人图片时发生未知异常:{e}")
+ self.push_info_bar.emit(
+ "error",
+ "企业微信群机器人图片推送失败",
+ f"发生未知异常:{e}",
+ -1,
+ )
+ return False
+
+ def send_test_notification(self):
+ """发送测试通知到所有已启用的通知渠道"""
+
+ logger.info("发送测试通知到所有已启用的通知渠道", module="通知服务")
+
+ # 发送系统通知
+ self.push_plyer(
+ "测试通知",
+ "这是 AUTO_MAA 外部通知测试信息。如果你看到了这段内容,说明 AUTO_MAA 的通知功能已经正确配置且可以正常工作!",
+ "测试通知",
+ 3,
+ )
+
+ # 发送邮件通知
+ if Config.get(Config.notify_IfSendMail):
+ self.send_mail(
+ "文本",
+ "AUTO_MAA测试通知",
+ "这是 AUTO_MAA 外部通知测试信息。如果你看到了这段内容,说明 AUTO_MAA 的通知功能已经正确配置且可以正常工作!",
+ Config.get(Config.notify_ToAddress),
+ )
+
+ # 发送Server酱通知
+ if Config.get(Config.notify_IfServerChan):
+ self.ServerChanPush(
+ "AUTO_MAA测试通知",
+ "这是 AUTO_MAA 外部通知测试信息。如果你看到了这段内容,说明 AUTO_MAA 的通知功能已经正确配置且可以正常工作!",
+ Config.get(Config.notify_ServerChanKey),
+ Config.get(Config.notify_ServerChanTag),
+ Config.get(Config.notify_ServerChanChannel),
+ )
+
+ # 发送企业微信机器人通知
+ if Config.get(Config.notify_IfCompanyWebHookBot):
+ self.CompanyWebHookBotPush(
+ "AUTO_MAA测试通知",
+ "这是 AUTO_MAA 外部通知测试信息。如果你看到了这段内容,说明 AUTO_MAA 的通知功能已经正确配置且可以正常工作!",
+ Config.get(Config.notify_CompanyWebHookBotUrl),
+ )
+ Notify.CompanyWebHookBotPushImage(
+ Config.app_path / "resources/images/notification/test_notify.png",
+ Config.get(Config.notify_CompanyWebHookBotUrl),
+ )
+
+ logger.info("测试通知发送完成", module="通知服务")
+
+ return True
+
+
+Notify = Notification()
diff --git a/app/services/system.py b/app/services/system.py
new file mode 100644
index 0000000..7e2ef90
--- /dev/null
+++ b/app/services/system.py
@@ -0,0 +1,352 @@
+# AUTO_MAA:A MAA Multi Account Management and Automation Tool
+# Copyright © 2024-2025 DLmaster361
+
+# This file is part of AUTO_MAA.
+
+# AUTO_MAA is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published
+# by the Free Software Foundation, either version 3 of the License,
+# or (at your option) any later version.
+
+# AUTO_MAA is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty
+# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
+# the GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with AUTO_MAA. If not, see .
+
+# Contact: DLmaster_361@163.com
+
+"""
+AUTO_MAA
+AUTO_MAA系统服务
+v4.4
+作者:DLmaster_361
+"""
+
+from PySide6.QtWidgets import QApplication
+import sys
+import ctypes
+import win32gui
+import win32process
+import psutil
+import subprocess
+import tempfile
+import getpass
+from datetime import datetime
+from pathlib import Path
+
+from app.core import Config
+
+
+class _SystemHandler:
+
+ ES_CONTINUOUS = 0x80000000
+ ES_SYSTEM_REQUIRED = 0x00000001
+
+ def __init__(self):
+
+ self.set_Sleep()
+ self.set_SelfStart()
+
+ def set_Sleep(self) -> None:
+ """同步系统休眠状态"""
+
+ if Config.get(Config.function_IfAllowSleep):
+ # 设置系统电源状态
+ ctypes.windll.kernel32.SetThreadExecutionState(
+ self.ES_CONTINUOUS | self.ES_SYSTEM_REQUIRED
+ )
+ else:
+ # 恢复系统电源状态
+ ctypes.windll.kernel32.SetThreadExecutionState(self.ES_CONTINUOUS)
+
+ def set_SelfStart(self) -> None:
+ """同步开机自启"""
+
+ if Config.get(Config.start_IfSelfStart) and not self.is_startup():
+
+ # 创建任务计划
+ try:
+
+ # 获取当前用户和时间
+ current_user = getpass.getuser()
+ current_time = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
+
+ # XML 模板
+ xml_content = f"""
+
+
+ {current_time}
+ {current_user}
+ AUTO_MAA自启动服务
+ \\AUTO_MAA_AutoStart
+
+
+
+ {current_time}
+ true
+
+
+
+
+ InteractiveToken
+ HighestAvailable
+
+
+
+ IgnoreNew
+ false
+ false
+ false
+ true
+ false
+
+ false
+ false
+
+ true
+ true
+ false
+ false
+ false
+ PT0S
+ 7
+
+
+
+ "{Config.app_path_sys}"
+
+
+ """
+
+ # 创建临时 XML 文件并执行
+ with tempfile.NamedTemporaryFile(
+ mode="w", suffix=".xml", delete=False, encoding="utf-16"
+ ) as f:
+ f.write(xml_content)
+ xml_file = f.name
+
+ try:
+ result = subprocess.run(
+ [
+ "schtasks",
+ "/create",
+ "/tn",
+ "AUTO_MAA_AutoStart",
+ "/xml",
+ xml_file,
+ "/f",
+ ],
+ creationflags=subprocess.CREATE_NO_WINDOW,
+ stdin=subprocess.DEVNULL,
+ capture_output=True,
+ text=True,
+ )
+
+ if result.returncode == 0:
+ logger.success(
+ f"程序自启动任务计划已创建: {Config.app_path_sys}",
+ module="系统服务",
+ )
+ else:
+ logger.error(
+ f"程序自启动任务计划创建失败: {result.stderr}",
+ module="系统服务",
+ )
+
+ finally:
+ # 删除临时文件
+ try:
+ Path(xml_file).unlink()
+ except:
+ pass
+
+ except Exception as e:
+ logger.exception(f"程序自启动任务计划创建失败: {e}", module="系统服务")
+
+ elif not Config.get(Config.start_IfSelfStart) and self.is_startup():
+
+ try:
+
+ result = subprocess.run(
+ ["schtasks", "/delete", "/tn", "AUTO_MAA_AutoStart", "/f"],
+ creationflags=subprocess.CREATE_NO_WINDOW,
+ stdin=subprocess.DEVNULL,
+ capture_output=True,
+ text=True,
+ )
+
+ if result.returncode == 0:
+ logger.success("程序自启动任务计划已删除", module="系统服务")
+ else:
+ logger.error(
+ f"程序自启动任务计划删除失败: {result.stderr}",
+ module="系统服务",
+ )
+
+ except Exception as e:
+ logger.exception(f"程序自启动任务计划删除失败: {e}", module="系统服务")
+
+ def set_power(self, mode) -> None:
+ """
+ 执行系统电源操作
+
+ :param mode: 电源操作模式,支持 "NoAction", "Shutdown", "Hibernate", "Sleep", "KillSelf", "ShutdownForce"
+ """
+
+ if sys.platform.startswith("win"):
+
+ if mode == "NoAction":
+
+ logger.info("不执行系统电源操作", module="系统服务")
+
+ elif mode == "Shutdown":
+
+ self.kill_emulator_processes()
+ logger.info("执行关机操作", module="系统服务")
+ subprocess.run(["shutdown", "/s", "/t", "0"])
+
+ elif mode == "ShutdownForce":
+ logger.info("执行强制关机操作", module="系统服务")
+ subprocess.run(["shutdown", "/s", "/t", "0", "/f"])
+
+ elif mode == "Hibernate":
+
+ logger.info("执行休眠操作", module="系统服务")
+ subprocess.run(["shutdown", "/h"])
+
+ elif mode == "Sleep":
+
+ logger.info("执行睡眠操作", module="系统服务")
+ subprocess.run(
+ ["rundll32.exe", "powrprof.dll,SetSuspendState", "0,1,0"]
+ )
+
+ elif mode == "KillSelf":
+
+ logger.info("执行退出主程序操作", module="系统服务")
+ Config.main_window.close()
+ QApplication.quit()
+ sys.exit(0)
+
+ elif sys.platform.startswith("linux"):
+
+ if mode == "NoAction":
+
+ logger.info("不执行系统电源操作", module="系统服务")
+
+ elif mode == "Shutdown":
+
+ logger.info("执行关机操作", module="系统服务")
+ subprocess.run(["shutdown", "-h", "now"])
+
+ elif mode == "Hibernate":
+
+ logger.info("执行休眠操作", module="系统服务")
+ subprocess.run(["systemctl", "hibernate"])
+
+ elif mode == "Sleep":
+
+ logger.info("执行睡眠操作", module="系统服务")
+ subprocess.run(["systemctl", "suspend"])
+
+ elif mode == "KillSelf":
+
+ logger.info("执行退出主程序操作", module="系统服务")
+ Config.main_window.close()
+ QApplication.quit()
+ sys.exit(0)
+
+ def kill_emulator_processes(self):
+ """这里暂时仅支持 MuMu 模拟器"""
+
+ logger.info("正在清除模拟器进程", module="系统服务")
+
+ keywords = ["Nemu", "nemu", "emulator", "MuMu"]
+ for proc in psutil.process_iter(["pid", "name"]):
+ try:
+ pname = proc.info["name"].lower()
+ if any(keyword.lower() in pname for keyword in keywords):
+ proc.kill()
+ logger.info(
+ f"已关闭 MuMu 模拟器进程: {proc.info['name']}",
+ module="系统服务",
+ )
+ except (psutil.NoSuchProcess, psutil.AccessDenied):
+ continue
+
+ logger.success("模拟器进程清除完成", module="系统服务")
+
+ def is_startup(self) -> bool:
+ """判断程序是否已经开机自启"""
+
+ try:
+ result = subprocess.run(
+ ["schtasks", "/query", "/tn", "AUTO_MAA_AutoStart"],
+ creationflags=subprocess.CREATE_NO_WINDOW,
+ stdin=subprocess.DEVNULL,
+ capture_output=True,
+ text=True,
+ )
+ return result.returncode == 0
+ except Exception as e:
+ logger.exception(f"检查任务计划程序失败: {e}", module="系统服务")
+ return False
+
+ def get_window_info(self) -> list:
+ """获取当前前台窗口信息"""
+
+ def callback(hwnd, window_info):
+ if win32gui.IsWindowVisible(hwnd) and win32gui.GetWindowText(hwnd):
+ _, pid = win32process.GetWindowThreadProcessId(hwnd)
+ process = psutil.Process(pid)
+ window_info.append((win32gui.GetWindowText(hwnd), process.exe()))
+ return True
+
+ window_info = []
+ win32gui.EnumWindows(callback, window_info)
+ return window_info
+
+ def kill_process(self, path: Path) -> None:
+ """
+ 根据路径中止进程
+
+ :param path: 进程路径
+ """
+
+ logger.info(f"开始中止进程: {path}", module="系统服务")
+
+ for pid in self.search_pids(path):
+ killprocess = subprocess.Popen(
+ f"taskkill /F /T /PID {pid}",
+ shell=True,
+ creationflags=subprocess.CREATE_NO_WINDOW,
+ )
+ killprocess.wait()
+
+ logger.success(f"进程已中止: {path}", module="系统服务")
+
+ def search_pids(self, path: Path) -> list:
+ """
+ 根据路径查找进程PID
+
+ :param path: 进程路径
+ :return: 匹配的进程PID列表
+ """
+
+ logger.info(f"开始查找进程 PID: {path}", module="系统服务")
+
+ pids = []
+ for proc in psutil.process_iter(["pid", "exe"]):
+ try:
+ if proc.info["exe"] and proc.info["exe"].lower() == str(path).lower():
+ pids.append(proc.info["pid"])
+ except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
+ # 进程可能在此期间已结束或无法访问,忽略这些异常
+ pass
+ return pids
+
+
+System = _SystemHandler()
diff --git a/app/task/skland.py b/app/task/skland.py
new file mode 100644
index 0000000..901a352
--- /dev/null
+++ b/app/task/skland.py
@@ -0,0 +1,285 @@
+# AUTO_MAA:A MAA Multi Account Management and Automation Tool
+# Copyright © 2024-2025 DLmaster361
+
+# This file incorporates work covered by the following copyright and
+# permission notice:
+#
+# skland-checkin-ghaction Copyright © 2023 Yanstory
+# https://github.com/Yanstory/skland-checkin-ghaction
+
+# This file is part of AUTO_MAA.
+
+# AUTO_MAA is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published
+# by the Free Software Foundation, either version 3 of the License,
+# or (at your option) any later version.
+
+# AUTO_MAA is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty
+# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
+# the GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with AUTO_MAA. If not, see .
+
+# Contact: DLmaster_361@163.com
+
+
+"""
+AUTO_MAA
+AUTO_MAA森空岛服务
+v4.4
+作者:DLmaster_361、ClozyA
+"""
+
+import time
+import json
+import hmac
+import hashlib
+import requests
+from urllib import parse
+
+from app.core import Config, logger
+
+
+def skland_sign_in(token) -> dict:
+ """森空岛签到"""
+
+ app_code = "4ca99fa6b56cc2ba"
+ # 用于获取grant code
+ grant_code_url = "https://as.hypergryph.com/user/oauth2/v2/grant"
+ # 用于获取cred
+ cred_code_url = "https://zonai.skland.com/api/v1/user/auth/generate_cred_by_code"
+ # 查询角色绑定
+ binding_url = "https://zonai.skland.com/api/v1/game/player/binding"
+ # 签到接口
+ sign_url = "https://zonai.skland.com/api/v1/game/attendance"
+
+ # 基础请求头
+ header = {
+ "cred": "",
+ "User-Agent": "Skland/1.5.1 (com.hypergryph.skland; build:100501001; Android 34;) Okhttp/4.11.0",
+ "Accept-Encoding": "gzip",
+ "Connection": "close",
+ }
+ header_login = header.copy()
+ header_for_sign = {
+ "platform": "1",
+ "timestamp": "",
+ "dId": "",
+ "vName": "1.5.1",
+ }
+
+ def generate_signature(token_for_sign: str, path, body_or_query):
+ """
+ 生成请求签名
+
+ :param token_for_sign: 用于加密的token
+ :param path: 请求路径(如 /api/v1/game/player/binding)
+ :param body_or_query: GET用query字符串,POST用body字符串
+ :return: (sign, 新的header_for_sign字典)
+ """
+
+ t = str(int(time.time()) - 2) # 时间戳,-2秒以防服务器时间不一致
+ token_bytes = token_for_sign.encode("utf-8")
+ header_ca = dict(header_for_sign)
+ header_ca["timestamp"] = t
+ header_ca_str = json.dumps(header_ca, separators=(",", ":"))
+ s = path + body_or_query + t + header_ca_str # 拼接原始字符串
+ # HMAC-SHA256 + MD5得到最终sign
+ hex_s = hmac.new(token_bytes, s.encode("utf-8"), hashlib.sha256).hexdigest()
+ md5 = hashlib.md5(hex_s.encode("utf-8")).hexdigest()
+ return md5, header_ca
+
+ def get_sign_header(url: str, method, body, old_header, sign_token):
+ """
+ 获取带签名的请求头
+
+ :param url: 请求完整url
+ :param method: 请求方式 GET/POST
+ :param body: POST请求体或GET时为None
+ :param old_header: 原始请求头
+ :param sign_token: 当前会话的签名token
+ :return: 新请求头
+ """
+
+ h = json.loads(json.dumps(old_header))
+ p = parse.urlparse(url)
+ if method.lower() == "get":
+ sign, header_ca = generate_signature(sign_token, p.path, p.query)
+ else:
+ sign, header_ca = generate_signature(
+ sign_token, p.path, json.dumps(body) if body else ""
+ )
+ h["sign"] = sign
+ for i in header_ca:
+ h[i] = header_ca[i]
+ return h
+
+ def copy_header(cred):
+ """
+ 复制请求头并添加cred
+
+ :param cred: 当前会话的cred
+ :return: 新的请求头
+ """
+ v = json.loads(json.dumps(header))
+ v["cred"] = cred
+ return v
+
+ def login_by_token(token_code):
+ """
+ 使用token一步步拿到cred和sign_token
+
+ :param token_code: 你的skyland token
+ :return: (cred, sign_token)
+ """
+ try:
+ # token为json对象时提取data.content
+ t = json.loads(token_code)
+ token_code = t["data"]["content"]
+ except:
+ pass
+ grant_code = get_grant_code(token_code)
+ return get_cred(grant_code)
+
+ def get_cred(grant):
+ """
+ 通过grant code获取cred和sign_token
+
+ :param grant: grant code
+ :return: (cred, sign_token)
+ """
+
+ rsp = requests.post(
+ cred_code_url,
+ json={"code": grant, "kind": 1},
+ headers=header_login,
+ proxies={
+ "http": Config.get(Config.update_ProxyAddress),
+ "https": Config.get(Config.update_ProxyAddress),
+ },
+ ).json()
+ if rsp["code"] != 0:
+ raise Exception(f'获得cred失败:{rsp.get("messgae")}')
+ sign_token = rsp["data"]["token"]
+ cred = rsp["data"]["cred"]
+ return cred, sign_token
+
+ def get_grant_code(token):
+ """
+ 通过token获取grant code
+
+ :param token: 你的skyland token
+ :return: grant code
+ """
+ rsp = requests.post(
+ grant_code_url,
+ json={"appCode": app_code, "token": token, "type": 0},
+ headers=header_login,
+ proxies={
+ "http": Config.get(Config.update_ProxyAddress),
+ "https": Config.get(Config.update_ProxyAddress),
+ },
+ ).json()
+ if rsp["status"] != 0:
+ raise Exception(
+ f'使用token: {token[:3]}******{token[-3:]} 获得认证代码失败:{rsp.get("msg")}'
+ )
+ return rsp["data"]["code"]
+
+ def get_binding_list(cred, sign_token):
+ """
+ 查询已绑定的角色列表
+
+ :param cred: 当前cred
+ :param sign_token: 当前sign_token
+ :return: 角色列表
+ """
+ v = []
+ rsp = requests.get(
+ binding_url,
+ headers=get_sign_header(
+ binding_url, "get", None, copy_header(cred), sign_token
+ ),
+ proxies={
+ "http": Config.get(Config.update_ProxyAddress),
+ "https": Config.get(Config.update_ProxyAddress),
+ },
+ ).json()
+ if rsp["code"] != 0:
+ logger.error(
+ f"森空岛服务 | 请求角色列表出现问题:{rsp['message']}",
+ module="森空岛签到",
+ )
+ if rsp.get("message") == "用户未登录":
+ logger.error(
+ f"森空岛服务 | 用户登录可能失效了,请重新登录!",
+ module="森空岛签到",
+ )
+ return v
+ # 只取明日方舟(arknights)的绑定账号
+ for i in rsp["data"]["list"]:
+ if i.get("appCode") != "arknights":
+ continue
+ v.extend(i.get("bindingList"))
+ return v
+
+ def do_sign(cred, sign_token) -> dict:
+ """
+ 对所有绑定的角色进行签到
+
+ :param cred: 当前cred
+ :param sign_token: 当前sign_token
+ :return: 签到结果字典
+ """
+
+ characters = get_binding_list(cred, sign_token)
+ result = {"成功": [], "重复": [], "失败": [], "总计": len(characters)}
+
+ for character in characters:
+
+ body = {
+ "uid": character.get("uid"),
+ "gameId": character.get("channelMasterId"),
+ }
+ rsp = requests.post(
+ sign_url,
+ headers=get_sign_header(
+ sign_url, "post", body, copy_header(cred), sign_token
+ ),
+ json=body,
+ proxies={
+ "http": Config.get(Config.update_ProxyAddress),
+ "https": Config.get(Config.update_ProxyAddress),
+ },
+ ).json()
+
+ if rsp["code"] != 0:
+
+ result[
+ "重复" if rsp.get("message") == "请勿重复签到!" else "失败"
+ ].append(
+ f"{character.get("nickName")}({character.get("channelName")})"
+ )
+
+ else:
+
+ result["成功"].append(
+ f"{character.get("nickName")}({character.get("channelName")})"
+ )
+
+ time.sleep(3)
+
+ return result
+
+ # 主流程
+ try:
+ # 拿到cred和sign_token
+ cred, sign_token = login_by_token(token)
+ time.sleep(1)
+ # 依次签到
+ return do_sign(cred, sign_token)
+ except Exception as e:
+ logger.exception(f"森空岛服务 | 森空岛签到失败: {e}", module="森空岛签到")
+ return {"成功": [], "重复": [], "失败": [], "总计": 0}
diff --git a/app/utils/ImageUtils.py b/app/utils/ImageUtils.py
new file mode 100644
index 0000000..2631dc9
--- /dev/null
+++ b/app/utils/ImageUtils.py
@@ -0,0 +1,95 @@
+# AUTO_MAA:A MAA Multi Account Management and Automation Tool
+# Copyright © 2025 ClozyA
+
+# This file is part of AUTO_MAA.
+
+# AUTO_MAA is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published
+# by the Free Software Foundation, either version 3 of the License,
+# or (at your option) any later version.
+
+# AUTO_MAA is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty
+# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
+# the GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with AUTO_MAA. If not, see .
+
+# Contact: DLmaster_361@163.com
+
+"""
+AUTO_MAA
+AUTO_MAA图像组件
+v4.4
+作者:ClozyA
+"""
+
+import base64
+import hashlib
+from pathlib import Path
+
+from PIL import Image
+
+
+class ImageUtils:
+ @staticmethod
+ def get_base64_from_file(image_path):
+ """从本地文件读取并返回base64编码字符串"""
+ with open(image_path, "rb") as f:
+ return base64.b64encode(f.read()).decode("utf-8")
+
+ @staticmethod
+ def calculate_md5_from_file(image_path):
+ """从本地文件读取并返回md5值(hex字符串)"""
+ with open(image_path, "rb") as f:
+ return hashlib.md5(f.read()).hexdigest()
+
+ @staticmethod
+ def calculate_md5_from_base64(base64_content):
+ """从base64字符串计算md5"""
+ image_data = base64.b64decode(base64_content)
+ return hashlib.md5(image_data).hexdigest()
+
+ @staticmethod
+ def compress_image_if_needed(image_path: Path, max_size_mb=2) -> Path:
+ """
+ 如果图片大于max_size_mb,则压缩并覆盖原文件,返回原始路径(Path对象)
+ """
+ if hasattr(Image, "Resampling"): # Pillow 9.1.0及以后
+ RESAMPLE = Image.Resampling.LANCZOS
+ else:
+ RESAMPLE = Image.ANTIALIAS
+
+ max_size = max_size_mb * 1024 * 1024
+ if image_path.stat().st_size <= max_size:
+ return image_path
+
+ img = Image.open(image_path)
+ suffix = image_path.suffix.lower()
+ quality = 90 if suffix in [".jpg", ".jpeg"] else None
+ step = 5
+
+ if suffix in [".jpg", ".jpeg"]:
+ while True:
+ img.save(image_path, quality=quality, optimize=True)
+ if image_path.stat().st_size <= max_size or quality <= 10:
+ break
+ quality -= step
+ elif suffix == ".png":
+ width, height = img.size
+ while True:
+ img.save(image_path, optimize=True)
+ if (
+ image_path.stat().st_size <= max_size
+ or width <= 200
+ or height <= 200
+ ):
+ break
+ width = int(width * 0.95)
+ height = int(height * 0.95)
+ img = img.resize((width, height), RESAMPLE)
+ else:
+ raise ValueError("仅支持JPG/JPEG和PNG格式图片的压缩。")
+
+ return image_path
diff --git a/app/utils/ProcessManager.py b/app/utils/ProcessManager.py
new file mode 100644
index 0000000..d29528f
--- /dev/null
+++ b/app/utils/ProcessManager.py
@@ -0,0 +1,157 @@
+# AUTO_MAA:A MAA Multi Account Management and Automation Tool
+# Copyright © 2024-2025 DLmaster361
+
+# This file is part of AUTO_MAA.
+
+# AUTO_MAA is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published
+# by the Free Software Foundation, either version 3 of the License,
+# or (at your option) any later version.
+
+# AUTO_MAA is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty
+# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
+# the GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with AUTO_MAA. If not, see .
+
+# Contact: DLmaster_361@163.com
+
+
+import psutil
+import subprocess
+from pathlib import Path
+from datetime import datetime
+
+from PySide6.QtCore import QTimer, QObject, Signal
+
+
+class ProcessManager(QObject):
+ """进程监视器类,用于跟踪主进程及其所有子进程的状态"""
+
+ processClosed = Signal()
+
+ def __init__(self):
+ super().__init__()
+
+ self.main_pid = None
+ self.tracked_pids = set()
+
+ self.check_timer = QTimer()
+ self.check_timer.timeout.connect(self.check_processes)
+
+ def open_process(self, path: Path, args: list = [], tracking_time: int = 60) -> int:
+ """
+ 启动一个新进程并返回其pid,并开始监视该进程
+
+ :param path: 可执行文件的路径
+ :param args: 启动参数列表
+ :param tracking_time: 子进程追踪持续时间(秒)
+ :return: 新进程的PID
+ """
+
+ process = subprocess.Popen(
+ [path, *args],
+ cwd=path.parent,
+ creationflags=subprocess.CREATE_NO_WINDOW,
+ stdin=subprocess.DEVNULL,
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ )
+
+ self.start_monitoring(process.pid, tracking_time)
+
+ def start_monitoring(self, pid: int, tracking_time: int = 60) -> None:
+ """
+ 启动进程监视器,跟踪指定的主进程及其子进程
+
+ :param pid: 被监视进程的PID
+ :param tracking_time: 子进程追踪持续时间(秒)
+ """
+
+ self.clear()
+
+ self.main_pid = pid
+ self.tracking_time = tracking_time
+
+ # 扫描并记录所有相关进程
+ try:
+ # 获取主进程
+ main_proc = psutil.Process(self.main_pid)
+ self.tracked_pids.add(self.main_pid)
+
+ # 递归获取所有子进程
+ if tracking_time:
+ for child in main_proc.children(recursive=True):
+ self.tracked_pids.add(child.pid)
+
+ except psutil.NoSuchProcess:
+ pass
+
+ # 启动持续追踪机制
+ self.start_time = datetime.now()
+ self.check_timer.start(100)
+
+ def check_processes(self) -> None:
+ """检查跟踪的进程是否仍在运行,并更新子进程列表"""
+
+ # 仅在时限内持续更新跟踪的进程列表,发现新的子进程
+ if (datetime.now() - self.start_time).total_seconds() < self.tracking_time:
+
+ current_pids = set(self.tracked_pids)
+ for pid in current_pids:
+ try:
+ proc = psutil.Process(pid)
+ for child in proc.children():
+ if child.pid not in self.tracked_pids:
+ # 新发现的子进程
+ self.tracked_pids.add(child.pid)
+ except psutil.NoSuchProcess:
+ continue
+
+ if not self.is_running():
+ self.clear()
+ self.processClosed.emit()
+
+ def is_running(self) -> bool:
+ """检查所有跟踪的进程是否还在运行"""
+
+ for pid in self.tracked_pids:
+ try:
+ proc = psutil.Process(pid)
+ if proc.is_running():
+ return True
+ except psutil.NoSuchProcess:
+ continue
+
+ return False
+
+ def kill(self, if_force: bool = False) -> None:
+ """停止监视器并中止所有跟踪的进程"""
+
+ self.check_timer.stop()
+
+ for pid in self.tracked_pids:
+ try:
+ proc = psutil.Process(pid)
+ if if_force:
+ kill_process = subprocess.Popen(
+ ["taskkill", "/F", "/T", "/PID", str(pid)],
+ creationflags=subprocess.CREATE_NO_WINDOW,
+ )
+ kill_process.wait()
+ proc.terminate()
+ except psutil.NoSuchProcess:
+ continue
+
+ if self.main_pid:
+ self.processClosed.emit()
+ self.clear()
+
+ def clear(self) -> None:
+ """清空跟踪的进程列表"""
+
+ self.main_pid = None
+ self.check_timer.stop()
+ self.tracked_pids.clear()
diff --git a/app/utils/__init__.py b/app/utils/__init__.py
index ce10f21..cd7286d 100644
--- a/app/utils/__init__.py
+++ b/app/utils/__init__.py
@@ -25,5 +25,6 @@ __license__ = "GPL-3.0 license"
from .logger import get_logger
+from .security import dpapi_encrypt, dpapi_decrypt
-__all__ = ["get_logger"]
+__all__ = ["get_logger", "dpapi_encrypt", "dpapi_decrypt"]
diff --git a/app/utils/security.py b/app/utils/security.py
new file mode 100644
index 0000000..6dad28a
--- /dev/null
+++ b/app/utils/security.py
@@ -0,0 +1,69 @@
+# AUTO_MAA:A MAA Multi Account Management and Automation Tool
+# Copyright © 2024-2025 DLmaster361
+
+# This file is part of AUTO_MAA.
+
+# AUTO_MAA is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published
+# by the Free Software Foundation, either version 3 of the License,
+# or (at your option) any later version.
+
+# AUTO_MAA is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty
+# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
+# the GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with AUTO_MAA. If not, see .
+
+# Contact: DLmaster_361@163.com
+
+
+import base64
+import win32crypt
+
+
+def dpapi_encrypt(
+ note: str, description: None | str = None, entropy: None | bytes = None
+) -> str:
+ """
+ 使用Windows DPAPI加密数据
+
+ :param note: 数据明文
+ :type note: str
+ :param description: 描述信息
+ :type description: str
+ :param entropy: 随机熵
+ :type entropy: bytes
+ :return: 加密后的数据
+ :rtype: str
+ """
+
+ if note == "":
+ return ""
+
+ encrypted = win32crypt.CryptProtectData(
+ note.encode("utf-8"), description, entropy, None, None, 0
+ )
+ return base64.b64encode(encrypted).decode("utf-8")
+
+
+def dpapi_decrypt(note: str, entropy: None | bytes = None) -> str:
+ """
+ 使用Windows DPAPI解密数据
+
+ :param note: 数据密文
+ :type note: str
+ :param entropy: 随机熵
+ :type entropy: bytes
+ :return: 解密后的明文
+ :rtype: str
+ """
+
+ if note == "":
+ return ""
+
+ decrypted = win32crypt.CryptUnprotectData(
+ base64.b64decode(note), entropy, None, None, 0
+ )
+ return decrypted[1].decode("utf-8")