feat: 添加队列相关API

This commit is contained in:
DLmaster361
2025-08-05 00:46:22 +08:00
parent 8e3c62a518
commit 911eb60ae9
6 changed files with 394 additions and 750 deletions

View File

@@ -23,6 +23,7 @@ __author__ = "DLmaster361 <DLmaster_361@163.com>"
__license__ = "GPL-3.0 license"
from .scripts import router as scripts_router
from .queue import router as queue_router
from .setting import router as setting_router
__all__ = ["scripts_router", "setting_router"]
__all__ = ["scripts_router", "queue_router", "setting_router"]

139
app/api/queue.py Normal file
View File

@@ -0,0 +1,139 @@
# AUTO_MAA:A MAA Multi Account Management and Automation Tool
# Copyright © 2024-2025 DLmaster361
# This file is part of AUTO_MAA.
# AUTO_MAA is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License,
# or (at your option) any later version.
# AUTO_MAA is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
# the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with AUTO_MAA. If not, see <https://www.gnu.org/licenses/>.
# Contact: DLmaster_361@163.com
from fastapi import APIRouter, Body
from core import Config
from models.schema import *
router = APIRouter(prefix="/api/queue", tags=["调度队列管理"])
@router.post(
"/add", summary="添加调度队列", response_model=QueueCreateOut, status_code=200
)
async def add_queue() -> QueueCreateOut:
uid, config = await Config.add_queue()
return QueueCreateOut(queueId=str(uid), data=await config.toDict())
@router.post(
"/get", summary="查询调度队列配置信息", response_model=QueueGetOut, status_code=200
)
async def get_queues(queue: QueueGetIn = Body(...)) -> QueueGetOut:
try:
index, data = await Config.get_queue(queue.queueId)
except Exception as e:
return QueueGetOut(code=500, status="error", message=str(e), index=[], data={})
return QueueGetOut(index=index, data=data)
@router.post(
"/update", summary="更新调度队列配置信息", response_model=OutBase, status_code=200
)
async def update_queue(queue: QueueUpdateIn = Body(...)) -> OutBase:
try:
await Config.update_queue(queue.queueId, queue.data)
except Exception as e:
return OutBase(code=500, status="error", message=str(e))
return OutBase()
@router.post("/delete", summary="删除调度队列", response_model=OutBase, status_code=200)
async def delete_queue(queue: QueueDeleteIn = Body(...)) -> OutBase:
try:
await Config.del_queue(queue.queueId)
except Exception as e:
return OutBase(code=500, status="error", message=str(e))
return OutBase()
@router.post(
"/time/add", summary="添加定时项", response_model=TimeSetCreateOut, status_code=200
)
async def add_time_set(time: QueueSetInBase = Body(...)) -> TimeSetCreateOut:
uid, config = await Config.add_time_set(time.queueId)
return TimeSetCreateOut(timeSetId=str(uid), data=await config.toDict())
@router.post(
"/time/update", summary="更新定时项", response_model=OutBase, status_code=200
)
async def update_time_set(time: TimeSetUpdateIn = Body(...)) -> OutBase:
try:
await Config.update_time_set(time.queueId, time.timeSetId, time.data)
except Exception as e:
return OutBase(code=500, status="error", message=str(e))
return OutBase()
@router.post(
"/time/delete", summary="删除定时项", response_model=OutBase, status_code=200
)
async def delete_time_set(time: TimeSetDeleteIn = Body(...)) -> OutBase:
try:
await Config.del_time_set(time.queueId, time.timeSetId)
except Exception as e:
return OutBase(code=500, status="error", message=str(e))
return OutBase()
@router.post(
"/item/add",
summary="添加队列项",
response_model=QueueItemCreateOut,
status_code=200,
)
async def add_item(item: QueueSetInBase = Body(...)) -> QueueItemCreateOut:
uid, config = await Config.add_queue_item(item.queueId)
return QueueItemCreateOut(queueItemId=str(uid), data=await config.toDict())
@router.post(
"/item/update", summary="更新队列项", response_model=OutBase, status_code=200
)
async def update_item(item: QueueItemUpdateIn = Body(...)) -> OutBase:
try:
await Config.update_queue_item(item.queueId, item.queueItemId, item.data)
except Exception as e:
return OutBase(code=500, status="error", message=str(e))
return OutBase()
@router.post(
"/item/delete", summary="删除队列项", response_model=OutBase, status_code=200
)
async def delete_item(item: QueueItemDeleteIn = Body(...)) -> OutBase:
try:
await Config.del_queue_item(item.queueId, item.queueItemId)
except Exception as e:
return OutBase(code=500, status="error", message=str(e))
return OutBase()

View File

@@ -121,7 +121,7 @@ class QueueItem(ConfigBase):
def __init__(self) -> None:
super().__init__()
self.Info_ScriptId = ConfigItem("Info", "ScriptId", "")
self.Info_ScriptId = ConfigItem("Info", "ScriptId", "", UidValidator())
class TimeSet(ConfigBase):
@@ -352,9 +352,6 @@ class MaaConfig(ConfigBase):
self.UserData = MultipleConfig([MaaUserConfig])
# def get_name(self) -> str:
# return self.get(self.MaaSet_Name)
class MaaPlanConfig(ConfigBase):
"""MAA计划表配置"""
@@ -501,10 +498,7 @@ class GeneralConfig(ConfigBase):
"Script", "ConfigPath", ".", FileValidator()
)
self.Script_ConfigPathMode = ConfigItem(
"Script",
"ConfigPathMode",
"所有文件 (*)",
OptionsValidator(["所有文件 (*)", "文件夹"]),
"Script", "ConfigPathMode", "File", OptionsValidator(["File", "Folder"])
)
self.Script_UpdateConfigMode = ConfigItem(
"Script",
@@ -549,9 +543,6 @@ class GeneralConfig(ConfigBase):
self.UserData = MultipleConfig([GeneralUserConfig])
# def get_name(self) -> str:
# return self.get(self.Script_Name)
class AppConfig(GlobalConfig):
@@ -602,11 +593,6 @@ class AppConfig(GlobalConfig):
self.PlanConfig = MultipleConfig([MaaPlanConfig])
self.QueueConfig = MultipleConfig([QueueConfig])
import asyncio
loop = asyncio.get_event_loop()
loop.run_until_complete(self.init_config())
async def init_config(self) -> None:
"""初始化配置管理"""
await self.connect(self.config_path / "Config.json")
@@ -716,6 +702,148 @@ class AppConfig(GlobalConfig):
await script_config.UserData.remove(uid)
await self.ScriptConfig.save()
async def add_queue(self) -> tuple[uuid.UUID, ConfigBase]:
"""添加调度队列"""
self.logger.info("添加调度队列")
return await self.QueueConfig.add(QueueConfig)
async def get_queue(self, queue_id: Optional[str]) -> tuple[list, dict]:
"""获取调度队列配置"""
self.logger.info(f"获取调度队列配置:{queue_id}")
if queue_id is None:
data = await self.QueueConfig.toDict()
else:
data = await self.QueueConfig.get(uuid.UUID(queue_id))
index = data.pop("instances", [])
return list(index), data
async def update_queue(
self, queue_id: str, data: Dict[str, Dict[str, Any]]
) -> None:
"""更新调度队列配置"""
self.logger.info(f"更新调度队列配置:{queue_id}")
uid = uuid.UUID(queue_id)
for group, items in data.items():
for name, value in items.items():
self.logger.debug(
f"更新调度队列配置:{queue_id} - {group}.{name} = {value}"
)
await self.QueueConfig[uid].set(group, name, value)
await self.QueueConfig.save()
async def del_queue(self, queue_id: str) -> None:
"""删除调度队列配置"""
self.logger.info(f"删除调度队列配置:{queue_id}")
await self.QueueConfig.remove(uuid.UUID(queue_id))
async def add_time_set(self, queue_id: str) -> tuple[uuid.UUID, ConfigBase]:
"""添加时间设置配置"""
self.logger.info(f"{queue_id} 添加时间设置配置")
queue_config = self.QueueConfig[uuid.UUID(queue_id)]
if isinstance(queue_config, QueueConfig):
uid, config = await queue_config.TimeSet.add(TimeSet)
else:
raise TypeError(f"Unsupported script config type: {type(queue_config)}")
await self.QueueConfig.save()
return uid, config
async def update_time_set(
self, queue_id: str, time_set_id: str, data: Dict[str, Dict[str, Any]]
) -> None:
"""更新时间设置配置"""
self.logger.info(f"{queue_id} 更新时间设置配置:{time_set_id}")
queue_config = self.QueueConfig[uuid.UUID(queue_id)]
uid = uuid.UUID(time_set_id)
for group, items in data.items():
for name, value in items.items():
self.logger.debug(
f"更新时间设置配置:{queue_id} - {group}.{name} = {value}"
)
if isinstance(queue_config, QueueConfig):
await queue_config.TimeSet[uid].set(group, name, value)
await self.QueueConfig.save()
async def del_time_set(self, queue_id: str, time_set_id: str) -> None:
"""删除时间设置配置"""
self.logger.info(f"{queue_id} 删除时间设置配置:{time_set_id}")
queue_config = self.QueueConfig[uuid.UUID(queue_id)]
uid = uuid.UUID(time_set_id)
if isinstance(queue_config, QueueConfig):
await queue_config.TimeSet.remove(uid)
await self.QueueConfig.save()
async def add_queue_item(self, queue_id: str) -> tuple[uuid.UUID, ConfigBase]:
"""添加队列项配置"""
self.logger.info(f"{queue_id} 添加队列项配置")
queue_config = self.QueueConfig[uuid.UUID(queue_id)]
if isinstance(queue_config, QueueConfig):
uid, config = await queue_config.QueueItem.add(QueueItem)
else:
raise TypeError(f"Unsupported script config type: {type(queue_config)}")
await self.QueueConfig.save()
return uid, config
async def update_queue_item(
self, queue_id: str, queue_item_id: str, data: Dict[str, Dict[str, Any]]
) -> None:
"""更新队列项配置"""
self.logger.info(f"{queue_id} 更新队列项配置:{queue_item_id}")
queue_config = self.QueueConfig[uuid.UUID(queue_id)]
uid = uuid.UUID(queue_item_id)
for group, items in data.items():
for name, value in items.items():
if uuid.UUID(value) not in self.ScriptConfig:
raise ValueError(f"Script with uid {value} does not exist.")
self.logger.debug(
f"更新队列项配置:{queue_id} - {group}.{name} = {value}"
)
if isinstance(queue_config, QueueConfig):
await queue_config.QueueItem[uid].set(group, name, value)
await self.QueueConfig.save()
async def del_queue_item(self, queue_id: str, queue_item_id: str) -> None:
"""删除队列项配置"""
self.logger.info(f"{queue_id} 删除队列项配置:{queue_item_id}")
queue_config = self.QueueConfig[uuid.UUID(queue_id)]
uid = uuid.UUID(queue_item_id)
if isinstance(queue_config, QueueConfig):
await queue_config.QueueItem.remove(uid)
await self.QueueConfig.save()
async def get_setting(self) -> Dict[str, Any]:
"""获取全局设置"""
@@ -733,87 +861,6 @@ class AppConfig(GlobalConfig):
self.logger.debug(f"更新全局设置 - {group}.{name} = {value}")
await self.set(group, name, value)
# def get_stage(self) -> None:
# """从MAA服务器更新活动关卡信息"""
# logger.info("开始获取活动关卡信息", module="配置管理")
# network = Network.add_task(
# mode="get",
# url="https://api.maa.plus/MaaAssistantArknights/api/gui/StageActivity.json",
# )
# network.loop.exec()
# network_result = Network.get_result(network)
# if network_result["status_code"] == 200:
# stage_infos: List[Dict[str, Union[str, Dict[str, Union[str, int]]]]] = (
# network_result["response_json"]["Official"]["sideStoryStage"]
# )
# else:
# logger.warning(
# f"无法从MAA服务器获取活动关卡信息:{network_result['error_message']}",
# module="配置管理",
# )
# stage_infos = []
# ss_stage_dict = {"value": [], "text": []}
# for stage_info in stage_infos:
# if (
# datetime.strptime(
# stage_info["Activity"]["UtcStartTime"], "%Y/%m/%d %H:%M:%S"
# )
# < datetime.now()
# < datetime.strptime(
# stage_info["Activity"]["UtcExpireTime"], "%Y/%m/%d %H:%M:%S"
# )
# ):
# ss_stage_dict["value"].append(stage_info["Value"])
# ss_stage_dict["text"].append(stage_info["Value"])
# # 生成每日关卡信息
# stage_daily_info = [
# {"value": "-", "text": "当前/上次", "days": [1, 2, 3, 4, 5, 6, 7]},
# {"value": "1-7", "text": "1-7", "days": [1, 2, 3, 4, 5, 6, 7]},
# {"value": "R8-11", "text": "R8-11", "days": [1, 2, 3, 4, 5, 6, 7]},
# {
# "value": "12-17-HARD",
# "text": "12-17-HARD",
# "days": [1, 2, 3, 4, 5, 6, 7],
# },
# {"value": "CE-6", "text": "龙门币-6/5", "days": [2, 4, 6, 7]},
# {"value": "AP-5", "text": "红票-5", "days": [1, 4, 6, 7]},
# {"value": "CA-5", "text": "技能-5", "days": [2, 3, 5, 7]},
# {"value": "LS-6", "text": "经验-6/5", "days": [1, 2, 3, 4, 5, 6, 7]},
# {"value": "SK-5", "text": "碳-5", "days": [1, 3, 5, 6]},
# {"value": "PR-A-1", "text": "奶/盾芯片", "days": [1, 4, 5, 7]},
# {"value": "PR-A-2", "text": "奶/盾芯片组", "days": [1, 4, 5, 7]},
# {"value": "PR-B-1", "text": "术/狙芯片", "days": [1, 2, 5, 6]},
# {"value": "PR-B-2", "text": "术/狙芯片组", "days": [1, 2, 5, 6]},
# {"value": "PR-C-1", "text": "先/辅芯片", "days": [3, 4, 6, 7]},
# {"value": "PR-C-2", "text": "先/辅芯片组", "days": [3, 4, 6, 7]},
# {"value": "PR-D-1", "text": "近/特芯片", "days": [2, 3, 6, 7]},
# {"value": "PR-D-2", "text": "近/特芯片组", "days": [2, 3, 6, 7]},
# ]
# for day in range(0, 8):
# today_stage_dict = {"value": [], "text": []}
# for stage_info in stage_daily_info:
# if day in stage_info["days"] or day == 0:
# today_stage_dict["value"].append(stage_info["value"])
# today_stage_dict["text"].append(stage_info["text"])
# self.stage_dict[calendar.day_name[day - 1] if day > 0 else "ALL"] = {
# "value": today_stage_dict["value"] + ss_stage_dict["value"],
# "text": today_stage_dict["text"] + ss_stage_dict["text"],
# }
# self.stage_refreshed.emit()
# logger.success("活动关卡信息更新完成", module="配置管理")
def server_date(self) -> date:
"""
获取当前的服务器日期
@@ -828,630 +875,4 @@ class AppConfig(GlobalConfig):
return dt.date()
# def search_maa_user(self, name: str) -> None:
# """
# 更新指定 MAA 脚本实例的用户信息
# :param name: 脚本实例名称
# :type name: str
# """
# logger.info(f"开始搜索并读入 MAA 脚本实例 {name} 的用户信息", module="配置管理")
# user_dict: Dict[str, Dict[str, Union[Path, MaaUserConfig]]] = {}
# for user_dir in (Config.script_dict[name]["Path"] / "UserData").iterdir():
# if user_dir.is_dir():
# user_config = MaaUserConfig()
# user_config.load(user_dir / "config.json", user_config)
# user_config.save()
# user_dict[user_dir.stem] = {"Path": user_dir, "Config": user_config}
# self.script_dict[name]["UserData"] = dict(
# sorted(user_dict.items(), key=lambda x: int(x[0][3:]))
# )
# logger.success(
# f"MAA 脚本实例 {name} 的用户信息搜索完成,共找到 {len(user_dict)} 个用户",
# module="配置管理",
# )
# def search_general_sub(self, name: str) -> None:
# """
# 更新指定通用脚本实例的子配置信息
# :param name: 脚本实例名称
# :type name: str
# """
# logger.info(
# f"开始搜索并读入通用脚本实例 {name} 的子配置信息", module="配置管理"
# )
# user_dict: Dict[str, Dict[str, Union[Path, GeneralSubConfig]]] = {}
# for sub_dir in (Config.script_dict[name]["Path"] / "SubData").iterdir():
# if sub_dir.is_dir():
# sub_config = GeneralSubConfig()
# sub_config.load(sub_dir / "config.json", sub_config)
# sub_config.save()
# user_dict[sub_dir.stem] = {"Path": sub_dir, "Config": sub_config}
# self.script_dict[name]["SubData"] = dict(
# sorted(user_dict.items(), key=lambda x: int(x[0][3:]))
# )
# logger.success(
# f"通用脚本实例 {name} 的子配置信息搜索完成,共找到 {len(user_dict)} 个子配置",
# module="配置管理",
# )
# def search_plan(self) -> None:
# """更新计划表配置信息"""
# logger.info("开始搜索并读入计划表配置", module="配置管理")
# self.plan_dict: Dict[str, Dict[str, Union[str, Path, MaaPlanConfig]]] = {}
# if (self.app_path / "config/MaaPlanConfig").exists():
# for maa_plan_dir in (self.app_path / "config/MaaPlanConfig").iterdir():
# if maa_plan_dir.is_dir():
# maa_plan_config = MaaPlanConfig()
# maa_plan_config.load(maa_plan_dir / "config.json", maa_plan_config)
# maa_plan_config.save()
# self.plan_dict[maa_plan_dir.name] = {
# "Type": "Maa",
# "Path": maa_plan_dir,
# "Config": maa_plan_config,
# }
# self.plan_dict = dict(
# sorted(self.plan_dict.items(), key=lambda x: int(x[0][3:]))
# )
# logger.success(
# f"计划表配置搜索完成,共找到 {len(self.plan_dict)} 个计划表",
# module="配置管理",
# )
# def search_queue(self):
# """更新调度队列实例配置信息"""
# logger.info("开始搜索并读入调度队列配置", module="配置管理")
# self.queue_dict: Dict[str, Dict[str, Union[Path, QueueConfig]]] = {}
# if (self.app_path / "config/QueueConfig").exists():
# for json_file in (self.app_path / "config/QueueConfig").glob("*.json"):
# queue_config = QueueConfig()
# queue_config.load(json_file, queue_config)
# queue_config.save()
# self.queue_dict[json_file.stem] = {
# "Path": json_file,
# "Config": queue_config,
# }
# self.queue_dict = dict(
# sorted(self.queue_dict.items(), key=lambda x: int(x[0][5:]))
# )
# logger.success(
# f"调度队列配置搜索完成,共找到 {len(self.queue_dict)} 个调度队列",
# module="配置管理",
# )
# def change_queue(self, old: str, new: str) -> None:
# """
# 修改调度队列配置文件的队列参数
# :param old: 旧脚本名
# :param new: 新脚本名
# """
# logger.info(f"开始修改调度队列参数:{old} -> {new}", module="配置管理")
# for queue in self.queue_dict.values():
# for i in range(10):
# if (
# queue["Config"].get(
# queue["Config"].config_item_dict["Queue"][f"Script_{i}"]
# )
# == old
# ):
# queue["Config"].set(
# queue["Config"].config_item_dict["Queue"][f"Script_{i}"], new
# )
# logger.success(f"调度队列参数修改完成:{old} -> {new}", module="配置管理")
# def change_plan(self, old: str, new: str) -> None:
# """
# 修改脚本管理所有下属用户的计划表配置参数
# :param old: 旧计划表名
# :param new: 新计划表名
# """
# logger.info(f"开始修改计划表参数:{old} -> {new}", module="配置管理")
# for script in self.script_dict.values():
# for user in script["UserData"].values():
# if user["Config"].get(user["Config"].Info_StageMode) == old:
# user["Config"].set(user["Config"].Info_StageMode, new)
# logger.success(f"计划表参数修改完成:{old} -> {new}", module="配置管理")
# def change_maa_user_info(
# self, name: str, user_data: Dict[str, Dict[str, Union[str, Path, dict]]]
# ) -> None:
# """
# 保存代理完成后发生改动的用户信息
# :param name: 脚本实例名称
# :type name: str
# :param user_data: 用户信息字典,包含用户名称和对应的配置信息
# :type user_data: Dict[str, Dict[str, Union[str, Path, dict]]]
# """
# logger.info(f"开始保存 MAA 脚本实例 {name} 的用户信息变动", module="配置管理")
# for user, info in user_data.items():
# user_config = self.script_dict[name]["UserData"][user]["Config"]
# user_config.set(
# user_config.Info_RemainedDay, info["Config"]["Info"]["RemainedDay"]
# )
# user_config.set(
# user_config.Data_LastProxyDate, info["Config"]["Data"]["LastProxyDate"]
# )
# user_config.set(
# user_config.Data_LastAnnihilationDate,
# info["Config"]["Data"]["LastAnnihilationDate"],
# )
# user_config.set(
# user_config.Data_LastSklandDate,
# info["Config"]["Data"]["LastSklandDate"],
# )
# user_config.set(
# user_config.Data_ProxyTimes, info["Config"]["Data"]["ProxyTimes"]
# )
# user_config.set(
# user_config.Data_IfPassCheck, info["Config"]["Data"]["IfPassCheck"]
# )
# user_config.set(
# user_config.Data_CustomInfrastPlanIndex,
# info["Config"]["Data"]["CustomInfrastPlanIndex"],
# )
# self.sub_info_changed.emit()
# logger.success(f"MAA 脚本实例 {name} 的用户信息变动保存完成", module="配置管理")
# def change_general_sub_info(
# self, name: str, sub_data: Dict[str, Dict[str, Union[str, Path, dict]]]
# ) -> None:
# """
# 保存代理完成后发生改动的配置信息
# :param name: 脚本实例名称
# :type name: str
# :param sub_data: 子配置信息字典,包含子配置名称和对应的配置信息
# :type sub_data: Dict[str, Dict[str, Union[str, Path, dict]]]
# """
# logger.info(f"开始保存通用脚本实例 {name} 的子配置信息变动", module="配置管理")
# for sub, info in sub_data.items():
# sub_config = self.script_dict[name]["SubData"][sub]["Config"]
# sub_config.set(
# sub_config.Info_RemainedDay, info["Config"]["Info"]["RemainedDay"]
# )
# sub_config.set(
# sub_config.Data_LastProxyDate, info["Config"]["Data"]["LastProxyDate"]
# )
# sub_config.set(
# sub_config.Data_ProxyTimes, info["Config"]["Data"]["ProxyTimes"]
# )
# self.sub_info_changed.emit()
# logger.success(
# f"通用脚本实例 {name} 的子配置信息变动保存完成", module="配置管理"
# )
# def set_power_sign(self, sign: str) -> None:
# """
# 设置当前电源状态
# :param sign: 电源状态标志
# """
# self.power_sign = sign
# self.power_sign_changed.emit()
# logger.info(f"电源状态已更改为: {sign}", module="配置管理")
# def save_history(self, key: str, content: dict) -> None:
# """
# 保存历史记录
# :param key: 调度队列的键
# :type key: str
# :param content: 包含时间和历史记录内容的字典
# :type content: dict
# """
# if key in self.queue_dict:
# logger.info(f"保存调度队列 {key} 的历史记录", module="配置管理")
# self.queue_dict[key]["Config"].set(
# self.queue_dict[key]["Config"].Data_LastProxyTime, content["Time"]
# )
# self.queue_dict[key]["Config"].set(
# self.queue_dict[key]["Config"].Data_LastProxyHistory, content["History"]
# )
# logger.success(f"调度队列 {key} 的历史记录已保存", module="配置管理")
# else:
# logger.warning(f"保存历史记录时未找到调度队列: {key}")
# def save_maa_log(self, log_path: Path, logs: list, maa_result: str) -> bool:
# """
# 保存MAA日志并生成对应统计数据
# :param log_path: 日志文件保存路径
# :type log_path: Path
# :param logs: 日志内容列表
# :type logs: list
# :param maa_result: MAA 结果
# :type maa_result: str
# :return: 是否包含6★招募
# :rtype: bool
# """
# logger.info(
# f"开始处理 MAA 日志,日志长度: {len(logs)},日志标记:{maa_result}",
# module="配置管理",
# )
# data: Dict[str, Union[str, Dict[str, Union[int, dict]]]] = {
# "recruit_statistics": defaultdict(int),
# "drop_statistics": defaultdict(dict),
# "maa_result": maa_result,
# }
# if_six_star = False
# # 公招统计(仅统计招募到的)
# confirmed_recruit = False
# current_star_level = None
# i = 0
# while i < len(logs):
# if "公招识别结果:" in logs[i]:
# current_star_level = None # 每次识别公招时清空之前的星级
# i += 1
# while i < len(logs) and "Tags" not in logs[i]: # 读取所有公招标签
# i += 1
# if i < len(logs) and "Tags" in logs[i]: # 识别星级
# star_match = re.search(r"(\d+)\s*★ Tags", logs[i])
# if star_match:
# current_star_level = f"{star_match.group(1)}★"
# if current_star_level == "6★":
# if_six_star = True
# if "已确认招募" in logs[i]: # 只有确认招募后才统计
# confirmed_recruit = True
# if confirmed_recruit and current_star_level:
# data["recruit_statistics"][current_star_level] += 1
# confirmed_recruit = False # 重置,等待下一次公招
# current_star_level = None # 清空已处理的星级
# i += 1
# # 掉落统计
# # 存储所有关卡的掉落统计
# all_stage_drops = {}
# # 查找所有Fight任务的开始和结束位置
# fight_tasks = []
# for i, line in enumerate(logs):
# if "开始任务: Fight" in line or "开始任务: 刷理智" in line:
# # 查找对应的任务结束位置
# end_index = -1
# for j in range(i + 1, len(logs)):
# if "完成任务: Fight" in logs[j] or "完成任务: 刷理智" in logs[j]:
# end_index = j
# break
# # 如果遇到新的Fight任务开始则当前任务没有正常结束
# if j < len(logs) and (
# "开始任务: Fight" in logs[j] or "开始任务: 刷理智" in logs[j]
# ):
# break
# # 如果找到了结束位置,记录这个任务的范围
# if end_index != -1:
# fight_tasks.append((i, end_index))
# # 处理每个Fight任务
# for start_idx, end_idx in fight_tasks:
# # 提取当前任务的日志
# task_logs = logs[start_idx : end_idx + 1]
# # 查找任务中的最后一次掉落统计
# last_drop_stats = {}
# current_stage = None
# for line in task_logs:
# # 匹配掉落统计行,如"1-7 掉落统计:"
# drop_match = re.search(r"([A-Za-z0-9\-]+) 掉落统计:", line)
# if drop_match:
# # 发现新的掉落统计,重置当前关卡的掉落数据
# current_stage = drop_match.group(1)
# last_drop_stats = {}
# continue
# # 如果已经找到了关卡,处理掉落物
# if current_stage:
# item_match: List[str] = re.findall(
# r"^(?!\[)(\S+?)\s*:\s*([\d,]+)(?:\s*\(\+[\d,]+\))?",
# line,
# re.M,
# )
# for item, total in item_match:
# # 解析数值时去掉逗号 (如 2,160 -> 2160
# total = int(total.replace(",", ""))
# # 黑名单
# if item not in [
# "当前次数",
# "理智",
# "最快截图耗时",
# "专精等级",
# "剩余时间",
# ]:
# last_drop_stats[item] = total
# # 如果任务中有掉落统计,更新总统计
# if current_stage and last_drop_stats:
# if current_stage not in all_stage_drops:
# all_stage_drops[current_stage] = {}
# # 累加掉落数据
# for item, count in last_drop_stats.items():
# all_stage_drops[current_stage].setdefault(item, 0)
# all_stage_drops[current_stage][item] += count
# # 将累加后的掉落数据保存到结果中
# data["drop_statistics"] = all_stage_drops
# # 保存日志
# log_path.parent.mkdir(parents=True, exist_ok=True)
# with log_path.open("w", encoding="utf-8") as f:
# f.writelines(logs)
# with log_path.with_suffix(".json").open("w", encoding="utf-8") as f:
# json.dump(data, f, ensure_ascii=False, indent=4)
# logger.success(f"MAA 日志统计完成,日志路径:{log_path}", module="配置管理")
# return if_six_star
# def save_general_log(self, log_path: Path, logs: list, general_result: str) -> None:
# """
# 保存通用日志并生成对应统计数据
# :param log_path: 日志文件保存路径
# :param logs: 日志内容列表
# :param general_result: 待保存的日志结果信息
# """
# logger.info(
# f"开始处理通用日志,日志长度: {len(logs)},日志标记:{general_result}",
# module="配置管理",
# )
# data: Dict[str, str] = {"general_result": general_result}
# # 保存日志
# log_path.parent.mkdir(parents=True, exist_ok=True)
# with log_path.with_suffix(".log").open("w", encoding="utf-8") as f:
# f.writelines(logs)
# with log_path.with_suffix(".json").open("w", encoding="utf-8") as f:
# json.dump(data, f, ensure_ascii=False, indent=4)
# logger.success(
# f"通用日志统计完成,日志路径:{log_path.with_suffix('.log')}",
# module="配置管理",
# )
# def merge_statistic_info(self, statistic_path_list: List[Path]) -> dict:
# """
# 合并指定数据统计信息文件
# :param statistic_path_list: 需要合并的统计信息文件路径列表
# :return: 合并后的统计信息字典
# """
# logger.info(
# f"开始合并统计信息文件,共计 {len(statistic_path_list)} 个文件",
# module="配置管理",
# )
# data = {"index": {}}
# for json_file in statistic_path_list:
# with json_file.open("r", encoding="utf-8") as f:
# single_data: Dict[str, Union[str, Dict[str, Union[int, dict]]]] = (
# json.load(f)
# )
# for key in single_data.keys():
# if key not in data:
# data[key] = {}
# # 合并公招统计
# if key == "recruit_statistics":
# for star_level, count in single_data[key].items():
# if star_level not in data[key]:
# data[key][star_level] = 0
# data[key][star_level] += count
# # 合并掉落统计
# elif key == "drop_statistics":
# for stage, drops in single_data[key].items():
# if stage not in data[key]:
# data[key][stage] = {} # 初始化关卡
# for item, count in drops.items():
# if item not in data[key][stage]:
# data[key][stage][item] = 0
# data[key][stage][item] += count
# # 录入运行结果
# elif key in ["maa_result", "general_result"]:
# actual_date = datetime.strptime(
# f"{json_file.parent.parent.name} {json_file.stem}",
# "%Y-%m-%d %H-%M-%S",
# ) + timedelta(
# days=(
# 1
# if datetime.strptime(json_file.stem, "%H-%M-%S").time()
# < datetime.min.time().replace(hour=4)
# else 0
# )
# )
# if single_data[key] != "Success!":
# if "error_info" not in data:
# data["error_info"] = {}
# data["error_info"][actual_date.strftime("%d日 %H:%M:%S")] = (
# single_data[key]
# )
# data["index"][actual_date] = [
# actual_date.strftime("%d日 %H:%M:%S"),
# ("完成" if single_data[key] == "Success!" else "异常"),
# json_file,
# ]
# data["index"] = [data["index"][_] for _ in sorted(data["index"])]
# logger.success(
# f"统计信息合并完成,共计 {len(data['index'])} 条记录", module="配置管理"
# )
# return {k: v for k, v in data.items() if v}
# def search_history(
# self, mode: str, start_date: datetime, end_date: datetime
# ) -> dict:
# """
# 搜索指定范围内的历史记录
# :param mode: 合并模式(按日合并、按周合并、按月合并)
# :param start_date: 开始日期
# :param end_date: 结束日期
# :return: 搜索到的历史记录字典
# """
# logger.info(
# f"开始搜索历史记录,合并模式:{mode},日期范围:{start_date} 至 {end_date}",
# module="配置管理",
# )
# history_dict = {}
# for date_folder in (Config.app_path / "history").iterdir():
# if not date_folder.is_dir():
# continue # 只处理日期文件夹
# try:
# date = datetime.strptime(date_folder.name, "%Y-%m-%d")
# if not (start_date <= date <= end_date):
# continue # 只统计在范围内的日期
# if mode == "按日合并":
# date_name = date.strftime("%Y年 %m月 %d日")
# elif mode == "按周合并":
# year, week, _ = date.isocalendar()
# date_name = f"{year}年 第{week}周"
# elif mode == "按月合并":
# date_name = date.strftime("%Y年 %m月")
# if date_name not in history_dict:
# history_dict[date_name] = {}
# for user_folder in date_folder.iterdir():
# if not user_folder.is_dir():
# continue # 只处理用户文件夹
# if user_folder.stem not in history_dict[date_name]:
# history_dict[date_name][user_folder.stem] = list(
# user_folder.with_suffix("").glob("*.json")
# )
# else:
# history_dict[date_name][user_folder.stem] += list(
# user_folder.with_suffix("").glob("*.json")
# )
# except ValueError:
# logger.warning(f"非日期格式的目录: {date_folder}")
# logger.success(
# f"历史记录搜索完成,共计 {len(history_dict)} 条记录", module="配置管理"
# )
# return {
# k: v
# for k, v in sorted(history_dict.items(), key=lambda x: x[0], reverse=True)
# }
# def clean_old_history(self):
# """删除超过用户设定天数的历史记录文件(基于目录日期)"""
# if self.get(self.function_HistoryRetentionTime) == 0:
# logger.info("历史记录永久保留,跳过历史记录清理", module="配置管理")
# return
# logger.info("开始清理超过设定天数的历史记录", module="配置管理")
# deleted_count = 0
# for date_folder in (self.app_path / "history").iterdir():
# if not date_folder.is_dir():
# continue # 只处理日期文件夹
# try:
# # 只检查 `YYYY-MM-DD` 格式的文件夹
# folder_date = datetime.strptime(date_folder.name, "%Y-%m-%d")
# if datetime.now() - folder_date > timedelta(
# days=self.get(self.function_HistoryRetentionTime)
# ):
# shutil.rmtree(date_folder, ignore_errors=True)
# deleted_count += 1
# logger.info(f"已删除超期日志目录: {date_folder}", module="配置管理")
# except ValueError:
# logger.warning(f"非日期格式的目录: {date_folder}", module="配置管理")
# logger.success(f"清理完成: {deleted_count} 个日期目录", module="配置管理")
Config = AppConfig()

View File

@@ -22,38 +22,12 @@
import os
import sys
import ctypes
import uvicorn
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from api import scripts_router, setting_router
from core import Config
from utils import get_logger
logger = get_logger("主程序")
app = FastAPI(
title="AUTO_MAA",
description="API for managing automation scripts, plans, and tasks",
version="1.0.0",
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # 允许所有域名跨域访问
allow_credentials=True,
allow_methods=["*"], # 允许所有请求方法,如 GET、POST、PUT、DELETE
allow_headers=["*"], # 允许所有请求头
)
app.include_router(scripts_router)
app.include_router(setting_router)
def is_admin() -> bool:
"""检查当前程序是否以管理员身份运行"""
try:
@@ -67,6 +41,44 @@ def main():
if is_admin():
import uvicorn
from fastapi import FastAPI
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app: FastAPI):
from core import Config
await Config.init_config()
yield
logger.info("AUTO_MAA 后端程序关闭")
logger.info("----------------END----------------")
from fastapi.middleware.cors import CORSMiddleware
from api import scripts_router, queue_router, setting_router
app = FastAPI(
title="AUTO_MAA",
description="API for managing automation scripts, plans, and tasks",
version="1.0.0",
lifespan=lifespan,
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # 允许所有域名跨域访问
allow_credentials=True,
allow_methods=["*"], # 允许所有请求方法,如 GET、POST、PUT、DELETE
allow_headers=["*"], # 允许所有请求头
)
app.include_router(scripts_router)
app.include_router(queue_router)
app.include_router(setting_router)
uvicorn.run(app, host="0.0.0.0", port=8000)
else:

View File

@@ -74,6 +74,22 @@ class OptionsValidator(ConfigValidator):
return value if self.validate(value) else self.options[0]
class UidValidator(ConfigValidator):
"""UID验证器"""
def validate(self, value: str) -> bool:
if value == "":
return True
try:
uuid.UUID(value)
return True
except ValueError:
return False
def correct(self, value: str) -> str:
return ""
class BoolValidator(OptionsValidator):
"""布尔值验证器"""

View File

@@ -74,6 +74,61 @@ class UserDeleteIn(UserInBase):
userId: str = Field(..., description="用户ID")
class QueueCreateOut(OutBase):
queueId: str = Field(..., description="新创建的队列ID")
data: Dict[str, Any] = Field(..., description="队列配置数据")
class QueueGetIn(BaseModel):
queueId: Optional[str] = Field(None, description="队列ID仅在模式为Single时需要")
class QueueGetOut(OutBase):
index: List[Dict[str, str]] = Field(..., description="队列索引列表")
data: Dict[str, Any] = Field(..., description="队列列表或单个队列数据")
class QueueUpdateIn(BaseModel):
queueId: str = Field(..., description="队列ID")
data: Dict[str, Dict[str, Any]] = Field(..., description="队列更新数据")
class QueueDeleteIn(BaseModel):
queueId: str = Field(..., description="队列ID")
class QueueSetInBase(BaseModel):
queueId: str = Field(..., description="所属队列ID")
class TimeSetCreateOut(OutBase):
timeSetId: str = Field(..., description="新创建的时间设置ID")
data: Dict[str, Any] = Field(..., description="时间设置配置数据")
class TimeSetUpdateIn(QueueSetInBase):
timeSetId: str = Field(..., description="时间设置ID")
data: Dict[str, Dict[str, Any]] = Field(..., description="时间设置更新数据")
class TimeSetDeleteIn(QueueSetInBase):
timeSetId: str = Field(..., description="时间设置ID")
class QueueItemCreateOut(OutBase):
queueItemId: str = Field(..., description="新创建的队列项ID")
data: Dict[str, Any] = Field(..., description="队列项配置数据")
class QueueItemUpdateIn(QueueSetInBase):
queueItemId: str = Field(..., description="队列项ID")
data: Dict[str, Dict[str, Any]] = Field(..., description="队列项更新数据")
class QueueItemDeleteIn(QueueSetInBase):
queueItemId: str = Field(..., description="队列项ID")
class SettingGetOut(OutBase):
data: Dict[str, Dict[str, Any]] = Field(..., description="全局设置数据")