feat: 支持本地数据保存时加密

This commit is contained in:
DLmaster361
2025-08-05 18:56:44 +08:00
parent d61b90baa4
commit 4ca7f9053f
12 changed files with 1586 additions and 404 deletions

View File

@@ -1,343 +0,0 @@
import uuid
from fastapi import FastAPI, HTTPException, Path, Body
from pydantic import BaseModel, Field
from typing import Dict, Any, List, Optional, Literal
from datetime import datetime
from fastapi.middleware.cors import CORSMiddleware
from app.core import Config
from app.utils import get_logger
# 此文件由ai生成 返回值非最终版本
# ======================
# Data Models
# ======================
# Script Models
class ScriptUser(BaseModel):
userId: str
config: Dict[str, Any] = {}
# Plan Models
class PlanDayConfig(BaseModel):
吃理智药: int
连战次数: str
关卡选择: str
备选_1: str
备选_2: str
备选_3: str
剩余理智: str
class PlanDetails(BaseModel):
周一: PlanDayConfig
周二: PlanDayConfig
周三: PlanDayConfig
周四: PlanDayConfig
周五: PlanDayConfig
周六: PlanDayConfig
周日: PlanDayConfig
class PlanCreate(BaseModel):
name: str
mode: str # "全局" or "周计划"
details: PlanDetails
class PlanUpdate(BaseModel):
name: Optional[str] = None
mode: Optional[str] = None
details: Optional[PlanDetails] = None
class PlanModeUpdate(BaseModel):
mode: str # "全局" or "周计划"
# Queue Models
class QueueCreate(BaseModel):
name: str
scripts: List[str]
schedule: str
description: Optional[str] = None
class QueueUpdate(BaseModel):
name: Optional[str] = None
scripts: Optional[List[str]] = None
schedule: Optional[str] = None
description: Optional[str] = None
# Task Models
class TaskCreate(BaseModel):
name: str
scriptId: str
planId: str
queueId: Optional[str] = None
priority: int = 0
parameters: Dict[str, Any] = {}
# Settings Models
class SettingsUpdate(BaseModel):
key: str
value: Any
# ======================
# API Endpoints
# ======================
# @app.get("/api/activity/latest", summary="获取最新活动内容")
# async def get_latest_activity():
# """
# 获取最新活动内容
# """
# # 实现获取最新活动的逻辑
# return {"status": "success", "data": {}}
# @app.post("/api/scripts/{scriptId}/users", summary="为脚本添加用户")
# async def add_script_user(
# scriptId: str = Path(..., description="脚本ID"), user: ScriptUser = Body(...)
# ):
# """
# 为脚本添加用户
# """
# # 实现为脚本添加用户的逻辑
# return {"status": "success"}
# @app.get("/api/scripts/{scriptId}/users", summary="查询脚本的所有下属用户")
# async def get_script_users(scriptId: str = Path(..., description="脚本ID")):
# """
# 查询脚本的所有下属用户
# """
# # 实现查询脚本的所有下属用户的逻辑
# return {"status": "success", "data": []}
# @app.get("/api/scripts/{scriptId}/users/{userId}", summary="查询脚本下的单个下属用户")
# async def get_script_user(
# scriptId: str = Path(..., description="脚本ID"),
# userId: str = Path(..., description="用户ID"),
# ):
# """
# 查询脚本下的单个下属用户
# """
# # 实现查询脚本下的单个下属用户的逻辑
# return {"status": "success", "data": {}}
# @app.put("/api/scripts/{scriptId}/users/{userId}", summary="更新脚本下属用户的关联信息")
# async def update_script_user(
# scriptId: str = Path(..., description="脚本ID"),
# userId: str = Path(..., description="用户ID"),
# config: Dict[str, Any] = Body(...),
# ):
# """
# 更新脚本下属用户的关联信息
# """
# # 实现更新脚本下属用户的关联信息的逻辑
# return {"status": "success"}
# @app.delete("/api/scripts/{scriptId}/users/{userId}", summary="从脚本移除用户")
# async def remove_script_user(
# scriptId: str = Path(..., description="脚本ID"),
# userId: str = Path(..., description="用户ID"),
# ):
# """
# 从脚本移除用户
# """
# # 实现从脚本移除用户的逻辑
# return {"status": "success"}
# @app.post("/api/add/plans", summary="创建计划")
# async def add_plan(plan: PlanCreate = Body(...)):
# """
# 创建计划
# {
# "name": "计划 1",
# "mode": "全局", // 或 "周计划"
# "details": {
# "周一": {
# "吃理智药": 0,
# "连战次数": "AUTO",
# "关卡选择": "当前/上次",
# "备选-1": "当前/上次",
# "备选-2": "当前/上次",
# "备选-3": "当前/上次",
# "剩余理智": "不使用"
# },
# // 其他天数...
# }
# }
# """
# # 实现创建计划的逻辑
# return {"status": "success", "planId": "new_plan_id"}
# @app.post("/api/get/plans", summary="查询所有计划")
# async def get_plans():
# """
# 查询所有计划
# """
# # 实现查询所有计划的逻辑
# return {"status": "success", "data": []}
# @app.post("/api/get/plans/{planId}", summary="查询单个计划")
# async def get_plan(planId: str = Path(..., description="计划ID")):
# """
# 查询单个计划
# """
# # 实现查询单个计划的逻辑
# return {"status": "success", "data": {}}
# @app.post("/api/update/plans/{planId}", summary="更新计划")
# async def update_plan(
# planId: str = Path(..., description="计划ID"), update_data: PlanUpdate = Body(...)
# ):
# """
# 更新计划
# """
# # 实现更新计划的逻辑
# return {"status": "success"}
# @app.post("/api/delete/plans/{planId}", summary="删除计划")
# async def delete_plan(planId: str = Path(..., description="计划ID")):
# """
# 删除计划
# """
# # 实现删除计划的逻辑
# return {"status": "success"}
# @app.post("/api/update/plans/{planId}/mode", summary="切换计划模式")
# async def update_plan_mode(
# planId: str = Path(..., description="计划ID"), mode_data: PlanModeUpdate = Body(...)
# ):
# """
# 切换计划模式
# {
# "mode": "周计划"
# }
# """
# # 实现切换计划模式的逻辑
# return {"status": "success"}
# @app.post("/api/add/queues", summary="创建调度队列")
# async def add_queue(queue: QueueCreate = Body(...)):
# """
# 创建调度队列
# """
# # 实现创建调度队列的逻辑
# return {"status": "success", "queueId": "new_queue_id"}
# @app.post("/api/get/queues", summary="查询所有调度队列")
# async def get_queues():
# """
# 查询所有调度队列
# """
# # 实现查询所有调度队列的逻辑
# return {"status": "success", "data": []}
# @app.post("/api/get/queues/{queueId}", summary="查询单个调度队列详情")
# async def get_queue(queueId: str = Path(..., description="调度队列ID")):
# """
# 查询单个调度队列详情
# """
# # 实现查询单个调度队列详情的逻辑
# return {"status": "success", "data": {}}
# @app.post("/api/update/queues/{queueId}", summary="更新调度队列")
# async def update_queue(
# queueId: str = Path(..., description="调度队列ID"),
# update_data: QueueUpdate = Body(...),
# ):
# """
# 更新调度队列
# """
# # 实现更新调度队列的逻辑
# return {"status": "success"}
# @app.post("/api/delete/queues/{queueId}", summary="删除调度队列")
# async def delete_queue(queueId: str = Path(..., description="调度队列ID")):
# """
# 删除调度队列
# """
# # 实现删除调度队列的逻辑
# return {"status": "success"}
# @app.post("/api/add/tasks", summary="添加任务")
# async def add_task(task: TaskCreate = Body(...)):
# """
# 添加任务
# """
# # 实现添加任务的逻辑
# return {"status": "success", "taskId": "new_task_id"}
# @app.post("/api/tasks/{taskId}/start", summary="开始任务")
# async def start_task(taskId: str = Path(..., description="任务ID")):
# """
# 开始任务
# """
# # 实现开始任务的逻辑
# return {"status": "success"}
# @app.post("/api/get/history", summary="查询历史记录")
# async def get_history():
# """
# 查询历史记录
# """
# # 实现查询历史记录的逻辑
# return {"status": "success", "data": []}
# @app.post("/api/update/settings", summary="更新部分设置")
# async def update_settings(settings: SettingsUpdate = Body(...)):
# """
# 更新部分设置
# """
# # 实现更新部分设置的逻辑
# return {"status": "success"}
# # ======================
# # Error Handlers
# # ======================
# @app.exception_handler(HTTPException)
# async def http_exception_handler(request, exc):
# return {"status": "error", "code": exc.status_code, "message": exc.detail}
# if __name__ == "__main__":
# import uvicorn
# uvicorn.run(app, host="0.0.0.0", port=8000)

View File

@@ -59,9 +59,6 @@ class GlobalConfig(ConfigBase):
) )
Function_IfSilence = ConfigItem("Function", "IfSilence", False, BoolValidator()) Function_IfSilence = ConfigItem("Function", "IfSilence", False, BoolValidator())
Function_BossKey = ConfigItem("Function", "BossKey", "") Function_BossKey = ConfigItem("Function", "BossKey", "")
Function_UnattendedMode = ConfigItem(
"Function", "UnattendedMode", False, BoolValidator()
)
Function_IfAgreeBilibili = ConfigItem( Function_IfAgreeBilibili = ConfigItem(
"Function", "IfAgreeBilibili", False, BoolValidator() "Function", "IfAgreeBilibili", False, BoolValidator()
) )
@@ -95,7 +92,9 @@ class GlobalConfig(ConfigBase):
Notify_IfPushPlyer = ConfigItem("Notify", "IfPushPlyer", False, BoolValidator()) Notify_IfPushPlyer = ConfigItem("Notify", "IfPushPlyer", False, BoolValidator())
Notify_IfSendMail = ConfigItem("Notify", "IfSendMail", False, BoolValidator()) Notify_IfSendMail = ConfigItem("Notify", "IfSendMail", False, BoolValidator())
Notify_SMTPServerAddress = ConfigItem("Notify", "SMTPServerAddress", "") Notify_SMTPServerAddress = ConfigItem("Notify", "SMTPServerAddress", "")
Notify_AuthorizationCode = ConfigItem("Notify", "AuthorizationCode", "") Notify_AuthorizationCode = ConfigItem(
"Notify", "AuthorizationCode", "", EncryptValidator()
)
Notify_FromAddress = ConfigItem("Notify", "FromAddress", "") Notify_FromAddress = ConfigItem("Notify", "FromAddress", "")
Notify_ToAddress = ConfigItem("Notify", "ToAddress", "") Notify_ToAddress = ConfigItem("Notify", "ToAddress", "")
Notify_IfServerChan = ConfigItem("Notify", "IfServerChan", False, BoolValidator()) Notify_IfServerChan = ConfigItem("Notify", "IfServerChan", False, BoolValidator())
@@ -114,7 +113,9 @@ class GlobalConfig(ConfigBase):
Update_ThreadNumb = ConfigItem("Update", "ThreadNumb", 8, RangeValidator(1, 32)) Update_ThreadNumb = ConfigItem("Update", "ThreadNumb", 8, RangeValidator(1, 32))
Update_ProxyAddress = ConfigItem("Update", "ProxyAddress", "") Update_ProxyAddress = ConfigItem("Update", "ProxyAddress", "")
Update_ProxyUrlList = ConfigItem("Update", "ProxyUrlList", []) Update_ProxyUrlList = ConfigItem("Update", "ProxyUrlList", [])
Update_MirrorChyanCDK = ConfigItem("Update", "MirrorChyanCDK", "") Update_MirrorChyanCDK = ConfigItem(
"Update", "MirrorChyanCDK", "", EncryptValidator()
)
class QueueItem(ConfigBase): class QueueItem(ConfigBase):
@@ -123,7 +124,7 @@ class QueueItem(ConfigBase):
def __init__(self) -> None: def __init__(self) -> None:
super().__init__() super().__init__()
self.Info_ScriptId = ConfigItem("Info", "ScriptId", "", UidValidator()) self.Info_ScriptId = ConfigItem("Info", "ScriptId", None, UidValidator())
class TimeSet(ConfigBase): class TimeSet(ConfigBase):
@@ -221,7 +222,7 @@ class MaaUserConfig(ConfigBase):
"Normal", "Normal",
OptionsValidator(["Normal", "Rotation", "Custom"]), OptionsValidator(["Normal", "Rotation", "Custom"]),
) )
self.Info_Password = ConfigItem("Info", "Password", "") self.Info_Password = ConfigItem("Info", "Password", "", EncryptValidator())
self.Info_Notes = ConfigItem("Info", "Notes", "") self.Info_Notes = ConfigItem("Info", "Notes", "")
self.Info_MedicineNumb = ConfigItem( self.Info_MedicineNumb = ConfigItem(
"Info", "MedicineNumb", 0, RangeValidator(0, 1024) "Info", "MedicineNumb", 0, RangeValidator(0, 1024)

View File

@@ -37,7 +37,7 @@ def is_admin() -> bool:
return False return False
@logger.catch # @logger.catch
def main(): def main():
if is_admin(): if is_admin():

View File

@@ -22,11 +22,13 @@
import json import json
import uuid import uuid
import win32com.client
from copy import deepcopy from copy import deepcopy
from enum import Enum
from pathlib import Path from pathlib import Path
from typing import List, Callable, Any, Dict, Union from typing import List, Any, Dict, Union
from urllib.parse import urlparse
from utils import dpapi_encrypt, dpapi_decrypt
class ConfigValidator: class ConfigValidator:
@@ -49,46 +51,66 @@ class RangeValidator(ConfigValidator):
self.max = max self.max = max
self.range = (min, max) self.range = (min, max)
def validate(self, value: int | float) -> bool: def validate(self, value: Any) -> bool:
if not isinstance(value, (int | float)):
return False
return self.min <= value <= self.max return self.min <= value <= self.max
def correct(self, value: int | float) -> int | float: def correct(self, value: Any) -> int | float:
if not isinstance(value, (int, float)):
try:
value = float(value)
except TypeError:
return self.min
return min(max(self.min, value), self.max) return min(max(self.min, value), self.max)
class OptionsValidator(ConfigValidator): class OptionsValidator(ConfigValidator):
"""选项验证器""" """选项验证器"""
def __init__(self, options): def __init__(self, options: list):
if not options: if not options:
raise ValueError("The `options` can't be empty.") raise ValueError("The `options` can't be empty.")
if isinstance(options, Enum): self.options = options
options = options._member_map_.values()
self.options = list(options)
def validate(self, value: Any) -> bool: def validate(self, value: Any) -> bool:
return value in self.options return value in self.options
def correct(self, value): def correct(self, value: Any) -> Any:
return value if self.validate(value) else self.options[0] return value if self.validate(value) else self.options[0]
class UidValidator(ConfigValidator): class UidValidator(ConfigValidator):
"""UID验证器""" """UID验证器"""
def validate(self, value: str) -> bool: def validate(self, value: Any) -> bool:
if value == "": if value is None:
return True return True
try: try:
uuid.UUID(value) uuid.UUID(value)
return True return True
except ValueError: except (TypeError, ValueError):
return False return False
def correct(self, value: str) -> str: def correct(self, value: Any) -> Any:
return "" return value if self.validate(value) else None
class EncryptValidator(ConfigValidator):
"""加数据验证器"""
def validate(self, value: Any) -> bool:
if not isinstance(value, str):
return False
try:
dpapi_decrypt(value)
return True
except:
return False
def correct(self, value: Any) -> Any:
return value if self.validate(value) else dpapi_encrypt("数据损坏,请重新设置")
class BoolValidator(OptionsValidator): class BoolValidator(OptionsValidator):
@@ -101,40 +123,44 @@ class BoolValidator(OptionsValidator):
class FileValidator(ConfigValidator): class FileValidator(ConfigValidator):
"""文件路径验证器""" """文件路径验证器"""
def validate(self, value): def validate(self, value: Any) -> bool:
return Path(value).exists() if not isinstance(value, str):
return False
if not Path(value).is_absolute():
return False
if Path(value).suffix == ".lnk":
return False
return True
def correct(self, value): def correct(self, value: Any) -> str:
path = Path(value) if not isinstance(value, str):
return str(path.absolute()).replace("\\", "/") value = "."
if not Path(value).is_absolute():
value = Path(value).resolve().as_posix()
if Path(value).suffix == ".lnk":
try:
shell = win32com.client.Dispatch("WScript.Shell")
shortcut = shell.CreateShortcut(value)
value = shortcut.TargetPath
except:
pass
return Path(value).resolve().as_posix()
class FolderValidator(ConfigValidator): class FolderValidator(ConfigValidator):
"""文件夹路径验证器""" """文件夹路径验证器"""
def validate(self, value): def validate(self, value: Any) -> bool:
return Path(value).exists() if not isinstance(value, str):
return False
if not Path(value).is_absolute():
return False
return True
def correct(self, value): def correct(self, value: Any) -> str:
path = Path(value) if not isinstance(value, str):
path.mkdir(exist_ok=True, parents=True) value = "."
return str(path.absolute()).replace("\\", "/") return Path(value).resolve().as_posix()
class FolderListValidator(ConfigValidator):
"""文件夹列表验证器"""
def validate(self, value):
return all(Path(i).exists() for i in value)
def correct(self, value: List[str]):
folders = []
for folder in value:
path = Path(folder)
if path.exists():
folders.append(str(path.absolute()).replace("\\", "/"))
return folders
class ConfigItem: class ConfigItem:
@@ -144,7 +170,7 @@ class ConfigItem:
self, self,
group: str, group: str,
name: str, name: str,
default: object, default: Any,
validator: None | ConfigValidator = None, validator: None | ConfigValidator = None,
): ):
""" """
@@ -165,11 +191,9 @@ class ConfigItem:
super().__init__() super().__init__()
self.group = group self.group = group
self.name = name self.name = name
self.value: Any = None self.value: Any = default
self.validator = validator or ConfigValidator() self.validator = validator or ConfigValidator()
self.setValue(default)
def setValue(self, value: Any): def setValue(self, value: Any):
""" """
设置配置项值,将自动进行验证和修正 设置配置项值,将自动进行验证和修正
@@ -180,7 +204,11 @@ class ConfigItem:
要设置的值,可以是任何合法类型 要设置的值,可以是任何合法类型
""" """
if self.value == value: if (
dpapi_decrypt(self.value)
if isinstance(self.validator, EncryptValidator)
else self.value
) == value:
return return
# deepcopy new value # deepcopy new value
@@ -189,9 +217,21 @@ class ConfigItem:
except: except:
self.value = value self.value = value
if isinstance(self.validator, EncryptValidator):
self.value = dpapi_encrypt(self.value)
if not self.validator.validate(self.value): if not self.validator.validate(self.value):
self.value = self.validator.correct(self.value) self.value = self.validator.correct(self.value)
def getValue(self) -> Any:
"""
获取配置项值
"""
if isinstance(self.validator, EncryptValidator):
return dpapi_decrypt(self.value)
return self.value
class ConfigBase: class ConfigBase:
""" """
@@ -271,7 +311,9 @@ class ConfigBase:
if self.file: if self.file:
await self.save() await self.save()
async def toDict(self, ignore_multi_config: bool = False) -> Dict[str, Any]: async def toDict(
self, ignore_multi_config: bool = False, if_decrypt: bool = True
) -> Dict[str, Any]:
"""将配置项转换为字典""" """将配置项转换为字典"""
data = {} data = {}
@@ -283,7 +325,9 @@ class ConfigBase:
if not data.get(item.group): if not data.get(item.group):
data[item.group] = {} data[item.group] = {}
if item.name: if item.name:
data[item.group][item.name] = item.value data[item.group][item.name] = (
item.getValue() if if_decrypt else item.value
)
elif not ignore_multi_config and isinstance(item, MultipleConfig): elif not ignore_multi_config and isinstance(item, MultipleConfig):
@@ -301,7 +345,7 @@ class ConfigBase:
configItem = getattr(self, f"{group}_{name}") configItem = getattr(self, f"{group}_{name}")
if isinstance(configItem, ConfigItem): if isinstance(configItem, ConfigItem):
return configItem.value return configItem.getValue()
else: else:
raise TypeError( raise TypeError(
f"Config item '{group}_{name}' is not a ConfigItem instance." f"Config item '{group}_{name}' is not a ConfigItem instance."
@@ -345,7 +389,7 @@ class ConfigBase:
self.file.parent.mkdir(parents=True, exist_ok=True) self.file.parent.mkdir(parents=True, exist_ok=True)
with self.file.open("w", encoding="utf-8") as f: with self.file.open("w", encoding="utf-8") as f:
json.dump( json.dump(
await self.toDict(not self.if_save_multi_config), await self.toDict(not self.if_save_multi_config, if_decrypt=False),
f, f,
ensure_ascii=False, ensure_ascii=False,
indent=4, indent=4,

37
app/services/__init__.py Normal file
View File

@@ -0,0 +1,37 @@
# AUTO_MAA:A MAA Multi Account Management and Automation Tool
# Copyright © 2024-2025 DLmaster361
# This file is part of AUTO_MAA.
# AUTO_MAA is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License,
# or (at your option) any later version.
# AUTO_MAA is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
# the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with AUTO_MAA. If not, see <https://www.gnu.org/licenses/>.
# Contact: DLmaster_361@163.com
"""
AUTO_MAA
AUTO_MAA服务包
v4.4
作者DLmaster_361
"""
__version__ = "4.2.0"
__author__ = "DLmaster361 <DLmaster_361@163.com>"
__license__ = "GPL-3.0 license"
from .notification import Notify
from ..utils.security import Crypto
from .system import System
from ..task.skland import skland_sign_in
__all__ = ["Notify", "Crypto", "System", "skland_sign_in"]

View File

@@ -0,0 +1,484 @@
# AUTO_MAA:A MAA Multi Account Management and Automation Tool
# Copyright © 2024-2025 DLmaster361
# This file is part of AUTO_MAA.
# AUTO_MAA is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License,
# or (at your option) any later version.
# AUTO_MAA is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
# the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with AUTO_MAA. If not, see <https://www.gnu.org/licenses/>.
# Contact: DLmaster_361@163.com
"""
AUTO_MAA
AUTO_MAA通知服务
v4.4
作者DLmaster_361
"""
import re
import smtplib
import time
from email.header import Header
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formataddr
from pathlib import Path
from typing import Union
import requests
from PySide6.QtCore import QObject, Signal
from plyer import notification
from app.core import Config, logger
from app.utils.security import Crypto
from app.utils.ImageUtils import ImageUtils
class Notification(QObject):
push_info_bar = Signal(str, str, str, int)
def __init__(self, parent=None):
super().__init__(parent)
def push_plyer(self, title, message, ticker, t) -> bool:
"""
推送系统通知
:param title: 通知标题
:param message: 通知内容
:param ticker: 通知横幅
:param t: 通知持续时间
:return: bool
"""
if Config.get(Config.notify_IfPushPlyer):
logger.info(f"推送系统通知:{title}", module="通知服务")
notification.notify(
title=title,
message=message,
app_name="AUTO_MAA",
app_icon=str(Config.app_path / "resources/icons/AUTO_MAA.ico"),
timeout=t,
ticker=ticker,
toast=True,
)
return True
def send_mail(self, mode, title, content, to_address) -> None:
"""
推送邮件通知
:param mode: 邮件内容模式,支持 "文本""网页"
:param title: 邮件标题
:param content: 邮件内容
:param to_address: 收件人地址
"""
if (
Config.get(Config.notify_SMTPServerAddress) == ""
or Config.get(Config.notify_AuthorizationCode) == ""
or not bool(
re.match(
r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$",
Config.get(Config.notify_FromAddress),
)
)
or not bool(
re.match(
r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$",
to_address,
)
)
):
logger.error(
"请正确设置邮件通知的SMTP服务器地址、授权码、发件人地址和收件人地址",
module="通知服务",
)
self.push_info_bar.emit(
"error",
"邮件通知推送异常",
"请正确设置邮件通知的SMTP服务器地址、授权码、发件人地址和收件人地址",
-1,
)
return None
try:
# 定义邮件正文
if mode == "文本":
message = MIMEText(content, "plain", "utf-8")
elif mode == "网页":
message = MIMEMultipart("alternative")
message["From"] = formataddr(
(
Header("AUTO_MAA通知服务", "utf-8").encode(),
Config.get(Config.notify_FromAddress),
)
) # 发件人显示的名字
message["To"] = formataddr(
(Header("AUTO_MAA用户", "utf-8").encode(), to_address)
) # 收件人显示的名字
message["Subject"] = Header(title, "utf-8")
if mode == "网页":
message.attach(MIMEText(content, "html", "utf-8"))
smtpObj = smtplib.SMTP_SSL(Config.get(Config.notify_SMTPServerAddress), 465)
smtpObj.login(
Config.get(Config.notify_FromAddress),
Crypto.win_decryptor(Config.get(Config.notify_AuthorizationCode)),
)
smtpObj.sendmail(
Config.get(Config.notify_FromAddress), to_address, message.as_string()
)
smtpObj.quit()
logger.success(f"邮件发送成功:{title}", module="通知服务")
except Exception as e:
logger.exception(f"发送邮件时出错:{e}", module="通知服务")
self.push_info_bar.emit("error", "发送邮件时出错", f"{e}", -1)
def ServerChanPush(
self, title, content, send_key, tag, channel
) -> Union[bool, str]:
"""
使用Server酱推送通知
:param title: 通知标题
:param content: 通知内容
:param send_key: Server酱的SendKey
:param tag: 通知标签
:param channel: 通知频道
:return: bool or str
"""
if not send_key:
logger.error("请正确设置Server酱的SendKey", module="通知服务")
self.push_info_bar.emit(
"error", "Server酱通知推送异常", "请正确设置Server酱的SendKey", -1
)
return None
try:
# 构造 URL
if send_key.startswith("sctp"):
match = re.match(r"^sctp(\d+)t", send_key)
if match:
url = f"https://{match.group(1)}.push.ft07.com/send/{send_key}.send"
else:
raise ValueError("SendKey 格式错误sctp")
else:
url = f"https://sctapi.ftqq.com/{send_key}.send"
# 构建 tags 和 channel
def is_valid(s):
return s == "" or (
s == "|".join(s.split("|"))
and (s.count("|") == 0 or all(s.split("|")))
)
tags = "|".join(_.strip() for _ in tag.split("|"))
channels = "|".join(_.strip() for _ in channel.split("|"))
options = {}
if is_valid(tags):
options["tags"] = tags
else:
logger.warning("Server酱 Tag 配置不正确,将被忽略", module="通知服务")
self.push_info_bar.emit(
"warning",
"Server酱通知推送异常",
"请正确设置 ServerChan 的 Tag",
-1,
)
if is_valid(channels):
options["channel"] = channels
else:
logger.warning(
"Server酱 Channel 配置不正确,将被忽略", module="通知服务"
)
self.push_info_bar.emit(
"warning",
"Server酱通知推送异常",
"请正确设置 ServerChan 的 Channel",
-1,
)
# 请求发送
params = {"title": title, "desp": content, **options}
headers = {"Content-Type": "application/json;charset=utf-8"}
response = requests.post(
url,
json=params,
headers=headers,
timeout=10,
proxies={
"http": Config.get(Config.update_ProxyAddress),
"https": Config.get(Config.update_ProxyAddress),
},
)
result = response.json()
if result.get("code") == 0:
logger.success(f"Server酱推送通知成功{title}", module="通知服务")
return True
else:
error_code = result.get("code", "-1")
logger.exception(
f"Server酱通知推送失败响应码{error_code}", module="通知服务"
)
self.push_info_bar.emit(
"error", "Server酱通知推送失败", f"响应码:{error_code}", -1
)
return f"Server酱通知推送失败{error_code}"
except Exception as e:
logger.exception(f"Server酱通知推送异常{e}", module="通知服务")
self.push_info_bar.emit(
"error",
"Server酱通知推送异常",
"请检查相关设置和网络连接。如全部配置正确,请稍后再试。",
-1,
)
return f"Server酱通知推送异常{str(e)}"
def CompanyWebHookBotPush(self, title, content, webhook_url) -> Union[bool, str]:
"""
使用企业微信群机器人推送通知
:param title: 通知标题
:param content: 通知内容
:param webhook_url: 企业微信群机器人的WebHook地址
:return: bool or str
"""
if webhook_url == "":
logger.error("请正确设置企业微信群机器人的WebHook地址", module="通知服务")
self.push_info_bar.emit(
"error",
"企业微信群机器人通知推送异常",
"请正确设置企业微信群机器人的WebHook地址",
-1,
)
return None
content = f"{title}\n{content}"
data = {"msgtype": "text", "text": {"content": content}}
for _ in range(3):
try:
response = requests.post(
url=webhook_url,
json=data,
timeout=10,
proxies={
"http": Config.get(Config.update_ProxyAddress),
"https": Config.get(Config.update_ProxyAddress),
},
)
info = response.json()
break
except Exception as e:
err = e
time.sleep(0.1)
else:
logger.error(f"推送企业微信群机器人时出错:{err}", module="通知服务")
self.push_info_bar.emit(
"error",
"企业微信群机器人通知推送失败",
f"使用企业微信群机器人推送通知时出错:{err}",
-1,
)
return None
if info["errcode"] == 0:
logger.success(f"企业微信群机器人推送通知成功:{title}", module="通知服务")
return True
else:
logger.error(f"企业微信群机器人推送通知失败:{info}", module="通知服务")
self.push_info_bar.emit(
"error",
"企业微信群机器人通知推送失败",
f"使用企业微信群机器人推送通知时出错:{err}",
-1,
)
return f"使用企业微信群机器人推送通知时出错:{err}"
def CompanyWebHookBotPushImage(self, image_path: Path, webhook_url: str) -> bool:
"""
使用企业微信群机器人推送图片通知
:param image_path: 图片文件路径
:param webhook_url: 企业微信群机器人的WebHook地址
:return: bool
"""
try:
# 压缩图片
ImageUtils.compress_image_if_needed(image_path)
# 检查图片是否存在
if not image_path.exists():
logger.error(
"图片推送异常 | 图片不存在或者压缩失败,请检查图片路径是否正确",
module="通知服务",
)
self.push_info_bar.emit(
"error",
"企业微信群机器人通知推送异常",
"图片不存在或者压缩失败,请检查图片路径是否正确",
-1,
)
return False
if not webhook_url:
logger.error(
"请正确设置企业微信群机器人的WebHook地址", module="通知服务"
)
self.push_info_bar.emit(
"error",
"企业微信群机器人通知推送异常",
"请正确设置企业微信群机器人的WebHook地址",
-1,
)
return False
# 获取图片base64和md5
try:
image_base64 = ImageUtils.get_base64_from_file(str(image_path))
image_md5 = ImageUtils.calculate_md5_from_file(str(image_path))
except Exception as e:
logger.exception(f"图片编码或MD5计算失败{e}", module="通知服务")
self.push_info_bar.emit(
"error",
"企业微信群机器人通知推送异常",
f"图片编码或MD5计算失败{e}",
-1,
)
return False
data = {
"msgtype": "image",
"image": {"base64": image_base64, "md5": image_md5},
}
for _ in range(3):
try:
response = requests.post(
url=webhook_url,
json=data,
timeout=10,
proxies={
"http": Config.get(Config.update_ProxyAddress),
"https": Config.get(Config.update_ProxyAddress),
},
)
info = response.json()
break
except requests.RequestException as e:
err = e
logger.exception(
f"推送企业微信群机器人图片第{_+1}次失败:{e}", module="通知服务"
)
time.sleep(0.1)
else:
logger.error("推送企业微信群机器人图片时出错", module="通知服务")
self.push_info_bar.emit(
"error",
"企业微信群机器人图片推送失败",
f"使用企业微信群机器人推送图片时出错:{err}",
-1,
)
return False
if info.get("errcode") == 0:
logger.success(
f"企业微信群机器人推送图片成功:{image_path.name}",
module="通知服务",
)
return True
else:
logger.error(f"企业微信群机器人推送图片失败:{info}", module="通知服务")
self.push_info_bar.emit(
"error",
"企业微信群机器人图片推送失败",
f"使用企业微信群机器人推送图片时出错:{info}",
-1,
)
return False
except Exception as e:
logger.error(f"推送企业微信群机器人图片时发生未知异常:{e}")
self.push_info_bar.emit(
"error",
"企业微信群机器人图片推送失败",
f"发生未知异常:{e}",
-1,
)
return False
def send_test_notification(self):
"""发送测试通知到所有已启用的通知渠道"""
logger.info("发送测试通知到所有已启用的通知渠道", module="通知服务")
# 发送系统通知
self.push_plyer(
"测试通知",
"这是 AUTO_MAA 外部通知测试信息。如果你看到了这段内容,说明 AUTO_MAA 的通知功能已经正确配置且可以正常工作!",
"测试通知",
3,
)
# 发送邮件通知
if Config.get(Config.notify_IfSendMail):
self.send_mail(
"文本",
"AUTO_MAA测试通知",
"这是 AUTO_MAA 外部通知测试信息。如果你看到了这段内容,说明 AUTO_MAA 的通知功能已经正确配置且可以正常工作!",
Config.get(Config.notify_ToAddress),
)
# 发送Server酱通知
if Config.get(Config.notify_IfServerChan):
self.ServerChanPush(
"AUTO_MAA测试通知",
"这是 AUTO_MAA 外部通知测试信息。如果你看到了这段内容,说明 AUTO_MAA 的通知功能已经正确配置且可以正常工作!",
Config.get(Config.notify_ServerChanKey),
Config.get(Config.notify_ServerChanTag),
Config.get(Config.notify_ServerChanChannel),
)
# 发送企业微信机器人通知
if Config.get(Config.notify_IfCompanyWebHookBot):
self.CompanyWebHookBotPush(
"AUTO_MAA测试通知",
"这是 AUTO_MAA 外部通知测试信息。如果你看到了这段内容,说明 AUTO_MAA 的通知功能已经正确配置且可以正常工作!",
Config.get(Config.notify_CompanyWebHookBotUrl),
)
Notify.CompanyWebHookBotPushImage(
Config.app_path / "resources/images/notification/test_notify.png",
Config.get(Config.notify_CompanyWebHookBotUrl),
)
logger.info("测试通知发送完成", module="通知服务")
return True
Notify = Notification()

352
app/services/system.py Normal file
View File

@@ -0,0 +1,352 @@
# AUTO_MAA:A MAA Multi Account Management and Automation Tool
# Copyright © 2024-2025 DLmaster361
# This file is part of AUTO_MAA.
# AUTO_MAA is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License,
# or (at your option) any later version.
# AUTO_MAA is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
# the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with AUTO_MAA. If not, see <https://www.gnu.org/licenses/>.
# Contact: DLmaster_361@163.com
"""
AUTO_MAA
AUTO_MAA系统服务
v4.4
作者DLmaster_361
"""
from PySide6.QtWidgets import QApplication
import sys
import ctypes
import win32gui
import win32process
import psutil
import subprocess
import tempfile
import getpass
from datetime import datetime
from pathlib import Path
from app.core import Config
class _SystemHandler:
ES_CONTINUOUS = 0x80000000
ES_SYSTEM_REQUIRED = 0x00000001
def __init__(self):
self.set_Sleep()
self.set_SelfStart()
def set_Sleep(self) -> None:
"""同步系统休眠状态"""
if Config.get(Config.function_IfAllowSleep):
# 设置系统电源状态
ctypes.windll.kernel32.SetThreadExecutionState(
self.ES_CONTINUOUS | self.ES_SYSTEM_REQUIRED
)
else:
# 恢复系统电源状态
ctypes.windll.kernel32.SetThreadExecutionState(self.ES_CONTINUOUS)
def set_SelfStart(self) -> None:
"""同步开机自启"""
if Config.get(Config.start_IfSelfStart) and not self.is_startup():
# 创建任务计划
try:
# 获取当前用户和时间
current_user = getpass.getuser()
current_time = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
# XML 模板
xml_content = f"""<?xml version="1.0" encoding="UTF-16"?>
<Task version="1.2" xmlns="http://schemas.microsoft.com/windows/2004/02/mit/task">
<RegistrationInfo>
<Date>{current_time}</Date>
<Author>{current_user}</Author>
<Description>AUTO_MAA自启动服务</Description>
<URI>\\AUTO_MAA_AutoStart</URI>
</RegistrationInfo>
<Triggers>
<LogonTrigger>
<StartBoundary>{current_time}</StartBoundary>
<Enabled>true</Enabled>
</LogonTrigger>
</Triggers>
<Principals>
<Principal id="Author">
<LogonType>InteractiveToken</LogonType>
<RunLevel>HighestAvailable</RunLevel>
</Principal>
</Principals>
<Settings>
<MultipleInstancesPolicy>IgnoreNew</MultipleInstancesPolicy>
<DisallowStartIfOnBatteries>false</DisallowStartIfOnBatteries>
<StopIfGoingOnBatteries>false</StopIfGoingOnBatteries>
<AllowHardTerminate>false</AllowHardTerminate>
<StartWhenAvailable>true</StartWhenAvailable>
<RunOnlyIfNetworkAvailable>false</RunOnlyIfNetworkAvailable>
<IdleSettings>
<StopOnIdleEnd>false</StopOnIdleEnd>
<RestartOnIdle>false</RestartOnIdle>
</IdleSettings>
<AllowStartOnDemand>true</AllowStartOnDemand>
<Enabled>true</Enabled>
<Hidden>false</Hidden>
<RunOnlyIfIdle>false</RunOnlyIfIdle>
<WakeToRun>false</WakeToRun>
<ExecutionTimeLimit>PT0S</ExecutionTimeLimit>
<Priority>7</Priority>
</Settings>
<Actions Context="Author">
<Exec>
<Command>"{Config.app_path_sys}"</Command>
</Exec>
</Actions>
</Task>"""
# 创建临时 XML 文件并执行
with tempfile.NamedTemporaryFile(
mode="w", suffix=".xml", delete=False, encoding="utf-16"
) as f:
f.write(xml_content)
xml_file = f.name
try:
result = subprocess.run(
[
"schtasks",
"/create",
"/tn",
"AUTO_MAA_AutoStart",
"/xml",
xml_file,
"/f",
],
creationflags=subprocess.CREATE_NO_WINDOW,
stdin=subprocess.DEVNULL,
capture_output=True,
text=True,
)
if result.returncode == 0:
logger.success(
f"程序自启动任务计划已创建: {Config.app_path_sys}",
module="系统服务",
)
else:
logger.error(
f"程序自启动任务计划创建失败: {result.stderr}",
module="系统服务",
)
finally:
# 删除临时文件
try:
Path(xml_file).unlink()
except:
pass
except Exception as e:
logger.exception(f"程序自启动任务计划创建失败: {e}", module="系统服务")
elif not Config.get(Config.start_IfSelfStart) and self.is_startup():
try:
result = subprocess.run(
["schtasks", "/delete", "/tn", "AUTO_MAA_AutoStart", "/f"],
creationflags=subprocess.CREATE_NO_WINDOW,
stdin=subprocess.DEVNULL,
capture_output=True,
text=True,
)
if result.returncode == 0:
logger.success("程序自启动任务计划已删除", module="系统服务")
else:
logger.error(
f"程序自启动任务计划删除失败: {result.stderr}",
module="系统服务",
)
except Exception as e:
logger.exception(f"程序自启动任务计划删除失败: {e}", module="系统服务")
def set_power(self, mode) -> None:
"""
执行系统电源操作
:param mode: 电源操作模式,支持 "NoAction", "Shutdown", "Hibernate", "Sleep", "KillSelf", "ShutdownForce"
"""
if sys.platform.startswith("win"):
if mode == "NoAction":
logger.info("不执行系统电源操作", module="系统服务")
elif mode == "Shutdown":
self.kill_emulator_processes()
logger.info("执行关机操作", module="系统服务")
subprocess.run(["shutdown", "/s", "/t", "0"])
elif mode == "ShutdownForce":
logger.info("执行强制关机操作", module="系统服务")
subprocess.run(["shutdown", "/s", "/t", "0", "/f"])
elif mode == "Hibernate":
logger.info("执行休眠操作", module="系统服务")
subprocess.run(["shutdown", "/h"])
elif mode == "Sleep":
logger.info("执行睡眠操作", module="系统服务")
subprocess.run(
["rundll32.exe", "powrprof.dll,SetSuspendState", "0,1,0"]
)
elif mode == "KillSelf":
logger.info("执行退出主程序操作", module="系统服务")
Config.main_window.close()
QApplication.quit()
sys.exit(0)
elif sys.platform.startswith("linux"):
if mode == "NoAction":
logger.info("不执行系统电源操作", module="系统服务")
elif mode == "Shutdown":
logger.info("执行关机操作", module="系统服务")
subprocess.run(["shutdown", "-h", "now"])
elif mode == "Hibernate":
logger.info("执行休眠操作", module="系统服务")
subprocess.run(["systemctl", "hibernate"])
elif mode == "Sleep":
logger.info("执行睡眠操作", module="系统服务")
subprocess.run(["systemctl", "suspend"])
elif mode == "KillSelf":
logger.info("执行退出主程序操作", module="系统服务")
Config.main_window.close()
QApplication.quit()
sys.exit(0)
def kill_emulator_processes(self):
"""这里暂时仅支持 MuMu 模拟器"""
logger.info("正在清除模拟器进程", module="系统服务")
keywords = ["Nemu", "nemu", "emulator", "MuMu"]
for proc in psutil.process_iter(["pid", "name"]):
try:
pname = proc.info["name"].lower()
if any(keyword.lower() in pname for keyword in keywords):
proc.kill()
logger.info(
f"已关闭 MuMu 模拟器进程: {proc.info['name']}",
module="系统服务",
)
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
logger.success("模拟器进程清除完成", module="系统服务")
def is_startup(self) -> bool:
"""判断程序是否已经开机自启"""
try:
result = subprocess.run(
["schtasks", "/query", "/tn", "AUTO_MAA_AutoStart"],
creationflags=subprocess.CREATE_NO_WINDOW,
stdin=subprocess.DEVNULL,
capture_output=True,
text=True,
)
return result.returncode == 0
except Exception as e:
logger.exception(f"检查任务计划程序失败: {e}", module="系统服务")
return False
def get_window_info(self) -> list:
"""获取当前前台窗口信息"""
def callback(hwnd, window_info):
if win32gui.IsWindowVisible(hwnd) and win32gui.GetWindowText(hwnd):
_, pid = win32process.GetWindowThreadProcessId(hwnd)
process = psutil.Process(pid)
window_info.append((win32gui.GetWindowText(hwnd), process.exe()))
return True
window_info = []
win32gui.EnumWindows(callback, window_info)
return window_info
def kill_process(self, path: Path) -> None:
"""
根据路径中止进程
:param path: 进程路径
"""
logger.info(f"开始中止进程: {path}", module="系统服务")
for pid in self.search_pids(path):
killprocess = subprocess.Popen(
f"taskkill /F /T /PID {pid}",
shell=True,
creationflags=subprocess.CREATE_NO_WINDOW,
)
killprocess.wait()
logger.success(f"进程已中止: {path}", module="系统服务")
def search_pids(self, path: Path) -> list:
"""
根据路径查找进程PID
:param path: 进程路径
:return: 匹配的进程PID列表
"""
logger.info(f"开始查找进程 PID: {path}", module="系统服务")
pids = []
for proc in psutil.process_iter(["pid", "exe"]):
try:
if proc.info["exe"] and proc.info["exe"].lower() == str(path).lower():
pids.append(proc.info["pid"])
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
# 进程可能在此期间已结束或无法访问,忽略这些异常
pass
return pids
System = _SystemHandler()

285
app/task/skland.py Normal file
View File

@@ -0,0 +1,285 @@
# AUTO_MAA:A MAA Multi Account Management and Automation Tool
# Copyright © 2024-2025 DLmaster361
# This file incorporates work covered by the following copyright and
# permission notice:
#
# skland-checkin-ghaction Copyright © 2023 Yanstory
# https://github.com/Yanstory/skland-checkin-ghaction
# This file is part of AUTO_MAA.
# AUTO_MAA is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License,
# or (at your option) any later version.
# AUTO_MAA is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
# the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with AUTO_MAA. If not, see <https://www.gnu.org/licenses/>.
# Contact: DLmaster_361@163.com
"""
AUTO_MAA
AUTO_MAA森空岛服务
v4.4
作者DLmaster_361、ClozyA
"""
import time
import json
import hmac
import hashlib
import requests
from urllib import parse
from app.core import Config, logger
def skland_sign_in(token) -> dict:
"""森空岛签到"""
app_code = "4ca99fa6b56cc2ba"
# 用于获取grant code
grant_code_url = "https://as.hypergryph.com/user/oauth2/v2/grant"
# 用于获取cred
cred_code_url = "https://zonai.skland.com/api/v1/user/auth/generate_cred_by_code"
# 查询角色绑定
binding_url = "https://zonai.skland.com/api/v1/game/player/binding"
# 签到接口
sign_url = "https://zonai.skland.com/api/v1/game/attendance"
# 基础请求头
header = {
"cred": "",
"User-Agent": "Skland/1.5.1 (com.hypergryph.skland; build:100501001; Android 34;) Okhttp/4.11.0",
"Accept-Encoding": "gzip",
"Connection": "close",
}
header_login = header.copy()
header_for_sign = {
"platform": "1",
"timestamp": "",
"dId": "",
"vName": "1.5.1",
}
def generate_signature(token_for_sign: str, path, body_or_query):
"""
生成请求签名
:param token_for_sign: 用于加密的token
:param path: 请求路径(如 /api/v1/game/player/binding
:param body_or_query: GET用query字符串POST用body字符串
:return: (sign, 新的header_for_sign字典)
"""
t = str(int(time.time()) - 2) # 时间戳,-2秒以防服务器时间不一致
token_bytes = token_for_sign.encode("utf-8")
header_ca = dict(header_for_sign)
header_ca["timestamp"] = t
header_ca_str = json.dumps(header_ca, separators=(",", ":"))
s = path + body_or_query + t + header_ca_str # 拼接原始字符串
# HMAC-SHA256 + MD5得到最终sign
hex_s = hmac.new(token_bytes, s.encode("utf-8"), hashlib.sha256).hexdigest()
md5 = hashlib.md5(hex_s.encode("utf-8")).hexdigest()
return md5, header_ca
def get_sign_header(url: str, method, body, old_header, sign_token):
"""
获取带签名的请求头
:param url: 请求完整url
:param method: 请求方式 GET/POST
:param body: POST请求体或GET时为None
:param old_header: 原始请求头
:param sign_token: 当前会话的签名token
:return: 新请求头
"""
h = json.loads(json.dumps(old_header))
p = parse.urlparse(url)
if method.lower() == "get":
sign, header_ca = generate_signature(sign_token, p.path, p.query)
else:
sign, header_ca = generate_signature(
sign_token, p.path, json.dumps(body) if body else ""
)
h["sign"] = sign
for i in header_ca:
h[i] = header_ca[i]
return h
def copy_header(cred):
"""
复制请求头并添加cred
:param cred: 当前会话的cred
:return: 新的请求头
"""
v = json.loads(json.dumps(header))
v["cred"] = cred
return v
def login_by_token(token_code):
"""
使用token一步步拿到cred和sign_token
:param token_code: 你的skyland token
:return: (cred, sign_token)
"""
try:
# token为json对象时提取data.content
t = json.loads(token_code)
token_code = t["data"]["content"]
except:
pass
grant_code = get_grant_code(token_code)
return get_cred(grant_code)
def get_cred(grant):
"""
通过grant code获取cred和sign_token
:param grant: grant code
:return: (cred, sign_token)
"""
rsp = requests.post(
cred_code_url,
json={"code": grant, "kind": 1},
headers=header_login,
proxies={
"http": Config.get(Config.update_ProxyAddress),
"https": Config.get(Config.update_ProxyAddress),
},
).json()
if rsp["code"] != 0:
raise Exception(f'获得cred失败{rsp.get("messgae")}')
sign_token = rsp["data"]["token"]
cred = rsp["data"]["cred"]
return cred, sign_token
def get_grant_code(token):
"""
通过token获取grant code
:param token: 你的skyland token
:return: grant code
"""
rsp = requests.post(
grant_code_url,
json={"appCode": app_code, "token": token, "type": 0},
headers=header_login,
proxies={
"http": Config.get(Config.update_ProxyAddress),
"https": Config.get(Config.update_ProxyAddress),
},
).json()
if rsp["status"] != 0:
raise Exception(
f'使用token: {token[:3]}******{token[-3:]} 获得认证代码失败:{rsp.get("msg")}'
)
return rsp["data"]["code"]
def get_binding_list(cred, sign_token):
"""
查询已绑定的角色列表
:param cred: 当前cred
:param sign_token: 当前sign_token
:return: 角色列表
"""
v = []
rsp = requests.get(
binding_url,
headers=get_sign_header(
binding_url, "get", None, copy_header(cred), sign_token
),
proxies={
"http": Config.get(Config.update_ProxyAddress),
"https": Config.get(Config.update_ProxyAddress),
},
).json()
if rsp["code"] != 0:
logger.error(
f"森空岛服务 | 请求角色列表出现问题:{rsp['message']}",
module="森空岛签到",
)
if rsp.get("message") == "用户未登录":
logger.error(
f"森空岛服务 | 用户登录可能失效了,请重新登录!",
module="森空岛签到",
)
return v
# 只取明日方舟arknights的绑定账号
for i in rsp["data"]["list"]:
if i.get("appCode") != "arknights":
continue
v.extend(i.get("bindingList"))
return v
def do_sign(cred, sign_token) -> dict:
"""
对所有绑定的角色进行签到
:param cred: 当前cred
:param sign_token: 当前sign_token
:return: 签到结果字典
"""
characters = get_binding_list(cred, sign_token)
result = {"成功": [], "重复": [], "失败": [], "总计": len(characters)}
for character in characters:
body = {
"uid": character.get("uid"),
"gameId": character.get("channelMasterId"),
}
rsp = requests.post(
sign_url,
headers=get_sign_header(
sign_url, "post", body, copy_header(cred), sign_token
),
json=body,
proxies={
"http": Config.get(Config.update_ProxyAddress),
"https": Config.get(Config.update_ProxyAddress),
},
).json()
if rsp["code"] != 0:
result[
"重复" if rsp.get("message") == "请勿重复签到!" else "失败"
].append(
f"{character.get("nickName")}{character.get("channelName")}"
)
else:
result["成功"].append(
f"{character.get("nickName")}{character.get("channelName")}"
)
time.sleep(3)
return result
# 主流程
try:
# 拿到cred和sign_token
cred, sign_token = login_by_token(token)
time.sleep(1)
# 依次签到
return do_sign(cred, sign_token)
except Exception as e:
logger.exception(f"森空岛服务 | 森空岛签到失败: {e}", module="森空岛签到")
return {"成功": [], "重复": [], "失败": [], "总计": 0}

95
app/utils/ImageUtils.py Normal file
View File

@@ -0,0 +1,95 @@
# AUTO_MAA:A MAA Multi Account Management and Automation Tool
# Copyright © 2025 ClozyA
# This file is part of AUTO_MAA.
# AUTO_MAA is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License,
# or (at your option) any later version.
# AUTO_MAA is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
# the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with AUTO_MAA. If not, see <https://www.gnu.org/licenses/>.
# Contact: DLmaster_361@163.com
"""
AUTO_MAA
AUTO_MAA图像组件
v4.4
作者ClozyA
"""
import base64
import hashlib
from pathlib import Path
from PIL import Image
class ImageUtils:
@staticmethod
def get_base64_from_file(image_path):
"""从本地文件读取并返回base64编码字符串"""
with open(image_path, "rb") as f:
return base64.b64encode(f.read()).decode("utf-8")
@staticmethod
def calculate_md5_from_file(image_path):
"""从本地文件读取并返回md5值hex字符串"""
with open(image_path, "rb") as f:
return hashlib.md5(f.read()).hexdigest()
@staticmethod
def calculate_md5_from_base64(base64_content):
"""从base64字符串计算md5"""
image_data = base64.b64decode(base64_content)
return hashlib.md5(image_data).hexdigest()
@staticmethod
def compress_image_if_needed(image_path: Path, max_size_mb=2) -> Path:
"""
如果图片大于max_size_mb则压缩并覆盖原文件返回原始路径Path对象
"""
if hasattr(Image, "Resampling"): # Pillow 9.1.0及以后
RESAMPLE = Image.Resampling.LANCZOS
else:
RESAMPLE = Image.ANTIALIAS
max_size = max_size_mb * 1024 * 1024
if image_path.stat().st_size <= max_size:
return image_path
img = Image.open(image_path)
suffix = image_path.suffix.lower()
quality = 90 if suffix in [".jpg", ".jpeg"] else None
step = 5
if suffix in [".jpg", ".jpeg"]:
while True:
img.save(image_path, quality=quality, optimize=True)
if image_path.stat().st_size <= max_size or quality <= 10:
break
quality -= step
elif suffix == ".png":
width, height = img.size
while True:
img.save(image_path, optimize=True)
if (
image_path.stat().st_size <= max_size
or width <= 200
or height <= 200
):
break
width = int(width * 0.95)
height = int(height * 0.95)
img = img.resize((width, height), RESAMPLE)
else:
raise ValueError("仅支持JPG/JPEG和PNG格式图片的压缩。")
return image_path

157
app/utils/ProcessManager.py Normal file
View File

@@ -0,0 +1,157 @@
# AUTO_MAA:A MAA Multi Account Management and Automation Tool
# Copyright © 2024-2025 DLmaster361
# This file is part of AUTO_MAA.
# AUTO_MAA is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License,
# or (at your option) any later version.
# AUTO_MAA is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
# the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with AUTO_MAA. If not, see <https://www.gnu.org/licenses/>.
# Contact: DLmaster_361@163.com
import psutil
import subprocess
from pathlib import Path
from datetime import datetime
from PySide6.QtCore import QTimer, QObject, Signal
class ProcessManager(QObject):
"""进程监视器类,用于跟踪主进程及其所有子进程的状态"""
processClosed = Signal()
def __init__(self):
super().__init__()
self.main_pid = None
self.tracked_pids = set()
self.check_timer = QTimer()
self.check_timer.timeout.connect(self.check_processes)
def open_process(self, path: Path, args: list = [], tracking_time: int = 60) -> int:
"""
启动一个新进程并返回其pid并开始监视该进程
:param path: 可执行文件的路径
:param args: 启动参数列表
:param tracking_time: 子进程追踪持续时间(秒)
:return: 新进程的PID
"""
process = subprocess.Popen(
[path, *args],
cwd=path.parent,
creationflags=subprocess.CREATE_NO_WINDOW,
stdin=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
self.start_monitoring(process.pid, tracking_time)
def start_monitoring(self, pid: int, tracking_time: int = 60) -> None:
"""
启动进程监视器,跟踪指定的主进程及其子进程
:param pid: 被监视进程的PID
:param tracking_time: 子进程追踪持续时间(秒)
"""
self.clear()
self.main_pid = pid
self.tracking_time = tracking_time
# 扫描并记录所有相关进程
try:
# 获取主进程
main_proc = psutil.Process(self.main_pid)
self.tracked_pids.add(self.main_pid)
# 递归获取所有子进程
if tracking_time:
for child in main_proc.children(recursive=True):
self.tracked_pids.add(child.pid)
except psutil.NoSuchProcess:
pass
# 启动持续追踪机制
self.start_time = datetime.now()
self.check_timer.start(100)
def check_processes(self) -> None:
"""检查跟踪的进程是否仍在运行,并更新子进程列表"""
# 仅在时限内持续更新跟踪的进程列表,发现新的子进程
if (datetime.now() - self.start_time).total_seconds() < self.tracking_time:
current_pids = set(self.tracked_pids)
for pid in current_pids:
try:
proc = psutil.Process(pid)
for child in proc.children():
if child.pid not in self.tracked_pids:
# 新发现的子进程
self.tracked_pids.add(child.pid)
except psutil.NoSuchProcess:
continue
if not self.is_running():
self.clear()
self.processClosed.emit()
def is_running(self) -> bool:
"""检查所有跟踪的进程是否还在运行"""
for pid in self.tracked_pids:
try:
proc = psutil.Process(pid)
if proc.is_running():
return True
except psutil.NoSuchProcess:
continue
return False
def kill(self, if_force: bool = False) -> None:
"""停止监视器并中止所有跟踪的进程"""
self.check_timer.stop()
for pid in self.tracked_pids:
try:
proc = psutil.Process(pid)
if if_force:
kill_process = subprocess.Popen(
["taskkill", "/F", "/T", "/PID", str(pid)],
creationflags=subprocess.CREATE_NO_WINDOW,
)
kill_process.wait()
proc.terminate()
except psutil.NoSuchProcess:
continue
if self.main_pid:
self.processClosed.emit()
self.clear()
def clear(self) -> None:
"""清空跟踪的进程列表"""
self.main_pid = None
self.check_timer.stop()
self.tracked_pids.clear()

View File

@@ -25,5 +25,6 @@ __license__ = "GPL-3.0 license"
from .logger import get_logger from .logger import get_logger
from .security import dpapi_encrypt, dpapi_decrypt
__all__ = ["get_logger"] __all__ = ["get_logger", "dpapi_encrypt", "dpapi_decrypt"]

69
app/utils/security.py Normal file
View File

@@ -0,0 +1,69 @@
# AUTO_MAA:A MAA Multi Account Management and Automation Tool
# Copyright © 2024-2025 DLmaster361
# This file is part of AUTO_MAA.
# AUTO_MAA is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License,
# or (at your option) any later version.
# AUTO_MAA is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
# the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with AUTO_MAA. If not, see <https://www.gnu.org/licenses/>.
# Contact: DLmaster_361@163.com
import base64
import win32crypt
def dpapi_encrypt(
note: str, description: None | str = None, entropy: None | bytes = None
) -> str:
"""
使用Windows DPAPI加密数据
:param note: 数据明文
:type note: str
:param description: 描述信息
:type description: str
:param entropy: 随机熵
:type entropy: bytes
:return: 加密后的数据
:rtype: str
"""
if note == "":
return ""
encrypted = win32crypt.CryptProtectData(
note.encode("utf-8"), description, entropy, None, None, 0
)
return base64.b64encode(encrypted).decode("utf-8")
def dpapi_decrypt(note: str, entropy: None | bytes = None) -> str:
"""
使用Windows DPAPI解密数据
:param note: 数据密文
:type note: str
:param entropy: 随机熵
:type entropy: bytes
:return: 解密后的明文
:rtype: str
"""
if note == "":
return ""
decrypted = win32crypt.CryptUnprotectData(
base64.b64decode(note), entropy, None, None, 0
)
return decrypted[1].decode("utf-8")