fix(services): 修复管理密钥修改逻辑

This commit is contained in:
DLmaster
2025-01-28 17:28:51 +08:00
parent 29536003a4
commit e78f3973be
4 changed files with 107 additions and 117 deletions

View File

@@ -25,13 +25,17 @@ v4.2
作者DLmaster_361
"""
from loguru import logger
import sqlite3
import hashlib
import random
import secrets
from pathlib import Path
from Crypto.Cipher import AES
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_OAEP
from Crypto.Util.Padding import pad, unpad
from typing import List, Dict, Union
from app.core import Config
@@ -130,27 +134,65 @@ class CryptoHandler:
note = decrypter.decrypt(note)
return note.decode("utf-8")
def change_PASSWORD(self, data: list, PASSWORD_old: str, PASSWORD_new: str) -> None:
def change_PASSWORD(self, PASSWORD_old: str, PASSWORD_new: str) -> None:
"""修改管理密钥"""
# 使用旧管理密钥解密
new_data = []
for i in range(len(data)):
new_data.append(self.decryptx(data[i][12], PASSWORD_old))
# 使用新管理密钥重新加密
member_list = self.search_member()
for user_data in member_list:
# 读取用户数据
db = sqlite3.connect(user_data["Path"])
cur = db.cursor()
cur.execute("SELECT * FROM adminx WHERE True")
data = cur.fetchall()
# 使用旧管理密钥解密
user_data["Password"] = []
for i in range(len(data)):
user_data["Password"].append(self.decryptx(data[i][12], PASSWORD_old))
cur.close()
db.close()
self.get_PASSWORD(PASSWORD_new)
for i in range(len(data)):
Config.cur.execute(
"UPDATE adminx SET password = ? WHERE mode = ? AND uid = ?",
(
self.encryptx(new_data[i]),
data[i][15],
data[i][16],
),
)
Config.db.commit(),
new_data[i] = None
del new_data
for user_data in member_list:
# 读取用户数据
db = sqlite3.connect(user_data["Path"])
cur = db.cursor()
cur.execute("SELECT * FROM adminx WHERE True")
data = cur.fetchall()
# 使用新管理密钥重新加密
for i in range(len(data)):
cur.execute(
"UPDATE adminx SET password = ? WHERE mode = ? AND uid = ?",
(
self.encryptx(user_data["Password"][i]),
data[i][15],
data[i][16],
),
)
db.commit()
user_data["Password"][i] = None
del user_data["Password"]
cur.close()
db.close()
def search_member(self) -> List[Dict[str, Union[Path, list]]]:
"""搜索所有脚本实例及其用户数据库路径"""
member_list = []
if (Config.app_path / "config/MaaConfig").exists():
for subdir in (Config.app_path / "config/MaaConfig").iterdir():
if subdir.is_dir():
member_list.append({"Path": subdir / "user_data.db"})
return member_list
def check_PASSWORD(self, PASSWORD: str) -> bool:
"""验证管理密钥"""