Files
ca_auto_table/app/utils/logs.py
2025-11-18 16:46:04 +08:00

219 lines
6.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import os
from logging import Logger
from concurrent_log_handler import ConcurrentRotatingFileHandler
from logging.handlers import TimedRotatingFileHandler
import gzip
import shutil
import glob
from datetime import datetime, timedelta
from pathlib import Path
def getLogger(name: str = 'root') -> Logger:
"""
创建一个按2小时滚动、支持多进程安全、自动压缩日志的 Logger
:param name: 日志器名称
:return: 单例 Logger 对象
"""
logger: Logger = logging.getLogger(name)
logger.setLevel(logging.DEBUG)
if not logger.handlers:
# 控制台输出
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
# 日志目录
log_dir = "logs"
os.makedirs(log_dir, exist_ok=True)
# 日志文件路径
log_file = os.path.join(log_dir, f"{name}.log")
# 文件处理器每2小时滚动一次保留7天共84个文件支持多进程写入
file_handler = TimedRotatingFileHandler(
filename=log_file,
when='H',
interval=2, # 每2小时切一次
backupCount=84, # 保留7天 = 7 * 24 / 2 = 84个文件
encoding='utf-8',
delay=False,
utc=False # 你也可以改成 True 表示按 UTC 时间切
)
# 设置 Formatter - 简化格式,去掉路径信息
formatter = logging.Formatter(
fmt="{name}{levelname} {asctime} {message}",
datefmt="%Y-%m-%d %H:%M:%S",
style="{"
)
console_formatter = logging.Formatter(
fmt="{levelname} {asctime} {message}",
datefmt="%Y-%m-%d %H:%M:%S",
style="{"
)
file_handler.setFormatter(formatter)
console_handler.setFormatter(console_formatter)
logger.addHandler(console_handler)
logger.addHandler(file_handler)
# 添加压缩功能(在第一次创建 logger 时执行一次)
_compress_old_logs(log_dir, name)
return logger
def _compress_old_logs(log_dir: str, name: str):
"""
将旧日志压缩成 .gz 格式
"""
pattern = os.path.join(log_dir, f"{name}.log.*")
for filepath in glob.glob(pattern):
if filepath.endswith('.gz'):
continue
try:
with open(filepath, 'rb') as f_in:
with gzip.open(filepath + '.gz', 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
os.remove(filepath)
except Exception as e:
print(f"日志压缩失败: {filepath}, 原因: {e}")
def compress_old_logs(log_dir: str = None, name: str = "root"):
"""
压缩旧的日志文件(公共接口)
Args:
log_dir: 日志目录,如果不指定则使用默认目录
name: 日志器名称
"""
if log_dir is None:
log_dir = "logs"
_compress_old_logs(log_dir, name)
def log_api_call(logger: Logger, user_id: str = None, endpoint: str = None, method: str = None, params: dict = None, response_status: int = None, client_ip: str = None):
"""
记录API调用信息包含用户ID、接口路径、请求方法、参数、响应状态和来源IP
Args:
logger: 日志器对象
user_id: 用户ID
endpoint: 接口路径
method: 请求方法 (GET, POST, PUT, DELETE等)
params: 请求参数
response_status: 响应状态码
client_ip: 客户端IP地址
"""
try:
# 构建日志信息
log_parts = []
if user_id:
log_parts.append(f"用户={user_id}")
if client_ip:
log_parts.append(f"IP={client_ip}")
if method and endpoint:
log_parts.append(f"{method} {endpoint}")
elif endpoint:
log_parts.append(f"接口={endpoint}")
if params:
# 过滤敏感信息
safe_params = {k: v for k, v in params.items()
if k.lower() not in ['password', 'token', 'secret', 'key']}
if safe_params:
log_parts.append(f"参数={safe_params}")
if response_status:
log_parts.append(f"状态码={response_status}")
if log_parts:
log_message = " ".join(log_parts)
logger.info(log_message)
except Exception as e:
logger.error(f"记录API调用日志失败: {e}")
def delete_old_compressed_logs(log_dir: str = None, days: int = 7):
"""
删除超过指定天数的压缩日志文件
Args:
log_dir: 日志目录,如果不指定则使用默认目录
days: 保留天数默认7天
"""
try:
if log_dir is None:
log_dir = "logs"
log_path = Path(log_dir)
if not log_path.exists():
return
# 计算截止时间
cutoff_time = datetime.now() - timedelta(days=days)
# 获取所有压缩日志文件
gz_files = [f for f in log_path.iterdir()
if f.is_file() and f.name.endswith('.log.gz')]
deleted_count = 0
for gz_file in gz_files:
# 获取文件修改时间
file_mtime = datetime.fromtimestamp(gz_file.stat().st_mtime)
# 如果文件超过保留期限,删除它
if file_mtime < cutoff_time:
gz_file.unlink()
print(f"删除旧压缩日志文件: {gz_file}")
deleted_count += 1
if deleted_count > 0:
print(f"总共删除了 {deleted_count} 个旧压缩日志文件")
except Exception as e:
print(f"删除旧压缩日志文件失败: {e}")
if __name__ == '__main__':
logger = getLogger('WebAPI')
# 基础日志测试
logger.info("系统启动")
logger.debug("调试信息")
logger.warning("警告信息")
logger.error("错误信息")
# API调用日志测试
log_api_call(
logger=logger,
user_id="user123",
endpoint="/api/users/info",
method="GET",
params={"id": 123, "fields": ["name", "email"]},
response_status=200,
client_ip="192.168.1.100"
)
log_api_call(
logger=logger,
user_id="user456",
endpoint="/api/users/login",
method="POST",
params={"username": "test", "password": "hidden"}, # password会被过滤
response_status=401,
client_ip="10.0.0.50"
)
# 单例验证
logger2 = getLogger('WebAPI')
print(f"Logger单例验证: {id(logger) == id(logger2)}")