用户密码加密部分上线,SQL指令更新防止注入攻击

This commit is contained in:
DLmaster
2024-02-11 19:45:28 +08:00
parent 863b6dd220
commit 3b0b50670d
7 changed files with 156 additions and 36 deletions

145
manage.py
View File

@@ -1,6 +1,107 @@
import sqlite3
import datetime
import msvcrt
import sys
import os
import hashlib
import random
import secrets
from Crypto.Cipher import AES
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_OAEP
from Crypto.Util.Padding import pad,unpad
#读入密码
def readpass(text):
sys.stdout=sys.__stdout__
sys.stdout.write(text)
sys.stdout.flush()
p=''
while True:
typed=msvcrt.getch()
if len(p)!=0:
if typed==b'\r':
sys.stdout.write('\b*')
sys.stdout.flush()
break
elif typed==b'\b':
p=p[:-1]
sys.stdout.write('\b \b')
sys.stdout.flush()
else:
p+=typed.decode("utf-8")
sys.stdout.write('\b*'+typed.decode("utf-8"))
sys.stdout.flush()
elif typed!=b'\r' and typed!=b'\b':
p+=typed.decode("utf-8")
sys.stdout.write(typed.decode("utf-8"))
sys.stdout.flush()
print('')
return p
#配置密钥
def getPASSWORD(PASSWORD):
#生成RSA密钥对
key=RSA.generate(2048)
public_key_local=key.publickey()
private_key=key
#保存RSA公钥
with open('data/key/public_key.pem','wb') as f:
f.write(public_key_local.exportKey())
#生成密钥转换与校验随机盐
PASSWORDsalt=secrets.token_hex(random.randint(32,1024))
with open("data/key/PASSWORDsalt.txt","w",encoding="utf-8") as f:
print(PASSWORDsalt,file=f)
verifysalt=secrets.token_hex(random.randint(32,1024))
with open("data/key/verifysalt.txt","w",encoding="utf-8") as f:
print(verifysalt,file=f)
#将管理密钥转化为AES-256密钥
AES_password=hashlib.sha256((PASSWORD+PASSWORDsalt).encode("utf-8")).digest()
#生成AES-256密钥校验哈希值并保存
AES_password_verify=hashlib.sha256(AES_password+verifysalt.encode("utf-8")).digest()
with open("data/key/AES_password_verify.bin","wb") as f:
f.write(AES_password_verify)
#AES-256加密RSA私钥并保存密文
AES_key=AES.new(AES_password,AES.MODE_ECB)
private_key_local=AES_key.encrypt(pad(private_key.exportKey(),32))
with open("data/key/private_key.bin","wb") as f:
f.write(private_key_local)
#加密
def encryptx(note):
#读取RSA公钥
with open('data/key/public_key.pem','rb') as f:
public_key_local=RSA.import_key(f.read())
#使用RSA公钥对数据进行加密
cipher=PKCS1_OAEP.new(public_key_local)
encrypted=cipher.encrypt(note.encode("utf-8"))
return encrypted
#解密
def decryptx(note,PASSWORD):
#读入RSA私钥密文、盐与校验哈希值
with open("data/key/private_key.bin","rb") as f:
private_key_local=f.read().strip()
with open("data/key/PASSWORDsalt.txt","r",encoding="utf-8") as f:
PASSWORDsalt=f.read().strip()
with open("data/key/verifysalt.txt","r",encoding="utf-8") as f:
verifysalt=f.read().strip()
with open("data/key/AES_password_verify.bin","rb") as f:
AES_password_verify=f.read().strip()
#将管理密钥转化为AES-256密钥并验证
AES_password=hashlib.sha256((PASSWORD+PASSWORDsalt).encode("utf-8")).digest()
AES_password_SHA=hashlib.sha256(AES_password+verifysalt.encode("utf-8")).digest()
if AES_password_SHA!=AES_password_verify:
return "管理密钥错误"
else:
#AES解密RSA私钥
AES_key=AES.new(AES_password,AES.MODE_ECB)
private_key_pem=unpad(AES_key.decrypt(private_key_local),32)
private_key=RSA.import_key(private_key_pem)
#使用RSA私钥解密数据
decrypter=PKCS1_OAEP.new(private_key)
return decrypter.decrypt(note)
#添加用户
def add():
@@ -14,9 +115,10 @@ def add():
numberx=input("手机号码:")
dayx=int(input("代理天数:"))
gamex=input("关卡号:")
passwordx=input("密码:")
passwordx=readpass("密码:")
passwordx=encryptx(passwordx)
#应用更新
cur.execute("INSERT INTO adminx(admin,number,day,status,last,game,password) VALUES('%s','%s',%d,'y','2000-01-01','%s','%s')" %(adminx,numberx,dayx,gamex,passwordx))
cur.execute("INSERT INTO adminx VALUES(?,?,?,'y','2000-01-01',?,?)",(adminx,numberx,dayx,gamex,passwordx))
db.commit()
cur.close()
db.close()
@@ -27,12 +129,12 @@ def delete(id):
db=sqlite3.connect(DATABASE)
cur=db.cursor()
#检查用户是否存在
cur.execute("SELECT * FROM adminx WHERE admin='%s'" %(id))
cur.execute("SELECT * FROM adminx WHERE admin=?",(id,))
data=cur.fetchall()
if len(data)==0:
return "未找到"+id
#应用更新
cur.execute("DELETE FROM adminx WHERE admin='%s'" %(id))
cur.execute("DELETE FROM adminx WHERE admin=?",(id,))
db.commit()
cur.close()
db.close()
@@ -71,7 +173,7 @@ def search(id,book):
if id=="all":
cur.execute("SELECT * FROM adminx WHERE True")
else:
cur.execute("SELECT * FROM adminx WHERE admin='%s'" %(id))
cur.execute("SELECT * FROM adminx WHERE admin=?",(id,))
data=cur.fetchall()
#处理全部信息查询时的MAA路径与启动时间查询
if id=="all":
@@ -109,6 +211,12 @@ def search(id,book):
data[i][3]="禁用"
if id=="all":
data[i][6]="******"
else:
#解密
global PASSWORD
if PASSWORD==0:
PASSWORD=input("请输入管理密钥:")
data[i][6]=decryptx(data[i][6],PASSWORD).decode("utf-8")
#制表输出
if book==1:
print('')
@@ -132,14 +240,14 @@ def renewal(readxx):
#检查用户是否存在
db=sqlite3.connect(DATABASE)
cur=db.cursor()
cur.execute("SELECT * FROM adminx WHERE admin='%s'" %(id))
cur.execute("SELECT * FROM adminx WHERE admin=?",(id,))
data=cur.fetchall()
if len(data)==0:
cur.close()
db.close()
return "未找到"+id
#应用更新
cur.execute("UPDATE adminx SET day=%d WHERE admin='%s'" %(data[0][2]+dayp,id))
cur.execute("UPDATE adminx SET day=? WHERE admin=?",(data[0][2]+dayp,id))
db.commit()
cur.close()
db.close()
@@ -150,14 +258,14 @@ def turn(id,t):
#检查用户是否存在
db=sqlite3.connect(DATABASE)
cur=db.cursor()
cur.execute("SELECT * FROM adminx WHERE admin='%s'" %(id))
cur.execute("SELECT * FROM adminx WHERE admin=?",(id,))
data=cur.fetchall()
if len(data)==0:
cur.close()
db.close()
return "未找到"+id
#应用更新
cur.execute("UPDATE adminx SET status='%s' WHERE admin='%s'" %(t,id))
cur.execute("UPDATE adminx SET status=? WHERE admin=?",(t,id))
db.commit()
cur.close()
db.close()
@@ -177,7 +285,7 @@ def gameid(readxx):
#检查用户是否存在
db=sqlite3.connect(DATABASE)
cur=db.cursor()
cur.execute("SELECT * FROM adminx WHERE admin='%s'" %(id))
cur.execute("SELECT * FROM adminx WHERE admin=?",(id,))
data=cur.fetchall()
if len(data)==0:
cur.close()
@@ -197,7 +305,7 @@ def gameid(readxx):
if gamep in games:
gamep=games[gamep]
#应用更新
cur.execute("UPDATE adminx SET game='%s' WHERE admin='%s'" %(gamep,id))
cur.execute("UPDATE adminx SET game=? WHERE admin=?",(gamep,id))
db.commit()
cur.close()
db.close()
@@ -210,9 +318,9 @@ def setpath(pathx):
cur.execute("SELECT * FROM pathset WHERE True")
pathold=cur.fetchall()
if len(pathold)>0:
cur.execute("UPDATE pathset SET path='%s' WHERE True" %(pathx))
cur.execute("UPDATE pathset SET path=? WHERE True",(pathx,))
else:
cur.execute("INSERT INTO pathset(path) VALUES('%s')" %(pathx))
cur.execute("INSERT INTO pathset VALUES(?)",(pathx,))
db.commit()
cur.close()
db.close()
@@ -235,7 +343,7 @@ def settime(book,timex):
db.close()
return "已存在"+timex
else:
cur.execute("INSERT INTO timeset(time) VALUES('%s')" %(timex))
cur.execute("INSERT INTO timeset VALUES(?)",(timex,))
db.commit()
cur.close()
db.close()
@@ -243,7 +351,7 @@ def settime(book,timex):
#删除时间设置
elif book=='-':
if timenew in timeold:
cur.execute("DELETE FROM timeset WHERE time='%s'" %(timex))
cur.execute("DELETE FROM timeset WHERE time=?",(timex,))
db.commit()
cur.close()
db.close()
@@ -265,17 +373,20 @@ def unit(x,m):
#初期检查
DATABASE="data/data.db"
PASSWORD=0
if not os.path.exists(DATABASE):
db=sqlite3.connect(DATABASE)
cur=db.cursor()
db.execute("CREATE TABLE adminx(admin text,number text,day int,status text,last date,game text,password text)")
db.execute("CREATE TABLE adminx(admin text,number text,day int,status text,last date,game text,password byte)")
db.execute("CREATE TABLE pathset(path text)")
db.execute("CREATE TABLE timeset(time text)")
readx=input("首次启动请设置MAA路径")
cur.execute("INSERT INTO pathset(path) VALUES('%s')" %(readx))
cur.execute("INSERT INTO pathset VALUES(?)",(readx,))
db.commit()
cur.close()
db.close()
PASSWORD=readpass("请设置管理密钥(密钥与数据库绑定且唯一不可变):")
getPASSWORD(PASSWORD)
#初始界面
print("Good evening!")