# Source: AUTO-MAS-test/manage.py (revision dated 2024-02-15 20:15:05 +08:00; 462 lines, 15 KiB, Python).
#
# NOTE: this copy was taken from a web code viewer, which reported that the file
# contains ambiguous Unicode characters, i.e. characters that might be confused
# with other characters. If that is intentional the warning can be ignored;
# otherwise inspect the escaped form of the file to reveal them.


import sqlite3
import datetime
import msvcrt
import sys
import os
import hashlib
import random
import secrets
from Crypto.Cipher import AES
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_OAEP
from Crypto.Util.Padding import pad,unpad
#读入密码
def readpass(text):
    """Prompt for a secret on the Windows console and return what was typed.

    ``sys.stdout`` is first reset to ``sys.__stdout__`` and ``text`` is written
    to it as the prompt. Keys are then read one at a time with
    ``msvcrt.getch``. The most recently typed character stays visible and is
    overwritten with ``*`` when the next key arrives; Enter masks the last
    character and finishes the input. Enter and Backspace are ignored while
    nothing has been typed, so the returned string is never empty.

    Special keys (arrows, function keys, ...) are delivered by ``getch`` as a
    0x00 or 0xE0 prefix byte followed by a key code; both bytes are discarded.
    Any other byte that is not valid UTF-8 on its own is ignored as well
    instead of aborting the prompt with ``UnicodeDecodeError``.

    Args:
        text: Prompt shown before reading.

    Returns:
        The typed characters as a ``str``.
    """
    sys.stdout = sys.__stdout__
    out = sys.stdout
    out.write(text)
    out.flush()

    typed_chars = ''
    while True:
        key = msvcrt.getch()

        # Special keys come as a two-byte sequence; swallow the key code too.
        if key in (b'\x00', b'\xe0'):
            msvcrt.getch()
            continue

        if key == b'\r':
            if not typed_chars:
                continue  # an empty secret is not accepted
            # Mask the last visible character before leaving.
            out.write('\b*')
            out.flush()
            break

        if key == b'\b':
            if typed_chars:
                typed_chars = typed_chars[:-1]
                out.write('\b \b')
                out.flush()
            continue

        try:
            char = key.decode("utf-8")
        except UnicodeDecodeError:
            # A lone non-UTF-8 byte cannot be represented; skip the keystroke.
            continue

        # Mask the previously echoed character (if any), then echo the new one.
        out.write(('\b*' if typed_chars else '') + char)
        out.flush()
        typed_chars += char

    print('')
    return typed_chars
#配置密钥
def getPASSWORD(PASSWORD):
    """Generate a fresh key set protected by the management key ``PASSWORD``.

    Files written under ``data/key/`` (the directory is created if missing):

    * ``public_key.pem``          - public half of a new 2048-bit RSA key pair.
    * ``PASSWORDsalt.txt``        - random hex salt used to derive the AES key.
    * ``verifysalt.txt``          - random hex salt used for the check hash.
    * ``AES_password_verify.bin`` - SHA-256 of (AES key + verify salt).
    * ``private_key.bin``         - exported RSA private key, padded to a
      32-byte boundary and encrypted with AES in ECB mode.

    The AES key is the SHA-256 digest of ``PASSWORD + PASSWORDsalt`` encoded
    as UTF-8.

    Args:
        PASSWORD: Management key as a ``str``.

    Returns:
        None.
    """
    # NOTE(review): ECB mode and a single SHA-256 pass are weak choices for
    # protecting a private key. Changing either alters the on-disk format and
    # would have to be mirrored in decryptx, so they are left as they are.

    # The key files cannot be created if their directory does not exist yet.
    os.makedirs("data/key", exist_ok=True)

    # New RSA key pair; only the public half is stored in the clear.
    private_key = RSA.generate(2048)
    with open('data/key/public_key.pem', 'wb') as f:
        f.write(private_key.publickey().exportKey())

    # Random salts for key derivation and for the verification hash.
    PASSWORDsalt = secrets.token_hex(random.randint(32, 1024))
    with open("data/key/PASSWORDsalt.txt", "w", encoding="utf-8") as f:
        print(PASSWORDsalt, file=f)
    verifysalt = secrets.token_hex(random.randint(32, 1024))
    with open("data/key/verifysalt.txt", "w", encoding="utf-8") as f:
        print(verifysalt, file=f)

    # Derive the AES-256 key from the management key.
    AES_password = hashlib.sha256((PASSWORD + PASSWORDsalt).encode("utf-8")).digest()

    # Store a salted hash of the AES key so a wrong management key can be
    # detected later without attempting a decryption.
    AES_password_verify = hashlib.sha256(AES_password + verifysalt.encode("utf-8")).digest()
    with open("data/key/AES_password_verify.bin", "wb") as f:
        f.write(AES_password_verify)

    # Encrypt the RSA private key with the derived AES key and store it.
    AES_key = AES.new(AES_password, AES.MODE_ECB)
    private_key_local = AES_key.encrypt(pad(private_key.exportKey(), 32))
    with open("data/key/private_key.bin", "wb") as f:
        f.write(private_key_local)
#加密
def encryptx(note):
    """Encrypt ``note`` with the stored RSA public key.

    The public key is loaded from ``data/key/public_key.pem`` and used with
    PKCS#1 OAEP on the UTF-8 encoding of ``note``.

    Args:
        note: Text to encrypt.

    Returns:
        The ciphertext as ``bytes``.
    """
    with open('data/key/public_key.pem', 'rb') as key_file:
        public_key = RSA.import_key(key_file.read())
    oaep = PKCS1_OAEP.new(public_key)
    return oaep.encrypt(note.encode("utf-8"))
#解密
def decryptx(note, PASSWORD):
    """Decrypt ``note`` with the RSA private key unlocked by ``PASSWORD``.

    The AES key is re-derived as SHA-256 of ``PASSWORD`` plus the stored
    password salt, and checked against ``data/key/AES_password_verify.bin``
    (SHA-256 of the AES key plus the stored verify salt). On a match the RSA
    private key in ``data/key/private_key.bin`` is decrypted with AES-ECB,
    unpadded (32-byte block size) and used with PKCS#1 OAEP to decrypt
    ``note``.

    Args:
        note: Ciphertext ``bytes`` to decrypt.
        PASSWORD: Management key as a ``str``.

    Returns:
        The decrypted text, or the literal string ``"管理密钥错误"`` when the
        management key does not match the stored verification hash.
    """
    # The two .bin files hold raw binary data and must be read verbatim:
    # stripping them would drop genuine leading/trailing bytes that happen to
    # have whitespace values, corrupting the digest or the ciphertext.
    with open("data/key/private_key.bin", "rb") as f:
        private_key_local = f.read()
    # The salts are text files written with a trailing newline, so strip them.
    with open("data/key/PASSWORDsalt.txt", "r", encoding="utf-8") as f:
        PASSWORDsalt = f.read().strip()
    with open("data/key/verifysalt.txt", "r", encoding="utf-8") as f:
        verifysalt = f.read().strip()
    with open("data/key/AES_password_verify.bin", "rb") as f:
        AES_password_verify = f.read()

    # Re-derive the AES-256 key and verify it before touching the private key.
    AES_password = hashlib.sha256((PASSWORD + PASSWORDsalt).encode("utf-8")).digest()
    AES_password_SHA = hashlib.sha256(AES_password + verifysalt.encode("utf-8")).digest()
    if AES_password_SHA != AES_password_verify:
        return "管理密钥错误"

    # Unlock the RSA private key with the AES key.
    AES_key = AES.new(AES_password, AES.MODE_ECB)
    private_key_pem = unpad(AES_key.decrypt(private_key_local), 32)
    private_key = RSA.import_key(private_key_pem)

    # Decrypt the payload with the RSA private key.
    decrypter = PKCS1_OAEP.new(private_key)
    return decrypter.decrypt(note).decode("utf-8")
#修改管理密钥
def changePASSWORD():
    """Interactively replace the management key and re-encrypt stored passwords.

    All rows of ``adminx`` are loaded first. The old key is always prompted
    for; when the table is empty it is not checked and a new key set is
    generated straight away. Otherwise the old key is re-requested until it
    can decrypt the first row's password (column 6). Every stored password is
    then decrypted with the old key, a new key set is generated with
    ``getPASSWORD`` and the passwords are written back encrypted with the new
    public key. The module-level ``PASSWORD`` is updated to the new key.

    Returns:
        The string ``"管理密钥修改成功"``.
    """
    global PASSWORD

    # Snapshot every user record.
    conn = sqlite3.connect(DATABASE)
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM adminx WHERE True")
    records = cursor.fetchall()
    cursor.close()
    conn.close()
    users = [list(record) for record in records]

    old_key = readpass("请输入旧管理密钥:")

    # Nothing is encrypted yet, so there is nothing to verify against.
    if not users:
        print("当前无用户,验证自动通过")
        new_key = readpass("请输入新管理密钥:")
        getPASSWORD(new_key)
        PASSWORD = new_key
        return "管理密钥修改成功"

    # Keep asking until the old key decrypts the first stored password.
    while True:
        if decryptx(users[0][6], old_key) != "管理密钥错误":
            break
        print("管理密钥错误")
        old_key = readpass("请输入旧管理密钥:")
    print("验证通过")

    new_key = readpass("请输入新管理密钥:")

    # Recover the plain-text passwords while the old key files still exist.
    for user in users:
        user[6] = decryptx(user[6], old_key)

    # Replace the key files, then store the passwords under the new key.
    getPASSWORD(new_key)
    conn = sqlite3.connect(DATABASE)
    cursor = conn.cursor()
    for user in users:
        cursor.execute(
            "UPDATE adminx SET password=? WHERE admin=?",
            (encryptx(user[6]), user[0]),
        )
    conn.commit()
    cursor.close()
    conn.close()

    PASSWORD = new_key
    return "管理密钥修改成功"
#添加用户
def add():
    """Interactively create a new user row in ``adminx``.

    Prompts for user name, phone number, number of proxy days, stage code and
    password. The user name is re-requested while ``search(name, 0)`` returns
    ``""`` (which is what ``search`` returns when it found a record). The day
    count is re-requested until it parses as an integer instead of aborting
    the program on a typo. The password is stored encrypted with ``encryptx``;
    the row is inserted with status ``'y'`` and last-run date ``'2000-01-01'``.

    Returns:
        The string ``"操作成功"``.
    """
    adminx = input("用户名:")
    # Reject names that already resolve to a record.
    while search(adminx, 0) == "":
        print("该用户已存在,请重新输入")
        adminx = input("用户名:")

    numberx = input("手机号码:")

    while True:
        try:
            dayx = int(input("代理天数:"))
        except ValueError:
            print("代理天数必须为整数,请重新输入")
        else:
            break

    gamex = input("关卡号:")
    passwordx = encryptx(readpass("密码:"))

    # Open the connection only once all input is collected, and always close it.
    db = sqlite3.connect(DATABASE)
    try:
        cur = db.cursor()
        cur.execute(
            "INSERT INTO adminx VALUES(?,?,?,'y','2000-01-01',?,?)",
            (adminx, numberx, dayx, gamex, passwordx),
        )
        db.commit()
        cur.close()
    finally:
        db.close()
    return "操作成功"
#删除用户信息
def delete(id):
    """Delete the user whose ``admin`` column equals ``id``.

    Args:
        id: User name to remove.

    Returns:
        ``"成功删除" + id`` when a row was removed, ``"未找到" + id`` when no
        such user exists.
    """
    db = sqlite3.connect(DATABASE)
    try:
        cur = db.cursor()
        try:
            # Check that the user exists before deleting.
            cur.execute("SELECT * FROM adminx WHERE admin=?", (id,))
            if not cur.fetchall():
                return "未找到" + id
            cur.execute("DELETE FROM adminx WHERE admin=?", (id,))
            db.commit()
            return "成功删除" + id
        finally:
            cur.close()
    finally:
        # Runs on every path, including the "not found" early return.
        db.close()
#检索用户信息与配置
def search(id, book):
    """Look up settings or user records and optionally print them as a table.

    ``id`` selects what is queried:

    * ``"time"`` - prints every stored start time on its own line and returns
      ``""``, or returns ``"启动时间未设置"`` when none is stored.
    * ``"maa"``  - returns the stored MAA path, or ``"MAA路径未设置"``.
    * ``"all"``  - prints the MAA path and the start times, then handles every
      user with the password column replaced by ``"******"``.
    * anything else - handles the user whose ``admin`` column equals ``id``;
      the password is decrypted, prompting for the management key when the
      module-level ``PASSWORD`` is ``0`` or fails to decrypt.

    For user queries the last-run date is turned into "run today / not run
    today" text and the status flag into "enabled / disabled" text. The table
    itself is printed only when ``book == 1``.

    Args:
        id: Query selector or user name (see above).
        book: ``1`` to print the user table, any other value to stay silent.

    Returns:
        ``""`` when at least one user record was found, otherwise a message
        string (``"\\n当前没有用户记录"`` for ``"all"``, ``"未找到" + id``
        for a single user).
    """
    global PASSWORD

    conn = sqlite3.connect(DATABASE)
    cursor = conn.cursor()

    # Start-time query.
    if id == "time":
        cursor.execute("SELECT * FROM timeset WHERE True")
        start_times = [list(record) for record in cursor.fetchall()]
        cursor.close()
        conn.close()
        if not start_times:
            return "启动时间未设置"
        for entry in start_times:
            print(entry[0])
        return ""

    # MAA path query.
    if id == "maa":
        cursor.execute("SELECT * FROM pathset WHERE True")
        paths = cursor.fetchall()
        cursor.close()
        conn.close()
        return paths[0][0] if paths else "MAA路径未设置"

    # User query: either everybody or one named user.
    show_all = id == "all"
    if show_all:
        cursor.execute("SELECT * FROM adminx WHERE True")
    else:
        cursor.execute("SELECT * FROM adminx WHERE admin=?", (id,))
    records = cursor.fetchall()

    # The full listing is preceded by the MAA path and the start times.
    if show_all:
        cursor.execute("SELECT * FROM pathset WHERE True")
        paths = cursor.fetchall()
        if paths:
            print("\nMAA路径" + paths[0][0])
        else:
            print("\nMAA路径未设置")

        cursor.execute("SELECT * FROM timeset WHERE True")
        start_times = [list(record) for record in cursor.fetchall()]
        if start_times:
            print("启动时间:", end='')
            for entry in start_times:
                print(entry[0], end=' ')
            # Terminate the single-line list of start times.
            print('')
        else:
            print("\n启动时间未设置")

    cursor.close()
    conn.close()

    users = [list(record) for record in records]
    if not users:
        if show_all:
            return "\n当前没有用户记录"
        return "未找到" + id

    # Translate run state and status; hide or decrypt the password.
    today = datetime.date.today().strftime('%Y-%m-%d')
    for user in users:
        user[4] = "今日已执行" if user[4] == today else "今日未执行"
        user[3] = "启用" if user[3] == 'y' else "禁用"
        if show_all:
            user[6] = "******"
            continue
        if PASSWORD == 0 or decryptx(user[6], PASSWORD) == "管理密钥错误":
            PASSWORD = readpass("请输入管理密钥:")
        user[6] = decryptx(user[6], PASSWORD)

    # Tabular output.
    if book == 1:
        widths = (15, 12, 8, 4, 10, 10, 25)
        titles = ("用户名", "手机号码", "代理天数", "状态", "执行情况", "关卡", "密码")
        print('')
        print(*[unit(title, width) for title, width in zip(titles, widths)])
        for user in users:
            print(*[unit(user[column], width) for column, width in enumerate(widths)])
    return ""
#续期
def renewal(readxx):
    """Add days to a user's remaining proxy days.

    ``readxx`` has the form ``"<user> <days>"``; it is split at the first
    space and ``<days>`` must parse as an integer (it may be negative).

    Args:
        readxx: Command text containing the user name and the day delta.

    Returns:
        A success message containing the new day count, ``"未找到" + user``
        when the user does not exist, or ``"无法识别的输入"`` when ``readxx``
        has no space or the day part is not an integer.
    """
    # Split user name and day delta at the first space.
    user, separator, days_text = readxx.partition(' ')
    if not separator:
        return "无法识别的输入"
    try:
        dayp = int(days_text)
    except ValueError:
        return "无法识别的输入"

    db = sqlite3.connect(DATABASE)
    try:
        cur = db.cursor()
        try:
            # Check that the user exists.
            cur.execute("SELECT * FROM adminx WHERE admin=?", (user,))
            data = cur.fetchall()
            if not data:
                return "未找到" + user
            # Column 2 holds the current day count.
            new_days = data[0][2] + dayp
            cur.execute("UPDATE adminx SET day=? WHERE admin=?", (new_days, user))
            db.commit()
        finally:
            cur.close()
    finally:
        db.close()
    # NOTE(review): the trailing empty literal is kept from the original; it
    # may once have held a suffix character - confirm against the real file.
    return '成功更新' + user + '的代理天数至' + str(new_days) + ''
#用户状态配置
def turn(id, t):
    """Set the status flag of user ``id`` to ``t``.

    Args:
        id: User name (``admin`` column).
        t: New status value; ``'y'`` is reported as enabled, anything else as
            disabled.

    Returns:
        ``"未找到" + id`` when the user does not exist, otherwise
        ``'已启用' + id`` or ``'已禁用' + id``.
    """
    conn = sqlite3.connect(DATABASE)
    cursor = conn.cursor()

    # Only touch the table when the user exists.
    cursor.execute("SELECT * FROM adminx WHERE admin=?", (id,))
    found = len(cursor.fetchall()) != 0
    if found:
        cursor.execute("UPDATE adminx SET status=? WHERE admin=?", (t, id))
        conn.commit()
    cursor.close()
    conn.close()

    if not found:
        return "未找到" + id
    prefix = '已启用' if t == 'y' else '已禁用'
    return prefix + id
#修改刷取关卡
def gameid(readxx):
    """Change the stage a user farms, applying the alias rules file.

    ``readxx`` has the form ``"<user> <stage>"`` and is split at the first
    space. Alias rules are read from ``data/gameid.txt``, one per line, as
    ``<alias><separator><stage>``; when the requested stage equals an alias
    it is replaced by the mapped stage before being stored.

    Rule lines without the separator are skipped (they previously raised
    ``NameError`` or silently reused the previous line's values).

    Args:
        readxx: Command text containing the user name and the stage.

    Returns:
        A success message containing the stored stage, ``"未找到" + user``
        when the user does not exist, or ``"无法识别的输入"`` when ``readxx``
        contains no space.

    Raises:
        OSError: If ``data/gameid.txt`` cannot be opened.
    """
    # Split user name and requested stage at the first space.
    user, separator, gamep = readxx.partition(' ')
    if not separator:
        return "无法识别的输入"

    # NOTE(review): the separator literal is empty in this copy of the file.
    # The hosting page warned about ambiguous Unicode characters, so a
    # character was presumably lost here - TODO restore it from the real
    # file. While it is empty no alias rule can match.
    rule_separator = ''

    db = sqlite3.connect(DATABASE)
    try:
        cur = db.cursor()
        try:
            # Check that the user exists.
            cur.execute("SELECT * FROM adminx WHERE admin=?", (user,))
            if not cur.fetchall():
                return "未找到" + user

            # Load the alias rules, splitting each line at the first separator.
            games = {}
            with open('data/gameid.txt', encoding='utf-8') as f:
                for line in f:
                    if not rule_separator or rule_separator not in line:
                        continue
                    gamein, _, gameout = line.partition(rule_separator)
                    games[gamein] = gameout.strip()
            gamep = games.get(gamep, gamep)

            cur.execute("UPDATE adminx SET game=? WHERE admin=?", (gamep, user))
            db.commit()
        finally:
            cur.close()
    finally:
        db.close()
    return '成功更新' + user + '的关卡为' + gamep
#设置MAA路径
def setpath(pathx):
    """Store ``pathx`` as the MAA path.

    Updates the existing ``pathset`` row(s) when the table is not empty,
    otherwise inserts the first row.

    Args:
        pathx: Path text to store.

    Returns:
        ``"MAA路径已设置为" + pathx``.
    """
    conn = sqlite3.connect(DATABASE)
    cursor = conn.cursor()

    cursor.execute("SELECT * FROM pathset WHERE True")
    already_set = len(cursor.fetchall()) > 0
    if already_set:
        statement = "UPDATE pathset SET path=? WHERE True"
    else:
        statement = "INSERT INTO pathset VALUES(?)"
    cursor.execute(statement, (pathx,))

    conn.commit()
    cursor.close()
    conn.close()
    return "MAA路径已设置为" + pathx
#设置启动时间
def settime(book, timex):
    """Add or remove a start time in ``timeset``.

    Args:
        book: ``'+'`` to add ``timex``, ``'-'`` to remove it.
        timex: Start-time text.

    Returns:
        ``"已添加"``/``"已存在"`` + ``timex`` for ``'+'``,
        ``"已删除"``/``"未找到"`` + ``timex`` for ``'-'``. For any other
        ``book`` the function returns ``None``.
    """
    conn = sqlite3.connect(DATABASE)
    cursor = conn.cursor()

    # Is the requested time already stored?
    cursor.execute("SELECT * FROM timeset WHERE True")
    stored = [list(record) for record in cursor.fetchall()]
    present = [timex] in stored

    if book == '+':
        if present:
            message = "已存在" + timex
        else:
            cursor.execute("INSERT INTO timeset VALUES(?)", (timex,))
            conn.commit()
            message = "已添加" + timex
    elif book == '-':
        if present:
            cursor.execute("DELETE FROM timeset WHERE time=?", (timex,))
            conn.commit()
            message = "已删除" + timex
        else:
            message = "未找到" + timex
    else:
        # NOTE(review): as in the original, an unknown operation falls
        # through without closing the connection and yields None.
        return None

    cursor.close()
    conn.close()
    return message
#统一制表单元
def unit(x, m):
    """Format ``x`` as one table cell padded to ``m`` display columns.

    The value is converted with ``str``. Characters whose East Asian Width is
    Wide or Fullwidth (e.g. Chinese characters) count as two columns and all
    other characters as one, so ASCII punctuation and spaces no longer
    over-count and shift the following columns.

    Args:
        x: Value to display.
        m: Target width of the cell content in display columns.

    Returns:
        A leading space, the text, and enough trailing spaces to reach ``m``
        columns (none when the text is already wider).
    """
    import unicodedata  # local import: only this helper needs it

    text = str(x)
    width = sum(
        2 if unicodedata.east_asian_width(char) in ('W', 'F') else 1
        for char in text
    )
    return ' ' + text + ' ' * (m - width)
#初期检查
# Location of the SQLite database and the cached management key
# (0 means the key has not been entered yet in this session).
DATABASE = "data/data.db"
PASSWORD = 0

# First launch: create the schema, ask for the MAA path and set up the keys.
if not os.path.exists(DATABASE):
    db = sqlite3.connect(DATABASE)
    cur = db.cursor()
    db.execute("CREATE TABLE adminx(admin text,number text,day int,status text,last date,game text,password byte)")
    db.execute("CREATE TABLE pathset(path text)")
    db.execute("CREATE TABLE timeset(time text)")
    readx = input("首次启动请设置MAA路径")
    cur.execute("INSERT INTO pathset VALUES(?)", (readx,))
    db.commit()
    cur.close()
    db.close()
    PASSWORD = readpass("请设置管理密钥(密钥与数据库绑定):")
    getPASSWORD(PASSWORD)

# Start screen: greeting plus the full listing.
print("Good evening!")
print(search("all", 1))

# Command loop.
#   "+"             add a user           "-"            quit
#   "/<path>"       set the MAA path     "*"            change the management key
#   ":+<time>" / ":-<time>"              add / remove a start time
#   "<arg> ?"       show a user          "<arg> +"      extend proxy days
#   "<arg> -"       delete a user        "<arg> ~"      change the stage
#   "<arg> y" / "<arg> n"                enable / disable a user
while True:
    read = input()
    if not read:
        print("无法识别的输入")
    elif read == '+':
        print(add())
    elif read == '-':
        # sys.exit works even where the interactive exit() helper is absent.
        sys.exit()
    elif read[0] == '/':
        print(setpath(read[1:]))
    elif read == '*':
        print(changePASSWORD())
    elif read[:2] in (':+', ':-'):
        print(settime(read[1], read[2:]))
    elif len(read) >= 2 and read[-2] == ' ':
        # Suffix commands: "<argument> <command character>". The length check
        # keeps one-character inputs such as "?" from indexing out of range.
        target = read[:-2]
        command = read[-1]
        if command == '?':
            print(search(target, 1))
        elif command == '+':
            print(renewal(target))
        elif command == '-':
            print(delete(target))
        elif command == '~':
            print(gameid(target))
        elif command == 'y' or command == 'n':
            print(turn(target, command))
        else:
            print("无法识别的输入")
    else:
        print("无法识别的输入")