Files
AUTO-MAS-test/app/models/general.py
2025-07-19 10:13:48 +08:00

1155 lines
46 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# AUTO_MAA:A MAA Multi Account Management and Automation Tool
# Copyright © 2024-2025 DLmaster361
# This file is part of AUTO_MAA.
# AUTO_MAA is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License,
# or (at your option) any later version.
# AUTO_MAA is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
# the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with AUTO_MAA. If not, see <https://www.gnu.org/licenses/>.
# Contact: DLmaster_361@163.com
"""
AUTO_MAA
通用功能组件
v4.4
作者DLmaster_361
"""
from PySide6.QtCore import QObject, Signal, QEventLoop, QFileSystemWatcher, QTimer
import os
import sys
import shutil
import subprocess
from functools import partial
from datetime import datetime, timedelta
from pathlib import Path
from jinja2 import Environment, FileSystemLoader
from typing import Union, List, Dict
from app.core import Config, GeneralConfig, GeneralSubConfig, logger
from app.services import Notify, System
from app.utils import ProcessManager
class GeneralManager(QObject):
"""通用脚本通用控制器"""
question = Signal(str, str)
question_response = Signal(bool)
update_sub_info = Signal(str, dict)
push_info_bar = Signal(str, str, str, int)
play_sound = Signal(str)
create_user_list = Signal(list)
update_user_list = Signal(list)
update_log_text = Signal(str)
interrupt = Signal()
accomplish = Signal(dict)
def __init__(
self,
mode: str,
config: Dict[
str,
Union[
str,
Path,
GeneralConfig,
Dict[str, Dict[str, Union[Path, GeneralSubConfig]]],
],
],
sub_config_path: Path = None,
):
super(GeneralManager, self).__init__()
self.sub_list = []
self.mode = mode
self.config_path = config["Path"]
self.name = config["Config"].get(config["Config"].Script_Name)
self.sub_config_path = sub_config_path
self.game_process_manager = ProcessManager()
self.script_process_manager = ProcessManager()
self.log_monitor = QFileSystemWatcher()
self.log_monitor.fileChanged.connect(self.check_script_log)
self.log_monitor_timer = QTimer()
self.log_monitor_timer.timeout.connect(self.refresh_log)
self.monitor_loop = QEventLoop()
self.loge_start_time = datetime.now()
self.script_logs = []
self.script_result = "Wait"
self.script_process_manager.processClosed.connect(self.check_script_log)
self.question_loop = QEventLoop()
self.question_response.connect(self.__capture_response)
self.question_response.connect(self.question_loop.quit)
self.wait_loop = QEventLoop()
self.isInterruptionRequested = False
self.interrupt.connect(self.quit_monitor)
self.task_dict = {}
self.set = config["Config"].toDict()
self.data: Dict[str, Dict[str, Union[Path, dict]]] = {}
if self.mode != "设置通用脚本":
for name, info in config["SubData"].items():
self.data[name] = {
"Path": info["Path"],
"Config": info["Config"].toDict(),
}
self.data = dict(sorted(self.data.items(), key=lambda x: int(x[0][3:])))
logger.success(
f"初始化通用调度器,模式:{self.mode}", module=f"通用调度器-{self.name}"
)
def check_config_info(self) -> bool:
"""检查配置完整性"""
if not (
Path(self.set["Script"]["RootPath"]).exists()
and Path(self.set["Script"]["ScriptPath"]).exists()
and Path(self.set["Script"]["ConfigPath"]).exists()
and Path(self.set["Script"]["LogPath"]).parent.exists()
and self.set["Script"]["LogTimeFormat"]
and self.set["Script"]["ErrorLog"]
) or (
self.set["Game"]["Enabled"] and not Path(self.set["Game"]["Path"]).exists()
):
logger.error("脚本配置缺失", module=f"通用调度器-{self.name}")
self.push_info_bar.emit("error", "脚本配置缺失", "请检查脚本配置!", -1)
return False
return True
def configure(self):
"""提取配置信息"""
self.script_root_path = Path(self.set["Script"]["RootPath"])
self.script_exe_path = Path(self.set["Script"]["ScriptPath"])
self.script_config_path = Path(self.set["Script"]["ConfigPath"])
self.script_log_path = (
Path(self.set["Script"]["LogPath"]).with_stem(
datetime.now().strftime(self.set["Script"]["LogPathFormat"])
)
if self.set["Script"]["LogPathFormat"]
else Path(self.set["Script"]["LogPath"])
)
if not self.script_log_path.exists():
self.script_log_path.parent.mkdir(parents=True, exist_ok=True)
self.script_log_path.touch(exist_ok=True)
self.game_path = Path(self.set["Game"]["Path"])
self.log_time_range = [
self.set["Script"]["LogTimeStart"] - 1,
self.set["Script"]["LogTimeEnd"],
]
self.success_log = (
[_.strip() for _ in self.set["Script"]["SuccessLog"].split("|")]
if self.set["Script"]["SuccessLog"]
else []
)
self.error_log = [_.strip() for _ in self.set["Script"]["ErrorLog"].split("|")]
def run(self):
"""主进程,运行通用脚本代理进程"""
current_date = datetime.now().strftime("%m-%d")
curdate = Config.server_date().strftime("%Y-%m-%d")
begin_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if self.mode == "人工排查":
logger.error("通用脚本不支持人工排查模式", module=f"通用调度器-{self.name}")
self.accomplish.emit(
{
"Time": begin_time,
"History": "通用脚本不支持人工排查模式,通用代理进程中止",
}
)
return None
# 检查配置完整性
if not self.check_config_info():
logger.error(
"配置不完整,无法启动通用代理进程", module=f"通用调度器-{self.name}"
)
self.accomplish.emit(
{"Time": begin_time, "History": "由于配置不完整,通用代理进程中止"}
)
return None
self.configure()
# 整理用户数据,筛选需代理的用户
if self.mode != "设置通用脚本":
self.data = dict(sorted(self.data.items(), key=lambda x: int(x[0][3:])))
self.sub_list: List[List[str, str, str]] = [
[_["Config"]["Info"]["Name"], "等待", index]
for index, _ in self.data.items()
if (
_["Config"]["Info"]["RemainedDay"] != 0
and _["Config"]["Info"]["Status"]
)
]
self.create_user_list.emit(self.sub_list)
logger.info(
f"配置列表创建完成,已筛选子配置数:{len(self.sub_list)}",
module=f"通用调度器-{self.name}",
)
# 自动代理模式
if self.mode == "自动代理":
# 执行情况预处理
for _ in self.sub_list:
if self.data[_[2]]["Config"]["Data"]["LastProxyDate"] != curdate:
self.data[_[2]]["Config"]["Data"]["LastProxyDate"] = curdate
self.data[_[2]]["Config"]["Data"]["ProxyTimes"] = 0
_[
0
] += f" - 第{self.data[_[2]]['Config']['Data']['ProxyTimes'] + 1}次代理"
# 开始代理
for sub in self.sub_list:
sub_data = self.data[sub[2]]["Config"]
if self.isInterruptionRequested:
break
if (
self.set["Run"]["ProxyTimesLimit"] == 0
or sub_data["Data"]["ProxyTimes"]
< self.set["Run"]["ProxyTimesLimit"]
):
sub[1] = "运行"
self.update_user_list.emit(self.sub_list)
else:
sub[1] = "跳过"
self.update_user_list.emit(self.sub_list)
continue
logger.info(f"开始代理配置: {sub[0]}", module=f"通用调度器-{self.name}")
sub_start_time = datetime.now()
run_book = False
if not (self.data[sub[2]]["Path"] / "ConfigFiles").exists():
logger.error(
f"配置: {sub[0]} - 未找到配置文件",
module=f"通用调度器-{self.name}",
)
self.push_info_bar.emit(
"error",
"启动通用代理进程失败",
f"未找到{sub[0]}的配置文件!",
-1,
)
run_book = False
continue
# 尝试次数循环
for i in range(self.set["Run"]["RunTimesLimit"]):
if self.isInterruptionRequested or run_book:
break
logger.info(
f"用户: {sub[0]} - 尝试次数: {i + 1}/{self.set['Run']['RunTimesLimit']}",
module=f"通用调度器-{self.name}",
)
# 记录当前时间
self.log_start_time = datetime.now()
# 配置脚本
self.set_sub(sub[2])
# 执行任务前脚本
if (
sub_data["Info"]["IfScriptBeforeTask"]
and Path(sub_data["Info"]["ScriptBeforeTask"]).exists()
):
self.execute_script_task(
Path(sub_data["Info"]["ScriptBeforeTask"]), "脚本前任务"
)
# 启动游戏/模拟器
if self.set["Game"]["Enabled"]:
try:
logger.info(
f"启动游戏/模拟器:{self.game_path},参数:{self.set['Game']['Arguments']}",
module=f"通用调度器-{self.name}",
)
self.game_process_manager.open_process(
self.game_path,
str(self.set["Game"]["Arguments"]).split(" "),
0,
)
except Exception as e:
logger.exception(
f"启动游戏/模拟器时出现异常:{e}",
module=f"通用调度器-{self.name}",
)
self.push_info_bar.emit(
"error",
"启动游戏/模拟器时出现异常",
"请检查游戏/模拟器路径设置",
-1,
)
self.script_result = "游戏/模拟器启动失败"
break
# 添加静默进程标记
if self.set["Game"]["Style"] == "Emulator":
logger.info(
f"添加静默进程标记:{self.game_path}",
module=f"通用调度器-{self.name}",
)
Config.silence_list.append(self.game_path)
self.update_log_text.emit(
f"正在等待游戏/模拟器完成启动\n请等待{self.set['Game']['WaitTime']}s"
)
self.sleep(self.set["Game"]["WaitTime"])
# 10s后移除静默进程标记
if self.set["Game"]["Style"] == "Emulator":
logger.info(
f"10s后移除静默进程标记{self.game_path}",
module=f"通用调度器-{self.name}",
)
QTimer.singleShot(
10000,
partial(Config.silence_list.remove, self.game_path),
)
# 运行脚本任务
logger.info(
f"运行脚本任务:{self.script_exe_path},参数:{self.set['Script']['Arguments']}",
module=f"通用调度器-{self.name}",
)
self.script_process_manager.open_process(
self.script_exe_path,
str(self.set["Script"]["Arguments"]).split(" "),
tracking_time=60 if self.set["Script"]["IfTrackProcess"] else 0,
)
# 监测运行状态
self.start_monitor()
if self.script_result == "Success!":
# 标记任务完成
run_book = True
# 中止相关程序
logger.info(
f"中止相关程序:{self.script_exe_path}",
module=f"通用调度器-{self.name}",
)
self.script_process_manager.kill()
System.kill_process(self.script_exe_path)
if self.set["Game"]["Enabled"]:
logger.info(
f"中止游戏/模拟器进程:{list(self.game_process_manager.tracked_pids)}",
module=f"通用调度器-{self.name}",
)
self.game_process_manager.kill()
if self.set["Game"]["IfForceClose"]:
System.kill_process(self.game_path)
logger.info(
f"配置: {sub[0]} - 通用脚本进程完成代理任务",
module=f"通用调度器-{self.name}",
)
self.update_log_text.emit(
"检测到通用脚本进程完成代理任务\n正在等待相关程序结束\n请等待10s"
)
self.sleep(10)
# 更新脚本配置文件
if self.set["Script"]["UpdateConfigMode"] in [
"Success",
"Always",
]:
if self.set["Script"]["ConfigPathMode"] == "文件夹":
shutil.copytree(
self.script_config_path,
self.data[sub[2]]["Path"] / "ConfigFiles",
dirs_exist_ok=True,
)
else:
shutil.copy(
self.script_config_path,
self.data[sub[2]]["Path"]
/ "ConfigFiles"
/ self.script_config_path.name,
)
logger.success(
"通用脚本配置文件已更新",
module=f"通用调度器-{self.name}",
)
else:
logger.error(
f"配置: {sub[0]} - 代理任务异常: {self.script_result}",
module=f"通用调度器-{self.name}",
)
# 打印中止信息
# 此时log变量内存储的就是出现异常的日志信息可以保存或发送用于问题排查
self.update_log_text.emit(
f"{self.script_result}\n正在中止相关程序\n请等待10s"
)
# 中止相关程序
logger.info(
f"中止相关程序:{self.script_exe_path}",
module=f"通用调度器-{self.name}",
)
self.script_process_manager.kill()
if self.set["Game"]["Enabled"]:
logger.info(
f"中止游戏/模拟器进程:{list(self.game_process_manager.tracked_pids)}",
module=f"通用调度器-{self.name}",
)
self.game_process_manager.kill()
if self.set["Game"]["IfForceClose"]:
System.kill_process(self.game_path)
# 推送异常通知
Notify.push_plyer(
"用户自动代理出现异常!",
f"用户 {sub[0].replace("_", " 今天的")}出现一次异常",
f"{sub[0].replace("_", " ")}出现异常",
1,
)
if i == self.set["Run"]["RunTimesLimit"] - 1:
self.play_sound.emit("子任务失败")
else:
self.play_sound.emit(self.script_result)
self.sleep(10)
# 更新脚本配置文件
if self.set["Script"]["UpdateConfigMode"] in [
"Failure",
"Always",
]:
if self.set["Script"]["ConfigPathMode"] == "文件夹":
shutil.copytree(
self.script_config_path,
self.data[sub[2]]["Path"] / "ConfigFiles",
dirs_exist_ok=True,
)
else:
shutil.copy(
self.script_config_path,
self.data[sub[2]]["Path"]
/ "ConfigFiles"
/ self.script_config_path.name,
)
logger.success(
"通用脚本配置文件已更新",
module=f"通用调度器-{self.name}",
)
# 执行任务后脚本
if (
sub_data["Info"]["IfScriptAfterTask"]
and Path(sub_data["Info"]["ScriptAfterTask"]).exists()
):
self.execute_script_task(
Path(sub_data["Info"]["ScriptAfterTask"]), "脚本后任务"
)
# 保存运行日志以及统计信息
Config.save_general_log(
Config.app_path
/ f"history/{curdate}/{sub_data['Info']['Name']}/{self.log_start_time.strftime("%H-%M-%S")}.log",
self.script_logs,
self.script_result,
)
# 发送统计信息
statistics = {
"sub_index": sub[2],
"sub_info": sub[0],
"start_time": sub_start_time.strftime("%Y-%m-%d %H:%M:%S"),
"end_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"sub_result": "代理成功" if run_book else self.script_result,
}
self.push_notification(
"统计信息",
f"{current_date} | 配置 {sub[0]} 的自动代理统计报告",
statistics,
sub_data,
)
if run_book:
# 成功完成代理的用户修改相关参数
if (
sub_data["Data"]["ProxyTimes"] == 0
and sub_data["Info"]["RemainedDay"] != -1
):
sub_data["Info"]["RemainedDay"] -= 1
sub_data["Data"]["ProxyTimes"] += 1
sub[1] = "完成"
logger.success(
f"配置: {sub[0]} - 代理任务完成",
module=f"通用调度器-{self.name}",
)
Notify.push_plyer(
"成功完成一个自动代理任务!",
f"已完成配置 {sub[0].replace("_", " 今天的")}任务",
f"已完成 {sub[0].replace("_", "")}",
3,
)
else:
# 录入代理失败的用户
sub[1] = "异常"
logger.error(
f"配置: {sub[0]} - 代理任务异常: {self.script_result}",
module=f"通用调度器-{self.name}",
)
self.update_user_list.emit(self.sub_list)
# 设置通用脚本模式
elif self.mode == "设置通用脚本":
# 配置通用脚本
self.set_sub()
try:
# 创建通用脚本任务
logger.info(
f"无参数启动通用脚本:{self.script_exe_path}",
module=f"通用调度器-{self.name}",
)
self.script_process_manager.open_process(
self.script_exe_path,
tracking_time=60 if self.set["Script"]["IfTrackProcess"] else 0,
)
# 记录当前时间
self.log_start_time = datetime.now()
# 监测通用脚本运行状态
self.start_monitor()
self.sub_config_path.mkdir(parents=True, exist_ok=True)
if self.set["Script"]["ConfigPathMode"] == "文件夹":
shutil.copytree(
self.script_config_path,
self.sub_config_path,
dirs_exist_ok=True,
)
logger.success(
f"通用脚本配置已保存到:{self.sub_config_path}",
module=f"通用调度器-{self.name}",
)
else:
shutil.copy(self.script_config_path, self.sub_config_path)
logger.success(
f"通用脚本配置已保存到:{self.sub_config_path}",
module=f"通用调度器-{self.name}",
)
except Exception as e:
logger.exception(
f"启动通用脚本时出现异常:{e}", module=f"通用调度器-{self.name}"
)
self.push_info_bar.emit(
"error", "启动通用脚本时出现异常", "请检查相关设置", -1
)
result_text = ""
# 导出结果
if self.mode in ["自动代理"]:
# 关闭可能未正常退出的通用脚本进程
if self.isInterruptionRequested:
logger.info(
f"关闭可能未正常退出的通用脚本进程:{self.script_exe_path}",
module=f"通用调度器-{self.name}",
)
self.script_process_manager.kill(if_force=True)
System.kill_process(self.script_exe_path)
if self.set["Game"]["Enabled"]:
logger.info(
f"关闭可能未正常退出的游戏/模拟器进程:{list(self.game_process_manager.tracked_pids)}",
module=f"通用调度器-{self.name}",
)
self.game_process_manager.kill(if_force=True)
if self.set["Game"]["IfForceClose"]:
System.kill_process(self.game_path)
# 更新用户数据
updated_info = {_[2]: self.data[_[2]] for _ in self.sub_list}
self.update_sub_info.emit(self.config_path.name, updated_info)
error_index = [_[2] for _ in self.sub_list if _[1] == "异常"]
over_index = [_[2] for _ in self.sub_list if _[1] == "完成"]
wait_index = [_[2] for _ in self.sub_list if _[1] == "等待"]
# 保存运行日志
title = (
f"{current_date} | {self.name}{self.mode[:4]}任务报告"
if self.name != ""
else f"{current_date} | {self.mode[:4]}任务报告"
)
result = {
"title": f"{self.mode[:4]}任务报告",
"script_name": (self.name if self.name != "" else "空白"),
"start_time": begin_time,
"end_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"completed_count": len(over_index),
"uncompleted_count": len(error_index) + len(wait_index),
"failed_sub": [
self.data[_]["Config"]["Info"]["Name"] for _ in error_index
],
"waiting_sub": [
self.data[_]["Config"]["Info"]["Name"] for _ in wait_index
],
}
# 生成结果文本
result_text = (
f"任务开始时间:{result['start_time']},结束时间:{result['end_time']}\n"
f"已完成数:{result['completed_count']},未完成数:{result['uncompleted_count']}\n\n"
)
if len(result["failed_sub"]) > 0:
result_text += f"{self.mode[2:4]}未成功的配置:\n{"\n".join(result['failed_sub'])}\n"
if len(result["waiting_sub"]) > 0:
result_text += f"\n未开始{self.mode[2:4]}的配置:\n{"\n".join(result['waiting_sub'])}\n"
# 推送代理结果通知
Notify.push_plyer(
title.replace("报告", "已完成!"),
f"已完成配置数:{len(over_index)},未完成配置数:{len(error_index) + len(wait_index)}",
f"已完成配置数:{len(over_index)},未完成配置数:{len(error_index) + len(wait_index)}",
10,
)
self.push_notification("代理结果", title, result)
self.log_monitor.deleteLater()
self.log_monitor_timer.deleteLater()
self.accomplish.emit({"Time": begin_time, "History": result_text})
def requestInterruption(self) -> None:
"""请求中止通用脚本任务"""
logger.info(f"收到任务中止申请", module=f"通用调度器-{self.name}")
if len(self.log_monitor.files()) != 0:
self.interrupt.emit()
self.script_result = "任务被手动中止"
self.isInterruptionRequested = True
self.wait_loop.quit()
def push_question(self, title: str, message: str) -> bool:
"""推送问题询问"""
logger.info(
f"推送问题询问:{title} - {message}", module=f"通用调度器-{self.name}"
)
self.question.emit(title, message)
self.question_loop.exec()
return self.response
def __capture_response(self, response: bool) -> None:
"""捕获问题询问的响应"""
logger.info(f"捕获问题询问的响应:{response}", module=f"通用调度器-{self.name}")
self.response = response
def sleep(self, time: int) -> None:
"""非阻塞型等待"""
logger.info(f"等待 {time}", module=f"通用调度器-{self.name}")
QTimer.singleShot(time * 1000, self.wait_loop.quit)
self.wait_loop.exec()
def refresh_log(self) -> None:
"""刷新脚本日志"""
logger.debug(
f"刷新通用脚本日志:{self.script_log_path}",
module=f"通用调度器-{self.name}",
)
try:
with self.script_log_path.open(mode="r", encoding="utf-8") as f:
pass
except FileNotFoundError:
logger.error(
f"脚本日志文件不存在:{self.script_log_path}",
module=f"通用调度器-{self.name}",
)
except Exception as e:
logger.exception(f"刷新脚本日志失败:{e}", module=f"通用调度器-{self.name}")
# 一分钟内未执行日志变化检查,强制检查一次
if (datetime.now() - self.last_check_time).total_seconds() > 60:
logger.info("触发 1 分钟超时检查", module=f"通用调度器-{self.name}")
self.check_script_log()
def strptime(
self, date_string: str, format: str, default_date: datetime
) -> datetime:
"""根据指定格式解析日期字符串"""
# 时间字段映射表
time_fields = {
"%Y": "year",
"%m": "month",
"%d": "day",
"%H": "hour",
"%M": "minute",
"%S": "second",
"%f": "microsecond",
}
date = datetime.strptime(date_string, format)
# 构建参数字典
datetime_kwargs = {}
for format_code, field_name in time_fields.items():
if format_code in format:
datetime_kwargs[field_name] = getattr(date, field_name)
else:
datetime_kwargs[field_name] = getattr(default_date, field_name)
return datetime(**datetime_kwargs)
def check_script_log(self) -> None:
"""获取脚本日志并检查以判断脚本程序运行状态"""
self.last_check_time = datetime.now()
# 获取日志
self.script_logs = []
if_log_start = False
try:
with self.script_log_path.open(mode="r", encoding="utf-8") as f:
for entry in f:
if not if_log_start:
try:
entry_time = self.strptime(
entry[self.log_time_range[0] : self.log_time_range[1]],
self.set["Script"]["LogTimeFormat"],
self.last_check_time,
)
if entry_time > self.log_start_time:
if_log_start = True
self.script_logs.append(entry)
except ValueError:
pass
else:
self.script_logs.append(entry)
except FileNotFoundError:
logger.error(
f"脚本日志文件不存在:{self.script_log_path}",
module=f"通用调度器-{self.name}",
)
self.update_log_text.emit("脚本日志文件不存在")
return None
except Exception as e:
logger.exception(
f"读取脚本日志文件失败:{e}", module=f"通用调度器-{self.name}"
)
self.update_log_text.emit(f"读取脚本日志文件失败:{e}")
return None
log = "".join(self.script_logs)
# 更新日志
if len(self.script_logs) > 100:
self.update_log_text.emit("".join(self.script_logs[-100:]))
else:
self.update_log_text.emit("".join(self.script_logs))
if "自动代理" in self.mode:
# 获取最近一条日志的时间
latest_time = self.log_start_time
for _ in self.script_logs[::-1]:
try:
latest_time = self.strptime(
_[self.log_time_range[0] : self.log_time_range[1]],
self.set["Script"]["LogTimeFormat"],
self.last_check_time,
)
break
except ValueError:
pass
logger.info(
f"通用脚本最近一条日志时间:{latest_time}",
module=f"通用调度器-{self.name}",
)
for success_sign in self.success_log:
if success_sign in log:
self.script_result = "Success!"
break
else:
if self.isInterruptionRequested:
self.script_result = "任务被手动中止"
elif datetime.now() - latest_time > timedelta(
minutes=self.set["Run"]["RunTimeLimit"]
):
self.script_result = "脚本进程超时"
else:
for error_sign in self.error_log:
if error_sign in log:
self.script_result = f"异常日志:{error_sign}"
break
else:
if self.script_process_manager.is_running():
self.script_result = "Wait"
elif self.success_log:
self.script_result = "脚本在完成任务前退出"
else:
self.script_result = "Success!"
elif self.mode == "设置通用脚本":
if self.script_process_manager.is_running():
self.script_result = "Wait"
else:
self.script_result = "Success!"
logger.info(
f"通用脚本日志分析结果:{self.script_result}",
module=f"通用调度器-{self.name}",
)
if self.script_result != "Wait":
self.quit_monitor()
def start_monitor(self) -> None:
"""开始监视通用脚本日志"""
logger.info(
f"开始监视通用脚本日志,路径:{self.script_log_path},日志起始时间:{self.log_start_time}",
module=f"通用调度器-{self.name}",
)
self.log_monitor.addPath(str(self.script_log_path))
self.log_monitor_timer.start(1000)
self.last_check_time = datetime.now()
self.monitor_loop.exec()
def quit_monitor(self) -> None:
"""退出通用脚本日志监视进程"""
if len(self.log_monitor.files()) != 0:
logger.info(
f"通用脚本日志监视器移除路径:{self.script_log_path}",
module=f"通用调度器-{self.name}",
)
self.log_monitor.removePath(str(self.script_log_path))
else:
logger.warning(
f"通用脚本日志监视器没有正在监看的路径:{self.log_monitor.files()}",
module=f"通用调度器-{self.name}",
)
self.log_monitor_timer.stop()
self.last_check_time = None
self.monitor_loop.quit()
logger.info("通用脚本日志监视锁已释放", module=f"通用调度器-{self.name}")
def set_sub(self, index: str = "") -> dict:
"""配置通用脚本运行参数"""
logger.info(f"开始配置脚本运行参数:{index}", module=f"通用调度器-{self.name}")
# 配置前关闭可能未正常退出的脚本进程
System.kill_process(self.script_exe_path)
# 预导入配置文件
if self.mode == "设置通用脚本":
if self.sub_config_path.exists():
if self.set["Script"]["ConfigPathMode"] == "文件夹":
shutil.copytree(
self.sub_config_path,
self.script_config_path,
dirs_exist_ok=True,
)
elif (self.sub_config_path / self.script_config_path.name).exists():
shutil.copy(
self.sub_config_path / self.script_config_path.name,
self.script_config_path,
)
else:
if self.set["Script"]["ConfigPathMode"] == "文件夹":
shutil.copytree(
self.data[index]["Path"] / "ConfigFiles",
self.script_config_path,
dirs_exist_ok=True,
)
else:
shutil.copy(
self.data[index]["Path"]
/ "ConfigFiles"
/ self.script_config_path.name,
self.script_config_path,
)
logger.info(f"脚本运行参数配置完成:{index}", module=f"通用调度器-{self.name}")
def execute_script_task(self, script_path: Path, task_name: str) -> bool:
"""执行脚本任务并等待结束"""
try:
logger.info(
f"开始执行{task_name}: {script_path}", module=f"通用调度器-{self.name}"
)
# 根据文件类型选择执行方式
if script_path.suffix.lower() == ".py":
cmd = [sys.executable, script_path]
elif script_path.suffix.lower() in [".bat", ".cmd", ".exe"]:
cmd = [str(script_path)]
elif script_path.suffix.lower() == "":
logger.warning(
f"{task_name}脚本没有指定后缀名,无法执行",
module=f"通用调度器-{self.name}",
)
return False
else:
# 使用系统默认程序打开
os.startfile(str(script_path))
return True
# 执行脚本并等待结束
result = subprocess.run(
cmd,
cwd=script_path.parent,
stdin=subprocess.DEVNULL,
creationflags=(
subprocess.CREATE_NO_WINDOW
if Config.get(Config.function_IfSilence)
else 0
),
timeout=600,
capture_output=True,
errors="ignore",
)
if result.returncode == 0:
logger.info(f"{task_name}执行成功", module=f"通用调度器-{self.name}")
if result.stdout.strip():
logger.info(
f"{task_name}输出: {result.stdout}",
module=f"通用调度器-{self.name}",
)
return True
else:
logger.error(
f"{task_name}执行失败,返回码: {result.returncode}",
module=f"通用调度器-{self.name}",
)
if result.stderr.strip():
logger.error(
f"{task_name}错误输出: {result.stderr}",
module=f"通用调度器-{self.name}",
)
return False
except subprocess.TimeoutExpired:
logger.error(f"{task_name}执行超时", module=f"通用调度器-{self.name}")
return False
except Exception as e:
logger.exception(
f"执行{task_name}时出现异常: {e}", module=f"通用调度器-{self.name}"
)
return False
def push_notification(
self,
mode: str,
title: str,
message: Union[str, dict],
sub_data: Dict[str, Dict[str, Union[str, int, bool]]] = None,
) -> None:
"""通过所有渠道推送通知"""
logger.info(
f"开始推送通知,模式:{mode},标题:{title}",
module=f"通用调度器-{self.name}",
)
env = Environment(
loader=FileSystemLoader(str(Config.app_path / "resources/html"))
)
if mode == "代理结果" and (
Config.get(Config.notify_SendTaskResultTime) == "任何时刻"
or (
Config.get(Config.notify_SendTaskResultTime) == "仅失败时"
and message["uncompleted_count"] != 0
)
):
# 生成文本通知内容
message_text = (
f"任务开始时间:{message['start_time']},结束时间:{message['end_time']}\n"
f"已完成数:{message['completed_count']},未完成数:{message['uncompleted_count']}\n\n"
)
if len(message["failed_sub"]) > 0:
message_text += f"{self.mode[2:4]}未成功的配置:\n{"\n".join(message['failed_sub'])}\n"
if len(message["waiting_sub"]) > 0:
message_text += f"\n未开始{self.mode[2:4]}的配置:\n{"\n".join(message['waiting_sub'])}\n"
# 生成HTML通知内容
message["failed_sub"] = "".join(message["failed_sub"])
message["waiting_sub"] = "".join(message["waiting_sub"])
template = env.get_template("general_result.html")
message_html = template.render(message)
# ServerChan的换行是两个换行符。故而将\n替换为\n\n
serverchan_message = message_text.replace("\n", "\n\n")
# 发送全局通知
if Config.get(Config.notify_IfSendMail):
Notify.send_mail(
"网页", title, message_html, Config.get(Config.notify_ToAddress)
)
if Config.get(Config.notify_IfServerChan):
Notify.ServerChanPush(
title,
f"{serverchan_message}\n\nAUTO_MAA 敬上",
Config.get(Config.notify_ServerChanKey),
Config.get(Config.notify_ServerChanTag),
Config.get(Config.notify_ServerChanChannel),
)
if Config.get(Config.notify_IfCompanyWebHookBot):
Notify.CompanyWebHookBotPush(
title,
f"{message_text}\n\nAUTO_MAA 敬上",
Config.get(Config.notify_CompanyWebHookBotUrl),
)
elif mode == "统计信息":
message_text = (
f"开始时间: {message['start_time']}\n"
f"结束时间: {message['end_time']}\n"
f"通用脚本执行结果: {message['sub_result']}\n\n"
)
# 生成HTML通知内容
template = env.get_template("general_statistics.html")
message_html = template.render(message)
# ServerChan的换行是两个换行符。故而将\n替换为\n\n
serverchan_message = message_text.replace("\n", "\n\n")
# 发送全局通知
if Config.get(Config.notify_IfSendStatistic):
if Config.get(Config.notify_IfSendMail):
Notify.send_mail(
"网页", title, message_html, Config.get(Config.notify_ToAddress)
)
if Config.get(Config.notify_IfServerChan):
Notify.ServerChanPush(
title,
f"{serverchan_message}\n\nAUTO_MAA 敬上",
Config.get(Config.notify_ServerChanKey),
Config.get(Config.notify_ServerChanTag),
Config.get(Config.notify_ServerChanChannel),
)
if Config.get(Config.notify_IfCompanyWebHookBot):
Notify.CompanyWebHookBotPush(
title,
f"{message_text}\n\nAUTO_MAA 敬上",
Config.get(Config.notify_CompanyWebHookBotUrl),
)
# 发送用户单独通知
if sub_data["Notify"]["Enabled"] and sub_data["Notify"]["IfSendStatistic"]:
# 发送邮件通知
if sub_data["Notify"]["IfSendMail"]:
if sub_data["Notify"]["ToAddress"]:
Notify.send_mail(
"网页",
title,
message_html,
sub_data["Notify"]["ToAddress"],
)
else:
logger.error(f"用户邮箱地址为空,无法发送用户单独的邮件通知")
# 发送ServerChan通知
if sub_data["Notify"]["IfServerChan"]:
if sub_data["Notify"]["ServerChanKey"]:
Notify.ServerChanPush(
title,
f"{serverchan_message}\n\nAUTO_MAA 敬上",
sub_data["Notify"]["ServerChanKey"],
sub_data["Notify"]["ServerChanTag"],
sub_data["Notify"]["ServerChanChannel"],
)
else:
logger.error(
f"{self.name} |用户ServerChan密钥为空无法发送用户单独的ServerChan通知"
)
# 推送CompanyWebHookBot通知
if sub_data["Notify"]["IfCompanyWebHookBot"]:
if sub_data["Notify"]["CompanyWebHookBotUrl"]:
Notify.CompanyWebHookBotPush(
title,
f"{message_text}\n\nAUTO_MAA 敬上",
sub_data["Notify"]["CompanyWebHookBotUrl"],
)
else:
logger.error(
f"{self.name} |用户CompanyWebHookBot密钥为空无法发送用户单独的CompanyWebHookBot通知"
)
return None