diff --git a/main.py b/main.py
index 7cb9e87..3afe242 100644
--- a/main.py
+++ b/main.py
@@ -2,9 +2,29 @@ import uvicorn
 import logging
 from injector import Injector,inject
 from src.configuration import LogConf, singleton
-
+from src.loki_logger_handler.loki_logger_handler import LokiLoggerHandler
 import argparse
+import yaml
+with open(".env.yaml", "r") as file:
+    config = yaml.safe_load(file)
+    log_handler_config = config["log"]["handlers"]
+    log_handlers = []
+    for handler_config in log_handler_config:
+        if handler_config["type"] == "loki":
+            log_handlers.append(LokiLoggerHandler(
+                url=handler_config["url"],
+                additional_headers={"X-Odin-Auth": handler_config["x-odin-auth"]},
+                labels=handler_config["labels"],
+                label_keys={},
+                timeout=10,
+                enable_structured_loki_metadata=True,
+                loki_metadata={"service": "user-service", "version": "1.0.0"},
+                insecure_ssl_verify=False
+            ))
+
+logging.basicConfig(level=logging.DEBUG, handlers=log_handlers)
+logging.info("logging init finish")
 
 
 @singleton
 class Main():
diff --git a/output.wav b/output.wav
new file mode 100644
index 0000000..d759a07
Binary files /dev/null and b/output.wav differ
diff --git a/requirements.txt b/requirements.txt
index 58d7037..abbb8f1 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -9,4 +9,7 @@ chromadb==0.5.0
 langchain==0.1.17
 langchain-community==0.0.36
 sentence-transformers==2.7.0
+requests>=2.27.1; python_version < '3.7'
+requests>=2.31.0; python_version == '3.7'
+requests>=2.32.3; python_version >= '3.8'
 openai
\ No newline at end of file
diff --git a/src/asr/resources/config.yaml b/src/asr/resources/config.yaml
new file mode 100644
index 0000000..8757326
--- /dev/null
+++ b/src/asr/resources/config.yaml
@@ -0,0 +1,31 @@
+TokenIDConverter:
+  token_path: ASR/resources/models/token_list.pkl
+  unk_symbol: <unk>
+
+CharTokenizer:
+  symbol_value:
+  space_symbol: <space>
+  remove_non_linguistic_symbols: false
+
+WavFrontend:
+  cmvn_file: ASR/resources/models/am.mvn
+  frontend_conf:
+    fs: 16000
+    window: hamming
+    n_mels: 80
+    frame_length: 25
+    frame_shift: 10
+    lfr_m: 7
+    lfr_n: 6
+    filter_length_max: -.inf
+    dither: 0.0
+
+Model:
+  model_path: ASR/resources/models/model.onnx
+  use_cuda: false
+  CUDAExecutionProvider:
+    device_id: 0
+    arena_extend_strategy: kNextPowerOfTwo
+    cudnn_conv_algo_search: EXHAUSTIVE
+    do_copy_in_default_stream: true
+  batch_size: 3
\ No newline at end of file
diff --git a/src/blackbox/asr.py b/src/blackbox/asr.py
index 00d5ac6..608d58c 100644
--- a/src/blackbox/asr.py
+++ b/src/blackbox/asr.py
@@ -49,7 +49,7 @@ class ASR(Blackbox):
         config = read_yaml(".env.yaml")
         self.paraformer = RapidParaformer(config)
 
-        model_dir = "/model/Voice/SenseVoice/SenseVoiceSmall"
+        model_dir = "/Workspace/Models/SenseVoice/SenseVoiceSmall"
 
         self.speed = sensevoice_config.speed
         self.device = sensevoice_config.device
diff --git a/src/blackbox/chat.py b/src/blackbox/chat.py
index b7d2527..9c6d2f3 100644
--- a/src/blackbox/chat.py
+++ b/src/blackbox/chat.py
@@ -100,7 +100,7 @@ class Chat(Blackbox):
         #user_presence_penalty = 0.8
 
         if user_model_url is None or user_model_url.isspace() or user_model_url == "":
-            user_model_url = "http://10.6.80.75:23333/v1/chat/completions"
+            user_model_url = "http://172.19.202.20:23333/v1/chat/completions"
 
         if user_model_key is None or user_model_key.isspace() or user_model_key == "":
             user_model_key = "YOUR_API_KEY"
@@ -109,12 +109,12 @@ class Chat(Blackbox):
         chroma_response = self.chroma_query(user_question, settings)
         print("1.Chroma_response: \n", chroma_response)
 
-        if chroma_response:
-            if chroma_collection_id == 'boss':
-                user_prompt_template = "# 你的身份 #\n你是周家俊,澳门博维集团董事长。你擅长脑机接口回答。\n# OBJECTIVE(目标) #\n回答游客的提问。\n# STYLE(风格)#\n成熟稳重,回答简练。不要分条。\n# 回答方式 #\n首先自行判断下方问题与检索内容是否相关,若相关则根据检索内容总结概括相关信息进行回答;若检索内容与问题无关,则根据自身知识进行回答。\n# 回答 #\n如果检索内容与问题相关,则直接从检索内容中提炼出问题所需的信息。如果检索内容与问题不相关,则不参考检索内容,直接根据常识尝试回答问题,或者则回答:“对不起,我无法回答此问题哦。”\n # 回答限制 #\n回答内容限制总结在50字内。\n回答内容出不要出现“相关”等字眼,不要乱说或者多说,回答的内容需要与问题对应。常见的对话可以不采用检索内容,根据人物设定,直接进行回答。\n只回答与脑机接口,澳门博维集团董事长,周家俊,G2E,RELX,BO VISION相关内容,若遇到其他提问则回答:“对不起,我无法回答此问题哦。”"
-            elif chroma_collection_id == 'g2e' or chroma_collection_id == 'kiki':
-                user_prompt_template = "# 你的身份 #\n你是琪琪,你是康普可可的代言人,由博维开发。你擅长澳门文旅问答。\n# OBJECTIVE(目标) #\n回答游客的提问。\n# STYLE(风格)#\n像少女一般开朗活泼,回答简练。不要分条。\n# 回答方式 #\n首先自行判断下方问题与检索内容是否相关,若相关则根据检索内容总结概括相关信息进行回答;若检索内容与问题无关,则根据自身知识进行回答。\n# 回答 #\n如果检索内容与问题相关,则直接从检索内容中提炼出问题所需的信息。如果检索内容与问题不相关,则不参考检索内容,直接根据常识尝试回答问题,或者则回答:“对不起,我无法回答此问题哦。”\n# 回答限制 #\n回答内容限制总结在50字内。\n回答内容出不要出现“相关”等字眼,不要乱说或者多说,回答的内容需要与问题对应。常见的对话可以不采用检索内容,根据人物设定,直接进行回答。\n只回答与澳门文旅,博维,康普可可,琪琪,G2E,RELX,BO VISION相关内容,若遇到其他提问则回答:“对不起,我无法回答此问题哦。”"
-            print(f"user_prompt_template: {type(user_prompt_template)}, user_question: {type(user_question)}, chroma_response: {type(chroma_response)}")
+        if chroma_collection_id == 'boss':
+            user_prompt_template = "# 你的身份 #\n你是周家俊,澳门博维集团董事长。你擅长脑机接口回答。\n# OBJECTIVE(目标) #\n回答游客的提问。\n# STYLE(风格)#\n成熟稳重,回答简练。不要分条。\n# 回答方式 #\n首先自行判断下方问题与检索内容是否相关,若相关则根据检索内容总结概括相关信息进行回答;若检索内容与问题无关,则根据自身知识进行回答。\n# 回答 #\n如果检索内容与问题相关,则直接从检索内容中提炼出问题所需的信息。如果检索内容与问题不相关,则不参考检索内容,直接根据常识尝试回答问题,或者则回答:“对不起,我无法回答此问题哦。”\n # 回答限制 #\n回答内容限制总结在50字内。\n回答内容出不要出现“相关”等字眼,不要乱说或者多说,回答的内容需要与问题对应。常见的对话可以不采用检索内容,根据人物设定,直接进行回答。\n只回答与脑机接口,澳门博维集团董事长,周家俊,G2E,RELX,BO VISION相关内容,若遇到其他提问则回答:“对不起,我无法回答此问题哦。”"
+        elif chroma_collection_id == 'g2e':
+            user_prompt_template = "# 你的身份 #\n你是琪琪,你是康普可可的代言人,由博维开发。你擅长澳门文旅问答。\n# OBJECTIVE(目标) #\n回答游客的提问。\n# STYLE(风格)#\n像少女一般开朗活泼,回答简练。不要分条。\n# 回答方式 #\n首先自行判断下方问题与检索内容是否相关,若相关则根据检索内容总结概括相关信息进行回答;若检索内容与问题无关,则根据自身知识进行回答。\n# 回答 #\n如果检索内容与问题相关,则直接从检索内容中提炼出问题所需的信息。如果检索内容与问题不相关,则不参考检索内容,直接根据常识尝试回答问题,或者则回答:“对不起,我无法回答此问题哦。”\n# 回答限制 #\n回答内容限制总结在50字内。\n回答内容出不要出现“相关”等字眼,不要乱说或者多说,回答的内容需要与问题对应。常见的对话可以不采用检索内容,根据人物设定,直接进行回答。\n只回答与澳门文旅,博维,康普可可,琪琪,G2E,RELX,BO VISION相关内容,若遇到其他提问则回答:“对不起,我无法回答此问题哦。”"
+
+        if chroma_response:
             user_question = user_prompt_template + "问题: " + user_question + "。检索内容: " + chroma_response + "。"
         else:
             user_question = user_prompt_template + "问题: " + user_question + "。"
diff --git a/src/blackbox/modelscope.py b/src/blackbox/modelscope.py
index 9e5e61d..c9a57a2 100755
--- a/src/blackbox/modelscope.py
+++ b/src/blackbox/modelscope.py
@@ -51,7 +51,7 @@ class Modelscope(Blackbox):
                 "question": query
             }
         }
-        url = "http://10.6.80.75:7003"
+        url = "http://172.19.202.20:7003"
         response = requests.post(f"{url}/api/chroma_query", json=query_data)
         result = response.json()['response']
         return str({'result': f'Chroma ID为{id}的用户,查询结果为{response}。'})
diff --git a/src/blackbox/vlms.py b/src/blackbox/vlms.py
index a5b3370..24932ca 100644
--- a/src/blackbox/vlms.py
+++ b/src/blackbox/vlms.py
@@ -152,7 +152,7 @@ class VLMS(Blackbox):
         #     }
         # ]
 
-        user_context = self.keep_last_k_images(user_context,k = 1)
+        user_context = self.keep_last_k_images(user_context,k = 3)
         if self.model_url is None:
             self.model_url = self._get_model_url(model_name)
         api_client = APIClient(self.model_url)
diff --git a/= b/src/loki_logger_handler/__init__.py
similarity index 100%
rename from =
rename to src/loki_logger_handler/__init__.py
diff --git a/src/loki_logger_handler/formatters/__init__.py b/src/loki_logger_handler/formatters/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/loki_logger_handler/formatters/logger_formatter.py b/src/loki_logger_handler/formatters/logger_formatter.py
new file mode 100644
index 0000000..5f668f7
--- /dev/null
+++ b/src/loki_logger_handler/formatters/logger_formatter.py
@@ -0,0 +1,94 @@
+import traceback
+import time
+
+class LoggerFormatter:
+    """
+    A custom formatter for log records that formats the log record into a structured dictionary.
+    """
+    LOG_RECORD_FIELDS = {
+        "msg",
+        "levelname",
+        "msecs",
+        "name",
+        "pathname",
+        "filename",
+        "module",
+        "lineno",
+        "funcName",
+        "created",
+        "thread",
+        "threadName",
+        "process",
+        "processName",
+        "relativeCreated",
+        "stack_info",
+        "args",
+        "exc_info",
+        "levelno",
+        "exc_text",
+    }
+
+    def __init__(self):
+        pass
+
+    def format(self, record):
+        """
+        Format a log record into a structured dictionary.
+
+        Args:
+            record (logging.LogRecord): The log record to format.
+
+        Returns:
+            dict: A dictionary representation of the log record.
+        """
+
+        formatted = {
+            "message": record.getMessage(),
+            "timestamp": record.created,
+            "process": record.process,
+            "thread": record.thread,
+            "function": record.funcName,
+            "module": record.module,
+            "name": record.name,
+            "level": record.levelname,
+        }
+
+        # Capture any custom fields added to the log record
+        custom_fields = {
+            key: value for key, value in record.__dict__.items()
+            if key not in self.LOG_RECORD_FIELDS
+        }
+
+        loki_metadata = {}
+
+        for key in custom_fields:
+            if "loki_metadata" == key:
+                value = getattr(record, key)
+                if isinstance(value, dict):
+                    loki_metadata = value
+            else:
+                formatted[key] = getattr(record, key)
+
+        # Check if the log level indicates an error (case-insensitive and can be partial)
+        if record.levelname.upper().startswith("ER"):
+            formatted["file"] = record.filename
+            formatted["path"] = record.pathname
+            formatted["line"] = record.lineno
+            formatted["stacktrace"] = self._format_stacktrace(record.exc_info)
+
+        return formatted, loki_metadata
+
+    @staticmethod
+    def _format_stacktrace(exc_info):
+        """
+        Format the stacktrace if exc_info is present.
+
+        Args:
+            exc_info (tuple or None): Exception info tuple as returned by sys.exc_info().
+
+        Returns:
+            str or None: Formatted stacktrace as a string, or None if exc_info is not provided.
+        """
+        if exc_info:
+            return "".join(traceback.format_exception(*exc_info))
+        return None
diff --git a/src/loki_logger_handler/formatters/loguru_formatter.py b/src/loki_logger_handler/formatters/loguru_formatter.py
new file mode 100644
index 0000000..354edae
--- /dev/null
+++ b/src/loki_logger_handler/formatters/loguru_formatter.py
@@ -0,0 +1,94 @@
+import traceback
+import sys
+
+
+class LoguruFormatter:
+    """
+    A custom formatter for log records generated by Loguru, formatting the record into a structured dictionary.
+    """
+    def __init__(self):
+        pass
+
+    def format(self, record):
+        """
+        Format a Loguru log record into a structured dictionary.
+
+        Args:
+            record (dict): The Loguru log record to format.
+
+        Returns:
+            dict: A dictionary representation of the log record.
+        """
+        # Convert timestamp to a standard format across Python versions
+        timestamp = record.get("time")
+        if hasattr(timestamp, "timestamp"):
+            # Python 3.x
+            timestamp = timestamp.timestamp()
+        else:
+            # Python 2.7: Convert datetime to a Unix timestamp
+            timestamp = (timestamp - timestamp.utcoffset()).total_seconds()
+
+        formatted = {
+            "message": record.get("message"),
+            "timestamp": timestamp,
+            "process": record.get("process").id,
+            "thread": record.get("thread").id,
+            "function": record.get("function"),
+            "module": record.get("module"),
+            "name": record.get("name"),
+            "level": record.get("level").name.upper(),
+        }
+
+        loki_metadata = {}
+        # Update with extra fields if available
+        extra = record.get("extra", {})
+        if isinstance(extra, dict):
+            # Handle the nested "extra" key correctly
+            if "extra" in extra and isinstance(extra["extra"], dict):
+                formatted.update(extra["extra"])
+            else:
+                formatted.update(extra)
+
+        loki_metadata = formatted.get("loki_metadata")
+        if loki_metadata:
+            if not isinstance(loki_metadata, dict):
+                loki_metadata = {}
+            del formatted["loki_metadata"]
+
+        # Check if the log level indicates an error (case-insensitive and can be partial)
+        if formatted["level"].startswith("ER"):
+            formatted["file"] = record.get("file").name
+            formatted["path"] = record.get("file").path
+            formatted["line"] = record.get("line")
+
+        self.add_exception_details(record, formatted)
+
+        return formatted, loki_metadata
+
+    def add_exception_details(self, record, formatted):
+        """
+        Adds exception details to the formatted log record.
+
+        Args:
+            record (dict): The log record containing log information.
+            formatted (dict): The dictionary to which the formatted exception details will be added.
+
+        Notes:
+            - If the log record contains an exception, this method extracts the exception type,
+              value, and traceback, formats the traceback, and adds it to the 'stacktrace' key
+              in the formatted dictionary.
+            - Handles both Python 2.7 and Python 3.x versions for formatting exceptions.
+        """
+        if record.get("exception"):
+            exc_type, exc_value, exc_traceback = record.get("exception")
+            if sys.version_info[0] == 2:
+                # Python 2.7: Use the older method for formatting exceptions
+                formatted_traceback = traceback.format_exception(
+                    exc_type, exc_value, exc_traceback
+                )
+            else:
+                # Python 3.x: This is the same
+                formatted_traceback = traceback.format_exception(
+                    exc_type, exc_value, exc_traceback
+                )
+            formatted["stacktrace"] = "".join(formatted_traceback)
diff --git a/src/loki_logger_handler/loki_logger_handler.py b/src/loki_logger_handler/loki_logger_handler.py
new file mode 100644
index 0000000..a0ac819
--- /dev/null
+++ b/src/loki_logger_handler/loki_logger_handler.py
@@ -0,0 +1,273 @@
+# Compatibility for Python 2 and 3 queue module
+try:
+    import queue  # Python 3.x
+except ImportError:
+    import Queue as queue  # Python 2.7
+
+import atexit
+import logging
+import threading
+import requests
+
+from .formatters.logger_formatter import LoggerFormatter
+from .loki_request import LokiRequest
+from .stream import Stream
+from .streams import Streams
+
+
+class LokiLoggerHandler(logging.Handler):
+    """
+    A custom logging handler that sends logs to a Loki server.
+    """
+
+    def __init__(
+        self,
+        url,
+        labels,
+        label_keys=None,
+        additional_headers=None,
+        message_in_json_format=True,
+        timeout=10,
+        compressed=True,
+        default_formatter=LoggerFormatter(),
+        enable_self_errors=False,
+        enable_structured_loki_metadata=False,
+        loki_metadata=None,
+        loki_metadata_keys=None,
+        insecure_ssl_verify=True
+    ):
+        """
+        Initialize the LokiLoggerHandler object.
+
+        Args:
+            url (str): The URL of the Loki server.
+            labels (dict): Default labels for the logs.
+            label_keys (dict, optional): Specific log record keys to extract as labels. Defaults to None.
+            additional_headers (dict, optional): Additional headers for the Loki request. Defaults to None.
+            message_in_json_format (bool): Whether to format log values as JSON.
+            timeout (int, optional): Timeout interval for flushing logs. Defaults to 10 seconds.
+            compressed (bool, optional): Whether to compress the logs using gzip. Defaults to True.
+            default_formatter (logging.Formatter, optional): Formatter for the log records. If not provided, LoggerFormatter or LoguruFormatter will be used.
+            enable_self_errors (bool, optional): Set to True to show Handler errors on console. Default False
+            enable_structured_loki_metadata (bool, optional): Whether to include structured loki_metadata in the logs. Defaults to False. Only supported for Loki 3.0 and above
+            loki_metadata (dict, optional): Default loki_metadata values. Defaults to None. Only supported for Loki 3.0 and above
+            loki_metadata_keys (array, optional): Specific log record keys to extract as loki_metadata. Only supported for Loki 3.0 and above
+        """
+        super(LokiLoggerHandler, self).__init__()
+
+        self.labels = labels
+        self.label_keys = label_keys if label_keys is not None else {}
+        self.timeout = timeout
+        self.formatter = default_formatter
+
+        self.enable_self_errors = enable_self_errors
+
+        # Create a logger for self-errors if enabled
+        if self.enable_self_errors:
+            self.debug_logger = logging.getLogger("LokiHandlerDebug")
+            self.debug_logger.setLevel(logging.ERROR)
+            console_handler = logging.StreamHandler()
+            self.debug_logger.addHandler(console_handler)
+
+        self.request = LokiRequest(
+            url=url, compressed=compressed, additional_headers=additional_headers or {},
+            insecure_ssl_verify=insecure_ssl_verify
+        )
+
+        self.buffer = queue.Queue()
+        self.flush_thread = threading.Thread(target=self._flush)
+
+        self.flush_event = threading.Event()
+
+        # Set daemon for Python 2 and 3 compatibility
+        self.flush_thread.daemon = True
+        self.flush_thread.start()
+
+        self.message_in_json_format = message_in_json_format
+
+        # Handler working with errors
+        self.error = False
+        self.enable_structured_loki_metadata = enable_structured_loki_metadata
+        self.loki_metadata = loki_metadata
+        self.loki_metadata_keys = loki_metadata_keys if loki_metadata_keys is not None else []
+        self.insecure_ssl_verify = insecure_ssl_verify
+
+    def emit(self, record):
+        """
+        Emit a log record.
+
+        Args:
+            record (logging.LogRecord): The log record to be emitted.
+        """
+        try:
+            formatted_record, log_loki_metadata = self.formatter.format(record)
+            self._put(formatted_record, log_loki_metadata)
+        except Exception as e:
+            self.handle_unexpected_error(e)
+
+    def _flush(self):
+        """
+        Flush the buffer by sending the logs to the Loki server.
+        This function runs in a separate thread and periodically sends logs.
+        """
+        atexit.register(self._send)
+
+        while True:
+
+            # Wait until flush_event is set or timeout elapses
+            self.flush_event.wait(timeout=self.timeout)
+
+            # Reset the event for the next cycle
+            self.flush_event.clear()
+
+            if not self.buffer.empty():
+                try:
+                    self._send()
+                except Exception as e:
+                    self.handle_unexpected_error(e)
+
+
+
+    def _send(self):
+        """
+        Send the buffered logs to the Loki server.
+        """
+        temp_streams = {}
+
+        while not self.buffer.empty():
+            log = self.buffer.get()
+            if log.key not in temp_streams:
+                stream = Stream(log.labels, self.loki_metadata,
+                                self.message_in_json_format)
+                temp_streams[log.key] = stream
+
+            temp_streams[log.key].append_value(log.line, log.loki_metadata)
+
+        if temp_streams:
+            streams = Streams(list(temp_streams.values()))
+            try:
+                self.request.send(streams.serialize())
+            except requests.RequestException as e:
+                self.handle_unexpected_error(e)
+
+
+    def write(self, message):
+        """
+        Write a message to the log.
+
+        Args:
+            message (str): The message to be logged.
+        """
+        self.emit(message.record)
+
+    def _put(self, log_record, log_loki_metadata):
+        """
+        Put a log record into the buffer.
+
+        Args:
+            log_record (dict): The formatted log record.
+        """
+        labels = self.labels.copy()
+
+        self.assign_labels_from_log(log_record, labels)
+
+        if self.enable_structured_loki_metadata:
+            self.extract_and_clean_metadata(log_record, log_loki_metadata)
+
+            log_line = LogLine(labels, log_record, log_loki_metadata)
+        else:
+            log_line = LogLine(labels, log_record)
+
+        self.buffer.put(log_line)
+
+    def assign_labels_from_log(self, log_record, labels):
+        """
+        This method iterates over the keys specified in `self.label_keys` and checks if each key is present in the `log_record`.
+        If a key is found in the `log_record`, it assigns the corresponding value from the `log_record` to the `labels` dictionary.
+
+        Args:
+            log_record (dict): The log record containing potential label keys and values.
+            labels (dict): The dictionary to which the labels will be assigned.
+
+        Returns:
+            None
+        """
+        for key in self.label_keys:
+            if key in log_record:
+                labels[key] = log_record[key]
+
+    def extract_and_clean_metadata(self, log_record, log_loki_metadata):
+        """
+        This method iterates over the keys defined in `self.loki_metadata_keys` and checks if they are present
+        in the `log_record`. If a key is found, it is added to the `log_loki_metadata` dictionary and marked
+        for deletion from the `log_record`. After collecting all keys to be deleted, it removes them from the
+        `log_record`.
+
+        Args:
+            log_record (dict): The original log record containing various log data.
+            log_loki_metadata (dict): The dictionary where extracted metadata will be stored.
+
+        Returns:
+            None
+        """
+        keys_to_delete = []
+        for key in self.loki_metadata_keys:
+            if key in log_record:
+                log_loki_metadata[key] = log_record[key]
+                keys_to_delete.append(key)
+
+        for key in keys_to_delete:
+            del log_record[key]
+
+
+    def handle_unexpected_error(self, e):
+        """
+        Handles unexpected errors by logging them and setting the error flag.
+
+        Args:
+            e (Exception): The exception that was raised.
+
+        Returns:
+            None
+        """
+        if self.enable_self_errors:
+            self.debug_logger.error(
+                "Unexpected error: %s", e, exc_info=True)
+        self.error = True
+
+class LogLine:
+    """
+    Represents a single log line with associated labels.
+
+    Attributes:
+        labels (dict): Labels associated with the log line.
+        key (str): A unique key generated from the labels.
+        line (str): The actual log line content.
+    """
+
+    def __init__(self, labels, line, loki_metadata=None):
+        """
+        Initialize a LogLine object.
+
+        Args:
+            labels (dict): Labels associated with the log line.
+            line (str): The actual log line content.
+        """
+        self.labels = labels
+        self.key = self._key_from_labels(labels)
+        self.line = line
+        self.loki_metadata = loki_metadata
+
+    @staticmethod
+    def _key_from_labels(labels):
+        """
+        Generate a unique key from the labels values.
+
+        Args:
+            labels (dict): Labels to generate the key from.
+
+        Returns:
+            str: A unique key generated from the labels values.
+        """
+        key_list = sorted(labels.values())
+        return "_".join(key_list)
diff --git a/src/loki_logger_handler/loki_request.py b/src/loki_logger_handler/loki_request.py
new file mode 100644
index 0000000..cfa5f51
--- /dev/null
+++ b/src/loki_logger_handler/loki_request.py
@@ -0,0 +1,64 @@
+import gzip
+import requests
+
+
+
+class LokiRequest:
+    """
+    A class to send logs to a Loki server, with optional compression and custom headers.
+
+    Attributes:
+        url (str): The URL of the Loki server.
+        compressed (bool): Whether to compress the logs using gzip.
+        headers (dict): Additional headers to include in the request.
+        session (requests.Session): The session used for making HTTP requests.
+    """
+
+    def __init__(self, url, compressed=False, additional_headers=None, insecure_ssl_verify=True):
+        """
+        Initialize the LokiRequest object with the server URL, compression option, and additional headers.
+
+        Args:
+            url (str): The URL of the Loki server.
+            compressed (bool, optional): Whether to compress the logs using gzip. Defaults to False.
+            additional_headers (dict, optional): Additional headers to include in the request.
+                Defaults to an empty dictionary.
+        """
+        self.url = url
+        self.compressed = compressed
+        self.headers = additional_headers if additional_headers is not None else {}
+        self.headers["Content-Type"] = "application/json"
+        self.session = requests.Session()
+        self.insecure_ssl_verify = insecure_ssl_verify
+
+    def send(self, data):
+        """
+        Send the log data to the Loki server.
+
+        Args:
+            data (str): The log data to be sent.
+
+        Raises:
+            requests.RequestException: If the request fails.
+        """
+        response = None
+        try:
+            if self.compressed:
+                self.headers["Content-Encoding"] = "gzip"
+                data = gzip.compress(data.encode("utf-8"))
+
+            response = self.session.post(self.url, data=data, headers=self.headers, verify=self.insecure_ssl_verify)
+            response.raise_for_status()
+
+        except requests.RequestException as e:
+
+            if response is not None:
+                response_message = f"Response status code: {response.status_code}, response text: {response.text}, post request URL: {response.request.url}"
+                raise requests.RequestException(f"Error while sending logs: {str(e)}\nCaptured error details:\n{response_message}") from e
+
+            raise requests.RequestException(f"Error while sending logs: {str(e)}") from e
+
+
+        finally:
+            if response:
+                response.close()
diff --git a/src/loki_logger_handler/stream.py b/src/loki_logger_handler/stream.py
new file mode 100644
index 0000000..c939908
--- /dev/null
+++ b/src/loki_logger_handler/stream.py
@@ -0,0 +1,95 @@
+import time
+import json
+
+# Compatibility for Python 2 and 3
+try:
+    from time import time_ns  # Python 3.7+
+except ImportError:
+    import datetime
+    def time_ns():
+        return int((time.time() + datetime.datetime.now().microsecond / 1e6) * 1e9)
+
+
+class _StreamEncoder(json.JSONEncoder):
+    """
+    A custom JSON encoder for the Stream class.
+    This is an internal class used to handle the serialization
+    of Stream objects into JSON format.
+    """
+    def default(self, obj):
+        if isinstance(obj, Stream):
+            return obj.__dict__
+        return super(_StreamEncoder, self).default(obj)
+
+
+class Stream(object):
+    """
+    A class representing a data stream with associated labels and values.
+
+    Attributes:
+        stream (dict): A dictionary containing the labels for the stream.
+        values (list): A list of timestamped values associated with the stream.
+        message_in_json_format (bool): Whether to format log values as JSON.
+    """
+    def __init__(self, labels=None, loki_metadata=None, message_in_json_format=True):
+        """
+        Initialize a Stream object with optional labels.
+
+        Args:
+            labels (dict, optional): A dictionary of labels for the stream. Defaults to an empty dictionary.
+        """
+        self.stream = labels or {}
+        self.values = []
+        self.message_in_json_format = message_in_json_format
+        if loki_metadata and not isinstance(loki_metadata, dict):
+            raise TypeError("loki_metadata must be a dictionary")
+        self.loki_metadata = loki_metadata
+
+    def add_label(self, key, value):
+        """
+        Add a label to the stream.
+
+        Args:
+            key (str): The label's key.
+            value (str): The label's value.
+        """
+        self.stream[key] = value
+
+    def append_value(self, value, metadata=None):
+        """
+        Append a value to the stream with a timestamp.
+
+        Args:
+            value (dict): A dictionary representing the value to be appended.
+                It should contain a 'timestamp' key.
+        """
+        try:
+            # Convert the timestamp to nanoseconds and ensure it's a string
+            timestamp = str(int(value.get("timestamp") * 1e9))
+        except (TypeError, ValueError):
+            # Fallback to the current time in nanoseconds if the timestamp is missing or invalid
+            timestamp = str(time_ns())
+
+        formatted_value = json.dumps(value, ensure_ascii=False) if self.message_in_json_format else value
+        if metadata or self.loki_metadata:
+            # Ensure both metadata and self.loki_metadata are dictionaries (default to empty dict if None)
+            metadata = metadata if metadata is not None else {}
+            loki_metadata = self.loki_metadata if self.loki_metadata is not None else {}
+
+            # Merge metadata into global metadata, override global values from log line metadata values
+            loki_metadata.update(metadata)
+
+            # Transform all non-string values to strings, Grafana Loki does not accept non str values
+            formatted_metadata = {key: str(value) for key, value in metadata.items()}
+            self.values.append([timestamp, formatted_value, formatted_metadata])
+        else:
+            self.values.append([timestamp, formatted_value])
+
+    def serialize(self):
+        """
+        Serialize the Stream object to a JSON string.
+
+        Returns:
+            str: The JSON string representation of the Stream object.
+        """
+        return json.dumps(self, cls=_StreamEncoder)
diff --git a/src/loki_logger_handler/streams.py b/src/loki_logger_handler/streams.py
new file mode 100644
index 0000000..5c93a8d
--- /dev/null
+++ b/src/loki_logger_handler/streams.py
@@ -0,0 +1,59 @@
+import json
+
+
+class _LokiRequestEncoder(json.JSONEncoder):
+    """
+    A custom JSON encoder for the Streams class.
+    This internal class is used to handle the serialization
+    of Streams objects into the JSON format expected by Loki.
+    """
+    def default(self, o):
+        if isinstance(o, Streams):
+            # Convert the Streams object to a dictionary format suitable for Loki
+            return {"streams": [stream.__dict__ for stream in o.streams]}
+        # Use the default serialization method for other objects
+        return super(_LokiRequestEncoder, self).default(o)
+
+
+class Streams(object):  # Explicitly inherit from object for Python 2 compatibility
+    """
+    A class representing a collection of Stream objects.
+
+    Attributes:
+        streams (list): A list of Stream objects.
+    """
+    def __init__(self, streams=None):
+        """
+        Initialize a Streams object with an optional list of Stream objects.
+
+        Args:
+            streams (list, optional): A list of Stream objects. Defaults to an empty list.
+        """
+        self.streams = streams if streams is not None else []
+
+    def add_stream(self, stream):
+        """
+        Add a single Stream object to the streams list.
+
+        Args:
+            stream (Stream): The Stream object to be added.
+        """
+        self.streams.append(stream)
+
+    def set_streams(self, streams):
+        """
+        Set the streams list to a new list of Stream objects.
+
+        Args:
+            streams (list): A list of Stream objects to replace the current streams.
+        """
+        self.streams = streams
+
+    def serialize(self):
+        """
+        Serialize the Streams object to a JSON string.
+
+        Returns:
+            str: The JSON string representation of the Streams object.
+        """
+        return json.dumps(self, cls=_LokiRequestEncoder)
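Usage note (not part of the patch): the snippet below is a minimal sketch of how the vendored LokiLoggerHandler added above can be driven directly, assuming a hypothetical Loki push endpoint, label set, and record fields. The constructor arguments and the label_keys / loki_metadata_keys behaviour shown here are the ones defined in src/loki_logger_handler/loki_logger_handler.py in this diff; everything else is illustrative.

import logging

from src.loki_logger_handler.loki_logger_handler import LokiLoggerHandler

# Hypothetical endpoint and labels, for illustration only.
handler = LokiLoggerHandler(
    url="https://loki.example.com/loki/api/v1/push",
    labels={"application": "user-service", "environment": "dev"},
    label_keys={"component"},              # promote this record field to a stream label
    enable_structured_loki_metadata=True,  # structured metadata requires Loki >= 3.0
    loki_metadata={"version": "1.0.0"},    # static metadata attached to every log line
    loki_metadata_keys=["request_id"],     # per-record metadata extracted from the log record
    timeout=5,                             # flush interval in seconds
)

logging.basicConfig(level=logging.INFO, handlers=[handler])

# Fields passed via `extra` are picked up by LoggerFormatter: "component"
# becomes a Loki stream label, "request_id" becomes structured metadata.
logging.info("user logged in", extra={"component": "auth", "request_id": "abc-123"})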