diff --git a/app/api/dispatch.py b/app/api/dispatch.py index c0d7346..50da6dc 100644 --- a/app/api/dispatch.py +++ b/app/api/dispatch.py @@ -24,7 +24,7 @@ import uuid import asyncio from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Body, Path -from app.core import Config, TaskManager +from app.core import TaskManager, Broadcast from app.models.schema import * router = APIRouter(prefix="/api/dispatch", tags=["任务调度"]) @@ -69,7 +69,7 @@ async def websocket_endpoint( while True: try: data = await asyncio.wait_for(websocket.receive_json(), timeout=30.0) - await Config.message_queue.put({"task_id": uid, "message": data}) + await Broadcast.put(data) except asyncio.TimeoutError: await websocket.send_json( TaskMessage(type="Signal", data={"Ping": "无描述"}).model_dump() diff --git a/app/core/__init__.py b/app/core/__init__.py index 7d03561..44fa974 100644 --- a/app/core/__init__.py +++ b/app/core/__init__.py @@ -23,11 +23,13 @@ __version__ = "5.0.0" __author__ = "DLmaster361 " __license__ = "GPL-3.0 license" +from .broadcast import Broadcast from .config import Config, MaaConfig, GeneralConfig, MaaUserConfig from .timer import MainTimer from .task_manager import TaskManager __all__ = [ + "Broadcast", "Config", "MaaConfig", "GeneralConfig", diff --git a/app/core/broadcast.py b/app/core/broadcast.py new file mode 100644 index 0000000..d145deb --- /dev/null +++ b/app/core/broadcast.py @@ -0,0 +1,51 @@ +# AUTO_MAA:A MAA Multi Account Management and Automation Tool +# Copyright © 2024-2025 DLmaster361 + +# This file is part of AUTO_MAA. + +# AUTO_MAA is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published +# by the Free Software Foundation, either version 3 of the License, +# or (at your option) any later version. + +# AUTO_MAA is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty +# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See +# the GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with AUTO_MAA. If not, see . + +# Contact: DLmaster_361@163.com + + +import asyncio +from copy import deepcopy +from typing import Set + +from app.utils import get_logger + + +logger = get_logger("消息广播") + + +class _Broadcast: + + def __init__(self): + self.__subscribers: Set[asyncio.Queue] = set() + + async def subscribe(self, queue: asyncio.Queue): + """订阅者注册""" + self.__subscribers.add(queue) + + async def unsubscribe(self, queue: asyncio.Queue): + """取消订阅""" + self.__subscribers.remove(queue) + + async def put(self, item): + """向所有订阅者广播消息""" + for subscriber in self.__subscribers: + await subscriber.put(deepcopy(item)) + + +Broadcast = _Broadcast() diff --git a/app/core/config.py b/app/core/config.py index 2562536..28daccb 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -593,10 +593,9 @@ class AppConfig(GlobalConfig): self.log_path.parent.mkdir(parents=True, exist_ok=True) self.config_path.mkdir(parents=True, exist_ok=True) - self.message_queue = asyncio.Queue() self.silence_dict: Dict[Path, datetime] = {} self.power_sign = "NoAction" - self.if_ignore_silence = False + self.if_ignore_silence: List[uuid.UUID] = [] self.ScriptConfig = MultipleConfig([MaaConfig, GeneralConfig]) self.PlanConfig = MultipleConfig([MaaPlanConfig]) diff --git a/app/core/task_manager.py b/app/core/task_manager.py index 524ec9a..7a5f723 100644 --- a/app/core/task_manager.py +++ b/app/core/task_manager.py @@ -78,7 +78,9 @@ class _TaskManager: else: raise ValueError(f"The task corresponding to UID {uid} could not be found.") - if task_id in self.task_dict: + if task_id in self.task_dict or ( + actual_id is not None and actual_id in self.task_dict + ): raise RuntimeError(f"The task {task_id} is already running.") diff --git a/app/core/timer.py b/app/core/timer.py index a77d3c5..c8e1b68 100644 --- a/app/core/timer.py +++ b/app/core/timer.py @@ -50,8 +50,8 @@ class _MainTimer: if ( not Config.if_ignore_silence - and await Config.get("Function", "IfSilence") - and await Config.get("Function", "BossKey") != "" + and Config.get("Function", "IfSilence") + and Config.get("Function", "BossKey") != "" ): pass diff --git a/app/task/MAA.py b/app/task/MAA.py index 390246f..94f1b6b 100644 --- a/app/task/MAA.py +++ b/app/task/MAA.py @@ -32,7 +32,7 @@ from pathlib import Path from jinja2 import Environment, FileSystemLoader from typing import Union, List, Dict, Optional -from app.core import Config, MaaConfig, MaaUserConfig +from app.core import Broadcast, Config, MaaConfig, MaaUserConfig from app.models.schema import TaskMessage from app.models.ConfigBase import MultipleConfig from app.services import Notify, System @@ -40,7 +40,7 @@ from app.utils import get_logger, LogMonitor, ProcessManager from .skland import skland_sign_in -logger = get_logger("MAA调度器") +logger = get_logger("MAA 调度器") METHOD_BOOK = {"NoAction": "8", "ExitGame": "9", "ExitEmulator": "12"} @@ -67,6 +67,7 @@ class MaaManager: self.emulator_process_manager = ProcessManager() self.maa_process_manager = ProcessManager() self.wait_event = asyncio.Event() + self.message_queue = asyncio.Queue() self.maa_logs = [] self.maa_result = "Wait" @@ -75,6 +76,8 @@ class MaaManager: async def configure(self): """提取配置信息""" + await Broadcast.subscribe(self.message_queue) + await Config.ScriptConfig[self.script_id].lock() self.script_config = Config.ScriptConfig[self.script_id] @@ -114,7 +117,7 @@ class MaaManager: """主进程,运行MAA代理进程""" self.current_date = datetime.now().strftime("%m-%d") - curdate = Config.server_date().strftime("%Y-%m-%d") + self.curdate = Config.server_date().strftime("%Y-%m-%d") self.begin_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") await self.configure() @@ -167,23 +170,25 @@ class MaaManager: self.user_config[uuid.UUID(_["user_id"])].get( "Data", "LastProxyDate" ) - != curdate + != self.curdate ): await self.user_config[uuid.UUID(_["user_id"])].set( - "Data", "LastProxyDate", curdate + "Data", "LastProxyDate", self.curdate ) await self.user_config[uuid.UUID(_["user_id"])].set( "Data", "ProxyTimes", 0 ) # 开始代理 - for index, user in enumerate(self.user_list): + for self.index, user in enumerate(self.user_list): - user_data = self.user_config[uuid.UUID(user["user_id"])] + self.cur_user_data = self.user_config[uuid.UUID(user["user_id"])] if self.script_config.get( "Run", "ProxyTimesLimit" - ) == 0 or user_data.get("Data", "ProxyTimes") < self.script_config.get( + ) == 0 or self.cur_user_data.get( + "Data", "ProxyTimes" + ) < self.script_config.get( "Run", "ProxyTimesLimit" ): user["status"] = "运行" @@ -204,26 +209,26 @@ class MaaManager: logger.info(f"开始代理用户: {user['user_id']}") # 详细模式用户首次代理需打开模拟器 - if user_data.get("Info", "Mode") == "详细": + if self.cur_user_data.get("Info", "Mode") == "详细": self.if_open_emulator = True # 初始化代理情况记录和模式替换表 - run_book = { + self.run_book = { "Annihilation": bool( - user_data.get("Info", "Annihilation") == "Close" + self.cur_user_data.get("Info", "Annihilation") == "Close" ), - "Routine": user_data.get("Info", "Mode") == "复杂" - and not user_data.get("Info", "Routine"), + "Routine": self.cur_user_data.get("Info", "Mode") == "复杂" + and not self.cur_user_data.get("Info", "Routine"), } - user_logs_list = [] - user_start_time = datetime.now() + self.user_logs_list = [] + self.user_start_time = datetime.now() - if user_data.get("Info", "IfSkland") and user_data.get( - "Info", "SklandToken" - ): + if self.cur_user_data.get( + "Info", "IfSkland" + ) and self.cur_user_data.get("Info", "SklandToken"): - if user_data.get( + if self.cur_user_data.get( "Data", "LastSklandDate" ) != datetime.now().strftime("%Y-%m-%d"): @@ -235,7 +240,7 @@ class MaaManager: ) skland_result = await skland_sign_in( - user_data.get("Info", "SklandToken") + self.cur_user_data.get("Info", "SklandToken") ) for type, user_list in skland_result.items(): @@ -270,13 +275,13 @@ class MaaManager: skland_result["总计"] > 0 and len(skland_result["失败"]) == 0 ): - await user_data.set( + await self.cur_user_data.set( "Data", "LastSklandDate", datetime.now().strftime("%Y-%m-%d"), ) - elif user_data.get("Info", "IfSkland"): + elif self.cur_user_data.get("Info", "IfSkland"): logger.warning( f"用户: {user['user_id']} - 未配置森空岛签到Token,跳过森空岛签到" ) @@ -292,7 +297,7 @@ class MaaManager: # 剿灭-日常模式循环 for mode in ["Annihilation", "Routine"]: - if run_book[mode]: + if self.run_book[mode]: continue # 剿灭模式;满足条件跳过剿灭 @@ -300,20 +305,21 @@ class MaaManager: mode == "Annihilation" and self.script_config.get("Run", "AnnihilationWeeklyLimit") and datetime.strptime( - user_data.get("Data", "LastAnnihilationDate"), "%Y-%m-%d" + self.cur_user_data.get("Data", "LastAnnihilationDate"), + "%Y-%m-%d", ).isocalendar()[:2] - == datetime.strptime(curdate, "%Y-%m-%d").isocalendar()[:2] + == datetime.strptime(self.curdate, "%Y-%m-%d").isocalendar()[:2] ): logger.info( f"用户: {user['user_id']} - 本周剿灭模式已达上限,跳过执行剿灭任务" ) - run_book[mode] = True + self.run_book[mode] = True continue else: self.weekly_annihilation_limit_reached = False if ( - user_data.get("Info", "Mode") == "详细" + self.cur_user_data.get("Info", "Mode") == "详细" and not ( Path.cwd() / f"data/{self.script_id}/{user['user_id']}/ConfigFile/gui.json" @@ -328,7 +334,7 @@ class MaaManager: data={"Error": f"未找到 {user['name']} 的详细配置文件"}, ).model_dump() ) - run_book[mode] = False + self.run_book[mode] = False break # 更新当前模式到界面 @@ -348,16 +354,20 @@ class MaaManager: if mode == "Routine": self.task_dict = { - "WakeUp": str(user_data.get("Task", "IfWakeUp")), - "Recruiting": str(user_data.get("Task", "IfRecruiting")), - "Base": str(user_data.get("Task", "IfBase")), - "Combat": str(user_data.get("Task", "IfCombat")), - "Mission": str(user_data.get("Task", "IfMission")), - "Mall": str(user_data.get("Task", "IfMall")), - "AutoRoguelike": str( - user_data.get("Task", "IfAutoRoguelike") + "WakeUp": str(self.cur_user_data.get("Task", "IfWakeUp")), + "Recruiting": str( + self.cur_user_data.get("Task", "IfRecruiting") + ), + "Base": str(self.cur_user_data.get("Task", "IfBase")), + "Combat": str(self.cur_user_data.get("Task", "IfCombat")), + "Mission": str(self.cur_user_data.get("Task", "IfMission")), + "Mall": str(self.cur_user_data.get("Task", "IfMall")), + "AutoRoguelike": str( + self.cur_user_data.get("Task", "IfAutoRoguelike") + ), + "Reclamation": str( + self.cur_user_data.get("Task", "IfReclamation") ), - "Reclamation": str(user_data.get("Task", "IfReclamation")), } elif mode == "Annihilation": @@ -380,7 +390,7 @@ class MaaManager: # 尝试次数循环 for i in range(self.script_config.get("Run", "RunTimesLimit")): - if run_book[mode]: + if self.run_book[mode]: break logger.info( @@ -388,8 +398,7 @@ class MaaManager: ) # 配置MAA - if isinstance(user_data, MaaUserConfig): - set = await self.set_maa(mode, user_data, index) + set = await self.set_maa(mode) # 记录当前时间 self.log_start_time = datetime.now() @@ -528,11 +537,13 @@ class MaaManager: self.wait_event.clear() await self.wait_event.wait() + await self.maa_log_monitor.stop() + # 处理MAA结果 if self.maa_result == "Success!": # 标记任务完成 - run_book[mode] = True + self.run_book[mode] = True logger.info( f"用户: {user['user_id']} - MAA进程完成代理任务" @@ -581,7 +592,6 @@ class MaaManager: 3, ) - await self.maa_log_monitor.stop() await asyncio.sleep(10) # 任务结束后释放ADB @@ -615,7 +625,7 @@ class MaaManager: data = json.load(f) # 记录自定义基建索引 - await user_data.set( + await self.cur_user_data.set( "Data", "CustomInfrastPlanIndex", data["Configurations"]["Default"][ @@ -640,17 +650,19 @@ class MaaManager: mode == "Annihilation" and self.weekly_annihilation_limit_reached ): - await user_data.set("Data", "LastAnnihilationDate", curdate) + await self.cur_user_data.set( + "Data", "LastAnnihilationDate", self.curdate + ) # 保存运行日志以及统计信息 if_six_star = await Config.save_maa_log( Path.cwd() - / f"history/{curdate}/{user['name']}/{self.log_start_time.strftime('%H-%M-%S')}.log", + / f"history/{self.curdate}/{user['name']}/{self.log_start_time.strftime('%H-%M-%S')}.log", self.maa_logs, self.maa_result, ) - user_logs_list.append( + self.user_logs_list.append( Path.cwd() - / f"history/{curdate}/{user['name']}/{self.log_start_time.strftime('%H-%M-%S')}.json" + / f"history/{self.curdate}/{user['name']}/{self.log_start_time.strftime('%H-%M-%S')}.json" ) if if_six_star: await self.push_notification( @@ -659,11 +671,6 @@ class MaaManager: { "user_name": user["name"], }, - ( - user_data - if isinstance(user_data, MaaUserConfig) - else None - ), ) # 执行MAA解压更新动作 @@ -691,176 +698,144 @@ class MaaManager: logger.info(f"更新动作结束") - # 发送统计信息 - statistics = Config.merge_statistic_info(user_logs_list) - statistics["user_info"] = user["name"] - statistics["start_time"] = user_start_time.strftime("%Y-%m-%d %H:%M:%S") - statistics["end_time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - statistics["maa_result"] = ( - "代理任务全部完成" - if (run_book["Annihilation"] and run_book["Routine"]) - else "代理任务未全部完成" - ) - await self.push_notification( - "统计信息", - f"{self.current_date} | 用户 {user['name']} 的自动代理统计报告", - statistics, - user_data if isinstance(user_data, MaaUserConfig) else None, - ) + await self.result_record() - if run_book["Annihilation"] and run_book["Routine"]: - # 成功完成代理的用户修改相关参数 - if ( - user_data.get("Data", "ProxyTimes") == 0 - and user_data.get("Info", "RemainedDay") != -1 - ): - await user_data.set( - "Info", - "RemainedDay", - user_data.get("Info", "RemainedDay") - 1, - ) - await user_data.set( - "Data", "ProxyTimes", user_data.get("Data", "ProxyTimes") + 1 - ) - user["status"] = "完成" - logger.success(f"用户 {user['name']} 的自动代理任务已完成") - Notify.push_plyer( - "成功完成一个自动代理任务!", - f"已完成用户 {user['name']} 的自动代理任务", - f"已完成 {user['name']} 的自动代理任务", - 3, - ) - else: - # 录入代理失败的用户 - logger.error(f"用户 {user['name']} 的自动代理任务未完成") - user["status"] = "异常" + # 人工排查模式 + elif self.mode == "人工排查": + # 人工排查时,屏蔽静默操作 + logger.info("人工排查任务开始,屏蔽静默操作") + Config.if_ignore_silence.append(self.script_id) + + # 标记是否需要启动模拟器 + self.if_open_emulator = True + + # 开始排查 + for self.index, user in enumerate(self.user_list): + + self.cur_user_data = self.user_config[uuid.UUID(user["user_id"])] + + logger.info(f"开始排查用户: {user['user_id']}") + + user["status"] = "运行" await self.websocket.send_json( TaskMessage( type="Update", data={"user_list": self.user_list} ).model_dump() ) - # # 人工排查模式 - # elif self.mode == "人工排查": + if self.cur_user_data.get("Info", "Mode") == "详细": + self.if_open_emulator = True - # # 人工排查时,屏蔽静默操作 - # logger.info( - # "人工排查任务开始,屏蔽静默操作", - # ) - # Config.if_ignore_silence = True + self.run_book = {"SignIn": False, "PassCheck": False} - # # 标记是否需要启动模拟器 - # self.if_open_emulator = True - # # 标识排查模式 - # for _ in self.user_list: - # _[0] += "_排查模式" + # 启动重试循环 + while True: - # # 开始排查 - # for user in self.user_list: + # 配置MAA + await self.set_maa("人工排查") - # user_data = self.data[user[2]]["Config"] + # 记录当前时间 + self.log_start_time = datetime.now() + # 创建MAA任务 + logger.info(f"启动MAA进程:{self.maa_exe_path}") + await self.maa_process_manager.open_process( + self.maa_exe_path, [], 0 + ) - # if self.isInterruptionRequested: - # break + # 监测MAA运行状态 + self.log_check_mode = "人工排查" + await self.maa_log_monitor.start( + self.maa_log_path, self.log_start_time + ) - # logger.info(f"开始排查用户: {user[0]}", ) + self.wait_event.clear() + await self.wait_event.wait() - # user[1] = "运行" - # self.update_user_list.emit(self.user_list) + await self.maa_log_monitor.stop() - # if user_data["Info"]["Mode"] == "详细": - # self.if_open_emulator = True + logger.info( + f"用户: {user['user_id']} - MAA进程登录PRTS结果: {self.maa_result}" + ) - # run_book = [False for _ in range(2)] + if self.maa_result == "Success!": + logger.info( + f"用户: {user['user_id']} - MAA进程成功登录PRTS", + ) + self.run_book["SignIn"] = True + await self.websocket.send_json( + TaskMessage( + type="Update", + data={"log": "检测到MAA进程成功登录PRTS"}, + ).model_dump() + ) + else: + logger.error( + f"用户: {user['user_id']} - MAA未能正确登录到PRTS: {self.maa_result}" + ) + await self.websocket.send_json( + TaskMessage( + type="Update", + data={ + "log": f"{self.maa_result}\n正在中止相关程序\n请等待10s" + }, + ).model_dump() + ) + # 无命令行中止MAA与其子程序 + logger.info(f"中止MAA进程:{self.maa_exe_path}") + await self.maa_process_manager.kill(if_force=True) + await System.kill_process(self.maa_exe_path) + self.if_open_emulator = True + await asyncio.sleep(10) - # # 启动重试循环 - # while not self.isInterruptionRequested: + # 登录成功,结束循环 + if self.run_book["SignIn"]: + break + # 登录失败,询问是否结束循环 + else: - # # 配置MAA - # self.set_maa("人工排查", user[2]) + uid = str(uuid.uuid4()) + await self.websocket.send_json( + TaskMessage( + type="Message", + data={ + "message_id": uid, + "type": "Question", + "title": "操作提示", + "message": "MAA未能正确登录到PRTS,是否重试?", + }, + ).model_dump() + ) + result = await self.get_message(uid) + if result.get("choice", False): + break - # # 记录当前时间 - # self.log_start_time = datetime.now() - # # 创建MAA任务 - # logger.info( - # f"启动MAA进程:{self.maa_exe_path}", - # , - # ) - # self.maa_process_manager.open_process(self.maa_exe_path, [], 0) + # 登录成功,录入人工排查情况 + if self.run_book["SignIn"]: - # # 监测MAA运行状态 - # self.log_check_mode = "人工排查" - # self.start_monitor() + uid = str(uuid.uuid4()) + await self.websocket.send_json( + TaskMessage( + type="Message", + data={ + "message_id": uid, + "type": "Question", + "title": "操作提示", + "message": "请检查用户代理情况,该用户是否正确完成代理任务?", + }, + ).model_dump() + ) + result = await self.get_message(uid) + if result.get("choice", False): + self.run_book["PassCheck"] = True - # if self.maa_result == "Success!": - # logger.info( - # f"用户: {user[0]} - MAA进程成功登录PRTS", - # , - # ) - # run_book[0] = True - # self.update_log_text.emit("检测到MAA进程成功登录PRTS") - # else: - # logger.error( - # f"用户: {user[0]} - MAA未能正确登录到PRTS: {self.maa_result}", - # , - # ) - # self.update_log_text.emit( - # f"{self.maa_result}\n正在中止相关程序\n请等待10s" - # ) - # # 无命令行中止MAA与其子程序 - # logger.info( - # f"中止MAA进程:{self.maa_exe_path}", - # , - # ) - # self.maa_process_manager.kill(if_force=True) - # System.kill_process(self.maa_exe_path) - # self.if_open_emulator = True - # self.sleep(10) + await self.result_record() - # # 登录成功,结束循环 - # if run_book[0]: - # break - # # 登录失败,询问是否结束循环 - # elif not self.isInterruptionRequested: - - # self.play_sound.emit("排查重试") - # if not self.push_question( - # "操作提示", "MAA未能正确登录到PRTS,是否重试?" - # ): - # break - - # # 登录成功,录入人工排查情况 - # if run_book[0] and not self.isInterruptionRequested: - - # self.play_sound.emit("排查录入") - # if self.push_question( - # "操作提示", "请检查用户代理情况,该用户是否正确完成代理任务?" - # ): - # run_book[1] = True - - # # 结果录入 - # if run_book[0] and run_book[1]: - # logger.info( - # f"用户 {user[0]} 通过人工排查", - # ) - # user_data["Data"]["IfPassCheck"] = True - # user[1] = "完成" - # else: - # logger.info( - # f"用户 {user[0]} 未通过人工排查", - # , - # ) - # user_data["Data"]["IfPassCheck"] = False - # user[1] = "异常" - - # self.update_user_list.emit(self.user_list) - - # # 解除静默操作屏蔽 - # logger.info( - # "人工排查任务结束,解除静默操作屏蔽", - # ) - # Config.if_ignore_silence = False + await self.websocket.send_json( + TaskMessage( + type="Update", data={"user_list": self.user_list} + ).model_dump() + ) # 设置MAA模式 elif self.mode == "设置脚本": @@ -877,6 +852,83 @@ class MaaManager: await self.maa_log_monitor.start(self.maa_log_path, self.log_start_time) self.wait_event.clear() await self.wait_event.wait() + await self.maa_log_monitor.stop() + + async def result_record(self): + """记录用户结果信息""" + + if self.mode == "自动代理": + # 发送统计信息 + statistics = Config.merge_statistic_info(self.user_logs_list) + statistics["user_info"] = self.user_list[self.index]["name"] + statistics["start_time"] = self.user_start_time.strftime( + "%Y-%m-%d %H:%M:%S" + ) + statistics["end_time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + statistics["maa_result"] = ( + "代理任务全部完成" + if (self.run_book["Annihilation"] and self.run_book["Routine"]) + else "代理任务未全部完成" + ) + await self.push_notification( + "统计信息", + f"{self.current_date} | 用户 {self.user_list[self.index]['name']} 的自动代理统计报告", + statistics, + ) + + if self.run_book["Annihilation"] and self.run_book["Routine"]: + # 成功完成代理的用户修改相关参数 + if ( + self.cur_user_data.get("Data", "ProxyTimes") == 0 + and self.cur_user_data.get("Info", "RemainedDay") != -1 + ): + await self.cur_user_data.set( + "Info", + "RemainedDay", + self.cur_user_data.get("Info", "RemainedDay") - 1, + ) + await self.cur_user_data.set( + "Data", + "ProxyTimes", + self.cur_user_data.get("Data", "ProxyTimes") + 1, + ) + self.user_list[self.index]["status"] = "完成" + logger.success( + f"用户 {self.user_list[self.index]['name']} 的自动代理任务已完成" + ) + Notify.push_plyer( + "成功完成一个自动代理任务!", + f"已完成用户 {self.user_list[self.index]['name']} 的自动代理任务", + f"已完成 {self.user_list[self.index]['name']} 的自动代理任务", + 3, + ) + else: + # 录入代理失败的用户 + logger.error( + f"用户 {self.user_list[self.index]['name']} 的自动代理任务未完成" + ) + self.user_list[self.index]["status"] = "异常" + + await self.websocket.send_json( + TaskMessage( + type="Update", data={"user_list": self.user_list} + ).model_dump() + ) + + elif self.mode == "人工排查": + + if self.run_book["SignIn"] and self.run_book["PassCheck"]: + logger.info( + f"用户 {self.user_list[self.index]['user_id']} 通过人工排查" + ) + await self.cur_user_data.set("Data", "IfPassCheck", True) + self.user_list[self.index]["status"] = "完成" + else: + logger.info( + f"用户 {self.user_list[self.index]['user_id']} 未通过人工排查", + ) + await self.cur_user_data.set("Data", "IfPassCheck", False) + self.user_list[self.index]["status"] = "异常" async def final_task(self, task: asyncio.Task): @@ -886,6 +938,7 @@ class MaaManager: logger.success(f"已解锁脚本配置 {self.script_id}") # 结束各子任务 + await Broadcast.unsubscribe(self.message_queue) await self.maa_process_manager.kill(if_force=True) await System.kill_process(self.maa_exe_path) await self.emulator_process_manager.kill() @@ -897,6 +950,45 @@ class MaaManager: if self.check_result != "Success!": return self.check_result + if self.mode == "人工排查": + + # 解除静默操作屏蔽 + logger.info("人工排查任务结束,解除静默操作屏蔽") + if self.script_id in Config.if_ignore_silence: + Config.if_ignore_silence.remove(self.script_id) + + if self.mode == "自动代理" and self.user_list[self.index]["status"] == "运行": + + if not self.maa_update_package: + + self.maa_result = "用户手动中止任务" + + # 保存运行日志以及统计信息 + if_six_star = await Config.save_maa_log( + Path.cwd() + / f"history/{self.curdate}/{self.user_list[self.index]['name']}/{self.log_start_time.strftime('%H-%M-%S')}.log", + self.maa_logs, + self.maa_result, + ) + self.user_logs_list.append( + Path.cwd() + / f"history/{self.curdate}/{self.user_list[self.index]['name']}/{self.log_start_time.strftime('%H-%M-%S')}.json" + ) + if if_six_star: + await self.push_notification( + "公招六星", + f"喜报:用户 {self.user_list[self.index]['name']} 公招出六星啦!", + { + "user_name": self.user_list[self.index]["name"], + }, + ) + + await self.result_record() + + elif self.mode == "人工排查" and self.user_list[self.index]["status"] == "运行": + + await self.result_record() + # 导出结果 if self.mode in ["自动代理", "人工排查"]: @@ -975,6 +1067,20 @@ class MaaManager: self.agree_bilibili(False) return result_text + async def get_message(self, message_id: str): + """获取当前任务的属性值""" + + logger.info(f"等待客户端回应消息:{message_id}") + + while True: + message = await self.message_queue.get() + if message.get("message_id") == message_id: + self.message_queue.task_done() + logger.success(f"收到客户端回应消息:{message_id}") + return message + else: + self.message_queue.task_done() + async def search_ADB_address(self) -> None: """搜索ADB实际地址""" @@ -1179,18 +1285,13 @@ class MaaManager: logger.info(f"MAA 任务结果:{self.maa_result},日志锁已释放") self.wait_event.set() - async def set_maa( - self, - mode: str, - user_data: Optional[MaaUserConfig] = None, - index: Optional[int] = None, - ) -> dict: + async def set_maa(self, mode: str) -> dict: """配置MAA运行参数""" - logger.info(f"开始配置MAA运行参数: {mode}/{index}") + logger.info(f"开始配置MAA运行参数: {mode}") if self.mode != "设置脚本" and mode != "Update": - if user_data and user_data.get("Info", "Server") == "Bilibili": + if self.cur_user_data.get("Info", "Server") == "Bilibili": self.agree_bilibili(True) else: self.agree_bilibili(False) @@ -1200,13 +1301,13 @@ class MaaManager: await System.kill_process(self.maa_exe_path) # 预导入MAA配置文件 - if self.mode in ["自动代理", "人工排查"] and user_data is not None: - if user_data.get("Info", "Mode") == "简洁": + if self.mode in ["自动代理", "人工排查"]: + if self.cur_user_data.get("Info", "Mode") == "简洁": shutil.copy( (Path.cwd() / f"data/{self.script_id}/Default/ConfigFile/gui.json"), self.maa_set_path, ) - elif user_data.get("Info", "Mode") == "详细": + elif self.cur_user_data.get("Info", "Mode") == "详细": shutil.copy( ( Path.cwd() @@ -1260,17 +1361,12 @@ class MaaManager: data["Global"][f"Timer.Timer{i}"] = "False" # 自动代理配置 - if ( - self.mode == "自动代理" - and mode in ["Annihilation", "Routine"] - and index is not None - and user_data is not None - ): + if self.mode == "自动代理" and mode in ["Annihilation", "Routine"]: - if (index == len(self.user_list) - 1) or ( - self.user_config[uuid.UUID(self.user_list[index + 1]["user_id"])].get( - "Info", "Mode" - ) + if (self.index == len(self.user_list) - 1) or ( + self.user_config[ + uuid.UUID(self.user_list[self.index + 1]["user_id"]) + ].get("Info", "Mode") == "详细" ): data["Configurations"]["Default"][ @@ -1305,20 +1401,20 @@ class MaaManager: data["Global"]["GUI.MinimizeToTray"] = "True" # 最小化时隐藏至托盘 # 客户端类型 - data["Configurations"]["Default"]["Start.ClientType"] = user_data.get( - "Info", "Server" + data["Configurations"]["Default"]["Start.ClientType"] = ( + self.cur_user_data.get("Info", "Server") ) # 账号切换 - if user_data.get("Info", "Server") == "Official": + if self.cur_user_data.get("Info", "Server") == "Official": data["Configurations"]["Default"]["Start.AccountName"] = ( - f"{user_data.get("Info", "Id")[:3]}****{user_data.get("Info", "Id")[7:]}" - if len(user_data.get("Info", "Id")) == 11 - else user_data.get("Info", "Id") + f"{self.cur_user_data.get("Info", "Id")[:3]}****{self.cur_user_data.get("Info", "Id")[7:]}" + if len(self.cur_user_data.get("Info", "Id")) == 11 + else self.cur_user_data.get("Info", "Id") ) - elif user_data.get("Info", "Server") == "Bilibili": - data["Configurations"]["Default"]["Start.AccountName"] = user_data.get( - "Info", "Id" + elif self.cur_user_data.get("Info", "Server") == "Bilibili": + data["Configurations"]["Default"]["Start.AccountName"] = ( + self.cur_user_data.get("Info", "Id") ) # 按预设设定任务 @@ -1348,7 +1444,10 @@ class MaaManager: ) # 生息演算 # 整理任务顺序 - if mode == "Annihilation" or user_data.get("Info", "Mode") == "简洁": + if ( + mode == "Annihilation" + or self.cur_user_data.get("Info", "Mode") == "简洁" + ): data["Configurations"]["Default"]["TaskQueue.Order.WakeUp"] = "0" data["Configurations"]["Default"]["TaskQueue.Order.Recruiting"] = "1" @@ -1360,13 +1459,15 @@ class MaaManager: data["Configurations"]["Default"]["TaskQueue.Order.Reclamation"] = "7" data["Configurations"]["Default"]["MainFunction.UseMedicine"] = ( - "False" if user_data.get("Info", "MedicineNumb") == 0 else "True" + "False" + if self.cur_user_data.get("Info", "MedicineNumb") == 0 + else "True" ) # 吃理智药 data["Configurations"]["Default"]["MainFunction.UseMedicine.Quantity"] = ( - str(user_data.get("Info", "MedicineNumb")) + str(self.cur_user_data.get("Info", "MedicineNumb")) ) # 吃理智药数量 data["Configurations"]["Default"]["MainFunction.Series.Quantity"] = ( - user_data.get("Info", "SeriesNumb") + self.cur_user_data.get("Info", "SeriesNumb") ) # 连战次数 if mode == "Annihilation": @@ -1390,7 +1491,7 @@ class MaaManager: "MainFunction.Annihilation.UseCustom" ] = "True" # 自定义剿灭关卡 data["Configurations"]["Default"]["MainFunction.Annihilation.Stage"] = ( - user_data.get("Info", "Annihilation") + self.cur_user_data.get("Info", "Annihilation") ) # 自定义剿灭关卡号 data["Configurations"]["Default"][ "Penguin.IsDrGrandet" @@ -1414,38 +1515,40 @@ class MaaManager: elif mode == "Routine": data["Configurations"]["Default"]["MainFunction.Stage1"] = ( - user_data.get("Info", "Stage") - if user_data.get("Info", "Stage") != "-" + self.cur_user_data.get("Info", "Stage") + if self.cur_user_data.get("Info", "Stage") != "-" else "" ) # 主关卡 data["Configurations"]["Default"]["MainFunction.Stage2"] = ( - user_data.get("Info", "Stage_1") - if user_data.get("Info", "Stage_1") != "-" + self.cur_user_data.get("Info", "Stage_1") + if self.cur_user_data.get("Info", "Stage_1") != "-" else "" ) # 备选关卡1 data["Configurations"]["Default"]["MainFunction.Stage3"] = ( - user_data.get("Info", "Stage_2") - if user_data.get("Info", "Stage_2") != "-" + self.cur_user_data.get("Info", "Stage_2") + if self.cur_user_data.get("Info", "Stage_2") != "-" else "" ) # 备选关卡2 data["Configurations"]["Default"]["MainFunction.Stage4"] = ( - user_data.get("Info", "Stage_3") - if user_data.get("Info", "Stage_3") != "-" + self.cur_user_data.get("Info", "Stage_3") + if self.cur_user_data.get("Info", "Stage_3") != "-" else "" ) # 备选关卡3 data["Configurations"]["Default"]["Fight.RemainingSanityStage"] = ( - user_data.get("Info", "Stage_Remain") - if user_data.get("Info", "Stage_Remain") != "-" + self.cur_user_data.get("Info", "Stage_Remain") + if self.cur_user_data.get("Info", "Stage_Remain") != "-" else "" ) # 剩余理智关卡 data["Configurations"]["Default"][ "GUI.UseAlternateStage" ] = "True" # 备选关卡 data["Configurations"]["Default"]["Fight.UseRemainingSanityStage"] = ( - "True" if user_data.get("Info", "Stage_Remain") != "-" else "False" + "True" + if self.cur_user_data.get("Info", "Stage_Remain") != "-" + else "False" ) # 使用剩余理智 - if user_data.get("Info", "Mode") == "简洁": + if self.cur_user_data.get("Info", "Mode") == "简洁": data["Configurations"]["Default"][ "Penguin.IsDrGrandet" @@ -1457,7 +1560,7 @@ class MaaManager: "Fight.UseExpiringMedicine" ] = "True" # 无限吃48小时内过期的理智药 # 自定义基建配置 - if user_data.get("Info", "InfrastMode") == "Custom": + if self.cur_user_data.get("Info", "InfrastMode") == "Custom": if ( Path.cwd() @@ -1469,7 +1572,7 @@ class MaaManager: ] = "Custom" # 基建模式 data["Configurations"]["Default"][ "Infrast.CustomInfrastPlanIndex" - ] = user_data.get( + ] = self.cur_user_data.get( "Data", "CustomInfrastPlanIndex" ) # 自定义基建配置索引 data["Configurations"]["Default"][ @@ -1486,13 +1589,13 @@ class MaaManager: ) # 自定义基建配置文件地址 else: logger.warning( - f"未选择用户 {user_data.get('Info', 'Name')} 的自定义基建配置文件" + f"未选择用户 {self.cur_user_data.get('Info', 'Name')} 的自定义基建配置文件" ) await self.websocket.send_json( TaskMessage( type="Info", data={ - "warning": f"未选择用户 {user_data.get('Info', 'Name')} 的自定义基建配置文件" + "warning": f"未选择用户 {self.cur_user_data.get('Info', 'Name')} 的自定义基建配置文件" }, ) ) @@ -1501,10 +1604,10 @@ class MaaManager: ] = "Normal" # 基建模式 else: data["Configurations"]["Default"]["Infrast.InfrastMode"] = ( - user_data.get("Info", "InfrastMode") + self.cur_user_data.get("Info", "InfrastMode") ) # 基建模式 - elif user_data.get("Info", "Mode") == "详细": + elif self.cur_user_data.get("Info", "Mode") == "详细": # 基建模式 if ( @@ -1513,12 +1616,12 @@ class MaaManager: ): data["Configurations"]["Default"][ "Infrast.CustomInfrastPlanIndex" - ] = user_data.get( + ] = self.cur_user_data.get( "Data", "CustomInfrastPlanIndex" ) # 自定义基建配置索引 # 人工排查配置 - elif self.mode == "人工排查" and user_data is not None: + elif self.mode == "人工排查" and self.cur_user_data is not None: data["Configurations"]["Default"][ "MainFunction.PostActions" @@ -1543,20 +1646,20 @@ class MaaManager: ] = "False" # 自动安装更新包 # 客户端类型 - data["Configurations"]["Default"]["Start.ClientType"] = user_data.get( - "Info", "Server" + data["Configurations"]["Default"]["Start.ClientType"] = ( + self.cur_user_data.get("Info", "Server") ) # 账号切换 - if user_data.get("Info", "Server") == "Official": + if self.cur_user_data.get("Info", "Server") == "Official": data["Configurations"]["Default"]["Start.AccountName"] = ( - f"{user_data.get('Info', 'Id')[:3]}****{user_data.get('Info', 'Id')[7:]}" - if len(user_data.get("Info", "Id")) == 11 - else user_data.get("Info", "Id") + f"{self.cur_user_data.get('Info', 'Id')[:3]}****{self.cur_user_data.get('Info', 'Id')[7:]}" + if len(self.cur_user_data.get("Info", "Id")) == 11 + else self.cur_user_data.get("Info", "Id") ) - elif user_data.get("Info", "Server") == "Bilibili": - data["Configurations"]["Default"]["Start.AccountName"] = user_data.get( - "Info", "Id" + elif self.cur_user_data.get("Info", "Server") == "Bilibili": + data["Configurations"]["Default"]["Start.AccountName"] = ( + self.cur_user_data.get("Info", "Id") ) data["Configurations"]["Default"][ @@ -1696,7 +1799,7 @@ class MaaManager: with self.maa_set_path.open(mode="w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=4) - logger.success(f"MAA运行参数配置完成: {mode}/{index}") + logger.success(f"MAA运行参数配置完成: {mode}") return data @@ -1727,13 +1830,7 @@ class MaaManager: with self.maa_tasks_path.open(mode="w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=4) - async def push_notification( - self, - mode: str, - title: str, - message, - user_data: Optional[MaaUserConfig] = None, - ) -> None: + async def push_notification(self, mode: str, title: str, message) -> None: """通过所有渠道推送通知""" logger.info(f"开始推送通知,模式:{mode},标题:{title}") @@ -1843,31 +1940,29 @@ class MaaManager: ) # 发送用户单独通知 - if ( - isinstance(user_data, MaaUserConfig) - and user_data.get("Notify", "Enabled") - and user_data.get("Notify", "IfSendStatistic") + if self.cur_user_data.get("Notify", "Enabled") and self.cur_user_data.get( + "Notify", "IfSendStatistic" ): # 发送邮件通知 - if user_data.get("Notify", "IfSendMail"): - if user_data.get("Notify", "ToAddress"): + if self.cur_user_data.get("Notify", "IfSendMail"): + if self.cur_user_data.get("Notify", "ToAddress"): Notify.send_mail( "网页", title, message_html, - user_data.get("Notify", "ToAddress"), + self.cur_user_data.get("Notify", "ToAddress"), ) else: logger.error(f"用户邮箱地址为空,无法发送用户单独的邮件通知") # 发送ServerChan通知 - if user_data.get("Notify", "IfServerChan"): - if user_data.get("Notify", "ServerChanKey"): + if self.cur_user_data.get("Notify", "IfServerChan"): + if self.cur_user_data.get("Notify", "ServerChanKey"): Notify.ServerChanPush( title, f"{serverchan_message}\n\nAUTO_MAA 敬上", - user_data.get("Notify", "ServerChanKey"), + self.cur_user_data.get("Notify", "ServerChanKey"), ) else: logger.error( @@ -1875,12 +1970,12 @@ class MaaManager: ) # 推送CompanyWebHookBot通知 - if user_data.get("Notify", "IfCompanyWebHookBot"): - if user_data.get("Notify", "CompanyWebHookBotUrl"): + if self.cur_user_data.get("Notify", "IfCompanyWebHookBot"): + if self.cur_user_data.get("Notify", "CompanyWebHookBotUrl"): Notify.WebHookPush( title, f"{message_text}\n\nAUTO_MAA 敬上", - user_data.get("Notify", "CompanyWebHookBotUrl"), + self.cur_user_data.get("Notify", "CompanyWebHookBotUrl"), ) else: logger.error( @@ -1921,32 +2016,30 @@ class MaaManager: ) # 发送用户单独通知 - if ( - isinstance(user_data, MaaUserConfig) - and user_data.get("Notify", "Enabled") - and user_data.get("Notify", "IfSendSixStar") + if self.cur_user_data.get("Notify", "Enabled") and self.cur_user_data.get( + "Notify", "IfSendSixStar" ): # 发送邮件通知 - if user_data.get("Notify", "IfSendMail"): - if user_data.get("Notify", "ToAddress"): + if self.cur_user_data.get("Notify", "IfSendMail"): + if self.cur_user_data.get("Notify", "ToAddress"): Notify.send_mail( "网页", title, message_html, - user_data.get("Notify", "ToAddress"), + self.cur_user_data.get("Notify", "ToAddress"), ) else: logger.error("用户邮箱地址为空,无法发送用户单独的邮件通知") # 发送ServerChan通知 - if user_data.get("Notify", "IfServerChan"): + if self.cur_user_data.get("Notify", "IfServerChan"): - if user_data.get("Notify", "ServerChanKey"): + if self.cur_user_data.get("Notify", "ServerChanKey"): Notify.ServerChanPush( title, "好羡慕~\n\nAUTO_MAA 敬上", - user_data.get("Notify", "ServerChanKey"), + self.cur_user_data.get("Notify", "ServerChanKey"), ) else: logger.error( @@ -1954,16 +2047,16 @@ class MaaManager: ) # 推送CompanyWebHookBot通知 - if user_data.get("Notify", "IfCompanyWebHookBot"): - if user_data.get("Notify", "CompanyWebHookBotUrl"): + if self.cur_user_data.get("Notify", "IfCompanyWebHookBot"): + if self.cur_user_data.get("Notify", "CompanyWebHookBotUrl"): Notify.WebHookPush( title, "好羡慕~\n\nAUTO_MAA 敬上", - user_data.get("Notify", "CompanyWebHookBotUrl"), + self.cur_user_data.get("Notify", "CompanyWebHookBotUrl"), ) Notify.CompanyWebHookBotPushImage( Path.cwd() / "resources/images/notification/six_star.png", - user_data.get("Notify", "CompanyWebHookBotUrl"), + self.cur_user_data.get("Notify", "CompanyWebHookBotUrl"), ) else: logger.error( diff --git a/app/utils/LogMonitor.py b/app/utils/LogMonitor.py index c970b83..ccad835 100644 --- a/app/utils/LogMonitor.py +++ b/app/utils/LogMonitor.py @@ -27,6 +27,7 @@ class LogMonitor: self.last_callback_time: datetime = datetime.now() self.log_contents: List[str] = [] self.task: Optional[asyncio.Task] = None + self.__is_running = False async def monitor_log(self): """监控日志文件的主循环""" @@ -35,77 +36,63 @@ class LogMonitor: logger.info(f"开始监控日志文件: {self.log_file_path}") - consecutive_errors = 0 + while self.__is_running: + logger.debug("正在检查日志文件...") + log_contents = [] + if_log_start = False - while True: - try: - log_contents = [] - if_log_start = False - - # 检查文件是否仍然存在 - if not self.log_file_path.exists(): - logger.warning(f"日志文件不存在: {self.log_file_path}") - await asyncio.sleep(1) - continue - - # 尝试读取文件 - try: - async with aiofiles.open( - self.log_file_path, "r", encoding=self.encoding - ) as f: - async for line in f: - if not if_log_start: - try: - entry_time = datetime.strptime( - line[ - self.time_stamp_range[ - 0 - ] : self.time_stamp_range[1] - ], - self.time_format, - ) - if entry_time > self.log_start_time: - if_log_start = True - log_contents.append(line) - except (ValueError, IndexError): - continue - else: - log_contents.append(line) - - except (FileNotFoundError, PermissionError) as e: - logger.warning(f"文件访问错误: {e}") - await asyncio.sleep(5) - continue - except UnicodeDecodeError as e: - logger.error(f"文件编码错误: {e}") - await asyncio.sleep(10) - continue - - # 调用回调 - if ( - log_contents != self.log_contents - or datetime.now() - self.last_callback_time > timedelta(minutes=1) - ): - self.log_contents = log_contents - self.last_callback_time = datetime.now() - - # 安全调用回调函数 - try: - await self.callback(log_contents) - except Exception as e: - logger.error(f"回调函数执行失败: {e}") - - except asyncio.CancelledError: - logger.info("日志监控任务被取消") - break - except Exception as e: - logger.error(f"监控日志时发生未知错误:{e}") - - consecutive_errors += 1 - wait_time = min(60, 2**consecutive_errors) - await asyncio.sleep(wait_time) + # 检查文件是否仍然存在 + if not self.log_file_path.exists(): + logger.warning(f"日志文件不存在: {self.log_file_path}") continue + # 尝试读取文件 + try: + async with aiofiles.open( + self.log_file_path, "r", encoding=self.encoding + ) as f: + async for line in f: + if not if_log_start: + try: + entry_time = datetime.strptime( + line[ + self.time_stamp_range[ + 0 + ] : self.time_stamp_range[1] + ], + self.time_format, + ) + if entry_time > self.log_start_time: + if_log_start = True + log_contents.append(line) + except (ValueError, IndexError): + continue + else: + log_contents.append(line) + + except (FileNotFoundError, PermissionError) as e: + logger.warning(f"文件访问错误: {e}") + await asyncio.sleep(5) + continue + except UnicodeDecodeError as e: + logger.error(f"文件编码错误: {e}") + await asyncio.sleep(10) + continue + + # 调用回调 + if ( + log_contents != self.log_contents + or datetime.now() - self.last_callback_time > timedelta(minutes=1) + ): + self.log_contents = log_contents + self.last_callback_time = datetime.now() + + # 安全调用回调函数 + try: + await self.callback(log_contents) + except Exception as e: + logger.error(f"回调函数执行失败: {e}") + await asyncio.sleep(1) async def start(self, log_file_path: Path, start_time: datetime) -> None: @@ -117,6 +104,8 @@ class LogMonitor: if self.task is not None and not self.task.done(): await self.stop() + self.__is_running = True + self.log_contents = [] self.log_file_path = log_file_path self.log_start_time = start_time self.task = asyncio.create_task(self.monitor_log()) @@ -125,14 +114,15 @@ class LogMonitor: async def stop(self): """停止监控""" + logger.info("请求取消日志监控任务") + if self.task is not None and not self.task.done(): self.task.cancel() + try: await self.task except asyncio.CancelledError: - pass + logger.info("日志监控任务已中止") - self.log_contents = [] - self.log_file_path = None + logger.success("日志监控任务已停止") self.task = None - logger.info(f"日志监控已停止: {self.log_file_path}") diff --git a/app/utils/ProcessManager.py b/app/utils/ProcessManager.py index 0ea1a11..66fb75f 100644 --- a/app/utils/ProcessManager.py +++ b/app/utils/ProcessManager.py @@ -22,6 +22,7 @@ import asyncio import psutil import subprocess +from datetime import datetime, timedelta from pathlib import Path @@ -34,6 +35,7 @@ class ProcessManager: self.main_pid = None self.tracked_pids = set() self.check_task = None + self.track_end_time = datetime.now() async def open_process( self, path: Path, args: list = [], tracking_time: int = 60 @@ -88,14 +90,13 @@ class ProcessManager: # 启动持续追踪任务 if tracking_time > 0: + self.track_end_time = datetime.now() + timedelta(seconds=tracking_time) self.check_task = asyncio.create_task(self.track_processes()) - await asyncio.sleep(tracking_time) - await self.stop_tracking() async def track_processes(self) -> None: """更新子进程列表""" - while True: + while datetime.now() < self.track_end_time: current_pids = set(self.tracked_pids) for pid in current_pids: try: @@ -108,16 +109,6 @@ class ProcessManager: continue await asyncio.sleep(0.1) - async def stop_tracking(self) -> None: - """停止更新子进程列表""" - - if self.check_task and not self.check_task.done(): - self.check_task.cancel() - try: - await self.check_task - except asyncio.CancelledError: - pass - async def is_running(self) -> bool: """检查所有跟踪的进程是否还在运行""" @@ -152,6 +143,13 @@ class ProcessManager: async def clear(self) -> None: """清空跟踪的进程列表""" - await self.stop_tracking() + if self.check_task is not None and not self.check_task.done(): + self.check_task.cancel() + + try: + await self.check_task + except asyncio.CancelledError: + pass + self.main_pid = None self.tracked_pids.clear()