feat(utils): 更新器初步支持多线程下载

This commit is contained in:
DLmaster
2025-03-13 20:21:25 +08:00
parent 6372ad4e0a
commit 528925b969
11 changed files with 856 additions and 509 deletions

View File

@@ -33,7 +33,7 @@ from .core import AppConfig, QueueConfig, MaaConfig, Task, TaskManager, MainTime
from .models import MaaManager
from .services import Notify, Crypto, System
from .ui import AUTO_MAA
from .utils import Updater
from .utils import DownloadManager
__all__ = [
"AppConfig",
@@ -47,5 +47,5 @@ __all__ = [
"Crypto",
"System",
"AUTO_MAA",
"Updater",
"DownloadManager",
]

View File

@@ -39,12 +39,14 @@ from qfluentwidgets import (
ConfigItem,
OptionsConfigItem,
RangeConfigItem,
ConfigValidator,
FolderValidator,
BoolValidator,
RangeValidator,
OptionsValidator,
qconfig,
)
from urllib.parse import urlparse
from typing import Union, Dict, List, Tuple
@@ -760,6 +762,30 @@ class AppConfig:
self.queue_config.set(self.queue_config.queue_Member_10, "禁用")
class UrlListValidator(ConfigValidator):
"""Url list validator"""
def validate(self, value):
try:
result = urlparse(value)
return all([result.scheme, result.netloc])
except ValueError:
return False
def correct(self, value: List[str]):
urls = []
for url in [_ for _ in value if _ != ""]:
if url[-1] != "/":
urls.append(f"{url}/")
else:
urls.append(url)
return list(set([_ for _ in urls if self.validate(_)]))
class GlobalConfig(QConfig):
"""全局配置"""
@@ -827,6 +853,7 @@ class GlobalConfig(QConfig):
update_UpdateType = OptionsConfigItem(
"Update", "UpdateType", "main", OptionsValidator(["main", "dev"])
)
update_ProxyUrlList = ConfigItem("Update", "ProxyUrlList", [], UrlListValidator())
class QueueConfig(QConfig):

View File

@@ -25,13 +25,14 @@ v4.2
作者DLmaster_361
"""
from PySide6.QtWidgets import QWidget, QHBoxLayout
from PySide6.QtCore import Qt, QTime, QEvent
from PySide6.QtWidgets import QWidget, QWidget, QLabel, QHBoxLayout, QSizePolicy
from PySide6.QtCore import Qt, QTime, QEvent, QSize
from PySide6.QtGui import QIcon, QPixmap, QPainter, QPainterPath
from qfluentwidgets import (
LineEdit,
PasswordLineEdit,
MessageBoxBase,
MessageBox,
SubtitleLabel,
SettingCard,
SpinBox,
@@ -50,9 +51,13 @@ from qfluentwidgets import (
TeachingTip,
TransparentToolButton,
TeachingTipTailPosition,
ExpandSettingCard,
ToolButton,
PushButton,
)
from qfluentwidgets.common.overload import singledispatchmethod
import os
from urllib.parse import urlparse
from typing import Optional, Union, List
from app.services import Crypto
@@ -330,6 +335,142 @@ class TimeEditSettingCard(SettingCard):
self.TimeEdit.setTime(QTime.fromString(value, "HH:mm"))
class UrlItem(QWidget):
"""Url item"""
removed = Signal(QWidget)
def __init__(self, url: str, parent=None):
super().__init__(parent=parent)
self.url = url
self.hBoxLayout = QHBoxLayout(self)
self.folderLabel = QLabel(url, self)
self.removeButton = ToolButton(FluentIcon.CLOSE, self)
self.removeButton.setFixedSize(39, 29)
self.removeButton.setIconSize(QSize(12, 12))
self.setFixedHeight(53)
self.setSizePolicy(QSizePolicy.Ignored, QSizePolicy.Fixed)
self.hBoxLayout.setContentsMargins(48, 0, 60, 0)
self.hBoxLayout.addWidget(self.folderLabel, 0, Qt.AlignLeft)
self.hBoxLayout.addSpacing(16)
self.hBoxLayout.addStretch(1)
self.hBoxLayout.addWidget(self.removeButton, 0, Qt.AlignRight)
self.hBoxLayout.setAlignment(Qt.AlignVCenter)
self.removeButton.clicked.connect(lambda: self.removed.emit(self))
class UrlListSettingCard(ExpandSettingCard):
"""Url list setting card"""
urlChanged = Signal(list)
def __init__(
self,
icon: Union[str, QIcon, FluentIconBase],
configItem: ConfigItem,
title: str,
content: str = None,
parent=None,
):
"""
Parameters
----------
configItem: RangeConfigItem
configuration item operated by the card
title: str
the title of card
content: str
the content of card
parent: QWidget
parent widget
"""
super().__init__(icon, title, content, parent)
self.configItem = configItem
self.addUrlButton = PushButton("添加代理网址", self)
self.urls: List[str] = qconfig.get(configItem).copy()
self.__initWidget()
def __initWidget(self):
self.addWidget(self.addUrlButton)
# initialize layout
self.viewLayout.setSpacing(0)
self.viewLayout.setAlignment(Qt.AlignTop)
self.viewLayout.setContentsMargins(0, 0, 0, 0)
for url in self.urls:
self.__addUrlItem(url)
self.addUrlButton.clicked.connect(self.__showUrlDialog)
def __showUrlDialog(self):
"""show url dialog"""
choice = LineEditMessageBox(
self.window(), "添加代理网址", "请输入代理网址", "明文"
)
if choice.exec() and self.__validate(choice.input.text()):
if choice.input.text()[-1] == "/":
url = choice.input.text()
else:
url = f"{choice.input.text()}/"
if url in self.urls:
return
self.__addUrlItem(url)
self.urls.append(url)
qconfig.set(self.configItem, self.urls)
self.urlChanged.emit(self.urls)
def __addUrlItem(self, url: str):
"""add url item"""
item = UrlItem(url, self.view)
item.removed.connect(self.__showConfirmDialog)
self.viewLayout.addWidget(item)
item.show()
self._adjustViewSize()
def __showConfirmDialog(self, item: UrlItem):
"""show confirm dialog"""
choice = MessageBox(
"确认",
f"确定要删除 {item.url} 代理网址吗?",
self.window(),
)
if choice.exec():
self.__removeFolder(item)
def __removeFolder(self, item: UrlItem):
"""remove folder"""
if item.url not in self.urls:
return
self.urls.remove(item.url)
self.viewLayout.removeWidget(item)
item.deleteLater()
self._adjustViewSize()
self.urlChanged.emit(self.urls)
qconfig.set(self.configItem, self.urls)
def __validate(self, value):
try:
result = urlparse(value)
return all([result.scheme, result.netloc])
except ValueError:
return False
class StatefulItemCard(CardWidget):
def __init__(self, item: list, parent=None):

View File

@@ -61,7 +61,7 @@ import shutil
from app.core import Config, MainInfoBar, TaskManager
from app.services import Crypto
from app.utils import Updater
from app.utils import DownloadManager
from .Widget import (
LineEditMessageBox,
LineEditSettingCard,
@@ -355,8 +355,11 @@ class MemberManager(QWidget):
while len(maa_version) < 4:
maa_version.append(0)
self.downloader = Updater(Path(folder), "MAA", maa_version, [])
self.downloader = DownloadManager(
Path(folder), "MAA", maa_version, [], {}
)
self.downloader.show()
self.downloader.run()
def show_password(self):

View File

@@ -55,8 +55,13 @@ from pathlib import Path
from app.core import Config, MainInfoBar
from app.services import Crypto, System
from app.utils import Updater
from .Widget import LineEditMessageBox, LineEditSettingCard, PasswordLineEditSettingCard
from app.utils import DownloadManager
from .Widget import (
LineEditMessageBox,
LineEditSettingCard,
PasswordLineEditSettingCard,
UrlListSettingCard,
)
class Setting(QWidget):
@@ -335,6 +340,16 @@ class Setting(QWidget):
updater_version_remote = list(
map(int, version_remote["updater_version"].split("."))
)
remote_proxy_list = version_remote["proxy_list"]
Config.global_config.set(
Config.global_config.update_ProxyUrlList,
list(
set(
Config.global_config.get(Config.global_config.update_ProxyUrlList)
+ remote_proxy_list
)
),
)
# 有版本更新
if (main_version_remote > main_version_current) or (
@@ -368,17 +383,23 @@ class Setting(QWidget):
# 更新更新器
if updater_version_remote > updater_version_current:
# 创建更新进程
self.updater = Updater(
self.updater = DownloadManager(
Config.app_path,
"AUTO_MAA更新器",
main_version_remote,
updater_version_remote,
{
"proxy_list": Config.global_config.get(
Config.global_config.update_ProxyUrlList
)
},
)
# 完成更新器的更新后更新主程序
if main_version_remote > main_version_current:
self.updater.update_process.accomplish.connect(self.update_main)
self.updater.download_accomplish.connect(self.update_main)
# 显示更新页面
self.updater.show()
self.updater.run()
# 更新主程序
elif main_version_remote > main_version_current:
@@ -836,6 +857,13 @@ class UpdaterSettingCard(HeaderCardWidget):
content="选择AUTO_MAA的更新类别",
texts=["稳定版", "公测版"],
)
self.card_ProxyUrlList = UrlListSettingCard(
icon=FluentIcon.SETTING,
configItem=Config.global_config.update_ProxyUrlList,
title="代理地址列表",
content="更新器代理地址列表",
parent=self,
)
self.card_CheckUpdate = PushSettingCard(
text="检查更新",
icon=FluentIcon.UPDATE,
@@ -846,6 +874,7 @@ class UpdaterSettingCard(HeaderCardWidget):
Layout = QVBoxLayout()
Layout.addWidget(self.card_IfAutoUpdate)
Layout.addWidget(self.card_UpdateType)
Layout.addWidget(self.card_ProxyUrlList)
Layout.addWidget(self.card_CheckUpdate)
self.viewLayout.addLayout(Layout)

View File

@@ -1,490 +0,0 @@
# <AUTO_MAA:A MAA Multi Account Management and Automation Tool>
# Copyright © <2024> <DLmaster361>
# This file is part of AUTO_MAA.
# AUTO_MAA is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License,
# or (at your option) any later version.
# AUTO_MAA is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
# the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with AUTO_MAA. If not, see <https://www.gnu.org/licenses/>.
# DLmaster_361@163.com
"""
AUTO_MAA
AUTO_MAA更新器
v1.1
作者DLmaster_361
"""
import sys
import json
import zipfile
import requests
import subprocess
import time
from pathlib import Path
from PySide6.QtWidgets import QApplication, QDialog, QVBoxLayout, QHBoxLayout
from qfluentwidgets import (
ProgressBar,
IndeterminateProgressBar,
BodyLabel,
PushButton,
EditableComboBox,
)
from PySide6.QtGui import QIcon, QCloseEvent
from PySide6.QtCore import QThread, Signal, QEventLoop
def version_text(version_numb: list) -> str:
"""将版本号列表转为可读的文本信息"""
if version_numb[3] == 0:
version = f"v{'.'.join(str(_) for _ in version_numb[0:3])}"
else:
version = (
f"v{'.'.join(str(_) for _ in version_numb[0:3])}-beta.{version_numb[3]}"
)
return version
class UpdateProcess(QThread):
info = Signal(str)
progress = Signal(int, int, int)
question = Signal(dict)
question_response = Signal(str)
accomplish = Signal()
def __init__(
self, app_path: Path, name: str, main_version: list, updater_version: list
) -> None:
super(UpdateProcess, self).__init__()
self.app_path = app_path
self.name = name
self.main_version = main_version
self.updater_version = updater_version
self.download_path = app_path / "DOWNLOAD_TEMP.zip" # 临时下载文件的路径
self.version_path = app_path / "resources/version.json"
self.response = None
self.question_response.connect(self._capture_response)
def run(self) -> None:
# 清理可能存在的临时文件
if self.download_path.exists():
self.download_path.unlink()
self.info.emit("正在获取下载链接")
url_list = self.get_download_url()
url_dict = {}
# 验证下载地址
for i, url in enumerate(url_list):
if self.isInterruptionRequested():
return None
self.progress.emit(0, len(url_list), i)
try:
self.info.emit(f"正在验证下载地址:{url}")
response = requests.get(url, stream=True)
if response.status_code != 200:
self.info.emit(f"连接失败,错误代码 {response.status_code}")
time.sleep(1)
continue
url_dict[url] = response.elapsed.total_seconds()
except requests.RequestException:
self.info.emit(f"请求超时")
time.sleep(1)
download_url = self.push_question(url_dict)
# 获取文件大小
try:
self.info.emit(f"正在连接下载地址:{download_url}")
self.progress.emit(0, 0, 0)
response = requests.get(download_url, stream=True)
if response.status_code != 200:
self.info.emit(f"连接失败,错误代码 {response.status_code}")
return None
file_size = response.headers.get("Content-Length")
except requests.RequestException:
self.info.emit(f"请求超时")
return None
if file_size is None:
file_size = 1
else:
file_size = int(file_size)
try:
# 下载文件
with open(self.download_path, "wb") as f:
downloaded_size = 0
last_download_size = 0
speed = 0
last_time = time.time()
for chunk in response.iter_content(chunk_size=8192):
if self.isInterruptionRequested():
break
# 写入已下载数据
f.write(chunk)
downloaded_size += len(chunk)
# 计算下载速度
if time.time() - last_time >= 1.0:
speed = (
(downloaded_size - last_download_size)
/ (time.time() - last_time)
/ 1024
)
last_download_size = downloaded_size
last_time = time.time()
# 更新下载进度
if speed >= 1024:
self.info.emit(
f"正在下载:{self.name} 已下载:{downloaded_size / 1048576:.2f}/{file_size / 1048576:.2f} MB {downloaded_size / file_size * 100:.2f}% 下载速度:{speed / 1024:.2f} MB/s",
)
else:
self.info.emit(
f"正在下载:{self.name} 已下载:{downloaded_size / 1048576:.2f}/{file_size / 1048576:.2f} MB {downloaded_size / file_size * 100:.2f}% 下载速度:{speed:.2f} KB/s",
)
self.progress.emit(0, 100, int(downloaded_size / file_size * 100))
if self.isInterruptionRequested() and self.download_path.exists():
self.download_path.unlink()
return None
except Exception as e:
e = str(e)
e = "\n".join([e[_ : _ + 75] for _ in range(0, len(e), 75)])
self.info.emit(f"下载{self.name}时出错:\n{e}")
return None
# 解压
try:
while True:
if self.isInterruptionRequested():
self.download_path.unlink()
return None
try:
self.info.emit("正在解压更新文件")
self.progress.emit(0, 0, 0)
with zipfile.ZipFile(self.download_path, "r") as zip_ref:
zip_ref.extractall(self.app_path)
break
except PermissionError:
self.info.emit(f"解压出错:{self.name}正在运行,正在等待其关闭")
time.sleep(1)
self.info.emit("正在删除临时文件")
self.progress.emit(0, 0, 0)
self.download_path.unlink()
self.info.emit(f"{self.name}更新成功!")
self.progress.emit(0, 100, 100)
except Exception as e:
e = str(e)
e = "\n".join([e[_ : _ + 75] for _ in range(0, len(e), 75)])
self.info.emit(f"解压更新时出错:\n{e}")
return None
# 更新version文件
if not self.isInterruptionRequested and self.name in [
"AUTO_MAA主程序",
"AUTO_MAA更新器",
]:
with open(self.version_path, "r", encoding="utf-8") as f:
version_info = json.load(f)
if self.name == "AUTO_MAA主程序":
version_info["main_version"] = ".".join(map(str, self.main_version))
elif self.name == "AUTO_MAA更新器":
version_info["updater_version"] = ".".join(
map(str, self.updater_version)
)
with open(self.version_path, "w", encoding="utf-8") as f:
json.dump(version_info, f, ensure_ascii=False, indent=4)
# 主程序更新完成后打开AUTO_MAA
if not self.isInterruptionRequested and self.name == "AUTO_MAA主程序":
subprocess.Popen(
str(self.app_path / "AUTO_MAA.exe"),
shell=True,
creationflags=subprocess.CREATE_NO_WINDOW,
)
elif not self.isInterruptionRequested and self.name == "MAA":
subprocess.Popen(
str(self.app_path / "MAA.exe"),
shell=True,
creationflags=subprocess.CREATE_NO_WINDOW,
)
self.accomplish.emit()
def get_download_url(self) -> list:
"""获取下载链接"""
try_num = 3
for i in range(try_num):
try:
response = requests.get(
"https://gitee.com/DLmaster_361/AUTO_MAA/raw/main/resources/version.json"
)
if response.status_code != 200:
self.info.emit(
f"连接失败,错误代码 {response.status_code} ,正在重试({i+1}/{try_num}"
)
time.sleep(0.1)
continue
version_remote = response.json()
PROXY_list = version_remote["proxy_list"]
break
except requests.RequestException:
self.info.emit(f"请求超时,正在重试({i+1}/{try_num}")
time.sleep(0.1)
except KeyError:
self.info.emit(f"未找到远端代理网址项,正在重试({i+1}/{try_num}")
time.sleep(0.1)
else:
self.info.emit("获取远端代理信息失败,将使用默认代理地址")
PROXY_list = [
"",
"https://gitproxy.click/",
"https://cdn.moran233.xyz/",
"https://gh.llkk.cc/",
"https://github.akams.cn/",
"https://www.ghproxy.cn/",
"https://ghfast.top/",
]
time.sleep(1)
url_list = []
if self.name == "AUTO_MAA主程序":
url_list.append(
f"https://gitee.com/DLmaster_361/AUTO_MAA/releases/download/{version_text(self.main_version)}/AUTO_MAA_{version_text(self.main_version)}.zip"
)
url_list.append(
f"https://jp-download.fearr.xyz/AUTO_MAA/AUTO_MAA_{version_text(self.main_version)}.zip"
)
for i in range(len(PROXY_list)):
url_list.append(
f"{PROXY_list[i]}https://github.com/DLmaster361/AUTO_MAA/releases/download/{version_text(self.main_version)}/AUTO_MAA_{version_text(self.main_version)}.zip"
)
elif self.name == "AUTO_MAA更新器":
url_list.append(
f"https://gitee.com/DLmaster_361/AUTO_MAA/releases/download/{version_text(self.main_version)}/Updater_{version_text(self.updater_version)}.zip"
)
url_list.append(
f"https://jp-download.fearr.xyz/AUTO_MAA/Updater_{version_text(self.updater_version)}.zip"
)
for i in range(len(PROXY_list)):
url_list.append(
f"{PROXY_list[i]}https://github.com/DLmaster361/AUTO_MAA/releases/download/{version_text(self.main_version)}/Updater_{version_text(self.updater_version)}.zip"
)
elif self.name == "MAA":
url_list.append(
f"https://jp-download.fearr.xyz/MAA/MAA-{version_text(self.main_version)}-win-x64.zip"
)
for i in range(len(PROXY_list)):
url_list.append(
f"{PROXY_list[i]}https://github.com/MaaAssistantArknights/MaaAssistantArknights/releases/download/{version_text(self.main_version)}/MAA-{version_text(self.main_version)}-win-x64.zip"
)
return url_list
def push_question(self, url_dict: dict) -> str:
self.question.emit(url_dict)
loop = QEventLoop()
self.question_response.connect(loop.quit)
loop.exec()
return self.response
def _capture_response(self, response: str) -> None:
self.response = response
class Updater(QDialog):
def __init__(
self, app_path: Path, name: str, main_version: list, updater_version: list
) -> None:
super().__init__()
self.setWindowTitle("AUTO_MAA更新器")
self.setWindowIcon(
QIcon(
str(
Path(sys.argv[0]).resolve().parent
/ "resources/icons/AUTO_MAA_Updater.ico"
)
)
)
# 创建垂直布局
self.Layout = QVBoxLayout(self)
self.info = BodyLabel("正在初始化", self)
self.progress_1 = IndeterminateProgressBar(self)
self.progress_2 = ProgressBar(self)
self.combo_box = EditableComboBox(self)
self.button = PushButton("继续", self)
self.h_layout = QHBoxLayout()
self.h_layout.addStretch(1)
self.h_layout.addWidget(self.button)
self.update_progress(0, 0, 0)
self.Layout.addWidget(self.info)
self.Layout.addStretch(1)
self.Layout.addWidget(self.progress_1)
self.Layout.addWidget(self.progress_2)
self.Layout.addWidget(self.combo_box)
self.Layout.addLayout(self.h_layout)
self.Layout.addStretch(1)
self.update_process = UpdateProcess(
app_path, name, main_version, updater_version
)
self.update_process.info.connect(self.update_info)
self.update_process.progress.connect(self.update_progress)
self.update_process.question.connect(self.question)
self.update_process.start()
def update_info(self, text: str) -> None:
self.info.setText(text)
def update_progress(
self, begin: int, end: int, current: int, if_show_combo_box: bool = False
) -> None:
self.combo_box.setVisible(if_show_combo_box)
self.button.setVisible(if_show_combo_box)
if if_show_combo_box:
self.progress_1.setVisible(False)
self.progress_2.setVisible(False)
self.resize(1000, 90)
elif begin == 0 and end == 0:
self.progress_2.setVisible(False)
self.progress_1.setVisible(True)
self.resize(700, 70)
else:
self.progress_1.setVisible(False)
self.progress_2.setVisible(True)
self.progress_2.setRange(begin, end)
self.progress_2.setValue(current)
self.resize(700, 70)
def question(self, url_dict: dict) -> None:
self.update_info("测速完成,请选择或自行输入一个合适下载地址:")
self.update_progress(0, 0, 0, True)
url_dict = dict(sorted(url_dict.items(), key=lambda item: item[1]))
for url, time in url_dict.items():
self.combo_box.addItem(f"{url} | 响应时间:{time:.3f}")
self.button.clicked.connect(
lambda: self.update_process.question_response.emit(
self.combo_box.currentText().split(" | ")[0]
)
)
def closeEvent(self, event: QCloseEvent):
"""清理残余进程"""
self.update_process.requestInterruption()
self.update_process.quit()
self.update_process.wait()
event.accept()
class AUTO_MAA_Updater(QApplication):
def __init__(
self, app_path: Path, name: str, main_version: list, updater_version: list
) -> None:
super().__init__()
self.main = Updater(app_path, name, main_version, updater_version)
self.main.show()
if __name__ == "__main__":
# 获取软件自身的路径
app_path = Path(sys.argv[0]).resolve().parent
# 从本地版本信息文件获取当前版本信息
if (app_path / "resources/version.json").exists():
with (app_path / "resources/version.json").open(
mode="r", encoding="utf-8"
) as f:
version_current = json.load(f)
main_version_current = list(
map(int, version_current["main_version"].split("."))
)
else:
main_version_current = [0, 0, 0, 0]
# 从本地配置文件获取更新类型
if (app_path / "config/config.json").exists():
with (app_path / "config/config.json").open(mode="r", encoding="utf-8") as f:
config = json.load(f)
if "Update" in config and "UpdateType" in config["Update"]:
update_type = config["Update"]["UpdateType"]
else:
update_type = "main"
else:
update_type = "main"
# 从远程服务器获取最新版本信息
for _ in range(3):
try:
response = requests.get(
f"https://gitee.com/DLmaster_361/AUTO_MAA/raw/{update_type}/resources/version.json"
)
version_remote = response.json()
main_version_remote = list(
map(int, version_remote["main_version"].split("."))
)
break
except Exception as e:
err = e
time.sleep(0.1)
else:
sys.exit(f"获取版本信息时出错:\n{err}")
# 启动更新线程
if main_version_remote > main_version_current:
app = AUTO_MAA_Updater(
app_path,
"AUTO_MAA主程序",
main_version_remote,
[],
)
sys.exit(app.exec())

View File

@@ -29,6 +29,6 @@ __version__ = "4.2.0"
__author__ = "DLmaster361 <DLmaster_361@163.com>"
__license__ = "GPL-3.0 license"
from .Updater import Updater
from .downloader import DownloadManager
__all__ = ["Updater"]
__all__ = ["DownloadManager"]

637
app/utils/downloader.py Normal file
View File

@@ -0,0 +1,637 @@
# <AUTO_MAA:A MAA Multi Account Management and Automation Tool>
# Copyright © <2024> <DLmaster361>
# This file is part of AUTO_MAA.
# AUTO_MAA is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License,
# or (at your option) any later version.
# AUTO_MAA is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
# the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with AUTO_MAA. If not, see <https://www.gnu.org/licenses/>.
# DLmaster_361@163.com
"""
AUTO_MAA
AUTO_MAA更新器
v1.2
作者DLmaster_361
"""
import sys
import json
import zipfile
import requests
import subprocess
import time
from functools import partial
from pathlib import Path
from PySide6.QtWidgets import QApplication, QDialog, QVBoxLayout
from qfluentwidgets import (
ProgressBar,
IndeterminateProgressBar,
BodyLabel,
setTheme,
Theme,
)
from PySide6.QtGui import QIcon, QCloseEvent
from PySide6.QtCore import QThread, Signal, QTimer, QEventLoop
from typing import List, Dict, Union
def version_text(version_numb: list) -> str:
"""将版本号列表转为可读的文本信息"""
if version_numb[3] == 0:
version = f"v{'.'.join(str(_) for _ in version_numb[0:3])}"
else:
version = (
f"v{'.'.join(str(_) for _ in version_numb[0:3])}-beta.{version_numb[3]}"
)
return version
class DownloadProcess(QThread):
"""分段下载子线程"""
progress = Signal(int)
accomplish = Signal(float)
def __init__(
self,
url: str,
start_byte: int,
end_byte: int,
download_path: Path,
check_times: int = -1,
) -> None:
super(DownloadProcess, self).__init__()
self.url = url
self.start_byte = start_byte
self.end_byte = end_byte
self.download_path = download_path
self.check_times = check_times
def run(self) -> None:
# 清理可能存在的临时文件
if self.download_path.exists():
self.download_path.unlink()
headers = {"Range": f"bytes={self.start_byte}-{self.end_byte}"}
while not self.isInterruptionRequested() and self.check_times != 0:
try:
start_time = time.time()
response = requests.get(
self.url, headers=headers, timeout=10, stream=True
)
print(self.download_path.suffix, response.status_code)
if response.status_code != 206:
if self.check_times != -1:
self.check_times -= 1
time.sleep(1)
continue
downloaded_size = 0
with self.download_path.open(mode="wb") as f:
for chunk in response.iter_content(chunk_size=8192):
if self.isInterruptionRequested():
print(
f"{self.download_path.suffix}: isInterruptionRequested"
)
break
f.write(chunk)
downloaded_size += len(chunk)
self.progress.emit(downloaded_size)
if self.isInterruptionRequested():
if self.download_path.exists():
self.download_path.unlink()
self.accomplish.emit(0)
else:
self.accomplish.emit(time.time() - start_time)
break
except Exception as e:
if self.check_times != -1:
self.check_times -= 1
time.sleep(1)
else:
print(
f"{self.download_path.suffix}: isInterruptionRequested---{self.check_times}/{self.isInterruptionRequested()}"
)
if self.download_path.exists():
self.download_path.unlink()
self.accomplish.emit(0)
class DownloadManager(QDialog):
"""下载管理器"""
speed_test_accomplish = Signal()
download_accomplish = Signal()
download_process_clear = Signal()
isInterruptionRequested = False
def __init__(
self,
app_path: Path,
name: str,
main_version: list,
updater_version: list,
config: dict,
) -> None:
super().__init__()
self.app_path = app_path
self.name = name
self.main_version = main_version
self.updater_version = updater_version
self.config = config
self.download_path = app_path / "DOWNLOAD_TEMP.zip" # 临时下载文件的路径
self.version_path = app_path / "resources/version.json"
self.download_process_dict: Dict[str, DownloadProcess] = {}
self.timer_dict: Dict[str, QTimer] = {}
self.setWindowTitle("AUTO_MAA更新器")
self.setWindowIcon(
QIcon(str(app_path / "resources/icons/AUTO_MAA_Updater.ico"))
)
self.setFixedSize(700, 70)
setTheme(Theme.AUTO, lazy=True)
# 创建垂直布局
self.Layout = QVBoxLayout(self)
self.info = BodyLabel("正在初始化", self)
self.progress_1 = IndeterminateProgressBar(self)
self.progress_2 = ProgressBar(self)
self.update_progress(0, 0, 0)
self.Layout.addWidget(self.info)
self.Layout.addStretch(1)
self.Layout.addWidget(self.progress_1)
self.Layout.addWidget(self.progress_2)
self.Layout.addStretch(1)
def run(self) -> None:
if self.name == "MAA":
self.download_task1()
else:
self.test_speed_task1()
self.speed_test_accomplish.connect(self.download_task1)
def get_download_url(self, mode: str) -> Union[str, Dict[str, str]]:
"""获取下载链接"""
url_dict = {}
if mode == "测速":
url_dict["GitHub站"] = (
f"https://github.com/DLmaster361/AUTO_MAA/releases/download/{version_text(self.main_version)}/AUTO_MAA_{version_text(self.main_version)}.zip"
)
url_dict["官方镜像站"] = (
f"https://gitee.com/DLmaster_361/AUTO_MAA/releases/download/{version_text(self.main_version)}/AUTO_MAA_{version_text(self.main_version)}.zip"
)
url_dict["官方下载站"] = (
f"https://jp-download.fearr.xyz/AUTO_MAA/AUTO_MAA_{version_text(self.main_version)}.zip"
)
for proxy_url in self.config["proxy_list"]:
url_dict[proxy_url] = (
f"{proxy_url}https://github.com/DLmaster361/AUTO_MAA/releases/download/{version_text(self.main_version)}/AUTO_MAA_{version_text(self.main_version)}.zip"
)
return url_dict
elif mode == "下载":
if self.name == "MAA":
return f"https://jp-download.fearr.xyz/MAA/MAA-{version_text(self.main_version)}-win-x64.zip"
if "selected" in self.config:
proxy_url = self.config["selected"]
elif "speed_result" in self.config:
proxy_url = max(
self.config["speed_result"], key=self.config["speed_result"].get
)
if self.name == "AUTO_MAA主程序":
if proxy_url == "GitHub站":
return f"https://github.com/DLmaster361/AUTO_MAA/releases/download/{version_text(self.main_version)}/AUTO_MAA_{version_text(self.main_version)}.zip"
elif proxy_url == "官方镜像站":
return f"https://gitee.com/DLmaster_361/AUTO_MAA/releases/download/{version_text(self.main_version)}/AUTO_MAA_{version_text(self.main_version)}.zip"
elif proxy_url == "官方下载站":
return f"https://jp-download.fearr.xyz/AUTO_MAA/AUTO_MAA_{version_text(self.main_version)}.zip"
else:
return f"{proxy_url}https://github.com/DLmaster361/AUTO_MAA/releases/download/{version_text(self.main_version)}/AUTO_MAA_{version_text(self.main_version)}.zip"
elif self.name == "AUTO_MAA更新器":
if proxy_url == "GitHub站":
return f"https://github.com/DLmaster361/AUTO_MAA/releases/download/{version_text(self.main_version)}/Updater_{version_text(self.updater_version)}.zip"
elif proxy_url == "官方镜像站":
return f"https://gitee.com/DLmaster_361/AUTO_MAA/releases/download/{version_text(self.main_version)}/Updater_{version_text(self.updater_version)}.zip"
elif proxy_url == "官方下载站":
return f"https://jp-download.fearr.xyz/AUTO_MAA/Updater_{version_text(self.updater_version)}.zip"
else:
return f"{proxy_url}https://github.com/DLmaster361/AUTO_MAA/releases/download/{version_text(self.main_version)}/Updater_{version_text(self.updater_version)}.zip"
def test_speed_task1(self) -> None:
if self.isInterruptionRequested:
return None
url_dict = self.get_download_url("测速")
self.test_speed_result: Dict[str, float] = {}
for name, url in url_dict.items():
if self.isInterruptionRequested:
break
# 创建测速线程下载4MB文件以测试下载速度
self.download_process_dict[name] = DownloadProcess(
url,
0,
4194304,
self.app_path / f"{name.replace('/','').replace(':','')}.zip",
10,
)
self.test_speed_result[name] = -1
self.download_process_dict[name].accomplish.connect(
partial(self.test_speed_task2, name)
)
self.download_process_dict[name].start()
timer = QTimer(self)
timer.setSingleShot(True)
timer.timeout.connect(partial(self.kill_speed_test, name))
timer.start(30000)
self.timer_dict[name] = timer
self.update_info("正在测速预计用时30秒")
self.update_progress(0, 1, 0)
def kill_speed_test(self, name: str) -> None:
print(f"time: {name}")
if name in self.download_process_dict:
self.download_process_dict[name].requestInterruption()
print(f"kill_speed_test: {name}")
def test_speed_task2(self, name: str, t: float) -> None:
# 计算下载速度
if self.isInterruptionRequested:
self.update_info(f"已中止测速进程:{name}")
self.test_speed_result[name] = 0
elif t != 0:
self.update_info(f"{name}{ 4 / t:.2f} MB/s")
self.test_speed_result[name] = 4 / t
else:
self.update_info(f"{name}{ 0:.2f} MB/s")
self.test_speed_result[name] = 0
self.update_progress(
0,
len(self.test_speed_result),
sum(1 for speed in self.test_speed_result.values() if speed != -1),
)
# 删除临时文件
if (self.app_path / f"{name.replace('/','').replace(':','')}.zip").exists():
(self.app_path / f"{name.replace('/','').replace(':','')}.zip").unlink()
# 清理下载线程
self.timer_dict[name].stop()
self.timer_dict[name].deleteLater()
self.timer_dict.pop(name)
self.download_process_dict[name].requestInterruption()
self.download_process_dict[name].quit()
self.download_process_dict[name].wait()
self.download_process_dict[name].deleteLater()
self.download_process_dict.pop(name)
if not self.download_process_dict:
self.download_process_clear.emit()
print(self.download_process_dict.keys())
if any(speed == -1 for _, speed in self.test_speed_result.items()):
return None
# 保存测速结果
self.config["speed_result"] = self.test_speed_result
self.update_info("测速完成!")
self.speed_test_accomplish.emit()
def download_task1(self) -> None:
if self.isInterruptionRequested:
return None
url = self.get_download_url("下载")
self.downloaded_size_list: List[List[int, bool]] = []
response = requests.head(url)
self.file_size = int(response.headers.get("content-length", 0))
part_size = self.file_size // 8
self.downloaded_size = 0
self.last_download_size = 0
self.last_time = time.time()
self.speed = 0
# 拆分下载任务,启用多线程下载
for i in range(8):
if self.isInterruptionRequested:
break
# 计算单任务下载范围
start_byte = i * part_size
end_byte = (i + 1) * part_size - 1 if i != 7 else self.file_size - 1
# 创建下载子线程
self.download_process_dict[f"part{i}"] = DownloadProcess(
url, start_byte, end_byte, self.download_path.with_suffix(f".part{i}")
)
self.downloaded_size_list.append([0, False])
self.download_process_dict[f"part{i}"].progress.connect(
partial(self.download_task2, i)
)
self.download_process_dict[f"part{i}"].accomplish.connect(
partial(self.download_task3, i)
)
self.download_process_dict[f"part{i}"].start()
def download_task2(self, index: str, current: int) -> None:
"""更新下载进度"""
self.downloaded_size_list[index][0] = current
self.downloaded_size = sum([_[0] for _ in self.downloaded_size_list])
self.update_progress(0, self.file_size, self.downloaded_size)
if time.time() - self.last_time >= 1.0:
self.speed = (
(self.downloaded_size - self.last_download_size)
/ (time.time() - self.last_time)
/ 1024
)
self.last_download_size = self.downloaded_size
self.last_time = time.time()
if self.speed >= 1024:
self.update_info(
f"正在下载:{self.name} 已下载:{self.downloaded_size / 1048576:.2f}/{self.file_size / 1048576:.2f} MB {self.downloaded_size / self.file_size * 100:.2f}% 下载速度:{self.speed / 1024:.2f} MB/s",
)
else:
self.update_info(
f"正在下载:{self.name} 已下载:{self.downloaded_size / 1048576:.2f}/{self.file_size / 1048576:.2f} MB {self.downloaded_size / self.file_size * 100:.2f}% 下载速度:{self.speed:.2f} KB/s",
)
def download_task3(self, index: str, t: float) -> None:
# 标记下载线程完成
self.downloaded_size_list[index][1] = True
# 清理下载线程
self.download_process_dict[f"part{index}"].requestInterruption()
self.download_process_dict[f"part{index}"].quit()
self.download_process_dict[f"part{index}"].wait()
self.download_process_dict[f"part{index}"].deleteLater()
self.download_process_dict.pop(f"part{index}")
if not self.download_process_dict:
self.download_process_clear.emit()
if (
any([not _[1] for _ in self.downloaded_size_list])
or self.isInterruptionRequested
):
return None
# 合并下载的分段文件
with self.download_path.open(mode="wb") as outfile:
for i in range(8):
with self.download_path.with_suffix(f".part{i}").open(
mode="rb"
) as infile:
outfile.write(infile.read())
self.download_path.with_suffix(f".part{i}").unlink()
# # 解压
try:
while True:
if self.isInterruptionRequested:
self.download_path.unlink()
return None
try:
self.update_info("正在解压更新文件")
self.update_progress(0, 0, 0)
with zipfile.ZipFile(self.download_path, "r") as zip_ref:
zip_ref.extractall(self.app_path)
break
except PermissionError:
self.info.emit(f"解压出错:{self.name}正在运行,正在等待其关闭")
time.sleep(1)
self.update_info("正在删除临时文件")
self.update_progress(0, 0, 0)
self.download_path.unlink()
self.update_info(f"{self.name}更新成功!")
self.update_progress(0, 100, 100)
except Exception as e:
e = str(e)
e = "\n".join([e[_ : _ + 75] for _ in range(0, len(e), 75)])
self.update_info(f"解压更新时出错:\n{e}")
return None
# 更新version文件
if not self.isInterruptionRequested and self.name in [
"AUTO_MAA主程序",
"AUTO_MAA更新器",
]:
with open(self.version_path, "r", encoding="utf-8") as f:
version_info = json.load(f)
if self.name == "AUTO_MAA主程序":
version_info["main_version"] = ".".join(map(str, self.main_version))
elif self.name == "AUTO_MAA更新器":
version_info["updater_version"] = ".".join(
map(str, self.updater_version)
)
with open(self.version_path, "w", encoding="utf-8") as f:
json.dump(version_info, f, ensure_ascii=False, indent=4)
# 主程序更新完成后打开对应程序
if not self.isInterruptionRequested and self.name == "AUTO_MAA主程序":
subprocess.Popen(
str(self.app_path / "AUTO_MAA.exe"),
shell=True,
creationflags=subprocess.CREATE_NO_WINDOW,
)
print(str(self.app_path / "AUTO_MAA.exe"))
elif not self.isInterruptionRequested and self.name == "MAA":
subprocess.Popen(
str(self.app_path / "MAA.exe"),
shell=True,
creationflags=subprocess.CREATE_NO_WINDOW,
)
self.download_accomplish.emit()
def update_info(self, text: str) -> None:
self.info.setText(text)
def update_progress(self, begin: int, end: int, current: int) -> None:
if begin == 0 and end == 0:
self.progress_2.setVisible(False)
self.progress_1.setVisible(True)
else:
self.progress_1.setVisible(False)
self.progress_2.setVisible(True)
self.progress_2.setRange(begin, end)
self.progress_2.setValue(current)
def requestInterruption(self) -> None:
self.isInterruptionRequested = True
print(self.download_process_dict)
for process in self.download_process_dict.values():
process.requestInterruption()
if self.download_process_dict:
loop = QEventLoop()
self.download_process_clear.connect(loop.quit)
loop.exec()
def closeEvent(self, event: QCloseEvent):
"""清理残余进程"""
self.requestInterruption()
print(self.download_process_dict)
event.accept()
class AUTO_MAA_Downloader(QApplication):
def __init__(
self,
app_path: Path,
name: str,
main_version: list,
updater_version: list,
config: dict,
) -> None:
super().__init__()
self.main = DownloadManager(
app_path, name, main_version, updater_version, config
)
self.main.show()
self.main.run()
if __name__ == "__main__":
# 获取软件自身的路径
app_path = Path(sys.argv[0]).resolve().parent
# 从本地版本信息文件获取当前版本信息
if (app_path / "resources/version.json").exists():
with (app_path / "resources/version.json").open(
mode="r", encoding="utf-8"
) as f:
version_current = json.load(f)
main_version_current = list(
map(int, version_current["main_version"].split("."))
)
else:
main_version_current = [0, 0, 0, 0]
# 从本地配置文件获取更新类型
if (app_path / "config/config.json").exists():
with (app_path / "config/config.json").open(mode="r", encoding="utf-8") as f:
config = json.load(f)
if "Update" in config and "UpdateType" in config["Update"]:
update_type = config["Update"]["UpdateType"]
else:
update_type = "main"
if "Update" in config and "ProxyUrlList" in config["Update"]:
proxy_list = config["Update"]["ProxyUrlList"]
else:
proxy_list = []
else:
update_type = "main"
proxy_list = []
# 从远程服务器获取最新版本信息
for _ in range(3):
try:
response = requests.get(
f"https://gitee.com/DLmaster_361/AUTO_MAA/raw/{update_type}/resources/version.json"
)
version_remote = response.json()
main_version_remote = list(
map(int, version_remote["main_version"].split("."))
)
remote_proxy_list = version_remote["proxy_list"]
break
except Exception as e:
err = e
time.sleep(0.1)
else:
sys.exit(f"获取版本信息时出错:\n{err}")
# 合并代理列表
download_config = {"proxy_list": list(set(proxy_list + remote_proxy_list))}
# 启动更新线程
if main_version_remote > main_version_current:
app = AUTO_MAA_Downloader(
app_path,
"AUTO_MAA主程序",
main_version_remote,
[],
download_config,
)
sys.exit(app.exec())

View File

@@ -72,11 +72,11 @@ if __name__ == "__main__":
print("AUTO_MAA main program packaging completed !")
shutil.copy(root_path / "app/utils/Updater.py", root_path)
shutil.copy(root_path / "app/utils/downloader.py", root_path)
file_content = (root_path / "Updater.py").read_text(encoding="utf-8")
file_content = (root_path / "downloader.py").read_text(encoding="utf-8")
(root_path / "Updater.py").write_text(
(root_path / "downloader.py").write_text(
file_content.replace(
"from .version import version_text", "from app import version_text"
),
@@ -96,12 +96,12 @@ if __name__ == "__main__":
" --file-description='AUTO_MAA Component'"
" --copyright='Copyright © 2024 DLmaster361'"
" --assume-yes-for-downloads --output-filename=Updater"
" --remove-output Updater.py"
" --remove-output downloader.py"
)
print("AUTO_MAA update program packaging completed !")
(root_path / "Updater.py").unlink()
(root_path / "downloader.py").unlink()
(root_path / "version_info.txt").write_text(
f"{version_text(main_version_numb)}\n{version_text(updater_version_numb)}{version["announcement"]}",

Binary file not shown.

Before

Width:  |  Height:  |  Size: 2.8 MiB

View File

@@ -1,7 +1,7 @@
{
"main_version": "4.2.5.2",
"updater_version": "1.1.2.1",
"announcement": "\n## 新增功能\n- 屏蔽MuMu模拟器开屏广告功能上线\n## 修复BUG\n- 修复统计信息HTML模板公招匹配错误\n## 程序优化\n- 暂无",
"updater_version": "1.1.2.2",
"announcement": "\n## 新增功能\n- 屏蔽MuMu模拟器开屏广告功能上线\n- 更新器支持多线程下载\n## 修复BUG\n- 修复统计信息HTML模板公招匹配错误\n## 程序优化\n- 暂无",
"proxy_list": [
"",
"https://gitproxy.click/",