创建全新模块化架构
This commit is contained in:
35
app/services/__init__.py
Normal file
35
app/services/__init__.py
Normal file
@@ -0,0 +1,35 @@
|
||||
# <AUTO_MAA:A MAA Multi Account Management and Automation Tool>
|
||||
# Copyright © <2024> <DLmaster361>
|
||||
|
||||
# This file is part of AUTO_MAA.
|
||||
|
||||
# AUTO_MAA is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License,
|
||||
# or (at your option) any later version.
|
||||
|
||||
# AUTO_MAA is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty
|
||||
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
|
||||
# the GNU General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with AUTO_MAA. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
# DLmaster_361@163.com
|
||||
|
||||
"""
|
||||
AUTO_MAA
|
||||
AUTO_MAA服务包
|
||||
v4.2
|
||||
作者:DLmaster_361
|
||||
"""
|
||||
|
||||
__version__ = "4.2.0"
|
||||
__author__ = "DLmaster361 <DLmaster_361@163.com>"
|
||||
__license__ = "GPL-3.0 license"
|
||||
|
||||
from .notification import Notification
|
||||
from .security import CryptoHandler
|
||||
|
||||
__all__ = ["Notification", "CryptoHandler"]
|
||||
97
app/services/notification.py
Normal file
97
app/services/notification.py
Normal file
@@ -0,0 +1,97 @@
|
||||
# <AUTO_MAA:A MAA Multi Account Management and Automation Tool>
|
||||
# Copyright © <2024> <DLmaster361>
|
||||
|
||||
# This file is part of AUTO_MAA.
|
||||
|
||||
# AUTO_MAA is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License,
|
||||
# or (at your option) any later version.
|
||||
|
||||
# AUTO_MAA is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty
|
||||
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
|
||||
# the GNU General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with AUTO_MAA. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
# DLmaster_361@163.com
|
||||
|
||||
"""
|
||||
AUTO_MAA
|
||||
AUTO_MAA通知服务
|
||||
v4.2
|
||||
作者:DLmaster_361
|
||||
"""
|
||||
|
||||
from plyer import notification
|
||||
import smtplib
|
||||
from email.mime.text import MIMEText
|
||||
from email.header import Header
|
||||
from email.utils import formataddr
|
||||
import os
|
||||
|
||||
from app import AppConfig
|
||||
|
||||
|
||||
class Notification:
|
||||
|
||||
def __init__(self, config: AppConfig):
|
||||
|
||||
self.config = config
|
||||
|
||||
def push_notification(self, title, message, ticker, t):
|
||||
"""推送系统通知"""
|
||||
|
||||
notification.notify(
|
||||
title=title,
|
||||
message=message,
|
||||
app_name="AUTO_MAA",
|
||||
app_icon=os.path.normpath(
|
||||
f"{self.config.app_path}/resources/icons/AUTO_MAA.ico"
|
||||
),
|
||||
timeout=t,
|
||||
ticker=ticker,
|
||||
toast=True,
|
||||
)
|
||||
|
||||
return True
|
||||
|
||||
def send_mail(self, title, content):
|
||||
"""使用官方专用邮箱推送邮件通知"""
|
||||
|
||||
# 声明:此邮箱为AUTO_MAA项目组资产,未经授权不得私自使用
|
||||
# 注意:此声明注释只有使用者更换发信邮箱时才能删除,本条规则优先级高于GPLv3
|
||||
|
||||
# 第三方 SMTP 服务配置
|
||||
mail_host = "smtp.163.com" # 设置服务器
|
||||
mail_sender = "AUTO_MAA_server@163.com" # 用户名
|
||||
mail_key = "SYrq87nDLD4RNB5T" # 授权码 24/11/15
|
||||
|
||||
# 定义邮件正文
|
||||
message = MIMEText(content, "plain", "utf-8")
|
||||
message["From"] = formataddr(
|
||||
(Header("AUTO_MAA通知服务", "utf-8").encode(), "AUTO_MAA_server@163.com")
|
||||
) # 发件人显示的名字
|
||||
message["To"] = formataddr(
|
||||
(
|
||||
Header("AUTO_MAA用户", "utf-8").encode(),
|
||||
self.config.content["Default"]["SelfSet.MailAddress"],
|
||||
)
|
||||
) # 收件人显示的名字
|
||||
message["Subject"] = Header(title, "utf-8")
|
||||
|
||||
try:
|
||||
smtpObj = smtplib.SMTP_SSL(mail_host, 465) # 465为SMTP_SSL默认端口
|
||||
smtpObj.login(mail_sender, mail_key)
|
||||
smtpObj.sendmail(
|
||||
mail_sender,
|
||||
self.config.content["Default"]["SelfSet.MailAddress"],
|
||||
message.as_string(),
|
||||
)
|
||||
return True
|
||||
except smtplib.SMTPException as e:
|
||||
return f"发送邮件时出错:\n{e}"
|
||||
finally:
|
||||
smtpObj.quit()
|
||||
183
app/services/security.py
Normal file
183
app/services/security.py
Normal file
@@ -0,0 +1,183 @@
|
||||
# <AUTO_MAA:A MAA Multi Account Management and Automation Tool>
|
||||
# Copyright © <2024> <DLmaster361>
|
||||
|
||||
# This file is part of AUTO_MAA.
|
||||
|
||||
# AUTO_MAA is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License,
|
||||
# or (at your option) any later version.
|
||||
|
||||
# AUTO_MAA is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty
|
||||
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
|
||||
# the GNU General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with AUTO_MAA. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
# DLmaster_361@163.com
|
||||
|
||||
"""
|
||||
AUTO_MAA
|
||||
AUTO_MAA主程序
|
||||
v4.2
|
||||
作者:DLmaster_361
|
||||
"""
|
||||
|
||||
import os
|
||||
import hashlib
|
||||
import random
|
||||
import secrets
|
||||
from Crypto.Cipher import AES
|
||||
from Crypto.PublicKey import RSA
|
||||
from Crypto.Cipher import PKCS1_OAEP
|
||||
from Crypto.Util.Padding import pad, unpad
|
||||
|
||||
from app import AppConfig
|
||||
|
||||
|
||||
class CryptoHandler:
|
||||
|
||||
def __init__(self, config: AppConfig):
|
||||
|
||||
self.config = config
|
||||
|
||||
def get_PASSWORD(self, PASSWORD: str) -> None:
|
||||
"""配置管理密钥"""
|
||||
|
||||
# 生成目录
|
||||
os.makedirs(os.path.normpath(f"{self.config.app_path}/data/key"), exist_ok=True)
|
||||
|
||||
# 生成RSA密钥对
|
||||
key = RSA.generate(2048)
|
||||
public_key_local = key.publickey()
|
||||
private_key = key
|
||||
# 保存RSA公钥
|
||||
with open(
|
||||
os.path.normpath(f"{self.config.app_path}/data/key/public_key.pem"), "wb"
|
||||
) as f:
|
||||
f.write(public_key_local.exportKey())
|
||||
# 生成密钥转换与校验随机盐
|
||||
PASSWORD_salt = secrets.token_hex(random.randint(32, 1024))
|
||||
with open(
|
||||
os.path.normpath(f"{self.config.app_path}/data/key/PASSWORDsalt.txt"),
|
||||
"w",
|
||||
encoding="utf-8",
|
||||
) as f:
|
||||
print(PASSWORD_salt, file=f)
|
||||
verify_salt = secrets.token_hex(random.randint(32, 1024))
|
||||
with open(
|
||||
os.path.normpath(f"{self.config.app_path}/data/key/verifysalt.txt"),
|
||||
"w",
|
||||
encoding="utf-8",
|
||||
) as f:
|
||||
print(verify_salt, file=f)
|
||||
# 将管理密钥转化为AES-256密钥
|
||||
AES_password = hashlib.sha256(
|
||||
(PASSWORD + PASSWORD_salt).encode("utf-8")
|
||||
).digest()
|
||||
# 生成AES-256密钥校验哈希值并保存
|
||||
AES_password_verify = hashlib.sha256(
|
||||
AES_password + verify_salt.encode("utf-8")
|
||||
).digest()
|
||||
with open(
|
||||
os.path.normpath(
|
||||
f"{self.config.app_path}/data/key/AES_password_verify.bin"
|
||||
),
|
||||
"wb",
|
||||
) as f:
|
||||
f.write(AES_password_verify)
|
||||
# AES-256加密RSA私钥并保存密文
|
||||
AES_key = AES.new(AES_password, AES.MODE_ECB)
|
||||
private_key_local = AES_key.encrypt(pad(private_key.exportKey(), 32))
|
||||
with open(
|
||||
os.path.normpath(f"{self.config.app_path}/data/key/private_key.bin"), "wb"
|
||||
) as f:
|
||||
f.write(private_key_local)
|
||||
|
||||
def encryptx(self, note: str) -> bytes:
|
||||
"""加密数据"""
|
||||
|
||||
# 读取RSA公钥
|
||||
with open(
|
||||
os.path.normpath(f"{self.config.app_path}/data/key/public_key.pem"), "rb"
|
||||
) as f:
|
||||
public_key_local = RSA.import_key(f.read())
|
||||
# 使用RSA公钥对数据进行加密
|
||||
cipher = PKCS1_OAEP.new(public_key_local)
|
||||
encrypted = cipher.encrypt(note.encode("utf-8"))
|
||||
return encrypted
|
||||
|
||||
def decryptx(self, note: bytes, PASSWORD: str) -> str:
|
||||
"""解密数据"""
|
||||
|
||||
# 读入RSA私钥密文、盐与校验哈希值
|
||||
with open(
|
||||
os.path.normpath(f"{self.config.app_path}/data/key/private_key.bin"), "rb"
|
||||
) as f:
|
||||
private_key_local = f.read().strip()
|
||||
with open(
|
||||
os.path.normpath(f"{self.config.app_path}/data/key/PASSWORDsalt.txt"),
|
||||
"r",
|
||||
encoding="utf-8",
|
||||
) as f:
|
||||
PASSWORD_salt = f.read().strip()
|
||||
with open(
|
||||
os.path.normpath(f"{self.config.app_path}/data/key/verifysalt.txt"),
|
||||
"r",
|
||||
encoding="utf-8",
|
||||
) as f:
|
||||
verify_salt = f.read().strip()
|
||||
with open(
|
||||
os.path.normpath(
|
||||
f"{self.config.app_path}/data/key/AES_password_verify.bin"
|
||||
),
|
||||
"rb",
|
||||
) as f:
|
||||
AES_password_verify = f.read().strip()
|
||||
# 将管理密钥转化为AES-256密钥并验证
|
||||
AES_password = hashlib.sha256(
|
||||
(PASSWORD + PASSWORD_salt).encode("utf-8")
|
||||
).digest()
|
||||
AES_password_SHA = hashlib.sha256(
|
||||
AES_password + verify_salt.encode("utf-8")
|
||||
).digest()
|
||||
if AES_password_SHA != AES_password_verify:
|
||||
return "管理密钥错误"
|
||||
else:
|
||||
# AES解密RSA私钥
|
||||
AES_key = AES.new(AES_password, AES.MODE_ECB)
|
||||
private_key_pem = unpad(AES_key.decrypt(private_key_local), 32)
|
||||
private_key = RSA.import_key(private_key_pem)
|
||||
# 使用RSA私钥解密数据
|
||||
decrypter = PKCS1_OAEP.new(private_key)
|
||||
note = decrypter.decrypt(note)
|
||||
return note.decode("utf-8")
|
||||
|
||||
def change_PASSWORD(self, data: list, PASSWORD_old: str, PASSWORD_new: str) -> None:
|
||||
"""修改管理密钥"""
|
||||
|
||||
# 使用旧管理密钥解密
|
||||
new_data = []
|
||||
for i in range(len(data)):
|
||||
new_data.append(self.decryptx(data[i][12], PASSWORD_old))
|
||||
# 使用新管理密钥重新加密
|
||||
self.get_PASSWORD(PASSWORD_new)
|
||||
for i in range(len(data)):
|
||||
self.config.cur.execute(
|
||||
"UPDATE adminx SET password = ? WHERE mode = ? AND uid = ?",
|
||||
(
|
||||
self.encryptx(new_data[i]),
|
||||
data[i][15],
|
||||
data[i][16],
|
||||
),
|
||||
)
|
||||
self.config.db.commit(),
|
||||
new_data[i] = None
|
||||
del new_data
|
||||
|
||||
def check_PASSWORD(self, PASSWORD: str) -> bool:
|
||||
"""验证管理密钥"""
|
||||
|
||||
return bool(self.decryptx(self.encryptx(""), PASSWORD) != "管理密钥错误")
|
||||
Reference in New Issue
Block a user