From 911eb60ae951bfd0cce99338e85032130e784750 Mon Sep 17 00:00:00 2001 From: DLmaster361 Date: Tue, 5 Aug 2025 00:46:22 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E9=98=9F=E5=88=97?= =?UTF-8?q?=E7=9B=B8=E5=85=B3API?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/api/__init__.py | 3 +- app/api/queue.py | 139 +++++++ app/core/config.py | 867 +++++++-------------------------------- app/main.py | 64 +-- app/models/ConfigBase.py | 16 + app/models/schema.py | 55 +++ 6 files changed, 394 insertions(+), 750 deletions(-) create mode 100644 app/api/queue.py diff --git a/app/api/__init__.py b/app/api/__init__.py index 247175d..ded081d 100644 --- a/app/api/__init__.py +++ b/app/api/__init__.py @@ -23,6 +23,7 @@ __author__ = "DLmaster361 " __license__ = "GPL-3.0 license" from .scripts import router as scripts_router +from .queue import router as queue_router from .setting import router as setting_router -__all__ = ["scripts_router", "setting_router"] +__all__ = ["scripts_router", "queue_router", "setting_router"] diff --git a/app/api/queue.py b/app/api/queue.py new file mode 100644 index 0000000..8cdec6e --- /dev/null +++ b/app/api/queue.py @@ -0,0 +1,139 @@ +# AUTO_MAA:A MAA Multi Account Management and Automation Tool +# Copyright © 2024-2025 DLmaster361 + +# This file is part of AUTO_MAA. + +# AUTO_MAA is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published +# by the Free Software Foundation, either version 3 of the License, +# or (at your option) any later version. + +# AUTO_MAA is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty +# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See +# the GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with AUTO_MAA. If not, see . + +# Contact: DLmaster_361@163.com + + +from fastapi import APIRouter, Body + +from core import Config +from models.schema import * + +router = APIRouter(prefix="/api/queue", tags=["调度队列管理"]) + + +@router.post( + "/add", summary="添加调度队列", response_model=QueueCreateOut, status_code=200 +) +async def add_queue() -> QueueCreateOut: + + uid, config = await Config.add_queue() + return QueueCreateOut(queueId=str(uid), data=await config.toDict()) + + +@router.post( + "/get", summary="查询调度队列配置信息", response_model=QueueGetOut, status_code=200 +) +async def get_queues(queue: QueueGetIn = Body(...)) -> QueueGetOut: + + try: + index, data = await Config.get_queue(queue.queueId) + except Exception as e: + return QueueGetOut(code=500, status="error", message=str(e), index=[], data={}) + return QueueGetOut(index=index, data=data) + + +@router.post( + "/update", summary="更新调度队列配置信息", response_model=OutBase, status_code=200 +) +async def update_queue(queue: QueueUpdateIn = Body(...)) -> OutBase: + + try: + await Config.update_queue(queue.queueId, queue.data) + except Exception as e: + return OutBase(code=500, status="error", message=str(e)) + return OutBase() + + +@router.post("/delete", summary="删除调度队列", response_model=OutBase, status_code=200) +async def delete_queue(queue: QueueDeleteIn = Body(...)) -> OutBase: + + try: + await Config.del_queue(queue.queueId) + except Exception as e: + return OutBase(code=500, status="error", message=str(e)) + return OutBase() + + +@router.post( + "/time/add", summary="添加定时项", response_model=TimeSetCreateOut, status_code=200 +) +async def add_time_set(time: QueueSetInBase = Body(...)) -> TimeSetCreateOut: + + uid, config = await Config.add_time_set(time.queueId) + return TimeSetCreateOut(timeSetId=str(uid), data=await config.toDict()) + + +@router.post( + "/time/update", summary="更新定时项", response_model=OutBase, status_code=200 +) +async def update_time_set(time: TimeSetUpdateIn = Body(...)) -> OutBase: + + try: + await Config.update_time_set(time.queueId, time.timeSetId, time.data) + except Exception as e: + return OutBase(code=500, status="error", message=str(e)) + return OutBase() + + +@router.post( + "/time/delete", summary="删除定时项", response_model=OutBase, status_code=200 +) +async def delete_time_set(time: TimeSetDeleteIn = Body(...)) -> OutBase: + + try: + await Config.del_time_set(time.queueId, time.timeSetId) + except Exception as e: + return OutBase(code=500, status="error", message=str(e)) + return OutBase() + + +@router.post( + "/item/add", + summary="添加队列项", + response_model=QueueItemCreateOut, + status_code=200, +) +async def add_item(item: QueueSetInBase = Body(...)) -> QueueItemCreateOut: + + uid, config = await Config.add_queue_item(item.queueId) + return QueueItemCreateOut(queueItemId=str(uid), data=await config.toDict()) + + +@router.post( + "/item/update", summary="更新队列项", response_model=OutBase, status_code=200 +) +async def update_item(item: QueueItemUpdateIn = Body(...)) -> OutBase: + + try: + await Config.update_queue_item(item.queueId, item.queueItemId, item.data) + except Exception as e: + return OutBase(code=500, status="error", message=str(e)) + return OutBase() + + +@router.post( + "/item/delete", summary="删除队列项", response_model=OutBase, status_code=200 +) +async def delete_item(item: QueueItemDeleteIn = Body(...)) -> OutBase: + + try: + await Config.del_queue_item(item.queueId, item.queueItemId) + except Exception as e: + return OutBase(code=500, status="error", message=str(e)) + return OutBase() diff --git a/app/core/config.py b/app/core/config.py index 8ebc8ef..023f087 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -121,7 +121,7 @@ class QueueItem(ConfigBase): def __init__(self) -> None: super().__init__() - self.Info_ScriptId = ConfigItem("Info", "ScriptId", "") + self.Info_ScriptId = ConfigItem("Info", "ScriptId", "", UidValidator()) class TimeSet(ConfigBase): @@ -352,9 +352,6 @@ class MaaConfig(ConfigBase): self.UserData = MultipleConfig([MaaUserConfig]) - # def get_name(self) -> str: - # return self.get(self.MaaSet_Name) - class MaaPlanConfig(ConfigBase): """MAA计划表配置""" @@ -501,10 +498,7 @@ class GeneralConfig(ConfigBase): "Script", "ConfigPath", ".", FileValidator() ) self.Script_ConfigPathMode = ConfigItem( - "Script", - "ConfigPathMode", - "所有文件 (*)", - OptionsValidator(["所有文件 (*)", "文件夹"]), + "Script", "ConfigPathMode", "File", OptionsValidator(["File", "Folder"]) ) self.Script_UpdateConfigMode = ConfigItem( "Script", @@ -549,9 +543,6 @@ class GeneralConfig(ConfigBase): self.UserData = MultipleConfig([GeneralUserConfig]) - # def get_name(self) -> str: - # return self.get(self.Script_Name) - class AppConfig(GlobalConfig): @@ -602,11 +593,6 @@ class AppConfig(GlobalConfig): self.PlanConfig = MultipleConfig([MaaPlanConfig]) self.QueueConfig = MultipleConfig([QueueConfig]) - import asyncio - - loop = asyncio.get_event_loop() - loop.run_until_complete(self.init_config()) - async def init_config(self) -> None: """初始化配置管理""" await self.connect(self.config_path / "Config.json") @@ -716,6 +702,148 @@ class AppConfig(GlobalConfig): await script_config.UserData.remove(uid) await self.ScriptConfig.save() + async def add_queue(self) -> tuple[uuid.UUID, ConfigBase]: + """添加调度队列""" + + self.logger.info("添加调度队列") + + return await self.QueueConfig.add(QueueConfig) + + async def get_queue(self, queue_id: Optional[str]) -> tuple[list, dict]: + """获取调度队列配置""" + + self.logger.info(f"获取调度队列配置:{queue_id}") + + if queue_id is None: + data = await self.QueueConfig.toDict() + else: + data = await self.QueueConfig.get(uuid.UUID(queue_id)) + + index = data.pop("instances", []) + + return list(index), data + + async def update_queue( + self, queue_id: str, data: Dict[str, Dict[str, Any]] + ) -> None: + """更新调度队列配置""" + + self.logger.info(f"更新调度队列配置:{queue_id}") + + uid = uuid.UUID(queue_id) + + for group, items in data.items(): + for name, value in items.items(): + self.logger.debug( + f"更新调度队列配置:{queue_id} - {group}.{name} = {value}" + ) + await self.QueueConfig[uid].set(group, name, value) + + await self.QueueConfig.save() + + async def del_queue(self, queue_id: str) -> None: + """删除调度队列配置""" + + self.logger.info(f"删除调度队列配置:{queue_id}") + + await self.QueueConfig.remove(uuid.UUID(queue_id)) + + async def add_time_set(self, queue_id: str) -> tuple[uuid.UUID, ConfigBase]: + """添加时间设置配置""" + + self.logger.info(f"{queue_id} 添加时间设置配置") + + queue_config = self.QueueConfig[uuid.UUID(queue_id)] + + if isinstance(queue_config, QueueConfig): + uid, config = await queue_config.TimeSet.add(TimeSet) + else: + raise TypeError(f"Unsupported script config type: {type(queue_config)}") + + await self.QueueConfig.save() + return uid, config + + async def update_time_set( + self, queue_id: str, time_set_id: str, data: Dict[str, Dict[str, Any]] + ) -> None: + """更新时间设置配置""" + + self.logger.info(f"{queue_id} 更新时间设置配置:{time_set_id}") + + queue_config = self.QueueConfig[uuid.UUID(queue_id)] + uid = uuid.UUID(time_set_id) + + for group, items in data.items(): + for name, value in items.items(): + self.logger.debug( + f"更新时间设置配置:{queue_id} - {group}.{name} = {value}" + ) + if isinstance(queue_config, QueueConfig): + await queue_config.TimeSet[uid].set(group, name, value) + + await self.QueueConfig.save() + + async def del_time_set(self, queue_id: str, time_set_id: str) -> None: + """删除时间设置配置""" + + self.logger.info(f"{queue_id} 删除时间设置配置:{time_set_id}") + + queue_config = self.QueueConfig[uuid.UUID(queue_id)] + uid = uuid.UUID(time_set_id) + + if isinstance(queue_config, QueueConfig): + await queue_config.TimeSet.remove(uid) + await self.QueueConfig.save() + + async def add_queue_item(self, queue_id: str) -> tuple[uuid.UUID, ConfigBase]: + """添加队列项配置""" + + self.logger.info(f"{queue_id} 添加队列项配置") + + queue_config = self.QueueConfig[uuid.UUID(queue_id)] + + if isinstance(queue_config, QueueConfig): + uid, config = await queue_config.QueueItem.add(QueueItem) + else: + raise TypeError(f"Unsupported script config type: {type(queue_config)}") + + await self.QueueConfig.save() + return uid, config + + async def update_queue_item( + self, queue_id: str, queue_item_id: str, data: Dict[str, Dict[str, Any]] + ) -> None: + """更新队列项配置""" + + self.logger.info(f"{queue_id} 更新队列项配置:{queue_item_id}") + + queue_config = self.QueueConfig[uuid.UUID(queue_id)] + uid = uuid.UUID(queue_item_id) + + for group, items in data.items(): + for name, value in items.items(): + if uuid.UUID(value) not in self.ScriptConfig: + raise ValueError(f"Script with uid {value} does not exist.") + self.logger.debug( + f"更新队列项配置:{queue_id} - {group}.{name} = {value}" + ) + if isinstance(queue_config, QueueConfig): + await queue_config.QueueItem[uid].set(group, name, value) + + await self.QueueConfig.save() + + async def del_queue_item(self, queue_id: str, queue_item_id: str) -> None: + """删除队列项配置""" + + self.logger.info(f"{queue_id} 删除队列项配置:{queue_item_id}") + + queue_config = self.QueueConfig[uuid.UUID(queue_id)] + uid = uuid.UUID(queue_item_id) + + if isinstance(queue_config, QueueConfig): + await queue_config.QueueItem.remove(uid) + await self.QueueConfig.save() + async def get_setting(self) -> Dict[str, Any]: """获取全局设置""" @@ -733,87 +861,6 @@ class AppConfig(GlobalConfig): self.logger.debug(f"更新全局设置 - {group}.{name} = {value}") await self.set(group, name, value) - # def get_stage(self) -> None: - # """从MAA服务器更新活动关卡信息""" - - # logger.info("开始获取活动关卡信息", module="配置管理") - # network = Network.add_task( - # mode="get", - # url="https://api.maa.plus/MaaAssistantArknights/api/gui/StageActivity.json", - # ) - # network.loop.exec() - # network_result = Network.get_result(network) - # if network_result["status_code"] == 200: - # stage_infos: List[Dict[str, Union[str, Dict[str, Union[str, int]]]]] = ( - # network_result["response_json"]["Official"]["sideStoryStage"] - # ) - # else: - # logger.warning( - # f"无法从MAA服务器获取活动关卡信息:{network_result['error_message']}", - # module="配置管理", - # ) - # stage_infos = [] - - # ss_stage_dict = {"value": [], "text": []} - - # for stage_info in stage_infos: - - # if ( - # datetime.strptime( - # stage_info["Activity"]["UtcStartTime"], "%Y/%m/%d %H:%M:%S" - # ) - # < datetime.now() - # < datetime.strptime( - # stage_info["Activity"]["UtcExpireTime"], "%Y/%m/%d %H:%M:%S" - # ) - # ): - # ss_stage_dict["value"].append(stage_info["Value"]) - # ss_stage_dict["text"].append(stage_info["Value"]) - - # # 生成每日关卡信息 - # stage_daily_info = [ - # {"value": "-", "text": "当前/上次", "days": [1, 2, 3, 4, 5, 6, 7]}, - # {"value": "1-7", "text": "1-7", "days": [1, 2, 3, 4, 5, 6, 7]}, - # {"value": "R8-11", "text": "R8-11", "days": [1, 2, 3, 4, 5, 6, 7]}, - # { - # "value": "12-17-HARD", - # "text": "12-17-HARD", - # "days": [1, 2, 3, 4, 5, 6, 7], - # }, - # {"value": "CE-6", "text": "龙门币-6/5", "days": [2, 4, 6, 7]}, - # {"value": "AP-5", "text": "红票-5", "days": [1, 4, 6, 7]}, - # {"value": "CA-5", "text": "技能-5", "days": [2, 3, 5, 7]}, - # {"value": "LS-6", "text": "经验-6/5", "days": [1, 2, 3, 4, 5, 6, 7]}, - # {"value": "SK-5", "text": "碳-5", "days": [1, 3, 5, 6]}, - # {"value": "PR-A-1", "text": "奶/盾芯片", "days": [1, 4, 5, 7]}, - # {"value": "PR-A-2", "text": "奶/盾芯片组", "days": [1, 4, 5, 7]}, - # {"value": "PR-B-1", "text": "术/狙芯片", "days": [1, 2, 5, 6]}, - # {"value": "PR-B-2", "text": "术/狙芯片组", "days": [1, 2, 5, 6]}, - # {"value": "PR-C-1", "text": "先/辅芯片", "days": [3, 4, 6, 7]}, - # {"value": "PR-C-2", "text": "先/辅芯片组", "days": [3, 4, 6, 7]}, - # {"value": "PR-D-1", "text": "近/特芯片", "days": [2, 3, 6, 7]}, - # {"value": "PR-D-2", "text": "近/特芯片组", "days": [2, 3, 6, 7]}, - # ] - - # for day in range(0, 8): - - # today_stage_dict = {"value": [], "text": []} - - # for stage_info in stage_daily_info: - - # if day in stage_info["days"] or day == 0: - # today_stage_dict["value"].append(stage_info["value"]) - # today_stage_dict["text"].append(stage_info["text"]) - - # self.stage_dict[calendar.day_name[day - 1] if day > 0 else "ALL"] = { - # "value": today_stage_dict["value"] + ss_stage_dict["value"], - # "text": today_stage_dict["text"] + ss_stage_dict["text"], - # } - - # self.stage_refreshed.emit() - - # logger.success("活动关卡信息更新完成", module="配置管理") - def server_date(self) -> date: """ 获取当前的服务器日期 @@ -828,630 +875,4 @@ class AppConfig(GlobalConfig): return dt.date() -# def search_maa_user(self, name: str) -> None: -# """ -# 更新指定 MAA 脚本实例的用户信息 - -# :param name: 脚本实例名称 -# :type name: str -# """ - -# logger.info(f"开始搜索并读入 MAA 脚本实例 {name} 的用户信息", module="配置管理") - -# user_dict: Dict[str, Dict[str, Union[Path, MaaUserConfig]]] = {} -# for user_dir in (Config.script_dict[name]["Path"] / "UserData").iterdir(): -# if user_dir.is_dir(): - -# user_config = MaaUserConfig() -# user_config.load(user_dir / "config.json", user_config) -# user_config.save() - -# user_dict[user_dir.stem] = {"Path": user_dir, "Config": user_config} - -# self.script_dict[name]["UserData"] = dict( -# sorted(user_dict.items(), key=lambda x: int(x[0][3:])) -# ) - -# logger.success( -# f"MAA 脚本实例 {name} 的用户信息搜索完成,共找到 {len(user_dict)} 个用户", -# module="配置管理", -# ) - -# def search_general_sub(self, name: str) -> None: -# """ -# 更新指定通用脚本实例的子配置信息 - -# :param name: 脚本实例名称 -# :type name: str -# """ - -# logger.info( -# f"开始搜索并读入通用脚本实例 {name} 的子配置信息", module="配置管理" -# ) - -# user_dict: Dict[str, Dict[str, Union[Path, GeneralSubConfig]]] = {} -# for sub_dir in (Config.script_dict[name]["Path"] / "SubData").iterdir(): -# if sub_dir.is_dir(): - -# sub_config = GeneralSubConfig() -# sub_config.load(sub_dir / "config.json", sub_config) -# sub_config.save() - -# user_dict[sub_dir.stem] = {"Path": sub_dir, "Config": sub_config} - -# self.script_dict[name]["SubData"] = dict( -# sorted(user_dict.items(), key=lambda x: int(x[0][3:])) -# ) - -# logger.success( -# f"通用脚本实例 {name} 的子配置信息搜索完成,共找到 {len(user_dict)} 个子配置", -# module="配置管理", -# ) - -# def search_plan(self) -> None: -# """更新计划表配置信息""" - -# logger.info("开始搜索并读入计划表配置", module="配置管理") - -# self.plan_dict: Dict[str, Dict[str, Union[str, Path, MaaPlanConfig]]] = {} -# if (self.app_path / "config/MaaPlanConfig").exists(): -# for maa_plan_dir in (self.app_path / "config/MaaPlanConfig").iterdir(): -# if maa_plan_dir.is_dir(): - -# maa_plan_config = MaaPlanConfig() -# maa_plan_config.load(maa_plan_dir / "config.json", maa_plan_config) -# maa_plan_config.save() - -# self.plan_dict[maa_plan_dir.name] = { -# "Type": "Maa", -# "Path": maa_plan_dir, -# "Config": maa_plan_config, -# } - -# self.plan_dict = dict( -# sorted(self.plan_dict.items(), key=lambda x: int(x[0][3:])) -# ) - -# logger.success( -# f"计划表配置搜索完成,共找到 {len(self.plan_dict)} 个计划表", -# module="配置管理", -# ) - -# def search_queue(self): -# """更新调度队列实例配置信息""" - -# logger.info("开始搜索并读入调度队列配置", module="配置管理") - -# self.queue_dict: Dict[str, Dict[str, Union[Path, QueueConfig]]] = {} - -# if (self.app_path / "config/QueueConfig").exists(): -# for json_file in (self.app_path / "config/QueueConfig").glob("*.json"): - -# queue_config = QueueConfig() -# queue_config.load(json_file, queue_config) -# queue_config.save() - -# self.queue_dict[json_file.stem] = { -# "Path": json_file, -# "Config": queue_config, -# } - -# self.queue_dict = dict( -# sorted(self.queue_dict.items(), key=lambda x: int(x[0][5:])) -# ) - -# logger.success( -# f"调度队列配置搜索完成,共找到 {len(self.queue_dict)} 个调度队列", -# module="配置管理", -# ) - -# def change_queue(self, old: str, new: str) -> None: -# """ -# 修改调度队列配置文件的队列参数 - -# :param old: 旧脚本名 -# :param new: 新脚本名 -# """ - -# logger.info(f"开始修改调度队列参数:{old} -> {new}", module="配置管理") - -# for queue in self.queue_dict.values(): - -# for i in range(10): - -# if ( -# queue["Config"].get( -# queue["Config"].config_item_dict["Queue"][f"Script_{i}"] -# ) -# == old -# ): -# queue["Config"].set( -# queue["Config"].config_item_dict["Queue"][f"Script_{i}"], new -# ) - -# logger.success(f"调度队列参数修改完成:{old} -> {new}", module="配置管理") - -# def change_plan(self, old: str, new: str) -> None: -# """ -# 修改脚本管理所有下属用户的计划表配置参数 - -# :param old: 旧计划表名 -# :param new: 新计划表名 -# """ - -# logger.info(f"开始修改计划表参数:{old} -> {new}", module="配置管理") - -# for script in self.script_dict.values(): - -# for user in script["UserData"].values(): - -# if user["Config"].get(user["Config"].Info_StageMode) == old: -# user["Config"].set(user["Config"].Info_StageMode, new) - -# logger.success(f"计划表参数修改完成:{old} -> {new}", module="配置管理") - -# def change_maa_user_info( -# self, name: str, user_data: Dict[str, Dict[str, Union[str, Path, dict]]] -# ) -> None: -# """ -# 保存代理完成后发生改动的用户信息 - -# :param name: 脚本实例名称 -# :type name: str -# :param user_data: 用户信息字典,包含用户名称和对应的配置信息 -# :type user_data: Dict[str, Dict[str, Union[str, Path, dict]]] -# """ - -# logger.info(f"开始保存 MAA 脚本实例 {name} 的用户信息变动", module="配置管理") - -# for user, info in user_data.items(): - -# user_config = self.script_dict[name]["UserData"][user]["Config"] - -# user_config.set( -# user_config.Info_RemainedDay, info["Config"]["Info"]["RemainedDay"] -# ) -# user_config.set( -# user_config.Data_LastProxyDate, info["Config"]["Data"]["LastProxyDate"] -# ) -# user_config.set( -# user_config.Data_LastAnnihilationDate, -# info["Config"]["Data"]["LastAnnihilationDate"], -# ) -# user_config.set( -# user_config.Data_LastSklandDate, -# info["Config"]["Data"]["LastSklandDate"], -# ) -# user_config.set( -# user_config.Data_ProxyTimes, info["Config"]["Data"]["ProxyTimes"] -# ) -# user_config.set( -# user_config.Data_IfPassCheck, info["Config"]["Data"]["IfPassCheck"] -# ) -# user_config.set( -# user_config.Data_CustomInfrastPlanIndex, -# info["Config"]["Data"]["CustomInfrastPlanIndex"], -# ) - -# self.sub_info_changed.emit() - -# logger.success(f"MAA 脚本实例 {name} 的用户信息变动保存完成", module="配置管理") - -# def change_general_sub_info( -# self, name: str, sub_data: Dict[str, Dict[str, Union[str, Path, dict]]] -# ) -> None: -# """ -# 保存代理完成后发生改动的配置信息 - -# :param name: 脚本实例名称 -# :type name: str -# :param sub_data: 子配置信息字典,包含子配置名称和对应的配置信息 -# :type sub_data: Dict[str, Dict[str, Union[str, Path, dict]]] -# """ - -# logger.info(f"开始保存通用脚本实例 {name} 的子配置信息变动", module="配置管理") - -# for sub, info in sub_data.items(): - -# sub_config = self.script_dict[name]["SubData"][sub]["Config"] - -# sub_config.set( -# sub_config.Info_RemainedDay, info["Config"]["Info"]["RemainedDay"] -# ) -# sub_config.set( -# sub_config.Data_LastProxyDate, info["Config"]["Data"]["LastProxyDate"] -# ) -# sub_config.set( -# sub_config.Data_ProxyTimes, info["Config"]["Data"]["ProxyTimes"] -# ) - -# self.sub_info_changed.emit() - -# logger.success( -# f"通用脚本实例 {name} 的子配置信息变动保存完成", module="配置管理" -# ) - -# def set_power_sign(self, sign: str) -> None: -# """ -# 设置当前电源状态 - -# :param sign: 电源状态标志 -# """ - -# self.power_sign = sign -# self.power_sign_changed.emit() - -# logger.info(f"电源状态已更改为: {sign}", module="配置管理") - -# def save_history(self, key: str, content: dict) -> None: -# """ -# 保存历史记录 - -# :param key: 调度队列的键 -# :type key: str -# :param content: 包含时间和历史记录内容的字典 -# :type content: dict -# """ - -# if key in self.queue_dict: -# logger.info(f"保存调度队列 {key} 的历史记录", module="配置管理") -# self.queue_dict[key]["Config"].set( -# self.queue_dict[key]["Config"].Data_LastProxyTime, content["Time"] -# ) -# self.queue_dict[key]["Config"].set( -# self.queue_dict[key]["Config"].Data_LastProxyHistory, content["History"] -# ) -# logger.success(f"调度队列 {key} 的历史记录已保存", module="配置管理") -# else: -# logger.warning(f"保存历史记录时未找到调度队列: {key}") - -# def save_maa_log(self, log_path: Path, logs: list, maa_result: str) -> bool: -# """ -# 保存MAA日志并生成对应统计数据 - -# :param log_path: 日志文件保存路径 -# :type log_path: Path -# :param logs: 日志内容列表 -# :type logs: list -# :param maa_result: MAA 结果 -# :type maa_result: str -# :return: 是否包含6★招募 -# :rtype: bool -# """ - -# logger.info( -# f"开始处理 MAA 日志,日志长度: {len(logs)},日志标记:{maa_result}", -# module="配置管理", -# ) - -# data: Dict[str, Union[str, Dict[str, Union[int, dict]]]] = { -# "recruit_statistics": defaultdict(int), -# "drop_statistics": defaultdict(dict), -# "maa_result": maa_result, -# } - -# if_six_star = False - -# # 公招统计(仅统计招募到的) -# confirmed_recruit = False -# current_star_level = None -# i = 0 -# while i < len(logs): -# if "公招识别结果:" in logs[i]: -# current_star_level = None # 每次识别公招时清空之前的星级 -# i += 1 -# while i < len(logs) and "Tags" not in logs[i]: # 读取所有公招标签 -# i += 1 - -# if i < len(logs) and "Tags" in logs[i]: # 识别星级 -# star_match = re.search(r"(\d+)\s*★ Tags", logs[i]) -# if star_match: -# current_star_level = f"{star_match.group(1)}★" -# if current_star_level == "6★": -# if_six_star = True - -# if "已确认招募" in logs[i]: # 只有确认招募后才统计 -# confirmed_recruit = True - -# if confirmed_recruit and current_star_level: -# data["recruit_statistics"][current_star_level] += 1 -# confirmed_recruit = False # 重置,等待下一次公招 -# current_star_level = None # 清空已处理的星级 - -# i += 1 - -# # 掉落统计 -# # 存储所有关卡的掉落统计 -# all_stage_drops = {} - -# # 查找所有Fight任务的开始和结束位置 -# fight_tasks = [] -# for i, line in enumerate(logs): -# if "开始任务: Fight" in line or "开始任务: 刷理智" in line: -# # 查找对应的任务结束位置 -# end_index = -1 -# for j in range(i + 1, len(logs)): -# if "完成任务: Fight" in logs[j] or "完成任务: 刷理智" in logs[j]: -# end_index = j -# break -# # 如果遇到新的Fight任务开始,则当前任务没有正常结束 -# if j < len(logs) and ( -# "开始任务: Fight" in logs[j] or "开始任务: 刷理智" in logs[j] -# ): -# break - -# # 如果找到了结束位置,记录这个任务的范围 -# if end_index != -1: -# fight_tasks.append((i, end_index)) - -# # 处理每个Fight任务 -# for start_idx, end_idx in fight_tasks: -# # 提取当前任务的日志 -# task_logs = logs[start_idx : end_idx + 1] - -# # 查找任务中的最后一次掉落统计 -# last_drop_stats = {} -# current_stage = None - -# for line in task_logs: -# # 匹配掉落统计行,如"1-7 掉落统计:" -# drop_match = re.search(r"([A-Za-z0-9\-]+) 掉落统计:", line) -# if drop_match: -# # 发现新的掉落统计,重置当前关卡的掉落数据 -# current_stage = drop_match.group(1) -# last_drop_stats = {} -# continue - -# # 如果已经找到了关卡,处理掉落物 -# if current_stage: -# item_match: List[str] = re.findall( -# r"^(?!\[)(\S+?)\s*:\s*([\d,]+)(?:\s*\(\+[\d,]+\))?", -# line, -# re.M, -# ) -# for item, total in item_match: -# # 解析数值时去掉逗号 (如 2,160 -> 2160) -# total = int(total.replace(",", "")) - -# # 黑名单 -# if item not in [ -# "当前次数", -# "理智", -# "最快截图耗时", -# "专精等级", -# "剩余时间", -# ]: -# last_drop_stats[item] = total - -# # 如果任务中有掉落统计,更新总统计 -# if current_stage and last_drop_stats: -# if current_stage not in all_stage_drops: -# all_stage_drops[current_stage] = {} - -# # 累加掉落数据 -# for item, count in last_drop_stats.items(): -# all_stage_drops[current_stage].setdefault(item, 0) -# all_stage_drops[current_stage][item] += count - -# # 将累加后的掉落数据保存到结果中 -# data["drop_statistics"] = all_stage_drops - -# # 保存日志 -# log_path.parent.mkdir(parents=True, exist_ok=True) -# with log_path.open("w", encoding="utf-8") as f: -# f.writelines(logs) -# with log_path.with_suffix(".json").open("w", encoding="utf-8") as f: -# json.dump(data, f, ensure_ascii=False, indent=4) - -# logger.success(f"MAA 日志统计完成,日志路径:{log_path}", module="配置管理") - -# return if_six_star - -# def save_general_log(self, log_path: Path, logs: list, general_result: str) -> None: -# """ -# 保存通用日志并生成对应统计数据 - -# :param log_path: 日志文件保存路径 -# :param logs: 日志内容列表 -# :param general_result: 待保存的日志结果信息 -# """ - -# logger.info( -# f"开始处理通用日志,日志长度: {len(logs)},日志标记:{general_result}", -# module="配置管理", -# ) - -# data: Dict[str, str] = {"general_result": general_result} - -# # 保存日志 -# log_path.parent.mkdir(parents=True, exist_ok=True) -# with log_path.with_suffix(".log").open("w", encoding="utf-8") as f: -# f.writelines(logs) -# with log_path.with_suffix(".json").open("w", encoding="utf-8") as f: -# json.dump(data, f, ensure_ascii=False, indent=4) - -# logger.success( -# f"通用日志统计完成,日志路径:{log_path.with_suffix('.log')}", -# module="配置管理", -# ) - -# def merge_statistic_info(self, statistic_path_list: List[Path]) -> dict: -# """ -# 合并指定数据统计信息文件 - -# :param statistic_path_list: 需要合并的统计信息文件路径列表 -# :return: 合并后的统计信息字典 -# """ - -# logger.info( -# f"开始合并统计信息文件,共计 {len(statistic_path_list)} 个文件", -# module="配置管理", -# ) - -# data = {"index": {}} - -# for json_file in statistic_path_list: - -# with json_file.open("r", encoding="utf-8") as f: -# single_data: Dict[str, Union[str, Dict[str, Union[int, dict]]]] = ( -# json.load(f) -# ) - -# for key in single_data.keys(): - -# if key not in data: -# data[key] = {} - -# # 合并公招统计 -# if key == "recruit_statistics": - -# for star_level, count in single_data[key].items(): -# if star_level not in data[key]: -# data[key][star_level] = 0 -# data[key][star_level] += count - -# # 合并掉落统计 -# elif key == "drop_statistics": - -# for stage, drops in single_data[key].items(): -# if stage not in data[key]: -# data[key][stage] = {} # 初始化关卡 - -# for item, count in drops.items(): - -# if item not in data[key][stage]: -# data[key][stage][item] = 0 -# data[key][stage][item] += count - -# # 录入运行结果 -# elif key in ["maa_result", "general_result"]: - -# actual_date = datetime.strptime( -# f"{json_file.parent.parent.name} {json_file.stem}", -# "%Y-%m-%d %H-%M-%S", -# ) + timedelta( -# days=( -# 1 -# if datetime.strptime(json_file.stem, "%H-%M-%S").time() -# < datetime.min.time().replace(hour=4) -# else 0 -# ) -# ) - -# if single_data[key] != "Success!": -# if "error_info" not in data: -# data["error_info"] = {} -# data["error_info"][actual_date.strftime("%d日 %H:%M:%S")] = ( -# single_data[key] -# ) - -# data["index"][actual_date] = [ -# actual_date.strftime("%d日 %H:%M:%S"), -# ("完成" if single_data[key] == "Success!" else "异常"), -# json_file, -# ] - -# data["index"] = [data["index"][_] for _ in sorted(data["index"])] - -# logger.success( -# f"统计信息合并完成,共计 {len(data['index'])} 条记录", module="配置管理" -# ) - -# return {k: v for k, v in data.items() if v} - -# def search_history( -# self, mode: str, start_date: datetime, end_date: datetime -# ) -> dict: -# """ -# 搜索指定范围内的历史记录 - -# :param mode: 合并模式(按日合并、按周合并、按月合并) -# :param start_date: 开始日期 -# :param end_date: 结束日期 -# :return: 搜索到的历史记录字典 -# """ - -# logger.info( -# f"开始搜索历史记录,合并模式:{mode},日期范围:{start_date} 至 {end_date}", -# module="配置管理", -# ) - -# history_dict = {} - -# for date_folder in (Config.app_path / "history").iterdir(): -# if not date_folder.is_dir(): -# continue # 只处理日期文件夹 - -# try: - -# date = datetime.strptime(date_folder.name, "%Y-%m-%d") - -# if not (start_date <= date <= end_date): -# continue # 只统计在范围内的日期 - -# if mode == "按日合并": -# date_name = date.strftime("%Y年 %m月 %d日") -# elif mode == "按周合并": -# year, week, _ = date.isocalendar() -# date_name = f"{year}年 第{week}周" -# elif mode == "按月合并": -# date_name = date.strftime("%Y年 %m月") - -# if date_name not in history_dict: -# history_dict[date_name] = {} - -# for user_folder in date_folder.iterdir(): -# if not user_folder.is_dir(): -# continue # 只处理用户文件夹 - -# if user_folder.stem not in history_dict[date_name]: -# history_dict[date_name][user_folder.stem] = list( -# user_folder.with_suffix("").glob("*.json") -# ) -# else: -# history_dict[date_name][user_folder.stem] += list( -# user_folder.with_suffix("").glob("*.json") -# ) - -# except ValueError: -# logger.warning(f"非日期格式的目录: {date_folder}") - -# logger.success( -# f"历史记录搜索完成,共计 {len(history_dict)} 条记录", module="配置管理" -# ) - -# return { -# k: v -# for k, v in sorted(history_dict.items(), key=lambda x: x[0], reverse=True) -# } - -# def clean_old_history(self): -# """删除超过用户设定天数的历史记录文件(基于目录日期)""" - -# if self.get(self.function_HistoryRetentionTime) == 0: -# logger.info("历史记录永久保留,跳过历史记录清理", module="配置管理") -# return - -# logger.info("开始清理超过设定天数的历史记录", module="配置管理") - -# deleted_count = 0 - -# for date_folder in (self.app_path / "history").iterdir(): -# if not date_folder.is_dir(): -# continue # 只处理日期文件夹 - -# try: -# # 只检查 `YYYY-MM-DD` 格式的文件夹 -# folder_date = datetime.strptime(date_folder.name, "%Y-%m-%d") -# if datetime.now() - folder_date > timedelta( -# days=self.get(self.function_HistoryRetentionTime) -# ): -# shutil.rmtree(date_folder, ignore_errors=True) -# deleted_count += 1 -# logger.info(f"已删除超期日志目录: {date_folder}", module="配置管理") -# except ValueError: -# logger.warning(f"非日期格式的目录: {date_folder}", module="配置管理") - -# logger.success(f"清理完成: {deleted_count} 个日期目录", module="配置管理") - Config = AppConfig() diff --git a/app/main.py b/app/main.py index e974284..24a94b0 100644 --- a/app/main.py +++ b/app/main.py @@ -22,38 +22,12 @@ import os import sys import ctypes -import uvicorn -from fastapi import FastAPI -from fastapi.middleware.cors import CORSMiddleware - -from api import scripts_router, setting_router -from core import Config from utils import get_logger - logger = get_logger("主程序") -app = FastAPI( - title="AUTO_MAA", - description="API for managing automation scripts, plans, and tasks", - version="1.0.0", -) - -app.add_middleware( - CORSMiddleware, - allow_origins=["*"], # 允许所有域名跨域访问 - allow_credentials=True, - allow_methods=["*"], # 允许所有请求方法,如 GET、POST、PUT、DELETE - allow_headers=["*"], # 允许所有请求头 -) - - -app.include_router(scripts_router) -app.include_router(setting_router) - - def is_admin() -> bool: """检查当前程序是否以管理员身份运行""" try: @@ -67,6 +41,44 @@ def main(): if is_admin(): + import uvicorn + from fastapi import FastAPI + from contextlib import asynccontextmanager + + @asynccontextmanager + async def lifespan(app: FastAPI): + + from core import Config + + await Config.init_config() + + yield + + logger.info("AUTO_MAA 后端程序关闭") + logger.info("----------------END----------------") + + from fastapi.middleware.cors import CORSMiddleware + from api import scripts_router, queue_router, setting_router + + app = FastAPI( + title="AUTO_MAA", + description="API for managing automation scripts, plans, and tasks", + version="1.0.0", + lifespan=lifespan, + ) + + app.add_middleware( + CORSMiddleware, + allow_origins=["*"], # 允许所有域名跨域访问 + allow_credentials=True, + allow_methods=["*"], # 允许所有请求方法,如 GET、POST、PUT、DELETE + allow_headers=["*"], # 允许所有请求头 + ) + + app.include_router(scripts_router) + app.include_router(queue_router) + app.include_router(setting_router) + uvicorn.run(app, host="0.0.0.0", port=8000) else: diff --git a/app/models/ConfigBase.py b/app/models/ConfigBase.py index 5a09a60..d1b40b8 100644 --- a/app/models/ConfigBase.py +++ b/app/models/ConfigBase.py @@ -74,6 +74,22 @@ class OptionsValidator(ConfigValidator): return value if self.validate(value) else self.options[0] +class UidValidator(ConfigValidator): + """UID验证器""" + + def validate(self, value: str) -> bool: + if value == "": + return True + try: + uuid.UUID(value) + return True + except ValueError: + return False + + def correct(self, value: str) -> str: + return "" + + class BoolValidator(OptionsValidator): """布尔值验证器""" diff --git a/app/models/schema.py b/app/models/schema.py index d5e5fdb..5db1b44 100644 --- a/app/models/schema.py +++ b/app/models/schema.py @@ -74,6 +74,61 @@ class UserDeleteIn(UserInBase): userId: str = Field(..., description="用户ID") +class QueueCreateOut(OutBase): + queueId: str = Field(..., description="新创建的队列ID") + data: Dict[str, Any] = Field(..., description="队列配置数据") + + +class QueueGetIn(BaseModel): + queueId: Optional[str] = Field(None, description="队列ID,仅在模式为Single时需要") + + +class QueueGetOut(OutBase): + index: List[Dict[str, str]] = Field(..., description="队列索引列表") + data: Dict[str, Any] = Field(..., description="队列列表或单个队列数据") + + +class QueueUpdateIn(BaseModel): + queueId: str = Field(..., description="队列ID") + data: Dict[str, Dict[str, Any]] = Field(..., description="队列更新数据") + + +class QueueDeleteIn(BaseModel): + queueId: str = Field(..., description="队列ID") + + +class QueueSetInBase(BaseModel): + queueId: str = Field(..., description="所属队列ID") + + +class TimeSetCreateOut(OutBase): + timeSetId: str = Field(..., description="新创建的时间设置ID") + data: Dict[str, Any] = Field(..., description="时间设置配置数据") + + +class TimeSetUpdateIn(QueueSetInBase): + timeSetId: str = Field(..., description="时间设置ID") + data: Dict[str, Dict[str, Any]] = Field(..., description="时间设置更新数据") + + +class TimeSetDeleteIn(QueueSetInBase): + timeSetId: str = Field(..., description="时间设置ID") + + +class QueueItemCreateOut(OutBase): + queueItemId: str = Field(..., description="新创建的队列项ID") + data: Dict[str, Any] = Field(..., description="队列项配置数据") + + +class QueueItemUpdateIn(QueueSetInBase): + queueItemId: str = Field(..., description="队列项ID") + data: Dict[str, Dict[str, Any]] = Field(..., description="队列项更新数据") + + +class QueueItemDeleteIn(QueueSetInBase): + queueItemId: str = Field(..., description="队列项ID") + + class SettingGetOut(OutBase): data: Dict[str, Dict[str, Any]] = Field(..., description="全局设置数据")