feat: 实现脚本查询

This commit is contained in:
DLmaster361
2025-08-04 13:50:31 +08:00
parent 76438459f7
commit 9f8f411c33
3 changed files with 85 additions and 14 deletions

View File

@@ -1,3 +1,5 @@
import uuid
from fastapi import FastAPI, HTTPException, Path, Body
from pydantic import BaseModel, Field
from typing import Dict, Any, List, Optional, Literal
@@ -45,6 +47,19 @@ class ScriptCreateOut(BaseModel):
data: Dict[str, Any] = Field(..., description="脚本配置数据")
class ScriptGetIn(BaseModel):
mode: Literal["ALL", "Index", "Single"]
ScriptId: Optional[str] = None
class ScriptGetOut(BaseModel):
code: int = Field(default=200, description="状态码")
status: str = Field(default="success", description="操作状态")
message: str = Field(default="脚本查询成功", description="操作消息")
index: List[Dict[str, str]] = Field(..., description="脚本索引列表")
data: Dict[str, Any] = Field(..., description="脚本列表或单个脚本数据")
class ScriptUpdate(BaseModel):
name: Optional[str] = None
content: Optional[str] = None
@@ -151,22 +166,35 @@ async def add_script(script: ScriptCreateIn = Body(...)) -> ScriptCreateOut:
return ScriptCreateOut(scriptId=str(uid), data=await config.toDict())
@app.post("/api/get/scripts", summary="查询脚本")
async def get_scripts():
"""
查询脚本
"""
# 实现查询脚本的逻辑
return {"status": "success", "data": []}
@app.post(
"/api/get/scripts", summary="查询脚本", response_model=ScriptGetOut, status_code=200
)
async def get_scripts(script: ScriptGetIn = Body(...)) -> ScriptGetOut:
"""查询脚本"""
try:
index, data = await Config.get_script(
script.mode, uuid.UUID(script.ScriptId) if script.ScriptId else None
)
except Exception as e:
return ScriptGetOut(code=500, status="error", message=str(e), index=[], data={})
return ScriptGetOut(index=index, data=data)
@app.post("/api/get/scripts/{scriptId}", summary="查询单个脚本")
async def get_script(scriptId: str = Path(..., description="脚本ID")):
@app.post(
"/api/get/scripts/{scriptId}",
summary="查询单个脚本",
response_model=ScriptGetOut,
status_code=200,
)
async def get_script(scriptId: str = Path(..., description="脚本ID")) -> ScriptGetOut:
"""
查询单个脚本
"""
# 实现查询单个脚本的逻辑
return {"status": "success", "data": {}}
try:
index, data = await Config.get_script("Single", uuid.UUID(scriptId))
except Exception as e:
return ScriptGetOut(code=500, status="error", message=str(e), index=[], data={})
return ScriptGetOut(index=index, data=data)
@app.post("/api/update/scripts/{scriptId}", summary="更新脚本")

View File

@@ -28,11 +28,10 @@ import re
import base64
import calendar
from datetime import datetime, timedelta, date
from collections import defaultdict
from pathlib import Path
from typing import Union, Dict, List, Literal
from typing import Union, Dict, List, Literal, Optional, Any, Tuple, Callable, TypeVar
from app.utils import get_logger
from app.models.ConfigBase import *
@@ -633,6 +632,22 @@ class AppConfig(GlobalConfig):
return await self.ScriptConfig.add(class_book[script])
async def get_script(
self, mode: Literal["ALL", "Index", "Single"], script_id: Optional[uuid.UUID]
) -> tuple[list, dict]:
"""获取脚本配置"""
if mode in ["ALL", "Index"]:
data = await self.ScriptConfig.toDict()
elif mode == "Single":
if script_id is None:
raise ValueError("script_id cannot be None when mode is 'Single'")
data = await self.ScriptConfig.get(script_id)
index = data.pop("instances", [])
return list(index), data if mode != "Index" else {}
# def check_data(self) -> None:
# """检查用户数据文件并处理数据文件版本更新"""

View File

@@ -453,7 +453,7 @@ class MultipleConfig:
if self.file:
await self.save()
async def toDict(self):
async def toDict(self) -> Dict[str, Union[list, dict]]:
"""
将配置项转换为字典
@@ -470,6 +470,34 @@ class MultipleConfig:
data[str(uid)] = await config.toDict()
return data
async def get(self, uid: uuid.UUID) -> Dict[str, Union[list, dict]]:
"""
获取指定 UID 的配置项
Parameters
----------
uid: uuid.UUID
要获取的配置项的唯一标识符
Returns
-------
Dict[str, Union[list, dict]]
对应的配置项数据字典
"""
if uid not in self.data:
raise ValueError(f"Config item with uid {uid} does not exist.")
data: Dict[str, Union[list, dict]] = {
"instances": [
{"uid": str(_), "type": self.data[_].__class__.__name__}
for _ in self.order
if _ == uid
]
}
data[str(uid)] = await self.data[uid].toDict()
return data
async def save(self):
"""保存配置"""