Squashed commit of the following:

commit 8724c545a8af8f34565aa71620e66cbd71547f37
Author: DLmaster <DLmaster_361@163.com>
Date:   Fri Apr 11 18:08:28 2025 +0800

    feat(core): 预接入mirrorc

commit d57ebaa281ff7c418aa8f11fe8e8ba260d8dbeca
Author: DLmaster <DLmaster_361@163.com>
Date:   Thu Apr 10 12:37:26 2025 +0800

    chore(core): 基础配置相关内容重构

    - 添加理智药设置选项 #34
    - 输入对话框添加回车键确认能力 #35
    - 用户列表UI改版升级
    - 配置类取消单例限制
    - 配置读取方式与界面渲染方法优化

commit 710542287d04719c8443b91acb227de1dccc20d0
Author: DLmaster <DLmaster_361@163.com>
Date:   Fri Mar 28 23:32:17 2025 +0800

    chore(core): search相关结构重整

commit 8009c69236655e29119ce62ff53a0360abaed2af
Merge: 648f42b 9f88f92
Author: DLmaster <DLmaster_361@163.com>
Date:   Mon Mar 24 15:31:40 2025 +0800

    Merge branch 'dev' into user_list_dev
This commit is contained in:
DLmaster
2025-04-11 18:57:10 +08:00
parent bded794647
commit 38a04fc4b2
21 changed files with 3759 additions and 3018 deletions

View File

@@ -29,16 +29,16 @@ __version__ = "4.2.0"
__author__ = "DLmaster361 <DLmaster_361@163.com>"
__license__ = "GPL-3.0 license"
from .core import AppConfig, QueueConfig, MaaConfig, Task, TaskManager, MainTimer
from .core import QueueConfig, MaaConfig, MaaUserConfig, Task, TaskManager, MainTimer
from .models import MaaManager
from .services import Notify, Crypto, System
from .ui import AUTO_MAA
from .utils import DownloadManager
__all__ = [
"AppConfig",
"QueueConfig",
"MaaConfig",
"MaaUserConfig",
"Task",
"TaskManager",
"MainTimer",

View File

@@ -29,16 +29,16 @@ __version__ = "4.2.0"
__author__ = "DLmaster361 <DLmaster_361@163.com>"
__license__ = "GPL-3.0 license"
from .config import AppConfig, QueueConfig, MaaConfig, Config
from .config import QueueConfig, MaaConfig, MaaUserConfig, Config
from .main_info_bar import MainInfoBar
from .task_manager import Task, TaskManager
from .timer import MainTimer
__all__ = [
"AppConfig",
"Config",
"QueueConfig",
"MaaConfig",
"MaaUserConfig",
"MainInfoBar",
"Task",
"TaskManager",

File diff suppressed because it is too large Load Diff

View File

@@ -28,8 +28,6 @@ v4.2
from loguru import logger
from PySide6.QtCore import QThread, QObject, Signal
from qfluentwidgets import MessageBox
import json
from pathlib import Path
from datetime import datetime
from typing import Dict, Union
@@ -45,7 +43,7 @@ class Task(QThread):
push_info_bar = Signal(str, str, str, int)
question = Signal(str, str)
question_response = Signal(bool)
update_user_info = Signal(Path, list, list, list, list, list, list)
update_user_info = Signal(str, dict)
create_task_list = Signal(list)
create_user_list = Signal(list)
update_task_list = Signal(list)
@@ -76,13 +74,8 @@ class Task(QThread):
self.task = MaaManager(
self.mode,
Config.app_path / f"config/MaaConfig/{self.name}",
(
None
if "全局" in self.mode
else Config.app_path
/ f"config/MaaConfig/{self.name}/beta/{self.info["SetMaaInfo"]["UserId"]}/{self.info["SetMaaInfo"]["SetType"]}"
),
Config.member_dict[self.name],
(None if "全局" in self.mode else self.info["SetMaaInfo"]["Path"]),
)
self.task.push_info_bar.connect(self.push_info_bar.emit)
self.task.accomplish.connect(lambda: self.accomplish.emit([]))
@@ -91,42 +84,52 @@ class Task(QThread):
else:
self.member_dict = self.search_member()
self.task_dict = [
[value, "等待"]
for _, value in self.info["Queue"].items()
self.task_list = [
[
(
value
if Config.member_dict[value]["Config"].get(
Config.member_dict[value]["Config"].MaaSet_Name
)
== ""
else f"{value} - {Config.member_dict[value]["Config"].get(Config.member_dict[value]["Config"].MaaSet_Name)}"
),
"等待",
value,
]
for _, value in sorted(
self.info["Queue"].items(), key=lambda x: int(x[0][7:])
)
if value != "禁用"
]
self.create_task_list.emit(self.task_dict)
self.create_task_list.emit(self.task_list)
for i in range(len(self.task_dict)):
for task in self.task_list:
if self.isInterruptionRequested():
break
self.task_dict[i][1] = "运行"
self.update_task_list.emit(self.task_dict)
task[1] = "运行"
self.update_task_list.emit(self.task_list)
if self.task_dict[i][0] in Config.running_list:
if task[2] in Config.running_list:
self.task_dict[i][1] = "跳过"
self.update_task_list.emit(self.task_dict)
logger.info(f"跳过任务:{self.task_dict[i][0]}")
self.push_info_bar.emit(
"info", "跳过任务", self.task_dict[i][0], 3000
)
task[1] = "跳过"
self.update_task_list.emit(self.task_list)
logger.info(f"跳过任务:{task[0]}")
self.push_info_bar.emit("info", "跳过任务", task[0], 3000)
continue
Config.running_list.append(self.task_dict[i][0])
logger.info(f"任务开始:{self.task_dict[i][0]}")
self.push_info_bar.emit("info", "任务开始", self.task_dict[i][0], 3000)
Config.running_list.append(task[2])
logger.info(f"任务开始:{task[0]}")
self.push_info_bar.emit("info", "任务开始", task[0], 3000)
if self.member_dict[self.task_dict[i][0]][0] == "Maa":
if Config.member_dict[task[2]]["Type"] == "Maa":
self.task = MaaManager(
self.mode[0:4],
self.member_dict[self.task_dict[i][0]][1],
Config.member_dict[task[2]],
)
self.task.question.connect(self.question.emit)
@@ -136,44 +139,21 @@ class Task(QThread):
self.task.create_user_list.connect(self.create_user_list.emit)
self.task.update_user_list.connect(self.update_user_list.emit)
self.task.update_log_text.connect(self.update_log_text.emit)
self.task.update_user_info.connect(
lambda modes, uids, days, lasts, notes, numbs: self.update_user_info.emit(
self.member_dict[self.task_dict[i][0]][1],
modes,
uids,
days,
lasts,
notes,
numbs,
)
)
self.task.update_user_info.connect(self.update_user_info.emit)
self.task.accomplish.connect(
lambda log: self.task_accomplish(self.task_dict[i][0], log)
lambda log: self.task_accomplish(task[2], log)
)
self.task.run()
Config.running_list.remove(self.task_dict[i][0])
Config.running_list.remove(task[2])
self.task_dict[i][1] = "完成"
logger.info(f"任务完成:{self.task_dict[i][0]}")
self.push_info_bar.emit("info", "任务完成", self.task_dict[i][0], 3000)
task[1] = "完成"
logger.info(f"任务完成:{task[0]}")
self.push_info_bar.emit("info", "任务完成", task[0], 3000)
self.accomplish.emit(self.logs)
def search_member(self) -> dict:
"""搜索所有脚本实例及其路径"""
member_dict = {}
if (Config.app_path / "config/MaaConfig").exists():
for subdir in (Config.app_path / "config/MaaConfig").iterdir():
if subdir.is_dir():
member_dict[subdir.name] = ["Maa", subdir]
return member_dict
def task_accomplish(self, name: str, log: dict):
"""保存保存任务结果"""
@@ -279,12 +259,13 @@ class _TaskManager(QObject):
Config.running_list.remove(name)
if "调度队列" in name and "人工排查" not in mode:
with (Config.app_path / f"config/QueueConfig/{name}.json").open(
"r", encoding="utf-8"
) as f:
info = json.load(f)
if info["QueueSet"]["AfterAccomplish"] != "None":
if (
Config.queue_dict[name]["Config"].get(
Config.queue_dict[name]["Config"].queueSet_AfterAccomplish
)
!= "None"
):
from app.ui import ProgressRingMessageBox
@@ -297,10 +278,14 @@ class _TaskManager(QObject):
choice = ProgressRingMessageBox(
self.main_window,
f"{mode_book[info['QueueSet']['AfterAccomplish']]}倒计时",
f"{mode_book[Config.queue_dict[name]["Config"].get(Config.queue_dict[name]["Config"].queueSet_AfterAccomplish)]}倒计时",
)
if choice.exec():
System.set_power(info["QueueSet"]["AfterAccomplish"])
System.set_power(
Config.queue_dict[name]["Config"].get(
Config.queue_dict[name]["Config"].queueSet_AfterAccomplish
)
)
def push_dialog(self, name: str, title: str, content: str):
"""推送对话框"""

View File

@@ -28,7 +28,6 @@ v4.2
from loguru import logger
from PySide6.QtWidgets import QWidget
from PySide6.QtCore import QTimer
import json
from datetime import datetime
import pyautogui
@@ -48,26 +47,31 @@ class _MainTimer(QWidget):
self.Timer.timeout.connect(self.timed_start)
self.Timer.timeout.connect(self.set_silence)
self.Timer.start(1000)
self.LongTimer = QTimer()
self.LongTimer.timeout.connect(self.long_timed_task)
self.LongTimer.start(3600000)
def long_timed_task(self):
"""长时间定期检定任务"""
Config.get_gameid("ALL")
def timed_start(self):
"""定时启动代理任务"""
# 获取定时列表
queue_list = self.search_queue()
for name, info in Config.queue_dict.items():
for i in queue_list:
name, info = i
if not info["QueueSet"]["Enabled"]:
if not info["Config"].get(info["Config"].queueSet_Enabled):
continue
history = Config.get_history(name)
data = info["Config"].toDict()
time_set = [
info["Time"][f"TimeSet_{_}"]
data["Time"][f"TimeSet_{_}"]
for _ in range(10)
if info["Time"][f"TimeEnabled_{_}"]
if data["Time"][f"TimeEnabled_{_}"]
]
# 按时间调起代理任务
curtime = datetime.now().strftime("%Y-%m-%d %H:%M")
@@ -78,15 +82,15 @@ class _MainTimer(QWidget):
):
logger.info(f"定时任务:{name}")
TaskManager.add_task("自动代理_新调度台", name, info)
TaskManager.add_task("自动代理_新调度台", name, data)
def set_silence(self):
"""设置静默模式"""
if (
not Config.if_ignore_silence
and Config.global_config.get(Config.global_config.function_IfSilence)
and Config.global_config.get(Config.global_config.function_BossKey) != ""
and Config.get(Config.function_IfSilence)
and Config.get(Config.function_BossKey) != ""
):
windows = System.get_window_info()
@@ -99,9 +103,7 @@ class _MainTimer(QWidget):
pyautogui.hotkey(
*[
_.strip().lower()
for _ in Config.global_config.get(
Config.global_config.function_BossKey
).split("+")
for _ in Config.get(Config.function_BossKey).split("+")
]
)
except pyautogui.FailSafeException as e:
@@ -109,18 +111,5 @@ class _MainTimer(QWidget):
logger.warning(f"FailSafeException: {e}")
self.if_FailSafeException = True
def search_queue(self) -> list:
"""搜索所有调度队列实例"""
queue_list = []
if (Config.app_path / "config/QueueConfig").exists():
for json_file in (Config.app_path / "config/QueueConfig").glob("*.json"):
with json_file.open("r", encoding="utf-8") as f:
info = json.load(f)
queue_list.append([json_file.stem, info])
return queue_list
MainTimer = _MainTimer()

View File

@@ -28,16 +28,15 @@ v4.2
from loguru import logger
from PySide6.QtCore import QObject, Signal, QEventLoop, QFileSystemWatcher, QTimer
import json
import sqlite3
from datetime import datetime, timedelta
import subprocess
import shutil
import time
from pathlib import Path
from jinja2 import Environment, FileSystemLoader
from typing import Union, List
from typing import Union, List, Dict
from app.core import Config
from app.core import Config, MaaConfig, MaaUserConfig
from app.services import Notify, System
@@ -46,7 +45,7 @@ class MaaManager(QObject):
question = Signal(str, str)
question_response = Signal(bool)
update_user_info = Signal(list, list, list, list, list, list)
update_user_info = Signal(str, dict)
push_info_bar = Signal(str, str, str, int)
create_user_list = Signal(list)
update_user_list = Signal(list)
@@ -59,16 +58,22 @@ class MaaManager(QObject):
def __init__(
self,
mode: str,
config_path: Path,
config: Dict[
str,
Union[
str,
Path,
MaaConfig,
Dict[str, Dict[str, Union[Path, MaaUserConfig]]],
],
],
user_config_path: Path = None,
):
super(MaaManager, self).__init__()
self.current_user = ""
self.weekly_annihilation_limit_reached = False
self.user_list = ""
self.mode = mode
self.config_path = config_path
self.config_path = config["Path"]
self.user_config_path = user_config_path
self.log_monitor = QFileSystemWatcher()
@@ -82,21 +87,17 @@ class MaaManager(QObject):
self.interrupt.connect(self.quit_monitor)
with (self.config_path / "config.json").open("r", encoding="utf-8") as f:
self.set = json.load(f)
self.set = config["Config"].toDict()
self.data = {}
if "设置MAA" not in self.mode:
for name, info in config["UserData"].items():
self.data[name] = {
"Path": info["Path"],
"Config": info["Config"].toDict(),
}
db = sqlite3.connect(self.config_path / "user_data.db")
cur = db.cursor()
cur.execute("SELECT * FROM adminx WHERE True")
self.data = cur.fetchall()
self.data = [list(row) for row in self.data]
cur.close()
db.close()
else:
self.data = []
self.data = dict(sorted(self.data.items(), key=lambda x: int(x[0][3:])))
def configure(self):
"""提取配置信息"""
@@ -110,10 +111,8 @@ class MaaManager(QObject):
def run(self):
"""主进程运行MAA代理进程"""
# 初始化本周剿灭上限标志
self.weekly_annihilation_limit_reached = False
curdate = self.server_date()
curdate = Config.server_date()
begin_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.configure()
@@ -135,11 +134,19 @@ class MaaManager(QObject):
# 整理用户数据,筛选需代理的用户
if "设置MAA" not in self.mode:
self.data = sorted(self.data, key=lambda x: (-len(x[15]), x[16]))
self.user_list: List[List[str, str, int]] = [
[_[0], "等待", index]
for index, _ in enumerate(self.data)
if (_[3] != 0 and _[4] == "y")
self.data = dict(
sorted(
self.data.items(),
key=lambda x: (x[1]["Config"]["Info"]["Mode"], int(x[0][3:])),
)
)
self.user_list: List[List[str, str, str]] = [
[_["Config"]["Info"]["Name"], "等待", index]
for index, _ in self.data.items()
if (
_["Config"]["Info"]["RemainedDay"] != 0
and _["Config"]["Info"]["Status"]
)
]
self.create_user_list.emit(self.user_list)
@@ -150,26 +157,25 @@ class MaaManager(QObject):
self.if_open_emulator = True
# 执行情况预处理
for _ in self.user_list:
if self.data[_[2]][5] != curdate:
self.data[_[2]][5] = curdate
self.data[_[2]][14] = 0
_[0] += f" - 第{self.data[_[2]][14] + 1}次代理"
if self.data[_[2]]["Config"]["Data"]["LastProxyDate"] != curdate:
self.data[_[2]]["Config"]["Data"]["LastProxyDate"] = curdate
self.data[_[2]]["Config"]["Data"]["ProxyTimes"] = 0
_[
0
] += f" - 第{self.data[_[2]]["Config"]["Data"]["ProxyTimes"] + 1}次代理"
# 开始代理
for user in self.user_list:
self.current_user = user[0].split(" - ")[0]
if self.load_weekly_annihilation_status(self.current_user):
self.weekly_annihilation_limit_reached = True
logger.info(f"用户 {self.current_user} 本周已达上限")
else:
self.weekly_annihilation_limit_reached = False
user_data = self.data[user[2]]["Config"]
if self.isInterruptionRequested:
break
if (
self.set["RunSet"]["ProxyTimesLimit"] == 0
or self.data[user[2]][14] < self.set["RunSet"]["ProxyTimesLimit"]
or user_data["Data"]["ProxyTimes"]
< self.set["RunSet"]["ProxyTimesLimit"]
):
user[1] = "运行"
self.update_user_list.emit(self.user_list)
@@ -181,17 +187,20 @@ class MaaManager(QObject):
logger.info(f"{self.name} | 开始代理用户: {user[0]}")
# 初始化代理情况记录和模式替换记录
run_book = [False for _ in range(2)]
mode_book = ["自动代理_剿灭", "自动代理_日常"]
run_book = {"Annihilation": False, "Routine": False}
mode_book = {
"Annihilation": "自动代理_剿灭",
"Routine": "自动代理_日常",
}
# 简洁模式用户默认开启日常选项
if self.data[user[2]][15] == "simple":
self.data[user[2]][9] = "y"
elif self.data[user[2]][15] == "beta":
check_book = [
[True, "annihilation", "剿灭"],
[True, "routine", "日常"],
]
if user_data["Info"]["Mode"] == "简洁":
user_data["Info"]["Routine"] = True
elif user_data["Info"]["Mode"] == "详细":
check_book = {
"Annihilation": True,
"Routine": True,
}
user_logs_list = []
user_start_time = datetime.now()
@@ -207,62 +216,76 @@ class MaaManager(QObject):
)
# 剿灭-日常模式循环
for j in range(2):
for mode in ["Annihilation", "Routine"]:
if self.isInterruptionRequested:
break
# j == 0 剿灭模式;满足条件跳过剿灭
# 剿灭模式;满足条件跳过剿灭
if (
j == 0
and self.set["RunSet"].get(
"RunSet_AnnihilationWeeklyLimit", True
)
and self.weekly_annihilation_limit_reached
mode == "Annihilation"
and self.set["RunSet"]["AnnihilationWeeklyLimit"]
and datetime.strptime(
user_data["Data"]["LastAnnihilationDate"], "%Y-%m-%d"
).isocalendar()[:2]
== datetime.strptime(curdate, "%Y-%m-%d").isocalendar()[:2]
):
logger.info(
f"{self.name} | 用户: {self.current_user} - 本周剿灭模式已达上限,跳过执行剿灭任务"
f"{self.name} | 用户: {user_data["Info"]["Name"]} - 本周剿灭模式已达上限,跳过执行剿灭任务"
)
run_book[j] = True
run_book[mode] = True
continue
else:
self.weekly_annihilation_limit_reached = False
if self.data[user[2]][10 - j] == "n":
run_book[j] = True
if not user_data["Info"][mode]:
run_book[mode] = True
continue
if run_book[j]:
if run_book[mode]:
continue
logger.info(
f"{self.name} | 用户: {user[0]} - 模式: {mode_book[j]}"
f"{self.name} | 用户: {user[0]} - 模式: {mode_book[mode]}"
)
if self.data[user[2]][15] == "beta":
if user_data["Info"]["Mode"] == "详细":
self.if_open_emulator = True
if (
check_book[j][0]
check_book[mode]
and not (
self.config_path
/ f"beta/{self.data[user[2]][16]}/{check_book[j][1]}/gui.json"
self.data[user[2]]["Path"] / f"{mode}/gui.json"
).exists()
):
logger.error(
f"{self.name} | 用户: {user[0]} - 未找到{check_book[j][2]}配置文件"
f"{self.name} | 用户: {user[0]} - 未找到{mode_book[mode][5:7]}配置文件"
)
self.push_info_bar.emit(
"error",
"启动MAA代理进程失败",
f"未找到{user[0]}{check_book[j][2]}配置文件!",
f"未找到{user[0]}{mode_book[mode][5:7]}配置文件!",
-1,
)
check_book[j][0] = False
check_book[mode] = False
continue
elif not check_book[j][0]:
elif not check_book[mode]:
continue
# 更新当前模式到界面
self.update_user_list.emit(
[
(
[f"{_[0]} - {mode_book[mode][5:7]}", _[1], _[2]]
if _[2] == user[2]
else _
)
for _ in self.user_list
]
)
# 配置MAA
set = self.set_maa(mode_book[j], user[2])
set = self.set_maa(mode_book[mode], user[2])
# 记录当前时间
start_time = datetime.now()
@@ -291,13 +314,13 @@ class MaaManager(QObject):
creationflags=subprocess.CREATE_NO_WINDOW,
)
# 监测MAA运行状态
self.start_monitor(start_time, mode_book[j])
self.start_monitor(start_time, mode_book[mode])
if self.maa_result == "Success!":
logger.info(
f"{self.name} | 用户: {user[0]} - MAA进程完成代理任务"
)
run_book[j] = True
run_book[mode] = True
self.update_log_text.emit(
"检测到MAA进程完成代理任务\n正在等待相关程序结束\n请等待10s"
)
@@ -322,8 +345,8 @@ class MaaManager(QObject):
# 推送异常通知
Notify.push_plyer(
"用户自动代理出现异常!",
f"用户 {user[0].replace("_", " 今天的")}{mode_book[j][5:7]}部分出现一次异常",
f"{user[0].replace("_", " ")}{mode_book[j][5:7]}出现异常",
f"用户 {user[0].replace("_", " 今天的")}{mode_book[mode][5:7]}部分出现一次异常",
f"{user[0].replace("_", " ")}{mode_book[mode][5:7]}出现异常",
1,
)
for _ in range(10):
@@ -343,36 +366,40 @@ class MaaManager(QObject):
):
System.kill_process(self.emulator_path)
# 记录剿灭情况
if (
mode == "Annihilation"
and self.weekly_annihilation_limit_reached
):
user_data["Data"]["LastAnnihilationDate"] = curdate
# 保存运行日志以及统计信息
if_six_star = Config.save_maa_log(
Config.app_path
/ f"history/{curdate}/{self.data[user[2]][0]}/{start_time.strftime("%H-%M-%S")}.log",
self.check_maa_log(start_time, mode_book[j]),
/ f"history/{curdate}/{user_data["Info"]["Name"]}/{start_time.strftime("%H-%M-%S")}.log",
self.check_maa_log(start_time, mode_book[mode]),
self.maa_result,
)
user_logs_list.append(
Config.app_path
/ f"history/{curdate}/{self.data[user[2]][0]}/{start_time.strftime("%H-%M-%S")}.json",
/ f"history/{curdate}/{user_data["Info"]["Name"]}/{start_time.strftime("%H-%M-%S")}.json",
)
if (
Config.global_config.get(
Config.global_config.notify_IfSendSixStar
)
and if_six_star
):
if Config.get(Config.notify_IfSendSixStar) and if_six_star:
self.push_notification(
"公招六星",
f"喜报:用户 {user[0]} 公招出六星啦!",
{"user_name": self.data[user[2]][0]},
{"user_name": user_data["Info"]["Name"]},
)
# 成功完成代理的用户修改相关参数
if run_book[0] and run_book[1]:
if self.data[user[2]][14] == 0 and self.data[user[2]][3] != -1:
self.data[user[2]][3] -= 1
self.data[user[2]][14] += 1
if run_book["Annihilation"] and run_book["Routine"]:
if (
user_data["Data"]["ProxyTimes"] == 0
and user_data["Info"]["RemainedDay"] != -1
):
user_data["Info"]["RemainedDay"] -= 1
user_data["Data"]["ProxyTimes"] += 1
user[1] = "完成"
Notify.push_plyer(
"成功完成一个自动代理任务!",
@@ -382,9 +409,7 @@ class MaaManager(QObject):
)
break
if Config.global_config.get(
Config.global_config.notify_IfSendStatistic
):
if Config.get(Config.notify_IfSendStatistic):
statistics = Config.merge_maa_logs("指定项", user_logs_list)
statistics["user_info"] = user[0]
@@ -396,7 +421,7 @@ class MaaManager(QObject):
)
statistics["maa_result"] = (
"代理任务全部完成"
if (run_book[0] and run_book[1])
if (run_book["Annihilation"] and run_book["Routine"])
else "代理任务未全部完成"
)
self.push_notification(
@@ -404,7 +429,7 @@ class MaaManager(QObject):
)
# 录入代理失败的用户
if not (run_book[0] and run_book[1]):
if not (run_book["Annihilation"] and run_book["Routine"]):
user[1] = "异常"
self.update_user_list.emit(self.user_list)
@@ -424,6 +449,8 @@ class MaaManager(QObject):
# 开始排查
for user in self.user_list:
user_data = self.data[user[2]]["Config"]
if self.isInterruptionRequested:
break
@@ -432,7 +459,7 @@ class MaaManager(QObject):
user[1] = "运行"
self.update_user_list.emit(self.user_list)
if self.data[user[2]][15] == "beta":
if user_data["Info"]["Mode"] == "详细":
self.if_open_emulator = True
run_book = [False for _ in range(2)]
@@ -495,20 +522,14 @@ class MaaManager(QObject):
):
run_book[1] = True
# 结果录入用户备注栏
# 结果录入
if run_book[0] and run_book[1]:
logger.info(f"{self.name} | 用户 {user[0]} 通过人工排查")
if "未通过人工排查" in self.data[user[2]][13]:
self.data[user[2]][13] = self.data[user[2]][13].replace(
"未通过人工排查|", ""
)
user_data["Data"]["IfPassCheck"] = True
user[1] = "完成"
else:
logger.info(f"{self.name} | 用户 {user[0]} 未通过人工排查")
if not "未通过人工排查" in self.data[user[2]][13]:
self.data[user[2]][
13
] = f"未通过人工排查|{self.data[user[2]][13]}"
user_data["Data"]["IfPassCheck"] = False
user[1] = "异常"
self.update_user_list.emit(self.user_list)
@@ -551,13 +572,8 @@ class MaaManager(QObject):
System.kill_process(self.maa_exe_path)
# 更新用户数据
modes = [self.data[_[2]][15] for _ in self.user_list]
uids = [self.data[_[2]][16] for _ in self.user_list]
days = [self.data[_[2]][3] for _ in self.user_list]
lasts = [self.data[_[2]][5] for _ in self.user_list]
notes = [self.data[_[2]][13] for _ in self.user_list]
numbs = [self.data[_[2]][14] for _ in self.user_list]
self.update_user_info.emit(modes, uids, days, lasts, notes, numbs)
updated_info = {_[2]: self.data[_[2]] for _ in self.user_list}
self.update_user_info.emit(self.config_path.name, updated_info)
error_index = [_[2] for _ in self.user_list if _[1] == "异常"]
over_index = [_[2] for _ in self.user_list if _[1] == "完成"]
@@ -580,8 +596,12 @@ class MaaManager(QObject):
"end_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"completed_count": len(over_index),
"uncompleted_count": len(error_index) + len(wait_index),
"failed_user": [self.data[_][0] for _ in error_index],
"waiting_user": [self.data[_][0] for _ in wait_index],
"failed_user": [
self.data[_]["Config"]["Info"]["Name"] for _ in error_index
],
"waiting_user": [
self.data[_]["Config"]["Info"]["Name"] for _ in wait_index
],
}
# 推送代理结果通知
Notify.push_plyer(
@@ -590,11 +610,8 @@ class MaaManager(QObject):
f"已完成用户数:{len(over_index)},未完成用户数:{len(error_index) + len(wait_index)}",
10,
)
if Config.global_config.get(
Config.global_config.notify_SendTaskResultTime
) == "任何时刻" or (
Config.global_config.get(Config.global_config.notify_SendTaskResultTime)
== "仅失败时"
if Config.get(Config.notify_SendTaskResultTime) == "任何时刻" or (
Config.get(Config.notify_SendTaskResultTime) == "仅失败时"
and len(error_index) + len(wait_index) != 0
):
result_text = self.push_notification("代理结果", title, result)
@@ -658,12 +675,6 @@ class MaaManager(QObject):
logs.append(entry)
log = "".join(logs)
# 如果日志中包含提示信息,则设置上限标志,并记录持久化信息
if "剿灭任务失败" in log:
logger.info(f"用户:{self.current_user} | 剿灭任务失败设置上限标志为True")
self.weekly_annihilation_limit_reached = True
self.record_weekly_annihilation_limit(self.current_user)
# 更新MAA日志
if len(logs) > 100:
self.update_log_text.emit("".join(logs[-100:]))
@@ -686,6 +697,11 @@ class MaaManager(QObject):
"自动代理_日常": "RoutineTimeLimit",
}
if mode == "自动代理_剿灭" and "剿灭任务失败" in log:
self.weekly_annihilation_limit_reached = True
else:
self.weekly_annihilation_limit_reached = False
if mode == "自动代理_日常" and "任务出错: Fight" in log:
self.maa_result = "检测到MAA未能实际执行任务"
elif "任务出错: StartUp" in log:
@@ -763,6 +779,9 @@ class MaaManager(QObject):
"""配置MAA运行参数"""
logger.info(f"{self.name} | 配置MAA运行参数: {mode}/{index}")
if "设置MAA" not in self.mode:
user_data = self.data[index]["Config"]
# 配置MAA前关闭可能未正常退出的MAA进程
System.kill_process(self.maa_exe_path)
@@ -777,36 +796,38 @@ class MaaManager(QObject):
)
elif (mode == "设置MAA_全局") or (
("自动代理" in mode or "人工排查" in mode)
and self.data[index][15] == "simple"
and user_data["Info"]["Mode"] == "简洁"
):
shutil.copy(
self.config_path / "Default/gui.json",
self.maa_set_path,
)
elif "自动代理" in mode and self.data[index][15] == "beta":
elif "自动代理" in mode and user_data["Info"]["Mode"] == "详细":
if mode == "自动代理_剿灭":
shutil.copy(
self.config_path
/ f"beta/{self.data[index][16]}/annihilation/gui.json",
self.data[index]["Path"] / "Annihilation/gui.json",
self.maa_set_path,
)
elif mode == "自动代理_日常":
shutil.copy(
self.config_path / f"beta/{self.data[index][16]}/routine/gui.json",
self.data[index]["Path"] / "Routine/gui.json",
self.maa_set_path,
)
elif "人工排查" in mode and self.data[index][15] == "beta":
elif "人工排查" in mode and user_data["Info"]["Mode"] == "详细":
shutil.copy(
self.config_path / f"beta/{self.data[index][16]}/routine/gui.json",
self.data[index]["Path"] / "Routine/gui.json",
self.maa_set_path,
)
with self.maa_set_path.open(mode="r", encoding="utf-8") as f:
data = json.load(f)
if "设置MAA" not in mode and (
(self.data[index][15] == "simple" and self.data[index][2] == "Bilibili")
(
user_data["Info"]["Mode"] == "简洁"
and user_data["Info"]["Server"] == "Bilibili"
)
or (
self.data[index][15] == "beta"
user_data["Info"]["Mode"] == "详细"
and data["Configurations"]["Default"]["Start.ClientType"] == "Bilibili"
)
):
@@ -822,16 +843,19 @@ class MaaManager(QObject):
data["Global"][f"Timer.Timer{i}"] = "False" # 时间设置
if (
[i for i, _ in enumerate(self.user_list) if _[2] == index][0]
next((i for i, _ in enumerate(self.user_list) if _[2] == index), None)
== len(self.user_list) - 1
) or (
self.data[
self.user_list[
[i for i, _ in enumerate(self.user_list) if _[2] == index][0]
next(
(i for i, _ in enumerate(self.user_list) if _[2] == index),
None,
)
+ 1
][2]
][15]
== "beta"
]["Config"]["Info"]["Mode"]
== "详细"
):
data["Configurations"]["Default"][
"MainFunction.PostActions"
@@ -849,12 +873,24 @@ class MaaManager(QObject):
"True" if self.if_open_emulator else "False"
) # 启动MAA后自动开启模拟器
if Config.global_config.get(Config.global_config.function_IfSilence):
if Config.get(Config.function_IfSilence):
data["Global"]["Start.MinimizeDirectly"] = "True" # 启动MAA后直接最小化
data["Global"]["GUI.UseTray"] = "True" # 显示托盘图标
data["Global"]["GUI.MinimizeToTray"] = "True" # 最小化时隐藏至托盘
if self.data[index][15] == "simple":
# 账号切换
if user_data["Info"]["Server"] == "Official":
data["Configurations"]["Default"]["Start.AccountName"] = (
f"{user_data["Info"]["Id"][:3]}****{user_data["Info"]["Id"][7:]}"
if len(user_data["Info"]["Id"]) == 11
else user_data["Info"]["Id"]
)
elif user_data["Info"]["Server"] == "Bilibili":
data["Configurations"]["Default"]["Start.AccountName"] = user_data[
"Info"
]["Id"]
if user_data["Info"]["Mode"] == "简洁":
data["Global"][
"VersionUpdate.ScheduledUpdateCheck"
@@ -865,22 +901,11 @@ class MaaManager(QObject):
data["Global"][
"VersionUpdate.AutoInstallUpdatePackage"
] = "False" # 自动安装更新包
data["Configurations"]["Default"]["Start.ClientType"] = self.data[
index
data["Configurations"]["Default"]["Start.ClientType"] = user_data[
"Info"
][
2
"Server"
] # 客户端类型
# 账号切换
if self.data[index][2] == "Official":
data["Configurations"]["Default"]["Start.AccountName"] = (
f"{self.data[index][1][:3]}****{self.data[index][1][7:]}"
if len(self.data[index][1]) == 11
else self.data[index][1]
)
elif self.data[index][2] == "Bilibili":
data["Configurations"]["Default"]["Start.AccountName"] = self.data[
index
][1]
if "剿灭" in mode:
@@ -968,32 +993,35 @@ class MaaManager(QObject):
data["Configurations"]["Default"][
"TaskQueue.Reclamation.IsChecked"
] = "False" # 生息演算
# 主关卡
if self.data[index][6] == "-":
data["Configurations"]["Default"]["MainFunction.Stage1"] = ""
else:
data["Configurations"]["Default"]["MainFunction.Stage1"] = (
self.data[index][6]
)
# 备选关卡1
if self.data[index][7] == "-":
data["Configurations"]["Default"]["MainFunction.Stage2"] = ""
else:
data["Configurations"]["Default"]["MainFunction.Stage2"] = (
self.data[index][7]
)
# 备选关卡2
if self.data[index][8] == "-":
data["Configurations"]["Default"]["MainFunction.Stage3"] = ""
else:
data["Configurations"]["Default"]["MainFunction.Stage3"] = (
self.data[index][8]
)
data["Configurations"]["Default"]["MainFunction.UseMedicine"] = (
"False" if user_data["Info"]["MedicineNumb"] == 0 else "True"
) # 吃理智药
data["Configurations"]["Default"][
"MainFunction.UseMedicine.Quantity"
] = str(
user_data["Info"]["MedicineNumb"]
) # 吃理智药数量
data["Configurations"]["Default"]["MainFunction.Stage1"] = (
user_data["Info"]["GameId"]
if user_data["Info"]["GameId"] != "-"
else ""
) # 主关卡
data["Configurations"]["Default"]["MainFunction.Stage2"] = (
user_data["Info"]["GameId_1"]
if user_data["Info"]["GameId_1"] != "-"
else ""
) # 备选关卡1
data["Configurations"]["Default"]["MainFunction.Stage3"] = (
user_data["Info"]["GameId_2"]
if user_data["Info"]["GameId_2"] != "-"
else ""
) # 备选关卡2
data["Configurations"]["Default"][
"Fight.RemainingSanityStage"
] = "" # 剩余理智关卡
# 连战次数
if self.data[index][6] == "1-7":
if user_data["Info"]["GameId"] == "1-7":
data["Configurations"]["Default"][
"MainFunction.Series.Quantity"
] = "6"
@@ -1008,7 +1036,10 @@ class MaaManager(QObject):
"GUI.CustomStageCode"
] = "True" # 手动输入关卡名
# 备选关卡
if self.data[index][7] == "-" and self.data[index][8] == "-":
if (
user_data["Info"]["GameId_1"] == "-"
and user_data["Info"]["GameId_2"] == "-"
):
data["Configurations"]["Default"][
"GUI.UseAlternateStage"
] = "False"
@@ -1023,29 +1054,92 @@ class MaaManager(QObject):
"Fight.UseExpiringMedicine"
] = "True" # 无限吃48小时内过期的理智药
# 自定义基建配置
if self.data[index][11] == "n":
data["Configurations"]["Default"][
"Infrast.CustomInfrastEnabled"
] = "False" # 禁用自定义基建配置
if user_data["Info"]["Infrastructure"]:
if (
self.data[index]["Path"]
/ "Infrastructure/infrastructure.json"
).exists():
data["Configurations"]["Default"][
"Infrast.CustomInfrastEnabled"
] = "True" # 启用自定义基建配置
data["Configurations"]["Default"][
"Infrast.CustomInfrastPlanIndex"
] = "1" # 自定义基建配置索引
data["Configurations"]["Default"][
"Infrast.DefaultInfrast"
] = "user_defined" # 内置配置
data["Configurations"]["Default"][
"Infrast.IsCustomInfrastFileReadOnly"
] = "False" # 自定义基建配置文件只读
data["Configurations"]["Default"][
"Infrast.CustomInfrastFile"
] = str(
self.data[index]["Path"]
/ "Infrastructure/infrastructure.json"
) # 自定义基建配置文件地址
else:
logger.warning(
f"未选择用户 {user_data["Info"]["Name"]} 的自定义基建配置文件"
)
self.push_info_bar.emit(
"warning",
"启用自定义基建失败",
f"未选择用户 {user_data["Info"]["Name"]} 的自定义基建配置文件",
-1,
)
data["Configurations"]["Default"][
"Infrast.CustomInfrastEnabled"
] = "False" # 禁用自定义基建配置
else:
data["Configurations"]["Default"][
"Infrast.CustomInfrastEnabled"
] = "True" # 用自定义基建配置
] = "False" # 用自定义基建配置
elif user_data["Info"]["Mode"] == "详细":
if "剿灭" in mode:
pass
elif "日常" in mode:
data["Configurations"]["Default"]["MainFunction.UseMedicine"] = (
"False" if user_data["Info"]["MedicineNumb"] == 0 else "True"
) # 吃理智药
data["Configurations"]["Default"][
"MainFunction.UseMedicine.Quantity"
] = str(
user_data["Info"]["MedicineNumb"]
) # 吃理智药数量
data["Configurations"]["Default"]["MainFunction.Stage1"] = (
user_data["Info"]["GameId"]
if user_data["Info"]["GameId"] != "-"
else ""
) # 主关卡
data["Configurations"]["Default"]["MainFunction.Stage2"] = (
user_data["Info"]["GameId_1"]
if user_data["Info"]["GameId_1"] != "-"
else ""
) # 备选关卡1
data["Configurations"]["Default"]["MainFunction.Stage3"] = (
user_data["Info"]["GameId_2"]
if user_data["Info"]["GameId_2"] != "-"
else ""
) # 备选关卡2
# 备选关卡
if (
user_data["Info"]["GameId_1"] == "-"
and user_data["Info"]["GameId_2"] == "-"
):
data["Configurations"]["Default"][
"Infrast.CustomInfrastPlanIndex"
] = "1" # 自定义基建配置索引
"GUI.UseAlternateStage"
] = "False"
else:
data["Configurations"]["Default"][
"Infrast.DefaultInfrast"
] = "user_defined" # 内置配置
data["Configurations"]["Default"][
"Infrast.IsCustomInfrastFileReadOnly"
] = "False" # 自定义基建配置文件只读
data["Configurations"]["Default"][
"Infrast.CustomInfrastFile"
] = str(
self.config_path
/ f"simple/{self.data[index][16]}/infrastructure/infrastructure.json"
) # 自定义基建配置文件地址
"GUI.UseAlternateStage"
] = "True"
# 人工排查配置
elif "人工排查" in mode:
@@ -1060,10 +1154,6 @@ class MaaManager(QObject):
"Start.RunDirectly"
] = "True" # 启动MAA后直接运行
data["Global"]["Start.MinimizeDirectly"] = "True" # 启动MAA后直接最小化
# 启动MAA后直接运行
data["Configurations"]["Default"]["Start.OpenEmulatorAfterLaunch"] = "True"
# 启动MAA后自动开启模拟器
data["Configurations"]["Default"]["Start.RunDirectly"] = "True"
data["Global"]["GUI.UseTray"] = "True" # 显示托盘图标
data["Global"]["GUI.MinimizeToTray"] = "True" # 最小化时隐藏至托盘
@@ -1071,7 +1161,19 @@ class MaaManager(QObject):
"True" if self.if_open_emulator else "False"
) # 启动MAA后自动开启模拟器
if self.data[index][15] == "simple":
# 账号切换
if user_data["Info"]["Server"] == "Official":
data["Configurations"]["Default"]["Start.AccountName"] = (
f"{user_data["Info"]["Id"][:3]}****{user_data["Info"]["Id"][7:]}"
if len(user_data["Info"]["Id"]) == 11
else user_data["Info"]["Id"]
)
elif user_data["Info"]["Server"] == "Bilibili":
data["Configurations"]["Default"]["Start.AccountName"] = user_data[
"Info"
]["Id"]
if user_data["Info"]["Mode"] == "简洁":
data["Global"][
"VersionUpdate.ScheduledUpdateCheck"
@@ -1082,22 +1184,11 @@ class MaaManager(QObject):
data["Global"][
"VersionUpdate.AutoInstallUpdatePackage"
] = "False" # 自动安装更新包
data["Configurations"]["Default"]["Start.ClientType"] = self.data[
index
data["Configurations"]["Default"]["Start.ClientType"] = user_data[
"Info"
][
2
"Server"
] # 客户端类型
# 账号切换
if self.data[index][2] == "Official":
data["Configurations"]["Default"]["Start.AccountName"] = (
f"{self.data[index][1][:3]}****{self.data[index][1][7:]}"
if len(self.data[index][1]) == 11
else self.data[index][1]
)
elif self.data[index][2] == "Bilibili":
data["Configurations"]["Default"]["Start.AccountName"] = self.data[
index
][1]
data["Configurations"]["Default"][
"TaskQueue.WakeUp.IsChecked"
@@ -1140,7 +1231,7 @@ class MaaManager(QObject):
"Start.OpenEmulatorAfterLaunch"
] = "False" # 启动MAA后自动开启模拟器
if Config.global_config.get(Config.global_config.function_IfSilence):
if Config.get(Config.function_IfSilence):
data["Global"][
"Start.MinimizeDirectly"
] = "False" # 启动MAA后直接最小化
@@ -1204,9 +1295,7 @@ class MaaManager(QObject):
with self.maa_tasks_path.open(mode="r", encoding="utf-8") as f:
data = json.load(f)
if if_agree and Config.global_config.get(
Config.global_config.function_IfAgreeBilibili
):
if if_agree and Config.get(Config.function_IfAgreeBilibili):
data["BilibiliAgreement_AUTO"] = {
"algorithm": "OcrDetect",
"action": "ClickSelf",
@@ -1226,14 +1315,6 @@ class MaaManager(QObject):
with self.maa_tasks_path.open(mode="w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=4)
def server_date(self):
"""获取当前的服务器日期"""
dt = datetime.now()
if dt.time() < datetime.min.time().replace(hour=4):
dt = dt - timedelta(days=1)
return dt.strftime("%Y-%m-%d")
def push_notification(
self,
mode: str,
@@ -1318,60 +1399,3 @@ class MaaManager(QObject):
Notify.send_mail("网页", title, message_html)
Notify.ServerChanPush(title, "好羡慕~\n\nAUTO_MAA 敬上")
Notify.CompanyWebHookBotPush(title, "好羡慕~\n\nAUTO_MAA 敬上")
def record_weekly_annihilation_limit(self, username: str) -> None:
"""
持久化记录当前用户在本周剿灭模式达到上限的信息
计算方式以周一凌晨4点为一周起点故将当前时间减4小时再计算 ISO 周
"""
self.weekly_annihilation_limit_reached = True
logger.info(f"已记录用户 {username} 达到本周剿灭模式达到上限")
now = datetime.now()
shifted_time = now - timedelta(hours=4)
year, week, _ = shifted_time.isocalendar()
# 构造/history/annihilation.json路径
record_path = Config.app_path / "history" / "annihilation.json"
record_path.parent.mkdir(parents=True, exist_ok=True)
# 如果文件存在,则读取原有数据,否则初始化为空字典
if record_path.exists():
with record_path.open("r", encoding="utf-8") as f:
data = json.load(f)
else:
data = {}
# 如果该用户本周已经记录则不更新,否则写入当前记录
if username in data:
existing = data[username]
if existing.get("year") == year and existing.get("week") == week:
return
data[username] = {"year": year, "week": week}
with record_path.open("w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=4)
@staticmethod
def load_weekly_annihilation_status(username: str) -> bool:
"""
读取 /history/annihilation.json 判断当前用户是否在本周已经达到剿灭上限
计算方式当前时间减4小时再计算 ISO 周
"""
record_path = Config.app_path / "history" / "annihilation.json"
if not record_path.exists():
logger.info("未找到记录文件,将不进行剿灭上限判断")
return False
with record_path.open("r", encoding="utf-8") as f:
data = json.load(f)
now = datetime.now()
shifted_time = now - timedelta(hours=4)
year, week, _ = shifted_time.isocalendar()
if username in data:
existing = data[username]
if existing.get("year") == year and existing.get("week") == week:
return True
return False

View File

@@ -53,7 +53,7 @@ class Notification(QWidget):
def push_plyer(self, title, message, ticker, t):
"""推送系统通知"""
if Config.global_config.get(Config.global_config.notify_IfPushPlyer):
if Config.get(Config.notify_IfPushPlyer):
notification.notify(
title=title,
@@ -70,27 +70,21 @@ class Notification(QWidget):
def send_mail(self, mode, title, content) -> None:
"""推送邮件通知"""
if Config.global_config.get(Config.global_config.notify_IfSendMail):
if Config.get(Config.notify_IfSendMail):
if (
Config.global_config.get(Config.global_config.notify_SMTPServerAddress)
== ""
or Config.global_config.get(
Config.global_config.notify_AuthorizationCode
)
== ""
Config.get(Config.notify_SMTPServerAddress) == ""
or Config.get(Config.notify_AuthorizationCode) == ""
or not bool(
re.match(
r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$",
Config.global_config.get(
Config.global_config.notify_FromAddress
),
Config.get(Config.notify_FromAddress),
)
)
or not bool(
re.match(
r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$",
Config.global_config.get(Config.global_config.notify_ToAddress),
Config.get(Config.notify_ToAddress),
)
)
):
@@ -114,15 +108,13 @@ class Notification(QWidget):
message["From"] = formataddr(
(
Header("AUTO_MAA通知服务", "utf-8").encode(),
Config.global_config.get(
Config.global_config.notify_FromAddress
),
Config.get(Config.notify_FromAddress),
)
) # 发件人显示的名字
message["To"] = formataddr(
(
Header("AUTO_MAA用户", "utf-8").encode(),
Config.global_config.get(Config.global_config.notify_ToAddress),
Config.get(Config.notify_ToAddress),
)
) # 收件人显示的名字
message["Subject"] = Header(title, "utf-8")
@@ -131,22 +123,16 @@ class Notification(QWidget):
message.attach(MIMEText(content, "html", "utf-8"))
smtpObj = smtplib.SMTP_SSL(
Config.global_config.get(
Config.global_config.notify_SMTPServerAddress
),
Config.get(Config.notify_SMTPServerAddress),
465,
)
smtpObj.login(
Config.global_config.get(Config.global_config.notify_FromAddress),
Crypto.win_decryptor(
Config.global_config.get(
Config.global_config.notify_AuthorizationCode
)
),
Config.get(Config.notify_FromAddress),
Crypto.win_decryptor(Config.get(Config.notify_AuthorizationCode)),
)
smtpObj.sendmail(
Config.global_config.get(Config.global_config.notify_FromAddress),
Config.global_config.get(Config.global_config.notify_ToAddress),
Config.get(Config.notify_FromAddress),
Config.get(Config.notify_ToAddress),
message.as_string(),
)
smtpObj.quit()
@@ -158,12 +144,9 @@ class Notification(QWidget):
def ServerChanPush(self, title, content):
"""使用Server酱推送通知"""
if Config.global_config.get(Config.global_config.notify_IfServerChan):
if Config.get(Config.notify_IfServerChan):
if (
Config.global_config.get(Config.global_config.notify_ServerChanKey)
== ""
):
if Config.get(Config.notify_ServerChanKey) == "":
logger.error("请正确设置Server酱的SendKey")
self.push_info_bar.emit(
"error",
@@ -173,9 +156,7 @@ class Notification(QWidget):
)
return None
else:
send_key = Config.global_config.get(
Config.global_config.notify_ServerChanKey
)
send_key = Config.get(Config.notify_ServerChanKey)
option = {}
is_valid = lambda s: s == "" or (
@@ -186,16 +167,11 @@ class Notification(QWidget):
允许空的Tag和Channel即不启用但不允许例如a||b|a|ba|b|||||
"""
send_tag = "|".join(
_.strip()
for _ in Config.global_config.get(
Config.global_config.notify_ServerChanTag
).split("|")
_.strip() for _ in Config.get(Config.notify_ServerChanTag).split("|")
)
send_channel = "|".join(
_.strip()
for _ in Config.global_config.get(
Config.global_config.notify_ServerChanChannel
).split("|")
for _ in Config.get(Config.notify_ServerChanChannel).split("|")
)
if is_valid(send_tag):
@@ -239,14 +215,9 @@ class Notification(QWidget):
def CompanyWebHookBotPush(self, title, content):
"""使用企业微信群机器人推送通知"""
if Config.global_config.get(Config.global_config.notify_IfCompanyWebHookBot):
if Config.get(Config.notify_IfCompanyWebHookBot):
if (
Config.global_config.get(
Config.global_config.notify_CompanyWebHookBotUrl
)
== ""
):
if Config.get(Config.notify_CompanyWebHookBotUrl) == "":
logger.error("请正确设置企业微信群机器人的WebHook地址")
self.push_info_bar.emit(
"error",
@@ -259,9 +230,7 @@ class Notification(QWidget):
content = f"{title}\n{content}"
data = {"msgtype": "text", "text": {"content": content}}
response = requests.post(
url=Config.global_config.get(
Config.global_config.notify_CompanyWebHookBotUrl
),
url=Config.get(Config.notify_CompanyWebHookBotUrl),
json=data,
)
if response.json()["errcode"] == 0:
@@ -291,7 +260,7 @@ class Notification(QWidget):
)
# 发送邮件通知
if Config.global_config.get(Config.global_config.notify_IfSendMail):
if Config.get(Config.notify_IfSendMail):
self.send_mail(
"文本",
"AUTO_MAA测试通知",
@@ -299,14 +268,14 @@ class Notification(QWidget):
)
# 发送Server酱通知
if Config.global_config.get(Config.global_config.notify_IfServerChan):
if Config.get(Config.notify_IfServerChan):
self.ServerChanPush(
"AUTO_MAA测试通知",
"这是 AUTO_MAA 外部通知测试信息。如果你看到了这段内容,说明 AUTO_MAA 的通知功能已经正确配置且可以正常工作!",
)
# 发送企业微信机器人通知
if Config.global_config.get(Config.global_config.notify_IfCompanyWebHookBot):
if Config.get(Config.notify_IfCompanyWebHookBot):
self.CompanyWebHookBotPush(
"AUTO_MAA测试通知",
"这是 AUTO_MAA 外部通知测试信息。如果你看到了这段内容,说明 AUTO_MAA 的通知功能已经正确配置且可以正常工作!",

View File

@@ -26,7 +26,6 @@ v4.2
"""
from loguru import logger
import sqlite3
import hashlib
import random
import secrets
@@ -85,9 +84,12 @@ class CryptoHandler:
private_key_local = AES_key.encrypt(pad(private_key.exportKey(), 32))
(Config.app_path / "data/key/private_key.bin").write_bytes(private_key_local)
def AUTO_encryptor(self, note: str) -> bytes:
def AUTO_encryptor(self, note: str) -> str:
"""使用AUTO_MAA的算法加密数据"""
if note == "":
return ""
# 读取RSA公钥
public_key_local = RSA.import_key(
(Config.app_path / "data/key/public_key.pem").read_bytes()
@@ -95,11 +97,14 @@ class CryptoHandler:
# 使用RSA公钥对数据进行加密
cipher = PKCS1_OAEP.new(public_key_local)
encrypted = cipher.encrypt(note.encode("utf-8"))
return encrypted
return base64.b64encode(encrypted).decode("utf-8")
def AUTO_decryptor(self, note: bytes, PASSWORD: str) -> str:
def AUTO_decryptor(self, note: str, PASSWORD: str) -> str:
"""使用AUTO_MAA的算法解密数据"""
if note == "":
return ""
# 读入RSA私钥密文、盐与校验哈希值
private_key_local = (
(Config.app_path / "data/key/private_key.bin").read_bytes().strip()
@@ -133,63 +138,40 @@ class CryptoHandler:
private_key = RSA.import_key(private_key_pem)
# 使用RSA私钥解密数据
decrypter = PKCS1_OAEP.new(private_key)
note = decrypter.decrypt(note)
return note.decode("utf-8")
note = decrypter.decrypt(base64.b64decode(note)).decode("utf-8")
return note
def change_PASSWORD(self, PASSWORD_old: str, PASSWORD_new: str) -> None:
"""修改管理密钥"""
member_list = self.search_member()
for user_data in member_list:
# 读取用户数据
db = sqlite3.connect(user_data["Path"])
cur = db.cursor()
cur.execute("SELECT * FROM adminx WHERE True")
data = cur.fetchall()
for member in Config.member_dict.values():
# 使用旧管理密钥解密
user_data["Password"] = []
for i in range(len(data)):
user_data["Password"].append(
self.AUTO_decryptor(data[i][12], PASSWORD_old)
for user in member["UserData"].values():
user["Password"] = self.AUTO_decryptor(
user["Config"].get(user["Config"].Info_Password), PASSWORD_old
)
cur.close()
db.close()
self.get_PASSWORD(PASSWORD_new)
for user_data in member_list:
# 读取用户数据
db = sqlite3.connect(user_data["Path"])
cur = db.cursor()
cur.execute("SELECT * FROM adminx WHERE True")
data = cur.fetchall()
for member in Config.member_dict.values():
# 使用新管理密钥重新加密
for i in range(len(data)):
cur.execute(
"UPDATE adminx SET password = ? WHERE mode = ? AND uid = ?",
(
self.AUTO_encryptor(user_data["Password"][i]),
data[i][15],
data[i][16],
),
for user in member["UserData"].values():
user["Config"].set(
user["Config"].Info_Password, self.AUTO_encryptor(user["Password"])
)
db.commit()
user_data["Password"][i] = None
del user_data["Password"]
cur.close()
db.close()
user["Password"] = None
del user["Password"]
def win_encryptor(
self, note: str, description: str = None, entropy: bytes = None
) -> str:
"""使用Windows DPAPI加密数据"""
if note == "":
return ""
encrypted = win32crypt.CryptProtectData(
note.encode("utf-8"), description, entropy, None, None, 0
)
@@ -223,7 +205,7 @@ class CryptoHandler:
"""验证管理密钥"""
return bool(
self.AUTO_decryptor(self.AUTO_encryptor(""), PASSWORD) != "管理密钥错误"
self.AUTO_decryptor(self.AUTO_encryptor("-"), PASSWORD) != "管理密钥错误"
)

View File

@@ -54,7 +54,7 @@ class _SystemHandler:
def set_Sleep(self) -> None:
"""同步系统休眠状态"""
if Config.global_config.get(Config.global_config.function_IfAllowSleep):
if Config.get(Config.function_IfAllowSleep):
# 设置系统电源状态
ctypes.windll.kernel32.SetThreadExecutionState(
self.ES_CONTINUOUS | self.ES_SYSTEM_REQUIRED
@@ -66,10 +66,7 @@ class _SystemHandler:
def set_SelfStart(self) -> None:
"""同步开机自启"""
if (
Config.global_config.get(Config.global_config.start_IfSelfStart)
and not self.is_startup()
):
if Config.get(Config.start_IfSelfStart) and not self.is_startup():
key = winreg.OpenKey(
winreg.HKEY_CURRENT_USER,
r"Software\Microsoft\Windows\CurrentVersion\Run",
@@ -78,10 +75,7 @@ class _SystemHandler:
)
winreg.SetValueEx(key, "AUTO_MAA", 0, winreg.REG_SZ, Config.app_path_sys)
winreg.CloseKey(key)
elif (
not Config.global_config.get(Config.global_config.start_IfSelfStart)
and self.is_startup()
):
elif not Config.get(Config.start_IfSelfStart) and self.is_startup():
key = winreg.OpenKey(
winreg.HKEY_CURRENT_USER,
r"Software\Microsoft\Windows\CurrentVersion\Run",

View File

@@ -46,12 +46,13 @@ from qfluentwidgets import (
FluentIconBase,
Signal,
ComboBox,
EditableComboBox,
CheckBox,
IconWidget,
FluentIcon,
CardWidget,
BodyLabel,
qconfig,
QConfig,
ConfigItem,
TimeEdit,
OptionsConfigItem,
@@ -65,22 +66,27 @@ from qfluentwidgets import (
ProgressRing,
TextBrowser,
HeaderCardWidget,
SwitchButton,
IndicatorPosition,
Slider,
)
from qfluentwidgets.common.overload import singledispatchmethod
import os
import re
import markdown
from datetime import datetime
from urllib.parse import urlparse
from functools import partial
from typing import Optional, Union, List, Dict
from app.core import Config
from app.services import Crypto
class LineEditMessageBox(MessageBoxBase):
"""输入对话框"""
def __init__(self, parent, title: str, content: str, mode: str):
def __init__(self, parent, title: str, content: Union[str, None], mode: str):
super().__init__(parent)
self.title = SubtitleLabel(title)
@@ -90,6 +96,7 @@ class LineEditMessageBox(MessageBoxBase):
elif mode == "密码":
self.input = PasswordLineEdit()
self.input.returnPressed.connect(self.yesButton.click)
self.input.setPlaceholderText(content)
# 将组件添加到布局中
@@ -248,6 +255,143 @@ class NoticeMessageBox(MessageBoxBase):
self.Layout.addStretch(1)
class SwitchSettingCard(SettingCard):
"""Setting card with switch button"""
checkedChanged = Signal(bool)
def __init__(
self,
icon: Union[str, QIcon, FluentIconBase],
title: str,
content: Union[str, None],
qconfig: QConfig,
configItem: ConfigItem,
parent=None,
):
super().__init__(icon, title, content, parent)
self.qconfig = qconfig
self.configItem = configItem
self.switchButton = SwitchButton(self.tr("Off"), self, IndicatorPosition.RIGHT)
if configItem:
self.setValue(self.qconfig.get(configItem))
configItem.valueChanged.connect(self.setValue)
# add switch button to layout
self.hBoxLayout.addWidget(self.switchButton, 0, Qt.AlignRight)
self.hBoxLayout.addSpacing(16)
self.switchButton.checkedChanged.connect(self.__onCheckedChanged)
def __onCheckedChanged(self, isChecked: bool):
"""switch button checked state changed slot"""
self.setValue(isChecked)
self.checkedChanged.emit(isChecked)
def setValue(self, isChecked: bool):
if self.configItem:
self.qconfig.set(self.configItem, isChecked)
self.switchButton.setChecked(isChecked)
self.switchButton.setText(self.tr("On") if isChecked else self.tr("Off"))
def setChecked(self, isChecked: bool):
self.setValue(isChecked)
def isChecked(self):
return self.switchButton.isChecked()
class RangeSettingCard(SettingCard):
"""Setting card with a slider"""
valueChanged = Signal(int)
def __init__(
self,
icon: Union[str, QIcon, FluentIconBase],
title: str,
content: Union[str, None],
qconfig: QConfig,
configItem: ConfigItem,
parent=None,
):
super().__init__(icon, title, content, parent)
self.qconfig = qconfig
self.configItem = configItem
self.slider = Slider(Qt.Horizontal, self)
self.valueLabel = QLabel(self)
self.slider.setMinimumWidth(268)
self.slider.setSingleStep(1)
self.slider.setRange(*configItem.range)
self.slider.setValue(configItem.value)
self.valueLabel.setNum(configItem.value)
self.hBoxLayout.addStretch(1)
self.hBoxLayout.addWidget(self.valueLabel, 0, Qt.AlignRight)
self.hBoxLayout.addSpacing(6)
self.hBoxLayout.addWidget(self.slider, 0, Qt.AlignRight)
self.hBoxLayout.addSpacing(16)
self.valueLabel.setObjectName("valueLabel")
configItem.valueChanged.connect(self.setValue)
self.slider.valueChanged.connect(self.__onValueChanged)
def __onValueChanged(self, value: int):
"""slider value changed slot"""
self.setValue(value)
self.valueChanged.emit(value)
def setValue(self, value):
self.qconfig.set(self.configItem, value)
self.valueLabel.setNum(value)
self.valueLabel.adjustSize()
self.slider.setValue(value)
class ComboBoxSettingCard(SettingCard):
"""Setting card with a combo box"""
def __init__(
self,
icon: Union[str, QIcon, FluentIconBase],
title: str,
content: Union[str, None],
texts: List[str],
qconfig: QConfig,
configItem: OptionsConfigItem,
parent=None,
):
super().__init__(icon, title, content, parent)
self.qconfig = qconfig
self.configItem = configItem
self.comboBox = ComboBox(self)
self.hBoxLayout.addWidget(self.comboBox, 0, Qt.AlignRight)
self.hBoxLayout.addSpacing(16)
self.optionToText = {o: t for o, t in zip(configItem.options, texts)}
for text, option in zip(texts, configItem.options):
self.comboBox.addItem(text, userData=option)
self.comboBox.setCurrentText(self.optionToText[self.qconfig.get(configItem)])
self.comboBox.currentIndexChanged.connect(self._onCurrentIndexChanged)
configItem.valueChanged.connect(self.setValue)
def _onCurrentIndexChanged(self, index: int):
self.qconfig.set(self.configItem, self.comboBox.itemData(index))
def setValue(self, value):
if value not in self.optionToText:
return
self.comboBox.setCurrentText(self.optionToText[value])
self.qconfig.set(self.configItem, value)
class LineEditSettingCard(SettingCard):
"""Setting card with LineEdit"""
@@ -255,22 +399,24 @@ class LineEditSettingCard(SettingCard):
def __init__(
self,
text,
icon: Union[str, QIcon, FluentIconBase],
title,
content=None,
configItem: ConfigItem = None,
title: str,
content: Union[str, None],
text: str,
qconfig: QConfig,
configItem: ConfigItem,
parent=None,
):
super().__init__(icon, title, content, parent)
self.qconfig = qconfig
self.configItem = configItem
self.LineEdit = LineEdit(self)
self.LineEdit.setMinimumWidth(250)
self.LineEdit.setPlaceholderText(text)
if configItem:
self.setValue(qconfig.get(configItem))
self.setValue(self.qconfig.get(configItem))
configItem.valueChanged.connect(self.setValue)
self.hBoxLayout.addWidget(self.LineEdit, 0, Qt.AlignRight)
@@ -279,39 +425,46 @@ class LineEditSettingCard(SettingCard):
self.LineEdit.textChanged.connect(self.__textChanged)
def __textChanged(self, content: str):
self.setValue(content)
self.textChanged.emit(content)
self.setValue(content.strip())
self.textChanged.emit(content.strip())
def setValue(self, content: str):
if self.configItem:
qconfig.set(self.configItem, content)
self.qconfig.set(self.configItem, content.strip())
self.LineEdit.setText(content)
self.LineEdit.setText(content.strip())
class PasswordLineEditSettingCard(SettingCard):
"""Setting card with PasswordLineEdit"""
textChanged = Signal(str)
textChanged = Signal()
def __init__(
self,
text,
icon: Union[str, QIcon, FluentIconBase],
title,
content=None,
configItem: ConfigItem = None,
title: str,
content: Union[str, None],
text: str,
algorithm: str,
qconfig: QConfig,
configItem: ConfigItem,
parent=None,
):
super().__init__(icon, title, content, parent)
self.algorithm = algorithm
self.qconfig = qconfig
self.configItem = configItem
self.LineEdit = PasswordLineEdit(self)
self.LineEdit.setMinimumWidth(250)
self.LineEdit.setMinimumWidth(200)
self.LineEdit.setPlaceholderText(text)
if algorithm == "AUTO":
self.LineEdit.setViewPasswordButtonVisible(False)
self.if_setValue = False
if configItem:
self.setValue(qconfig.get(configItem))
self.setValue(self.qconfig.get(configItem))
configItem.valueChanged.connect(self.setValue)
self.hBoxLayout.addWidget(self.LineEdit, 0, Qt.AlignRight)
@@ -320,14 +473,141 @@ class PasswordLineEditSettingCard(SettingCard):
self.LineEdit.textChanged.connect(self.__textChanged)
def __textChanged(self, content: str):
self.setValue(Crypto.win_encryptor(content))
self.textChanged.emit(content)
if self.if_setValue:
return None
if self.algorithm == "DPAPI":
self.setValue(Crypto.win_encryptor(content))
elif self.algorithm == "AUTO":
self.setValue(Crypto.AUTO_encryptor(content))
self.textChanged.emit()
def setValue(self, content: str):
if self.configItem:
qconfig.set(self.configItem, content)
self.LineEdit.setText(Crypto.win_decryptor(content))
self.if_setValue = True
if self.configItem:
self.qconfig.set(self.configItem, content)
if self.algorithm == "DPAPI":
self.LineEdit.setText(Crypto.win_decryptor(content))
elif self.algorithm == "AUTO":
if Crypto.check_PASSWORD(Config.PASSWORD):
self.LineEdit.setText(Crypto.AUTO_decryptor(content, Config.PASSWORD))
self.LineEdit.setPasswordVisible(True)
self.LineEdit.setReadOnly(False)
elif Config.PASSWORD:
self.LineEdit.setText("管理密钥错误")
self.LineEdit.setPasswordVisible(True)
self.LineEdit.setReadOnly(True)
else:
self.LineEdit.setText("************")
self.LineEdit.setPasswordVisible(False)
self.LineEdit.setReadOnly(True)
self.if_setValue = False
class UserLableSettingCard(SettingCard):
"""Setting card with User's Lable"""
textChanged = Signal(str)
def __init__(
self,
icon: Union[str, QIcon, FluentIconBase],
title: str,
content: Union[str, None],
qconfig: QConfig,
configItems: Dict[str, ConfigItem],
parent=None,
):
super().__init__(icon, title, content, parent)
self.qconfig = qconfig
self.configItems = configItems
self.Lable = SubtitleLabel(self)
if configItems:
for configItem in configItems.values():
configItem.valueChanged.connect(self.setValue)
self.setValue()
self.hBoxLayout.addWidget(self.Lable, 0, Qt.AlignRight)
self.hBoxLayout.addSpacing(16)
def setValue(self):
if self.configItems:
text_list = []
if not self.qconfig.get(self.configItems["IfPassCheck"]):
text_list.append("未通过人工排查")
text_list.append(
f"今日已代理{self.qconfig.get(self.configItems["ProxyTimes"])}"
if Config.server_date()
== self.qconfig.get(self.configItems["LastProxyDate"])
else "今日未进行代理"
)
text_list.append(
"本周剿灭已完成"
if datetime.strptime(
self.qconfig.get(self.configItems["LastAnnihilationDate"]),
"%Y-%m-%d",
).isocalendar()[:2]
== datetime.strptime(Config.server_date(), "%Y-%m-%d").isocalendar()[:2]
else "本周剿灭未完成"
)
self.Lable.setText(" | ".join(text_list))
class PushAndSwitchButtonSettingCard(SettingCard):
"""Setting card with push & switch button"""
checkedChanged = Signal(bool)
clicked = Signal()
def __init__(
self,
icon: Union[str, QIcon, FluentIconBase],
title: str,
content: Union[str, None],
text: str,
qconfig: QConfig,
configItem: ConfigItem,
parent=None,
):
super().__init__(icon, title, content, parent)
self.qconfig = qconfig
self.configItem = configItem
self.switchButton = SwitchButton("", self, IndicatorPosition.RIGHT)
self.button = PushButton(text, self)
self.hBoxLayout.addWidget(self.button, 0, Qt.AlignRight)
self.hBoxLayout.addSpacing(16)
self.button.clicked.connect(self.clicked)
if configItem:
self.setValue(self.qconfig.get(configItem))
configItem.valueChanged.connect(self.setValue)
# add switch button to layout
self.hBoxLayout.addWidget(self.switchButton, 0, Qt.AlignRight)
self.hBoxLayout.addSpacing(16)
self.switchButton.checkedChanged.connect(self.__onCheckedChanged)
def __onCheckedChanged(self, isChecked: bool):
"""switch button checked state changed slot"""
self.setValue(isChecked)
self.checkedChanged.emit(isChecked)
def setValue(self, isChecked: bool):
if self.configItem:
self.qconfig.set(self.configItem, isChecked)
self.switchButton.setChecked(isChecked)
self.switchButton.setText("" if isChecked else "")
class SpinBoxSettingCard(SettingCard):
@@ -337,15 +617,17 @@ class SpinBoxSettingCard(SettingCard):
def __init__(
self,
range: tuple[int, int],
icon: Union[str, QIcon, FluentIconBase],
title,
content=None,
configItem: ConfigItem = None,
title: str,
content: Union[str, None],
range: tuple[int, int],
qconfig: QConfig,
configItem: ConfigItem,
parent=None,
):
super().__init__(icon, title, content, parent)
self.qconfig = qconfig
self.configItem = configItem
self.SpinBox = SpinBox(self)
self.SpinBox.setRange(range[0], range[1])
@@ -366,7 +648,7 @@ class SpinBoxSettingCard(SettingCard):
def setValue(self, value: int):
if self.configItem:
qconfig.set(self.configItem, value)
self.qconfig.set(self.configItem, value)
self.SpinBox.setValue(value)
@@ -375,16 +657,18 @@ class NoOptionComboBoxSettingCard(SettingCard):
def __init__(
self,
configItem: OptionsConfigItem,
icon: Union[str, QIcon, FluentIconBase],
title,
content=None,
value=None,
texts=None,
title: str,
content: Union[str, None],
value: List[str],
texts: List[str],
qconfig: QConfig,
configItem: OptionsConfigItem,
parent=None,
):
super().__init__(icon, title, content, parent)
self.qconfig = qconfig
self.configItem = configItem
self.comboBox = ComboBox(self)
self.comboBox.setMinimumWidth(250)
@@ -395,20 +679,131 @@ class NoOptionComboBoxSettingCard(SettingCard):
for text, option in zip(texts, value):
self.comboBox.addItem(text, userData=option)
self.comboBox.setCurrentText(self.optionToText[qconfig.get(configItem)])
self.comboBox.setCurrentText(self.optionToText[self.qconfig.get(configItem)])
self.comboBox.currentIndexChanged.connect(self._onCurrentIndexChanged)
configItem.valueChanged.connect(self.setValue)
def _onCurrentIndexChanged(self, index: int):
qconfig.set(self.configItem, self.comboBox.itemData(index))
self.qconfig.set(self.configItem, self.comboBox.itemData(index))
def setValue(self, value):
if value not in self.optionToText:
return
self.comboBox.setCurrentText(self.optionToText[value])
qconfig.set(self.configItem, value)
self.qconfig.set(self.configItem, value)
def reLoadOptions(self, value: List[str], texts: List[str]):
self.comboBox.currentIndexChanged.disconnect()
self.comboBox.clear()
self.optionToText = {o: t for o, t in zip(value, texts)}
for text, option in zip(texts, value):
self.comboBox.addItem(text, userData=option)
self.comboBox.setCurrentText(
self.optionToText[self.qconfig.get(self.configItem)]
)
self.comboBox.currentIndexChanged.connect(self._onCurrentIndexChanged)
class EditableComboBoxSettingCard(SettingCard):
"""Setting card with EditableComboBox"""
def __init__(
self,
icon: Union[str, QIcon, FluentIconBase],
title: str,
content: Union[str, None],
value: List[str],
texts: List[str],
qconfig: QConfig,
configItem: OptionsConfigItem,
parent=None,
):
super().__init__(icon, title, content, parent)
self.qconfig = qconfig
self.configItem = configItem
self.comboBox = self._EditableComboBox(self)
self.comboBox.setMinimumWidth(100)
self.hBoxLayout.addWidget(self.comboBox, 0, Qt.AlignRight)
self.hBoxLayout.addSpacing(16)
self.optionToText = {o: t for o, t in zip(value, texts)}
for text, option in zip(texts, value):
self.comboBox.addItem(text, userData=option)
if qconfig.get(configItem) not in self.optionToText:
self.optionToText[qconfig.get(configItem)] = qconfig.get(configItem)
self.comboBox.addItem(
qconfig.get(configItem), userData=qconfig.get(configItem)
)
self.comboBox.setCurrentText(self.optionToText[qconfig.get(configItem)])
self.comboBox.currentIndexChanged.connect(self._onCurrentIndexChanged)
configItem.valueChanged.connect(self.setValue)
def _onCurrentIndexChanged(self, index: int):
self.qconfig.set(
self.configItem,
(
self.comboBox.itemData(index)
if self.comboBox.itemData(index)
else self.comboBox.itemText(index)
),
)
def setValue(self, value):
if value not in self.optionToText:
self.optionToText[value] = value
if self.comboBox.findText(value) == -1:
self.comboBox.addItem(value, userData=value)
else:
self.comboBox.setItemData(self.comboBox.findText(value), value)
self.comboBox.setCurrentText(self.optionToText[value])
self.qconfig.set(self.configItem, value)
def reLoadOptions(self, value: List[str], texts: List[str]):
self.comboBox.currentIndexChanged.disconnect()
self.comboBox.clear()
self.optionToText = {o: t for o, t in zip(value, texts)}
for text, option in zip(texts, value):
self.comboBox.addItem(text, userData=option)
if self.qconfig.get(self.configItem) not in self.optionToText:
self.optionToText[self.qconfig.get(self.configItem)] = self.qconfig.get(
self.configItem
)
self.comboBox.addItem(
self.qconfig.get(self.configItem),
userData=self.qconfig.get(self.configItem),
)
self.comboBox.setCurrentText(
self.optionToText[self.qconfig.get(self.configItem)]
)
self.comboBox.currentIndexChanged.connect(self._onCurrentIndexChanged)
class _EditableComboBox(EditableComboBox):
"""EditableComboBox"""
def __init__(self, parent=None):
super().__init__(parent)
def _onReturnPressed(self):
if not self.text():
return
index = self.findText(self.text())
if index >= 0 and index != self.currentIndex():
self._currentIndex = index
self.currentIndexChanged.emit(index)
elif index == -1:
self.addItem(self.text())
self.setCurrentIndex(self.count() - 1)
self.currentIndexChanged.emit(self.count() - 1)
class TimeEditSettingCard(SettingCard):
@@ -419,14 +814,16 @@ class TimeEditSettingCard(SettingCard):
def __init__(
self,
icon: Union[str, QIcon, FluentIconBase],
title,
content=None,
configItem_bool: ConfigItem = None,
configItem_time: ConfigItem = None,
title: str,
content: Union[str, None],
qconfig: QConfig,
configItem_bool: ConfigItem,
configItem_time: ConfigItem,
parent=None,
):
super().__init__(icon, title, content, parent)
self.qconfig = qconfig
self.configItem_bool = configItem_bool
self.configItem_time = configItem_time
self.CheckBox = CheckBox(self)
@@ -464,13 +861,13 @@ class TimeEditSettingCard(SettingCard):
def setValue_bool(self, value: bool):
if self.configItem_bool:
qconfig.set(self.configItem_bool, value)
self.qconfig.set(self.configItem_bool, value)
self.CheckBox.setChecked(value)
def setValue_time(self, value: str):
if self.configItem_time:
qconfig.set(self.configItem_time, value)
self.qconfig.set(self.configItem_time, value)
self.TimeEdit.setTime(QTime.fromString(value, "HH:mm"))
@@ -510,31 +907,18 @@ class UrlListSettingCard(ExpandSettingCard):
def __init__(
self,
icon: Union[str, QIcon, FluentIconBase],
configItem: ConfigItem,
title: str,
content: str = None,
content: Union[str, None],
qconfig: QConfig,
configItem: ConfigItem,
parent=None,
):
"""
Parameters
----------
configItem: RangeConfigItem
configuration item operated by the card
title: str
the title of card
content: str
the content of card
parent: QWidget
parent widget
"""
super().__init__(icon, title, content, parent)
self.qconfig = qconfig
self.configItem = configItem
self.addUrlButton = PushButton("添加代理网址", self)
self.urls: List[str] = qconfig.get(configItem).copy()
self.urls: List[str] = self.qconfig.get(configItem).copy()
self.__initWidget()
def __initWidget(self):
@@ -567,7 +951,7 @@ class UrlListSettingCard(ExpandSettingCard):
self.__addUrlItem(url)
self.urls.append(url)
qconfig.set(self.configItem, self.urls)
self.qconfig.set(self.configItem, self.urls)
self.urlChanged.emit(self.urls)
def __addUrlItem(self, url: str):
@@ -600,7 +984,7 @@ class UrlListSettingCard(ExpandSettingCard):
self._adjustViewSize()
self.urlChanged.emit(self.urls)
qconfig.set(self.configItem, self.urls)
self.qconfig.set(self.configItem, self.urls)
def __validate(self, value):
@@ -681,7 +1065,7 @@ class IconButton(TransparentToolButton):
icon: Union[str, QIcon, FluentIconBase],
isTooltip: bool,
tip_title: str,
tip_content: str,
tip_content: Union[str, None],
parent: QWidget = None,
):
self.__init__(parent)

View File

@@ -47,7 +47,6 @@ from qfluentwidgets import (
from PySide6.QtCore import Qt
from PySide6.QtGui import QTextCursor
from typing import List, Dict
import json
from app.core import Config, TaskManager, Task, MainInfoBar
@@ -164,31 +163,39 @@ class DispatchCenter(QWidget):
def update_top_bar(self):
"""更新顶栏"""
list = []
queue_numb, member_numb = 0, 0
if (Config.app_path / "config/QueueConfig").exists():
for json_file in (Config.app_path / "config/QueueConfig").glob("*.json"):
list.append(f"队列 - {json_file.stem}")
queue_numb += 1
if (Config.app_path / "config/MaaConfig").exists():
for subdir in (Config.app_path / "config/MaaConfig").iterdir():
if subdir.is_dir():
list.append(f"实例 - Maa - {subdir.name}")
member_numb += 1
self.script_list["主调度台"].top_bar.object.clear()
self.script_list["主调度台"].top_bar.object.addItems(list)
self.script_list["主调度台"].top_bar.mode.clear()
self.script_list["主调度台"].top_bar.mode.addItems(["自动代理", "人工排查"])
if queue_numb == 1:
for name, info in Config.queue_dict.items():
self.script_list["主调度台"].top_bar.object.addItem(
(
"队列"
if info["Config"].get(info["Config"].queueSet_Name) == ""
else f"队列 - {info["Config"].get(info["Config"].queueSet_Name)}"
),
userData=name,
)
for name, info in Config.member_dict.items():
self.script_list["主调度台"].top_bar.object.addItem(
(
f"实例 - {info['Type']}"
if info["Config"].get(info["Config"].MaaSet_Name) == ""
else f"实例 - {info['Type']} - {info["Config"].get(info["Config"].MaaSet_Name)}"
),
userData=name,
)
if len(Config.queue_dict) == 1:
self.script_list["主调度台"].top_bar.object.setCurrentIndex(0)
elif member_numb == 1:
self.script_list["主调度台"].top_bar.object.setCurrentIndex(queue_numb)
elif len(Config.member_dict) == 1:
self.script_list["主调度台"].top_bar.object.setCurrentIndex(
len(Config.queue_dict)
)
else:
self.script_list["主调度台"].top_bar.object.setCurrentIndex(-1)
self.script_list["主调度台"].top_bar.mode.clear()
self.script_list["主调度台"].top_bar.mode.addItems(["自动代理", "人工排查"])
self.script_list["主调度台"].top_bar.mode.setCurrentIndex(0)
@@ -270,32 +277,31 @@ class DispatchBox(QWidget):
)
return None
name = self.object.currentText().split(" - ")[-1]
if name in Config.running_list:
logger.warning(f"任务已存在:{name}")
MainInfoBar.push_info_bar("warning", "任务已存在", name, 5000)
if self.object.currentData() in Config.running_list:
logger.warning(f"任务已存在:{self.object.currentData()}")
MainInfoBar.push_info_bar(
"warning", "任务已存在", self.object.currentData(), 5000
)
return None
if self.object.currentText().split(" - ")[0] == "队列":
if "调度队列" in self.object.currentData():
with (Config.app_path / f"config/QueueConfig/{name}.json").open(
mode="r", encoding="utf-8"
) as f:
info = json.load(f)
logger.info(f"用户添加任务:{self.object.currentData()}")
TaskManager.add_task(
f"{self.mode.currentText()}_主调度台",
self.object.currentData(),
Config.queue_dict[self.object.currentData()]["Config"].toDict(),
)
logger.info(f"用户添加任务:{name}")
TaskManager.add_task(f"{self.mode.currentText()}_主调度台", name, info)
elif "脚本" in self.object.currentData():
elif self.object.currentText().split(" - ")[0] == "实例":
if Config.member_dict[self.object.currentData()]["Type"] == "Maa":
if self.object.currentText().split(" - ")[1] == "Maa":
info = {"Queue": {"Member_1": name}}
logger.info(f"用户添加任务:{name}")
logger.info(f"用户添加任务:{self.object.currentData()}")
TaskManager.add_task(
f"{self.mode.currentText()}_主调度台", "自定义队列", info
f"{self.mode.currentText()}_主调度台",
"自定义队列",
{"Queue": {"Member_1": self.object.currentData()}},
)
class DispatchInfoCard(HeaderCardWidget):

View File

@@ -160,15 +160,9 @@ class Home(QWidget):
def get_home_image(self) -> None:
"""获取主页图片"""
if (
Config.global_config.get(Config.global_config.function_HomeImageMode)
== "默认"
):
if Config.get(Config.function_HomeImageMode) == "默认":
pass
elif (
Config.global_config.get(Config.global_config.function_HomeImageMode)
== "自定义"
):
elif Config.get(Config.function_HomeImageMode) == "自定义":
file_path, _ = QFileDialog.getOpenFileName(
self, "打开自定义主页图片", "", "图片文件 (*.png *.jpg *.bmp)"
@@ -202,10 +196,7 @@ class Home(QWidget):
"未选择图片文件!",
5000,
)
elif (
Config.global_config.get(Config.global_config.function_HomeImageMode)
== "主题图像"
):
elif Config.get(Config.function_HomeImageMode) == "主题图像":
# 从远程服务器获取最新主题图像
for _ in range(3):
@@ -244,7 +235,6 @@ class Home(QWidget):
).exists() or (
datetime.now()
> datetime.strptime(theme_image["time"], "%Y-%m-%d %H:%M")
and datetime.strptime(theme_image["time"], "%Y-%m-%d %H:%M")
> time_local
):
@@ -293,28 +283,19 @@ class Home(QWidget):
def set_banner(self):
"""设置主页图像"""
if (
Config.global_config.get(Config.global_config.function_HomeImageMode)
== "默认"
):
if Config.get(Config.function_HomeImageMode) == "默认":
self.banner.set_banner_image(
str(Config.app_path / "resources/images/Home/BannerDefault.png")
)
self.imageButton.hide()
self.banner_text.setVisible(False)
elif (
Config.global_config.get(Config.global_config.function_HomeImageMode)
== "自定义"
):
elif Config.get(Config.function_HomeImageMode) == "自定义":
for file in Config.app_path.glob("resources/images/Home/BannerCustomize.*"):
self.banner.set_banner_image(str(file))
break
self.imageButton.show()
self.banner_text.setVisible(False)
elif (
Config.global_config.get(Config.global_config.function_HomeImageMode)
== "主题图像"
):
elif Config.get(Config.function_HomeImageMode) == "主题图像":
self.banner.set_banner_image(
str(Config.app_path / "resources/images/Home/BannerTheme.jpg")
)

View File

@@ -28,28 +28,23 @@ v4.2
from loguru import logger
from PySide6.QtWidgets import QSystemTrayIcon
from qfluentwidgets import (
qconfig,
Action,
PushButton,
SystemTrayMenu,
SplashScreen,
FluentIcon,
InfoBar,
InfoBarPosition,
setTheme,
isDarkTheme,
SystemThemeListener,
Theme,
MSFluentWindow,
NavigationItemPosition,
qconfig,
FluentBackgroundTheme,
)
from PySide6.QtGui import QIcon, QCloseEvent
from PySide6.QtCore import Qt, QTimer
import json
from PySide6.QtCore import QTimer
from datetime import datetime, timedelta
import shutil
import sys
import darkdetect
from app.core import Config, TaskManager, MainTimer, MainInfoBar
from app.services import Notify, Crypto, System
@@ -69,7 +64,6 @@ class AUTO_MAA(MSFluentWindow):
self.setWindowIcon(QIcon(str(Config.app_path / "resources/icons/AUTO_MAA.ico")))
self.setWindowTitle("AUTO_MAA")
setTheme(Theme.AUTO, lazy=True)
self.switch_theme()
self.splashScreen = SplashScreen(self.windowIcon(), self)
@@ -130,10 +124,9 @@ class AUTO_MAA(MSFluentWindow):
NavigationItemPosition.BOTTOM,
)
self.stackedWidget.currentChanged.connect(
lambda index: (self.member_manager.refresh() if index == 1 else None)
)
self.stackedWidget.currentChanged.connect(
lambda index: self.queue_manager.refresh() if index == 2 else None
lambda index: (
self.queue_manager.reload_member_name() if index == 2 else None
)
)
self.stackedWidget.currentChanged.connect(
lambda index: (
@@ -190,6 +183,7 @@ class AUTO_MAA(MSFluentWindow):
self.tray.setContextMenu(self.tray_menu)
self.tray.activated.connect(self.on_tray_activated)
Config.gameid_refreshed.connect(self.member_manager.refresh_gameid)
TaskManager.create_gui.connect(self.dispatch_center.add_board)
TaskManager.connect_gui.connect(self.dispatch_center.connect_main_board)
Notify.push_info_bar.connect(MainInfoBar.push_info_bar)
@@ -211,7 +205,10 @@ class AUTO_MAA(MSFluentWindow):
def switch_theme(self) -> None:
"""切换主题"""
setTheme(Theme.AUTO, lazy=True)
setTheme(
Theme(darkdetect.theme()) if darkdetect.theme() else Theme.LIGHT, lazy=True
)
QTimer.singleShot(300, lambda: setTheme(Theme.AUTO, lazy=True))
# 云母特效启用时需要增加重试机制
@@ -238,25 +235,22 @@ class AUTO_MAA(MSFluentWindow):
def start_up_task(self) -> None:
"""启动时任务"""
# 加载配置
qconfig.load(Config.config_path, Config.global_config)
Config.global_config.save()
# 清理旧日志
self.clean_old_logs()
# 清理临时更新器
if (Config.app_path / "AUTO_Updater.active.exe").exists():
(Config.app_path / "AUTO_Updater.active.exe").unlink()
# 检查密码
self.setting.check_PASSWORD()
# 获取主题图像
if (
Config.global_config.get(Config.global_config.function_HomeImageMode)
== "主题图像"
):
if Config.get(Config.function_HomeImageMode) == "主题图像":
self.home.get_home_image()
# 直接运行主任务
if Config.global_config.get(Config.global_config.start_IfRunDirectly):
if Config.get(Config.start_IfRunDirectly):
self.start_main_task()
@@ -264,18 +258,18 @@ class AUTO_MAA(MSFluentWindow):
self.setting.show_notice(if_show=False)
# 检查更新
if Config.global_config.get(Config.global_config.update_IfAutoUpdate):
if Config.get(Config.update_IfAutoUpdate):
self.setting.check_update()
# 直接最小化
if Config.global_config.get(Config.global_config.start_IfMinimizeDirectly):
if Config.get(Config.start_IfMinimizeDirectly):
self.titleBar.minBtn.click()
def set_min_method(self) -> None:
"""设置最小化方法"""
if Config.global_config.get(Config.global_config.ui_IfToTray):
if Config.get(Config.ui_IfToTray):
self.titleBar.minBtn.clicked.disconnect()
self.titleBar.minBtn.clicked.connect(lambda: self.show_ui("隐藏到托盘"))
@@ -295,10 +289,7 @@ class AUTO_MAA(MSFluentWindow):
删除超过用户设定天数的日志文件(基于目录日期)
"""
if (
Config.global_config.get(Config.global_config.function_HistoryRetentionTime)
== 0
):
if Config.get(Config.function_HistoryRetentionTime) == 0:
logger.info("由于用户设置日志永久保留,跳过日志清理")
return
@@ -312,9 +303,7 @@ class AUTO_MAA(MSFluentWindow):
# 只检查 `YYYY-MM-DD` 格式的文件夹
folder_date = datetime.strptime(date_folder.name, "%Y-%m-%d")
if datetime.now() - folder_date > timedelta(
days=Config.global_config.get(
Config.global_config.function_HistoryRetentionTime
)
days=Config.get(Config.function_HistoryRetentionTime)
):
shutil.rmtree(date_folder, ignore_errors=True)
deleted_count += 1
@@ -327,26 +316,25 @@ class AUTO_MAA(MSFluentWindow):
def start_main_task(self) -> None:
"""启动主任务"""
if (Config.app_path / "config/QueueConfig/调度队列_1.json").exists():
with (Config.app_path / "config/QueueConfig/调度队列_1.json").open(
mode="r", encoding="utf-8"
) as f:
info = json.load(f)
if "调度队列_1" in Config.queue_dict:
logger.info("自动添加任务调度队列_1")
TaskManager.add_task("自动代理_主调度台", "主任务队列", info)
TaskManager.add_task(
"自动代理_主调度台",
"主任务队列",
Config.queue_dict["调度队列_1"]["Config"].toDict(),
)
elif (Config.app_path / "config/MaaConfig/脚本_1").exists():
info = {"Queue": {"Member_1": "脚本_1"}}
elif "脚本_1" in Config.member_dict:
logger.info("自动添加任务脚本_1")
TaskManager.add_task("自动代理_主调度台", "主任务队列", info)
TaskManager.add_task(
"自动代理_主调度台", "主任务队列", {"Queue": {"Member_1": "脚本_1"}}
)
else:
logger.worning("启动主任务失败:未找到有效的主任务配置文件")
logger.warning("启动主任务失败:未找到有效的主任务配置文件")
MainInfoBar.push_info_bar(
"warning", "启动主任务失败", "“调度队列_1”与“脚本_1”均不存在", -1
)
@@ -354,21 +342,21 @@ class AUTO_MAA(MSFluentWindow):
def show_ui(self, mode: str, if_quick: bool = False) -> None:
"""配置窗口状态"""
self.switch_theme()
if mode == "显示主窗口":
# 配置主窗口
size = list(
map(
int,
Config.global_config.get(Config.global_config.ui_size).split("x"),
Config.get(Config.ui_size).split("x"),
)
)
location = list(
map(
int,
Config.global_config.get(Config.global_config.ui_location).split(
"x"
),
Config.get(Config.ui_location).split("x"),
)
)
self.window().setGeometry(location[0], location[1], size[0], size[1])
@@ -376,14 +364,14 @@ class AUTO_MAA(MSFluentWindow):
self.window().raise_()
self.window().activateWindow()
if not if_quick:
if Config.global_config.get(Config.global_config.ui_maximized):
if Config.get(Config.ui_maximized):
self.window().showMaximized()
self.set_min_method()
self.show_ui("配置托盘")
elif mode == "配置托盘":
if Config.global_config.get(Config.global_config.ui_IfShowTray):
if Config.get(Config.ui_IfShowTray):
self.tray.show()
else:
self.tray.hide()
@@ -393,18 +381,16 @@ class AUTO_MAA(MSFluentWindow):
# 保存窗口相关属性
if not self.window().isMaximized():
Config.global_config.set(
Config.global_config.ui_size,
Config.set(
Config.ui_size,
f"{self.geometry().width()}x{self.geometry().height()}",
)
Config.global_config.set(
Config.global_config.ui_location,
Config.set(
Config.ui_location,
f"{self.geometry().x()}x{self.geometry().y()}",
)
Config.global_config.set(
Config.global_config.ui_maximized, self.window().isMaximized()
)
Config.global_config.save()
Config.set(Config.ui_maximized, self.window().isMaximized())
Config.save()
# 隐藏主窗口
if not if_quick:
@@ -420,11 +406,10 @@ class AUTO_MAA(MSFluentWindow):
# 清理各功能线程
MainTimer.Timer.stop()
MainTimer.Timer.deleteLater()
MainTimer.LongTimer.stop()
MainTimer.LongTimer.deleteLater()
TaskManager.stop_task("ALL")
# 关闭数据库连接
Config.close_database()
# 关闭主题监听
self.themeListener.terminate()
self.themeListener.deleteLater()

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -31,6 +31,7 @@ from PySide6.QtWidgets import (
QApplication,
QVBoxLayout,
)
from PySide6.QtCore import Qt
from qfluentwidgets import (
ScrollArea,
FluentIcon,
@@ -38,13 +39,12 @@ from qfluentwidgets import (
Dialog,
HyperlinkCard,
HeaderCardWidget,
SwitchSettingCard,
RangeSettingCard,
ExpandGroupSettingCard,
PushSettingCard,
ComboBoxSettingCard,
HyperlinkButton,
)
import os
import re
import json
import time
import shutil
@@ -56,8 +56,10 @@ from typing import Dict, List, Union
from app.core import Config, MainInfoBar
from app.services import Crypto, System, Notify
from app.utils import DownloadManager
from .Widget import (
SwitchSettingCard,
RangeSettingCard,
ComboBoxSettingCard,
LineEditMessageBox,
LineEditSettingCard,
PasswordLineEditSettingCard,
@@ -111,7 +113,7 @@ class Setting(QWidget):
def agree_bilibili(self) -> None:
"""授权bilibili游戏隐私政策"""
if Config.global_config.get(Config.global_config.function_IfAgreeBilibili):
if Config.get(Config.function_IfAgreeBilibili):
choice = MessageBox(
"授权声明",
@@ -124,9 +126,7 @@ class Setting(QWidget):
"success", "操作成功", "已确认授权bilibili游戏隐私政策", 3000
)
else:
Config.global_config.set(
Config.global_config.function_IfAgreeBilibili, False
)
Config.set(Config.function_IfAgreeBilibili, False)
else:
logger.info("取消授权bilibili游戏隐私政策")
@@ -141,7 +141,7 @@ class Setting(QWidget):
Path(os.getenv("APPDATA")) / "Netease/MuMuPlayer-12.0/data/startupImage"
)
if Config.global_config.get(Config.global_config.function_IfSkipMumuSplashAds):
if Config.get(Config.function_IfSkipMumuSplashAds):
choice = MessageBox(
"风险声明",
@@ -160,9 +160,7 @@ class Setting(QWidget):
"success", "操作成功", "已开启跳过MuMu启动广告功能", 3000
)
else:
Config.global_config.set(
Config.global_config.function_IfSkipMumuSplashAds, False
)
Config.set(Config.function_IfSkipMumuSplashAds, False)
else:
@@ -267,28 +265,17 @@ class Setting(QWidget):
def check_update(self) -> None:
"""检查版本更新,调起文件下载进程"""
# 从本地版本信息文件获取当前版本信息
with Config.version_path.open(mode="r", encoding="utf-8") as f:
version_current: Dict[
str, Union[str, Dict[str, Union[str, Dict[str, List[str]]]]]
] = json.load(f)
main_version_current = list(map(int, Config.VERSION.split(".")))
updater_version_current = list(
map(int, version_current["updater_version"].split("."))
)
# 检查更新器是否存在
if not (Config.app_path / "Updater.exe").exists():
updater_version_current = [0, 0, 0, 0]
current_version = list(map(int, Config.VERSION.split(".")))
# 从远程服务器获取最新版本信息
for _ in range(3):
try:
response = requests.get(
f"https://gitee.com/DLmaster_361/AUTO_MAA/raw/{Config.global_config.get(Config.global_config.update_UpdateType)}/resources/version.json"
f"https://mirrorchyan.com/api/resources/AUTO_MAA/latest?current_version={version_text(current_version)}&cdk={Crypto.win_decryptor(Config.get(Config.update_MirrorChyanCDK))}&channel={Config.get(Config.update_UpdateType)}"
)
version_info: Dict[str, Union[int, str, Dict[str, str]]] = (
response.json()
)
version_remote: Dict[
str, Union[str, Dict[str, Union[str, Dict[str, List[str]]]]]
] = response.json()
break
except Exception as e:
err = e
@@ -304,52 +291,77 @@ class Setting(QWidget):
if choice.exec():
return None
main_version_remote = list(map(int, version_remote["main_version"].split(".")))
updater_version_remote = list(
map(int, version_remote["updater_version"].split("."))
)
remote_proxy_list = version_remote["proxy_list"]
Config.global_config.set(
Config.global_config.update_ProxyUrlList,
list(
set(
Config.global_config.get(Config.global_config.update_ProxyUrlList)
+ remote_proxy_list
if version_info["code"] != 0:
logger.error(f"获取版本信息时出错:{version_info["msg"]}")
error_remark_dict = {
1001: "获取版本信息的URL参数不正确",
7001: "填入的 CDK 已过期",
7002: "填入的 CDK 错误",
7003: "填入的 CDK 今日下载次数已达上限",
7004: "填入的 CDK 类型和待下载的资源不匹配",
7005: "填入的 CDK 不合法",
8001: "对应架构和系统下的资源不存在",
8002: "错误的系统参数",
8003: "错误的架构参数",
8004: "错误的更新通道参数",
1: version_info["msg"],
}
if version_info["code"] in error_remark_dict:
MainInfoBar.push_info_bar(
"error",
"获取版本信息时出错",
error_remark_dict[version_info["code"]],
-1,
)
),
else:
MainInfoBar.push_info_bar(
"error",
"获取版本信息时出错",
"意料之外的错误,请及时联系项目组以获取来自 Mirror 酱的技术支持",
-1,
)
return None
remote_version = list(
map(
int,
version_info["data"]["version_name"][1:]
.replace("-beta", "")
.split("."),
)
)
version_info_json: Dict[str, Dict[str, str]] = json.loads(
re.sub(
r"^<!--\s*(.*?)\s*-->$",
r"\1",
version_info["data"]["release_note"].splitlines()[0],
)
)
# 有版本更新
if (main_version_remote > main_version_current) or (
updater_version_remote > updater_version_current
):
if remote_version > current_version:
# 生成版本更新信息
if main_version_remote > main_version_current:
main_version_info = f"## 主程序:{version_text(main_version_current)} --> {version_text(main_version_remote)}\n\n"
else:
main_version_info = (
f"## 主程序:{version_text(main_version_current)}\n\n"
)
if updater_version_remote > updater_version_current:
updater_version_info = f"## 更新器:{version_text(updater_version_current)} --> {version_text(updater_version_remote)}\n\n"
else:
updater_version_info = (
f"## 更新器:{version_text(updater_version_current)}\n\n"
)
main_version_info = f"## 主程序:{version_text(current_version)} --> {version_text(remote_version)}"
update_version_info = {}
all_version_info = {}
for v_i in [
info
for version, info in version_remote["version_info"].items()
if list(map(int, version.split("."))) > main_version_current
for version, info in version_info_json.items()
if list(map(int, version.split("."))) > current_version
]:
for key, value in v_i.items():
if key in update_version_info:
update_version_info[key] += value.copy()
else:
update_version_info[key] = value.copy()
for v_i in version_remote["version_info"].values():
for v_i in update_version_info.values():
for key, value in v_i.items():
if key in all_version_info:
all_version_info[key] += value.copy()
@@ -357,69 +369,48 @@ class Setting(QWidget):
all_version_info[key] = value.copy()
version_info = {
"更新总览": f"{main_version_info}{updater_version_info}{version_info_markdown(update_version_info)}",
"更新总览": f"{main_version_info}\n\n{version_info_markdown(update_version_info)}",
"ALL~版本信息": version_info_markdown(all_version_info),
**{
version_text(list(map(int, k.split(".")))): version_info_markdown(v)
for k, v in version_remote["version_info"].items()
for k, v in update_version_info.items()
},
}
# 询问是否开始版本更新
choice = NoticeMessageBox(self.window(), "版本更新", version_info)
if not choice.exec():
return None
if choice.exec():
# 更新更新器
if updater_version_remote > updater_version_current:
# 创建更新进程
self.updater = DownloadManager(
Config.app_path,
"AUTO_MAA更新器",
main_version_remote,
updater_version_remote,
{
"proxy_list": Config.global_config.get(
Config.global_config.update_ProxyUrlList
),
"download_dict": version_remote["download_dict"],
"thread_numb": Config.global_config.get(
Config.global_config.update_ThreadNumb
),
},
with Config.version_path.open(mode="r", encoding="utf-8") as f:
version_info = json.load(f)
version_info["main_version"] = Config.VERSION
with Config.version_path.open(mode="w", encoding="utf-8") as f:
json.dump(version_info, f, ensure_ascii=False, indent=4)
if (Config.app_path / "AUTO_Updater.exe").exists():
shutil.copy(
Config.app_path / "AUTO_Updater.exe",
Config.app_path / "AUTO_Updater.active.exe",
)
else:
logger.error("更新器文件不存在")
MainInfoBar.push_info_bar(
"error", "更新器不存在", "请手动前往 GitHub 获取最新版本", -1
)
return None
subprocess.Popen(
str(Config.app_path / "AUTO_Updater.active.exe"),
shell=True,
creationflags=subprocess.CREATE_NO_WINDOW,
)
# 完成更新器的更新后更新主程序
if main_version_remote > main_version_current:
self.updater.download_accomplish.connect(self.update_main)
# 显示更新页面
self.updater.show()
self.updater.run()
# 更新主程序
elif main_version_remote > main_version_current:
self.update_main()
self.close()
QApplication.quit()
# 无版本更新
else:
MainInfoBar.push_info_bar("success", "更新检查", "已是最新版本~", 3000)
def update_main(self) -> None:
"""更新主程序"""
with Config.version_path.open(mode="r", encoding="utf-8") as f:
version_info = json.load(f)
version_info["main_version"] = Config.VERSION
with Config.version_path.open(mode="w", encoding="utf-8") as f:
json.dump(version_info, f, ensure_ascii=False, indent=4)
subprocess.Popen(
str(Config.app_path / "Updater.exe"),
shell=True,
creationflags=subprocess.CREATE_NO_WINDOW,
)
self.close()
QApplication.quit()
def show_notice(self, if_show: bool = True):
"""显示公告"""
@@ -464,8 +455,9 @@ class Setting(QWidget):
}
if if_show or (
datetime.now() > datetime.strptime(notice["time"], "%Y-%m-%d %H:%M")
and datetime.strptime(notice["time"], "%Y-%m-%d %H:%M") > time_local
datetime.now()
> datetime.strptime(notice["time"], "%Y-%m-%d %H:%M")
> time_local
):
choice = NoticeMessageBox(self.window(), "公告", notice["notice_dict"])
@@ -485,37 +477,47 @@ class FunctionSettingCard(HeaderCardWidget):
self.setTitle("功能")
self.card_HomeImageMode = ComboBoxSettingCard(
configItem=Config.global_config.function_HomeImageMode,
icon=FluentIcon.PAGE_RIGHT,
title="主页背景图模式",
content="选择主页背景图的来源",
texts=["默认", "自定义", "主题图像"],
qconfig=Config,
configItem=Config.function_HomeImageMode,
parent=self,
)
self.card_HistoryRetentionTime = ComboBoxSettingCard(
configItem=Config.global_config.function_HistoryRetentionTime,
icon=FluentIcon.PAGE_RIGHT,
title="历史记录保留时间",
content="选择历史记录的保留时间,超期自动清理",
texts=["7 天", "15 天", "30 天", "60 天", "永久"],
qconfig=Config,
configItem=Config.function_HistoryRetentionTime,
parent=self,
)
self.card_IfAllowSleep = SwitchSettingCard(
icon=FluentIcon.PAGE_RIGHT,
title="启动时阻止系统休眠",
content="仅阻止电脑自动休眠,不会影响屏幕是否熄灭",
configItem=Config.global_config.function_IfAllowSleep,
qconfig=Config,
configItem=Config.function_IfAllowSleep,
parent=self,
)
self.card_IfSilence = self.SilenceSettingCard(self)
self.card_IfAgreeBilibili = SwitchSettingCard(
icon=FluentIcon.PAGE_RIGHT,
title="托管bilibili游戏隐私政策",
content="授权AUTO_MAA同意bilibili游戏隐私政策",
configItem=Config.global_config.function_IfAgreeBilibili,
qconfig=Config,
configItem=Config.function_IfAgreeBilibili,
parent=self,
)
self.card_IfSkipMumuSplashAds = SwitchSettingCard(
icon=FluentIcon.PAGE_RIGHT,
title="跳过MuMu启动广告",
content="启动MuMu模拟器时屏蔽启动广告",
configItem=Config.global_config.function_IfSkipMumuSplashAds,
qconfig=Config,
configItem=Config.function_IfSkipMumuSplashAds,
parent=self,
)
Layout = QVBoxLayout()
@@ -541,14 +543,18 @@ class FunctionSettingCard(HeaderCardWidget):
icon=FluentIcon.PAGE_RIGHT,
title="静默模式",
content="是否启用静默模式",
configItem=Config.global_config.function_IfSilence,
qconfig=Config,
configItem=Config.function_IfSilence,
parent=self,
)
self.card_BossKey = LineEditSettingCard(
text="请输入安卓模拟器老板键",
icon=FluentIcon.PAGE_RIGHT,
title="模拟器老板键",
content="输入模拟器老板快捷键,以“+”分隔",
configItem=Config.global_config.function_BossKey,
text="请输入安卓模拟器老板键",
qconfig=Config,
configItem=Config.function_BossKey,
parent=self,
)
widget = QWidget()
@@ -570,19 +576,25 @@ class StartSettingCard(HeaderCardWidget):
icon=FluentIcon.PAGE_RIGHT,
title="开机时自动启动",
content="将AUTO_MAA添加到开机启动项",
configItem=Config.global_config.start_IfSelfStart,
qconfig=Config,
configItem=Config.start_IfSelfStart,
parent=self,
)
self.card_IfRunDirectly = SwitchSettingCard(
icon=FluentIcon.PAGE_RIGHT,
title="启动后直接运行主任务",
content="启动AUTO_MAA后自动运行自动代理任务优先级调度队列 1 > 脚本 1",
configItem=Config.global_config.start_IfRunDirectly,
qconfig=Config,
configItem=Config.start_IfRunDirectly,
parent=self,
)
self.card_IfMinimizeDirectly = SwitchSettingCard(
icon=FluentIcon.PAGE_RIGHT,
title="启动后直接最小化",
content="启动AUTO_MAA后直接最小化",
configItem=Config.global_config.start_IfMinimizeDirectly,
qconfig=Config,
configItem=Config.start_IfMinimizeDirectly,
parent=self,
)
Layout = QVBoxLayout()
@@ -602,13 +614,17 @@ class UiSettingCard(HeaderCardWidget):
icon=FluentIcon.PAGE_RIGHT,
title="显示托盘图标",
content="常态显示托盘图标",
configItem=Config.global_config.ui_IfShowTray,
qconfig=Config,
configItem=Config.ui_IfShowTray,
parent=self,
)
self.card_IfToTray = SwitchSettingCard(
icon=FluentIcon.PAGE_RIGHT,
title="最小化到托盘",
content="最小化时隐藏到托盘",
configItem=Config.global_config.ui_IfToTray,
qconfig=Config,
configItem=Config.ui_IfToTray,
parent=self,
)
Layout = QVBoxLayout()
@@ -634,6 +650,7 @@ class NotifySettingCard(HeaderCardWidget):
icon=FluentIcon.SEND,
title="测试通知",
content="发送测试通知到所有已启用的通知渠道",
parent=self,
)
self.card_TestNotification.clicked.connect(self.send_test_notification)
@@ -652,8 +669,8 @@ class NotifySettingCard(HeaderCardWidget):
MainInfoBar.push_info_bar(
"success",
"测试通知已发送",
"请检查已配置的通知渠道是否可以收到消息",
3000
"请检查已配置的通知渠道是否正常收到消息",
3000,
)
class NotifyContentSettingCard(ExpandGroupSettingCard):
@@ -664,23 +681,29 @@ class NotifySettingCard(HeaderCardWidget):
)
self.card_SendTaskResultTime = ComboBoxSettingCard(
configItem=Config.global_config.notify_SendTaskResultTime,
icon=FluentIcon.PAGE_RIGHT,
title="推送任务结果选项",
content="选择推送自动代理与人工排查任务结果的时机",
texts=["不推送", "任何时刻", "仅失败时"],
qconfig=Config,
configItem=Config.notify_SendTaskResultTime,
parent=self,
)
self.card_IfSendStatistic = SwitchSettingCard(
icon=FluentIcon.PAGE_RIGHT,
title="推送统计信息",
content="推送自动代理统计信息的通知",
configItem=Config.global_config.notify_IfSendStatistic,
qconfig=Config,
configItem=Config.notify_IfSendStatistic,
parent=self,
)
self.card_IfSendSixStar = SwitchSettingCard(
icon=FluentIcon.PAGE_RIGHT,
title="推送公招高资喜报",
content="公招出现六星词条时推送喜报",
configItem=Config.global_config.notify_IfSendSixStar,
qconfig=Config,
configItem=Config.notify_IfSendSixStar,
parent=self,
)
widget = QWidget()
@@ -703,7 +726,9 @@ class NotifySettingCard(HeaderCardWidget):
icon=FluentIcon.PAGE_RIGHT,
title="推送系统通知",
content="使用Plyer推送系统级通知不会在通知中心停留",
configItem=Config.global_config.notify_IfPushPlyer,
qconfig=Config,
configItem=Config.notify_IfPushPlyer,
parent=self,
)
widget = QWidget()
@@ -724,35 +749,46 @@ class NotifySettingCard(HeaderCardWidget):
icon=FluentIcon.PAGE_RIGHT,
title="推送邮件通知",
content="是否启用邮件通知功能",
configItem=Config.global_config.notify_IfSendMail,
qconfig=Config,
configItem=Config.notify_IfSendMail,
parent=self,
)
self.card_SMTPServerAddress = LineEditSettingCard(
text="请输入SMTP服务器地址",
icon=FluentIcon.PAGE_RIGHT,
title="SMTP服务器地址",
content="发信邮箱的SMTP服务器地址",
configItem=Config.global_config.notify_SMTPServerAddress,
text="请输入SMTP服务器地址",
qconfig=Config,
configItem=Config.notify_SMTPServerAddress,
parent=self,
)
self.card_FromAddress = LineEditSettingCard(
text="请输入发信邮箱地址",
icon=FluentIcon.PAGE_RIGHT,
title="发信邮箱地址",
content="发送通知的邮箱地址",
configItem=Config.global_config.notify_FromAddress,
text="请输入发信邮箱地址",
qconfig=Config,
configItem=Config.notify_FromAddress,
parent=self,
)
self.card_AuthorizationCode = PasswordLineEditSettingCard(
text="请输入发信邮箱授权码",
icon=FluentIcon.PAGE_RIGHT,
title="发信邮箱授权码",
content="发送通知的邮箱授权码",
configItem=Config.global_config.notify_AuthorizationCode,
text="请输入发信邮箱授权码",
algorithm="DPAPI",
qconfig=Config,
configItem=Config.notify_AuthorizationCode,
parent=self,
)
self.card_ToAddress = LineEditSettingCard(
text="请输入收信邮箱地址",
icon=FluentIcon.PAGE_RIGHT,
title="收信邮箱地址",
content="接收通知的邮箱地址",
configItem=Config.global_config.notify_ToAddress,
text="请输入收信邮箱地址",
qconfig=Config,
configItem=Config.notify_ToAddress,
parent=self,
)
widget = QWidget()
@@ -779,28 +815,36 @@ class NotifySettingCard(HeaderCardWidget):
icon=FluentIcon.PAGE_RIGHT,
title="推送SeverChan通知",
content="是否启用SeverChan通知功能",
configItem=Config.global_config.notify_IfServerChan,
qconfig=Config,
configItem=Config.notify_IfServerChan,
parent=self,
)
self.card_ServerChanKey = LineEditSettingCard(
text="请输入SendKey",
icon=FluentIcon.PAGE_RIGHT,
title="SendKey",
content="Server酱的SendKeySC3与SCT都可以",
configItem=Config.global_config.notify_ServerChanKey,
text="请输入SendKey",
qconfig=Config,
configItem=Config.notify_ServerChanKey,
parent=self,
)
self.card_ServerChanChannel = LineEditSettingCard(
text="请输入需要推送的Channel代码SCT生效",
icon=FluentIcon.PAGE_RIGHT,
title="ServerChanChannel代码",
content="可以留空,留空则默认。可以多个,请使用“|”隔开",
configItem=Config.global_config.notify_ServerChanChannel,
text="请输入需要推送的Channel代码SCT生效",
qconfig=Config,
configItem=Config.notify_ServerChanChannel,
parent=self,
)
self.card_ServerChanTag = LineEditSettingCard(
text="请输入加入推送的TagSC3生效",
icon=FluentIcon.PAGE_RIGHT,
title="Tag内容",
content="可以留空,留空则默认。可以多个,请使用“|”隔开",
configItem=Config.global_config.notify_ServerChanTag,
text="请输入加入推送的TagSC3生效",
qconfig=Config,
configItem=Config.notify_ServerChanTag,
parent=self,
)
widget = QWidget()
@@ -826,14 +870,18 @@ class NotifySettingCard(HeaderCardWidget):
icon=FluentIcon.PAGE_RIGHT,
title="推送企业微信机器人通知",
content="是否启用企业微信机器人通知功能",
configItem=Config.global_config.notify_IfCompanyWebHookBot,
qconfig=Config,
configItem=Config.notify_IfCompanyWebHookBot,
parent=self,
)
self.card_CompanyWebHookBotUrl = LineEditSettingCard(
text="请输入Webhook的Url",
icon=FluentIcon.PAGE_RIGHT,
title="WebhookUrl",
content="企业微信群机器人的Webhook地址",
configItem=Config.global_config.notify_CompanyWebHookBotUrl,
text="请输入Webhook的Url",
qconfig=Config,
configItem=Config.notify_CompanyWebHookBotUrl,
parent=self,
)
widget = QWidget()
@@ -856,6 +904,7 @@ class SecuritySettingCard(HeaderCardWidget):
icon=FluentIcon.VPN,
title="修改管理密钥",
content="修改用于解密用户密码的管理密钥",
parent=self,
)
Layout = QVBoxLayout()
@@ -869,45 +918,70 @@ class UpdaterSettingCard(HeaderCardWidget):
super().__init__(parent)
self.setTitle("更新")
self.card_IfAutoUpdate = SwitchSettingCard(
icon=FluentIcon.PAGE_RIGHT,
title="自动检查更新",
content="将在启动时自动检查AUTO_MAA是否有新版本",
configItem=Config.global_config.update_IfAutoUpdate,
)
self.card_UpdateType = ComboBoxSettingCard(
configItem=Config.global_config.update_UpdateType,
icon=FluentIcon.PAGE_RIGHT,
title="版本更新类别",
content="选择AUTO_MAA的更新类别",
texts=["稳定版", "公测版"],
)
self.card_ThreadNumb = RangeSettingCard(
configItem=Config.global_config.update_ThreadNumb,
icon=FluentIcon.PAGE_RIGHT,
title="下载器线程数",
content="更新器的下载线程数,建议仅在下载速度较慢时适量拉高",
)
self.card_ProxyUrlList = UrlListSettingCard(
icon=FluentIcon.SETTING,
configItem=Config.global_config.update_ProxyUrlList,
title="代理地址列表",
content="更新器代理地址列表",
parent=self,
)
self.card_CheckUpdate = PushSettingCard(
text="检查更新",
icon=FluentIcon.UPDATE,
title="获取最新版本",
content="检查AUTO_MAA是否有新版本",
parent=self,
)
self.card_IfAutoUpdate = SwitchSettingCard(
icon=FluentIcon.PAGE_RIGHT,
title="自动检查更新",
content="将在启动时自动检查AUTO_MAA是否有新版本",
qconfig=Config,
configItem=Config.update_IfAutoUpdate,
parent=self,
)
self.card_UpdateType = ComboBoxSettingCard(
icon=FluentIcon.PAGE_RIGHT,
title="版本更新类别",
content="选择AUTO_MAA的更新类别",
texts=["稳定版", "公测版"],
qconfig=Config,
configItem=Config.update_UpdateType,
parent=self,
)
self.card_ThreadNumb = RangeSettingCard(
icon=FluentIcon.PAGE_RIGHT,
title="下载器线程数",
content="更新器的下载线程数,建议仅在下载速度较慢时适量拉高",
qconfig=Config,
configItem=Config.update_ThreadNumb,
parent=self,
)
self.card_ProxyUrlList = UrlListSettingCard(
icon=FluentIcon.SETTING,
title="代理地址列表",
content="更新器代理地址列表",
qconfig=Config,
configItem=Config.update_ProxyUrlList,
parent=self,
)
self.card_MirrorChyanCDK = PasswordLineEditSettingCard(
icon=FluentIcon.PAGE_RIGHT,
title="Mirror酱CDK",
content="填写后改为使用由Mirror酱提供的下载服务",
text="请输入Mirror酱CDK",
algorithm="DPAPI",
qconfig=Config,
configItem=Config.update_MirrorChyanCDK,
parent=self,
)
mirrorchyan_url = HyperlinkButton(
"https://mirrorchyan.com/", "购买Mirror酱CDK", self
)
self.card_MirrorChyanCDK.hBoxLayout.insertWidget(
5, mirrorchyan_url, 0, Qt.AlignRight
)
Layout = QVBoxLayout()
Layout.addWidget(self.card_CheckUpdate)
Layout.addWidget(self.card_IfAutoUpdate)
Layout.addWidget(self.card_UpdateType)
Layout.addWidget(self.card_ThreadNumb)
Layout.addWidget(self.card_ProxyUrlList)
Layout.addWidget(self.card_CheckUpdate)
Layout.addWidget(self.card_MirrorChyanCDK)
self.viewLayout.addLayout(Layout)
@@ -922,6 +996,7 @@ class OtherSettingCard(HeaderCardWidget):
icon=FluentIcon.PAGE_RIGHT,
title="公告",
content="查看AUTO_MAA的最新公告",
parent=self,
)
self.card_UserDocs = HyperlinkCard(
url="https://clozya.github.io/AUTOMAA_docs",
@@ -929,8 +1004,9 @@ class OtherSettingCard(HeaderCardWidget):
icon=FluentIcon.PAGE_RIGHT,
title="AUTO_MAA官方文档站",
content="访问AUTO_MAA的官方文档站获取使用指南和项目相关信息",
parent=self,
)
self.card_Association = self.AssociationSettingCard()
self.card_Association = self.AssociationSettingCard(self)
Layout = QVBoxLayout()
Layout.addWidget(self.card_Notice)
@@ -954,6 +1030,7 @@ class OtherSettingCard(HeaderCardWidget):
icon=FluentIcon.GITHUB,
title="GitHub",
content="查看AUTO_MAA的源代码提交问题和建议欢迎参与开发",
parent=self,
)
self.card_QQGroup = HyperlinkCard(
url="https://qm.qq.com/q/bd9fISNoME",
@@ -961,6 +1038,7 @@ class OtherSettingCard(HeaderCardWidget):
icon=FluentIcon.CHAT,
title="QQ群",
content="与AUTO_MAA开发者和用户交流",
parent=self,
)
widget = QWidget()

View File

@@ -31,6 +31,8 @@ import zipfile
import requests
import subprocess
import time
import win32crypt
import base64
from functools import partial
from pathlib import Path
@@ -199,7 +201,6 @@ class DownloadManager(QDialog):
app_path: Path,
name: str,
main_version: list,
updater_version: list,
config: dict,
) -> None:
super().__init__()
@@ -207,7 +208,6 @@ class DownloadManager(QDialog):
self.app_path = app_path
self.name = name
self.main_version = main_version
self.updater_version = updater_version
self.config = config
self.download_path = app_path / "DOWNLOAD_TEMP.zip" # 临时下载文件的路径
self.version_path = app_path / "resources/version.json"
@@ -241,9 +241,12 @@ class DownloadManager(QDialog):
if self.name == "MAA":
self.download_task1()
else:
self.test_speed_task1()
self.speed_test_accomplish.connect(self.download_task1)
elif self.name == "AUTO_MAA":
if self.config["mode"] == "Proxy":
self.test_speed_task1()
self.speed_test_accomplish.connect(self.download_task1)
elif self.config["mode"] == "MirrorChyan":
self.download_task1()
def get_download_url(self, mode: str) -> Union[str, Dict[str, str]]:
"""获取下载链接"""
@@ -273,37 +276,33 @@ class DownloadManager(QDialog):
if self.name == "MAA":
return f"https://jp-download.fearr.xyz/MAA/MAA-{version_text(self.main_version)}-win-x64.zip"
if "selected" in self.config:
selected_url = self.config["selected"]
elif "speed_result" in self.config:
selected_url = max(
self.config["speed_result"], key=self.config["speed_result"].get
)
if self.name == "AUTO_MAA":
if self.name == "AUTO_MAA主程序":
if self.config["mode"] == "Proxy":
if selected_url == "GitHub站":
return f"https://github.com/DLmaster361/AUTO_MAA/releases/download/{version_text(self.main_version)}/AUTO_MAA_{version_text(self.main_version)}.zip"
elif selected_url == "官方镜像站":
return f"https://gitee.com/DLmaster_361/AUTO_MAA/releases/download/{version_text(self.main_version)}/AUTO_MAA_{version_text(self.main_version)}.zip"
elif selected_url in self.config["download_dict"].keys():
return f"{self.config["download_dict"][selected_url]}AUTO_MAA_{version_text(self.main_version)}.zip"
else:
return f"{selected_url}https://github.com/DLmaster361/AUTO_MAA/releases/download/{version_text(self.main_version)}/AUTO_MAA_{version_text(self.main_version)}.zip"
if "selected" in self.config:
selected_url = self.config["selected"]
elif "speed_result" in self.config:
selected_url = max(
self.config["speed_result"],
key=self.config["speed_result"].get,
)
elif self.name == "AUTO_MAA更新器":
if selected_url == "GitHub站":
return f"https://github.com/DLmaster361/AUTO_MAA/releases/download/{version_text(self.main_version)}/AUTO_MAA_{version_text(self.main_version)}.zip"
elif selected_url == "官方镜像站":
return f"https://gitee.com/DLmaster_361/AUTO_MAA/releases/download/{version_text(self.main_version)}/AUTO_MAA_{version_text(self.main_version)}.zip"
elif selected_url in self.config["download_dict"].keys():
return f"{self.config["download_dict"][selected_url]}AUTO_MAA_{version_text(self.main_version)}.zip"
else:
return f"{selected_url}https://github.com/DLmaster361/AUTO_MAA/releases/download/{version_text(self.main_version)}/AUTO_MAA_{version_text(self.main_version)}.zip"
if selected_url == "GitHub站":
return f"https://github.com/DLmaster361/AUTO_MAA/releases/download/{version_text(self.main_version)}/Updater_{version_text(self.updater_version)}.zip"
elif selected_url == "官方镜像站":
return f"https://gitee.com/DLmaster_361/AUTO_MAA/releases/download/{version_text(self.main_version)}/Updater_{version_text(self.updater_version)}.zip"
elif selected_url in self.config["download_dict"].keys():
print(
f"{self.config["download_dict"][selected_url]}Updater_{version_text(self.updater_version)}.zip"
)
return f"{self.config["download_dict"][selected_url]}Updater_{version_text(self.updater_version)}.zip"
else:
return f"{selected_url}https://github.com/DLmaster361/AUTO_MAA/releases/download/{version_text(self.main_version)}/Updater_{version_text(self.updater_version)}.zip"
elif self.config["mode"] == "MirrorChyan":
with requests.get(
self.config["url"], allow_redirects=True, stream=True
) as response:
if response.status_code == 200:
return response.url
def test_speed_task1(self) -> None:
@@ -422,7 +421,11 @@ class DownloadManager(QDialog):
# 创建下载子线程
self.download_process_dict[f"part{i}"] = DownloadProcess(
url, start_byte, end_byte, self.download_path.with_suffix(f".part{i}")
url,
start_byte,
end_byte,
self.download_path.with_suffix(f".part{i}"),
1 if self.config["mode"] == "MirrorChyan" else -1,
)
self.downloaded_size_list.append([0, False])
self.download_process_dict[f"part{i}"].progress.connect(
@@ -500,29 +503,25 @@ class DownloadManager(QDialog):
self.zip_extract.start()
self.zip_loop.exec()
self.update_info("正在删除已弃用的文件")
if (app_path / "changes.json").exists():
with (app_path / "changes.json").open(mode="r", encoding="utf-8") as f:
info: Dict[str, List[str]] = json.load(f)
if "deleted" in info:
for file_path in info:
(self.app_path / file_path).unlink()
(app_path / "changes.json").unlink()
self.update_info("正在删除临时文件")
self.update_progress(0, 0, 0)
if self.download_path.exists():
self.download_path.unlink()
# 更新version文件
if not self.isInterruptionRequested and self.name in [
"AUTO_MAA主程序",
"AUTO_MAA更新器",
]:
with open(self.version_path, "r", encoding="utf-8") as f:
version_info = json.load(f)
if self.name == "AUTO_MAA主程序":
version_info["main_version"] = ".".join(map(str, self.main_version))
elif self.name == "AUTO_MAA更新器":
version_info["updater_version"] = ".".join(
map(str, self.updater_version)
)
with open(self.version_path, "w", encoding="utf-8") as f:
json.dump(version_info, f, ensure_ascii=False, indent=4)
# 主程序更新完成后打开对应程序
if not self.isInterruptionRequested and self.name == "AUTO_MAA主程序":
if not self.isInterruptionRequested and self.name == "AUTO_MAA":
subprocess.Popen(
str(self.app_path / "AUTO_MAA.exe"),
shell=True,
@@ -585,14 +584,11 @@ class AUTO_MAA_Downloader(QApplication):
app_path: Path,
name: str,
main_version: list,
updater_version: list,
config: dict,
) -> None:
super().__init__()
self.main = DownloadManager(
app_path, name, main_version, updater_version, config
)
self.main = DownloadManager(app_path, name, main_version, config)
self.main.show()
self.main.run()
@@ -607,45 +603,64 @@ if __name__ == "__main__":
with (app_path / "resources/version.json").open(
mode="r", encoding="utf-8"
) as f:
version_current = json.load(f)
main_version_current = list(
map(int, version_current["main_version"].split("."))
current_version_info = json.load(f)
current_version = list(
map(int, current_version_info["main_version"].split("."))
)
else:
main_version_current = [0, 0, 0, 0]
current_version = [0, 0, 0, 0]
# 从本地配置文件获取更新信息
if (app_path / "config/config.json").exists():
with (app_path / "config/config.json").open(mode="r", encoding="utf-8") as f:
config = json.load(f)
if "Update" in config and "UpdateType" in config["Update"]:
update_type = config["Update"]["UpdateType"]
else:
update_type = "main"
if "Update" in config and "ProxyUrlList" in config["Update"]:
proxy_list = config["Update"]["ProxyUrlList"]
if "Update" in config:
if "UpdateType" in config["Update"]:
update_type = config["Update"]["UpdateType"]
else:
update_type = "stable"
if "ProxyUrlList" in config["Update"]:
proxy_list = config["Update"]["ProxyUrlList"]
else:
proxy_list = []
if "ThreadNumb" in config["Update"]:
thread_numb = config["Update"]["ThreadNumb"]
else:
thread_numb = 8
if "MirrorChyanCDK" in config["Update"]:
mirrorchyan_CDK = (
win32crypt.CryptUnprotectData(
base64.b64decode(config["Update"]["MirrorChyanCDK"]),
None,
None,
None,
0,
)[1].decode("utf-8")
if config["Update"]["MirrorChyanCDK"]
else ""
)
else:
mirrorchyan_CDK = ""
else:
update_type = "stable"
proxy_list = []
if "Update" in config and "ThreadNumb" in config["Update"]:
thread_numb = config["Update"]["ThreadNumb"]
else:
thread_numb = 8
mirrorchyan_CDK = ""
else:
update_type = "main"
update_type = "stable"
proxy_list = []
thread_numb = 8
mirrorchyan_CDK = ""
# 从远程服务器获取最新版本信息
for _ in range(3):
try:
response = requests.get(
f"https://gitee.com/DLmaster_361/AUTO_MAA/raw/{update_type}/resources/version.json"
f"https://mirrorchyan.com/api/resources/AUTO_MAA/latest?current_version={version_text(current_version)}&cdk={mirrorchyan_CDK}&channel={update_type}"
)
version_remote = response.json()
main_version_remote = list(
map(int, version_remote["main_version"].split("."))
)
remote_proxy_list = version_remote["proxy_list"]
version_info: Dict[str, Union[int, str, Dict[str, str]]] = response.json()
break
except Exception as e:
err = e
@@ -653,20 +668,55 @@ if __name__ == "__main__":
else:
sys.exit(f"获取版本信息时出错:\n{err}")
# 合并代理列表
download_config = {
"proxy_list": list(set(proxy_list + remote_proxy_list)),
"download_dict": version_remote["download_dict"],
"thread_numb": thread_numb,
}
if version_info["code"] == 0:
if "url" in version_info["data"]:
download_config = {
"mode": "MirrorChyan",
"thread_numb": 1,
"url": version_info["data"]["url"],
}
else:
download_config = {"mode": "Proxy", "thread_numb": thread_numb}
else:
sys.exit(f"获取版本信息时出错:{version_info["msg"]}")
remote_version = list(
map(
int,
version_info["data"]["version_name"][1:].replace("-beta", "").split("."),
)
)
if download_config["mode"] == "Proxy":
for _ in range(3):
try:
response = requests.get(
"https://gitee.com/DLmaster_361/AUTO_MAA/raw/server/download_info.json"
)
download_info = response.json()
download_config["proxy_list"] = list(
set(proxy_list + download_info["proxy_list"])
)
download_config["download_dict"] = download_info["download_dict"]
break
except Exception as e:
err = e
time.sleep(0.1)
else:
sys.exit(f"获取代理信息时出错:{err}")
if (app_path / "changes.json").exists():
(app_path / "changes.json").unlink()
# 启动更新线程
if main_version_remote > main_version_current:
if remote_version > current_version:
app = AUTO_MAA_Downloader(
app_path,
"AUTO_MAA主程序",
main_version_remote,
[],
"AUTO_MAA",
remote_version,
download_config,
)
sys.exit(app.exec())

View File

@@ -68,9 +68,8 @@ if __name__ == "__main__":
print("Packaging AUTO_MAA main program ...")
os.system(
"powershell -Command python -m nuitka --standalone --onefile --mingw64"
"powershell -Command python -m nuitka --standalone --mingw64"
" --enable-plugins=pyside6 --windows-console-mode=disable"
" --onefile-tempdir-spec='{TEMP}\\AUTO_MAA'"
" --windows-icon-from-ico=resources\\icons\\AUTO_MAA.ico"
" --company-name='AUTO_MAA Team' --product-name=AUTO_MAA"
f" --file-version={version["main_version"]}"
@@ -83,27 +82,69 @@ if __name__ == "__main__":
print("AUTO_MAA main program packaging completed !")
shutil.copy(root_path / "app/utils/downloader.py", root_path)
print("Packaging AUTO_MAA update program ...")
shutil.copy(root_path / "app/utils/downloader.py", root_path)
os.system(
"powershell -Command python -m nuitka --standalone --onefile --mingw64"
"powershell -Command python -m nuitka --standalone --mingw64"
" --enable-plugins=pyside6 --windows-console-mode=disable"
" --onefile-tempdir-spec='{TEMP}\\AUTO_MAA_Updater'"
" --windows-icon-from-ico=resources\\icons\\AUTO_MAA_Updater.ico"
" --company-name='AUTO_MAA Team' --product-name=AUTO_MAA"
f" --file-version={version["updater_version"]}"
f" --product-version={version["main_version"]}"
" --file-description='AUTO_MAA Component'"
" --copyright='Copyright © 2024 DLmaster361'"
" --assume-yes-for-downloads --output-filename=Updater"
" --assume-yes-for-downloads --output-filename=AUTO_Updater"
" --remove-output downloader.py"
)
(root_path / "downloader.py").unlink()
print("AUTO_MAA update program packaging completed !")
(root_path / "downloader.py").unlink()
(root_path / "AUTO_MAA").mkdir(parents=True, exist_ok=True)
print("Start to copy AUTO_MAA main program ...")
for item in (root_path / "main.dist").iterdir():
if item.is_dir():
shutil.copytree(
item, root_path / "AUTO_MAA" / item.name, dirs_exist_ok=True
)
else:
shutil.copy(item, root_path / "AUTO_MAA" / item.name)
shutil.rmtree(root_path / "main.dist")
print("Start to copy AUTO_MAA update program ...")
for item in (root_path / "downloader.dist").iterdir():
if item.is_dir():
shutil.copytree(
item, root_path / "AUTO_MAA" / item.name, dirs_exist_ok=True
)
else:
shutil.copy(item, root_path / "AUTO_MAA" / item.name)
shutil.rmtree(root_path / "downloader.dist")
print("Start to copy rescourses ...")
shutil.copytree(root_path / "app", root_path / "AUTO_MAA/app")
shutil.copytree(root_path / "resources", root_path / "AUTO_MAA/resources")
shutil.copy(root_path / "main.py", root_path / "AUTO_MAA/main.py")
shutil.copy(root_path / "requirements.txt", root_path / "AUTO_MAA/requirements.txt")
shutil.copy(root_path / "README.md", root_path / "AUTO_MAA/README.md")
shutil.copy(root_path / "LICENSE", root_path / "AUTO_MAA/LICENSE")
print("Start to compress ...")
shutil.make_archive(
base_name=root_path / f"AUTO_MAA_{version_text(main_version_numb)}",
format="zip",
root_dir=root_path / "AUTO_MAA",
base_dir=".",
)
shutil.rmtree(root_path / "AUTO_MAA")
print("compress completed !")
all_version_info = {}
for v_i in version["version_info"].values():
@@ -114,6 +155,6 @@ if __name__ == "__main__":
all_version_info[key] = value.copy()
(root_path / "version_info.txt").write_text(
f"{version_text(main_version_numb)}\n{version_text(updater_version_numb)}\n{version_info_markdown(all_version_info)}",
f"{version_text(main_version_numb)}\n{version_text(updater_version_numb)}\n<!--{json.dumps(version["version_info"], ensure_ascii=False)}-->\n{version_info_markdown(all_version_info)}",
encoding="utf-8",
)

View File

@@ -1,14 +0,0 @@
{
"main_version": "4.2.0.0",
"updater_version": "1.1.0.0",
"announcement": "\n# 这是一个中转版本,此版本后更换程序架构方式。\n# 由于更新方法无法通用,您需要在完成本次更新后再次检查更新以获取最新版本。\n",
"proxy_list":[
"",
"https://gitproxy.click/",
"https://cdn.moran233.xyz/",
"https://gh.llkk.cc/",
"https://github.akams.cn/",
"https://www.ghproxy.cn/",
"https://ghfast.top/"
]
}

View File

@@ -11,6 +11,8 @@
"TaskQueue.AutoRoguelike.IsChecked": "False" #自动肉鸽
"TaskQueue.Reclamation.IsChecked": "False" #生息演算
#刷理智
"MainFunction.UseMedicine": "True" #吃理智药
"MainFunction.UseMedicine.Quantity": "999" #吃理智药数量
"MainFunction.Stage1": "" #主关卡
"MainFunction.Stage2": "" #备选关卡1
"MainFunction.Stage3": "" #备选关卡2

View File

@@ -1,8 +1,20 @@
{
"main_version": "4.2.5.8",
"main_version": "4.2.5.9",
"updater_version": "1.2.0.2",
"announcement": "\n## 新增功能\n- 屏蔽MuMu模拟器开屏广告功能上线\n- 更新器支持多线程下载\n- 添加强制关闭ADB与模拟器等增强任务项\n## 修复BUG\n- 修复统计信息HTML模板公招匹配错误\n- 修复密码显示按钮动画异常\n- 修复`检测到MAA未能实际执行任务`报错被异常屏蔽\n- 修复MAA超时判定异常失效\n## 程序优化\n- 关机等电源操作添加100s倒计时\n- 人工排查弹窗方法优化\n- 人工排查时自动屏蔽静默操作\n- 公告样式优化",
"version_info": {
"4.2.5.9":{
"新增功能": [
"添加理智药设置选项 #34",
"预接入`mirrorc`"
],
"程序优化": [
"输入对话框添加回车键确认能力 #35",
"用户列表UI改版升级",
"配置类取消单例限制",
"配置读取方式与界面渲染方法优化"
]
},
"4.2.5.8":{
"修复BUG": [
"对win10主题进一步适配"