"""OpenViking client wrapper used by Memory Gateway.""" from __future__ import annotations import json import logging import mimetypes import tempfile from json import JSONDecodeError from pathlib import Path from typing import Any, Optional import httpx from .backend_contracts import BackendCommitResult, BackendOperation, BackendResultStatus, BackendRetrieveResult, BackendWriteResult from .backend_normalization import ( map_backend_error_to_retryable, normalize_openviking_commit_response, normalize_openviking_ingest_response, normalize_openviking_retrieve_response, ) from .config import get_config from .schemas_v2 import BackendType from .types import MemoryEntry, ResourceEntry, SearchResult logger = logging.getLogger(__name__) class OpenVikingClient: """Thin async client for the OpenViking HTTP API.""" def __init__( self, base_url: Optional[str] = None, api_key: Optional[str] = None, timeout: int | None = None, account: str = "default", user: str = "default", enabled: bool | None = None, mode: str | None = None, verify_ssl: bool | None = None, ingest_path: str | None = None, transport: httpx.AsyncBaseTransport | None = None, ): self.config = get_config() self.base_url = base_url if base_url is not None else self.config.openviking.url self.api_key = api_key if api_key is not None else (self.config.openviking.api_key or "your-secret-root-key") self.timeout = timeout if timeout is not None else self.config.openviking.timeout self.account = account self.user = user self.enabled = self.config.openviking.enabled if enabled is None else enabled self.mode = mode or self.config.openviking.mode self.verify_ssl = self.config.openviking.verify_ssl if verify_ssl is None else verify_ssl self.ingest_path = ingest_path or self.config.openviking.ingest_path self.transport = transport self._client: Optional[httpx.AsyncClient] = None def _get_headers(self) -> dict[str, str]: headers = {} if self.api_key: headers["X-API-Key"] = self.api_key headers["X-OpenViking-Account"] = self.account headers["X-OpenViking-User"] = self.user return headers async def _get_client(self) -> httpx.AsyncClient: if self._client is None: self._client = httpx.AsyncClient( base_url=self.base_url, headers=self._get_headers(), timeout=self.timeout, verify=self.verify_ssl, transport=self.transport, ) return self._client async def close(self): if self._client: await self._client.aclose() self._client = None async def health_check(self) -> dict[str, Any]: client = await self._get_client() try: response = await client.get("/health") response.raise_for_status() return response.json() except httpx.HTTPError as e: logger.error(f"OpenViking 健康检查失败: {e}") return {"status": "error", "message": str(e)} async def ingest_conversation_turn(self, payload: dict[str, Any]) -> BackendWriteResult: """v2 adapter placeholder for OpenViking session archive ingestion. Mapping spec: `backend_adapter_mapping.AdapterMappingSpec` maps OpenViking ingest_turn to this method and requires BackendWriteResult. Payloads must contain only control-plane fields; conversation content is not persisted by the Gateway control-plane store. TODO(v2): bind this to OpenViking's stable session/message archive API once that contract is finalized. Until then the gateway records a skipped backend ref instead of inventing an unstable HTTP contract. """ runtime_payload = self._build_ingest_payload(payload) if self._use_real_api: return await self._ingest_conversation_turn_real(runtime_payload) raw = { "status": "skipped", "session_id": runtime_payload.get("session_id"), "uri": f"viking://sessions/{runtime_payload.get('session_id')}", "metadata": { "reason": "openviking_v2_ingest_adapter_not_configured", "schema_version": "openviking.fixture.ingest.v2", }, } return self._normalize_ingest_response(raw) @property def _use_real_api(self) -> bool: # Real ingest is strictly gated by mode=real. The legacy `enabled` # field is retained for config compatibility, but must not trigger # network traffic by itself. return self.mode == "real" async def _ingest_conversation_turn_real(self, runtime_payload: dict[str, Any]) -> BackendWriteResult: if not self.base_url: return self._failed_ingest_result( error_code="config_error", error_message="OpenViking real ingest is enabled but base_url is missing", retryable=False, ) try: client = await self._get_client() response = await client.post( self._format_ingest_path(runtime_payload), json=runtime_payload, ) if response.status_code >= 400: return self._failed_ingest_result( error_code=f"http_{response.status_code}", error_message=f"OpenViking ingest failed with HTTP {response.status_code}", retryable=self._map_error(response), ) try: raw = response.json() except (JSONDecodeError, ValueError): return self._failed_ingest_result( error_code="invalid_json", error_message="OpenViking ingest returned invalid JSON", retryable=True, ) if not isinstance(raw, dict): return self._failed_ingest_result( error_code="unexpected_response", error_message="OpenViking ingest returned an unexpected response shape", retryable=True, ) return self._normalize_ingest_response(raw) except httpx.TimeoutException as exc: return self._failed_ingest_result("timeout", self._safe_error_message(exc), retryable=self._map_error(exc)) except httpx.RequestError as exc: return self._failed_ingest_result("network_error", self._safe_error_message(exc), retryable=self._map_error(exc)) except Exception as exc: # noqa: BLE001 return self._failed_ingest_result("unexpected_error", self._safe_error_message(exc), retryable=self._map_error(exc)) async def commit_session_v2(self, payload: dict[str, Any]) -> BackendCommitResult: """v2 adapter placeholder for OpenViking session commit. Mapping spec: commit_session returns BackendCommitResult and should produce a native session/archive ref once the real API is stable. """ runtime_payload = self._build_commit_payload(payload) raw = { "status": "success", "session_id": runtime_payload.get("session_id"), "metadata": { "reason": "openviking_v2_commit_fixture", "schema_version": "openviking.fixture.commit.v2", }, "result": { "refs": [ { "type": "session_summary", "id": f"ov_session_summary:{runtime_payload.get('session_id')}", "uri": f"viking://sessions/{runtime_payload.get('session_id')}/summary", "metadata": {"schema_version": "openviking.fixture.ref.v2"}, } ] }, } return self._normalize_commit_response(raw) async def retrieve_context_v2(self, payload: dict[str, Any]) -> BackendRetrieveResult: """v2 adapter placeholder for OpenViking runtime context retrieval. Mapping spec: retrieve_context returns BackendRetrieveResult with runtime context items, not raw backend payload dumps. """ raw = { "status": "ok", "session_id": payload.get("session_id"), "metadata": { "reason": "openviking_v2_retrieve_fixture", "schema_version": "openviking.fixture.retrieve.v2", }, "result": { "items": [ { "text": "OpenViking fixture runtime context.", "ref_id": f"ov_context:{payload.get('session_id') or 'unknown'}", "score": 0.75, "memory_type": "context_resource", "metadata": {"schema_version": "openviking.fixture.retrieve.item.v2"}, } ] }, } return self._normalize_retrieve_response(raw) def _build_ingest_payload(self, payload: dict[str, Any]) -> dict[str, Any]: # Runtime-only adapter payload. It may include conversation content for # the current request lifecycle; callers must not persist it to SQLite. return dict(payload) def _format_ingest_path(self, payload: dict[str, Any]) -> str: session_id = str(payload.get("session_id") or "unknown") return self.ingest_path.format(session_id=session_id) def _build_commit_payload(self, payload: dict[str, Any]) -> dict[str, Any]: return dict(payload) def _normalize_ingest_response(self, raw: dict[str, Any]) -> BackendWriteResult: return normalize_openviking_ingest_response(raw) def _normalize_commit_response(self, raw: dict[str, Any]) -> BackendCommitResult: return normalize_openviking_commit_response(raw) def _normalize_retrieve_response(self, raw: dict[str, Any]) -> BackendRetrieveResult: return normalize_openviking_retrieve_response(raw) def _map_error(self, exc_or_response: Any) -> bool: status_code = getattr(exc_or_response, "status_code", None) error_code = getattr(exc_or_response, "error_code", None) error_message = str(exc_or_response) if exc_or_response is not None else None return map_backend_error_to_retryable( BackendType.OPENVIKING, status_code=status_code, error_code=error_code, error_message=error_message, ) def _failed_ingest_result(self, error_code: str, error_message: str, retryable: bool) -> BackendWriteResult: return BackendWriteResult( backend_type=BackendType.OPENVIKING, operation=BackendOperation.INGEST_TURN, status=BackendResultStatus.FAILED, retryable=retryable, error_code=error_code, error_message=error_message, metadata={"error_code": error_code}, ) def _safe_error_message(self, exc: Exception) -> str: return exc.__class__.__name__ async def search( self, query: str, namespace: Optional[str] = None, limit: Optional[int] = None, uri: Optional[str] = None, ) -> SearchResult: """Semantic search against OpenViking resources/memories.""" client = await self._get_client() payload: dict[str, Any] = {"query": query} if limit: payload["limit"] = limit if uri: payload["uri"] = uri elif namespace: payload["uri"] = f"viking://{namespace}" try: response = await client.post("/api/v1/search/search", json=payload) response.raise_for_status() data = response.json() if data.get("status") != "ok": logger.warning(f"搜索返回错误: {data.get('error')}") return SearchResult(results=[], total=0) result = data.get("result", {}) memories = result.get("memories", []) resources = result.get("resources", []) all_results = [] for m in memories + resources: all_results.append( { "uri": m.get("uri"), "abstract": m.get("abstract"), "score": m.get("score"), "context_type": m.get("context_type"), } ) return SearchResult(results=all_results, total=result.get("total", len(all_results))) except httpx.HTTPError as e: logger.error(f"搜索失败: {e}") return SearchResult(results=[], total=0) async def add_memory( self, content: str, namespace: Optional[str] = None, memory_type: str = "general", ) -> dict[str, Any]: """Add memory via session commit flow.""" client = await self._get_client() ns = namespace or self.config.memory.default_namespace or "user/default/memories" try: response = await client.post("/api/v1/sessions", json={"mode": "interactive"}) response.raise_for_status() session_data = response.json() if session_data.get("status") != "ok": return session_data session_id = session_data["result"]["session_id"] commit_response = await client.post( f"/api/v1/sessions/{session_id}/commit", json={ "messages": [ { "role": "user", "content": f"[{ns}/{memory_type}] {content}", } ] }, ) commit_response.raise_for_status() return commit_response.json() except httpx.HTTPError as e: logger.error(f"添加记忆失败: {e}") raise async def _upload_temp_file(self, file_path: str | Path) -> str: client = await self._get_client() file_path = Path(file_path) mime_type = mimetypes.guess_type(file_path.name)[0] or "application/octet-stream" with file_path.open("rb") as f: response = await client.post( "/api/v1/resources/temp_upload", files={"file": (file_path.name, f, mime_type)}, ) response.raise_for_status() data = response.json() result = data.get("result", {}) if "temp_path" in result: return result["temp_path"] if "temp_file_id" in result: return result["temp_file_id"] raise KeyError(f"Unexpected temp upload response: {data}") async def add_resource( self, uri: str, content: str, resource_type: str = "text", wait: bool = False, ) -> dict[str, Any]: """Add a text/json resource by uploading a temporary file first. OpenViking HTTP API does not accept raw `uri + content` directly. The client must upload a temp file and then create the resource with `to`. """ client = await self._get_client() suffix_map = { "json": ".json", "text": ".txt", "markdown": ".md", "md": ".md", } suffix = suffix_map.get(resource_type, ".txt") with tempfile.NamedTemporaryFile("w", encoding="utf-8", suffix=suffix, delete=False) as tmp: tmp.write(content) tmp_path = Path(tmp.name) try: temp_ref = await self._upload_temp_file(tmp_path) payload = { "temp_path": temp_ref, "to": uri, "wait": wait, "source_name": Path(uri).name or tmp_path.name, "strict": False, } response = await client.post("/api/v1/resources", json=payload) if response.status_code >= 400: logger.error("添加资源失败响应: %s", response.text) response.raise_for_status() return response.json() except httpx.HTTPError as e: logger.error(f"添加资源失败: {e}") raise finally: tmp_path.unlink(missing_ok=True) async def list_memories( self, namespace: Optional[str] = None, memory_type: Optional[str] = None, limit: Optional[int] = None, ) -> list[MemoryEntry]: client = await self._get_client() ns = namespace or "user/default/memories" if memory_type: ns = f"{ns}/{memory_type}" try: response = await client.post( "/api/v1/search/search", json={"query": "", "uri": f"viking://{ns}", "limit": limit or 10}, ) response.raise_for_status() data = response.json() if data.get("status") == "ok": result = data.get("result", {}) memories = result.get("memories", []) return [ MemoryEntry( id=m.get("uri", ""), content=m.get("abstract", ""), namespace=ns, memory_type=memory_type or "general", ) for m in memories ] return [] except httpx.HTTPError as e: logger.error(f"列出记忆失败: {e}") return [] async def list_resources( self, namespace: Optional[str] = None, limit: Optional[int] = None, ) -> list[ResourceEntry]: client = await self._get_client() uri = f"viking://{namespace}" if namespace else "viking://resources" try: response = await client.post( "/api/v1/search/search", json={"query": "", "uri": uri, "limit": limit or 10}, ) response.raise_for_status() data = response.json() if data.get("status") == "ok": result = data.get("result", {}) resources = result.get("resources", []) return [ ResourceEntry( uri=r.get("uri", ""), content=r.get("abstract", ""), resource_type="text", ) for r in resources ] return [] except httpx.HTTPError as e: logger.error(f"列出资源失败: {e}") return [] _client: Optional[OpenVikingClient] = None async def get_openviking_client() -> OpenVikingClient: global _client if _client is None: _client = OpenVikingClient() return _client async def close_openviking_client(): global _client if _client: await _client.close() _client = None