#
#   Copyright © <2024>

#   This file is part of AUTO_MAA.

#   AUTO_MAA is free software: you can redistribute it and/or modify
#   it under the terms of the GNU General Public License as published
#   by the Free Software Foundation, either version 3 of the License,
#   or (at your option) any later version.

#   AUTO_MAA is distributed in the hope that it will be useful,
#   but WITHOUT ANY WARRANTY; without even the implied warranty
#   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
#   the GNU General Public License for more details.

#   You should have received a copy of the GNU General Public License
#   along with AUTO_MAA. If not, see <https://www.gnu.org/licenses/>.

#   DLmaster_361@163.com

"""
AUTO_MAA
AUTO_MAA security service
v4.2
Author: DLmaster_361
"""

from loguru import logger
import sqlite3
import hashlib
import random
import secrets
import base64
import win32crypt
from pathlib import Path
from Crypto.Cipher import AES
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_OAEP
from Crypto.Util.Padding import pad, unpad
from typing import List, Dict, Optional, Union

from app.core import Config


class CryptoHandler:
    """Encrypt and decrypt user secrets with an RSA key pair guarded by a management key.

    The RSA public key is stored in the clear and is used to encrypt data.
    The RSA private key is stored AES-encrypted; the AES key is derived from
    the user-supplied management key and a stored salt.
    """

    # Sentinel returned by ``AUTO_decryptor`` when the management key does not
    # match. This is a runtime value that callers compare against; keep it as is.
    _WRONG_PASSWORD = "管理密钥错误"

    @staticmethod
    def _key_file(name: str) -> Path:
        """Return the path of the key-material file called *name*."""
        return Config.app_path / "data/key" / name

    @staticmethod
    def _derive_aes_key(PASSWORD: str, PASSWORD_salt: str) -> bytes:
        """Derive the 32-byte AES-256 key from the management key and its salt."""
        # NOTE(review): a single SHA-256 pass is kept so that existing key
        # files stay readable; changing it would invalidate stored keys.
        return hashlib.sha256((PASSWORD + PASSWORD_salt).encode("utf-8")).digest()

    @staticmethod
    def _verify_hash(AES_password: bytes, verify_salt: str) -> bytes:
        """Return the digest used to check that an AES key is the right one."""
        return hashlib.sha256(AES_password + verify_salt.encode("utf-8")).digest()

    def get_PASSWORD(self, PASSWORD: str) -> None:
        """Set up the management key.

        Generates a new RSA key pair and two random salts, then writes the
        public key, the salts, the AES-key verification hash and the
        AES-encrypted private key under ``data/key``. Existing files are
        overwritten.

        Args:
            PASSWORD: The management key chosen by the user.
        """

        # Create the key directory
        Config.key_path.mkdir(parents=True, exist_ok=True)

        # Generate the RSA key pair
        private_key = RSA.generate(2048)
        public_key_local = private_key.publickey()

        # Save the RSA public key
        self._key_file("public_key.pem").write_bytes(public_key_local.exportKey())

        # Generate the random salts for key derivation and for verification
        PASSWORD_salt = secrets.token_hex(random.randint(32, 1024))
        self._key_file("PASSWORDsalt.txt").write_text(
            PASSWORD_salt,
            encoding="utf-8",
        )
        verify_salt = secrets.token_hex(random.randint(32, 1024))
        self._key_file("verifysalt.txt").write_text(
            verify_salt,
            encoding="utf-8",
        )

        # Turn the management key into an AES-256 key
        AES_password = self._derive_aes_key(PASSWORD, PASSWORD_salt)

        # Compute and save the AES-256 key verification hash
        self._key_file("AES_password_verify.bin").write_bytes(
            self._verify_hash(AES_password, verify_salt)
        )

        # Encrypt the RSA private key with AES-256 and save the ciphertext.
        # NOTE(review): ECB mode and the 32-byte padding boundary are part of
        # the on-disk format read back by AUTO_decryptor; keep them in sync.
        AES_key = AES.new(AES_password, AES.MODE_ECB)
        private_key_local = AES_key.encrypt(pad(private_key.exportKey(), 32))
        self._key_file("private_key.bin").write_bytes(private_key_local)

    def AUTO_encryptor(self, note: str) -> bytes:
        """Encrypt *note* with the stored RSA public key (PKCS#1 OAEP).

        Args:
            note: Plain text to encrypt.

        Returns:
            The ciphertext bytes.
        """

        # Load the RSA public key
        public_key_local = RSA.import_key(self._key_file("public_key.pem").read_bytes())

        # Encrypt the data with the RSA public key
        return PKCS1_OAEP.new(public_key_local).encrypt(note.encode("utf-8"))

    def AUTO_decryptor(self, note: bytes, PASSWORD: str) -> str:
        """Decrypt *note* with the RSA private key unlocked by the management key.

        Args:
            note: Ciphertext produced by ``AUTO_encryptor``.
            PASSWORD: The management key.

        Returns:
            The decrypted text, or the string ``"管理密钥错误"`` when the
            management key does not match the stored verification hash.
        """

        # Load the encrypted RSA private key and the verification hash.
        # These are raw binary files written with write_bytes(); they must be
        # read verbatim. Stripping them would drop leading/trailing bytes that
        # happen to be ASCII whitespace and corrupt the digest or ciphertext.
        private_key_local = self._key_file("private_key.bin").read_bytes()
        AES_password_verify = self._key_file("AES_password_verify.bin").read_bytes()

        # Load the salts (hex text, so trimming surrounding whitespace is safe)
        PASSWORD_salt = (
            self._key_file("PASSWORDsalt.txt").read_text(encoding="utf-8").strip()
        )
        verify_salt = (
            self._key_file("verifysalt.txt").read_text(encoding="utf-8").strip()
        )

        # Turn the management key into an AES-256 key and verify it
        AES_password = self._derive_aes_key(PASSWORD, PASSWORD_salt)
        AES_password_SHA = self._verify_hash(AES_password, verify_salt)

        # Constant-time comparison of the two digests
        if not secrets.compare_digest(AES_password_SHA, AES_password_verify):
            return self._WRONG_PASSWORD

        # Decrypt the RSA private key with AES
        AES_key = AES.new(AES_password, AES.MODE_ECB)
        private_key_pem = unpad(AES_key.decrypt(private_key_local), 32)
        private_key = RSA.import_key(private_key_pem)

        # Decrypt the data with the RSA private key
        plain = PKCS1_OAEP.new(private_key).decrypt(note)
        return plain.decode("utf-8")

    def change_PASSWORD(self, PASSWORD_old: str, PASSWORD_new: str) -> None:
        """Change the management key and re-encrypt every stored password.

        All passwords in every member database are decrypted with the old
        key, a new key set is generated from *PASSWORD_new*, and the passwords
        are encrypted again and written back.

        NOTE: ``AUTO_decryptor`` returns a sentinel string instead of raising
        when *PASSWORD_old* is wrong, and that sentinel would then be stored
        as the password. Validate *PASSWORD_old* (for example with
        ``check_PASSWORD``) before calling this method.

        Args:
            PASSWORD_old: The current management key.
            PASSWORD_new: The management key to switch to.
        """

        member_list = self.search_member()

        for user_data in member_list:

            # Read the user rows; always release the cursor and connection
            db = sqlite3.connect(user_data["Path"])
            try:
                cur = db.cursor()
                try:
                    cur.execute("SELECT * FROM adminx WHERE True")
                    data = cur.fetchall()
                finally:
                    cur.close()
            finally:
                db.close()

            # Decrypt with the old management key (column 12 holds the password)
            user_data["Password"] = [
                self.AUTO_decryptor(row[12], PASSWORD_old) for row in data
            ]

        self.get_PASSWORD(PASSWORD_new)

        for user_data in member_list:

            passwords = user_data["Password"]
            db = sqlite3.connect(user_data["Path"])
            try:
                cur = db.cursor()
                try:
                    # Read the user rows again to get the mode/uid keys
                    cur.execute("SELECT * FROM adminx WHERE True")
                    data = cur.fetchall()

                    # Re-encrypt with the new management key
                    for i, row in enumerate(data):
                        cur.execute(
                            "UPDATE adminx SET password = ? WHERE mode = ? AND uid = ?",
                            (
                                self.AUTO_encryptor(passwords[i]),
                                row[15],
                                row[16],
                            ),
                        )
                        db.commit()
                        # Drop the plaintext as soon as it has been written back
                        passwords[i] = None
                finally:
                    cur.close()
            finally:
                db.close()
                # Never leave decrypted passwords attached to the member entry,
                # even when re-encryption failed part-way through.
                user_data.pop("Password", None)

    def win_encryptor(
        self,
        note: str,
        description: Optional[str] = None,
        entropy: Optional[bytes] = None,
    ) -> str:
        """Encrypt *note* with Windows DPAPI.

        Args:
            note: Plain text to protect.
            description: Optional description passed to ``CryptProtectData``.
            entropy: Optional additional entropy passed to ``CryptProtectData``.

        Returns:
            The protected blob, base64-encoded as text.
        """

        encrypted = win32crypt.CryptProtectData(
            note.encode("utf-8"), description, entropy, None, None, 0
        )
        return base64.b64encode(encrypted).decode("utf-8")

    def win_decryptor(self, note: str, entropy: Optional[bytes] = None) -> str:
        """Decrypt a base64 blob produced by ``win_encryptor`` with Windows DPAPI.

        Args:
            note: Base64 text of the protected blob; ``""`` is returned unchanged.
            entropy: The additional entropy that was used for encryption, if any.

        Returns:
            The decrypted text.
        """

        if note == "":
            return ""

        decrypted = win32crypt.CryptUnprotectData(
            base64.b64decode(note), entropy, None, None, 0
        )
        # Element 1 of the result is the decrypted data
        return decrypted[1].decode("utf-8")

    def search_member(self) -> List[Dict[str, Union[Path, list]]]:
        """Find every script instance and the path of its user database.

        Returns:
            One ``{"Path": <dir>/user_data.db}`` dict per sub-directory of
            ``config/MaaConfig``; an empty list when that directory is missing.
        """

        config_root = Config.app_path / "config/MaaConfig"
        if not config_root.exists():
            return []

        return [
            {"Path": subdir / "user_data.db"}
            for subdir in config_root.iterdir()
            if subdir.is_dir()
        ]

    def check_PASSWORD(self, PASSWORD: str) -> bool:
        """Return ``True`` when *PASSWORD* is the current management key.

        An empty string is encrypted and decrypted again; the key is accepted
        unless ``AUTO_decryptor`` reports a mismatch.
        """

        return (
            self.AUTO_decryptor(self.AUTO_encryptor(""), PASSWORD)
            != self._WRONG_PASSWORD
        )


Crypto = CryptoHandler()