Files
AUTO-MAS-test/app/ui/queue_manager.py

534 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# AUTO_MAA:A MAA Multi Account Management and Automation Tool
# Copyright © 2024-2025 DLmaster361
# This file is part of AUTO_MAA.
# AUTO_MAA is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License,
# or (at your option) any later version.
# AUTO_MAA is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
# the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with AUTO_MAA. If not, see <https://www.gnu.org/licenses/>.
# Contact: DLmaster_361@163.com
"""
AUTO_MAA
AUTO_MAA调度队列界面
v4.4
作者DLmaster_361
"""
from PySide6.QtWidgets import (
QWidget,
QVBoxLayout,
QStackedWidget,
QHBoxLayout,
)
from qfluentwidgets import (
Action,
ScrollArea,
FluentIcon,
MessageBox,
HeaderCardWidget,
CommandBar,
)
from typing import List, Dict
from app.core import QueueConfig, Config, MainInfoBar, SoundPlayer, logger
from .Widget import (
SwitchSettingCard,
ComboBoxSettingCard,
LineEditSettingCard,
TimeEditSettingCard,
NoOptionComboBoxSettingCard,
HistoryCard,
PivotArea,
)
class QueueManager(QWidget):
def __init__(self, parent=None):
    """Build the queue page: a command bar stacked above the settings box.

    :param parent: optional parent widget, forwarded to the base class.
    """
    super().__init__(parent)
    self.setObjectName("调度队列")

    root_layout = QVBoxLayout(self)

    self.tools = CommandBar()
    self.queue_manager = self.QueueSettingBox(self)

    # First action group: create / delete a queue.
    edit_actions = [
        Action(FluentIcon.ADD_TO, "新建调度队列", triggered=self.add_queue),
        Action(FluentIcon.REMOVE_FROM, "删除调度队列", triggered=self.del_queue),
    ]
    self.tools.addActions(edit_actions)

    self.tools.addSeparator()

    # Second action group: move the selected queue left / right.
    move_actions = [
        Action(FluentIcon.LEFT_ARROW, "向左移动", triggered=self.left_queue),
        Action(FluentIcon.RIGHT_ARROW, "向右移动", triggered=self.right_queue),
    ]
    self.tools.addActions(move_actions)

    for widget in (self.tools, self.queue_manager):
        root_layout.addWidget(widget)
def add_queue(self):
    """Append a new queue after the existing ones and switch to its page.

    The new queue is numbered ``len(Config.queue_dict) + 1``. Its config is
    loaded from (and saved to) the matching JSON path under
    ``config/QueueConfig``, registered in ``Config.queue_dict`` and then
    added to the settings box.
    """
    index = len(Config.queue_dict) + 1
    queue_name = f"调度队列_{index}"
    config_path = Config.app_path / f"config/QueueConfig/{queue_name}.json"

    logger.info(f"正在添加{queue_name}", module="队列管理")

    # Initialise the queue configuration and write it to disk.
    queue_config = QueueConfig()
    queue_config.load(config_path, queue_config)
    queue_config.save()

    Config.queue_dict[queue_name] = {
        "Path": config_path,
        "Config": queue_config,
    }

    # Add the page to the settings box and make it the current one.
    settings_box = self.queue_manager
    settings_box.add_SettingBox(index)
    settings_box.switch_SettingBox(index)

    logger.success(f"{queue_name} 添加成功", module="队列管理")
    MainInfoBar.push_info_bar("success", "操作成功", f"添加 {queue_name}", 3000)
    SoundPlayer.play("添加调度队列")
def del_queue(self):
    """Delete the currently selected queue after user confirmation.

    Nothing is deleted when no queue is selected, when the selected queue
    is listed in ``Config.running_list``, or when the confirmation dialog's
    ``exec()`` returns a falsy value.

    On confirmation the queue's config file is removed, the config files of
    all higher-numbered queues are renamed down by one, and the pages are
    rebuilt through ``show_SettingBox``.
    """
    name = self.queue_manager.pivot.currentRouteKey()

    if name is None:
        logger.warning("未选择调度队列", module="队列管理")
        MainInfoBar.push_info_bar(
            "warning", "未选择调度队列", "请先选择一个调度队列", 5000
        )
        return None

    if name in Config.running_list:
        logger.warning("调度队列正在运行", module="队列管理")
        MainInfoBar.push_info_bar(
            "warning", "调度队列正在运行", "请先停止调度队列", 5000
        )
        return None

    choice = MessageBox("确认", f"确定要删除 {name} 吗?", self.window())
    if not choice.exec():
        return None

    logger.info(f"正在删除调度队列 {name}", module="队列管理")

    # Route keys have the form "调度队列_<n>"; drop the 5-character prefix.
    removed_index = int(name[5:])

    self.queue_manager.clear_SettingBox()

    # Remove the queue's config file. The pages have already been cleared
    # at this point, so a file that is already gone must not abort the
    # operation before the pages are rebuilt below.
    Config.queue_dict[name]["Path"].unlink(missing_ok=True)

    # Shift the config files of the following queues down by one.
    for i in range(removed_index + 1, len(Config.queue_dict) + 1):
        path = Config.queue_dict[f"调度队列_{i}"]["Path"]
        if path.exists():
            path.rename(path.with_name(f"调度队列_{i-1}.json"))

    self.queue_manager.show_SettingBox(max(removed_index - 1, 1))

    logger.success(f"{name} 删除成功", module="队列管理")
    MainInfoBar.push_info_bar("success", "操作成功", f"删除 {name}", 3000)
    SoundPlayer.play("删除调度队列")
def left_queue(self):
    """Swap the selected queue with the one numbered just below it.

    The move is refused (with a warning info bar) when nothing is selected,
    when the selected queue is already number 1, or when either of the two
    queues involved is listed in ``Config.running_list``.

    The swap is done on the config files via a temporary
    ``调度队列_0.json`` name, after which the pages are rebuilt.
    """
    name = self.queue_manager.pivot.currentRouteKey()

    if name is None:
        logger.warning("未选择调度队列", module="队列管理")
        MainInfoBar.push_info_bar(
            "warning", "未选择调度队列", "请先选择一个调度队列", 5000
        )
        return None

    # Route keys have the form "调度队列_<n>"; drop the 5-character prefix.
    index = int(name[5:])

    if index == 1:
        logger.warning("向左移动调度队列时已到达最左端", module="队列管理")
        MainInfoBar.push_info_bar(
            "warning", "已经是第一个调度队列", "无法向左移动", 5000
        )
        return None

    neighbour = f"调度队列_{index-1}"
    if name in Config.running_list or neighbour in Config.running_list:
        logger.warning("相关调度队列正在运行", module="队列管理")
        MainInfoBar.push_info_bar(
            "warning", "相关调度队列正在运行", "请先停止调度队列", 5000
        )
        return None

    logger.info(f"正在左移调度队列 {name}", module="队列管理")

    self.queue_manager.clear_SettingBox()

    # Three-step file swap: own -> temp, neighbour -> own, temp -> neighbour.
    own_path = Config.queue_dict[name]["Path"]
    parking_path = own_path.with_name("调度队列_0.json")
    own_path.rename(parking_path)

    neighbour_path = Config.queue_dict[neighbour]["Path"]
    neighbour_path.rename(own_path)
    parking_path.rename(neighbour_path)

    self.queue_manager.show_SettingBox(index - 1)

    logger.success(f"{name} 左移成功", module="队列管理")
    MainInfoBar.push_info_bar("success", "操作成功", f"左移 {name}", 3000)
def right_queue(self):
    """Swap the selected queue with the one numbered just above it.

    The move is refused (with a warning info bar) when nothing is selected,
    when the selected queue's number equals ``len(Config.queue_dict)``, or
    when either of the two queues involved is listed in
    ``Config.running_list``.

    The swap is done on the config files via a temporary
    ``调度队列_0.json`` name, after which the pages are rebuilt.
    """
    name = self.queue_manager.pivot.currentRouteKey()

    if name is None:
        logger.warning("未选择调度队列", module="队列管理")
        MainInfoBar.push_info_bar(
            "warning", "未选择调度队列", "请先选择一个调度队列", 5000
        )
        return None

    # Route keys have the form "调度队列_<n>"; drop the 5-character prefix.
    index = int(name[5:])

    if index == len(Config.queue_dict):
        logger.warning("向右移动调度队列时已到达最右端", module="队列管理")
        MainInfoBar.push_info_bar(
            "warning", "已经是最后一个调度队列", "无法向右移动", 5000
        )
        return None

    neighbour = f"调度队列_{index+1}"
    if name in Config.running_list or neighbour in Config.running_list:
        logger.warning("相关调度队列正在运行", module="队列管理")
        MainInfoBar.push_info_bar(
            "warning", "相关调度队列正在运行", "请先停止调度队列", 5000
        )
        return None

    logger.info(f"正在右移调度队列 {name}", module="队列管理")

    self.queue_manager.clear_SettingBox()

    # Three-step file swap: own -> temp, neighbour -> own, temp -> neighbour.
    own_path = Config.queue_dict[name]["Path"]
    parking_path = own_path.with_name("调度队列_0.json")
    own_path.rename(parking_path)

    neighbour_path = Config.queue_dict[neighbour]["Path"]
    neighbour_path.rename(own_path)
    parking_path.rename(neighbour_path)

    self.queue_manager.show_SettingBox(index + 1)

    logger.success(f"{name} 右移成功", module="队列管理")
    MainInfoBar.push_info_bar("success", "操作成功", f"右移 {name}", 3000)
def reload_script_name(self):
"""刷新调度队列脚本成员名称"""
# 获取脚本成员列表
script_list = [
["禁用"] + [_ for _ in Config.script_dict.keys()],
["未启用"]
+ [
(
k
if v["Config"].get_name() == ""
else f"{k} - {v["Config"].get_name()}"
)
for k, v in Config.script_dict.items()
],
]
for script in self.queue_manager.script_list:
for card in script.task.card_dict.values():
card.reLoadOptions(value=script_list[0], texts=script_list[1])
class QueueSettingBox(QWidget):
def __init__(self, parent=None):
    """Build the pivot bar plus stacked pages and load the existing queues.

    :param parent: optional parent widget, forwarded to the base class.
    """
    super().__init__(parent)
    self.setObjectName("调度队列管理")

    # Navigation bar; its pivot is exposed directly for convenience.
    self.pivotArea = PivotArea()
    self.pivot = self.pivotArea.pivot

    # One stacked page per queue.
    self.stackedWidget = QStackedWidget(self)
    self.stackedWidget.setContentsMargins(0, 0, 0, 0)
    self.stackedWidget.setStyleSheet("background: transparent; border: none;")

    # Pages in display order; position i holds the page shown for index i + 1.
    self.script_list: List[
        QueueManager.QueueSettingBox.QueueMemberSettingBox
    ] = []

    self.Layout = QVBoxLayout(self)
    for widget in (self.pivotArea, self.stackedWidget):
        self.Layout.addWidget(widget)
    self.Layout.setContentsMargins(0, 0, 0, 0)

    def on_pivot_item_changed(route_key):
        # Route keys look like "调度队列_<n>"; the pivot is already on the
        # right item, so only the stacked page needs to follow.
        return self.switch_SettingBox(int(route_key[5:]), if_change_pivot=False)

    self.pivot.currentItemChanged.connect(on_pivot_item_changed)

    self.show_SettingBox(1)
def show_SettingBox(self, index) -> None:
    """Load a page for every known queue, then switch to one of them.

    :param index: 1-based number of the page to show once loading is done.
    """
    Config.search_queue()

    # Keys look like "调度队列_<n>"; the number after the prefix is the uid.
    for queue_name in Config.queue_dict:
        uid = int(queue_name[5:])
        self.add_SettingBox(uid)

    self.switch_SettingBox(index)
def switch_SettingBox(self, index: int, if_change_pivot: bool = True) -> None:
    """Switch the stacked widget (and optionally the pivot) to a queue page.

    The call is a no-op when there are no queues or when ``index`` does not
    address an existing page.

    :param index: 1-based position of the page to show.
    :param if_change_pivot: whether to also change the pivot's current item.
    """
    # Validate against the list that is actually indexed below as well as
    # the configured queues. This rejects index <= 0, which would otherwise
    # wrap around to the end of the list, and avoids an IndexError while the
    # pages are cleared or only partly rebuilt.
    page_count = min(len(Config.queue_dict), len(self.script_list))
    if not 1 <= index <= page_count:
        return None

    page = self.script_list[index - 1]

    if if_change_pivot:
        self.pivot.setCurrentItem(page.objectName())

    self.stackedWidget.setCurrentWidget(page)
def clear_SettingBox(self) -> None:
    """Remove every queue page from the stacked widget and empty the pivot."""
    stack = self.stackedWidget

    for page in self.script_list:
        stack.removeWidget(page)
        page.deleteLater()

    self.script_list.clear()
    self.pivot.clear()
def add_SettingBox(self, uid: int) -> None:
    """Create the settings page for one queue and register it.

    The page is appended to ``script_list``, added to the stacked widget,
    and given a pivot item whose route key is ``调度队列_<uid>``.

    :param uid: number of the queue the page belongs to.
    """
    page = self.QueueMemberSettingBox(uid, self)

    self.script_list.append(page)
    self.stackedWidget.addWidget(page)

    self.pivot.addItem(routeKey=f"调度队列_{uid}", text=f"调度队列 {uid}")
class QueueMemberSettingBox(QWidget):
def __init__(self, uid: int, parent=None):
    """Build the scrollable settings page for a single queue.

    :param uid: number of the queue; its config is read from
        ``Config.queue_dict["调度队列_<uid>"]["Config"]``.
    :param parent: optional parent widget, forwarded to the base class.
    """
    super().__init__(parent)

    queue_name = f"调度队列_{uid}"
    self.setObjectName(queue_name)

    self.config = Config.queue_dict[queue_name]["Config"]

    # The four cards that make up the page, in top-to-bottom order.
    self.queue_set = self.QueueSetSettingCard(self.config, self)
    self.time = self.TimeSettingCard(self.config, self)
    self.task = self.TaskSettingCard(self.config, self)
    self.history = HistoryCard(
        qconfig=self.config,
        configItem=self.config.Data_LastProxyHistory,
        parent=self,
    )

    # Stack the cards vertically, with a stretch after the last card.
    content_widget = QWidget()
    content_layout = QVBoxLayout(content_widget)
    content_layout.setContentsMargins(0, 0, 11, 0)
    for card in (self.queue_set, self.time, self.task, self.history):
        content_layout.addWidget(card)
    content_layout.addStretch(1)

    # Wrap the content in a scroll area.
    scroll_area = ScrollArea()
    scroll_area.setWidgetResizable(True)
    scroll_area.setContentsMargins(0, 0, 0, 0)
    scroll_area.setStyleSheet("background: transparent; border: none;")
    scroll_area.setWidget(content_widget)

    outer_layout = QVBoxLayout(self)
    outer_layout.addWidget(scroll_area)
class QueueSetSettingCard(HeaderCardWidget):
    """Header card holding the general settings of one queue."""

    def __init__(self, config: QueueConfig, parent=None):
        """Create the name, start-up, timer and after-finish setting cards.

        :param config: queue configuration whose items back the cards.
        :param parent: optional parent widget, forwarded to the base class.
        """
        super().__init__(parent)
        self.setTitle("队列设置")

        self.config = config

        # Queue display name.
        self.card_Name = LineEditSettingCard(
            icon=FluentIcon.EDIT,
            title="调度队列名称",
            content="用于标识调度队列的名称",
            text="请输入调度队列名称",
            qconfig=self.config,
            configItem=self.config.QueueSet_Name,
            parent=self,
        )

        # Switch bound to the QueueSet_StartUpEnabled item.
        self.card_StartUpEnabled = SwitchSettingCard(
            icon=FluentIcon.CHECKBOX,
            title="启动时运行状态",
            content="调度队列启动时运行状态,启用后将在软件启动时自动运行本队列",
            qconfig=self.config,
            configItem=self.config.QueueSet_StartUpEnabled,
            parent=self,
        )

        # Switch bound to the QueueSet_TimeEnabled item.
        self.card_TimeEnable = SwitchSettingCard(
            icon=FluentIcon.CHECKBOX,
            title="定时运行状态",
            content="调度队列定时运行状态,启用时会执行定时任务",
            qconfig=self.config,
            configItem=self.config.QueueSet_TimeEnabled,
            parent=self,
        )

        # Choice of action bound to the QueueSet_AfterAccomplish item.
        after_finish_texts = [
            "无动作",
            "退出AUTO_MAA",
            "睡眠win系统需禁用休眠",
            "休眠",
            "关机",
            "关机(强制)",
        ]
        self.card_AfterAccomplish = ComboBoxSettingCard(
            icon=FluentIcon.POWER_BUTTON,
            title="调度队列结束后",
            content="选择调度队列结束后的操作",
            texts=after_finish_texts,
            qconfig=self.config,
            configItem=self.config.QueueSet_AfterAccomplish,
            parent=self,
        )

        card_column = QVBoxLayout()
        for card in (
            self.card_Name,
            self.card_StartUpEnabled,
            self.card_TimeEnable,
            self.card_AfterAccomplish,
        ):
            card_column.addWidget(card)

        self.viewLayout.addLayout(card_column)
class TimeSettingCard(HeaderCardWidget):
    """Header card holding the ten timer slots of one queue."""

    def __init__(self, config: QueueConfig, parent=None):
        """Create ten timer cards laid out as two columns of five.

        :param config: queue configuration; slot ``i`` is bound to the
            ``Enabled_<i>`` and ``Set_<i>`` items of its ``"Time"`` group.
        :param parent: optional parent widget, forwarded to the base class.
        """
        super().__init__(parent)
        self.setTitle("定时设置")

        self.config = config

        left_column = QWidget()
        left_layout = QVBoxLayout(left_column)
        right_column = QWidget()
        right_layout = QVBoxLayout(right_column)
        columns = QHBoxLayout()

        # Cards keyed "Time_0" .. "Time_9".
        self.card_dict: Dict[str, TimeEditSettingCard] = {}

        for slot in range(10):
            time_items = self.config.config_item_dict["Time"]
            card = TimeEditSettingCard(
                icon=FluentIcon.STOP_WATCH,
                title=f"定时 {slot + 1}",
                content=None,
                qconfig=self.config,
                configItem_bool=time_items[f"Enabled_{slot}"],
                configItem_time=time_items[f"Set_{slot}"],
                parent=self,
            )
            self.card_dict[f"Time_{slot}"] = card

            # Slots 0-4 go in the left column, 5-9 in the right one.
            target_layout = left_layout if slot < 5 else right_layout
            target_layout.addWidget(card)

        columns.addWidget(left_column)
        columns.addWidget(right_column)

        self.viewLayout.addLayout(columns)
class TaskSettingCard(HeaderCardWidget):
def __init__(self, config: QueueConfig, parent=None):
super().__init__(parent)
self.setTitle("任务队列")
self.config = config
script_list = [
["禁用"] + [_ for _ in Config.script_dict.keys()],
["未启用"]
+ [
(
k
if v["Config"].get_name() == ""
else f"{k} - {v["Config"].get_name()}"
)
for k, v in Config.script_dict.items()
],
]
self.card_dict: Dict[
str,
NoOptionComboBoxSettingCard,
] = {}
Layout = QVBoxLayout()
for i in range(10):
self.card_dict[f"Script_{i}"] = NoOptionComboBoxSettingCard(
icon=FluentIcon.APPLICATION,
title=f"任务实例 {i + 1}",
content=f"{i + 1}个调起的脚本任务实例",
value=script_list[0],
texts=script_list[1],
qconfig=self.config,
configItem=self.config.config_item_dict["Queue"][
f"Script_{i}"
],
parent=self,
)
Layout.addWidget(self.card_dict[f"Script_{i}"])
self.viewLayout.addLayout(Layout)