Files
AUTO-MAS-test/app/utils/LogMonitor.py
2025-08-30 20:30:43 +08:00

157 lines
5.2 KiB
Python

import asyncio
import aiofiles
import os
from datetime import datetime, timedelta
from pathlib import Path
from typing import Callable, Optional, List, Awaitable
from .logger import get_logger
logger = get_logger("日志监控器")
TIME_FIELDS = {
"%Y": "year",
"%m": "month",
"%d": "day",
"%H": "hour",
"%M": "minute",
"%S": "second",
"%f": "microsecond",
}
"""时间字段映射表"""
def strptime(date_string: str, format: str, default_date: datetime) -> datetime:
"""根据指定格式解析日期字符串"""
date = datetime.strptime(date_string, format)
# 构建参数字典
datetime_kwargs = {}
for format_code, field_name in TIME_FIELDS.items():
if format_code in format:
datetime_kwargs[field_name] = getattr(date, field_name)
else:
datetime_kwargs[field_name] = getattr(default_date, field_name)
return datetime(**datetime_kwargs)
class LogMonitor:
def __init__(
self,
time_stamp_range: tuple[int, int],
time_format: str,
callback: Callable[[List[str]], Awaitable[None]],
encoding: str = "utf-8",
):
self.time_stamp_range = time_stamp_range
self.time_format = time_format
self.callback = callback
self.encoding = encoding
self.log_file_path: Optional[Path] = None
self.log_start_time: datetime = datetime.now()
self.last_callback_time: datetime = datetime.now()
self.log_contents: List[str] = []
self.task: Optional[asyncio.Task] = None
self.__is_running = False
async def monitor_log(self):
"""监控日志文件的主循环"""
if self.log_file_path is None or not self.log_file_path.exists():
raise ValueError("日志文件路径未设置或文件不存在")
logger.info(f"开始监控日志文件: {self.log_file_path}")
while self.__is_running:
logger.debug("正在检查日志文件...")
log_contents = []
if_log_start = False
# 检查文件是否仍然存在
if not self.log_file_path.exists():
logger.warning(f"日志文件不存在: {self.log_file_path}")
continue
# 尝试读取文件
try:
async with aiofiles.open(
self.log_file_path, "r", encoding=self.encoding
) as f:
async for line in f:
if not if_log_start:
try:
entry_time = strptime(
line[
self.time_stamp_range[
0
] : self.time_stamp_range[1]
],
self.time_format,
self.last_callback_time,
)
if entry_time > self.log_start_time:
if_log_start = True
log_contents.append(line)
except (ValueError, IndexError):
continue
else:
log_contents.append(line)
except (FileNotFoundError, PermissionError) as e:
logger.warning(f"文件访问错误: {e}")
await asyncio.sleep(5)
continue
except UnicodeDecodeError as e:
logger.error(f"文件编码错误: {e}")
await asyncio.sleep(10)
continue
# 调用回调
if (
log_contents != self.log_contents
or datetime.now() - self.last_callback_time > timedelta(minutes=1)
):
self.log_contents = log_contents
self.last_callback_time = datetime.now()
# 安全调用回调函数
try:
await self.callback(log_contents)
except Exception as e:
logger.error(f"回调函数执行失败: {e}")
await asyncio.sleep(1)
async def start(self, log_file_path: Path, start_time: datetime) -> None:
"""启动监控"""
if log_file_path.is_dir():
raise ValueError(f"日志文件不能是目录: {log_file_path}")
if self.task is not None and not self.task.done():
await self.stop()
self.__is_running = True
self.log_contents = []
self.log_file_path = log_file_path
self.log_start_time = start_time
self.task = asyncio.create_task(self.monitor_log())
logger.info(f"日志监控已启动: {self.log_file_path}")
async def stop(self):
"""停止监控"""
logger.info("请求取消日志监控任务")
if self.task is not None and not self.task.done():
self.task.cancel()
try:
await self.task
except asyncio.CancelledError:
logger.info("日志监控任务已中止")
logger.success("日志监控任务已停止")
self.task = None