refactor(ui): rework history record saving and loading logic

Author: DLmaster361
Date:   2025-07-15 16:08:11 +08:00
Parent: 12cf10f97a
Commit: 403f69df8b

4 changed files with 135 additions and 219 deletions


@@ -707,7 +707,7 @@ class GeneralSubConfig(LQConfig):
 class AppConfig(GlobalConfig):
-    VERSION = "4.4.0.6"
+    VERSION = "4.4.0.0"
     stage_refreshed = Signal()
     PASSWORD_refreshed = Signal()
@@ -1386,7 +1386,7 @@ class AppConfig(GlobalConfig):
             logger.warning(f"保存历史记录时未找到调度队列: {key}")

     def save_maa_log(self, log_path: Path, logs: list, maa_result: str) -> bool:
-        """Save the MAA log and generate preliminary statistics"""
+        """Save the MAA log and generate the corresponding statistics"""
         data: Dict[str, Union[str, Dict[str, Union[int, dict]]]] = {
             "recruit_statistics": defaultdict(int),
@@ -1507,117 +1507,77 @@ class AppConfig(GlobalConfig):
         logger.info(f"处理完成:{log_path}")
-        self.merge_maa_logs("所有项", log_path.parent)
         return if_six_star

-    def merge_maa_logs(self, mode: str, logs_path: Union[Path, List[Path]]) -> dict:
+    def merge_statistic_info(self, statistic_path_list: List[Path]) -> dict:
         """Merge the specified statistic info files"""
-        data = {
-            "recruit_statistics": defaultdict(int),
-            "drop_statistics": defaultdict(dict),
-            "maa_result": defaultdict(str),
-        }
-        if mode == "所有项":
-            logs_path_list = list(logs_path.glob("*.json"))
-        elif mode == "指定项":
-            logs_path_list = logs_path
-        for json_file in logs_path_list:
+        data = {"index": {}}
+        for json_file in statistic_path_list:
             with json_file.open("r", encoding="utf-8") as f:
                 single_data: Dict[str, Union[str, Dict[str, Union[int, dict]]]] = (
                     json.load(f)
                 )
-            # merge recruitment statistics
-            for star_level, count in single_data["recruit_statistics"].items():
-                data["recruit_statistics"][star_level] += count
-            # merge drop statistics
-            for stage, drops in single_data["drop_statistics"].items():
-                if stage not in data["drop_statistics"]:
-                    data["drop_statistics"][stage] = {}  # initialize the stage
-                for item, count in drops.items():
-                    if item in data["drop_statistics"][stage]:
-                        data["drop_statistics"][stage][item] += count
-                    else:
-                        data["drop_statistics"][stage][item] = count
-            # merge MAA results
-            data["maa_result"][json_file.stem.replace("-", ":")] = single_data[
-                "maa_result"
-            ]
-        # generate the summary JSON file
-        if mode == "所有项":
-            with logs_path.with_suffix(".json").open("w", encoding="utf-8") as f:
-                json.dump(data, f, ensure_ascii=False, indent=4)
-            logger.info(f"统计完成:{logs_path.with_suffix('.json')}")
-        return data
-
-    def load_maa_logs(
-        self, mode: str, json_path: Path
-    ) -> Dict[str, Union[str, list, Dict[str, list]]]:
-        """Load MAA log statistics"""
-        if mode == "总览":
-            with json_path.open("r", encoding="utf-8") as f:
-                info: Dict[str, Dict[str, Union[int, dict]]] = json.load(f)
-            data = {}
-            # records before 4:00 sort to the end of the day
-            sorted_maa_result = sorted(
-                info["maa_result"].items(),
-                key=lambda x: (
-                    (
-                        1
-                        if datetime.strptime(x[0], "%H:%M:%S").time()
-                        < datetime.min.time().replace(hour=4)
-                        else 0
-                    ),
-                    datetime.strptime(x[0], "%H:%M:%S"),
-                ),
-            )
-            data["条目索引"] = [
-                [k, "完成" if v == "Success!" else "异常"] for k, v in sorted_maa_result
-            ]
-            data["条目索引"].insert(0, ["数据总览", "运行"])
-            data["统计数据"] = {"公招统计": list(info["recruit_statistics"].items())}
-            for game_id, drops in info["drop_statistics"].items():
-                data["统计数据"][f"掉落统计:{game_id}"] = list(drops.items())
-            data["统计数据"]["报错汇总"] = [
-                [k, v] for k, v in info["maa_result"].items() if v != "Success!"
-            ]
-        elif mode == "单项":
-            with json_path.open("r", encoding="utf-8") as f:
-                info: Dict[str, Union[str, Dict[str, Union[int, dict]]]] = json.load(f)
-            data = {}
-            data["统计数据"] = {"公招统计": list(info["recruit_statistics"].items())}
-            for game_id, drops in info["drop_statistics"].items():
-                data["统计数据"][f"掉落统计:{game_id}"] = list(drops.items())
-            with json_path.with_suffix(".log").open("r", encoding="utf-8") as f:
-                log = f.read()
-            data["日志信息"] = log
-        return data
+            for key in single_data.keys():
+                if key not in data:
+                    data[key] = {}
+                # merge recruitment statistics
+                if key == "recruit_statistics":
+                    for star_level, count in single_data[key].items():
+                        if star_level not in data[key]:
+                            data[key][star_level] = 0
+                        data[key][star_level] += count
+                # merge drop statistics
+                if key == "drop_statistics":
+                    for stage, drops in single_data[key].items():
+                        if stage not in data[key]:
+                            data[key][stage] = {}  # initialize the stage
+                        for item, count in drops.items():
+                            if item not in data[key][stage]:
+                                data[key][stage][item] = 0
+                            data[key][stage][item] += count
+                # record MAA results
+                if key == "maa_result":
+                    actual_date = datetime.strptime(
+                        f"{json_file.parent.parent.name} {json_file.stem}",
+                        "%Y-%m-%d %H-%M-%S",
+                    ) + timedelta(
+                        days=(
+                            1
+                            if datetime.strptime(json_file.stem, "%H-%M-%S").time()
+                            < datetime.min.time().replace(hour=4)
+                            else 0
+                        )
+                    )
+                    if single_data[key] != "Success!":
+                        if "error_info" not in data:
+                            data["error_info"] = {}
+                        data["error_info"][actual_date.strftime("%d%H:%M:%S")] = (
+                            single_data[key]
+                        )
+                    data["index"][actual_date] = [
+                        actual_date.strftime("%d%H:%M:%S"),
+                        ("完成" if single_data["maa_result"] == "Success!" else "异常"),
+                        json_file,
+                    ]
+        data["index"] = [data["index"][_] for _ in sorted(data["index"])]
+        return {k: v for k, v in data.items() if v}
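
To make the new merge behavior concrete, here is a minimal, self-contained sketch (not part of the commit; the helper names and all sample values are invented) of the two core moves in merge_statistic_info: accumulating nested count dictionaries, and shifting records timestamped before 04:00 onto the next calendar day:

    from datetime import datetime, timedelta

    def merge_drops(merged: dict, single: dict) -> None:
        """Accumulate nested {stage: {item: count}} dicts, as the new method does."""
        for stage, drops in single.items():
            merged.setdefault(stage, {})
            for item, count in drops.items():
                merged[stage][item] = merged[stage].get(item, 0) + count

    def actual_datetime(day_folder: str, time_stem: str) -> datetime:
        """Rebuild a record's real timestamp from its folder date and file stem.
        Times before 04:00 count toward the next calendar day, mirroring the
        timedelta shift in merge_statistic_info."""
        dt = datetime.strptime(f"{day_folder} {time_stem}", "%Y-%m-%d %H-%M-%S")
        if dt.time() < datetime.min.time().replace(hour=4):
            dt += timedelta(days=1)
        return dt

    a = {"1-7": {"固源岩": 18}, "CE-6": {"龙门币": 7500}}
    b = {"1-7": {"固源岩": 21, "源岩": 3}}
    merged: dict = {}
    merge_drops(merged, a)
    merge_drops(merged, b)
    print(merged)  # {'1-7': {'固源岩': 39, '源岩': 3}, 'CE-6': {'龙门币': 7500}}
    print(actual_datetime("2025-07-15", "01-23-45"))  # 2025-07-16 01:23:45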
     def search_history(
         self, mode: str, start_date: datetime, end_date: datetime

@@ -1638,43 +1598,28 @@ class AppConfig(GlobalConfig):
                     continue  # only count dates within the range
                 if mode == "按日合并":
-                    history_dict[date.strftime("%Y年 %m月 %d")] = list(
-                        date_folder.glob("*.json")
-                    )
+                    date_name = date.strftime("%Y年 %m月 %d")
                 elif mode == "按周合并":
                     year, week, _ = date.isocalendar()
-                    if f"{year}年 第{week}" not in history_dict:
-                        history_dict[f"{year}年 第{week}"] = {}
-                    for user in date_folder.glob("*.json"):
-                        if user.stem not in history_dict[f"{year}年 第{week}"]:
-                            history_dict[f"{year}年 第{week}"][user.stem] = list(
-                                user.with_suffix("").glob("*.json")
-                            )
-                        else:
-                            history_dict[f"{year}年 第{week}"][user.stem] += list(
-                                user.with_suffix("").glob("*.json")
-                            )
+                    date_name = f"{year}年 第{week}"
                 elif mode == "按月合并":
-                    if date.strftime("%Y年 %m月") not in history_dict:
-                        history_dict[date.strftime("%Y年 %m月")] = {}
-                    for user in date_folder.glob("*.json"):
-                        if user.stem not in history_dict[date.strftime("%Y年 %m月")]:
-                            history_dict[date.strftime("%Y年 %m月")][user.stem] = list(
-                                user.with_suffix("").glob("*.json")
-                            )
-                        else:
-                            history_dict[date.strftime("%Y年 %m月")][user.stem] += list(
-                                user.with_suffix("").glob("*.json")
-                            )
+                    date_name = date.strftime("%Y年 %m月")
+                if date_name not in history_dict:
+                    history_dict[date_name] = {}
+                for user_folder in date_folder.iterdir():
+                    if not user_folder.is_dir():
+                        continue  # only process user folders
+                    if user_folder.stem not in history_dict[date_name]:
+                        history_dict[date_name][user_folder.stem] = list(
+                            user_folder.with_suffix("").glob("*.json")
+                        )
+                    else:
+                        history_dict[date_name][user_folder.stem] += list(
+                            user_folder.with_suffix("").glob("*.json")
+                        )
             except ValueError:
                 logger.warning(f"非日期格式的目录: {date_folder}")