Files
memory-gateway/memory_gateway/services_v2.py

971 lines
39 KiB
Python

"""Service orchestration for the Memory Gateway v2 control plane."""
from __future__ import annotations
import hashlib
from datetime import datetime, timedelta, timezone
from typing import Any, Awaitable, Callable
from uuid import uuid4
from fastapi import HTTPException, status
from .backend_contracts import (
BackendOperation,
BackendCommitResult,
BackendProducedRef,
BackendResultStatus,
BackendWriteResult,
CommitJob,
OutboxEvent,
OutboxEventStatus,
)
from .evermemos_client import EverMemOSClient
from .openviking_client import get_openviking_client
from .repositories import MetadataRepository, repository
from .schemas import AuditLog
from .schemas_v2 import (
BackendRefStatus,
BackendType,
CommitJobView,
CommitRequest,
CommitResponse,
ContextItem,
FeedbackRequest,
FeedbackResponse,
IngestRequest,
IngestResponse,
MemoryRef,
MemoryRefView,
MemoryRefType,
OperationStatus,
OutboxProcessResponse,
OutboxSummary,
RetrieveRequest,
RetrieveResponse,
)
OpenVikingClientFactory = Callable[[], Awaitable[Any]]
class MemoryGatewayV2Service:
    """Control-plane orchestrator for the Memory Gateway v2 API.

    Fans conversation ingests out to the OpenViking and EverMemOS backends,
    records MemoryRef control rows through the metadata repository, and
    drives session-commit jobs via a persistent outbox.
    """

    def __init__(
        self,
        repo: MetadataRepository = repository,
        openviking_client_factory: OpenVikingClientFactory = get_openviking_client,
        evermemos_client: Any | None = None,
    ) -> None:
        """Store collaborators; defaults are the module-level repo/client factory."""
        self.repo = repo
        self.openviking_client_factory = openviking_client_factory
        # May be None; call sites fall back to constructing EverMemOSClient().
        self.evermemos_client = evermemos_client
    async def ingest_conversation_turn(self, request: IngestRequest) -> IngestResponse:
        """Fan one conversation turn out to both memory backends.

        Normalizes and validates the request, derives deterministic gateway
        and provenance ids from the idempotency basis, writes to each backend
        allowed by the request policy (recording a SKIPPED ref otherwise),
        audits the operation, and returns the aggregated status together with
        per-backend ref views and any backend error messages.
        """
        normalized = self._normalize_ingest_request(request)
        gateway_id = self._build_gateway_id(normalized)
        provenance_id = self._build_provenance_id(normalized, gateway_id)
        content_hash = self._content_hash(normalized.content)
        self._check_namespace_access(normalized)
        payload = self._apply_safety_policy(normalized)
        refs: list[MemoryRef] = []
        if normalized.policy.allow_openviking:
            refs.append(
                await self._write_openviking_turn(
                    normalized,
                    payload,
                    gateway_id=gateway_id,
                    provenance_id=provenance_id,
                    content_hash=content_hash,
                )
            )
        else:
            # Policy disabled OpenViking: still record a SKIPPED ref so the
            # control plane keeps a complete per-backend trail.
            refs.append(
                self._save_ref(
                    normalized,
                    gateway_id,
                    provenance_id,
                    BackendType.OPENVIKING,
                    MemoryRefType.SESSION_ARCHIVE,
                    BackendRefStatus.SKIPPED,
                    content_hash=content_hash,
                    metadata=self._control_metadata(normalized, content_hash, {"reason": "policy_disabled"}),
                )
            )
        if normalized.policy.allow_evermemos:
            refs.append(
                await self._write_evermemos_message(
                    normalized,
                    payload,
                    gateway_id=gateway_id,
                    provenance_id=provenance_id,
                    content_hash=content_hash,
                )
            )
        else:
            # Same SKIPPED bookkeeping for the EverMemOS side.
            refs.append(
                self._save_ref(
                    normalized,
                    gateway_id,
                    provenance_id,
                    BackendType.EVERMEMOS,
                    MemoryRefType.MESSAGE_MEMORY,
                    BackendRefStatus.SKIPPED,
                    content_hash=content_hash,
                    metadata=self._control_metadata(normalized, content_hash, {"reason": "policy_disabled"}),
                )
            )
        status_value = self._aggregate_ref_status(refs)
        errors = [ref.error_message for ref in refs if ref.error_message]
        self.repo.add_audit(
            AuditLog(
                actor_user_id=normalized.user_id,
                actor_agent_id=normalized.agent_id,
                action="v2_ingest_conversation_turn",
                target_type="conversation_turn",
                target_id=normalized.turn_id,
                namespace=normalized.namespace,
                metadata={
                    "gateway_id": gateway_id,
                    "provenance_id": provenance_id,
                    "idempotency_basis": self._idempotency_basis(normalized),
                    "content_hash": content_hash,
                    "status": status_value.value,
                    "source_type": normalized.source_type,
                    "source_event_id": normalized.source_event_id,
                },
            )
        )
        return IngestResponse(
            status=status_value,
            gateway_id=gateway_id,
            provenance_id=provenance_id,
            request_id=normalized.request_id,
            turn_id=normalized.turn_id,
            refs=self._view_refs(refs),
            errors=errors,
            metadata={"backend_count": len(refs)},
        )
    async def commit_session(self, session_id: str, request: CommitRequest) -> CommitResponse:
        """Accept a commit request: persist the job and enqueue backend outbox events."""
        # TODO(v2): add a worker that consumes these outbox events and writes
        # resulting backend refs. This method intentionally only records
        # control-plane intent.
        job_id = f"job_{uuid4().hex[:16]}"
        gateway_id = self._commit_gateway_id(session_id, request)
        job = CommitJob(
            job_id=job_id,
            workspace_id=request.workspace_id,
            user_id=request.user_id,
            agent_id=request.agent_id,
            session_id=session_id,
            namespace=request.namespace,
            requested_by=request.user_id,
        )
        self.repo.save_commit_job(job)
        self._create_commit_outbox_events(gateway_id, job, request)
        self.repo.add_audit(
            AuditLog(
                actor_user_id=request.user_id,
                actor_agent_id=request.agent_id,
                action="v2_commit_session_accepted",
                target_type="session",
                target_id=session_id,
                namespace=request.namespace,
                metadata={
                    "job_id": job_id,
                    "gateway_id": gateway_id,
                    "workspace_id": request.workspace_id,
                    # Mirrors the two backends targeted by _create_commit_outbox_events.
                    "outbox_events": 2,
                },
            )
        )
        return CommitResponse(
            job_id=job_id,
            session_id=session_id,
            metadata={"gateway_id": gateway_id, "outbox_events": 2},
        )
async def retrieve_context(self, request: RetrieveRequest) -> RetrieveResponse:
# TODO(v2): expand namespace ACL, fan out concurrently to OpenViking and
# EverMemOS, then apply lightweight merge/rerank before returning.
refs = self.repo.list_memory_refs(
workspace_id=request.workspace_id,
user_id=request.user_id,
agent_id=request.agent_id,
session_id=request.session_id,
namespace=request.namespace,
limit=request.limit,
)
items = [
ContextItem(
text=None,
source_backend=ref.backend_type,
ref_id=ref.id,
score=0.0,
memory_type=ref.ref_type.value,
metadata={
"status": ref.status.value,
"native_id": ref.native_id,
"native_uri": ref.native_uri,
},
)
for ref in refs
]
trace_id = request.metadata.get("trace_id") if request.metadata else None
return RetrieveResponse(
status=OperationStatus.SUCCESS,
items=items,
refs=self._view_refs(refs),
conflicts=[],
trace_id=trace_id,
metadata={"skeleton": True},
)
async def record_memory_feedback(self, request: FeedbackRequest) -> FeedbackResponse:
# TODO(v2): persist review/feedback state in a dedicated table and route
# accepted corrections back to the owning backend adapter.
feedback_id = f"fb_{uuid4().hex[:16]}"
self.repo.add_audit(
AuditLog(
actor_user_id=request.user_id,
actor_agent_id=request.agent_id,
action=f"v2_feedback:{request.feedback_type}",
target_type="memory_ref",
target_id=request.memory_ref_id,
namespace=request.namespace,
metadata={
"feedback_id": feedback_id,
"workspace_id": request.workspace_id,
"comment": request.comment,
"source_type": request.source_type,
"source_event_id": request.source_event_id,
},
)
)
return FeedbackResponse(
status=OperationStatus.ACCEPTED,
feedback_id=feedback_id,
memory_ref_id=request.memory_ref_id,
)
    async def process_pending_outbox_events(
        self,
        limit: int = 100,
        worker_id: str | None = None,
        lease_seconds: int = 300,
    ) -> list[OutboxEvent]:
        """Claim up to ``limit`` pending outbox events under a worker lease and run them.

        Expired leases are released first so abandoned events become claimable
        again.  Adapter exceptions are converted into retryable FAILED results
        so the normal retry/dead-letter bookkeeping applies.  Returns each
        event in its post-processing state.
        """
        worker_id = worker_id or f"worker_{uuid4().hex[:12]}"
        self.repo.release_expired_processing_events()
        events = self.repo.claim_pending_outbox_events(
            limit=limit,
            worker_id=worker_id,
            lease_seconds=lease_seconds,
        )
        processed: list[OutboxEvent] = []
        for event in events:
            try:
                result = await self._execute_outbox_event(event)
            except Exception as exc:  # noqa: BLE001
                # Never let one bad adapter call abort the whole batch.
                result = BackendCommitResult(
                    backend_type=event.backend_type,
                    operation=event.operation,
                    status=BackendResultStatus.FAILED,
                    retryable=True,
                    error_code="adapter_exception",
                    error_message=str(exc),
                )
            processed.append(self._apply_outbox_result(event, result))
        return processed
async def process_pending_outbox_events_summary(
self,
limit: int = 100,
worker_id: str | None = None,
lease_seconds: int = 300,
) -> OutboxProcessResponse:
worker_id = worker_id or f"worker_{uuid4().hex[:12]}"
processed = await self.process_pending_outbox_events(
limit=limit,
worker_id=worker_id,
lease_seconds=lease_seconds,
)
return OutboxProcessResponse(
status=OperationStatus.SUCCESS,
worker_id=worker_id,
processed_count=len(processed),
outbox_summary=self._outbox_summary(),
)
    async def process_commit_job(self, job_id: str) -> CommitJob:
        """Synchronously drive one commit job's outbox events and finalize its status.

        Raises:
            HTTPException: 404 when the job id is unknown, or when the job
                vanishes before the final status update.
        """
        job = self.repo.get_commit_job(job_id)
        if not job:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Commit job not found")
        self.repo.update_commit_job_status(job_id, OperationStatus.RUNNING.value)
        self.repo.release_expired_processing_events()
        events = self.repo.list_outbox_events_by_job(job_id)
        for event in events:
            if event.status == OutboxEventStatus.PENDING:
                await self.process_outbox_event(event.id)
        # Re-read: processing above mutated the events' states.
        events = self.repo.list_outbox_events_by_job(job_id)
        final_status = self._aggregate_commit_job_status(events)
        # NOTE(review): this counts every SUCCESS ref for the session, not only
        # refs produced by this job — confirm that is the intended metric.
        created_refs_count = self.repo.count_memory_refs(session_id=job.session_id, status=BackendRefStatus.SUCCESS)
        error_message = self._commit_job_error_message(events)
        updated = self.repo.update_commit_job_status(
            job_id,
            final_status.value,
            error_message=error_message,
            created_refs_count=created_refs_count,
        )
        if not updated:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Commit job not found")
        return updated
def get_commit_job_view(self, job_id: str) -> CommitJobView:
job = self.repo.get_commit_job(job_id)
if not job:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Commit job not found")
return CommitJobView(
job_id=job.job_id,
workspace_id=job.workspace_id,
user_id=job.user_id,
agent_id=job.agent_id,
session_id=job.session_id,
namespace=job.namespace,
status=job.status,
created_refs_count=job.created_refs_count,
error_message=job.error_message,
created_at=job.created_at,
updated_at=job.updated_at,
started_at=job.started_at,
finished_at=job.finished_at,
outbox_summary=self._outbox_summary(self.repo.list_outbox_events_by_job(job_id)),
)
    async def process_outbox_event(self, event_id: str) -> OutboxEvent | None:
        """Claim and execute a single outbox event by id.

        Returns None when the event cannot be claimed.  Adapter exceptions
        become retryable FAILED results so the standard retry/dead-letter
        handling in _apply_outbox_result applies.
        """
        event = self.repo.claim_outbox_event(event_id)
        if not event:
            return None
        try:
            result = await self._execute_outbox_event(event)
        except Exception as exc:  # noqa: BLE001
            result = BackendCommitResult(
                backend_type=event.backend_type,
                operation=event.operation,
                status=BackendResultStatus.FAILED,
                retryable=True,
                error_code="adapter_exception",
                error_message=str(exc),
            )
        return self._apply_outbox_result(event, result)
def list_memory_refs(
self,
workspace_id: str | None = None,
user_id: str | None = None,
agent_id: str | None = None,
session_id: str | None = None,
namespace: str | None = None,
backend_type: BackendType | str | None = None,
ref_type: MemoryRefType | str | None = None,
status: BackendRefStatus | str | None = None,
limit: int = 100,
) -> list[MemoryRef]:
return self.repo.list_memory_refs(
workspace_id=workspace_id,
user_id=user_id,
agent_id=agent_id,
session_id=session_id,
namespace=namespace,
backend_type=backend_type,
ref_type=ref_type,
status=status,
limit=limit,
)
    async def _execute_outbox_event(self, event: OutboxEvent) -> BackendCommitResult | BackendWriteResult:
        """Dispatch one outbox event to its backend adapter.

        Only COMMIT_SESSION is supported; any other operation, an unknown
        backend, or an adapter missing the expected v2 method yields a
        SKIPPED result rather than an error.
        """
        payload = self._outbox_payload(event)
        if event.operation != BackendOperation.COMMIT_SESSION:
            return BackendCommitResult(
                backend_type=event.backend_type,
                operation=event.operation,
                status=BackendResultStatus.SKIPPED,
                metadata={"reason": "unsupported_operation"},
            )
        if event.backend_type == BackendType.OPENVIKING:
            client = await self.openviking_client_factory()
            # Duck-typed capability check: older adapters may lack the v2 API.
            if not hasattr(client, "commit_session_v2"):
                return BackendCommitResult(
                    backend_type=BackendType.OPENVIKING,
                    operation=BackendOperation.COMMIT_SESSION,
                    status=BackendResultStatus.SKIPPED,
                    metadata={"reason": "adapter_method_missing"},
                )
            result = await client.commit_session_v2(payload)
            return result
        if event.backend_type == BackendType.EVERMEMOS:
            client = self.evermemos_client or EverMemOSClient()
            if not hasattr(client, "extract_profile_long_term_v2"):
                return BackendCommitResult(
                    backend_type=BackendType.EVERMEMOS,
                    operation=BackendOperation.COMMIT_SESSION,
                    status=BackendResultStatus.SKIPPED,
                    metadata={"reason": "adapter_method_missing"},
                )
            result = client.extract_profile_long_term_v2(payload)
            # The EverMemOS client may be sync or async; await only awaitables.
            if hasattr(result, "__await__"):
                result = await result
            return result
        return BackendCommitResult(
            backend_type=event.backend_type,
            operation=event.operation,
            status=BackendResultStatus.SKIPPED,
            metadata={"reason": "unsupported_backend"},
        )
def _outbox_payload(self, event: OutboxEvent) -> dict[str, Any]:
return {
"event_id": event.id,
"gateway_id": event.gateway_id,
"workspace_id": event.workspace_id,
"user_id": event.user_id,
"agent_id": event.agent_id,
"session_id": event.session_id,
"backend_type": event.backend_type.value,
"operation": event.operation.value,
"payload_ref": event.payload_ref,
"metadata": self._safe_control_metadata(event.metadata),
}
    def _apply_outbox_result(self, event: OutboxEvent, result: BackendCommitResult | BackendWriteResult) -> OutboxEvent:
        """Fold a backend result into the event's state and persist the event.

        SUCCESS also materializes MemoryRefs; FAILED goes through the
        retry/dead-letter path; any other result status re-queues the event
        as PENDING with its lock released.
        """
        result_data = self._backend_result_to_dict(result)
        safe_result = self._backend_control_metadata(result_data)
        # Keep only whitelisted, content-free result fields on the event.
        event.metadata = self._safe_control_metadata({**event.metadata, "backend_result": safe_result})
        event.last_error = result.error_message
        event.updated_at = datetime.now(timezone.utc)
        if result.status == BackendResultStatus.SUCCESS:
            self._write_commit_memory_refs(event, result)
            event.status = OutboxEventStatus.SUCCESS
            self._clear_outbox_lock(event)
            return self.repo.save_outbox_event(event)
        if result.status == BackendResultStatus.SKIPPED:
            event.status = OutboxEventStatus.SKIPPED
            self._clear_outbox_lock(event)
            return self.repo.save_outbox_event(event)
        if result.status == BackendResultStatus.FAILED:
            return self._handle_failed_outbox_event(event, result)
        # Unknown result status: release the lease and leave the event PENDING.
        event.status = OutboxEventStatus.PENDING
        self._clear_outbox_lock(event)
        return self.repo.save_outbox_event(event)
    def _handle_failed_outbox_event(self, event: OutboxEvent, result: BackendCommitResult | BackendWriteResult) -> OutboxEvent:
        """Retry-with-backoff or dead-letter a failed outbox event, then persist it."""
        event.attempt_count += 1
        event.last_error = result.error_message
        if result.retryable and event.attempt_count < event.max_attempts:
            event.status = OutboxEventStatus.PENDING
            # Exponential backoff (2^attempts seconds) capped at one minute.
            event.next_retry_at = datetime.now(timezone.utc) + timedelta(seconds=min(60, 2**event.attempt_count))
        else:
            # Non-retryable error or attempts exhausted: park permanently.
            event.status = OutboxEventStatus.DEAD_LETTER
            event.next_retry_at = None
        self._clear_outbox_lock(event)
        return self.repo.save_outbox_event(event)
def _clear_outbox_lock(self, event: OutboxEvent) -> None:
event.locked_by = None
event.locked_at = None
event.lease_expires_at = None
    def _write_commit_memory_refs(self, event: OutboxEvent, result: BackendCommitResult | BackendWriteResult) -> list[MemoryRef]:
        """Persist MemoryRefs for a successful commit result.

        Uses the refs the backend reported when available; otherwise falls
        back to a single synthetic ref built from the result's top-level
        native id/uri.
        """
        produced_refs = result.refs if isinstance(result, BackendCommitResult) and result.refs else []
        if produced_refs:
            refs: list[MemoryRef] = []
            for index, produced_ref in enumerate(produced_refs):
                saved = self._write_commit_memory_ref(event, result, produced_ref, index=index)
                if saved:
                    refs.append(saved)
            return refs
        fallback = BackendProducedRef(
            ref_type=self._commit_ref_type(event, result),
            native_id=result.native_id,
            native_uri=result.native_uri,
            metadata={},
        )
        saved = self._write_commit_memory_ref(event, result, fallback, index=0)
        return [saved] if saved else []
    def _write_commit_memory_ref(
        self,
        event: OutboxEvent,
        result: BackendCommitResult | BackendWriteResult,
        produced_ref: BackendProducedRef,
        index: int,
    ) -> MemoryRef | None:
        """Upsert one MemoryRef for a backend-produced ref; None when unaddressable.

        The ref id is deterministic over (gateway, backend, ref type, native
        ids, stable key), so reprocessing the same event overwrites the same
        row instead of duplicating it.
        """
        stable_key = self._produced_ref_stable_key(produced_ref, index)
        # A ref with no native id/uri and no stable key cannot be addressed; skip it.
        if not produced_ref.native_id and not produced_ref.native_uri and not stable_key:
            return None
        ref_type = produced_ref.ref_type
        ref_id = self._memory_ref_id(event.gateway_id, event.backend_type, ref_type, produced_ref.native_id, produced_ref.native_uri, stable_key)
        existing = self.repo.get_memory_ref(ref_id)
        safe_produced_metadata = self._safe_control_metadata(produced_ref.metadata)
        ref = MemoryRef(
            id=ref_id,
            gateway_id=event.gateway_id,
            workspace_id=event.workspace_id,
            user_id=event.user_id,
            agent_id=event.agent_id,
            session_id=event.session_id,
            namespace=event.metadata.get("namespace"),
            backend_type=event.backend_type,
            ref_type=ref_type,
            native_id=produced_ref.native_id,
            native_uri=produced_ref.native_uri,
            # Provenance here is derived from the event + ref identity,
            # unlike the ingest path which hashes the idempotency basis.
            provenance_id="prov_"
            + hashlib.sha256(f"{event.id}|{ref_type.value}|{produced_ref.native_id}|{produced_ref.native_uri}".encode("utf-8")).hexdigest()[:24],
            source_type="commit_session",
            source_event_id=event.id,
            status=BackendRefStatus.SUCCESS,
            error_message=None,
            metadata={
                "schema_version": "memory-gateway.commit-ref.v2",
                "job_id": self._job_id_from_payload_ref(event.payload_ref),
                "outbox_event_id": event.id,
                "operation": event.operation.value,
                "produced_ref_index": index,
                "stable_key": stable_key,
                "produced_ref": safe_produced_metadata,
                "backend_result": self._backend_control_metadata(self._backend_result_to_dict(result)),
            },
        )
        if existing:
            # Preserve the original creation timestamp across upserts.
            ref.created_at = existing.created_at
        return self.repo.save_memory_ref(ref)
def _commit_ref_type(self, event: OutboxEvent, result: BackendCommitResult | BackendWriteResult) -> MemoryRefType:
requested = result.metadata.get("ref_type") if isinstance(result.metadata, dict) else None
if requested:
try:
return MemoryRefType(requested)
except ValueError:
pass
if event.backend_type == BackendType.OPENVIKING:
return MemoryRefType.SESSION_ARCHIVE
if event.backend_type == BackendType.EVERMEMOS:
return MemoryRefType.LONG_TERM_MEMORY
return MemoryRefType.DRAFT_REVIEW
def _job_id_from_payload_ref(self, payload_ref: str | None) -> str | None:
if payload_ref and payload_ref.startswith("commit_job:"):
return payload_ref.split(":", 1)[1]
return None
    def _aggregate_commit_job_status(self, events: list[OutboxEvent]) -> OperationStatus:
        """Collapse a job's outbox event statuses into one job-level status.

        All success/skipped -> SUCCESS; uniformly dead-lettered or uniformly
        failed -> FAILED; otherwise PARTIAL_SUCCESS only when at least one
        event succeeded, else FAILED.  A job with no events counts as FAILED.
        """
        if not events:
            return OperationStatus.FAILED
        statuses = {event.status for event in events}
        if statuses.issubset({OutboxEventStatus.SUCCESS, OutboxEventStatus.SKIPPED}):
            return OperationStatus.SUCCESS
        if statuses == {OutboxEventStatus.DEAD_LETTER} or statuses == {OutboxEventStatus.FAILED}:
            return OperationStatus.FAILED
        if OutboxEventStatus.PENDING in statuses or OutboxEventStatus.PROCESSING in statuses:
            # Still in flight: report partial only if something already landed.
            if OutboxEventStatus.SUCCESS in statuses:
                return OperationStatus.PARTIAL_SUCCESS
            return OperationStatus.FAILED
        if OutboxEventStatus.SUCCESS in statuses:
            return OperationStatus.PARTIAL_SUCCESS
        return OperationStatus.FAILED
def _commit_job_error_message(self, events: list[OutboxEvent]) -> str | None:
errors = [event.last_error for event in events if event.last_error]
return "; ".join(errors) if errors else None
def _outbox_summary(self, events: list[OutboxEvent] | None = None) -> OutboxSummary:
events = events if events is not None else self.repo.list_outbox_events(limit=100000)
counts = {status: 0 for status in OutboxEventStatus}
for event in events:
counts[event.status] = counts.get(event.status, 0) + 1
return OutboxSummary(
total_events=len(events),
pending_events=counts.get(OutboxEventStatus.PENDING, 0),
processing_events=counts.get(OutboxEventStatus.PROCESSING, 0),
success_events=counts.get(OutboxEventStatus.SUCCESS, 0),
skipped_events=counts.get(OutboxEventStatus.SKIPPED, 0),
dead_letter_events=counts.get(OutboxEventStatus.DEAD_LETTER, 0),
)
def _safe_control_metadata(self, metadata: dict[str, Any] | None) -> dict[str, Any]:
if not metadata:
return {}
blocked = {"content", "raw_request", "messages", "conversation", "transcript"}
safe: dict[str, Any] = {}
for key, value in metadata.items():
if key in blocked:
continue
if isinstance(value, dict):
nested = self._safe_control_metadata(value)
if nested:
safe[key] = nested
elif isinstance(value, (str, int, float, bool)) or value is None:
safe[key] = value
elif isinstance(value, list):
safe[key] = [item for item in value if isinstance(item, (str, int, float, bool))]
return safe
def _normalize_ingest_request(self, request: IngestRequest) -> IngestRequest:
data = request.model_copy(deep=True)
data.namespace = data.namespace.strip("/")
data.content = data.content.strip()
if not data.namespace:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="namespace is required")
if not data.content:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="content is required")
return data
def _build_gateway_id(self, request: IngestRequest) -> str:
seed = self._idempotency_basis(request)
return "gw_" + hashlib.sha256(seed.encode("utf-8")).hexdigest()[:24]
def _build_provenance_id(self, request: IngestRequest, gateway_id: str) -> str:
seed = f"{gateway_id}|{self._idempotency_basis(request)}"
return "prov_" + hashlib.sha256(seed.encode("utf-8")).hexdigest()[:24]
def _idempotency_basis(self, request: IngestRequest) -> str:
if request.idempotency_key:
return f"idempotency_key:{request.workspace_id}:{request.idempotency_key}"
if request.source_event_id:
return f"source_event_id:{request.workspace_id}:{request.source_type}:{request.source_event_id}"
return f"turn:{request.workspace_id}:{request.session_id}:{request.turn_id}"
def _content_hash(self, content: str) -> str:
return hashlib.sha256(content.encode("utf-8")).hexdigest()
def _check_namespace_access(self, request: IngestRequest) -> None:
# TODO(v2): enforce workspace/user/agent namespace ACL tree.
if not request.workspace_id or not request.user_id or not request.agent_id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="workspace, user, and agent are required")
def _apply_safety_policy(self, request: IngestRequest) -> dict[str, Any]:
# TODO(v2): apply configurable redaction/safety rules before fan-out.
return {
"workspace_id": request.workspace_id,
"user_id": request.user_id,
"agent_id": request.agent_id,
"session_id": request.session_id,
"turn_id": request.turn_id,
"namespace": request.namespace,
"source_type": request.source_type,
"source_event_id": request.source_event_id,
"role": request.role,
"content": request.content,
"metadata": request.metadata,
"trace": request.trace.model_dump(mode="json"),
}
    async def _write_openviking_turn(
        self,
        request: IngestRequest,
        payload: dict[str, Any],
        gateway_id: str,
        provenance_id: str,
        content_hash: str,
    ) -> MemoryRef:
        """Best-effort write of one turn to OpenViking, always returning a ref.

        Missing adapter method -> SKIPPED ref; any exception -> FAILED ref
        carrying the error message; otherwise the backend result is mapped
        through _ref_from_backend_result.  Never raises.
        """
        try:
            client = await self.openviking_client_factory()
            if not hasattr(client, "ingest_conversation_turn"):
                return self._save_ref(
                    request,
                    gateway_id,
                    provenance_id,
                    BackendType.OPENVIKING,
                    MemoryRefType.SESSION_ARCHIVE,
                    BackendRefStatus.SKIPPED,
                    content_hash=content_hash,
                    metadata=self._control_metadata(request, content_hash, {"reason": "adapter_method_missing"}),
                )
            result = await client.ingest_conversation_turn(payload)
            return self._ref_from_backend_result(
                request,
                gateway_id,
                provenance_id,
                BackendType.OPENVIKING,
                MemoryRefType.SESSION_ARCHIVE,
                result,
                content_hash,
            )
        except Exception as exc:  # noqa: BLE001
            # Record the failure as a ref instead of propagating the exception.
            return self._save_ref(
                request,
                gateway_id,
                provenance_id,
                BackendType.OPENVIKING,
                MemoryRefType.SESSION_ARCHIVE,
                BackendRefStatus.FAILED,
                content_hash=content_hash,
                error_message=str(exc),
                metadata=self._control_metadata(request, content_hash),
            )
    async def _write_evermemos_message(
        self,
        request: IngestRequest,
        payload: dict[str, Any],
        gateway_id: str,
        provenance_id: str,
        content_hash: str,
    ) -> MemoryRef:
        """Best-effort write of one turn to EverMemOS, always returning a ref.

        Mirrors _write_openviking_turn: missing adapter method -> SKIPPED,
        exception -> FAILED, otherwise the result is mapped to a ref.  The
        client call may be sync or async; awaitables are awaited.
        """
        try:
            client = self.evermemos_client or EverMemOSClient()
            if not hasattr(client, "ingest_message"):
                return self._save_ref(
                    request,
                    gateway_id,
                    provenance_id,
                    BackendType.EVERMEMOS,
                    MemoryRefType.MESSAGE_MEMORY,
                    BackendRefStatus.SKIPPED,
                    content_hash=content_hash,
                    metadata=self._control_metadata(request, content_hash, {"reason": "adapter_method_missing"}),
                )
            result = client.ingest_message(payload)
            if hasattr(result, "__await__"):
                result = await result
            return self._ref_from_backend_result(
                request,
                gateway_id,
                provenance_id,
                BackendType.EVERMEMOS,
                MemoryRefType.MESSAGE_MEMORY,
                result,
                content_hash,
            )
        except Exception as exc:  # noqa: BLE001
            # Record the failure as a ref instead of propagating the exception.
            return self._save_ref(
                request,
                gateway_id,
                provenance_id,
                BackendType.EVERMEMOS,
                MemoryRefType.MESSAGE_MEMORY,
                BackendRefStatus.FAILED,
                content_hash=content_hash,
                error_message=str(exc),
                metadata=self._control_metadata(request, content_hash),
            )
    def _ref_from_backend_result(
        self,
        request: IngestRequest,
        gateway_id: str,
        provenance_id: str,
        backend_type: BackendType,
        ref_type: MemoryRefType,
        result: Any,
        content_hash: str,
    ) -> MemoryRef:
        """Translate a loosely-typed backend response into a saved MemoryRef.

        Unknown or missing status strings default to SUCCESS; native id/uri
        are probed under several common key names used by the adapters.
        """
        data = self._backend_result_to_dict(result)
        raw_status = str(data.get("status") or "success")
        ref_status = BackendRefStatus.SUCCESS
        if raw_status in {BackendRefStatus.PENDING.value, BackendRefStatus.FAILED.value, BackendRefStatus.SKIPPED.value}:
            ref_status = BackendRefStatus(raw_status)
        native_id = data.get("native_id") or data.get("id") or data.get("memory_id") or data.get("session_id")
        native_uri = data.get("native_uri") or data.get("uri") or data.get("url")
        return self._save_ref(
            request,
            gateway_id,
            provenance_id,
            backend_type,
            ref_type,
            ref_status,
            native_id=native_id,
            native_uri=native_uri,
            error_message=data.get("error") or data.get("error_message"),
            content_hash=content_hash,
            metadata=self._control_metadata(
                request,
                content_hash,
                # Only whitelisted response fields are kept on the ref.
                {"backend_response": self._backend_control_metadata(data)},
            ),
        )
    def _save_ref(
        self,
        request: IngestRequest,
        gateway_id: str,
        provenance_id: str,
        backend_type: BackendType,
        ref_type: MemoryRefType,
        ref_status: BackendRefStatus,
        native_id: str | None = None,
        native_uri: str | None = None,
        error_message: str | None = None,
        content_hash: str | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> MemoryRef:
        """Idempotently upsert the ingest-path MemoryRef for one backend.

        The id is keyed on (gateway_id, backend, ref type) only — one ref per
        backend per ingested turn — so retries overwrite in place while the
        original created_at is preserved.
        """
        ref_id = self._memory_ref_id(gateway_id, backend_type, ref_type)
        existing = self.repo.get_memory_ref(ref_id)
        ref = MemoryRef(
            id=ref_id,
            gateway_id=gateway_id,
            workspace_id=request.workspace_id,
            user_id=request.user_id,
            agent_id=request.agent_id,
            session_id=request.session_id,
            turn_id=request.turn_id,
            namespace=request.namespace,
            backend_type=backend_type,
            ref_type=ref_type,
            native_id=native_id,
            native_uri=native_uri,
            provenance_id=provenance_id,
            idempotency_key=request.idempotency_key,
            content_hash=content_hash,
            source_type=request.source_type,
            source_event_id=request.source_event_id,
            status=ref_status,
            error_message=error_message,
            metadata=metadata or {},
        )
        if existing:
            # Preserve the original creation timestamp across upserts.
            ref.created_at = existing.created_at
        return self.repo.save_memory_ref(ref)
def _memory_ref_id(
self,
gateway_id: str,
backend_type: BackendType,
ref_type: MemoryRefType,
native_id: str | None = None,
native_uri: str | None = None,
stable_key: str | None = None,
) -> str:
native_key = f"|{native_id or ''}|{native_uri or ''}|{stable_key or ''}" if native_id or native_uri or stable_key else ""
seed = f"{gateway_id}|{backend_type.value}|{ref_type.value}{native_key}"
return "ref_" + hashlib.sha256(seed.encode("utf-8")).hexdigest()[:24]
def _produced_ref_stable_key(self, produced_ref: BackendProducedRef, index: int) -> str | None:
for key in ("stable_key", "backend_ref_key", "idempotency_key"):
value = produced_ref.metadata.get(key)
if isinstance(value, str) and value:
return value
if not produced_ref.native_id and not produced_ref.native_uri:
return f"produced_ref_index:{index}"
return None
    def _aggregate_ref_status(self, refs: list[MemoryRef]) -> OperationStatus:
        """Collapse per-backend ref statuses into one ingest-level status.

        All success -> SUCCESS; success mixed with failed/skipped (or failed
        mixed with anything) -> PARTIAL_SUCCESS; all failed -> FAILED; all
        skipped or no refs -> SKIPPED; any remaining mix (e.g. pending) ->
        PENDING.
        """
        if not refs:
            return OperationStatus.SKIPPED
        statuses = {ref.status for ref in refs}
        if statuses == {BackendRefStatus.SUCCESS}:
            return OperationStatus.SUCCESS
        if BackendRefStatus.SUCCESS in statuses and (BackendRefStatus.FAILED in statuses or BackendRefStatus.SKIPPED in statuses):
            return OperationStatus.PARTIAL_SUCCESS
        if statuses == {BackendRefStatus.FAILED}:
            return OperationStatus.FAILED
        if BackendRefStatus.FAILED in statuses:
            return OperationStatus.PARTIAL_SUCCESS
        if statuses == {BackendRefStatus.SKIPPED}:
            return OperationStatus.SKIPPED
        return OperationStatus.PENDING
def _view_refs(self, refs: list[MemoryRef]) -> list[MemoryRefView]:
return [MemoryRefView.model_validate(ref.model_dump(mode="json")) for ref in refs]
def _backend_control_metadata(self, data: dict[str, Any]) -> dict[str, Any]:
allowed_keys = {
"status",
"reason",
"native_id",
"native_uri",
"id",
"uri",
"url",
"memory_id",
"session_id",
"retryable",
"error_code",
"error",
"error_message",
"latency_ms",
}
metadata = {key: value for key, value in data.items() if key in allowed_keys}
nested_metadata = data.get("metadata")
if isinstance(nested_metadata, dict):
for key in ("reason", "backend_request_id", "latency_ms", "schema_version"):
if key in nested_metadata:
metadata[key] = nested_metadata[key]
return metadata
def _control_metadata(
self,
request: IngestRequest,
content_hash: str,
extra: dict[str, Any] | None = None,
) -> dict[str, Any]:
metadata: dict[str, Any] = {
"schema_version": "memory-gateway.control-ref.v2",
"idempotency_basis": self._idempotency_basis(request),
"content_hash": content_hash,
}
source_channel = request.metadata.get("source_channel") or request.metadata.get("channel")
if source_channel:
metadata["source_channel"] = source_channel
if request.trace.trace_id:
metadata["trace_id"] = request.trace.trace_id
if request.trace.request_id:
metadata["trace_request_id"] = request.trace.request_id
if extra:
metadata.update(extra)
return metadata
def _backend_result_to_dict(self, result: Any) -> dict[str, Any]:
if isinstance(result, BackendWriteResult):
data = result.model_dump(mode="json")
if result.status == BackendResultStatus.FAILED and result.retryable:
data["retryable"] = True
return data
if hasattr(result, "model_dump"):
return result.model_dump(mode="json")
return result if isinstance(result, dict) else {"raw": str(result)}
def _commit_gateway_id(self, session_id: str, request: CommitRequest) -> str:
basis = request.idempotency_key or request.request_id or session_id
seed = f"commit:{request.workspace_id}:{session_id}:{basis}"
return "gwc_" + hashlib.sha256(seed.encode("utf-8")).hexdigest()[:24]
    def _create_commit_outbox_events(self, gateway_id: str, job: CommitJob, request: CommitRequest) -> None:
        """Enqueue one COMMIT_SESSION outbox event per backend for this job.

        Event ids are deterministic per (gateway, backend, operation), so
        re-submitting the same commit overwrites rather than duplicates.
        """
        # NOTE(review): both events receive this same metadata dict object —
        # confirm OutboxEvent copies it (or that nothing mutates it in place).
        metadata = {
            "schema_version": "memory-gateway.outbox.v2",
            "job_id": job.job_id,
            "namespace": request.namespace,
            "idempotency_key": request.idempotency_key,
            "request_id": request.request_id,
        }
        for backend_type in (BackendType.OPENVIKING, BackendType.EVERMEMOS):
            event = OutboxEvent(
                id=self._outbox_event_id(gateway_id, backend_type, BackendOperation.COMMIT_SESSION),
                event_type="commit_session",
                gateway_id=gateway_id,
                workspace_id=request.workspace_id,
                user_id=request.user_id,
                agent_id=request.agent_id,
                session_id=job.session_id,
                backend_type=backend_type,
                operation=BackendOperation.COMMIT_SESSION,
                payload_ref=f"commit_job:{job.job_id}",
                metadata=metadata,
            )
            self.repo.save_outbox_event(event)
def _outbox_event_id(self, gateway_id: str, backend_type: BackendType, operation: BackendOperation) -> str:
seed = f"{gateway_id}|{backend_type.value}|{operation.value}"
return "outbox_" + hashlib.sha256(seed.encode("utf-8")).hexdigest()[:24]
# Shared module-level service instance built with the default collaborators.
v2_service = MemoryGatewayV2Service()