初步完成调度队列开发

This commit is contained in:
DLmaster
2025-01-05 19:24:01 +08:00
parent f94e129cba
commit 684211c129
14 changed files with 2527 additions and 957 deletions

View File

@@ -10,68 +10,7 @@ class Main(QWidget):
self.notify = notify
self.crypto = crypto
self.PASSWORD = ""
self.if_user_list_editable = True
self.if_update_database = True
self.if_update_config = True
self.user_mode_list = ["simple", "beta"]
self.user_column = [
"admin",
"id",
"server",
"day",
"status",
"last",
"game",
"game_1",
"game_2",
"routine",
"annihilation",
"infrastructure",
"password",
"notes",
"numb",
"mode",
"uid",
]
self.userlist_simple_index = [
0,
1,
2,
3,
4,
5,
6,
7,
8,
"-",
9,
10,
11,
12,
"-",
"-",
"-",
]
self.userlist_beta_index = [
0,
"-",
"-",
1,
2,
3,
"-",
"-",
"-",
4,
5,
"-",
6,
7,
"-",
"-",
"-",
]
# uiLoader.registerCustomWidget(PushButton)
# uiLoader.registerCustomWidget(LineEdit)
@@ -172,7 +111,7 @@ class Main(QWidget):
# )
# self.if_self_start.stateChanged.connect(self.change_config)
# self.if_sleep: CheckBox = self.ui.findChild(CheckBox, "checkBox_ifsleep")
# self.if_sleep: CheckBox = self.ui.findChild(CheckBox, "checkBox_IfAllowSleep")
# self.if_sleep.stateChanged.connect(self.change_config)
# self.if_proxy_directly: CheckBox = self.ui.findChild(
@@ -262,279 +201,79 @@ class Main(QWidget):
if self.config.content["Default"]["SelfSet.IfProxyDirectly"] == "True":
self.maa_starter("日常代理")
def check_PASSWORD(self) -> None:
"""检查并配置管理密钥"""
if self.config.key_path.exists():
return None
while True:
# def update_config(self):
# """将self.config中的程序配置同步至GUI界面"""
if self.read("setkey"):
self.crypto.get_PASSWORD(self.PASSWORD)
break
else:
choice = MessageBox(
"确认", "您没有输入管理密钥,确定要暂时跳过这一步吗?", self.ui
)
if choice.exec():
break
# # 阻止GUI程序配置被立即读入程序形成死循环
# self.if_update_config = False
def update_user_info(self, operation: str) -> None:
"""将本地数据库中的用户配置同步至GUI的用户管理界面"""
# self.main_tab.setCurrentIndex(
# self.config.content["Default"]["SelfSet.MainIndex"]
# )
# 读入本地数据库
self.config.cur.execute("SELECT * FROM adminx WHERE True")
data = self.config.cur.fetchall()
# self.maa_path.setText(str(Path(self.config.content["Default"]["MaaSet.path"])))
# self.routine.setValue(self.config.content["Default"]["TimeLimit.routine"])
# self.annihilation.setValue(
# self.config.content["Default"]["TimeLimit.annihilation"]
# )
# self.num.setValue(self.config.content["Default"]["TimesLimit.run"])
# self.mail_address.setText(self.config.content["Default"]["SelfSet.MailAddress"])
# self.boss_key.setText(self.config.content["Default"]["SelfSet.BossKey"])
# 处理部分模式调整
if operation == "clear":
self.PASSWORD = ""
elif operation == "read_only":
self.if_user_list_editable = False
elif operation == "editable":
self.if_user_list_editable = True
# self.if_self_start.setChecked(
# bool(self.config.content["Default"]["SelfSet.IfSelfStart"] == "True")
# )
# 阻止GUI用户数据被立即写入数据库形成死循环
self.if_update_database = False
# self.if_sleep.setChecked(
# bool(self.config.content["Default"]["SelfSet.IfAllowSleep"] == "True")
# )
user_switch_list = ["转为高级", "转为简洁"]
self.user_switch.setText(user_switch_list[self.user_set.currentIndex()])
# self.if_proxy_directly.setChecked(
# bool(self.config.content["Default"]["SelfSet.IfProxyDirectly"] == "True")
# )
# 同步简洁用户配置列表
data_simple = [_ for _ in data if _[15] == "simple"]
self.user_list_simple.setRowCount(len(data_simple))
# self.if_send_mail.setChecked(
# bool(self.config.content["Default"]["SelfSet.IfSendMail"] == "True")
# )
for i, row in enumerate(data_simple):
# self.mail_address.setVisible(
# bool(self.config.content["Default"]["SelfSet.IfSendMail"] == "True")
# )
for j, value in enumerate(row):
# self.if_send_error_only.setChecked(
# bool(
# self.config.content["Default"]["SelfSet.IfSendMail.OnlyError"] == "True"
# )
# )
if self.userlist_simple_index[j] == "-":
continue
# self.if_send_error_only.setVisible(
# bool(self.config.content["Default"]["SelfSet.IfSendMail"] == "True")
# )
# 生成表格组件
if j == 2:
item = ComboBox()
item.addItems(["官服", "B服"])
if value == "Official":
item.setCurrentIndex(0)
elif value == "Bilibili":
item.setCurrentIndex(1)
item.currentIndexChanged.connect(
partial(
self.change_user_CellWidget,
data_simple[i][16],
self.user_column[j],
)
)
elif j in [4, 10, 11]:
item = QComboBox()
if j in [4, 10]:
item.addItems(["启用", "禁用"])
elif j == 11:
item.addItems(["启用", "禁用", "更改配置文件"])
if value == "y":
item.setCurrentIndex(0)
elif value == "n":
item.setCurrentIndex(1)
item.currentIndexChanged.connect(
partial(
self.change_user_CellWidget,
data_simple[i][16],
self.user_column[j],
)
)
elif j == 3 and value == -1:
item = QTableWidgetItem("无限")
elif j == 5:
curdate = self.server_date()
if curdate != value:
item = QTableWidgetItem("今日未代理")
else:
item = QTableWidgetItem(f"今日已代理{data_simple[i][14]}")
item.setFlags(QtCore.Qt.ItemIsSelectable | QtCore.Qt.ItemIsEnabled)
elif j == 12:
if self.PASSWORD == "":
item = QTableWidgetItem("******")
item.setFlags(
QtCore.Qt.ItemIsSelectable | QtCore.Qt.ItemIsEnabled
)
else:
result = self.crypto.decryptx(value, self.PASSWORD)
item = QTableWidgetItem(result)
if result == "管理密钥错误":
item.setFlags(
QtCore.Qt.ItemIsSelectable | QtCore.Qt.ItemIsEnabled
)
else:
item = QTableWidgetItem(str(value))
# self.if_silence.setChecked(
# bool(self.config.content["Default"]["SelfSet.IfSilence"] == "True")
# )
# 组件录入表格
if j in [2, 4, 10, 11]:
if not self.if_user_list_editable:
item.setEnabled(False)
self.user_list_simple.setCellWidget(
data_simple[i][16], self.userlist_simple_index[j], item
)
else:
self.user_list_simple.setItem(
data_simple[i][16], self.userlist_simple_index[j], item
)
# self.boss_key.setVisible(
# bool(self.config.content["Default"]["SelfSet.IfSilence"] == "True")
# )
# 同步高级用户配置列表
data_beta = [_ for _ in data if _[15] == "beta"]
self.user_list_beta.setRowCount(len(data_beta))
# self.if_to_tray.setChecked(
# bool(self.config.content["Default"]["SelfSet.IfToTray"] == "True")
# )
for i, row in enumerate(data_beta):
for j, value in enumerate(row):
if self.userlist_beta_index[j] == "-":
continue
# 生成表格组件
if j in [4, 9, 10]:
item = ComboBox()
if j == 4:
item.addItems(["启用", "禁用"])
elif j in [9, 10]:
item.addItems(["启用", "禁用", "修改MAA配置"])
if value == "y":
item.setCurrentIndex(0)
elif value == "n":
item.setCurrentIndex(1)
item.currentIndexChanged.connect(
partial(
self.change_user_CellWidget,
data_beta[i][16],
self.user_column[j],
)
)
elif j == 3 and value == -1:
item = QTableWidgetItem("无限")
elif j == 5:
curdate = self.server_date()
if curdate != value:
item = QTableWidgetItem("今日未代理")
else:
item = QTableWidgetItem(f"今日已代理{data_beta[i][14]}")
item.setFlags(QtCore.Qt.ItemIsSelectable | QtCore.Qt.ItemIsEnabled)
elif j == 12:
if self.PASSWORD == "":
item = QTableWidgetItem("******")
item.setFlags(
QtCore.Qt.ItemIsSelectable | QtCore.Qt.ItemIsEnabled
)
else:
result = self.crypto.decryptx(value, self.PASSWORD)
item = QTableWidgetItem(result)
if result == "管理密钥错误":
item.setFlags(
QtCore.Qt.ItemIsSelectable | QtCore.Qt.ItemIsEnabled
)
else:
item = QTableWidgetItem(str(value))
# 组件录入表格
if j in [4, 9, 10]:
if not self.if_user_list_editable:
item.setEnabled(False)
self.user_list_beta.setCellWidget(
data_beta[i][16], self.userlist_beta_index[j], item
)
else:
self.user_list_beta.setItem(
data_beta[i][16], self.userlist_beta_index[j], item
)
# 设置列表可编辑状态
if self.if_user_list_editable:
self.user_list_simple.setEditTriggers(TableWidget.AllEditTriggers)
self.user_list_beta.setEditTriggers(TableWidget.AllEditTriggers)
else:
self.user_list_simple.setEditTriggers(TableWidget.NoEditTriggers)
self.user_list_beta.setEditTriggers(TableWidget.NoEditTriggers)
# 允许GUI改变被同步到本地数据库
self.if_update_database = True
# 设置用户配置列表的标题栏宽度
self.user_list_simple.horizontalHeader().setSectionResizeMode(
QHeaderView.Stretch
)
self.user_list_beta.horizontalHeader().setSectionResizeMode(QHeaderView.Stretch)
def update_config(self):
"""将self.config中的程序配置同步至GUI界面"""
# 阻止GUI程序配置被立即读入程序形成死循环
self.if_update_config = False
self.main_tab.setCurrentIndex(
self.config.content["Default"]["SelfSet.MainIndex"]
)
self.maa_path.setText(str(Path(self.config.content["Default"]["MaaSet.path"])))
self.routine.setValue(self.config.content["Default"]["TimeLimit.routine"])
self.annihilation.setValue(
self.config.content["Default"]["TimeLimit.annihilation"]
)
self.num.setValue(self.config.content["Default"]["TimesLimit.run"])
self.mail_address.setText(self.config.content["Default"]["SelfSet.MailAddress"])
self.boss_key.setText(self.config.content["Default"]["SelfSet.BossKey"])
self.if_self_start.setChecked(
bool(self.config.content["Default"]["SelfSet.IfSelfStart"] == "True")
)
self.if_sleep.setChecked(
bool(self.config.content["Default"]["SelfSet.IfSleep"] == "True")
)
self.if_proxy_directly.setChecked(
bool(self.config.content["Default"]["SelfSet.IfProxyDirectly"] == "True")
)
self.if_send_mail.setChecked(
bool(self.config.content["Default"]["SelfSet.IfSendMail"] == "True")
)
self.mail_address.setVisible(
bool(self.config.content["Default"]["SelfSet.IfSendMail"] == "True")
)
self.if_send_error_only.setChecked(
bool(
self.config.content["Default"]["SelfSet.IfSendMail.OnlyError"] == "True"
)
)
self.if_send_error_only.setVisible(
bool(self.config.content["Default"]["SelfSet.IfSendMail"] == "True")
)
self.if_silence.setChecked(
bool(self.config.content["Default"]["SelfSet.IfSilence"] == "True")
)
self.boss_key.setVisible(
bool(self.config.content["Default"]["SelfSet.IfSilence"] == "True")
)
self.if_to_tray.setChecked(
bool(self.config.content["Default"]["SelfSet.IfToTray"] == "True")
)
for i in range(10):
self.start_time[i][0].setChecked(
bool(self.config.content["Default"][f"TimeSet.set{i + 1}"] == "True")
)
time = QtCore.QTime(
int(self.config.content["Default"][f"TimeSet.run{i + 1}"][:2]),
int(self.config.content["Default"][f"TimeSet.run{i + 1}"][3:]),
)
self.start_time[i][1].setTime(time)
self.if_update_config = True
# for i in range(10):
# self.start_time[i][0].setChecked(
# bool(self.config.content["Default"][f"TimeSet.set{i + 1}"] == "True")
# )
# time = QtCore.QTime(
# int(self.config.content["Default"][f"TimeSet.run{i + 1}"][:2]),
# int(self.config.content["Default"][f"TimeSet.run{i + 1}"][3:]),
# )
# self.start_time[i][1].setTime(time)
# self.if_update_config = True
def update_board(self, run_text, wait_text, over_text, error_text, log_text):
"""写入数据至GUI执行界面的调度台面板"""
@@ -548,372 +287,6 @@ class Main(QWidget):
self.log_text.verticalScrollBar().maximum()
)
def add_user(self):
"""添加一位新用户"""
# 判断是否已设置管理密钥
if not self.config.key_path.exists():
choice = MessageBox(
"错误",
"请先设置管理密钥再执行添加用户操作",
self.ui,
)
choice.cancelButton.hide()
choice.buttonLayout.insertStretch(1)
if choice.exec():
return None
# 插入预设用户数据
set_book = [
["simple", self.user_list_simple.rowCount()],
["beta", self.user_list_beta.rowCount()],
]
self.config.cur.execute(
"INSERT INTO adminx VALUES('新用户','手机号码(官服)/B站IDB服','Official',-1,'y','2000-01-01','1-7','-','-','n','n','n',?,'',0,?,?)",
(
self.crypto.encryptx("未设置"),
set_book[self.user_set.currentIndex()][0],
set_book[self.user_set.currentIndex()][1],
),
)
self.config.db.commit(),
# 同步新用户至GUI
self.update_user_info("normal")
def del_user(self):
"""删除选中的首位用户"""
# 获取对应的行索引
if self.user_set.currentIndex() == 0:
row = self.user_list_simple.currentRow()
elif self.user_set.currentIndex() == 1:
row = self.user_list_beta.currentRow()
# 判断选择合理性
if row == -1:
choice = MessageBox("错误", "请选中一个用户后再执行删除操作", self.ui)
choice.cancelButton.hide()
choice.buttonLayout.insertStretch(1)
if choice.exec():
return None
# 确认待删除用户信息
self.config.cur.execute(
"SELECT * FROM adminx WHERE mode = ? AND uid = ?",
(
self.user_mode_list[self.user_set.currentIndex()],
row,
),
)
data = self.config.cur.fetchall()
choice = MessageBox("确认", f"确定要删除用户 {data[0][0]} 吗?", self.ui)
# 删除用户
if choice.exec():
# 删除所选用户
self.config.cur.execute(
"DELETE FROM adminx WHERE mode = ? AND uid = ?",
(
self.user_mode_list[self.user_set.currentIndex()],
row,
),
)
self.config.db.commit()
if (
self.config.app_path
/ f"data/MAAconfig/{self.user_mode_list[self.user_set.currentIndex()]}/{row}"
).exists():
shutil.rmtree(
self.config.app_path
/ f"data/MAAconfig/{self.user_mode_list[self.user_set.currentIndex()]}/{row}"
)
# 后续用户补位
if self.user_set.currentIndex() == 0:
current_numb = self.user_list_simple.rowCount()
elif self.user_set.currentIndex() == 1:
current_numb = self.user_list_beta.rowCount()
for i in range(row + 1, current_numb):
self.config.cur.execute(
"UPDATE adminx SET uid = ? WHERE mode = ? AND uid = ?",
(i - 1, self.user_mode_list[self.user_set.currentIndex()], i),
)
self.config.db.commit()
if (
self.config.app_path
/ f"data/MAAconfig/{self.user_mode_list[self.user_set.currentIndex()]}/{i}"
).exists():
(
self.config.app_path
/ f"data/MAAconfig/{self.user_mode_list[self.user_set.currentIndex()]}/{i}"
).rename(
self.config.app_path
/ f"data/MAAconfig/{self.user_mode_list[self.user_set.currentIndex()]}/{i - 1}",
)
# 同步最终结果至GUI
self.update_user_info("normal")
def switch_user(self):
"""切换用户配置模式"""
# 获取当前用户配置模式信息
if self.user_set.currentIndex() == 0:
row = self.user_list_simple.currentRow()
elif self.user_set.currentIndex() == 1:
row = self.user_list_beta.currentRow()
# 判断选择合理性
if row == -1:
choice = MessageBox("错误", "请选中一个用户后再执行切换操作", self.ui)
choice.cancelButton.hide()
choice.buttonLayout.insertStretch(1)
if choice.exec():
return None
# 确认待切换用户信息
self.config.cur.execute(
"SELECT * FROM adminx WHERE mode = ? AND uid = ?",
(
self.user_mode_list[self.user_set.currentIndex()],
row,
),
)
data = self.config.cur.fetchall()
mode_list = ["简洁", "高级"]
choice = MessageBox(
"确认",
f"确定要将用户 {data[0][0]} 转为{mode_list[1 - self.user_set.currentIndex()]}配置模式吗?",
self.ui,
)
# 切换用户
if choice.exec():
self.config.cur.execute("SELECT * FROM adminx WHERE True")
data = self.config.cur.fetchall()
if self.user_set.currentIndex() == 0:
current_numb = self.user_list_simple.rowCount()
elif self.user_set.currentIndex() == 1:
current_numb = self.user_list_beta.rowCount()
# 切换所选用户
other_numb = len(data) - current_numb
self.config.cur.execute(
"UPDATE adminx SET mode = ?, uid = ? WHERE mode = ? AND uid = ?",
(
self.user_mode_list[1 - self.user_set.currentIndex()],
other_numb,
self.user_mode_list[self.user_set.currentIndex()],
row,
),
)
self.config.db.commit()
if (
self.config.app_path
/ f"data/MAAconfig/{self.user_mode_list[self.user_set.currentIndex()]}/{row}"
).exists():
shutil.move(
self.config.app_path
/ f"data/MAAconfig/{self.user_mode_list[self.user_set.currentIndex()]}/{row}",
self.config.app_path
/ f"data/MAAconfig/{self.user_mode_list[1 - self.user_set.currentIndex()]}/{other_numb}",
)
# 后续用户补位
for i in range(row + 1, current_numb):
self.config.cur.execute(
"UPDATE adminx SET uid = ? WHERE mode = ? AND uid = ?",
(i - 1, self.user_mode_list[self.user_set.currentIndex()], i),
)
self.config.db.commit(),
if (
self.config.app_path
/ f"data/MAAconfig/{self.user_mode_list[self.user_set.currentIndex()]}/{i}"
).exists():
(
self.config.app_path
/ f"data/MAAconfig/{self.user_mode_list[self.user_set.currentIndex()]}/{i}"
).rename(
self.config.app_path
/ f"data/MAAconfig/{self.user_mode_list[self.user_set.currentIndex()]}/{i - 1}"
)
self.update_user_info("normal")
def get_maa_config(self, info):
"""获取MAA配置文件"""
# 获取全局MAA配置文件
if info == ["Default"]:
shutil.copy(
Path(self.config.content["Default"]["MaaSet.path"]) / "config/gui.json",
self.config.app_path / "data/MAAconfig/Default",
)
# 获取基建配置文件
elif info[2] == "infrastructure":
infrastructure_path = self.read("file_path_infrastructure")
if infrastructure_path:
(
self.config.app_path
/ f"data/MAAconfig/{self.user_mode_list[info[0]]}/{info[1]}/infrastructure"
).mkdir(parents=True, exist_ok=True)
shutil.copy(
infrastructure_path,
self.config.app_path
/ f"data/MAAconfig/{self.user_mode_list[info[0]]}/{info[1]}/infrastructure/infrastructure.json",
)
return True
else:
choice = MessageBox(
"错误",
"未选择自定义基建文件",
self.ui,
)
choice.cancelButton.hide()
choice.buttonLayout.insertStretch(1)
if choice.exec():
return False
# 获取高级用户MAA配置文件
elif info[2] in ["routine", "annihilation"]:
(
self.config.app_path
/ f"data/MAAconfig/{self.user_mode_list[info[0]]}/{info[1]}/{info[2]}"
).mkdir(parents=True, exist_ok=True)
shutil.copy(
Path(self.config.content["Default"]["MaaSet.path"]) / "config/gui.json",
self.config.app_path
/ f"data/MAAconfig/{self.user_mode_list[info[0]]}/{info[1]}/{info[2]}",
)
def change_user_Item(self, item: TableWidget, mode):
"""将GUI中发生修改的用户配置表中的一般信息同步至本地数据库"""
# 验证能否写入本地数据库
if not self.if_update_database:
return None
text = item.text()
# 简洁用户配置列表
if mode == "simple":
# 待写入信息预处理
if item.column() == 3: # 代理天数
try:
text = max(int(text), -1)
except ValueError:
self.update_user_info("normal")
return None
if item.column() in [6, 7, 8]: # 关卡号
# 导入与应用特殊关卡规则
games = {}
with self.config.gameid_path.open(mode="r", encoding="utf-8") as f:
gameids = f.readlines()
for line in gameids:
if "" in line:
game_in, game_out = line.split("", 1)
games[game_in.strip()] = game_out.strip()
text = games.get(text, text)
if item.column() == 11: # 密码
text = self.crypto.encryptx(text)
# 保存至本地数据库
if text != "":
self.config.cur.execute(
f"UPDATE adminx SET {self.user_column[self.userlist_simple_index.index(item.column())]} = ? WHERE mode = 'simple' AND uid = ?",
(text, item.row()),
)
# 高级用户配置列表
elif mode == "beta":
# 待写入信息预处理
if item.column() == 1: # 代理天数
try:
text = max(int(text), -1)
except ValueError:
self.update_user_info("normal")
return None
if item.column() == 6: # 密码
text = self.crypto.encryptx(text)
# 保存至本地数据库
if text != "":
self.config.cur.execute(
f"UPDATE adminx SET {self.user_column[self.userlist_beta_index.index(item.column())]} = ? WHERE mode = 'beta' AND uid = ?",
(text, item.row()),
)
self.config.db.commit()
# 同步一般用户信息更改到GUI
self.update_user_info("normal")
def change_user_CellWidget(self, row, column, index):
"""将GUI中发生修改的用户配置表中的CellWidget类信息同步至本地数据库"""
# 验证能否写入本地数据库
if not self.if_update_database:
return None
# 初次开启自定义基建或选择修改配置文件时选择配置文件
if (
self.user_set.currentIndex() == 0
and column == "infrastructure"
and (
index == 2
or (
index == 0
and not (
self.config.app_path
/ f"data/MAAconfig/{self.user_mode_list[self.user_set.currentIndex()]}/{row}/infrastructure/infrastructure.json"
).exists()
)
)
):
result = self.get_maa_config([0, row, "infrastructure"])
if index == 0 and not result:
index = 1
# 初次开启自定义MAA配置或选择修改MAA配置时调起MAA配置任务
if (
self.user_set.currentIndex() == 1
and column in ["routine", "annihilation"]
and (
index == 2
or (
index == 0
and not (
self.config.app_path
/ f"data/MAAconfig/{self.user_mode_list[self.user_set.currentIndex()]}/{row}/{column}/gui.json"
).exists()
)
)
):
self.MaaManager.get_json_path = [
self.user_set.currentIndex(),
row,
column,
]
self.maa_starter("设置MAA_用户")
# 服务器
if self.user_set.currentIndex() == 0 and column == "server":
server_list = ["Official", "Bilibili"]
self.config.cur.execute(
f"UPDATE adminx SET server = ? WHERE mode = 'simple' AND uid = ?",
(server_list[index], row),
)
# 其它(启用/禁用)
elif index in [0, 1]:
index_list = ["y", "n"]
self.config.cur.execute(
f"UPDATE adminx SET {column} = ? WHERE mode = ? AND uid = ?",
(
index_list[index],
self.user_mode_list[self.user_set.currentIndex()],
row,
),
)
self.config.db.commit()
# 同步用户组件信息修改到GUI
self.update_user_info("normal")
def change_user_info(self, modes, uids, days, lasts, notes, numbs):
"""将代理完成后发生改动的用户信息同步至本地数据库"""
@@ -978,9 +351,9 @@ class Main(QWidget):
self.config.content["Default"]["SelfSet.BossKey"] = self.boss_key.text()
if self.if_sleep.isChecked():
self.config.content["Default"]["SelfSet.IfSleep"] = "True"
self.config.content["Default"]["SelfSet.IfAllowSleep"] = "True"
else:
self.config.content["Default"]["SelfSet.IfSleep"] = "False"
self.config.content["Default"]["SelfSet.IfAllowSleep"] = "False"
if self.if_self_start.isChecked():
self.config.content["Default"]["SelfSet.IfSelfStart"] = "True"
@@ -1026,6 +399,32 @@ class Main(QWidget):
# 同步程序配置至GUI
self.update_config()
def get_maa_config(self, info):
"""获取MAA配置文件"""
# 获取全局MAA配置文件
if info == ["Default"]:
shutil.copy(
Path(self.config.content["Default"]["MaaSet.path"])
/ "config/gui.json",
self.config.app_path / "data/MAAconfig/Default",
)
# 获取基建配置文件
# 获取高级用户MAA配置文件
elif info[2] in ["routine", "annihilation"]:
(
self.config.app_path
/ f"data/MAAconfig/{self.user_mode_list[info[0]]}/{info[1]}/{info[2]}"
).mkdir(parents=True, exist_ok=True)
shutil.copy(
Path(self.config.content["Default"]["MaaSet.path"])
/ "config/gui.json",
self.config.app_path
/ f"data/MAAconfig/{self.user_mode_list[info[0]]}/{info[1]}/{info[2]}",
)
def set_theme(self):
"""手动更新主题色到组件"""
@@ -1122,65 +521,6 @@ class Main(QWidget):
)
return file_path
def set_system(self):
"""设置系统相关配置"""
# 同步系统休眠状态
if self.config.content["Default"]["SelfSet.IfSleep"] == "True":
# 设置系统电源状态
ctypes.windll.kernel32.SetThreadExecutionState(
self.ES_CONTINUOUS | self.ES_SYSTEM_REQUIRED
)
elif self.config.content["Default"]["SelfSet.IfSleep"] == "False":
# 恢复系统电源状态
ctypes.windll.kernel32.SetThreadExecutionState(self.ES_CONTINUOUS)
# 同步开机自启
if (
self.config.content["Default"]["SelfSet.IfSelfStart"] == "True"
and not self.is_startup()
):
key = winreg.OpenKey(
winreg.HKEY_CURRENT_USER,
r"Software\Microsoft\Windows\CurrentVersion\Run",
winreg.KEY_SET_VALUE,
winreg.KEY_ALL_ACCESS | winreg.KEY_WRITE | winreg.KEY_CREATE_SUB_KEY,
)
winreg.SetValueEx(
key, self.config.app_name, 0, winreg.REG_SZ, self.config.app_path_sys
)
winreg.CloseKey(key)
elif (
self.config.content["Default"]["SelfSet.IfSelfStart"] == "False"
and self.is_startup()
):
key = winreg.OpenKey(
winreg.HKEY_CURRENT_USER,
r"Software\Microsoft\Windows\CurrentVersion\Run",
winreg.KEY_SET_VALUE,
winreg.KEY_ALL_ACCESS | winreg.KEY_WRITE | winreg.KEY_CREATE_SUB_KEY,
)
winreg.DeleteValue(key, self.config.app_name)
winreg.CloseKey(key)
def is_startup(self):
"""判断程序是否已经开机自启"""
key = winreg.OpenKey(
winreg.HKEY_CURRENT_USER,
r"Software\Microsoft\Windows\CurrentVersion\Run",
0,
winreg.KEY_READ,
)
try:
value, _ = winreg.QueryValueEx(key, self.config.app_name)
winreg.CloseKey(key)
return True
except FileNotFoundError:
winreg.CloseKey(key)
return False
def timed_start(self):
"""定时启动代理任务"""
@@ -1228,20 +568,6 @@ class Main(QWidget):
# 执行日志记录,暂时缺省
pass
def get_window_info(self):
"""获取当前窗口信息"""
def callback(hwnd, window_info):
if win32gui.IsWindowVisible(hwnd) and win32gui.GetWindowText(hwnd):
_, pid = win32process.GetWindowThreadProcessId(hwnd)
process = psutil.Process(pid)
window_info.append((win32gui.GetWindowText(hwnd), process.exe()))
return True
window_info = []
win32gui.EnumWindows(callback, window_info)
return window_info
def maa_starter(self, mode):
"""启动MaaManager线程运行任务"""