diff --git a/app/api/__init__.py b/app/api/__init__.py index 8a0210f..4fa5b33 100644 --- a/app/api/__init__.py +++ b/app/api/__init__.py @@ -27,6 +27,7 @@ from .info import router as info_router from .scripts import router as scripts_router from .plan import router as plan_router from .queue import router as queue_router +from .dispatch import router as dispatch_router from .setting import router as setting_router __all__ = [ @@ -34,5 +35,6 @@ __all__ = [ "scripts_router", "plan_router", "queue_router", + "dispatch_router", "setting_router", ] diff --git a/app/api/dispatch.py b/app/api/dispatch.py index 0fbccf0..c0d7346 100644 --- a/app/api/dispatch.py +++ b/app/api/dispatch.py @@ -21,7 +21,8 @@ import uuid -from fastapi import APIRouter, WebSocket, Body, Path +import asyncio +from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Body, Path from app.core import Config, TaskManager from app.models.schema import * @@ -55,14 +56,26 @@ async def stop_task(task: DispatchIn = Body(...)) -> OutBase: async def websocket_endpoint( websocket: WebSocket, taskId: str = Path(..., description="要连接的任务ID") ): + await websocket.accept() try: uid = uuid.UUID(taskId) except ValueError: await websocket.close(code=1008, reason="无效的任务ID") return - if uid in TaskManager.connection_events: + if uid in TaskManager.connection_events and uid not in TaskManager.websocket_dict: TaskManager.websocket_dict[uid] = websocket TaskManager.connection_events[uid].set() + while True: + try: + data = await asyncio.wait_for(websocket.receive_json(), timeout=30.0) + await Config.message_queue.put({"task_id": uid, "message": data}) + except asyncio.TimeoutError: + await websocket.send_json( + TaskMessage(type="Signal", data={"Ping": "无描述"}).model_dump() + ) + except WebSocketDisconnect: + TaskManager.websocket_dict.pop(uid, None) + break else: await websocket.close(code=1008, reason="任务不存在或已结束") diff --git a/app/core/config.py b/app/core/config.py index 4d2e32e..2562536 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -578,15 +578,6 @@ class AppConfig(GlobalConfig): def __init__(self) -> None: super().__init__(if_save_multi_config=False) - self.log_path = Path.cwd() / "debug/app.log" - self.database_path = Path.cwd() / "data/data.db" - self.config_path = Path.cwd() / "config" - self.key_path = Path.cwd() / "data/key" - - self.silence_dict: Dict[Path, datetime] = {} - self.power_sign = "NoAction" - self.if_ignore_silence = False - logger.info("") logger.info("===================================") logger.info("AUTO_MAA 后端应用程序") @@ -594,10 +585,19 @@ class AppConfig(GlobalConfig): logger.info(f"工作目录: {Path.cwd()}") logger.info("===================================") + self.log_path = Path.cwd() / "debug/app.log" + self.database_path = Path.cwd() / "data/data.db" + self.config_path = Path.cwd() / "config" + self.key_path = Path.cwd() / "data/key" # 检查目录 self.log_path.parent.mkdir(parents=True, exist_ok=True) self.config_path.mkdir(parents=True, exist_ok=True) + self.message_queue = asyncio.Queue() + self.silence_dict: Dict[Path, datetime] = {} + self.power_sign = "NoAction" + self.if_ignore_silence = False + self.ScriptConfig = MultipleConfig([MaaConfig, GeneralConfig]) self.PlanConfig = MultipleConfig([MaaPlanConfig]) self.QueueConfig = MultipleConfig([QueueConfig]) diff --git a/app/core/task_manager.py b/app/core/task_manager.py index d58eaa0..524ec9a 100644 --- a/app/core/task_manager.py +++ b/app/core/task_manager.py @@ -22,6 +22,7 @@ import uuid import asyncio from fastapi import WebSocket +from functools import partial from typing import Dict, Optional from .config import Config, MaaConfig, GeneralConfig, QueueConfig @@ -82,8 +83,21 @@ class _TaskManager: raise RuntimeError(f"The task {task_id} is already running.") logger.info(f"创建任务:{task_id},模式:{mode}") + self.task_dict[task_id] = asyncio.create_task( + self.run_task(mode, task_id, actual_id) + ) + self.task_dict[task_id].add_done_callback( + lambda t: asyncio.create_task(self.remove_task(t, mode, task_id)) + ) - # 创建任务实例并连接信号 + return task_id + + # @logger.catch + async def run_task( + self, mode: str, task_id: uuid.UUID, actual_id: Optional[uuid.UUID] + ): + + # 等待连接信号 if task_id in self.connection_events: self.connection_events[task_id].clear() else: @@ -96,20 +110,6 @@ class _TaskManager: logger.info(f"开始运行任务:{task_id},模式:{mode}") - self.task_dict[task_id] = asyncio.create_task( - self.run_task(mode, task_id, actual_id) - ) - self.task_dict[task_id].add_done_callback( - lambda t: asyncio.create_task(self.remove_task(t, mode, task_id)) - ) - - return task_id - - @logger.catch - async def run_task( - self, mode: str, task_id: uuid.UUID, actual_id: Optional[uuid.UUID] - ): - websocket = self.websocket_dict[task_id] if mode == "设置脚本": @@ -129,10 +129,13 @@ class _TaskManager: ) return - task = asyncio.create_task(task_item.run()) - task.add_done_callback( + uid = actual_id or uuid.uuid4() + self.task_dict[uid] = asyncio.create_task(task_item.run()) + self.task_dict[uid].add_done_callback( lambda t: asyncio.create_task(task_item.final_task(t)) ) + self.task_dict[uid].add_done_callback(partial(self.task_dict.pop, uid)) + await self.task_dict[uid] else: @@ -205,10 +208,14 @@ class _TaskManager: ) continue - task = asyncio.create_task(task_item.run()) - task.add_done_callback( + self.task_dict[script_id] = asyncio.create_task(task_item.run()) + self.task_dict[script_id].add_done_callback( lambda t: asyncio.create_task(task_item.final_task(t)) ) + self.task_dict[script_id].add_done_callback( + partial(self.task_dict.pop, script_id) + ) + await self.task_dict[script_id] async def stop_task(self, task_id: str) -> None: """ @@ -249,14 +256,13 @@ class _TaskManager: await task except asyncio.CancelledError: logger.info(f"任务 {task_id} 已结束") - self.task_dict.pop(task_id) + self.task_dict.pop(task_id) - websocket = self.websocket_dict.pop(task_id, None) + websocket = self.websocket_dict.get(task_id, None) if websocket: await websocket.send_json( TaskMessage(type="Signal", data={"Accomplish": "无描述"}).model_dump() ) - await websocket.close() TaskManager = _TaskManager() diff --git a/app/models/ConfigBase.py b/app/models/ConfigBase.py index cd4b49e..90d780d 100644 --- a/app/models/ConfigBase.py +++ b/app/models/ConfigBase.py @@ -431,8 +431,10 @@ class ConfigBase: for name in dir(self): item = getattr(self, name) - if isinstance(item, ConfigItem | MultipleConfig): + if isinstance(item, ConfigItem): item.lock() + elif isinstance(item, MultipleConfig): + await item.lock() async def unlock(self): """ diff --git a/app/services/system.py b/app/services/system.py index bd926aa..d1e89aa 100644 --- a/app/services/system.py +++ b/app/services/system.py @@ -264,10 +264,7 @@ class _SystemHandler: pname = proc.info["name"].lower() if any(keyword.lower() in pname for keyword in keywords): proc.kill() - logger.info( - f"已关闭 MuMu 模拟器进程: {proc.info['name']}", - module="系统服务", - ) + logger.info(f"已关闭 MuMu 模拟器进程: {proc.info['name']}") except (psutil.NoSuchProcess, psutil.AccessDenied): continue diff --git a/app/task/MAA.py b/app/task/MAA.py index d10be38..390246f 100644 --- a/app/task/MAA.py +++ b/app/task/MAA.py @@ -105,7 +105,7 @@ class MaaManager: if not self.maa_set_path.exists(): return "MAA配置文件不存在,请检查MAA路径设置!" if (self.mode != "设置脚本" or self.user_id is not None) and not ( - Path.cwd() / f"data/{self.script_id}/Default/gui.json" + Path.cwd() / f"data/{self.script_id}/Default/ConfigFile/gui.json" ).exists(): return "未完成 MAA 全局设置,请先设置 MAA!" return "Success!" @@ -118,11 +118,11 @@ class MaaManager: self.begin_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") await self.configure() - check_result = self.check_config() - if check_result != "Success!": - logger.error(f"未通过配置检查:{check_result}") + self.check_result = self.check_config() + if self.check_result != "Success!": + logger.error(f"未通过配置检查:{self.check_result}") await self.websocket.send_json( - TaskMessage(type="Info", data={"Error": check_result}).model_dump() + TaskMessage(type="Info", data={"Error": self.check_result}).model_dump() ) return @@ -145,7 +145,7 @@ class MaaManager: } for uid, config in self.user_config.items() if config.get("Info", "Status") - and config.get("Info", "RemainedDay") > 0 + and config.get("Info", "RemainedDay") != 0 ] self.user_list = sorted( self.user_list, @@ -184,7 +184,7 @@ class MaaManager: if self.script_config.get( "Run", "ProxyTimesLimit" ) == 0 or user_data.get("Data", "ProxyTimes") < self.script_config.get( - "RunSet", "ProxyTimesLimit" + "Run", "ProxyTimesLimit" ): user["status"] = "运行" await self.websocket.send_json( @@ -201,9 +201,7 @@ class MaaManager: ) continue - logger.info( - f"开始代理用户: {user['user_id']}", - ) + logger.info(f"开始代理用户: {user['user_id']}") # 详细模式用户首次代理需打开模拟器 if user_data.get("Info", "Mode") == "详细": @@ -214,8 +212,8 @@ class MaaManager: "Annihilation": bool( user_data.get("Info", "Annihilation") == "Close" ), - "Routine": user_data.get("Info", "Mode") == "简洁" - or not user_data.get("Info", "Routine"), + "Routine": user_data.get("Info", "Mode") == "复杂" + and not user_data.get("Info", "Routine"), } user_logs_list = [] @@ -380,18 +378,18 @@ class MaaManager: ) # 尝试次数循环 - for i in range(user_data.get("Run", "RunTimesLimit")): + for i in range(self.script_config.get("Run", "RunTimesLimit")): if run_book[mode]: break logger.info( - f"用户 {user['name']} - 模式: {mode} - 尝试次数: {i + 1}/{user_data.get('Run','RunTimesLimit')}", + f"用户 {user['name']} - 模式: {mode} - 尝试次数: {i + 1}/{self.script_config.get('Run','RunTimesLimit')}", ) # 配置MAA if isinstance(user_data, MaaUserConfig): - set = await self.set_maa(mode, index, user_data) + set = await self.set_maa(mode, user_data, index) # 记录当前时间 self.log_start_time = datetime.now() @@ -681,6 +679,7 @@ class MaaManager: }, ).model_dump() ) + await self.set_maa("Update") subprocess.Popen( [self.maa_exe_path], creationflags=subprocess.CREATE_NO_WINDOW, @@ -742,175 +741,165 @@ class MaaManager: ).model_dump() ) - # # 人工排查模式 - # elif self.mode == "人工排查": + # # 人工排查模式 + # elif self.mode == "人工排查": - # # 人工排查时,屏蔽静默操作 - # logger.info( - # "人工排查任务开始,屏蔽静默操作", - # ) - # Config.if_ignore_silence = True + # # 人工排查时,屏蔽静默操作 + # logger.info( + # "人工排查任务开始,屏蔽静默操作", + # ) + # Config.if_ignore_silence = True - # # 标记是否需要启动模拟器 - # self.if_open_emulator = True - # # 标识排查模式 - # for _ in self.user_list: - # _[0] += "_排查模式" + # # 标记是否需要启动模拟器 + # self.if_open_emulator = True + # # 标识排查模式 + # for _ in self.user_list: + # _[0] += "_排查模式" - # # 开始排查 - # for user in self.user_list: + # # 开始排查 + # for user in self.user_list: - # user_data = self.data[user[2]]["Config"] + # user_data = self.data[user[2]]["Config"] - # if self.isInterruptionRequested: - # break + # if self.isInterruptionRequested: + # break - # logger.info(f"开始排查用户: {user[0]}", ) + # logger.info(f"开始排查用户: {user[0]}", ) - # user[1] = "运行" - # self.update_user_list.emit(self.user_list) + # user[1] = "运行" + # self.update_user_list.emit(self.user_list) - # if user_data["Info"]["Mode"] == "详细": - # self.if_open_emulator = True + # if user_data["Info"]["Mode"] == "详细": + # self.if_open_emulator = True - # run_book = [False for _ in range(2)] + # run_book = [False for _ in range(2)] - # # 启动重试循环 - # while not self.isInterruptionRequested: + # # 启动重试循环 + # while not self.isInterruptionRequested: - # # 配置MAA - # self.set_maa("人工排查", user[2]) + # # 配置MAA + # self.set_maa("人工排查", user[2]) - # # 记录当前时间 - # self.log_start_time = datetime.now() - # # 创建MAA任务 - # logger.info( - # f"启动MAA进程:{self.maa_exe_path}", - # , - # ) - # self.maa_process_manager.open_process(self.maa_exe_path, [], 0) + # # 记录当前时间 + # self.log_start_time = datetime.now() + # # 创建MAA任务 + # logger.info( + # f"启动MAA进程:{self.maa_exe_path}", + # , + # ) + # self.maa_process_manager.open_process(self.maa_exe_path, [], 0) - # # 监测MAA运行状态 - # self.log_check_mode = "人工排查" - # self.start_monitor() + # # 监测MAA运行状态 + # self.log_check_mode = "人工排查" + # self.start_monitor() - # if self.maa_result == "Success!": - # logger.info( - # f"用户: {user[0]} - MAA进程成功登录PRTS", - # , - # ) - # run_book[0] = True - # self.update_log_text.emit("检测到MAA进程成功登录PRTS") - # else: - # logger.error( - # f"用户: {user[0]} - MAA未能正确登录到PRTS: {self.maa_result}", - # , - # ) - # self.update_log_text.emit( - # f"{self.maa_result}\n正在中止相关程序\n请等待10s" - # ) - # # 无命令行中止MAA与其子程序 - # logger.info( - # f"中止MAA进程:{self.maa_exe_path}", - # , - # ) - # self.maa_process_manager.kill(if_force=True) - # System.kill_process(self.maa_exe_path) - # self.if_open_emulator = True - # self.sleep(10) + # if self.maa_result == "Success!": + # logger.info( + # f"用户: {user[0]} - MAA进程成功登录PRTS", + # , + # ) + # run_book[0] = True + # self.update_log_text.emit("检测到MAA进程成功登录PRTS") + # else: + # logger.error( + # f"用户: {user[0]} - MAA未能正确登录到PRTS: {self.maa_result}", + # , + # ) + # self.update_log_text.emit( + # f"{self.maa_result}\n正在中止相关程序\n请等待10s" + # ) + # # 无命令行中止MAA与其子程序 + # logger.info( + # f"中止MAA进程:{self.maa_exe_path}", + # , + # ) + # self.maa_process_manager.kill(if_force=True) + # System.kill_process(self.maa_exe_path) + # self.if_open_emulator = True + # self.sleep(10) - # # 登录成功,结束循环 - # if run_book[0]: - # break - # # 登录失败,询问是否结束循环 - # elif not self.isInterruptionRequested: + # # 登录成功,结束循环 + # if run_book[0]: + # break + # # 登录失败,询问是否结束循环 + # elif not self.isInterruptionRequested: - # self.play_sound.emit("排查重试") - # if not self.push_question( - # "操作提示", "MAA未能正确登录到PRTS,是否重试?" - # ): - # break + # self.play_sound.emit("排查重试") + # if not self.push_question( + # "操作提示", "MAA未能正确登录到PRTS,是否重试?" + # ): + # break - # # 登录成功,录入人工排查情况 - # if run_book[0] and not self.isInterruptionRequested: + # # 登录成功,录入人工排查情况 + # if run_book[0] and not self.isInterruptionRequested: - # self.play_sound.emit("排查录入") - # if self.push_question( - # "操作提示", "请检查用户代理情况,该用户是否正确完成代理任务?" - # ): - # run_book[1] = True + # self.play_sound.emit("排查录入") + # if self.push_question( + # "操作提示", "请检查用户代理情况,该用户是否正确完成代理任务?" + # ): + # run_book[1] = True - # # 结果录入 - # if run_book[0] and run_book[1]: - # logger.info( - # f"用户 {user[0]} 通过人工排查", - # ) - # user_data["Data"]["IfPassCheck"] = True - # user[1] = "完成" - # else: - # logger.info( - # f"用户 {user[0]} 未通过人工排查", - # , - # ) - # user_data["Data"]["IfPassCheck"] = False - # user[1] = "异常" + # # 结果录入 + # if run_book[0] and run_book[1]: + # logger.info( + # f"用户 {user[0]} 通过人工排查", + # ) + # user_data["Data"]["IfPassCheck"] = True + # user[1] = "完成" + # else: + # logger.info( + # f"用户 {user[0]} 未通过人工排查", + # , + # ) + # user_data["Data"]["IfPassCheck"] = False + # user[1] = "异常" - # self.update_user_list.emit(self.user_list) + # self.update_user_list.emit(self.user_list) - # # 解除静默操作屏蔽 - # logger.info( - # "人工排查任务结束,解除静默操作屏蔽", - # ) - # Config.if_ignore_silence = False + # # 解除静默操作屏蔽 + # logger.info( + # "人工排查任务结束,解除静默操作屏蔽", + # ) + # Config.if_ignore_silence = False - # # 设置MAA模式 - # elif "设置MAA" in self.mode: + # 设置MAA模式 + elif self.mode == "设置脚本": - # # 配置MAA - # self.set_maa(self.mode, "") - # # 创建MAA任务 - # logger.info( - # f"启动MAA进程:{self.maa_exe_path}", - # ) - # self.maa_process_manager.open_process(self.maa_exe_path, [], 0) - # # 记录当前时间 - # self.log_start_time = datetime.now() + # 配置MAA + await self.set_maa(self.mode) + # 创建MAA任务 + logger.info(f"启动MAA进程:{self.maa_exe_path}") + await self.maa_process_manager.open_process(self.maa_exe_path, [], 0) + # 记录当前时间 + self.log_start_time = datetime.now() - # # 监测MAA运行状态 - # self.log_check_mode = "设置MAA" - # self.start_monitor() - - # if "全局" in self.mode: - # (self.config_path / "Default").mkdir(parents=True, exist_ok=True) - # shutil.copy(self.maa_set_path, self.config_path / "Default") - # logger.success( - # f"全局MAA配置文件已保存到 {self.config_path / 'Default/gui.json'}", - # , - # ) - - # elif "用户" in self.mode: - # self.user_config_path.mkdir(parents=True, exist_ok=True) - # shutil.copy(self.maa_set_path, self.user_config_path) - # logger.success( - # f"用户MAA配置文件已保存到 {self.user_config_path}", - # , - # ) - - # result_text = "" + # 监测MAA运行状态 + await self.maa_log_monitor.start(self.maa_log_path, self.log_start_time) + self.wait_event.clear() + await self.wait_event.wait() async def final_task(self, task: asyncio.Task): + logger.info("MAA 主任务已结束,开始执行后续操作") + await Config.ScriptConfig[self.script_id].unlock() + logger.success(f"已解锁脚本配置 {self.script_id}") + + # 结束各子任务 + await self.maa_process_manager.kill(if_force=True) + await System.kill_process(self.maa_exe_path) + await self.emulator_process_manager.kill() + await self.maa_log_monitor.stop() + del self.maa_process_manager + del self.emulator_process_manager + del self.maa_log_monitor + + if self.check_result != "Success!": + return self.check_result # 导出结果 if self.mode in ["自动代理", "人工排查"]: - # 结束各子任务 - await self.maa_process_manager.kill(if_force=True) - await System.kill_process(self.maa_exe_path) - await self.emulator_process_manager.kill() - await self.maa_log_monitor.stop() - # 更新用户数据 sc = Config.ScriptConfig[self.script_id] if isinstance(sc, MaaConfig): @@ -960,10 +949,24 @@ class MaaManager: 10, ) await self.push_notification("代理结果", title, result) + elif self.mode == "设置脚本": + ( + Path.cwd() + / f"data/{self.script_id}/{self.user_id if self.user_id else 'Default'}/ConfigFile" + ).mkdir(parents=True, exist_ok=True) + shutil.copy( + self.maa_set_path, + ( + Path.cwd() + / f"data/{self.script_id}/{self.user_id if self.user_id else 'Default'}/ConfigFile/gui.json" + ), + ) + + result_text = "" # 复原 MAA 配置文件 logger.info(f"复原 MAA 配置文件:{Path.cwd() / f'data/{self.script_id}/Temp'}") - if (Path.cwd() / f"data/{self.script_id}/Temp").exists(): + if (Path.cwd() / f"data/{self.script_id}/Temp/gui.json").exists(): shutil.copy( Path.cwd() / f"data/{self.script_id}/Temp/gui.json", self.maa_set_path ) @@ -981,7 +984,7 @@ class MaaManager: data={ "log": f"即将搜索ADB实际地址\n正在等待模拟器完成启动\n请等待{self.wait_time}s" }, - ) + ).model_dump() ) await asyncio.sleep(self.wait_time) @@ -1063,7 +1066,7 @@ class MaaManager: log = "".join(log_content) # 更新MAA日志 - if self.maa_process_manager.is_running(): + if await self.maa_process_manager.is_running(): await self.websocket.send_json( TaskMessage(type="Update", data={"log": log}).model_dump() @@ -1131,14 +1134,12 @@ class MaaManager: elif ( "MaaAssistantArknights GUI exited" in log - or not self.maa_process_manager.is_running() + or not await self.maa_process_manager.is_running() ): self.maa_result = "MAA在完成任务前退出" elif datetime.now() - latest_time > timedelta( - minutes=self.script_config.get( - "RunSet", f"{self.log_check_mode}TimeLimit" - ) + minutes=self.script_config.get("Run", f"{self.log_check_mode}TimeLimit") ): self.maa_result = "MAA进程超时" @@ -1156,7 +1157,7 @@ class MaaManager: self.maa_result = "MAA在完成任务前中止" elif ( "MaaAssistantArknights GUI exited" in log - or not self.maa_process_manager.is_running() + or not await self.maa_process_manager.is_running() ): self.maa_result = "MAA在完成任务前退出" else: @@ -1165,7 +1166,7 @@ class MaaManager: elif self.mode == "设置脚本": if ( "MaaAssistantArknights GUI exited" in log - or not self.maa_process_manager.is_running() + or not await self.maa_process_manager.is_running() ): self.maa_result = "Success!" else: @@ -1175,50 +1176,21 @@ class MaaManager: if self.maa_result != "Wait": + logger.info(f"MAA 任务结果:{self.maa_result},日志锁已释放") self.wait_event.set() - # def start_monitor(self) -> None: - # """开始监视MAA日志""" - - # logger.info( - # f"开始监视MAA日志,路径:{self.maa_log_path},日志起始时间:{self.log_start_time},模式:{self.log_check_mode}", - # , - # ) - # self.log_monitor.addPath(str(self.maa_log_path)) - # self.log_monitor_timer.start(1000) - # self.last_check_time = datetime.now() - # self.monitor_loop.exec() - - # def quit_monitor(self) -> None: - # """退出MAA日志监视进程""" - - # if len(self.log_monitor.files()) != 0: - - # logger.info( - # f"MAA日志监视器移除路径:{self.maa_log_path}", - # , - # ) - # self.log_monitor.removePath(str(self.maa_log_path)) - - # else: - # logger.warning( - # f"MAA日志监视器没有正在监看的路径:{self.log_monitor.files()}", - # , - # ) - - # self.log_monitor_timer.stop() - # self.last_check_time = None - # self.monitor_loop.quit() - - # logger.info("MAA日志监视锁已释放", ) - - async def set_maa(self, mode: str, index: int, user_data: MaaUserConfig) -> dict: + async def set_maa( + self, + mode: str, + user_data: Optional[MaaUserConfig] = None, + index: Optional[int] = None, + ) -> dict: """配置MAA运行参数""" logger.info(f"开始配置MAA运行参数: {mode}/{index}") - if "设置脚本" not in self.mode and mode != "Update": + if self.mode != "设置脚本" and mode != "Update": - if user_data.get("Info", "Server") == "Bilibili": + if user_data and user_data.get("Info", "Server") == "Bilibili": self.agree_bilibili(True) else: self.agree_bilibili(False) @@ -1228,7 +1200,7 @@ class MaaManager: await System.kill_process(self.maa_exe_path) # 预导入MAA配置文件 - if self.mode in ["自动代理", "人工排查"]: + if self.mode in ["自动代理", "人工排查"] and user_data is not None: if user_data.get("Info", "Mode") == "简洁": shutil.copy( (Path.cwd() / f"data/{self.script_id}/Default/ConfigFile/gui.json"), @@ -1288,7 +1260,12 @@ class MaaManager: data["Global"][f"Timer.Timer{i}"] = "False" # 自动代理配置 - if self.mode == "自动代理": + if ( + self.mode == "自动代理" + and mode in ["Annihilation", "Routine"] + and index is not None + and user_data is not None + ): if (index == len(self.user_list) - 1) or ( self.user_config[uuid.UUID(self.user_list[index + 1]["user_id"])].get( @@ -1392,7 +1369,7 @@ class MaaManager: user_data.get("Info", "SeriesNumb") ) # 连战次数 - if "剿灭" in mode: + if mode == "Annihilation": data["Configurations"]["Default"][ "MainFunction.Stage1" @@ -1434,7 +1411,7 @@ class MaaManager: "GUI.HideSeries" ] = "False" # 隐藏连战次数 - elif "日常" in mode: + elif mode == "Routine": data["Configurations"]["Default"]["MainFunction.Stage1"] = ( user_data.get("Info", "Stage") @@ -1541,7 +1518,7 @@ class MaaManager: ) # 自定义基建配置索引 # 人工排查配置 - elif "人工排查" in mode: + elif self.mode == "人工排查" and user_data is not None: data["Configurations"]["Default"][ "MainFunction.PostActions" @@ -1607,8 +1584,8 @@ class MaaManager: "TaskQueue.Reclamation.IsChecked" ] = "False" # 生息演算 - # 设置MAA配置 - elif "设置MAA" in mode: + # 设置脚本配置 + elif self.mode == "设置脚本": data["Configurations"]["Default"][ "MainFunction.PostActions" @@ -1659,7 +1636,7 @@ class MaaManager: "TaskQueue.Reclamation.IsChecked" ] = "False" # 生息演算 - elif mode == "更新MAA": + elif mode == "Update": data["Configurations"]["Default"][ "MainFunction.PostActions" @@ -1712,16 +1689,14 @@ class MaaManager: ] = "False" # 生息演算 # 启动模拟器仅生效一次 - if "设置MAA" not in mode and "更新MAA" not in mode and self.if_open_emulator: + if self.mode != "设置脚本" and mode != "Update" and self.if_open_emulator: self.if_open_emulator = False # 覆写配置文件 with self.maa_set_path.open(mode="w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=4) - logger.success( - f"MAA运行参数配置完成: {mode}/{index}", - ) + logger.success(f"MAA运行参数配置完成: {mode}/{index}") return data diff --git a/app/utils/LogMonitor.py b/app/utils/LogMonitor.py index 4442fc5..c970b83 100644 --- a/app/utils/LogMonitor.py +++ b/app/utils/LogMonitor.py @@ -1,11 +1,13 @@ import asyncio import aiofiles import os -from datetime import datetime +from datetime import datetime, timedelta from pathlib import Path from typing import Callable, Optional, List, Awaitable -from loguru import logger +from .logger import get_logger + +logger = get_logger("日志监控器") class LogMonitor: @@ -22,44 +24,87 @@ class LogMonitor: self.encoding = encoding self.log_file_path: Optional[Path] = None self.log_start_time: datetime = datetime.now() + self.last_callback_time: datetime = datetime.now() self.log_contents: List[str] = [] self.task: Optional[asyncio.Task] = None async def monitor_log(self): - + """监控日志文件的主循环""" if self.log_file_path is None or not self.log_file_path.exists(): raise ValueError("Log file path is not set or does not exist.") + logger.info(f"开始监控日志文件: {self.log_file_path}") + + consecutive_errors = 0 + while True: + try: + log_contents = [] + if_log_start = False - log_contents = [] - if_log_start = False + # 检查文件是否仍然存在 + if not self.log_file_path.exists(): + logger.warning(f"日志文件不存在: {self.log_file_path}") + await asyncio.sleep(1) + continue - async with aiofiles.open( - self.log_file_path, "r", encoding=self.encoding - ) as f: - - async for line in f: - if not if_log_start: - try: - entry_time = datetime.strptime( - line[ - self.time_stamp_range[0] : self.time_stamp_range[1] - ], - self.time_format, - ) - if entry_time > self.log_start_time: - if_log_start = True + # 尝试读取文件 + try: + async with aiofiles.open( + self.log_file_path, "r", encoding=self.encoding + ) as f: + async for line in f: + if not if_log_start: + try: + entry_time = datetime.strptime( + line[ + self.time_stamp_range[ + 0 + ] : self.time_stamp_range[1] + ], + self.time_format, + ) + if entry_time > self.log_start_time: + if_log_start = True + log_contents.append(line) + except (ValueError, IndexError): + continue + else: log_contents.append(line) - except ValueError: - pass - else: - log_contents.append(line) - # 调用回调 - if log_contents != self.log_contents: - self.log_contents = log_contents - await self.callback(log_contents) + except (FileNotFoundError, PermissionError) as e: + logger.warning(f"文件访问错误: {e}") + await asyncio.sleep(5) + continue + except UnicodeDecodeError as e: + logger.error(f"文件编码错误: {e}") + await asyncio.sleep(10) + continue + + # 调用回调 + if ( + log_contents != self.log_contents + or datetime.now() - self.last_callback_time > timedelta(minutes=1) + ): + self.log_contents = log_contents + self.last_callback_time = datetime.now() + + # 安全调用回调函数 + try: + await self.callback(log_contents) + except Exception as e: + logger.error(f"回调函数执行失败: {e}") + + except asyncio.CancelledError: + logger.info("日志监控任务被取消") + break + except Exception as e: + logger.error(f"监控日志时发生未知错误:{e}") + + consecutive_errors += 1 + wait_time = min(60, 2**consecutive_errors) + await asyncio.sleep(wait_time) + continue await asyncio.sleep(1) @@ -75,15 +120,19 @@ class LogMonitor: self.log_file_path = log_file_path self.log_start_time = start_time self.task = asyncio.create_task(self.monitor_log()) - - logger.info(f"开始监控文件: {self.log_file_path}") + logger.info(f"日志监控已启动: {self.log_file_path}") async def stop(self): """停止监控""" if self.task is not None and not self.task.done(): self.task.cancel() + try: + await self.task + except asyncio.CancelledError: + pass self.log_contents = [] - logger.info(f"停止监控文件: {self.log_file_path}") self.log_file_path = None + self.task = None + logger.info(f"日志监控已停止: {self.log_file_path}") diff --git a/main.py b/main.py index 3d0a8e4..b8a80fa 100644 --- a/main.py +++ b/main.py @@ -77,6 +77,7 @@ def main(): scripts_router, plan_router, queue_router, + dispatch_router, setting_router, ) @@ -99,6 +100,7 @@ def main(): app.include_router(scripts_router) app.include_router(plan_router) app.include_router(queue_router) + app.include_router(dispatch_router) app.include_router(setting_router) uvicorn.run(app, host="0.0.0.0", port=8000)