feat(core): 初步完成通用调度模块

This commit is contained in:
DLmaster361
2025-07-10 02:29:08 +08:00
parent 7c315624b1
commit 1c0a65957d
22 changed files with 3280 additions and 408 deletions

View File

@@ -84,6 +84,7 @@ jobs:
onefile-tempdir-spec: "{TEMP}/AUTO_MAA"
windows-console-mode: attach
windows-icon-from-ico: resources/icons/AUTO_MAA.ico
windows-uac-admin: true
company-name: AUTO_MAA Team
product-name: AUTO_MAA
file-version: ${{ steps.get_version.outputs.main_version }}

View File

@@ -29,7 +29,15 @@ __version__ = "4.2.0"
__author__ = "DLmaster361 <DLmaster_361@163.com>"
__license__ = "GPL-3.0 license"
from .config import QueueConfig, MaaConfig, MaaUserConfig, MaaPlanConfig, Config
from .config import (
QueueConfig,
MaaConfig,
MaaUserConfig,
MaaPlanConfig,
GeneralConfig,
GeneralSubConfig,
Config,
)
from .main_info_bar import MainInfoBar
from .network import Network
from .sound_player import SoundPlayer
@@ -42,6 +50,8 @@ __all__ = [
"MaaConfig",
"MaaUserConfig",
"MaaPlanConfig",
"GeneralConfig",
"GeneralSubConfig",
"MainInfoBar",
"Network",
"SoundPlayer",

View File

@@ -56,6 +56,17 @@ from typing import Union, Dict, List
from .network import Network
class FileValidator(ConfigValidator):
"""File validator"""
def validate(self, value):
return Path(value).exists()
def correct(self, value):
path = Path(value)
return str(path.absolute()).replace("\\", "/")
class UrlListValidator(ConfigValidator):
"""Url list validator"""
@@ -382,6 +393,9 @@ class MaaConfig(LQConfig):
"RunSet", "AutoUpdateMaa", False, BoolValidator()
)
def get_name(self) -> str:
return self.get(self.MaaSet_Name)
class MaaUserConfig(LQConfig):
"""MAA用户配置"""
@@ -574,13 +588,118 @@ class MaaPlanConfig(LQConfig):
return self.config_item_dict["ALL"][name]
class GeneralConfig(LQConfig):
"""通用配置"""
def __init__(self) -> None:
super().__init__()
self.Script_Name = ConfigItem("Script", "Name", "")
self.Script_RootPath = ConfigItem("Script", "RootPath", ".", FolderValidator())
self.Script_ScriptPath = ConfigItem(
"Script", "ScriptPath", ".", FileValidator()
)
self.Script_Arguments = ConfigItem("Script", "Arguments", "")
self.Script_ConfigPath = ConfigItem(
"Script", "ConfigPath", ".", FolderValidator()
)
self.Script_LogPath = ConfigItem("Script", "LogPath", ".", FileValidator())
self.Script_LogTimeStart = ConfigItem(
"Script", "LogTimeStart", 0, RangeValidator(0, 1024)
)
self.Script_LogTimeEnd = ConfigItem(
"Script", "LogTimeEnd", 0, RangeValidator(0, 1024)
)
self.Script_LogTimeFormat = ConfigItem(
"Script", "LogTimeFormat", "%Y-%m-%d %H:%M:%S"
)
self.Script_SuccessLog = ConfigItem("Script", "SuccessLog", "")
self.Script_ErrorLog = ConfigItem("Script", "ErrorLog", "")
self.Game_Enabled = ConfigItem("Game", "Enabled", True, BoolValidator())
self.Game_Style = OptionsConfigItem(
"Game", "Style", "Emulator", OptionsValidator(["Emulator", "Client"])
)
self.Game_Path = ConfigItem("Game", "Path", ".", FileValidator())
self.Game_Arguments = ConfigItem("Game", "Arguments", "")
self.Game_WaitTime = ConfigItem("Game", "WaitTime", 0, RangeValidator(0, 1024))
self.Game_IfForceClose = ConfigItem(
"Game", "IfForceClose", False, BoolValidator()
)
self.Run_ProxyTimesLimit = RangeConfigItem(
"Run", "ProxyTimesLimit", 0, RangeValidator(0, 1024)
)
self.Run_RunTimesLimit = RangeConfigItem(
"Run", "RunTimesLimit", 3, RangeValidator(1, 1024)
)
self.Run_RunTimeLimit = RangeConfigItem(
"Run", "RunTimeLimit", 10, RangeValidator(1, 1024)
)
def get_name(self) -> str:
return self.get(self.Script_Name)
class GeneralSubConfig(LQConfig):
"""通用子配置"""
def __init__(self) -> None:
super().__init__()
self.Info_Name = ConfigItem("Info", "Name", "新配置")
self.Info_Status = ConfigItem("Info", "Status", True, BoolValidator())
self.Info_RemainedDay = ConfigItem(
"Info", "RemainedDay", -1, RangeValidator(-1, 1024)
)
self.Info_IfScriptBeforeTask = ConfigItem(
"Info", "IfScriptBeforeTask", False, BoolValidator()
)
self.Info_ScriptBeforeTask = ConfigItem(
"Info", "ScriptBeforeTask", "", FileValidator()
)
self.Info_IfScriptAfterTask = ConfigItem(
"Info", "IfScriptAfterTask", False, BoolValidator()
)
self.Info_ScriptAfterTask = ConfigItem(
"Info", "ScriptAfterTask", "", FileValidator()
)
self.Info_Notes = ConfigItem("Info", "Notes", "")
self.Data_LastProxyDate = ConfigItem("Data", "LastProxyDate", "2000-01-01")
self.Data_ProxyTimes = ConfigItem(
"Data", "ProxyTimes", 0, RangeValidator(0, 1024)
)
self.Notify_Enabled = ConfigItem("Notify", "Enabled", False, BoolValidator())
self.Notify_IfSendStatistic = ConfigItem(
"Notify", "IfSendStatistic", False, BoolValidator()
)
self.Notify_IfSendMail = ConfigItem(
"Notify", "IfSendMail", False, BoolValidator()
)
self.Notify_ToAddress = ConfigItem("Notify", "ToAddress", "")
self.Notify_IfServerChan = ConfigItem(
"Notify", "IfServerChan", False, BoolValidator()
)
self.Notify_ServerChanKey = ConfigItem("Notify", "ServerChanKey", "")
self.Notify_ServerChanChannel = ConfigItem("Notify", "ServerChanChannel", "")
self.Notify_ServerChanTag = ConfigItem("Notify", "ServerChanTag", "")
self.Notify_IfCompanyWebHookBot = ConfigItem(
"Notify", "IfCompanyWebHookBot", False, BoolValidator()
)
self.Notify_CompanyWebHookBotUrl = ConfigItem(
"Notify", "CompanyWebHookBotUrl", ""
)
class AppConfig(GlobalConfig):
VERSION = "4.3.12.0"
VERSION = "4.4.0.1"
gameid_refreshed = Signal()
PASSWORD_refreshed = Signal()
user_info_changed = Signal()
sub_info_changed = Signal()
power_sign_changed = Signal()
def __init__(self) -> None:
@@ -777,7 +896,7 @@ class AppConfig(GlobalConfig):
db = sqlite3.connect(self.database_path)
cur = db.cursor()
cur.execute("CREATE TABLE version(v text)")
cur.execute("INSERT INTO version VALUES(?)", ("v1.5",))
cur.execute("INSERT INTO version VALUES(?)", ("v1.6",))
db.commit()
cur.close()
db.close()
@@ -788,221 +907,9 @@ class AppConfig(GlobalConfig):
cur.execute("SELECT * FROM version WHERE True")
version = cur.fetchall()
if version[0][0] != "v1.5":
if version[0][0] != "v1.6":
logger.info("数据文件版本更新开始")
if_streaming = False
# v1.0-->v1.1
if version[0][0] == "v1.0" or if_streaming:
logger.info("数据文件版本更新v1.0-->v1.1")
if_streaming = True
cur.execute("SELECT * FROM adminx WHERE True")
data = cur.fetchall()
cur.execute("DROP TABLE IF EXISTS adminx")
cur.execute(
"CREATE TABLE adminx(admin text,id text,server text,day int,status text,last date,game text,game_1 text,game_2 text,routines text,annihilation text,infrastructure text,password byte,notes text,numb int,mode text,uid int)"
)
for i in range(len(data)):
cur.execute(
"INSERT INTO adminx VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
(
data[i][0], # 0 0 0
data[i][1], # 1 1 -
"Official", # 2 2 -
data[i][2], # 3 3 1
data[i][3], # 4 4 2
data[i][4], # 5 5 3
data[i][5], # 6 6 -
data[i][6], # 7 7 -
data[i][7], # 8 8 -
"y", # 9 - 4
data[i][8], # 10 9 5
data[i][9], # 11 10 -
data[i][10], # 12 11 6
data[i][11], # 13 12 7
data[i][12], # 14 - -
"simple", # 15 - -
data[i][13], # 16 - -
),
)
cur.execute("DELETE FROM version WHERE v = ?", ("v1.0",))
cur.execute("INSERT INTO version VALUES(?)", ("v1.1",))
db.commit()
# v1.1-->v1.2
if version[0][0] == "v1.1" or if_streaming:
logger.info("数据文件版本更新v1.1-->v1.2")
if_streaming = True
cur.execute("SELECT * FROM adminx WHERE True")
data = cur.fetchall()
for i in range(len(data)):
cur.execute(
"UPDATE adminx SET infrastructure = 'n' WHERE mode = ? AND uid = ?",
(
data[i][15],
data[i][16],
),
)
cur.execute("DELETE FROM version WHERE v = ?", ("v1.1",))
cur.execute("INSERT INTO version VALUES(?)", ("v1.2",))
db.commit()
# v1.2-->v1.3
if version[0][0] == "v1.2" or if_streaming:
logger.info("数据文件版本更新v1.2-->v1.3")
if_streaming = True
cur.execute("ALTER TABLE adminx RENAME COLUMN routines TO routine")
cur.execute("DELETE FROM version WHERE v = ?", ("v1.2",))
cur.execute("INSERT INTO version VALUES(?)", ("v1.3",))
db.commit()
# v1.3-->v1.4
if version[0][0] == "v1.3" or if_streaming:
logger.info("数据文件版本更新v1.3-->v1.4")
if_streaming = True
(self.app_path / "config/MaaConfig").mkdir(parents=True, exist_ok=True)
shutil.move(
self.app_path / "data/MaaConfig",
self.app_path / "config/MaaConfig",
)
(self.app_path / "config/MaaConfig/MaaConfig").rename(
self.app_path / "config/MaaConfig/脚本_1"
)
shutil.copy(
self.database_path,
self.app_path / "config/MaaConfig/脚本_1/user_data.db",
)
cur.execute("DROP TABLE IF EXISTS adminx")
cur.execute("DELETE FROM version WHERE v = ?", ("v1.3",))
cur.execute("INSERT INTO version VALUES(?)", ("v1.4",))
db.commit()
with (self.app_path / "config/gui.json").open(
"r", encoding="utf-8"
) as f:
info = json.load(f)
maa_config = {
"MaaSet": {
"Name": "",
"Path": info["Default"]["MaaSet.path"],
},
"RunSet": {
"AnnihilationTimeLimit": info["Default"][
"TimeLimit.annihilation"
],
"RoutineTimeLimit": info["Default"]["TimeLimit.routine"],
"RunTimesLimit": info["Default"]["TimesLimit.run"],
},
}
with (self.app_path / "config/MaaConfig/脚本_1/config.json").open(
"w", encoding="utf-8"
) as f:
json.dump(maa_config, f, ensure_ascii=False, indent=4)
config = {
"Function": {
"BossKey": info["Default"]["SelfSet.BossKey"],
"IfAllowSleep": bool(
info["Default"]["SelfSet.IfSleep"] == "True"
),
"IfSilence": bool(
info["Default"]["SelfSet.IfSilence"] == "True"
),
},
"Notify": {
"IfPushPlyer": True,
"IfSendErrorOnly": bool(
info["Default"]["SelfSet.IfSendMail.OnlyError"] == "True"
),
"IfSendMail": bool(
info["Default"]["SelfSet.IfSendMail"] == "True"
),
"MailAddress": info["Default"]["SelfSet.MailAddress"],
},
"Start": {
"IfRunDirectly": bool(
info["Default"]["SelfSet.IfProxyDirectly"] == "True"
),
"IfSelfStart": bool(
info["Default"]["SelfSet.IfSelfStart"] == "True"
),
},
"UI": {
"IfShowTray": bool(
info["Default"]["SelfSet.IfToTray"] == "True"
),
"IfToTray": bool(info["Default"]["SelfSet.IfToTray"] == "True"),
"location": info["Default"]["SelfSet.UIlocation"],
"maximized": bool(
info["Default"]["SelfSet.UImaximized"] == "True"
),
"size": info["Default"]["SelfSet.UIsize"],
},
"Update": {"IfAutoUpdate": False},
}
with (self.app_path / "config/config.json").open(
"w", encoding="utf-8"
) as f:
json.dump(config, f, ensure_ascii=False, indent=4)
queue_config = {
"QueueSet": {"Enabled": True, "Name": ""},
"Queue": {
"Member_1": "脚本_1",
"Member_10": "禁用",
"Member_2": "禁用",
"Member_3": "禁用",
"Member_4": "禁用",
"Member_5": "禁用",
"Member_6": "禁用",
"Member_7": "禁用",
"Member_8": "禁用",
"Member_9": "禁用",
},
"Time": {
"TimeEnabled_0": bool(
info["Default"]["TimeSet.set1"] == "True"
),
"TimeEnabled_1": bool(
info["Default"]["TimeSet.set2"] == "True"
),
"TimeEnabled_2": bool(
info["Default"]["TimeSet.set3"] == "True"
),
"TimeEnabled_3": bool(
info["Default"]["TimeSet.set4"] == "True"
),
"TimeEnabled_4": bool(
info["Default"]["TimeSet.set5"] == "True"
),
"TimeEnabled_5": bool(
info["Default"]["TimeSet.set6"] == "True"
),
"TimeEnabled_6": bool(
info["Default"]["TimeSet.set7"] == "True"
),
"TimeEnabled_7": bool(
info["Default"]["TimeSet.set8"] == "True"
),
"TimeEnabled_8": bool(
info["Default"]["TimeSet.set9"] == "True"
),
"TimeEnabled_9": bool(
info["Default"]["TimeSet.set10"] == "True"
),
"TimeSet_0": info["Default"]["TimeSet.run1"],
"TimeSet_1": info["Default"]["TimeSet.run2"],
"TimeSet_2": info["Default"]["TimeSet.run3"],
"TimeSet_3": info["Default"]["TimeSet.run4"],
"TimeSet_4": info["Default"]["TimeSet.run5"],
"TimeSet_5": info["Default"]["TimeSet.run6"],
"TimeSet_6": info["Default"]["TimeSet.run7"],
"TimeSet_7": info["Default"]["TimeSet.run8"],
"TimeSet_8": info["Default"]["TimeSet.run9"],
"TimeSet_9": info["Default"]["TimeSet.run10"],
},
}
(self.app_path / "config/QueueConfig").mkdir(
parents=True, exist_ok=True
)
with (self.app_path / "config/QueueConfig/调度队列_1.json").open(
"w", encoding="utf-8"
) as f:
json.dump(queue_config, f, ensure_ascii=False, indent=4)
(self.app_path / "config/gui.json").unlink()
# v1.4-->v1.5
if version[0][0] == "v1.4" or if_streaming:
logger.info("数据文件版本更新v1.4-->v1.5")
@@ -1130,7 +1037,38 @@ class AppConfig(GlobalConfig):
shutil.rmtree(config["Path"] / f"simple")
if (config["Path"] / f"beta").exists():
shutil.rmtree(config["Path"] / f"beta")
# v1.5-->v1.6
if version[0][0] == "v1.5" or if_streaming:
logger.info("数据文件版本更新v1.5-->v1.6")
if_streaming = True
cur.execute("DELETE FROM version WHERE v = ?", ("v1.5",))
cur.execute("INSERT INTO version VALUES(?)", ("v1.6",))
db.commit()
# 删除旧的注册表项
import winreg
key = winreg.OpenKey(
winreg.HKEY_CURRENT_USER,
r"Software\Microsoft\Windows\CurrentVersion\Run",
0,
winreg.KEY_READ,
)
try:
value, _ = winreg.QueryValueEx(key, "AUTO_MAA")
winreg.CloseKey(key)
key = winreg.OpenKey(
winreg.HKEY_CURRENT_USER,
r"Software\Microsoft\Windows\CurrentVersion\Run",
winreg.KEY_SET_VALUE,
winreg.KEY_ALL_ACCESS
| winreg.KEY_WRITE
| winreg.KEY_CREATE_SUB_KEY,
)
winreg.DeleteValue(key, "AUTO_MAA")
winreg.CloseKey(key)
except FileNotFoundError:
pass
cur.close()
db.close()
logger.info("数据文件版本更新完成")
@@ -1145,8 +1083,8 @@ class AppConfig(GlobalConfig):
Union[
str,
Path,
MaaConfig,
Dict[str, Dict[str, Union[Path, MaaUserConfig]]],
Union[MaaConfig, GeneralConfig],
Dict[str, Dict[str, Union[Path, MaaUserConfig, GeneralSubConfig]]],
],
],
] = {}
@@ -1164,12 +1102,26 @@ class AppConfig(GlobalConfig):
"Config": maa_config,
"UserData": None,
}
if (self.app_path / "config/GeneralConfig").exists():
for general_dir in (self.app_path / "config/GeneralConfig").iterdir():
if general_dir.is_dir():
general_config = GeneralConfig()
general_config.load(general_dir / "config.json", general_config)
general_config.save()
self.member_dict[general_dir.name] = {
"Type": "General",
"Path": general_dir,
"Config": general_config,
"SubData": None,
}
self.member_dict = dict(
sorted(self.member_dict.items(), key=lambda x: int(x[0][3:]))
)
def search_maa_user(self, name) -> None:
def search_maa_user(self, name: str) -> None:
user_dict: Dict[str, Dict[str, Union[Path, MaaUserConfig]]] = {}
for user_dir in (Config.member_dict[name]["Path"] / "UserData").iterdir():
@@ -1179,15 +1131,28 @@ class AppConfig(GlobalConfig):
user_config.load(user_dir / "config.json", user_config)
user_config.save()
user_dict[user_dir.stem] = {
"Path": user_dir,
"Config": user_config,
}
user_dict[user_dir.stem] = {"Path": user_dir, "Config": user_config}
self.member_dict[name]["UserData"] = dict(
sorted(user_dict.items(), key=lambda x: int(x[0][3:]))
)
def search_general_sub(self, name: str) -> None:
user_dict: Dict[str, Dict[str, Union[Path, GeneralSubConfig]]] = {}
for sub_dir in (Config.member_dict[name]["Path"] / "SubData").iterdir():
if sub_dir.is_dir():
sub_config = GeneralSubConfig()
sub_config.load(sub_dir / "config.json", sub_config)
sub_config.save()
user_dict[sub_dir.stem] = {"Path": sub_dir, "Config": sub_config}
self.member_dict[name]["SubData"] = dict(
sorted(user_dict.items(), key=lambda x: int(x[0][3:]))
)
def search_plan(self) -> None:
"""搜索所有计划表"""
@@ -1267,7 +1232,7 @@ class AppConfig(GlobalConfig):
if user["Config"].get(user["Config"].Info_GameIdMode) == old:
user["Config"].set(user["Config"].Info_GameIdMode, new)
def change_user_info(
def change_maa_user_info(
self, name: str, user_data: Dict[str, Dict[str, Union[str, Path, dict]]]
) -> None:
"""代理完成后保存改动的用户信息"""
@@ -1301,7 +1266,28 @@ class AppConfig(GlobalConfig):
info["Config"]["Data"]["CustomInfrastPlanIndex"],
)
self.user_info_changed.emit()
self.sub_info_changed.emit()
def change_general_sub_info(
self, name: str, sub_data: Dict[str, Dict[str, Union[str, Path, dict]]]
) -> None:
"""代理完成后保存改动的配置信息"""
for sub, info in sub_data.items():
sub_config = self.member_dict[name]["SubData"][sub]["Config"]
sub_config.set(
sub_config.Info_RemainedDay, info["Config"]["Info"]["RemainedDay"]
)
sub_config.set(
sub_config.Data_LastProxyDate, info["Config"]["Data"]["LastProxyDate"]
)
sub_config.set(
sub_config.Data_ProxyTimes, info["Config"]["Data"]["ProxyTimes"]
)
self.sub_info_changed.emit()
def set_power_sign(self, sign: str) -> None:
"""设置当前电源状态"""

View File

@@ -36,7 +36,7 @@ from .config import Config
from .main_info_bar import MainInfoBar
from .network import Network
from .sound_player import SoundPlayer
from app.models import MaaManager
from app.models import MaaManager, GeneralManager
from app.services import System
@@ -48,7 +48,8 @@ class Task(QThread):
play_sound = Signal(str)
question = Signal(str, str)
question_response = Signal(bool)
update_user_info = Signal(str, dict)
update_maa_user_info = Signal(str, dict)
update_general_sub_info = Signal(str, dict)
create_task_list = Signal(list)
create_user_list = Signal(list)
update_task_list = Signal(list)
@@ -89,7 +90,31 @@ class Task(QThread):
self.task.play_sound.connect(self.play_sound.emit)
self.task.accomplish.connect(lambda: self.accomplish.emit([]))
self.task.run()
try:
self.task.run()
except Exception as e:
logger.exception(f"任务异常:{self.name},错误信息:{e}")
self.push_info_bar.emit("error", "任务异常", self.name, -1)
elif self.mode == "设置通用脚本":
logger.info(f"任务开始:设置{self.name}")
self.push_info_bar.emit("info", "设置通用脚本", self.name, 3000)
self.task = GeneralManager(
self.mode,
Config.member_dict[self.name],
self.info["SetSubInfo"]["Path"],
)
self.task.push_info_bar.connect(self.push_info_bar.emit)
self.task.play_sound.connect(self.play_sound.emit)
self.task.accomplish.connect(lambda: self.accomplish.emit([]))
try:
self.task.run()
except Exception as e:
logger.exception(f"任务异常:{self.name},错误信息:{e}")
self.push_info_bar.emit("error", "任务异常", self.name, -1)
else:
@@ -97,11 +122,8 @@ class Task(QThread):
[
(
value
if Config.member_dict[value]["Config"].get(
Config.member_dict[value]["Config"].MaaSet_Name
)
== ""
else f"{value} - {Config.member_dict[value]["Config"].get(Config.member_dict[value]["Config"].MaaSet_Name)}"
if Config.member_dict[value]["Config"].get_name() == ""
else f"{value} - {Config.member_dict[value]["Config"].get_name()}"
),
"等待",
value,
@@ -150,24 +172,60 @@ class Task(QThread):
self.task.create_user_list.connect(self.create_user_list.emit)
self.task.update_user_list.connect(self.update_user_list.emit)
self.task.update_log_text.connect(self.update_log_text.emit)
self.task.update_user_info.connect(self.update_user_info.emit)
self.task.update_user_info.connect(self.update_maa_user_info.emit)
self.task.accomplish.connect(
lambda log: self.task_accomplish(task[2], log)
)
elif Config.member_dict[task[2]]["Type"] == "General":
self.task = GeneralManager(
self.mode[0:4],
Config.member_dict[task[2]],
)
self.task.question.connect(self.question.emit)
self.question_response.disconnect()
self.question_response.connect(self.task.question_response.emit)
self.task.push_info_bar.connect(self.push_info_bar.emit)
self.task.play_sound.connect(self.play_sound.emit)
self.task.create_user_list.connect(self.create_user_list.emit)
self.task.update_user_list.connect(self.update_user_list.emit)
self.task.update_log_text.connect(self.update_log_text.emit)
self.task.update_sub_info.connect(self.update_general_sub_info.emit)
self.task.accomplish.connect(
lambda log: self.task_accomplish(task[2], log)
)
try:
self.task.run()
Config.running_list.remove(task[2])
task[1] = "完成"
self.update_task_list.emit(self.task_list)
logger.info(f"任务完成:{task[0]}")
self.push_info_bar.emit("info", "任务完成", task[0], 3000)
task[1] = "完成"
self.update_task_list.emit(self.task_list)
logger.info(f"任务完成:{task[0]}")
self.push_info_bar.emit("info", "任务完成", task[0], 3000)
except Exception as e:
self.task_accomplish(
task[2],
{
"Time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"History": f"任务异常,异常简报:{e}",
},
)
task[1] = "异常"
self.update_task_list.emit(self.task_list)
logger.exception(f"任务异常:{task[0]},错误信息:{e}")
self.push_info_bar.emit("error", "任务异常", task[0], -1)
Config.running_list.remove(task[2])
self.accomplish.emit(self.logs)
def task_accomplish(self, name: str, log: dict):
"""保存保存任务结果"""
"""保存任务结果"""
self.logs.append([name, log])
self.task.deleteLater()
@@ -207,7 +265,10 @@ class _TaskManager(QObject):
)
self.task_dict[name].push_info_bar.connect(MainInfoBar.push_info_bar)
self.task_dict[name].play_sound.connect(SoundPlayer.play)
self.task_dict[name].update_user_info.connect(Config.change_user_info)
self.task_dict[name].update_maa_user_info.connect(Config.change_maa_user_info)
self.task_dict[name].update_general_sub_info.connect(
Config.change_general_sub_info
)
self.task_dict[name].accomplish.connect(
lambda logs: self.remove_task(mode, name, logs)
)

View File

@@ -40,7 +40,7 @@ from typing import Union, List, Dict
from app.core import Config, MaaConfig, MaaUserConfig
from app.services import Notify, Crypto, System, skland_sign_in
from app.utils.ImageUtils import ImageUtils
from app.utils import ProcessManager
class MaaManager(QObject):
@@ -58,8 +58,6 @@ class MaaManager(QObject):
interrupt = Signal()
accomplish = Signal(dict)
isInterruptionRequested = False
def __init__(
self,
mode: str,
@@ -81,17 +79,25 @@ class MaaManager(QObject):
self.config_path = config["Path"]
self.user_config_path = user_config_path
self.emulator_process_manager = ProcessManager()
self.maa_process_manager = ProcessManager()
self.log_monitor = QFileSystemWatcher()
self.log_monitor_timer = QTimer()
self.log_monitor_timer.timeout.connect(self.refresh_maa_log)
self.monitor_loop = QEventLoop()
self.maa_process_manager.processClosed.connect(
lambda: self.log_monitor.fileChanged.emit("进程结束检查")
)
self.question_loop = QEventLoop()
self.question_response.connect(self.__capture_response)
self.question_response.connect(self.question_loop.quit)
self.wait_loop = QEventLoop()
self.isInterruptionRequested = False
self.interrupt.connect(self.quit_monitor)
self.maa_version = None
@@ -505,9 +511,8 @@ class MaaManager(QObject):
logger.info(
f"{self.name} | 启动模拟器:{self.emulator_path},参数:{self.emulator_arguments}"
)
self.emulator_process = subprocess.Popen(
[self.emulator_path, *self.emulator_arguments],
creationflags=subprocess.CREATE_NO_WINDOW,
self.emulator_process_manager.open_process(
self.emulator_path, self.emulator_arguments, 0
)
except Exception as e:
logger.error(f"{self.name} | 启动模拟器时出现异常:{e}")
@@ -526,10 +531,7 @@ class MaaManager(QObject):
self.search_ADB_address()
# 创建MAA任务
maa = subprocess.Popen(
[self.maa_exe_path],
creationflags=subprocess.CREATE_NO_WINDOW,
)
self.maa_process_manager.open_process(self.maa_exe_path, [], 10)
# 监测MAA运行状态
self.start_monitor(start_time, mode_book[mode])
@@ -579,11 +581,11 @@ class MaaManager(QObject):
f"{self.maa_result}\n正在中止相关程序\n请等待10s"
)
# 无命令行中止MAA与其子程序
self.maa_process_manager.kill(if_force=True)
System.kill_process(self.maa_exe_path)
# 中止模拟器进程
self.emulator_process.terminate()
self.emulator_process.wait()
self.emulator_process_manager.kill()
self.if_open_emulator = True
@@ -644,8 +646,7 @@ class MaaManager(QObject):
)
# 任务结束后再次手动中止模拟器进程,防止退出不彻底
if self.if_kill_emulator:
self.emulator_process.terminate()
self.emulator_process.wait()
self.emulator_process_manager.kill()
self.if_open_emulator = True
# 记录剿灭情况
@@ -777,10 +778,7 @@ class MaaManager(QObject):
# 记录当前时间
start_time = datetime.now()
# 创建MAA任务
maa = subprocess.Popen(
[self.maa_exe_path],
creationflags=subprocess.CREATE_NO_WINDOW,
)
self.maa_process_manager.open_process(self.maa_exe_path, [], 10)
# 监测MAA运行状态
self.start_monitor(start_time, "人工排查")
@@ -799,6 +797,7 @@ class MaaManager(QObject):
f"{self.maa_result}\n正在中止相关程序\n请等待10s"
)
# 无命令行中止MAA与其子程序
self.maa_process_manager.kill(if_force=True)
System.kill_process(self.maa_exe_path)
self.if_open_emulator = True
self.sleep(10)
@@ -845,10 +844,7 @@ class MaaManager(QObject):
# 配置MAA
self.set_maa(self.mode, "")
# 创建MAA任务
maa = subprocess.Popen(
[self.maa_exe_path],
creationflags=subprocess.CREATE_NO_WINDOW,
)
self.maa_process_manager.open_process(self.maa_exe_path, [], 10)
# 记录当前时间
start_time = datetime.now()
@@ -870,6 +866,7 @@ class MaaManager(QObject):
# 关闭可能未正常退出的MAA进程
if self.isInterruptionRequested:
self.maa_process_manager.kill(if_force=True)
System.kill_process(self.maa_exe_path)
# 复原MAA配置文件
@@ -1024,6 +1021,7 @@ class MaaManager(QObject):
self.ADB_address = ADB_address
# 覆写当前ADB地址
self.maa_process_manager.kill(if_force=True)
System.kill_process(self.maa_exe_path)
with self.maa_set_path.open(mode="r", encoding="utf-8") as f:
data = json.load(f)
@@ -1053,7 +1051,7 @@ class MaaManager(QObject):
# 一分钟内未执行日志变化检查,强制检查一次
if datetime.now() - self.last_check_time > timedelta(minutes=1):
self.log_monitor.fileChanged.emit(self.log_monitor.files()[0])
self.log_monitor.fileChanged.emit("1分钟超时检查")
def check_maa_log(self, start_time: datetime, mode: str) -> list:
"""获取MAA日志并检查以判断MAA程序运行状态"""
@@ -1155,7 +1153,10 @@ class MaaManager(QObject):
elif "已停止" in log:
self.maa_result = "MAA在完成任务前中止"
elif "MaaAssistantArknights GUI exited" in log:
elif (
"MaaAssistantArknights GUI exited" in log
or not self.maa_process_manager.is_running()
):
self.maa_result = "MAA在完成任务前退出"
elif datetime.now() - latest_time > timedelta(
@@ -1178,7 +1179,10 @@ class MaaManager(QObject):
self.maa_result = "MAA未检测到任何模拟器"
elif "已停止" in log:
self.maa_result = "MAA在完成任务前中止"
elif "MaaAssistantArknights GUI exited" in log:
elif (
"MaaAssistantArknights GUI exited" in log
or not self.maa_process_manager.is_running()
):
self.maa_result = "MAA在完成任务前退出"
elif self.isInterruptionRequested:
self.maa_result = "任务被手动中止"
@@ -1186,7 +1190,10 @@ class MaaManager(QObject):
self.maa_result = "Wait"
elif mode == "设置MAA":
if "MaaAssistantArknights GUI exited" in log:
if (
"MaaAssistantArknights GUI exited" in log
or not self.maa_process_manager.is_running()
):
self.maa_result = "Success!"
else:
self.maa_result = "Wait"
@@ -1229,6 +1236,7 @@ class MaaManager(QObject):
user_data = self.data[index]["Config"]
# 配置MAA前关闭可能未正常退出的MAA进程
self.maa_process_manager.kill(if_force=True)
System.kill_process(self.maa_exe_path)
# 预导入MAA配置文件

View File

@@ -29,6 +29,7 @@ __version__ = "4.2.0"
__author__ = "DLmaster361 <DLmaster_361@163.com>"
__license__ = "GPL-3.0 license"
from .general import GeneralManager
from .MAA import MaaManager
__all__ = ["MaaManager"]
__all__ = ["GeneralManager", "MaaManager"]

905
app/models/general.py Normal file
View File

@@ -0,0 +1,905 @@
# AUTO_MAA:A MAA Multi Account Management and Automation Tool
# Copyright © 2024-2025 DLmaster361
# This file is part of AUTO_MAA.
# AUTO_MAA is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License,
# or (at your option) any later version.
# AUTO_MAA is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
# the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with AUTO_MAA. If not, see <https://www.gnu.org/licenses/>.
# Contact: DLmaster_361@163.com
"""
AUTO_MAA
通用功能组件
v4.3
作者DLmaster_361
"""
from loguru import logger
from PySide6.QtCore import QObject, Signal, QEventLoop, QFileSystemWatcher, QTimer
import os
import sys
import shutil
import subprocess
from functools import partial
from datetime import datetime, timedelta
from pathlib import Path
from jinja2 import Environment, FileSystemLoader
from typing import Union, List, Dict
from app.core import Config, GeneralConfig, GeneralSubConfig
from app.services import Notify, System
from app.utils import ProcessManager
class GeneralManager(QObject):
"""通用脚本通用控制器"""
question = Signal(str, str)
question_response = Signal(bool)
update_sub_info = Signal(str, dict)
push_info_bar = Signal(str, str, str, int)
play_sound = Signal(str)
create_user_list = Signal(list)
update_user_list = Signal(list)
update_log_text = Signal(str)
interrupt = Signal()
accomplish = Signal(dict)
def __init__(
self,
mode: str,
config: Dict[
str,
Union[
str,
Path,
GeneralConfig,
Dict[str, Dict[str, Union[Path, GeneralSubConfig]]],
],
],
sub_config_path: Path = None,
):
super(GeneralManager, self).__init__()
self.sub_list = []
self.mode = mode
self.config_path = config["Path"]
self.sub_config_path = sub_config_path
self.game_process_manager = ProcessManager()
self.script_process_manager = ProcessManager()
self.log_monitor = QFileSystemWatcher()
self.log_monitor_timer = QTimer()
self.log_monitor_timer.timeout.connect(self.refresh_log)
self.monitor_loop = QEventLoop()
self.script_process_manager.processClosed.connect(
lambda: self.log_monitor.fileChanged.emit("进程结束检查")
)
self.question_loop = QEventLoop()
self.question_response.connect(self.__capture_response)
self.question_response.connect(self.question_loop.quit)
self.wait_loop = QEventLoop()
self.isInterruptionRequested = False
self.interrupt.connect(self.quit_monitor)
self.task_dict = {}
self.set = config["Config"].toDict()
self.data: Dict[str, Dict[str, Union[Path, dict]]] = {}
if self.mode != "设置通用脚本":
for name, info in config["SubData"].items():
self.data[name] = {
"Path": info["Path"],
"Config": info["Config"].toDict(),
}
self.data = dict(sorted(self.data.items(), key=lambda x: int(x[0][3:])))
def check_config_info(self) -> bool:
"""检查配置完整性"""
if not (
Path(self.set["Script"]["RootPath"]).exists()
and Path(self.set["Script"]["ScriptPath"]).exists()
and Path(self.set["Script"]["ConfigPath"]).exists()
and Path(self.set["Script"]["LogPath"]).exists()
and self.set["Script"]["LogTimeFormat"]
and self.set["Script"]["ErrorLog"]
) or (
self.set["Game"]["Enabled"] and not Path(self.set["Game"]["Path"]).exists()
):
logger.error("脚本配置缺失")
self.push_info_bar.emit("error", "脚本配置缺失", "请检查脚本配置!", -1)
return False
return True
def configure(self):
"""提取配置信息"""
self.name = self.set["Script"]["Name"]
self.script_root_path = Path(self.set["Script"]["RootPath"])
self.script_exe_path = Path(self.set["Script"]["ScriptPath"])
self.script_config_path = Path(self.set["Script"]["ConfigPath"])
self.script_log_path = Path(self.set["Script"]["LogPath"])
self.game_path = Path(self.set["Game"]["Path"])
self.log_time_range = [
self.set["Script"]["LogTimeStart"],
self.set["Script"]["LogTimeEnd"],
]
self.success_log = [
_.strip() for _ in self.set["Script"]["SuccessLog"].split("|")
]
print(f"Success Log: {self.success_log}")
self.error_log = [_.strip() for _ in self.set["Script"]["ErrorLog"].split("|")]
def run(self):
"""主进程,运行通用脚本代理进程"""
current_date = datetime.now().strftime("%m-%d")
curdate = Config.server_date().strftime("%Y-%m-%d")
begin_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 检查配置完整性
if not self.check_config_info():
self.accomplish.emit(
{"Time": begin_time, "History": "由于配置不完整,通用代理进程中止"}
)
return None
self.configure()
# 整理用户数据,筛选需代理的用户
if self.mode != "设置通用脚本":
self.data = dict(sorted(self.data.items(), key=lambda x: int(x[0][3:])))
self.sub_list: List[List[str, str, str]] = [
[_["Config"]["Info"]["Name"], "等待", index]
for index, _ in self.data.items()
if (
_["Config"]["Info"]["RemainedDay"] != 0
and _["Config"]["Info"]["Status"]
)
]
self.create_user_list.emit(self.sub_list)
# 自动代理模式
if self.mode == "自动代理":
# 执行情况预处理
for _ in self.sub_list:
if self.data[_[2]]["Config"]["Data"]["LastProxyDate"] != curdate:
self.data[_[2]]["Config"]["Data"]["LastProxyDate"] = curdate
self.data[_[2]]["Config"]["Data"]["ProxyTimes"] = 0
_[
0
] += f" - 第{self.data[_[2]]['Config']['Data']['ProxyTimes'] + 1}次代理"
# 开始代理
for sub in self.sub_list:
sub_data = self.data[sub[2]]["Config"]
if self.isInterruptionRequested:
break
if (
self.set["Run"]["ProxyTimesLimit"] == 0
or sub_data["Data"]["ProxyTimes"]
< self.set["Run"]["ProxyTimesLimit"]
):
sub[1] = "运行"
self.update_user_list.emit(self.sub_list)
else:
sub[1] = "跳过"
self.update_user_list.emit(self.sub_list)
continue
logger.info(f"{self.name} | 开始代理配置: {sub[0]}")
sub_logs_list = []
sub_start_time = datetime.now()
run_book = False
if not (self.data[sub[2]]["Path"] / "ConfigFiles").exists():
logger.error(f"{self.name} | 配置: {sub[0]} - 未找到配置文件")
self.push_info_bar.emit(
"error",
"启动通用代理进程失败",
f"未找到{sub[0]}的配置文件!",
-1,
)
run_book = False
continue
# 尝试次数循环
for i in range(self.set["Run"]["RunTimesLimit"]):
if self.isInterruptionRequested or run_book:
break
logger.info(
f"{self.name} | 用户: {sub[0]} - 尝试次数: {i + 1}/{self.set['Run']['RunTimesLimit']}"
)
# 记录当前时间
start_time = datetime.now()
# 配置脚本
self.set_sub(sub[2])
# 执行任务前脚本
if (
sub_data["Info"]["IfScriptBeforeTask"]
and Path(sub_data["Info"]["ScriptBeforeTask"]).exists()
):
self.execute_script_task(
Path(sub_data["Info"]["ScriptBeforeTask"]), "脚本前任务"
)
# 启动游戏/模拟器
if self.set["Game"]["Enabled"]:
try:
logger.info(
f"{self.name} | 启动游戏/模拟器:{self.game_path},参数:{self.set['Game']['Arguments']}"
)
self.game_process_manager.open_process(
self.game_path,
str(self.set["Game"]["Arguments"]).split(" "),
0,
)
except Exception as e:
logger.error(
f"{self.name} | 启动游戏/模拟器时出现异常:{e}"
)
self.push_info_bar.emit(
"error",
"启动游戏/模拟器时出现异常",
"请检查游戏/模拟器路径设置",
-1,
)
self.script_result = "游戏/模拟器启动失败"
break
# 添加静默进程标记
if self.set["Game"]["Style"] == "Emulator":
Config.silence_list.append(self.game_path)
self.update_log_text.emit(
f"正在等待游戏/模拟器完成启动\n请等待{self.set['Game']['WaitTime']}s"
)
self.sleep(self.set["Game"]["WaitTime"])
# 10s后移除静默进程标记
if self.set["Game"]["Style"] == "Emulator":
QTimer.singleShot(
10000,
partial(Config.silence_list.remove, self.game_path),
)
# 运行脚本任务
self.script_process_manager.open_process(
self.script_exe_path,
str(self.set["Script"]["Arguments"]).split(" "),
)
# 监测运行状态
self.start_monitor(start_time)
if self.script_result == "Success!":
# 标记任务完成
run_book = True
# 中止相关程序
self.script_process_manager.kill()
System.kill_process(self.script_exe_path)
if self.set["Game"]["Enabled"]:
self.game_process_manager.kill()
if self.set["Game"]["IfForceClose"]:
System.kill_process(self.game_path)
logger.info(
f"{self.name} | 配置: {sub[0]} - 通用脚本进程完成代理任务"
)
self.update_log_text.emit(
"检测到通用脚本进程完成代理任务\n正在等待相关程序结束\n请等待10s"
)
self.sleep(10)
else:
logger.error(
f"{self.name} | 配置: {sub[0]} - 代理任务异常: {self.script_result}"
)
# 打印中止信息
# 此时log变量内存储的就是出现异常的日志信息可以保存或发送用于问题排查
self.update_log_text.emit(
f"{self.script_result}\n正在中止相关程序\n请等待10s"
)
# 中止相关程序
self.script_process_manager.kill()
if self.set["Game"]["Enabled"]:
self.game_process_manager.kill()
if self.set["Game"]["IfForceClose"]:
System.kill_process(self.game_path)
# 推送异常通知
Notify.push_plyer(
"用户自动代理出现异常!",
f"用户 {sub[0].replace("_", " 今天的")}出现一次异常",
f"{sub[0].replace("_", " ")}出现异常",
1,
)
if i == self.set["Run"]["RunTimesLimit"] - 1:
self.play_sound.emit("子任务失败")
else:
self.play_sound.emit(self.script_result)
self.sleep(10)
# 执行任务后脚本
if (
sub_data["Info"]["IfScriptAfterTask"]
and Path(sub_data["Info"]["ScriptAfterTask"]).exists()
):
self.execute_script_task(
Path(sub_data["Info"]["ScriptAfterTask"]), "脚本后任务"
)
# # 保存运行日志以及统计信息
# Config.save_maa_log(
# Config.app_path
# / f"history/{curdate}/{sub_data['Info']['Name']}/{start_time.strftime("%H-%M-%S")}.log",
# self.check_script_log(start_time, mode_book[mode]),
# self.maa_result,
# )
sub_logs_list.append(
Config.app_path
/ f"history/{curdate}/{sub_data['Info']['Name']}/{start_time.strftime("%H-%M-%S")}.json",
)
# 发送统计信息
# statistics = Config.merge_maa_logs("指定项", sub_logs_list)
statistics = {
"sub_index": sub[2],
"sub_info": sub[0],
"start_time": sub_start_time.strftime("%Y-%m-%d %H:%M:%S"),
"end_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"sub_result": "代理成功" if run_book else self.script_result,
}
self.push_notification(
"统计信息",
f"{current_date} | 配置 {sub[0]} 的自动代理统计报告",
statistics,
sub_data,
)
if run_book:
# 成功完成代理的用户修改相关参数
if (
sub_data["Data"]["ProxyTimes"] == 0
and sub_data["Info"]["RemainedDay"] != -1
):
sub_data["Info"]["RemainedDay"] -= 1
sub_data["Data"]["ProxyTimes"] += 1
sub[1] = "完成"
Notify.push_plyer(
"成功完成一个自动代理任务!",
f"已完成配置 {sub[0].replace("_", " 今天的")}任务",
f"已完成 {sub[0].replace("_", "")}",
3,
)
else:
# 录入代理失败的用户
sub[1] = "异常"
self.update_user_list.emit(self.sub_list)
# 设置通用脚本模式
elif self.mode == "设置通用脚本":
# 配置通用脚本
self.set_sub()
try:
# 创建通用脚本任务
logger.info(f"{self.name} | 无参数启动通用脚本:{self.script_exe_path}")
self.script_process_manager.open_process(self.script_exe_path)
# 记录当前时间
start_time = datetime.now()
# 监测通用脚本运行状态
self.start_monitor(start_time)
self.sub_config_path.mkdir(parents=True, exist_ok=True)
shutil.copytree(
self.script_config_path, self.sub_config_path, dirs_exist_ok=True
)
except Exception as e:
logger.error(f"{self.name} | 启动通用脚本时出现异常:{e}")
self.push_info_bar.emit(
"error",
"启动通用脚本时出现异常",
"请检查相关设置",
-1,
)
result_text = ""
# 导出结果
if self.mode in ["自动代理"]:
# 关闭可能未正常退出的通用脚本进程
if self.isInterruptionRequested:
self.script_process_manager.kill(if_force=True)
System.kill_process(self.script_exe_path)
if self.set["Game"]["Enabled"]:
self.game_process_manager.kill(if_force=True)
if self.set["Game"]["IfForceClose"]:
System.kill_process(self.game_path)
# 更新用户数据
updated_info = {_[2]: self.data[_[2]] for _ in self.sub_list}
self.update_sub_info.emit(self.config_path.name, updated_info)
error_index = [_[2] for _ in self.sub_list if _[1] == "异常"]
over_index = [_[2] for _ in self.sub_list if _[1] == "完成"]
wait_index = [_[2] for _ in self.sub_list if _[1] == "等待"]
# 保存运行日志
title = (
f"{current_date} | {self.name}{self.mode[:4]}任务报告"
if self.name != ""
else f"{current_date} | {self.mode[:4]}任务报告"
)
result = {
"title": f"{self.mode[:4]}任务报告",
"script_name": (self.name if self.name != "" else "空白"),
"start_time": begin_time,
"end_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"completed_count": len(over_index),
"uncompleted_count": len(error_index) + len(wait_index),
"failed_sub": [
self.data[_]["Config"]["Info"]["Name"] for _ in error_index
],
"waiting_sub": [
self.data[_]["Config"]["Info"]["Name"] for _ in wait_index
],
}
# 生成结果文本
result_text = (
f"任务开始时间:{result['start_time']},结束时间:{result['end_time']}\n"
f"已完成数:{result['completed_count']},未完成数:{result['uncompleted_count']}\n\n"
)
if len(result["failed_sub"]) > 0:
result_text += f"{self.mode[2:4]}未成功的配置:\n{"\n".join(result['failed_sub'])}\n"
if len(result["waiting_sub"]) > 0:
result_text += f"\n未开始{self.mode[2:4]}的配置:\n{"\n".join(result['waiting_sub'])}\n"
# 推送代理结果通知
Notify.push_plyer(
title.replace("报告", "已完成!"),
f"已完成配置数:{len(over_index)},未完成配置数:{len(error_index) + len(wait_index)}",
f"已完成配置数:{len(over_index)},未完成配置数:{len(error_index) + len(wait_index)}",
10,
)
self.push_notification("代理结果", title, result)
self.log_monitor.deleteLater()
self.log_monitor_timer.deleteLater()
self.accomplish.emit({"Time": begin_time, "History": result_text})
def requestInterruption(self) -> None:
logger.info(f"{self.name} | 收到任务中止申请")
if len(self.log_monitor.files()) != 0:
self.interrupt.emit()
self.script_result = "任务被手动中止"
self.isInterruptionRequested = True
self.wait_loop.quit()
def push_question(self, title: str, message: str) -> bool:
self.question.emit(title, message)
self.question_loop.exec()
return self.response
def __capture_response(self, response: bool) -> None:
self.response = response
def sleep(self, time: int) -> None:
"""非阻塞型等待"""
QTimer.singleShot(time * 1000, self.wait_loop.quit)
self.wait_loop.exec()
def refresh_log(self) -> None:
"""刷新脚本日志"""
with self.script_log_path.open(mode="r", encoding="utf-8") as f:
pass
# 一分钟内未执行日志变化检查,强制检查一次
if (datetime.now() - self.last_check_time).total_seconds() > 60:
self.log_monitor.fileChanged.emit("1分钟超时检查")
def strptime(
self, date_string: str, format: str, default_date: datetime
) -> datetime:
"""根据指定格式解析日期字符串"""
# 时间字段映射表
time_fields = {
"%Y": "year",
"%m": "month",
"%d": "day",
"%H": "hour",
"%M": "minute",
"%S": "second",
"%f": "microsecond",
}
date = datetime.strptime(date_string, format)
# 构建参数字典
datetime_kwargs = {}
for format_code, field_name in time_fields.items():
if format_code in format:
datetime_kwargs[field_name] = getattr(date, field_name)
else:
datetime_kwargs[field_name] = getattr(default_date, field_name)
return datetime(**datetime_kwargs)
def check_script_log(self, start_time: datetime) -> list:
"""获取脚本日志并检查以判断脚本程序运行状态"""
self.last_check_time = datetime.now()
# 获取日志
logs = []
if_log_start = False
with self.script_log_path.open(mode="r", encoding="utf-8") as f:
for entry in f:
if not if_log_start:
try:
entry_time = self.strptime(
entry[self.log_time_range[0] : self.log_time_range[1]],
self.set["Script"]["LogTimeFormat"],
self.last_check_time,
)
if entry_time > start_time:
if_log_start = True
logs.append(entry)
except ValueError:
pass
else:
logs.append(entry)
log = "".join(logs)
# 更新日志
if len(logs) > 100:
self.update_log_text.emit("".join(logs[-100:]))
else:
self.update_log_text.emit("".join(logs))
if "自动代理" in self.mode:
# 获取最近一条日志的时间
latest_time = start_time
for _ in logs[::-1]:
try:
latest_time = self.strptime(
_[self.log_time_range[0] : self.log_time_range[1]],
self.set["Script"]["LogTimeFormat"],
self.last_check_time,
)
break
except ValueError:
pass
for success_sign in self.success_log:
if success_sign in log:
self.script_result = "Success!"
break
else:
if self.isInterruptionRequested:
self.script_result = "任务被手动中止"
elif datetime.now() - latest_time > timedelta(
minutes=self.set["Run"]["RunTimeLimit"]
):
self.script_result = "脚本进程超时"
else:
for error_sign in self.error_log:
if error_sign in log:
self.script_result = error_sign
break
else:
if self.script_process_manager.is_running():
self.script_result = "Wait"
elif self.success_log:
self.script_result = "脚本在完成任务前退出"
else:
self.script_result = "Success!"
elif self.mode == "设置通用脚本":
if self.script_process_manager.is_running():
self.script_result = "Wait"
else:
self.script_result = "Success!"
if self.script_result != "Wait":
self.quit_monitor()
return logs
def start_monitor(self, start_time: datetime) -> None:
"""开始监视通用脚本日志"""
logger.info(f"{self.name} | 开始监视通用脚本日志")
self.log_monitor.addPath(str(self.script_log_path))
self.log_monitor.fileChanged.connect(lambda: self.check_script_log(start_time))
self.log_monitor_timer.start(1000)
self.last_check_time = datetime.now()
self.monitor_loop.exec()
def quit_monitor(self) -> None:
"""退出通用脚本日志监视进程"""
if len(self.log_monitor.files()) != 0:
logger.info(f"{self.name} | 退出通用脚本日志监视")
self.log_monitor.removePath(str(self.script_log_path))
self.log_monitor.fileChanged.disconnect()
self.log_monitor_timer.stop()
self.last_check_time = None
self.monitor_loop.quit()
def set_sub(self, index: str = "") -> dict:
"""配置通用脚本运行参数"""
logger.info(f"{self.name} | 配置脚本运行参数: {index}")
# 配置前关闭可能未正常退出的脚本进程
System.kill_process(self.script_exe_path)
# 预导入配置文件
if self.mode == "设置通用脚本":
if self.sub_config_path.exists():
shutil.copytree(
self.sub_config_path, self.script_config_path, dirs_exist_ok=True
)
else:
shutil.copytree(
self.data[index]["Path"] / "ConfigFiles",
self.script_config_path,
dirs_exist_ok=True,
)
def execute_script_task(self, script_path: Path, task_name: str) -> bool:
"""执行脚本任务并等待结束"""
try:
logger.info(f"{self.name} | 开始执行{task_name}: {script_path}")
# 根据文件类型选择执行方式
if script_path.suffix.lower() == ".py":
cmd = [sys.executable, script_path]
elif script_path.suffix.lower() in [".bat", ".cmd", ".exe"]:
cmd = [str(script_path)]
elif script_path.suffix.lower() == "":
logger.warning(f"{self.name} | {task_name}脚本没有指定后缀名,无法执行")
return False
else:
# 使用系统默认程序打开
os.startfile(str(script_path))
return True
# 执行脚本并等待结束
result = subprocess.run(
cmd,
cwd=script_path.parent,
stdin=subprocess.DEVNULL,
creationflags=(
subprocess.CREATE_NO_WINDOW
if Config.get(Config.function_IfSilence)
else 0
),
timeout=600,
capture_output=True,
errors="ignore",
)
if result.returncode == 0:
logger.info(f"{self.name} | {task_name}执行成功")
if result.stdout.strip():
logger.info(f"{self.name} | {task_name}输出: {result.stdout}")
return True
else:
logger.error(
f"{self.name} | {task_name}执行失败,返回码: {result.returncode}"
)
if result.stderr.strip():
logger.error(f"{self.name} | {task_name}错误输出: {result.stderr}")
return False
except subprocess.TimeoutExpired:
logger.error(f"{self.name} | {task_name}执行超时")
return False
except Exception as e:
logger.exception(f"{self.name} | 执行{task_name}时出现异常: {e}")
return False
def push_notification(
self,
mode: str,
title: str,
message: Union[str, dict],
sub_data: Dict[str, Dict[str, Union[str, int, bool]]] = None,
) -> None:
"""通过所有渠道推送通知"""
env = Environment(
loader=FileSystemLoader(str(Config.app_path / "resources/html"))
)
if mode == "代理结果" and (
Config.get(Config.notify_SendTaskResultTime) == "任何时刻"
or (
Config.get(Config.notify_SendTaskResultTime) == "仅失败时"
and message["uncompleted_count"] != 0
)
):
# 生成文本通知内容
message_text = (
f"任务开始时间:{message['start_time']},结束时间:{message['end_time']}\n"
f"已完成数:{message['completed_count']},未完成数:{message['uncompleted_count']}\n\n"
)
if len(message["failed_sub"]) > 0:
message_text += f"{self.mode[2:4]}未成功的配置:\n{"\n".join(message['failed_sub'])}\n"
if len(message["waiting_sub"]) > 0:
message_text += f"\n未开始{self.mode[2:4]}的配置:\n{"\n".join(message['waiting_sub'])}\n"
# 生成HTML通知内容
message["failed_sub"] = "".join(message["failed_sub"])
message["waiting_sub"] = "".join(message["waiting_sub"])
template = env.get_template("general_result.html")
message_html = template.render(message)
# ServerChan的换行是两个换行符。故而将\n替换为\n\n
serverchan_message = message_text.replace("\n", "\n\n")
# 发送全局通知
if Config.get(Config.notify_IfSendMail):
Notify.send_mail(
"网页", title, message_html, Config.get(Config.notify_ToAddress)
)
if Config.get(Config.notify_IfServerChan):
Notify.ServerChanPush(
title,
f"{serverchan_message}\n\nAUTO_MAA 敬上",
Config.get(Config.notify_ServerChanKey),
Config.get(Config.notify_ServerChanTag),
Config.get(Config.notify_ServerChanChannel),
)
if Config.get(Config.notify_IfCompanyWebHookBot):
Notify.CompanyWebHookBotPush(
title,
f"{message_text}\n\nAUTO_MAA 敬上",
Config.get(Config.notify_CompanyWebHookBotUrl),
)
elif mode == "统计信息":
message_text = (
f"开始时间: {message['start_time']}\n"
f"结束时间: {message['end_time']}\n"
f"通用脚本执行结果: {message['sub_result']}\n\n"
)
# 生成HTML通知内容
template = env.get_template("general_statistics.html")
message_html = template.render(message)
# ServerChan的换行是两个换行符。故而将\n替换为\n\n
serverchan_message = message_text.replace("\n", "\n\n")
# 发送全局通知
if Config.get(Config.notify_IfSendStatistic):
if Config.get(Config.notify_IfSendMail):
Notify.send_mail(
"网页", title, message_html, Config.get(Config.notify_ToAddress)
)
if Config.get(Config.notify_IfServerChan):
Notify.ServerChanPush(
title,
f"{serverchan_message}\n\nAUTO_MAA 敬上",
Config.get(Config.notify_ServerChanKey),
Config.get(Config.notify_ServerChanTag),
Config.get(Config.notify_ServerChanChannel),
)
if Config.get(Config.notify_IfCompanyWebHookBot):
Notify.CompanyWebHookBotPush(
title,
f"{message_text}\n\nAUTO_MAA 敬上",
Config.get(Config.notify_CompanyWebHookBotUrl),
)
# 发送用户单独通知
if sub_data["Notify"]["Enabled"] and sub_data["Notify"]["IfSendStatistic"]:
# 发送邮件通知
if sub_data["Notify"]["IfSendMail"]:
if sub_data["Notify"]["ToAddress"]:
Notify.send_mail(
"网页",
title,
message_html,
sub_data["Notify"]["ToAddress"],
)
else:
logger.error(
f"{self.name} | 用户邮箱地址为空,无法发送用户单独的邮件通知"
)
# 发送ServerChan通知
if sub_data["Notify"]["IfServerChan"]:
if sub_data["Notify"]["ServerChanKey"]:
Notify.ServerChanPush(
title,
f"{serverchan_message}\n\nAUTO_MAA 敬上",
sub_data["Notify"]["ServerChanKey"],
sub_data["Notify"]["ServerChanTag"],
sub_data["Notify"]["ServerChanChannel"],
)
else:
logger.error(
f"{self.name} |用户ServerChan密钥为空无法发送用户单独的ServerChan通知"
)
# 推送CompanyWebHookBot通知
if sub_data["Notify"]["IfCompanyWebHookBot"]:
if sub_data["Notify"]["CompanyWebHookBotUrl"]:
Notify.CompanyWebHookBotPush(
title,
f"{message_text}\n\nAUTO_MAA 敬上",
sub_data["Notify"]["CompanyWebHookBotUrl"],
)
else:
logger.error(
f"{self.name} |用户CompanyWebHookBot密钥为空无法发送用户单独的CompanyWebHookBot通知"
)
return None

View File

@@ -61,27 +61,67 @@ class _SystemHandler:
# 恢复系统电源状态
ctypes.windll.kernel32.SetThreadExecutionState(self.ES_CONTINUOUS)
def set_SelfStart(self) -> None:
def set_SelfStart(self) -> bool:
"""同步开机自启"""
if Config.get(Config.start_IfSelfStart) and not self.is_startup():
key = winreg.OpenKey(
winreg.HKEY_CURRENT_USER,
r"Software\Microsoft\Windows\CurrentVersion\Run",
winreg.KEY_SET_VALUE,
winreg.KEY_ALL_ACCESS | winreg.KEY_WRITE | winreg.KEY_CREATE_SUB_KEY,
)
winreg.SetValueEx(key, "AUTO_MAA", 0, winreg.REG_SZ, Config.app_path_sys)
winreg.CloseKey(key)
try:
# 创建任务计划
result = subprocess.run(
[
"schtasks",
"/create",
"/tn",
"AUTO_MAA_AutoStart",
"/tr",
Config.app_path_sys,
"/sc",
"onlogon",
"/rl",
"highest", # 以最高权限运行
"/f", # 强制创建(覆盖现有任务)
],
creationflags=subprocess.CREATE_NO_WINDOW,
stdin=subprocess.DEVNULL,
capture_output=True,
text=True,
)
if result.returncode == 0:
logger.info(f"任务计划程序自启动已创建: {Config.app_path_sys}")
return True
else:
logger.error(f"创建任务计划失败: {result.stderr}")
return False
except Exception as e:
logger.error(f"设置任务计划程序自启动失败: {e}")
return False
elif not Config.get(Config.start_IfSelfStart) and self.is_startup():
key = winreg.OpenKey(
winreg.HKEY_CURRENT_USER,
r"Software\Microsoft\Windows\CurrentVersion\Run",
winreg.KEY_SET_VALUE,
winreg.KEY_ALL_ACCESS | winreg.KEY_WRITE | winreg.KEY_CREATE_SUB_KEY,
)
winreg.DeleteValue(key, "AUTO_MAA")
winreg.CloseKey(key)
try:
result = subprocess.run(
["schtasks", "/delete", "/tn", "AUTO_MAA_AutoStart", "/f"],
creationflags=subprocess.CREATE_NO_WINDOW,
stdin=subprocess.DEVNULL,
capture_output=True,
text=True,
)
if result.returncode == 0:
logger.info("任务计划程序自启动已删除")
return True
else:
logger.error(f"删除任务计划失败: {result.stderr}")
return False
except Exception as e:
logger.error(f"删除任务计划程序自启动失败: {e}")
return False
def set_power(self, mode) -> None:
@@ -144,19 +184,17 @@ class _SystemHandler:
def is_startup(self) -> bool:
"""判断程序是否已经开机自启"""
key = winreg.OpenKey(
winreg.HKEY_CURRENT_USER,
r"Software\Microsoft\Windows\CurrentVersion\Run",
0,
winreg.KEY_READ,
)
try:
value, _ = winreg.QueryValueEx(key, "AUTO_MAA")
winreg.CloseKey(key)
return True
except FileNotFoundError:
winreg.CloseKey(key)
result = subprocess.run(
["schtasks", "/query", "/tn", "AUTO_MAA_AutoStart"],
creationflags=subprocess.CREATE_NO_WINDOW,
stdin=subprocess.DEVNULL,
capture_output=True,
text=True,
)
return result.returncode == 0
except Exception as e:
logger.error(f"检查任务计划程序失败: {e}")
return False
def get_window_info(self) -> list:

View File

@@ -33,6 +33,8 @@ v4.3
import os
import re
import win32com.client
from pathlib import Path
from datetime import datetime
from functools import partial
from typing import Optional, Union, List, Dict
@@ -48,6 +50,7 @@ from PySide6.QtWidgets import (
QHBoxLayout,
QVBoxLayout,
QSizePolicy,
QFileDialog,
)
from qfluentwidgets import (
LineEdit,
@@ -566,6 +569,73 @@ class PasswordLineEditSettingCard(SettingCard):
self.LineEdit.textChanged.connect(self.__textChanged)
class PathSettingCard(PushSettingCard):
pathChanged = Signal(Path, Path)
def __init__(
self,
icon: Union[str, QIcon, FluentIconBase],
title: str,
mode: str,
text: str,
qconfig: QConfig,
configItem: ConfigItem,
parent=None,
):
super().__init__(text, icon, title, "未设置", parent)
self.title = title
self.mode = mode
self.qconfig = qconfig
self.configItem = configItem
self.setContent(self.qconfig.get(self.configItem))
self.clicked.connect(self.ChoosePath)
self.configItem.valueChanged.connect(
lambda: self.setContent(self.qconfig.get(self.configItem))
)
def ChoosePath(self):
"""选择文件或文件夹路径"""
old_path = Path(self.qconfig.get(self.configItem))
if self.mode == "文件夹":
folder = QFileDialog.getExistingDirectory(
self, "选择文件夹", self.qconfig.get(self.configItem)
)
if folder:
self.qconfig.set(self.configItem, folder)
self.pathChanged.emit(old_path, Path(folder))
else:
file_path, _ = QFileDialog.getOpenFileName(
self, "打开文件", self.qconfig.get(self.configItem), self.mode
)
if file_path:
file_path = self.analysis_lnk(file_path)
self.qconfig.set(self.configItem, str(file_path))
self.pathChanged.emit(old_path, file_path)
def analysis_lnk(self, path: str) -> Path:
"""快捷方式解析"""
lnk_path = Path(path)
if lnk_path.suffix == ".lnk":
try:
shell = win32com.client.Dispatch("WScript.Shell")
shortcut = shell.CreateShortcut(str(lnk_path))
return Path(shortcut.TargetPath)
except Exception as e:
return lnk_path
else:
return lnk_path
class PushAndSwitchButtonSettingCard(SettingCard):
"""Setting card with push & switch button"""
@@ -1163,6 +1233,48 @@ class TimeEditSettingCard(SettingCard):
self.TimeEdit.setTime(QTime.fromString(value, "HH:mm"))
class SubLableSettingCard(SettingCard):
"""Setting card with Sub's Lable"""
def __init__(
self,
icon: Union[str, QIcon, FluentIconBase],
title: str,
content: Union[str, None],
qconfig: QConfig,
configItems: Dict[str, ConfigItem],
parent=None,
):
super().__init__(icon, title, content, parent)
self.qconfig = qconfig
self.configItems = configItems
self.Lable = SubtitleLabel(self)
if configItems:
for configItem in configItems.values():
configItem.valueChanged.connect(self.setValue)
self.setValue()
self.hBoxLayout.addWidget(self.Lable, 0, Qt.AlignRight)
self.hBoxLayout.addSpacing(16)
def setValue(self):
text_list = []
if self.configItems:
text_list.append(
f"今日已代理{self.qconfig.get(self.configItems["ProxyTimes"])}"
if Config.server_date().strftime("%Y-%m-%d")
== self.qconfig.get(self.configItems["LastProxyDate"])
else "今日未进行代理"
)
self.Lable.setText(" | ".join(text_list))
class UserLableSettingCard(SettingCard):
"""Setting card with User's Lable"""
@@ -1341,13 +1453,18 @@ class UserNoticeSettingCard(PushAndSwitchButtonSettingCard):
if not (
self.qconfig.get(self.configItems["IfSendStatistic"])
or self.qconfig.get(self.configItems["IfSendSixStar"])
or (
"IfSendSixStar" in self.configItems
and self.qconfig.get(self.configItems["IfSendSixStar"])
)
):
text_list.append("未启用任何通知项")
if self.qconfig.get(self.configItems["IfSendStatistic"]):
text_list.append("统计信息已启用")
if self.qconfig.get(self.configItems["IfSendSixStar"]):
if "IfSendSixStar" in self.configItems and self.qconfig.get(
self.configItems["IfSendSixStar"]
):
text_list.append("六星喜报已启用")
if self.qconfig.get(self.configItems["IfSendMail"]):

View File

@@ -210,8 +210,8 @@ class DispatchCenter(QWidget):
self.script_list["主调度台"].top_bar.object.addItem(
(
f"实例 - {info['Type']}"
if info["Config"].get(info["Config"].MaaSet_Name) == ""
else f"实例 - {info['Type']} - {info["Config"].get(info["Config"].MaaSet_Name)}"
if info["Config"].get_name() == ""
else f"实例 - {info['Type']} - {info['Config'].get_name()}"
),
userData=name,
)
@@ -284,8 +284,8 @@ class DispatchCenter(QWidget):
continue
text_list.append(
f"实例 - {info['Type']}"
if info["Config"].get(info["Config"].MaaSet_Name) == ""
else f"实例 - {info['Type']} - {info["Config"].get(info["Config"].MaaSet_Name)}"
if info["Config"].get_name() == ""
else f"实例 - {info['Type']} - {info['Config'].get_name()}"
)
data_list.append(name)
@@ -317,14 +317,12 @@ class DispatchCenter(QWidget):
elif "脚本" in choice.input[0].currentData():
if Config.member_dict[choice.input[0].currentData()]["Type"] == "Maa":
logger.info(f"用户添加任务:{choice.input[0].currentData()}")
TaskManager.add_task(
"自动代理_新调度台",
f"自定义队列 - {choice.input[0].currentData()}",
{"Queue": {"Member_1": choice.input[0].currentData()}},
)
logger.info(f"用户添加任务:{choice.input[0].currentData()}")
TaskManager.add_task(
"自动代理_新调度台",
f"自定义队列 - {choice.input[0].currentData()}",
{"Queue": {"Member_1": choice.input[0].currentData()}},
)
class DispatchBox(QWidget):
@@ -409,6 +407,18 @@ class DispatchCenter(QWidget):
)
return None
if (
"脚本" in self.object.currentData()
and Config.member_dict[self.object.currentData()]["Type"]
== "General"
and self.mode.currentData() == "人工排查"
):
logger.warning("通用脚本类型不存在人工排查功能")
MainInfoBar.push_info_bar(
"warning", "不支持的任务", "通用脚本无人工排查功能", 5000
)
return None
if "调度队列" in self.object.currentData():
logger.info(f"用户添加任务:{self.object.currentData()}")
@@ -420,14 +430,12 @@ class DispatchCenter(QWidget):
elif "脚本" in self.object.currentData():
if Config.member_dict[self.object.currentData()]["Type"] == "Maa":
logger.info(f"用户添加任务:{self.object.currentData()}")
TaskManager.add_task(
f"{self.mode.currentText()}_主调度台",
"自定义队列",
{"Queue": {"Member_1": self.object.currentData()}},
)
logger.info(f"用户添加任务:{self.object.currentData()}")
TaskManager.add_task(
f"{self.mode.currentText()}_主调度台",
"自定义队列",
{"Queue": {"Member_1": self.object.currentData()}},
)
class DispatchInfoCard(HeaderCardWidget):

View File

@@ -184,7 +184,7 @@ class AUTO_MAA(MSFluentWindow):
self.set_min_method()
Config.user_info_changed.connect(self.member_manager.refresh_dashboard)
Config.sub_info_changed.connect(self.member_manager.refresh_dashboard)
Config.power_sign_changed.connect(self.dispatch_center.update_power_sign)
TaskManager.create_gui.connect(self.dispatch_center.add_board)
TaskManager.connect_gui.connect(self.dispatch_center.connect_main_board)

File diff suppressed because it is too large Load Diff

View File

@@ -111,7 +111,7 @@ class QueueManager(QWidget):
"Config": queue_config,
}
self.queue_manager.add_QueueSettingBox(index)
self.queue_manager.add_SettingBox(index)
self.queue_manager.switch_SettingBox(index)
logger.success(f"调度队列_{index} 添加成功")
@@ -256,8 +256,8 @@ class QueueManager(QWidget):
+ [
(
k
if v["Config"].get(v["Config"].MaaSet_Name) == ""
else f"{k} - {v["Config"].get(v["Config"].MaaSet_Name)}"
if v["Config"].get_name() == ""
else f"{k} - {v["Config"].get_name()}"
)
for k, v in Config.member_dict.items()
],
@@ -332,7 +332,7 @@ class QueueManager(QWidget):
Config.search_queue()
for name in Config.queue_dict.keys():
self.add_QueueSettingBox(int(name[5:]))
self.add_SettingBox(int(name[5:]))
self.switch_SettingBox(index)
@@ -358,12 +358,12 @@ class QueueManager(QWidget):
self.script_list.clear()
self.pivot.clear()
def add_QueueSettingBox(self, uid: int) -> None:
def add_SettingBox(self, uid: int) -> None:
"""添加一个调度队列设置界面"""
maa_setting_box = self.QueueMemberSettingBox(uid, self)
setting_box = self.QueueMemberSettingBox(uid, self)
self.script_list.append(maa_setting_box)
self.script_list.append(setting_box)
self.stackedWidget.addWidget(self.script_list[-1])
@@ -586,8 +586,8 @@ class QueueManager(QWidget):
+ [
(
k
if v["Config"].get(v["Config"].MaaSet_Name) == ""
else f"{k} - {v["Config"].get(v["Config"].MaaSet_Name)}"
if v["Config"].get_name() == ""
else f"{k} - {v["Config"].get_name()}"
)
for k, v in Config.member_dict.items()
],

View File

@@ -1,9 +1,37 @@
# AUTO_MAA:A MAA Multi Account Management and Automation Tool
# Copyright © 2025 ClozyA
# This file is part of AUTO_MAA.
# AUTO_MAA is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License,
# or (at your option) any later version.
# AUTO_MAA is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
# the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with AUTO_MAA. If not, see <https://www.gnu.org/licenses/>.
# Contact: DLmaster_361@163.com
"""
AUTO_MAA
AUTO_MAA图像组件
v4.3
作者ClozyA
"""
import base64
import hashlib
from pathlib import Path
from PIL import Image
class ImageUtils:
@staticmethod
def get_base64_from_file(image_path):

167
app/utils/ProcessManager.py Normal file
View File

@@ -0,0 +1,167 @@
# AUTO_MAA:A MAA Multi Account Management and Automation Tool
# Copyright © 2024-2025 DLmaster361
# This file is part of AUTO_MAA.
# AUTO_MAA is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License,
# or (at your option) any later version.
# AUTO_MAA is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
# the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with AUTO_MAA. If not, see <https://www.gnu.org/licenses/>.
# Contact: DLmaster_361@163.com
"""
AUTO_MAA
AUTO_MAA进程管理组件
v4.3
作者DLmaster_361
"""
import psutil
import subprocess
from pathlib import Path
from datetime import datetime
from PySide6.QtCore import QTimer, QObject, Signal
class ProcessManager(QObject):
"""进程监视器类,用于跟踪主进程及其所有子进程的状态"""
processClosed = Signal()
def __init__(self):
super().__init__()
self.main_pid = None
self.tracked_pids = set()
self.check_timer = QTimer()
self.check_timer.timeout.connect(self.check_processes)
def open_process(
self,
path: Path,
args: list = [],
tracking_time: int = 60,
) -> int:
"""
启动一个新进程并返回其pid并开始监视该进程
:param path: 可执行文件的路径
:param args: 启动参数列表
:param tracking_time: 子进程追踪持续时间(秒)
:return: 新进程的PID
"""
process = subprocess.Popen(
[path, *args],
creationflags=subprocess.CREATE_NO_WINDOW,
stdin=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
self.start_monitoring(process.pid, tracking_time)
def start_monitoring(self, pid: int, tracking_time: int = 60) -> None:
"""
启动进程监视器,跟踪指定的主进程及其子进程
:param pid: 被监视进程的PID
:param tracking_time: 子进程追踪持续时间(秒)
"""
self.clear()
self.main_pid = pid
self.tracking_time = tracking_time
# 扫描并记录所有相关进程
try:
# 获取主进程及其子进程
main_proc = psutil.Process(self.main_pid)
self.tracked_pids.add(self.main_pid)
# 递归获取所有子进程
for child in main_proc.children(recursive=True):
self.tracked_pids.add(child.pid)
except psutil.NoSuchProcess:
pass
# 启动持续追踪机制
self.start_time = datetime.now()
self.check_timer.start(100)
def check_processes(self) -> None:
"""检查跟踪的进程是否仍在运行,并更新子进程列表"""
# 仅在时限内持续更新跟踪的进程列表,发现新的子进程
if (datetime.now() - self.start_time).total_seconds() < self.tracking_time:
current_pids = set(self.tracked_pids)
for pid in current_pids:
try:
proc = psutil.Process(pid)
for child in proc.children():
if child.pid not in self.tracked_pids:
# 新发现的子进程
self.tracked_pids.add(child.pid)
except psutil.NoSuchProcess:
continue
if not self.is_running():
self.clear()
self.processClosed.emit()
def is_running(self) -> bool:
"""检查所有跟踪的进程是否还在运行"""
for pid in self.tracked_pids:
try:
proc = psutil.Process(pid)
if proc.is_running():
return True
except psutil.NoSuchProcess:
continue
return False
def kill(self, if_force: bool = False) -> None:
"""停止监视器并中止所有跟踪的进程"""
self.check_timer.stop()
for pid in self.tracked_pids:
try:
proc = psutil.Process(pid)
if if_force:
kill_process = subprocess.Popen(
["taskkill", "/F", "/T", "/PID", str(pid)],
creationflags=subprocess.CREATE_NO_WINDOW,
)
kill_process.wait()
proc.terminate()
except psutil.NoSuchProcess:
continue
if self.main_pid:
self.processClosed.emit()
self.clear()
def clear(self) -> None:
"""清空跟踪的进程列表"""
self.main_pid = None
self.check_timer.stop()
self.tracked_pids.clear()

View File

@@ -29,4 +29,7 @@ __version__ = "4.2.0"
__author__ = "DLmaster361 <DLmaster_361@163.com>"
__license__ = "GPL-3.0 license"
__all__ = []
from .ImageUtils import ImageUtils
from .ProcessManager import ProcessManager
__all__ = ["ImageUtils", "ProcessManager"]

View File

@@ -70,7 +70,7 @@ if __name__ == "__main__":
print("Packaging AUTO_MAA main program ...")
os.system(
"powershell -Command python -m nuitka --standalone --onefile --mingw64"
"powershell -Command python -m nuitka --standalone --onefile --mingw64 --windows-uac-admin"
" --enable-plugins=pyside6 --windows-console-mode=attach"
" --onefile-tempdir-spec='{TEMP}\\AUTO_MAA'"
" --windows-icon-from-ico=resources\\icons\\AUTO_MAA.ico"

20
main.py
View File

@@ -45,9 +45,19 @@ builtins.print = no_print
from loguru import logger
import os
import sys
import ctypes
from PySide6.QtWidgets import QApplication
from qfluentwidgets import FluentTranslator
import sys
def is_admin() -> bool:
"""检查当前程序是否以管理员身份运行"""
try:
return ctypes.windll.shell32.IsUserAnAdmin()
except:
return False
@logger.catch
@@ -68,4 +78,10 @@ def main():
if __name__ == "__main__":
main()
if is_admin():
main()
else:
ctypes.windll.shell32.ShellExecuteW(
None, "runas", sys.executable, os.path.realpath(sys.argv[0]), None, 1
)
sys.exit(0)

View File

@@ -1,14 +1,14 @@
loguru
plyer
PySide6
loguru==0.7.3
plyer==2.1.0
PySide6==6.9.1
PySide6-Fluent-Widgets[full]
psutil
pywin32
keyboard
pycryptodome
psutil==7.0.0
pywin32==310
keyboard==0.13.5
pycryptodome==3.23.0
certifi==2025.4.26
requests
markdown
Jinja2
nuitka
pillow
requests==2.32.4
markdown==3.8.2
Jinja2==3.1.6
nuitka==2.7.11
pillow==11.3.0

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -1,46 +1,15 @@
{
"main_version": "4.3.12.0",
"main_version": "4.4.0.1",
"version_info": {
"4.3.12.0": {
"修复BUG": [
"固定certifi版本号"
]
},
"4.3.11.0": {
"修复BUG": [
"修复删除计划表引发的错误"
]
},
"4.3.10.0": {
"4.4.0.1": {
"新增功能": [
"更换全新默认主页图",
"适配 MAA 无`Default`配置情况 #52"
],
"程序优化": [
"静默模式控制时段延长至模拟器完成启动的10s后"
]
},
"4.3.10.3": {
"程序优化": [
"使用 keyboard 模块替代 pyautogui 模块"
]
},
"4.3.10.2": {
"新增功能": [
"公招喜报模板优化",
"支持使用命令行调用"
"初步完成通用调度模块"
],
"修复BUG": [
"修复更新动作重复执行问题"
"修复了程序BUG较少的BUG"
],
"程序优化": [
"Mirror 酱链接添加`source`字段,用于标识来源",
"优化下载器测速中止条件"
]
},
"4.3.10.1": {
"新增功能": [
"森空岛签到功能上线"
"子线程卡死不再阻塞调度任务"
]
}
}