diff --git a/app/api/__init__.py b/app/api/__init__.py
index 9a3f295..7b0eb8c 100644
--- a/app/api/__init__.py
+++ b/app/api/__init__.py
@@ -31,6 +31,7 @@ from .queue import router as queue_router
from .dispatch import router as dispatch_router
from .history import router as history_router
from .setting import router as setting_router
+from .update import router as update_router
__all__ = [
"core_router",
@@ -41,4 +42,5 @@ __all__ = [
"dispatch_router",
"history_router",
"setting_router",
+ "update_router",
]
diff --git a/app/api/update.py b/app/api/update.py
new file mode 100644
index 0000000..e6e4754
--- /dev/null
+++ b/app/api/update.py
@@ -0,0 +1,81 @@
+# AUTO_MAA:A MAA Multi Account Management and Automation Tool
+# Copyright © 2024-2025 DLmaster361
+# Copyright © 2025 MoeSnowyFox
+
+# This file is part of AUTO_MAA.
+
+# AUTO_MAA is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published
+# by the Free Software Foundation, either version 3 of the License,
+# or (at your option) any later version.
+
+# AUTO_MAA is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty
+# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
+# the GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with AUTO_MAA. If not, see .
+
+# Contact: DLmaster_361@163.com
+
+
+import asyncio
+from fastapi import APIRouter, Body
+
+from app.core import Config
+from app.services import Updater
+from app.models.schema import *
+
+router = APIRouter(prefix="/api/update", tags=["软件更新"])
+
+
+@router.post(
+ "/check", summary="检查更新", response_model=UpdateCheckOut, status_code=200
+)
+async def check_update(version: UpdateCheckIn = Body(...)) -> UpdateCheckOut:
+
+ try:
+ if_need, latest_version, update_info = await Updater.check_update(
+ current_version=version.current_version
+ )
+ except Exception as e:
+ return UpdateCheckOut(
+ code=500,
+ status="error",
+ message=f"{type(e).__name__}: {str(e)}",
+ if_need_update=False,
+ latest_version="",
+ update_info={},
+ )
+ return UpdateCheckOut(
+ if_need_update=if_need, latest_version=latest_version, update_info=update_info
+ )
+
+
+@router.post("/download", summary="下载更新", response_model=OutBase, status_code=200)
+async def download_update() -> OutBase:
+
+ try:
+ task = asyncio.create_task(Updater.download_update())
+ Config.temp_task.append(task)
+ task.add_done_callback(lambda t: Config.temp_task.remove(t))
+ except Exception as e:
+ return OutBase(
+ code=500, status="error", message=f"{type(e).__name__}: {str(e)}"
+ )
+ return OutBase()
+
+
+@router.post("/install", summary="安装更新", response_model=OutBase, status_code=200)
+async def install_update() -> OutBase:
+
+ try:
+ task = asyncio.create_task(Updater.install_update())
+ Config.temp_task.append(task)
+ task.add_done_callback(lambda t: Config.temp_task.remove(t))
+ except Exception as e:
+ return OutBase(
+ code=500, status="error", message=f"{type(e).__name__}: {str(e)}"
+ )
+ return OutBase()
diff --git a/app/models/schema.py b/app/models/schema.py
index 9aa87f2..b520343 100644
--- a/app/models/schema.py
+++ b/app/models/schema.py
@@ -777,3 +777,13 @@ class SettingGetOut(OutBase):
class SettingUpdateIn(BaseModel):
data: GlobalConfig = Field(..., description="全局设置需要更新的数据")
+
+
+class UpdateCheckIn(BaseModel):
+ current_version: str = Field(..., description="当前前端版本号")
+
+
+class UpdateCheckOut(OutBase):
+ if_need_update: bool = Field(..., description="是否需要更新前端")
+ latest_version: str = Field(..., description="最新前端版本号")
+ update_info: Dict[str, List[str]] = Field(..., description="版本更新信息字典")
diff --git a/app/services/__init__.py b/app/services/__init__.py
index c55fcd6..33c156a 100644
--- a/app/services/__init__.py
+++ b/app/services/__init__.py
@@ -25,5 +25,6 @@ __license__ = "GPL-3.0 license"
from .matomo import Matomo
from .notification import Notify
from .system import System
+from .update import Updater
-__all__ = ["Notify", "System"]
+__all__ = ["Matomo", "Notify", "System", "Updater"]
diff --git a/app/services/update.py b/app/services/update.py
new file mode 100644
index 0000000..e92e092
--- /dev/null
+++ b/app/services/update.py
@@ -0,0 +1,365 @@
+# AUTO_MAA:A MAA Multi Account Management and Automation Tool
+# Copyright © 2024-2025 DLmaster361
+
+# This file is part of AUTO_MAA.
+
+# AUTO_MAA is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published
+# by the Free Software Foundation, either version 3 of the License,
+# or (at your option) any later version.
+
+# AUTO_MAA is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty
+# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
+# the GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with AUTO_MAA. If not, see .
+
+# Contact: DLmaster_361@163.com
+
+
+import re
+import time
+import json
+import zipfile
+import requests
+import subprocess
+from packaging import version
+from typing import List, Dict, Optional
+from pathlib import Path
+
+from app.core import Config
+from app.models.schema import WebSocketMessage
+from app.utils.constants import MIRROR_ERROR_INFO
+from app.utils.logger import get_logger
+
+logger = get_logger("更新服务")
+
+
+class _UpdateHandler:
+
+ def __init__(self) -> None:
+ self.is_locked: bool = False
+ self.remote_version: Optional[str] = None
+ self.mirror_chyan_download_url: Optional[str] = None
+
+ async def check_update(
+ self, current_version: str
+ ) -> tuple[bool, str, Dict[str, List[str]]]:
+
+ logger.info("开始检查更新")
+
+ response = requests.get(
+ f"https://mirrorchyan.com/api/resources/AUTO_MAA/latest?user_agent=AutoMaaGui¤t_version={current_version}&cdk={Config.get('Update', 'MirrorChyanCDK')}&channel={Config.get('Update', 'UpdateType')}",
+ timeout=10,
+ proxies=Config.get_proxies(),
+ )
+ if response.status_code == 200:
+ version_info = response.json()
+ else:
+ result = response.json()
+
+ if result["code"] != 0:
+ if result["code"] in MIRROR_ERROR_INFO:
+ raise Exception(
+ f"获取版本信息时出错: {MIRROR_ERROR_INFO[result['code']]}"
+ )
+ else:
+ raise Exception(
+ "获取版本信息时出错: 意料之外的错误, 请及时联系项目组以获取来自 Mirror 酱的技术支持"
+ )
+
+ logger.success("获取版本信息成功")
+
+ remote_version = version_info["data"]["version_name"]
+ self.remote_version = remote_version
+ if "url" in version_info["data"]:
+ self.mirror_chyan_download_url = version_info["data"]["url"]
+
+ if version.parse(remote_version) > version.parse(current_version):
+
+ # 版本更新信息
+ version_info_json: Dict[str, Dict[str, List[str]]] = json.loads(
+ re.sub(
+ r"^$",
+ r"\1",
+ version_info["data"]["release_note"].splitlines()[0],
+ )
+ )
+
+ update_version_info = {}
+ for v_i in [
+ info
+ for ver, info in version_info_json.items()
+ if version.parse(ver) > version.parse(current_version)
+ ]:
+
+ for key, value in v_i.items():
+ if key not in update_version_info:
+ update_version_info[key] = []
+ update_version_info[key] += value
+
+ return True, remote_version, update_version_info
+
+ else:
+ return False, current_version, {}
+
+ async def download_update(self) -> None:
+
+ if self.is_locked:
+ await Config.send_json(
+ WebSocketMessage(
+ id="Update",
+ type="Signal",
+ data={"Failed": "已有更新任务在进行中, 请勿重复操作"},
+ ).model_dump()
+ )
+ return None
+
+ self.is_locked = True
+
+ if self.remote_version is None:
+ await Config.send_json(
+ WebSocketMessage(
+ id="Update",
+ type="Signal",
+ data={"Failed": "未检测到可用的远程版本, 请先检查更新"},
+ ).model_dump()
+ )
+ self.is_locked = False
+ return None
+
+ if (Path.cwd() / f"UpdatePack_{self.remote_version}.zip").exists():
+ logger.info(
+ f"更新包已存在: {Path.cwd() / f'UpdatePack_{self.remote_version}.zip'}"
+ )
+ await Config.send_json(
+ WebSocketMessage(
+ id="Update",
+ type="Signal",
+ data={
+ "Accomplish": str(
+ Path.cwd() / f"UpdatePack_{self.remote_version}.zip"
+ )
+ },
+ ).model_dump()
+ )
+ self.is_locked = False
+ return None
+
+ if Config.get("Update", "Source") == "GitHub":
+
+ download_url = f"https://github.com/DLmaster361/AUTO_MAA/releases/download/{self.remote_version}/AUTO_MAA_{self.remote_version}.zip"
+
+ elif Config.get("Update", "Source") == "MirrorChyan":
+
+ if self.mirror_chyan_download_url is None:
+ logger.warning("MirrorChyan 未返回下载链接, 使用自建下载站")
+ download_url = f"https://download.auto-mas.top/d/AUTO_MAA/AUTO_MAA_{self.remote_version}.zip"
+
+ else:
+ with requests.get(
+ self.mirror_chyan_download_url,
+ allow_redirects=True,
+ timeout=10,
+ stream=True,
+ proxies=Config.get_proxies(),
+ ) as response:
+ if response.status_code == 200:
+ download_url = response.url
+ elif Config.get("Update", "Source") == "AutoSite":
+ download_url = f"https://download.auto-mas.top/d/AUTO_MAA/AUTO_MAA_{self.remote_version}.zip"
+
+ else:
+ await Config.send_json(
+ WebSocketMessage(
+ id="Update",
+ type="Signal",
+ data={
+ "Failed": f"未知的下载源: {Config.get('Update', 'Source')}, 请检查配置文件"
+ },
+ ).model_dump()
+ )
+ self.is_locked = False
+ return None
+
+ logger.info(f"开始下载: {download_url}")
+
+ check_times = 3
+ while check_times != 0:
+
+ try:
+ # 清理可能存在的临时文件
+ if (Path.cwd() / "download.temp").exists():
+ (Path.cwd() / "download.temp").unlink()
+
+ start_time = time.time()
+
+ response = requests.get(
+ download_url, timeout=10, stream=True, proxies=Config.get_proxies()
+ )
+
+ if response.status_code not in [200, 206]:
+
+ if check_times != -1:
+ check_times -= 1
+
+ logger.warning(
+ f"连接失败: {download_url}, 状态码: {response.status_code}, 剩余重试次数: {check_times}"
+ )
+
+ time.sleep(1)
+ continue
+
+ logger.info(f"连接成功: {download_url}, 状态码: {response.status_code}")
+
+ file_size = int(response.headers.get("content-length", 0))
+ downloaded_size = 0
+ last_download_size = 0
+ speed = 0
+ last_time = time.time()
+ with (Path.cwd() / "download.temp").open(mode="wb") as f:
+
+ for chunk in response.iter_content(chunk_size=8192):
+
+ f.write(chunk)
+ downloaded_size += len(chunk)
+ await Config.send_json(
+ WebSocketMessage(
+ id="Update",
+ type="Update",
+ data={
+ "downloaded_size": downloaded_size,
+ "file_size": file_size,
+ "speed": speed,
+ },
+ ).model_dump()
+ )
+
+ # 更新指定线程的下载进度, 每秒更新一次
+ if time.time() - last_time >= 1.0:
+ speed = (
+ (downloaded_size - last_download_size)
+ / (time.time() - last_time)
+ / 1024
+ )
+ last_download_size = downloaded_size
+ last_time = time.time()
+
+ (Path.cwd() / "download.temp").rename(
+ Path.cwd() / f"UpdatePack_{self.remote_version}.zip"
+ )
+
+ logger.success(
+ f"下载完成: {download_url}, 实际下载大小: {downloaded_size} 字节, 耗时: {time.time() - start_time:.2f} 秒, 保存位置: {Path.cwd() / f'UpdatePack_{self.remote_version}.zip'}"
+ )
+ await Config.send_json(
+ WebSocketMessage(
+ id="Update",
+ type="Signal",
+ data={
+ "Accomplish": str(
+ Path.cwd() / f"UpdatePack_{self.remote_version}.zip"
+ )
+ },
+ ).model_dump()
+ )
+ self.is_locked = False
+ break
+
+ except Exception as e:
+
+ if check_times != -1:
+ check_times -= 1
+
+ logger.info(
+ f"下载出错: {download_url}, 错误信息: {e}, 剩余重试次数: {check_times}"
+ )
+ time.sleep(1)
+
+ else:
+
+ if (Path.cwd() / "download.temp").exists():
+ (Path.cwd() / "download.temp").unlink()
+ await Config.send_json(
+ WebSocketMessage(
+ id="Update",
+ type="Signal",
+ data={"Failed": f"下载失败: {download_url}"},
+ ).model_dump()
+ )
+ self.is_locked = False
+
+ async def install_update(self):
+
+ if self.is_locked:
+ await Config.send_json(
+ WebSocketMessage(
+ id="Update",
+ type="Signal",
+ data={"Failed": "已有更新任务在进行中, 请勿重复操作"},
+ ).model_dump()
+ )
+ return None
+
+ logger.info("开始应用更新")
+ self.is_locked = True
+
+ versions = {
+ version.parse(match.group(1)): f.name
+ for f in Path.cwd().glob("UpdatePack_*.zip")
+ if (match := re.match(r"UpdatePack_(.+)\.zip$", f.name))
+ }
+ logger.info(f"检测到的更新包: {versions.values()}")
+
+ if not versions:
+ await Config.send_json(
+ WebSocketMessage(
+ id="Update",
+ type="Signal",
+ data={"Failed": "未检测到更新包, 请先下载更新"},
+ ).model_dump()
+ )
+ self.is_locked = False
+ return None
+
+ update_package = Path.cwd() / versions[max(versions)]
+
+ logger.info(f"开始解压: {update_package} 到 {Path.cwd()}")
+
+ try:
+ with zipfile.ZipFile(update_package, "r") as zip_ref:
+ zip_ref.extractall(Path.cwd())
+ except Exception as e:
+ logger.error(f"解压失败, {type(e).__name__}: {e}")
+ await Config.send_json(
+ WebSocketMessage(
+ id="Update",
+ type="Message",
+ data={"Error": f"解压失败, {type(e).__name__}: {e}"},
+ ).model_dump()
+ )
+ self.is_locked = False
+ return None
+
+ logger.success(f"解压完成: {update_package} 到 {Path.cwd()}")
+
+ logger.info("正在删除临时文件与旧更新包文件")
+ if (Path.cwd() / "changes.json").exists():
+ (Path.cwd() / "changes.json").unlink()
+ for f in versions.values():
+ if (Path.cwd() / f).exists():
+ (Path.cwd() / f).unlink()
+
+ logger.info("启动更新程序")
+ self.is_locked = False
+ subprocess.Popen(
+ [Path.cwd() / "AUTO_MAA-Setup.exe"],
+ creationflags=subprocess.CREATE_NEW_PROCESS_GROUP
+ | subprocess.DETACHED_PROCESS
+ | subprocess.CREATE_NO_WINDOW,
+ )
+
+
+Updater = _UpdateHandler()
diff --git a/app/utils/constants.py b/app/utils/constants.py
index a54186e..0be6d3e 100644
--- a/app/utils/constants.py
+++ b/app/utils/constants.py
@@ -255,3 +255,18 @@ RESERVED_NAMES = {
ILLEGAL_CHARS = set('<>:"/\\|?*')
"""文件名非法字符集合"""
+
+MIRROR_ERROR_INFO = {
+ 1001: "获取版本信息的URL参数不正确",
+ 7001: "填入的 CDK 已过期",
+ 7002: "填入的 CDK 错误",
+ 7003: "填入的 CDK 今日下载次数已达上限",
+ 7004: "填入的 CDK 类型和待下载的资源不匹配",
+ 7005: "填入的 CDK 已被封禁",
+ 8001: "对应架构和系统下的资源不存在",
+ 8002: "错误的系统参数",
+ 8003: "错误的架构参数",
+ 8004: "错误的更新通道参数",
+ 1: "未知错误类型",
+}
+"""MirrorChyan错误代码映射表"""
diff --git a/main.py b/main.py
index 7836824..b1cf324 100644
--- a/main.py
+++ b/main.py
@@ -113,6 +113,7 @@ def main():
dispatch_router,
history_router,
setting_router,
+ update_router,
)
app = FastAPI(
@@ -138,6 +139,7 @@ def main():
app.include_router(dispatch_router)
app.include_router(history_router)
app.include_router(setting_router)
+ app.include_router(update_router)
app.mount(
"/api/res/materials",