feat: MAA调度初步上线

This commit is contained in:
DLmaster361
2025-08-09 22:17:19 +08:00
parent 135555e3ea
commit 409a7a2d03
9 changed files with 326 additions and 280 deletions

View File

@@ -27,6 +27,7 @@ from .info import router as info_router
from .scripts import router as scripts_router
from .plan import router as plan_router
from .queue import router as queue_router
from .dispatch import router as dispatch_router
from .setting import router as setting_router
__all__ = [
@@ -34,5 +35,6 @@ __all__ = [
"scripts_router",
"plan_router",
"queue_router",
"dispatch_router",
"setting_router",
]

View File

@@ -21,7 +21,8 @@
import uuid
from fastapi import APIRouter, WebSocket, Body, Path
import asyncio
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Body, Path
from app.core import Config, TaskManager
from app.models.schema import *
@@ -55,14 +56,26 @@ async def stop_task(task: DispatchIn = Body(...)) -> OutBase:
async def websocket_endpoint(
websocket: WebSocket, taskId: str = Path(..., description="要连接的任务ID")
):
await websocket.accept()
try:
uid = uuid.UUID(taskId)
except ValueError:
await websocket.close(code=1008, reason="无效的任务ID")
return
if uid in TaskManager.connection_events:
if uid in TaskManager.connection_events and uid not in TaskManager.websocket_dict:
TaskManager.websocket_dict[uid] = websocket
TaskManager.connection_events[uid].set()
while True:
try:
data = await asyncio.wait_for(websocket.receive_json(), timeout=30.0)
await Config.message_queue.put({"task_id": uid, "message": data})
except asyncio.TimeoutError:
await websocket.send_json(
TaskMessage(type="Signal", data={"Ping": "无描述"}).model_dump()
)
except WebSocketDisconnect:
TaskManager.websocket_dict.pop(uid, None)
break
else:
await websocket.close(code=1008, reason="任务不存在或已结束")

View File

@@ -578,15 +578,6 @@ class AppConfig(GlobalConfig):
def __init__(self) -> None:
super().__init__(if_save_multi_config=False)
self.log_path = Path.cwd() / "debug/app.log"
self.database_path = Path.cwd() / "data/data.db"
self.config_path = Path.cwd() / "config"
self.key_path = Path.cwd() / "data/key"
self.silence_dict: Dict[Path, datetime] = {}
self.power_sign = "NoAction"
self.if_ignore_silence = False
logger.info("")
logger.info("===================================")
logger.info("AUTO_MAA 后端应用程序")
@@ -594,10 +585,19 @@ class AppConfig(GlobalConfig):
logger.info(f"工作目录: {Path.cwd()}")
logger.info("===================================")
self.log_path = Path.cwd() / "debug/app.log"
self.database_path = Path.cwd() / "data/data.db"
self.config_path = Path.cwd() / "config"
self.key_path = Path.cwd() / "data/key"
# 检查目录
self.log_path.parent.mkdir(parents=True, exist_ok=True)
self.config_path.mkdir(parents=True, exist_ok=True)
self.message_queue = asyncio.Queue()
self.silence_dict: Dict[Path, datetime] = {}
self.power_sign = "NoAction"
self.if_ignore_silence = False
self.ScriptConfig = MultipleConfig([MaaConfig, GeneralConfig])
self.PlanConfig = MultipleConfig([MaaPlanConfig])
self.QueueConfig = MultipleConfig([QueueConfig])

View File

@@ -22,6 +22,7 @@
import uuid
import asyncio
from fastapi import WebSocket
from functools import partial
from typing import Dict, Optional
from .config import Config, MaaConfig, GeneralConfig, QueueConfig
@@ -82,8 +83,21 @@ class _TaskManager:
raise RuntimeError(f"The task {task_id} is already running.")
logger.info(f"创建任务:{task_id},模式:{mode}")
self.task_dict[task_id] = asyncio.create_task(
self.run_task(mode, task_id, actual_id)
)
self.task_dict[task_id].add_done_callback(
lambda t: asyncio.create_task(self.remove_task(t, mode, task_id))
)
# 创建任务实例并连接信号
return task_id
# @logger.catch
async def run_task(
self, mode: str, task_id: uuid.UUID, actual_id: Optional[uuid.UUID]
):
# 等待连接信号
if task_id in self.connection_events:
self.connection_events[task_id].clear()
else:
@@ -96,20 +110,6 @@ class _TaskManager:
logger.info(f"开始运行任务:{task_id},模式:{mode}")
self.task_dict[task_id] = asyncio.create_task(
self.run_task(mode, task_id, actual_id)
)
self.task_dict[task_id].add_done_callback(
lambda t: asyncio.create_task(self.remove_task(t, mode, task_id))
)
return task_id
@logger.catch
async def run_task(
self, mode: str, task_id: uuid.UUID, actual_id: Optional[uuid.UUID]
):
websocket = self.websocket_dict[task_id]
if mode == "设置脚本":
@@ -129,10 +129,13 @@ class _TaskManager:
)
return
task = asyncio.create_task(task_item.run())
task.add_done_callback(
uid = actual_id or uuid.uuid4()
self.task_dict[uid] = asyncio.create_task(task_item.run())
self.task_dict[uid].add_done_callback(
lambda t: asyncio.create_task(task_item.final_task(t))
)
self.task_dict[uid].add_done_callback(partial(self.task_dict.pop, uid))
await self.task_dict[uid]
else:
@@ -205,10 +208,14 @@ class _TaskManager:
)
continue
task = asyncio.create_task(task_item.run())
task.add_done_callback(
self.task_dict[script_id] = asyncio.create_task(task_item.run())
self.task_dict[script_id].add_done_callback(
lambda t: asyncio.create_task(task_item.final_task(t))
)
self.task_dict[script_id].add_done_callback(
partial(self.task_dict.pop, script_id)
)
await self.task_dict[script_id]
async def stop_task(self, task_id: str) -> None:
"""
@@ -249,14 +256,13 @@ class _TaskManager:
await task
except asyncio.CancelledError:
logger.info(f"任务 {task_id} 已结束")
self.task_dict.pop(task_id)
self.task_dict.pop(task_id)
websocket = self.websocket_dict.pop(task_id, None)
websocket = self.websocket_dict.get(task_id, None)
if websocket:
await websocket.send_json(
TaskMessage(type="Signal", data={"Accomplish": "无描述"}).model_dump()
)
await websocket.close()
TaskManager = _TaskManager()

View File

@@ -431,8 +431,10 @@ class ConfigBase:
for name in dir(self):
item = getattr(self, name)
if isinstance(item, ConfigItem | MultipleConfig):
if isinstance(item, ConfigItem):
item.lock()
elif isinstance(item, MultipleConfig):
await item.lock()
async def unlock(self):
"""

View File

@@ -264,10 +264,7 @@ class _SystemHandler:
pname = proc.info["name"].lower()
if any(keyword.lower() in pname for keyword in keywords):
proc.kill()
logger.info(
f"已关闭 MuMu 模拟器进程: {proc.info['name']}",
module="系统服务",
)
logger.info(f"已关闭 MuMu 模拟器进程: {proc.info['name']}")
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue

View File

@@ -105,7 +105,7 @@ class MaaManager:
if not self.maa_set_path.exists():
return "MAA配置文件不存在请检查MAA路径设置"
if (self.mode != "设置脚本" or self.user_id is not None) and not (
Path.cwd() / f"data/{self.script_id}/Default/gui.json"
Path.cwd() / f"data/{self.script_id}/Default/ConfigFile/gui.json"
).exists():
return "未完成 MAA 全局设置,请先设置 MAA"
return "Success!"
@@ -118,11 +118,11 @@ class MaaManager:
self.begin_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
await self.configure()
check_result = self.check_config()
if check_result != "Success!":
logger.error(f"未通过配置检查:{check_result}")
self.check_result = self.check_config()
if self.check_result != "Success!":
logger.error(f"未通过配置检查:{self.check_result}")
await self.websocket.send_json(
TaskMessage(type="Info", data={"Error": check_result}).model_dump()
TaskMessage(type="Info", data={"Error": self.check_result}).model_dump()
)
return
@@ -145,7 +145,7 @@ class MaaManager:
}
for uid, config in self.user_config.items()
if config.get("Info", "Status")
and config.get("Info", "RemainedDay") > 0
and config.get("Info", "RemainedDay") != 0
]
self.user_list = sorted(
self.user_list,
@@ -184,7 +184,7 @@ class MaaManager:
if self.script_config.get(
"Run", "ProxyTimesLimit"
) == 0 or user_data.get("Data", "ProxyTimes") < self.script_config.get(
"RunSet", "ProxyTimesLimit"
"Run", "ProxyTimesLimit"
):
user["status"] = "运行"
await self.websocket.send_json(
@@ -201,9 +201,7 @@ class MaaManager:
)
continue
logger.info(
f"开始代理用户: {user['user_id']}",
)
logger.info(f"开始代理用户: {user['user_id']}")
# 详细模式用户首次代理需打开模拟器
if user_data.get("Info", "Mode") == "详细":
@@ -214,8 +212,8 @@ class MaaManager:
"Annihilation": bool(
user_data.get("Info", "Annihilation") == "Close"
),
"Routine": user_data.get("Info", "Mode") == "简洁"
or not user_data.get("Info", "Routine"),
"Routine": user_data.get("Info", "Mode") == "复杂"
and not user_data.get("Info", "Routine"),
}
user_logs_list = []
@@ -380,18 +378,18 @@ class MaaManager:
)
# 尝试次数循环
for i in range(user_data.get("Run", "RunTimesLimit")):
for i in range(self.script_config.get("Run", "RunTimesLimit")):
if run_book[mode]:
break
logger.info(
f"用户 {user['name']} - 模式: {mode} - 尝试次数: {i + 1}/{user_data.get('Run','RunTimesLimit')}",
f"用户 {user['name']} - 模式: {mode} - 尝试次数: {i + 1}/{self.script_config.get('Run','RunTimesLimit')}",
)
# 配置MAA
if isinstance(user_data, MaaUserConfig):
set = await self.set_maa(mode, index, user_data)
set = await self.set_maa(mode, user_data, index)
# 记录当前时间
self.log_start_time = datetime.now()
@@ -681,6 +679,7 @@ class MaaManager:
},
).model_dump()
)
await self.set_maa("Update")
subprocess.Popen(
[self.maa_exe_path],
creationflags=subprocess.CREATE_NO_WINDOW,
@@ -742,175 +741,165 @@ class MaaManager:
).model_dump()
)
# # 人工排查模式
# elif self.mode == "人工排查":
# # 人工排查模式
# elif self.mode == "人工排查":
# # 人工排查时,屏蔽静默操作
# logger.info(
# "人工排查任务开始,屏蔽静默操作",
# )
# Config.if_ignore_silence = True
# # 人工排查时,屏蔽静默操作
# logger.info(
# "人工排查任务开始,屏蔽静默操作",
# )
# Config.if_ignore_silence = True
# # 标记是否需要启动模拟器
# self.if_open_emulator = True
# # 标识排查模式
# for _ in self.user_list:
# _[0] += "_排查模式"
# # 标记是否需要启动模拟器
# self.if_open_emulator = True
# # 标识排查模式
# for _ in self.user_list:
# _[0] += "_排查模式"
# # 开始排查
# for user in self.user_list:
# # 开始排查
# for user in self.user_list:
# user_data = self.data[user[2]]["Config"]
# user_data = self.data[user[2]]["Config"]
# if self.isInterruptionRequested:
# break
# if self.isInterruptionRequested:
# break
# logger.info(f"开始排查用户: {user[0]}", )
# logger.info(f"开始排查用户: {user[0]}", )
# user[1] = "运行"
# self.update_user_list.emit(self.user_list)
# user[1] = "运行"
# self.update_user_list.emit(self.user_list)
# if user_data["Info"]["Mode"] == "详细":
# self.if_open_emulator = True
# if user_data["Info"]["Mode"] == "详细":
# self.if_open_emulator = True
# run_book = [False for _ in range(2)]
# run_book = [False for _ in range(2)]
# # 启动重试循环
# while not self.isInterruptionRequested:
# # 启动重试循环
# while not self.isInterruptionRequested:
# # 配置MAA
# self.set_maa("人工排查", user[2])
# # 配置MAA
# self.set_maa("人工排查", user[2])
# # 记录当前时间
# self.log_start_time = datetime.now()
# # 创建MAA任务
# logger.info(
# f"启动MAA进程{self.maa_exe_path}",
# ,
# )
# self.maa_process_manager.open_process(self.maa_exe_path, [], 0)
# # 记录当前时间
# self.log_start_time = datetime.now()
# # 创建MAA任务
# logger.info(
# f"启动MAA进程{self.maa_exe_path}",
# ,
# )
# self.maa_process_manager.open_process(self.maa_exe_path, [], 0)
# # 监测MAA运行状态
# self.log_check_mode = "人工排查"
# self.start_monitor()
# # 监测MAA运行状态
# self.log_check_mode = "人工排查"
# self.start_monitor()
# if self.maa_result == "Success!":
# logger.info(
# f"用户: {user[0]} - MAA进程成功登录PRTS",
# ,
# )
# run_book[0] = True
# self.update_log_text.emit("检测到MAA进程成功登录PRTS")
# else:
# logger.error(
# f"用户: {user[0]} - MAA未能正确登录到PRTS: {self.maa_result}",
# ,
# )
# self.update_log_text.emit(
# f"{self.maa_result}\n正在中止相关程序\n请等待10s"
# )
# # 无命令行中止MAA与其子程序
# logger.info(
# f"中止MAA进程{self.maa_exe_path}",
# ,
# )
# self.maa_process_manager.kill(if_force=True)
# System.kill_process(self.maa_exe_path)
# self.if_open_emulator = True
# self.sleep(10)
# if self.maa_result == "Success!":
# logger.info(
# f"用户: {user[0]} - MAA进程成功登录PRTS",
# ,
# )
# run_book[0] = True
# self.update_log_text.emit("检测到MAA进程成功登录PRTS")
# else:
# logger.error(
# f"用户: {user[0]} - MAA未能正确登录到PRTS: {self.maa_result}",
# ,
# )
# self.update_log_text.emit(
# f"{self.maa_result}\n正在中止相关程序\n请等待10s"
# )
# # 无命令行中止MAA与其子程序
# logger.info(
# f"中止MAA进程{self.maa_exe_path}",
# ,
# )
# self.maa_process_manager.kill(if_force=True)
# System.kill_process(self.maa_exe_path)
# self.if_open_emulator = True
# self.sleep(10)
# # 登录成功,结束循环
# if run_book[0]:
# break
# # 登录失败,询问是否结束循环
# elif not self.isInterruptionRequested:
# # 登录成功,结束循环
# if run_book[0]:
# break
# # 登录失败,询问是否结束循环
# elif not self.isInterruptionRequested:
# self.play_sound.emit("排查重试")
# if not self.push_question(
# "操作提示", "MAA未能正确登录到PRTS是否重试"
# ):
# break
# self.play_sound.emit("排查重试")
# if not self.push_question(
# "操作提示", "MAA未能正确登录到PRTS是否重试"
# ):
# break
# # 登录成功,录入人工排查情况
# if run_book[0] and not self.isInterruptionRequested:
# # 登录成功,录入人工排查情况
# if run_book[0] and not self.isInterruptionRequested:
# self.play_sound.emit("排查录入")
# if self.push_question(
# "操作提示", "请检查用户代理情况,该用户是否正确完成代理任务?"
# ):
# run_book[1] = True
# self.play_sound.emit("排查录入")
# if self.push_question(
# "操作提示", "请检查用户代理情况,该用户是否正确完成代理任务?"
# ):
# run_book[1] = True
# # 结果录入
# if run_book[0] and run_book[1]:
# logger.info(
# f"用户 {user[0]} 通过人工排查",
# )
# user_data["Data"]["IfPassCheck"] = True
# user[1] = "完成"
# else:
# logger.info(
# f"用户 {user[0]} 未通过人工排查",
# ,
# )
# user_data["Data"]["IfPassCheck"] = False
# user[1] = "异常"
# # 结果录入
# if run_book[0] and run_book[1]:
# logger.info(
# f"用户 {user[0]} 通过人工排查",
# )
# user_data["Data"]["IfPassCheck"] = True
# user[1] = "完成"
# else:
# logger.info(
# f"用户 {user[0]} 未通过人工排查",
# ,
# )
# user_data["Data"]["IfPassCheck"] = False
# user[1] = "异常"
# self.update_user_list.emit(self.user_list)
# self.update_user_list.emit(self.user_list)
# # 解除静默操作屏蔽
# logger.info(
# "人工排查任务结束,解除静默操作屏蔽",
# )
# Config.if_ignore_silence = False
# # 解除静默操作屏蔽
# logger.info(
# "人工排查任务结束,解除静默操作屏蔽",
# )
# Config.if_ignore_silence = False
# # 设置MAA模式
# elif "设置MAA" in self.mode:
# 设置MAA模式
elif self.mode == "设置脚本":
# # 配置MAA
# self.set_maa(self.mode, "")
# # 创建MAA任务
# logger.info(
# f"启动MAA进程{self.maa_exe_path}",
# )
# self.maa_process_manager.open_process(self.maa_exe_path, [], 0)
# # 记录当前时间
# self.log_start_time = datetime.now()
# 配置MAA
await self.set_maa(self.mode)
# 创建MAA任务
logger.info(f"启动MAA进程{self.maa_exe_path}")
await self.maa_process_manager.open_process(self.maa_exe_path, [], 0)
# 记录当前时间
self.log_start_time = datetime.now()
# # 监测MAA运行状态
# self.log_check_mode = "设置MAA"
# self.start_monitor()
# if "全局" in self.mode:
# (self.config_path / "Default").mkdir(parents=True, exist_ok=True)
# shutil.copy(self.maa_set_path, self.config_path / "Default")
# logger.success(
# f"全局MAA配置文件已保存到 {self.config_path / 'Default/gui.json'}",
# ,
# )
# elif "用户" in self.mode:
# self.user_config_path.mkdir(parents=True, exist_ok=True)
# shutil.copy(self.maa_set_path, self.user_config_path)
# logger.success(
# f"用户MAA配置文件已保存到 {self.user_config_path}",
# ,
# )
# result_text = ""
# 监测MAA运行状态
await self.maa_log_monitor.start(self.maa_log_path, self.log_start_time)
self.wait_event.clear()
await self.wait_event.wait()
async def final_task(self, task: asyncio.Task):
logger.info("MAA 主任务已结束,开始执行后续操作")
await Config.ScriptConfig[self.script_id].unlock()
logger.success(f"已解锁脚本配置 {self.script_id}")
# 结束各子任务
await self.maa_process_manager.kill(if_force=True)
await System.kill_process(self.maa_exe_path)
await self.emulator_process_manager.kill()
await self.maa_log_monitor.stop()
del self.maa_process_manager
del self.emulator_process_manager
del self.maa_log_monitor
if self.check_result != "Success!":
return self.check_result
# 导出结果
if self.mode in ["自动代理", "人工排查"]:
# 结束各子任务
await self.maa_process_manager.kill(if_force=True)
await System.kill_process(self.maa_exe_path)
await self.emulator_process_manager.kill()
await self.maa_log_monitor.stop()
# 更新用户数据
sc = Config.ScriptConfig[self.script_id]
if isinstance(sc, MaaConfig):
@@ -960,10 +949,24 @@ class MaaManager:
10,
)
await self.push_notification("代理结果", title, result)
elif self.mode == "设置脚本":
(
Path.cwd()
/ f"data/{self.script_id}/{self.user_id if self.user_id else 'Default'}/ConfigFile"
).mkdir(parents=True, exist_ok=True)
shutil.copy(
self.maa_set_path,
(
Path.cwd()
/ f"data/{self.script_id}/{self.user_id if self.user_id else 'Default'}/ConfigFile/gui.json"
),
)
result_text = ""
# 复原 MAA 配置文件
logger.info(f"复原 MAA 配置文件:{Path.cwd() / f'data/{self.script_id}/Temp'}")
if (Path.cwd() / f"data/{self.script_id}/Temp").exists():
if (Path.cwd() / f"data/{self.script_id}/Temp/gui.json").exists():
shutil.copy(
Path.cwd() / f"data/{self.script_id}/Temp/gui.json", self.maa_set_path
)
@@ -981,7 +984,7 @@ class MaaManager:
data={
"log": f"即将搜索ADB实际地址\n正在等待模拟器完成启动\n请等待{self.wait_time}s"
},
)
).model_dump()
)
await asyncio.sleep(self.wait_time)
@@ -1063,7 +1066,7 @@ class MaaManager:
log = "".join(log_content)
# 更新MAA日志
if self.maa_process_manager.is_running():
if await self.maa_process_manager.is_running():
await self.websocket.send_json(
TaskMessage(type="Update", data={"log": log}).model_dump()
@@ -1131,14 +1134,12 @@ class MaaManager:
elif (
"MaaAssistantArknights GUI exited" in log
or not self.maa_process_manager.is_running()
or not await self.maa_process_manager.is_running()
):
self.maa_result = "MAA在完成任务前退出"
elif datetime.now() - latest_time > timedelta(
minutes=self.script_config.get(
"RunSet", f"{self.log_check_mode}TimeLimit"
)
minutes=self.script_config.get("Run", f"{self.log_check_mode}TimeLimit")
):
self.maa_result = "MAA进程超时"
@@ -1156,7 +1157,7 @@ class MaaManager:
self.maa_result = "MAA在完成任务前中止"
elif (
"MaaAssistantArknights GUI exited" in log
or not self.maa_process_manager.is_running()
or not await self.maa_process_manager.is_running()
):
self.maa_result = "MAA在完成任务前退出"
else:
@@ -1165,7 +1166,7 @@ class MaaManager:
elif self.mode == "设置脚本":
if (
"MaaAssistantArknights GUI exited" in log
or not self.maa_process_manager.is_running()
or not await self.maa_process_manager.is_running()
):
self.maa_result = "Success!"
else:
@@ -1175,50 +1176,21 @@ class MaaManager:
if self.maa_result != "Wait":
logger.info(f"MAA 任务结果:{self.maa_result},日志锁已释放")
self.wait_event.set()
# def start_monitor(self) -> None:
# """开始监视MAA日志"""
# logger.info(
# f"开始监视MAA日志路径{self.maa_log_path},日志起始时间:{self.log_start_time},模式:{self.log_check_mode}",
# ,
# )
# self.log_monitor.addPath(str(self.maa_log_path))
# self.log_monitor_timer.start(1000)
# self.last_check_time = datetime.now()
# self.monitor_loop.exec()
# def quit_monitor(self) -> None:
# """退出MAA日志监视进程"""
# if len(self.log_monitor.files()) != 0:
# logger.info(
# f"MAA日志监视器移除路径{self.maa_log_path}",
# ,
# )
# self.log_monitor.removePath(str(self.maa_log_path))
# else:
# logger.warning(
# f"MAA日志监视器没有正在监看的路径{self.log_monitor.files()}",
# ,
# )
# self.log_monitor_timer.stop()
# self.last_check_time = None
# self.monitor_loop.quit()
# logger.info("MAA日志监视锁已释放", )
async def set_maa(self, mode: str, index: int, user_data: MaaUserConfig) -> dict:
async def set_maa(
self,
mode: str,
user_data: Optional[MaaUserConfig] = None,
index: Optional[int] = None,
) -> dict:
"""配置MAA运行参数"""
logger.info(f"开始配置MAA运行参数: {mode}/{index}")
if "设置脚本" not in self.mode and mode != "Update":
if self.mode != "设置脚本" and mode != "Update":
if user_data.get("Info", "Server") == "Bilibili":
if user_data and user_data.get("Info", "Server") == "Bilibili":
self.agree_bilibili(True)
else:
self.agree_bilibili(False)
@@ -1228,7 +1200,7 @@ class MaaManager:
await System.kill_process(self.maa_exe_path)
# 预导入MAA配置文件
if self.mode in ["自动代理", "人工排查"]:
if self.mode in ["自动代理", "人工排查"] and user_data is not None:
if user_data.get("Info", "Mode") == "简洁":
shutil.copy(
(Path.cwd() / f"data/{self.script_id}/Default/ConfigFile/gui.json"),
@@ -1288,7 +1260,12 @@ class MaaManager:
data["Global"][f"Timer.Timer{i}"] = "False"
# 自动代理配置
if self.mode == "自动代理":
if (
self.mode == "自动代理"
and mode in ["Annihilation", "Routine"]
and index is not None
and user_data is not None
):
if (index == len(self.user_list) - 1) or (
self.user_config[uuid.UUID(self.user_list[index + 1]["user_id"])].get(
@@ -1392,7 +1369,7 @@ class MaaManager:
user_data.get("Info", "SeriesNumb")
) # 连战次数
if "剿灭" in mode:
if mode == "Annihilation":
data["Configurations"]["Default"][
"MainFunction.Stage1"
@@ -1434,7 +1411,7 @@ class MaaManager:
"GUI.HideSeries"
] = "False" # 隐藏连战次数
elif "日常" in mode:
elif mode == "Routine":
data["Configurations"]["Default"]["MainFunction.Stage1"] = (
user_data.get("Info", "Stage")
@@ -1541,7 +1518,7 @@ class MaaManager:
) # 自定义基建配置索引
# 人工排查配置
elif "人工排查" in mode:
elif self.mode == "人工排查" and user_data is not None:
data["Configurations"]["Default"][
"MainFunction.PostActions"
@@ -1607,8 +1584,8 @@ class MaaManager:
"TaskQueue.Reclamation.IsChecked"
] = "False" # 生息演算
# 设置MAA配置
elif "设置MAA" in mode:
# 设置脚本配置
elif self.mode == "设置脚本":
data["Configurations"]["Default"][
"MainFunction.PostActions"
@@ -1659,7 +1636,7 @@ class MaaManager:
"TaskQueue.Reclamation.IsChecked"
] = "False" # 生息演算
elif mode == "更新MAA":
elif mode == "Update":
data["Configurations"]["Default"][
"MainFunction.PostActions"
@@ -1712,16 +1689,14 @@ class MaaManager:
] = "False" # 生息演算
# 启动模拟器仅生效一次
if "设置MAA" not in mode and "更新MAA" not in mode and self.if_open_emulator:
if self.mode != "设置脚本" and mode != "Update" and self.if_open_emulator:
self.if_open_emulator = False
# 覆写配置文件
with self.maa_set_path.open(mode="w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=4)
logger.success(
f"MAA运行参数配置完成: {mode}/{index}",
)
logger.success(f"MAA运行参数配置完成: {mode}/{index}")
return data

View File

@@ -1,11 +1,13 @@
import asyncio
import aiofiles
import os
from datetime import datetime
from datetime import datetime, timedelta
from pathlib import Path
from typing import Callable, Optional, List, Awaitable
from loguru import logger
from .logger import get_logger
logger = get_logger("日志监控器")
class LogMonitor:
@@ -22,44 +24,87 @@ class LogMonitor:
self.encoding = encoding
self.log_file_path: Optional[Path] = None
self.log_start_time: datetime = datetime.now()
self.last_callback_time: datetime = datetime.now()
self.log_contents: List[str] = []
self.task: Optional[asyncio.Task] = None
async def monitor_log(self):
"""监控日志文件的主循环"""
if self.log_file_path is None or not self.log_file_path.exists():
raise ValueError("Log file path is not set or does not exist.")
logger.info(f"开始监控日志文件: {self.log_file_path}")
consecutive_errors = 0
while True:
try:
log_contents = []
if_log_start = False
log_contents = []
if_log_start = False
# 检查文件是否仍然存在
if not self.log_file_path.exists():
logger.warning(f"日志文件不存在: {self.log_file_path}")
await asyncio.sleep(1)
continue
async with aiofiles.open(
self.log_file_path, "r", encoding=self.encoding
) as f:
async for line in f:
if not if_log_start:
try:
entry_time = datetime.strptime(
line[
self.time_stamp_range[0] : self.time_stamp_range[1]
],
self.time_format,
)
if entry_time > self.log_start_time:
if_log_start = True
# 尝试读取文件
try:
async with aiofiles.open(
self.log_file_path, "r", encoding=self.encoding
) as f:
async for line in f:
if not if_log_start:
try:
entry_time = datetime.strptime(
line[
self.time_stamp_range[
0
] : self.time_stamp_range[1]
],
self.time_format,
)
if entry_time > self.log_start_time:
if_log_start = True
log_contents.append(line)
except (ValueError, IndexError):
continue
else:
log_contents.append(line)
except ValueError:
pass
else:
log_contents.append(line)
# 调用回调
if log_contents != self.log_contents:
self.log_contents = log_contents
await self.callback(log_contents)
except (FileNotFoundError, PermissionError) as e:
logger.warning(f"文件访问错误: {e}")
await asyncio.sleep(5)
continue
except UnicodeDecodeError as e:
logger.error(f"文件编码错误: {e}")
await asyncio.sleep(10)
continue
# 调用回调
if (
log_contents != self.log_contents
or datetime.now() - self.last_callback_time > timedelta(minutes=1)
):
self.log_contents = log_contents
self.last_callback_time = datetime.now()
# 安全调用回调函数
try:
await self.callback(log_contents)
except Exception as e:
logger.error(f"回调函数执行失败: {e}")
except asyncio.CancelledError:
logger.info("日志监控任务被取消")
break
except Exception as e:
logger.error(f"监控日志时发生未知错误:{e}")
consecutive_errors += 1
wait_time = min(60, 2**consecutive_errors)
await asyncio.sleep(wait_time)
continue
await asyncio.sleep(1)
@@ -75,15 +120,19 @@ class LogMonitor:
self.log_file_path = log_file_path
self.log_start_time = start_time
self.task = asyncio.create_task(self.monitor_log())
logger.info(f"开始监控文件: {self.log_file_path}")
logger.info(f"日志监控已启动: {self.log_file_path}")
async def stop(self):
"""停止监控"""
if self.task is not None and not self.task.done():
self.task.cancel()
try:
await self.task
except asyncio.CancelledError:
pass
self.log_contents = []
logger.info(f"停止监控文件: {self.log_file_path}")
self.log_file_path = None
self.task = None
logger.info(f"日志监控已停止: {self.log_file_path}")