From 55e943e5c37a7f6ad838405fdd0d9ce4d36e1265 Mon Sep 17 00:00:00 2001 From: Ivan087 Date: Wed, 30 Jul 2025 18:13:10 +0800 Subject: [PATCH 1/2] support loki in server.py --- requirements.txt | 3 +- server.py | 51 ++++++++++++++++++++- src/log/loki_config.py | 102 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 154 insertions(+), 2 deletions(-) create mode 100644 src/log/loki_config.py diff --git a/requirements.txt b/requirements.txt index 58d7037..d654b8b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,4 +9,5 @@ chromadb==0.5.0 langchain==0.1.17 langchain-community==0.0.36 sentence-transformers==2.7.0 -openai \ No newline at end of file +openai +python-logging-loki \ No newline at end of file diff --git a/server.py b/server.py index c5c7990..5dcca0d 100644 --- a/server.py +++ b/server.py @@ -4,6 +4,9 @@ from fastapi import FastAPI, Request, status from fastapi.responses import JSONResponse, Response, StreamingResponse from src.blackbox.blackbox_factory import BlackboxFactory +from src.log.loki_config import LokiLogger +import logging + from fastapi.middleware.cors import CORSMiddleware from injector import Injector @@ -20,6 +23,39 @@ app.add_middleware( injector = Injector() blackbox_factory = injector.get(BlackboxFactory) +logger = LokiLogger( + tags={"application": "Server", "env": "Development", "source": "python-fastapi-app"}, + level=logging.DEBUG +).get_logger() + +@app.exception_handler(Exception) +async def catch_all_exceptions(request: Request, exc: Exception): + """ + 捕获所有在 ASGI 应用中未被处理的异常,并记录到 Loki。 + """ + logger.error( + f"Unhandled exception in ASGI application for path: {request.url.path}, method: {request.method} - {exc}", + exc_info=True, # 必须为 True,才能获取完整的堆栈信息 + extra={ + "path": request.url.path, + "method": request.method, + "error_type": type(exc).__name__, + "error_message": str(exc) + } + ) + return JSONResponse( + status_code=500, + content={"message": "Internal Server Error", "detail": "An unexpected error occurred."} + ) + +@app.get("/trigger-error") +async def trigger_error(): + """ + 这个端点会故意抛出一个异常来测试全局异常处理器。 + """ + logger.info("尝试触发错误...") + raise ValueError("这是一个测试错误,用于演示全局异常处理。") + @app.post("/") @app.get("/") async def blackbox(blackbox_name: Union[str, None] = None, request: Request = None): @@ -29,7 +65,20 @@ async def blackbox(blackbox_name: Union[str, None] = None, request: Request = No box = blackbox_factory.get_blackbox(blackbox_name) except ValueError: return await JSONResponse(content={"error": "value error"}, status_code=status.HTTP_400_BAD_REQUEST) - return await box.fast_api_handler(request) + try: + response = await box.fast_api_handler(request) + # 检查响应的状态码,如果 >= 400,则认为是错误响应并记录 + if response.status_code >= 400: + try: + response_content_bytes = await response.body() + decoded_content = response_content_bytes.decode('utf-8', errors='ignore') + logger.error(f"Blackbox 返回错误响应: URL={request.url}, Method={request.method}, Status={response.status_code}, Content={decoded_content}") + except Exception as body_exc: + logger.error(f"Blackbox {blackbox_name} 返回错误响应: URL={request.url}, Method={request.method}, Status={response.status_code}, 无法解析响应内容: {body_exc}") + return response + except Exception as e: + logger.error(f"Blackbox handler 内部抛出异常: URL={request.url}, Method={request.method}, 异常: {e}", exc_info=True) + raise e # @app.get("/audio/{filename}") # async def serve_audio(filename: str): diff --git a/src/log/loki_config.py b/src/log/loki_config.py new file mode 100644 index 0000000..7d59112 --- /dev/null +++ b/src/log/loki_config.py @@ -0,0 +1,102 @@ +import logging +import logging_loki +import os +from requests.auth import HTTPBasicAuth +from typing import Dict, Optional +class LokiLogger: + """ + 一个用于配置和获取Loki日志记录器的类。 + 可以在其他文件里import后直接使用。 + + 用法示例: + from loki_config import LokiLogger + + # 获取日志记录器实例 (使用默认配置或环境变量) + loki_logger_instance = LokiLogger().get_logger() + loki_logger_instance.info("这条日志会推送到Loki!") + + # 或者使用自定义配置 + my_loki_logger = LokiLogger( + url="https://your-custom-loki-url.com/loki/api/v1/push", + username="myuser", + password="mypassword", + tags={"app_name": "my-custom-app", "environment": "prod"}, + level=logging.DEBUG + ).get_logger() + my_loki_logger.debug("这条调试日志也推送到Loki。") + """ + def __init__(self, + url: Optional[str] = None, + username: Optional[str] = None, + password: Optional[str] = None, + tags: Optional[Dict[str, str]] = None, + level: int = logging.INFO, + logger_name: str = "app_loki_logger"): + """ + 初始化LokiLogger。 + + Args: + url (str, optional): Loki的推送URL。默认从环境变量LOKI_URL获取, + 如果不存在则使用"https://loki.bwgdi.com/loki/api/v1/push"。 + username (str, optional): Loki的认证用户名。默认从环境变量LOKI_USERNAME获取。 + password (str, optional): Loki的认证密码。默认从环境变量LOKI_PASSWORD获取。 + tags (Dict[str, str], optional): 发送到Loki的默认标签。例如:{"application": "my-app"}。 + 如果未提供,则默认为{"application": "default-app", "source": "python-app"}。 + level (int, optional): 日志级别(如logging.INFO, logging.DEBUG)。默认为logging.INFO。 + logger_name (str, optional): 日志记录器的名称。默认为"app_loki_logger"。 + """ + # 从环境变量获取配置,如果未通过参数提供 + self._url = url if url else os.getenv("LOKI_URL", "https://loki.bwgdi.com/loki/api/v1/push") + self._username = username if username else os.getenv("LOKI_USERNAME",'admin') + self._password = password if password else os.getenv("LOKI_PASSWORD",'admin') + self._tags = tags if tags is not None else {"app": "jarvis", "env": "dev", "location": "gdi", "layer": "models"} + + # 获取或创建指定名称的日志记录器 + self._logger = logging.getLogger(logger_name) + self._logger.setLevel(level) + + for handler in self._logger.handlers[:]: + self._logger.removeHandler(handler) + # 检查是否已存在LokiHandler,避免重复添加导致重复日志 + # 在多文件或多次初始化的情况下,同一个logger_name可能会获取到同一个logger实例 + if not any(isinstance(h, logging_loki.LokiHandler) for h in self._logger.handlers): + try: + auth = None + if self._username and self._password: + auth = HTTPBasicAuth(self._username, self._password) + + loki_handler = logging_loki.LokiHandler( + url=self._url, + tags=self._tags, + version="1", # 通常Loki API版本为1 + # auth=auth + ) + self._logger.addHandler(loki_handler) + # 同时添加一个StreamHandler到控制台,以便在本地调试时也能看到日志输出 + if not any(isinstance(h, logging.StreamHandler) for h in self._logger.handlers): + self._logger.addHandler(logging.StreamHandler()) + self._logger.info(f"LokiLogger: 已成功配置Loki日志处理器,目标地址:{self._url}") + except Exception as e: + # 如果Loki配置失败,确保仍然有StreamHandler将日志输出到控制台 + if not any(isinstance(h, logging.StreamHandler) for h in self._logger.handlers): + self._logger.addHandler(logging.StreamHandler()) + self._logger.error(f"LokiLogger: 配置LokiHandler失败:{e}。将回退到控制台日志记录。", exc_info=True) + else: + # 如果LokiHandler已经存在,只确保StreamHandler存在 + if not any(isinstance(h, logging.StreamHandler) for h in self._logger.handlers): + self._logger.addHandler(logging.StreamHandler()) + self._logger.debug(f"LokiLogger: '{logger_name}' 记录器已配置LokiHandler,跳过重新配置。") + + + def get_logger(self) -> logging.Logger: + """ + 返回已配置的日志记录器实例。 + """ + self._logger.debug("LokiLogger: 获取已配置的日志记录器实例。") + return self._logger + +# 可选:如果希望有一个默认的全局LokiLogger实例 +# 你可以在这里实例化,然后在其他文件直接从这里导入 `loki_logger` +# 例如: +# DEFAULT_LOKI_LOGGER_INSTANCE = LokiLogger().get_logger() +# 然后在其他文件里: `from loki_config import DEFAULT_LOKI_LOGGER_INSTANCE as logger` From 50d4a38d6f0c6069a0163b7a2f94d8f93a6732e6 Mon Sep 17 00:00:00 2001 From: Ivan087 Date: Fri, 1 Aug 2025 17:39:00 +0800 Subject: [PATCH 2/2] fix loki for server --- README.md | 8 ++++++++ server.py | 37 ++++++++++++++++++++----------------- src/log/loki_config.py | 2 +- 3 files changed, 29 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index 06a9b92..344e03b 100644 --- a/README.md +++ b/README.md @@ -45,6 +45,14 @@ log: time_format: "%Y-%m-%d %H:%M:%S" filename: "D:/Workspace/Logging/jarvis/jarvis-models.log" +loki: + url: "https://loki.bwgdi.com/loki/api/v1/push" + labels: + app: jarvis + env: dev + location: "gdi" + layer: models + melotts: mode: local # or docker url: http://10.6.44.141:18080/convert/tts diff --git a/server.py b/server.py index 5dcca0d..5e46253 100644 --- a/server.py +++ b/server.py @@ -9,6 +9,7 @@ import logging from fastapi.middleware.cors import CORSMiddleware from injector import Injector +import yaml app = FastAPI() @@ -23,11 +24,18 @@ app.add_middleware( injector = Injector() blackbox_factory = injector.get(BlackboxFactory) +with open(".env.yaml", "r") as f: + config = yaml.safe_load(f) logger = LokiLogger( - tags={"application": "Server", "env": "Development", "source": "python-fastapi-app"}, + # url=config["loki"]["url"], + # username=config["loki"]["username"], + # password=config["loki"]["password"], + tags=config["loki"]["labels"], + logger_name=__name__, level=logging.DEBUG ).get_logger() + @app.exception_handler(Exception) async def catch_all_exceptions(request: Request, exc: Exception): """ @@ -48,37 +56,32 @@ async def catch_all_exceptions(request: Request, exc: Exception): content={"message": "Internal Server Error", "detail": "An unexpected error occurred."} ) -@app.get("/trigger-error") -async def trigger_error(): - """ - 这个端点会故意抛出一个异常来测试全局异常处理器。 - """ - logger.info("尝试触发错误...") - raise ValueError("这是一个测试错误,用于演示全局异常处理。") - @app.post("/") @app.get("/") async def blackbox(blackbox_name: Union[str, None] = None, request: Request = None): if not blackbox_name: - return await JSONResponse(content={"error": "blackbox_name is required"}, status_code=status.HTTP_400_BAD_REQUEST) + return JSONResponse(content={"error": "blackbox_name is required"}, status_code=status.HTTP_400_BAD_REQUEST) try: box = blackbox_factory.get_blackbox(blackbox_name) - except ValueError: - return await JSONResponse(content={"error": "value error"}, status_code=status.HTTP_400_BAD_REQUEST) + except ValueError as e: + logger.error(f"获取 blackbox 失败: {blackbox_name} - {e}", exc_info=True) + return JSONResponse(content={"error": "value error"}, status_code=status.HTTP_400_BAD_REQUEST) try: response = await box.fast_api_handler(request) # 检查响应的状态码,如果 >= 400,则认为是错误响应并记录 if response.status_code >= 400: - try: - response_content_bytes = await response.body() - decoded_content = response_content_bytes.decode('utf-8', errors='ignore') + try: + decoded_content = response.body.decode('utf-8', errors='ignore') logger.error(f"Blackbox 返回错误响应: URL={request.url}, Method={request.method}, Status={response.status_code}, Content={decoded_content}") except Exception as body_exc: logger.error(f"Blackbox {blackbox_name} 返回错误响应: URL={request.url}, Method={request.method}, Status={response.status_code}, 无法解析响应内容: {body_exc}") return response except Exception as e: - logger.error(f"Blackbox handler 内部抛出异常: URL={request.url}, Method={request.method}, 异常: {e}", exc_info=True) - raise e + logger.error(f"Blackbox 内部抛出异常: URL={request.url}, Method={request.method}, 异常报错: {type(e).__name__}: {e}", exc_info=True) + return JSONResponse( + status_code=500, + content={"message": "Internal Server Error", "detail": "An unexpected error occurred."} + ) # @app.get("/audio/{filename}") # async def serve_audio(filename: str): diff --git a/src/log/loki_config.py b/src/log/loki_config.py index 7d59112..121b4a0 100644 --- a/src/log/loki_config.py +++ b/src/log/loki_config.py @@ -69,7 +69,7 @@ class LokiLogger: url=self._url, tags=self._tags, version="1", # 通常Loki API版本为1 - # auth=auth + auth=auth ) self._logger.addHandler(loki_handler) # 同时添加一个StreamHandler到控制台,以便在本地调试时也能看到日志输出