feat: loki

0Xiao0
2025-02-03 15:50:56 +08:00
parent 3119b19767
commit c74b9ae0d2
16 changed files with 744 additions and 11 deletions

main.py
View File

@@ -2,9 +2,29 @@ import uvicorn
import logging
from injector import Injector, inject
from src.configuration import LogConf, singleton
from src.loki_logger_handler.loki_logger_handler import LokiLoggerHandler
import argparse
import yaml
with open(".env.yaml", "r") as file:
    config = yaml.safe_load(file)
log_handler_config = config["log"]["handlers"]
log_handlers = []
for handler_config in log_handler_config:
    if handler_config["type"] == "loki":
        log_handlers.append(LokiLoggerHandler(
            url=handler_config["url"],
            additional_headers={"X-Odin-Auth": handler_config["x-odin-auth"]},
            labels=handler_config["labels"],
            label_keys={},
            timeout=10,
            enable_structured_loki_metadata=True,
            loki_metadata={"service": "user-service", "version": "1.0.0"},
            insecure_ssl_verify=False
        ))
logging.basicConfig(level=logging.DEBUG, handlers=log_handlers)
logging.info("logging init finish")
@singleton
class Main():
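
The handler list above is driven entirely by the `log.handlers` section of `.env.yaml`. A minimal sketch of the layout this loop expects, with placeholder URL, token, and labels (none of these values appear in the commit itself):

```python
# Hypothetical .env.yaml fragment, shown inline for illustration only.
import yaml

example_env = """
log:
  handlers:
    - type: loki
      url: https://loki.example.com/loki/api/v1/push
      x-odin-auth: YOUR_TOKEN          # forwarded as the X-Odin-Auth header
      labels:
        application: user-service
        environment: production
"""

config = yaml.safe_load(example_env)
handler_config = config["log"]["handlers"][0]
assert handler_config["type"] == "loki"
assert "url" in handler_config and "labels" in handler_config
```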

BIN
output.wav Normal file

Binary file not shown.

View File

@@ -9,4 +9,7 @@ chromadb==0.5.0
langchain==0.1.17
langchain-community==0.0.36
sentence-transformers==2.7.0
requests>=2.27.1; python_version < '3.7'
requests>=2.31.0; python_version == '3.7'
requests>=2.32.3; python_version >= '3.8'
openai

View File

@@ -0,0 +1,31 @@
TokenIDConverter:
  token_path: ASR/resources/models/token_list.pkl
  unk_symbol: <unk>

CharTokenizer:
  symbol_value:
  space_symbol: <space>
  remove_non_linguistic_symbols: false

WavFrontend:
  cmvn_file: ASR/resources/models/am.mvn
  frontend_conf:
    fs: 16000
    window: hamming
    n_mels: 80
    frame_length: 25
    frame_shift: 10
    lfr_m: 7
    lfr_n: 6
    filter_length_max: -.inf
    dither: 0.0

Model:
  model_path: ASR/resources/models/model.onnx
  use_cuda: false
  CUDAExecutionProvider:
    device_id: 0
    arena_extend_strategy: kNextPowerOfTwo
    cudnn_conv_algo_search: EXHAUSTIVE
    do_copy_in_default_stream: true
  batch_size: 3
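
For reference, the new config can be loaded and inspected with plain PyYAML; the path below is a placeholder and the field access simply mirrors the structure above:

```python
# Illustrative only: read the ONNX ASR config and pick out a few fields.
import yaml

with open("config.yaml", "r") as f:          # placeholder path for the file added above
    asr_config = yaml.safe_load(f)

frontend = asr_config["WavFrontend"]["frontend_conf"]
print(frontend["fs"], frontend["n_mels"])     # 16000 80
print(asr_config["Model"]["model_path"])      # ASR/resources/models/model.onnx
print(asr_config["Model"]["use_cuda"])        # False
```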

View File

@@ -49,7 +49,7 @@ class ASR(Blackbox):
config = read_yaml(".env.yaml")
self.paraformer = RapidParaformer(config)
model_dir = "/model/Voice/SenseVoice/SenseVoiceSmall"
model_dir = "/Workspace/Models/SenseVoice/SenseVoiceSmall"
self.speed = sensevoice_config.speed
self.device = sensevoice_config.device

View File

@@ -100,7 +100,7 @@ class Chat(Blackbox):
#user_presence_penalty = 0.8
if user_model_url is None or user_model_url.isspace() or user_model_url == "":
user_model_url = "http://10.6.80.75:23333/v1/chat/completions"
user_model_url = "http://172.19.202.20:23333/v1/chat/completions"
if user_model_key is None or user_model_key.isspace() or user_model_key == "":
user_model_key = "YOUR_API_KEY"
@@ -109,12 +109,12 @@ class Chat(Blackbox):
chroma_response = self.chroma_query(user_question, settings)
print("1.Chroma_response: \n", chroma_response)
if chroma_collection_id == 'boss':
user_prompt_template = "# 你的身份 #\n你是周家俊,澳门博维集团董事长。你擅长脑机接口回答。\n# OBJECTIVE目标 #\n回答游客的提问。\n# STYLE风格#\n成熟稳重,回答简练。不要分条。\n# 回答方式 #\n首先自行判断下方问题与检索内容是否相关,若相关则根据检索内容总结概括相关信息进行回答;若检索内容与问题无关,则根据自身知识进行回答。\n# 回答 #\n如果检索内容与问题相关,则直接从检索内容中提炼出问题所需的信息。如果检索内容与问题不相关,则不参考检索内容,直接根据常识尝试回答问题,或者则回答:“对不起,我无法回答此问题哦。”\n # 回答限制 #\n回答内容限制总结在50字内。\n回答内容出不要出现“相关”等字眼,不要乱说或者多说,回答的内容需要与问题对应。常见的对话可以不采用检索内容,根据人物设定,直接进行回答。\n只回答与脑机接口澳门博维集团董事长周家俊G2ERELXBO VISION相关内容若遇到其他提问则回答“对不起我无法回答此问题哦。”"
elif chroma_collection_id == 'g2e':
user_prompt_template = "# 你的身份 #\n你是琪琪,你是康普可可的代言人,由博维开发。你擅长澳门文旅问答。\n# OBJECTIVE目标 #\n回答游客的提问。\n# STYLE风格#\n像少女一般开朗活泼,回答简练。不要分条。\n# 回答方式 #\n首先自行判断下方问题与检索内容是否相关,若相关则根据检索内容总结概括相关信息进行回答;若检索内容与问题无关,则根据自身知识进行回答。\n# 回答 #\n如果检索内容与问题相关,则直接从检索内容中提炼出问题所需的信息。如果检索内容与问题不相关,则不参考检索内容,直接根据常识尝试回答问题,或者则回答:“对不起,我无法回答此问题哦。”\n# 回答限制 #\n回答内容限制总结在50字内。\n回答内容出不要出现“相关”等字眼,不要乱说或者多说,回答的内容需要与问题对应。常见的对话可以不采用检索内容,根据人物设定,直接进行回答。\n只回答与澳门文旅博维康普可可琪琪G2ERELXBO VISION相关内容若遇到其他提问则回答“对不起我无法回答此问题哦。”"
if chroma_response:
if chroma_collection_id == 'boss':
user_prompt_template = "# 你的身份 #\n你是周家俊,澳门博维集团董事长。你擅长脑机接口回答。\n# OBJECTIVE目标 #\n回答游客的提问。\n# STYLE风格#\n成熟稳重,回答简练。不要分条。\n# 回答方式 #\n首先自行判断下方问题与检索内容是否相关,若相关则根据检索内容总结概括相关信息进行回答;若检索内容与问题无关,则根据自身知识进行回答。\n# 回答 #\n如果检索内容与问题相关,则直接从检索内容中提炼出问题所需的信息。如果检索内容与问题不相关,则不参考检索内容,直接根据常识尝试回答问题,或者则回答:“对不起,我无法回答此问题哦。”\n # 回答限制 #\n回答内容限制总结在50字内。\n回答内容出不要出现“相关”等字眼,不要乱说或者多说,回答的内容需要与问题对应。常见的对话可以不采用检索内容,根据人物设定,直接进行回答。\n只回答与脑机接口澳门博维集团董事长周家俊G2ERELXBO VISION相关内容若遇到其他提问则回答“对不起我无法回答此问题哦。”"
elif chroma_collection_id == 'g2e' or chroma_collection_id == 'kiki':
user_prompt_template = "# 你的身份 #\n你是琪琪,你是康普可可的代言人,由博维开发。你擅长澳门文旅问答。\n# OBJECTIVE目标 #\n回答游客的提问。\n# STYLE风格#\n像少女一般开朗活泼,回答简练。不要分条。\n# 回答方式 #\n首先自行判断下方问题与检索内容是否相关,若相关则根据检索内容总结概括相关信息进行回答;若检索内容与问题无关,则根据自身知识进行回答。\n# 回答 #\n如果检索内容与问题相关,则直接从检索内容中提炼出问题所需的信息。如果检索内容与问题不相关,则不参考检索内容,直接根据常识尝试回答问题,或者则回答:“对不起,我无法回答此问题哦。”\n# 回答限制 #\n回答内容限制总结在50字内。\n回答内容出不要出现“相关”等字眼,不要乱说或者多说,回答的内容需要与问题对应。常见的对话可以不采用检索内容,根据人物设定,直接进行回答。\n只回答与澳门文旅博维康普可可琪琪G2ERELXBO VISION相关内容若遇到其他提问则回答“对不起我无法回答此问题哦。”"
print(f"user_prompt_template: {type(user_prompt_template)}, user_question: {type(user_question)}, chroma_response: {type(chroma_response)}")
user_question = user_prompt_template + "问题: " + user_question + "。检索内容: " + chroma_response + ""
else:
user_question = user_prompt_template + "问题: " + user_question + ""
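
This hunk reorders the persona-template selection around the chroma_response check, but the outgoing question is still built by plain string concatenation. A minimal sketch of that assembly, with placeholder template and values (not the real prompts):

```python
# Illustration only: how the outgoing question string is stitched together.
def build_question(user_prompt_template, user_question, chroma_response):
    if chroma_response:
        # retrieval hit: template + question + retrieved context
        return user_prompt_template + "问题: " + user_question + "。检索内容: " + chroma_response
    # no retrieval hit: template + question only
    return user_prompt_template + "问题: " + user_question

print(build_question("<persona prompt>", "展馆在哪里?", "展馆位于一楼。"))
```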

View File

@@ -51,7 +51,7 @@ class Modelscope(Blackbox):
"question": query
}
}
url = "http://10.6.80.75:7003"
url = "http://172.19.202.20:7003"
response = requests.post(f"{url}/api/chroma_query", json=query_data)
result = response.json()['response']
return str({'result': f'Chroma ID为{id}的用户,查询结果为{response}'})

View File

@@ -152,7 +152,7 @@ class VLMS(Blackbox):
# }
# ]
user_context = self.keep_last_k_images(user_context,k = 1)
user_context = self.keep_last_k_images(user_context,k = 3)
if self.model_url is None: self.model_url = self._get_model_url(model_name)
api_client = APIClient(self.model_url)

View File

@@ -0,0 +1,94 @@
import traceback
class LoggerFormatter:
"""
A custom formatter for log records that formats the log record into a structured dictionary.
"""
LOG_RECORD_FIELDS = {
"msg",
"levelname",
"msecs",
"name",
"pathname",
"filename",
"module",
"lineno",
"funcName",
"created",
"thread",
"threadName",
"process",
"processName",
"relativeCreated",
"stack_info",
"args",
"exc_info",
"levelno",
"exc_text",
}
def __init__(self):
pass
def format(self, record):
"""
Format a log record into a structured dictionary.
Args:
record (logging.LogRecord): The log record to format.
Returns:
dict: A dictionary representation of the log record.
"""
formatted = {
"message": record.getMessage(),
"timestamp": record.created,
"process": record.process,
"thread": record.thread,
"function": record.funcName,
"module": record.module,
"name": record.name,
"level": record.levelname,
}
# Capture any custom fields added to the log record
custom_fields = {
key: value for key, value in record.__dict__.items()
if key not in self.LOG_RECORD_FIELDS
}
loki_metadata = {}
for key in custom_fields:
if "loki_metadata" == key:
value = getattr(record, key)
if isinstance(value, dict):
loki_metadata = value
else:
formatted[key] = getattr(record, key)
        # Treat any level starting with "ER" (e.g. ERROR) as an error; the check is case-insensitive
if record.levelname.upper().startswith("ER"):
formatted["file"] = record.filename
formatted["path"] = record.pathname
formatted["line"] = record.lineno
formatted["stacktrace"] = self._format_stacktrace(record.exc_info)
return formatted, loki_metadata
@staticmethod
def _format_stacktrace(exc_info):
"""
Format the stacktrace if exc_info is present.
Args:
exc_info (tuple or None): Exception info tuple as returned by sys.exc_info().
Returns:
str or None: Formatted stacktrace as a string, or None if exc_info is not provided.
"""
if exc_info:
return "".join(traceback.format_exception(*exc_info))
return None
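
A hedged sketch of how this formatter behaves for a standard LogRecord. The logger name, the extra `loki_metadata` field, and the import path (which mirrors the import in main.py) are illustrative assumptions:

```python
# Feed a hand-built LogRecord through LoggerFormatter and inspect the result.
import logging
from src.loki_logger_handler.formatters.logger_formatter import LoggerFormatter

record = logging.LogRecord(
    name="user-service", level=logging.INFO, pathname=__file__, lineno=1,
    msg="user %s logged in", args=("alice",), exc_info=None,
)
record.loki_metadata = {"request_id": "abc123"}   # custom field picked up by format()

formatted, loki_metadata = LoggerFormatter().format(record)
print(formatted["message"])   # "user alice logged in"
print(loki_metadata)          # {"request_id": "abc123"}
```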

View File

@@ -0,0 +1,94 @@
import traceback
class LoguruFormatter:
"""
A custom formatter for log records generated by Loguru, formatting the record into a structured dictionary.
"""
def __init__(self):
pass
def format(self, record):
"""
Format a Loguru log record into a structured dictionary.
Args:
record (dict): The Loguru log record to format.
Returns:
dict: A dictionary representation of the log record.
"""
# Convert timestamp to a standard format across Python versions
timestamp = record.get("time")
if hasattr(timestamp, "timestamp"):
# Python 3.x
timestamp = timestamp.timestamp()
else:
# Python 2.7: Convert datetime to a Unix timestamp
timestamp = (timestamp - timestamp.utcoffset()).total_seconds()
formatted = {
"message": record.get("message"),
"timestamp": timestamp,
"process": record.get("process").id,
"thread": record.get("thread").id,
"function": record.get("function"),
"module": record.get("module"),
"name": record.get("name"),
"level": record.get("level").name.upper(),
}
loki_metadata = {}
# Update with extra fields if available
extra = record.get("extra", {})
if isinstance(extra, dict):
# Handle the nested "extra" key correctly
if "extra" in extra and isinstance(extra["extra"], dict):
formatted.update(extra["extra"])
else:
formatted.update(extra)
        # Pull loki_metadata out of the formatted record, defaulting to an empty dict
        loki_metadata = formatted.pop("loki_metadata", None)
        if not isinstance(loki_metadata, dict):
            loki_metadata = {}
        # Treat any level starting with "ER" (e.g. ERROR) as an error; the check is case-insensitive
if formatted["level"].startswith("ER"):
formatted["file"] = record.get("file").name
formatted["path"] = record.get("file").path
formatted["line"] = record.get("line")
self.add_exception_details(record, formatted)
return formatted, loki_metadata
def add_exception_details(self, record, formatted):
"""
Adds exception details to the formatted log record.
Args:
record (dict): The log record containing log information.
formatted (dict): The dictionary to which the formatted exception details will be added.
Notes:
- If the log record contains an exception, this method extracts the exception type,
value, and traceback, formats the traceback, and adds it to the 'stacktrace' key
in the formatted dictionary.
- Handles both Python 2.7 and Python 3.x versions for formatting exceptions.
"""
if record.get("exception"):
exc_type, exc_value, exc_traceback = record.get("exception")
if sys.version_info[0] == 2:
# Python 2.7: Use the older method for formatting exceptions
formatted_traceback = traceback.format_exception(
exc_type, exc_value, exc_traceback
)
else:
# Python 3.x: This is the same
formatted_traceback = traceback.format_exception(
exc_type, exc_value, exc_traceback
)
formatted["stacktrace"] = "".join(formatted_traceback)

View File

@@ -0,0 +1,273 @@
# Compatibility for Python 2 and 3 queue module
try:
import queue # Python 3.x
except ImportError:
import Queue as queue # Python 2.7
import atexit
import logging
import threading
import requests
from .formatters.logger_formatter import LoggerFormatter
from .loki_request import LokiRequest
from .stream import Stream
from .streams import Streams
class LokiLoggerHandler(logging.Handler):
"""
A custom logging handler that sends logs to a Loki server.
"""
def __init__(
self,
url,
labels,
label_keys=None,
additional_headers=None,
message_in_json_format=True,
timeout=10,
compressed=True,
default_formatter=LoggerFormatter(),
enable_self_errors=False,
enable_structured_loki_metadata=False,
loki_metadata=None,
loki_metadata_keys=None,
insecure_ssl_verify=True
):
"""
Initialize the LokiLoggerHandler object.
Args:
url (str): The URL of the Loki server.
labels (dict): Default labels for the logs.
label_keys (dict, optional): Specific log record keys to extract as labels. Defaults to None.
additional_headers (dict, optional): Additional headers for the Loki request. Defaults to None.
message_in_json_format (bool): Whether to format log values as JSON.
timeout (int, optional): Timeout interval for flushing logs. Defaults to 10 seconds.
compressed (bool, optional): Whether to compress the logs using gzip. Defaults to True.
            default_formatter (logging.Formatter, optional): Formatter for the log records. Defaults to LoggerFormatter; pass LoguruFormatter() when the handler is used as a Loguru sink.
            enable_self_errors (bool, optional): Set to True to log handler errors to the console. Defaults to False.
            enable_structured_loki_metadata (bool, optional): Whether to include structured loki_metadata in the logs. Defaults to False. Only supported by Loki 3.0 and above.
            loki_metadata (dict, optional): Default loki_metadata values. Defaults to None. Only supported by Loki 3.0 and above.
            loki_metadata_keys (array, optional): Specific log record keys to extract as loki_metadata. Only supported by Loki 3.0 and above.
            insecure_ssl_verify (bool, optional): Passed to requests as the `verify` flag; set to False to skip SSL certificate verification. Defaults to True.
"""
super(LokiLoggerHandler, self).__init__()
self.labels = labels
self.label_keys = label_keys if label_keys is not None else {}
self.timeout = timeout
self.formatter = default_formatter
self.enable_self_errors = enable_self_errors
# Create a logger for self-errors if enabled
if self.enable_self_errors:
self.debug_logger = logging.getLogger("LokiHandlerDebug")
self.debug_logger.setLevel(logging.ERROR)
console_handler = logging.StreamHandler()
self.debug_logger.addHandler(console_handler)
self.request = LokiRequest(
url=url, compressed=compressed, additional_headers=additional_headers or {},
insecure_ssl_verify=insecure_ssl_verify
)
self.buffer = queue.Queue()
self.flush_thread = threading.Thread(target=self._flush)
self.flush_event = threading.Event()
# Set daemon for Python 2 and 3 compatibility
self.flush_thread.daemon = True
self.flush_thread.start()
self.message_in_json_format = message_in_json_format
        # Flag set to True when the handler fails to format or send a log
self.error = False
self.enable_structured_loki_metadata = enable_structured_loki_metadata
self.loki_metadata = loki_metadata
self.loki_metadata_keys = loki_metadata_keys if loki_metadata_keys is not None else []
self.insecure_ssl_verify = insecure_ssl_verify
def emit(self, record):
"""
Emit a log record.
Args:
record (logging.LogRecord): The log record to be emitted.
"""
try:
formatted_record, log_loki_metadata = self.formatter.format(record)
self._put(formatted_record, log_loki_metadata)
except Exception as e:
self.handle_unexpected_error(e)
def _flush(self):
"""
Flush the buffer by sending the logs to the Loki server.
This function runs in a separate thread and periodically sends logs.
"""
atexit.register(self._send)
while True:
# Wait until flush_event is set or timeout elapses
self.flush_event.wait(timeout=self.timeout)
# Reset the event for the next cycle
self.flush_event.clear()
if not self.buffer.empty():
try:
self._send()
except Exception as e:
self.handle_unexpected_error(e)
def _send(self):
"""
Send the buffered logs to the Loki server.
"""
temp_streams = {}
while not self.buffer.empty():
log = self.buffer.get()
if log.key not in temp_streams:
stream = Stream(log.labels, self.loki_metadata,
self.message_in_json_format)
temp_streams[log.key] = stream
temp_streams[log.key].append_value(log.line, log.loki_metadata)
if temp_streams:
streams = Streams(list(temp_streams.values()))
try:
self.request.send(streams.serialize())
except requests.RequestException as e:
self.handle_unexpected_error(e)
def write(self, message):
"""
Write a message to the log.
Args:
message (str): The message to be logged.
"""
self.emit(message.record)
def _put(self, log_record, log_loki_metadata):
"""
Put a log record into the buffer.
Args:
log_record (dict): The formatted log record.
"""
labels = self.labels.copy()
self.assign_labels_from_log(log_record, labels)
if self.enable_structured_loki_metadata:
self.extract_and_clean_metadata(log_record, log_loki_metadata)
log_line = LogLine(labels, log_record, log_loki_metadata)
else:
log_line = LogLine(labels, log_record)
self.buffer.put(log_line)
def assign_labels_from_log(self, log_record, labels):
"""
This method iterates over the keys specified in `self.label_keys` and checks if each key is present in the `log_record`.
If a key is found in the `log_record`, it assigns the corresponding value from the `log_record` to the `labels` dictionary.
Args:
log_record (dict): The log record containing potential label keys and values.
labels (dict): The dictionary to which the labels will be assigned.
Returns:
None
"""
for key in self.label_keys:
if key in log_record:
labels[key] = log_record[key]
def extract_and_clean_metadata(self, log_record, log_loki_metadata):
"""
This method iterates over the keys defined in `self.loki_metadata_keys` and checks if they are present
in the `log_record`. If a key is found, it is added to the `log_loki_metadata` dictionary and marked
for deletion from the `log_record`. After collecting all keys to be deleted, it removes them from the
`log_record`.
Args:
log_record (dict): The original log record containing various log data.
log_loki_metadata (dict): The dictionary where extracted metadata will be stored.
Returns:
None
"""
keys_to_delete = []
for key in self.loki_metadata_keys:
if key in log_record:
log_loki_metadata[key] = log_record[key]
keys_to_delete.append(key)
for key in keys_to_delete:
del log_record[key]
def handle_unexpected_error(self, e):
"""
Handles unexpected errors by logging them and setting the error flag.
Args:
e (Exception): The exception that was raised.
Returns:
None
"""
if self.enable_self_errors:
self.debug_logger.error(
"Unexpected error: %s", e, exc_info=True)
self.error = True
class LogLine:
"""
Represents a single log line with associated labels.
Attributes:
labels (dict): Labels associated with the log line.
key (str): A unique key generated from the labels.
line (str): The actual log line content.
"""
def __init__(self, labels, line, loki_metadata=None):
"""
Initialize a LogLine object.
Args:
labels (dict): Labels associated with the log line.
line (str): The actual log line content.
"""
self.labels = labels
self.key = self._key_from_labels(labels)
self.line = line
self.loki_metadata = loki_metadata
@staticmethod
def _key_from_labels(labels):
"""
Generate a unique key from the labels values.
Args:
labels (dict): Labels to generate the key from.
Returns:
str: A unique key generated from the labels values.
"""
key_list = sorted(labels.values())
return "_".join(key_list)

View File

@@ -0,0 +1,64 @@
import gzip
import requests
class LokiRequest:
"""
A class to send logs to a Loki server, with optional compression and custom headers.
Attributes:
url (str): The URL of the Loki server.
compressed (bool): Whether to compress the logs using gzip.
headers (dict): Additional headers to include in the request.
session (requests.Session): The session used for making HTTP requests.
"""
def __init__(self, url, compressed=False, additional_headers=None, insecure_ssl_verify=True):
"""
Initialize the LokiRequest object with the server URL, compression option, and additional headers.
Args:
url (str): The URL of the Loki server.
compressed (bool, optional): Whether to compress the logs using gzip. Defaults to False.
            additional_headers (dict, optional): Additional headers to include in the request.
                Defaults to an empty dictionary.
            insecure_ssl_verify (bool, optional): Passed to requests as the `verify` flag;
                set to False to skip SSL certificate verification. Defaults to True.
        """
self.url = url
self.compressed = compressed
self.headers = additional_headers if additional_headers is not None else {}
self.headers["Content-Type"] = "application/json"
self.session = requests.Session()
self.insecure_ssl_verify = insecure_ssl_verify
def send(self, data):
"""
Send the log data to the Loki server.
Args:
data (str): The log data to be sent.
Raises:
requests.RequestException: If the request fails.
"""
response = None
try:
if self.compressed:
self.headers["Content-Encoding"] = "gzip"
data = gzip.compress(data.encode("utf-8"))
response = self.session.post(self.url, data=data, headers=self.headers, verify=self.insecure_ssl_verify)
response.raise_for_status()
except requests.RequestException as e:
if response is not None:
response_message= f"Response status code: {response.status_code}, response text: {response.text}, post request URL: {response.request.url}"
raise requests.RequestException(f"Error while sending logs: {str(e)}\nCaptured error details:\n{response_message}") from e
raise requests.RequestException(f"Error while sending logs: {str(e)}") from e
finally:
if response:
response.close()
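
A hedged sketch of the raw push that LokiRequest performs; the endpoint, token, and payload are placeholders. With `compressed=True` the body is gzipped and a `Content-Encoding: gzip` header is sent alongside `Content-Type: application/json`:

```python
# Send a minimal hand-written payload directly through LokiRequest.
from src.loki_logger_handler.loki_request import LokiRequest

request = LokiRequest(
    url="https://loki.example.com/loki/api/v1/push",
    compressed=True,
    additional_headers={"X-Odin-Auth": "YOUR_TOKEN"},
)
payload = '{"streams": [{"stream": {"application": "user-service"}, "values": [["1738570256000000000", "hello"]]}]}'
request.send(payload)   # raises requests.RequestException with response details on failure
```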

View File

@@ -0,0 +1,95 @@
import time
import json
# Compatibility for Python 2 and 3
try:
from time import time_ns # Python 3.7+
except ImportError:
    def time_ns():
        # Python < 3.7 fallback: time.time() already carries sub-second precision
        return int(time.time() * 1e9)
class _StreamEncoder(json.JSONEncoder):
"""
A custom JSON encoder for the Stream class.
This is an internal class used to handle the serialization
of Stream objects into JSON format.
"""
def default(self, obj):
if isinstance(obj, Stream):
return obj.__dict__
return super(_StreamEncoder, self).default(obj)
class Stream(object):
"""
A class representing a data stream with associated labels and values.
Attributes:
stream (dict): A dictionary containing the labels for the stream.
values (list): A list of timestamped values associated with the stream.
message_in_json_format (bool): Whether to format log values as JSON.
"""
def __init__(self, labels=None, loki_metadata=None, message_in_json_format=True):
"""
Initialize a Stream object with optional labels.
Args:
labels (dict, optional): A dictionary of labels for the stream. Defaults to an empty dictionary.
"""
self.stream = labels or {}
self.values = []
self.message_in_json_format = message_in_json_format
if loki_metadata and not isinstance(loki_metadata, dict):
raise TypeError("loki_metadata must be a dictionary")
self.loki_metadata = loki_metadata
def add_label(self, key, value):
"""
Add a label to the stream.
Args:
key (str): The label's key.
value (str): The label's value.
"""
self.stream[key] = value
def append_value(self, value, metadata=None):
"""
Append a value to the stream with a timestamp.
Args:
value (dict): A dictionary representing the value to be appended.
It should contain a 'timestamp' key.
"""
try:
# Convert the timestamp to nanoseconds and ensure it's a string
timestamp = str(int(value.get("timestamp") * 1e9))
except (TypeError, ValueError):
# Fallback to the current time in nanoseconds if the timestamp is missing or invalid
timestamp = str(time_ns())
formatted_value = json.dumps(value, ensure_ascii=False) if self.message_in_json_format else value
        if metadata or self.loki_metadata:
            # Ensure both metadata and self.loki_metadata are dictionaries (default to an empty dict if None)
            metadata = metadata if metadata is not None else {}
            loki_metadata = self.loki_metadata if self.loki_metadata is not None else {}
            # Merge per-line metadata into a copy of the global metadata; per-line values take precedence
            merged_metadata = dict(loki_metadata)
            merged_metadata.update(metadata)
            # Grafana Loki only accepts string metadata values, so stringify everything
            formatted_metadata = {key: str(value) for key, value in merged_metadata.items()}
self.values.append([timestamp, formatted_value, formatted_metadata])
else:
self.values.append([timestamp, formatted_value])
def serialize(self):
"""
Serialize the Stream object to a JSON string.
Returns:
str: The JSON string representation of the Stream object.
"""
return json.dumps(self, cls=_StreamEncoder)
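
A hedged sketch of `append_value()`: the timestamp is converted to a nanosecond string and metadata values are stringified. The label and metadata values below are placeholders:

```python
# Build a single stream, append one timestamped line, and serialize it.
from src.loki_logger_handler.stream import Stream

stream = Stream(labels={"application": "user-service"}, message_in_json_format=True)
stream.append_value(
    {"timestamp": 1738570256.0, "message": "hello"},
    metadata={"request_id": "abc123"},
)
print(stream.serialize())
# Roughly: {"stream": {"application": "user-service"},
#           "values": [["1738570256000000000", "{\"timestamp\": 1738570256.0, ...}", {"request_id": "abc123"}]], ...}
```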

View File

@@ -0,0 +1,59 @@
import json
class _LokiRequestEncoder(json.JSONEncoder):
"""
A custom JSON encoder for the Streams class.
This internal class is used to handle the serialization
of Streams objects into the JSON format expected by Loki.
"""
def default(self, o):
if isinstance(o, Streams):
# Convert the Streams object to a dictionary format suitable for Loki
return {"streams": [stream.__dict__ for stream in o.streams]}
# Use the default serialization method for other objects
return super(_LokiRequestEncoder, self).default(o)
class Streams(object): # Explicitly inherit from object for Python 2 compatibility
"""
A class representing a collection of Stream objects.
Attributes:
streams (list): A list of Stream objects.
"""
def __init__(self, streams=None):
"""
Initialize a Streams object with an optional list of Stream objects.
Args:
streams (list, optional): A list of Stream objects. Defaults to an empty list.
"""
self.streams = streams if streams is not None else []
def add_stream(self, stream):
"""
Add a single Stream object to the streams list.
Args:
stream (Stream): The Stream object to be added.
"""
self.streams.append(stream)
def set_streams(self, streams):
"""
Set the streams list to a new list of Stream objects.
Args:
streams (list): A list of Stream objects to replace the current streams.
"""
self.streams = streams
def serialize(self):
"""
Serialize the Streams object to a JSON string.
Returns:
str: The JSON string representation of the Streams object.
"""
return json.dumps(self, cls=_LokiRequestEncoder)
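
A hedged end-to-end sketch tying the pieces together: serialize a Streams payload and push it with LokiRequest. The URL and label values are placeholders:

```python
# Build a Streams payload from one Stream and send it to the Loki push API.
from src.loki_logger_handler.loki_request import LokiRequest
from src.loki_logger_handler.stream import Stream
from src.loki_logger_handler.streams import Streams

stream = Stream(labels={"application": "user-service"})
stream.append_value({"timestamp": 1738570256.0, "message": "hello"})

payload = Streams([stream]).serialize()   # '{"streams": [{"stream": ..., "values": ...}]}'
LokiRequest(url="https://loki.example.com/loki/api/v1/push", compressed=True).send(payload)
```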