498 lines
19 KiB
Python
498 lines
19 KiB
Python
"""OpenViking client wrapper used by Memory Gateway."""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import mimetypes
|
|
import tempfile
|
|
from json import JSONDecodeError
|
|
from pathlib import Path
|
|
from typing import Any, Optional
|
|
|
|
import httpx
|
|
|
|
from .backend_contracts import BackendCommitResult, BackendOperation, BackendResultStatus, BackendRetrieveResult, BackendWriteResult
|
|
from .backend_normalization import (
|
|
map_backend_error_to_retryable,
|
|
normalize_openviking_commit_response,
|
|
normalize_openviking_ingest_response,
|
|
normalize_openviking_retrieve_response,
|
|
)
|
|
from .config import get_config
|
|
from .schemas_v2 import BackendType
|
|
from .types import MemoryEntry, ResourceEntry, SearchResult
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class OpenVikingClient:
    """Thin async client for the OpenViking HTTP API.

    The client lazily creates a single ``httpx.AsyncClient`` and reuses it for
    every request until :meth:`close` is called. The v2 adapter methods
    (``ingest_conversation_turn``, ``commit_session_v2``,
    ``retrieve_context_v2``) return normalized backend result objects; the
    remaining methods wrap the legacy search / session / resource endpoints.
    """

    def __init__(
        self,
        base_url: Optional[str] = None,
        api_key: Optional[str] = None,
        timeout: int | None = None,
        account: str = "default",
        user: str = "default",
        enabled: bool | None = None,
        mode: str | None = None,
        verify_ssl: bool | None = None,
        ingest_path: str | None = None,
        transport: httpx.AsyncBaseTransport | None = None,
    ):
        """Build a client, filling unspecified settings from ``get_config()``.

        Args:
            base_url: API root; ``config.openviking.url`` when ``None``.
            api_key: Value for the ``X-API-Key`` header; when ``None`` the
                configured key (or a placeholder, see below) is used.
            timeout: Request timeout; ``config.openviking.timeout`` when ``None``.
            account: Value for the ``X-OpenViking-Account`` header.
            user: Value for the ``X-OpenViking-User`` header.
            enabled: Legacy flag; ``config.openviking.enabled`` when ``None``.
            mode: Adapter mode; ``config.openviking.mode`` when falsy.
            verify_ssl: TLS verification switch; configured value when ``None``.
            ingest_path: Path template containing ``{session_id}``;
                ``config.openviking.ingest_path`` when falsy.
            transport: Optional httpx transport passed to ``httpx.AsyncClient``.
        """
        self.config = get_config()
        ov_config = self.config.openviking

        self.base_url = ov_config.url if base_url is None else base_url
        if api_key is None:
            # NOTE(review): a hard-coded placeholder key is sent when neither
            # the caller nor the config supplies one. Kept for backward
            # compatibility - confirm this is intended outside development.
            api_key = ov_config.api_key or "your-secret-root-key"
        self.api_key = api_key
        self.timeout = ov_config.timeout if timeout is None else timeout
        self.account = account
        self.user = user
        self.enabled = ov_config.enabled if enabled is None else enabled
        self.mode = mode or ov_config.mode
        self.verify_ssl = ov_config.verify_ssl if verify_ssl is None else verify_ssl
        self.ingest_path = ingest_path or ov_config.ingest_path
        self.transport = transport
        # Created on first use by _get_client().
        self._client: Optional[httpx.AsyncClient] = None

    def _get_headers(self) -> dict[str, str]:
        """Return the default request headers (API key omitted when empty)."""
        headers: dict[str, str] = {}
        if self.api_key:
            headers["X-API-Key"] = self.api_key
        headers["X-OpenViking-Account"] = self.account
        headers["X-OpenViking-User"] = self.user
        return headers

    async def _get_client(self) -> httpx.AsyncClient:
        """Return the shared ``httpx.AsyncClient``, creating it on first call."""
        if self._client is None:
            self._client = httpx.AsyncClient(
                base_url=self.base_url,
                headers=self._get_headers(),
                timeout=self.timeout,
                verify=self.verify_ssl,
                transport=self.transport,
            )
        return self._client

    async def close(self):
        """Close the underlying HTTP client, if one was created."""
        if self._client is not None:
            await self._client.aclose()
            self._client = None

    @staticmethod
    def _result_section(data: Any) -> Optional[dict[str, Any]]:
        """Return the ``result`` mapping of a ``status == "ok"`` envelope.

        Returns ``None`` when ``data`` is not a dict or its status is not
        ``"ok"``. A missing, ``null`` or non-dict ``result`` yields ``{}``.
        """
        if not isinstance(data, dict) or data.get("status") != "ok":
            return None
        result = data.get("result")
        return result if isinstance(result, dict) else {}

    @staticmethod
    def _dict_items(value: Any) -> list[dict[str, Any]]:
        """Return the dict entries of ``value`` when it is a list, else ``[]``."""
        if not isinstance(value, list):
            return []
        return [item for item in value if isinstance(item, dict)]

    async def health_check(self) -> dict[str, Any]:
        """GET ``/health`` and return the decoded JSON body.

        Returns ``{"status": "error", "message": ...}`` instead of raising
        when the request fails or the body is not valid JSON.
        """
        client = await self._get_client()
        try:
            response = await client.get("/health")
            response.raise_for_status()
            return response.json()
        except (httpx.HTTPError, ValueError) as e:
            # ValueError covers a non-JSON body (JSONDecodeError subclasses it).
            logger.error("OpenViking 健康检查失败: %s", e)
            return {"status": "error", "message": str(e)}

    async def ingest_conversation_turn(self, payload: dict[str, Any]) -> BackendWriteResult:
        """v2 adapter placeholder for OpenViking session archive ingestion.

        Mapping spec: `backend_adapter_mapping.AdapterMappingSpec` maps
        OpenViking ingest_turn to this method and requires BackendWriteResult.
        Payloads must contain only control-plane fields; conversation content
        is not persisted by the Gateway control-plane store.

        When ``mode == "real"`` the payload is POSTed to the configured ingest
        path; otherwise a ``skipped`` fixture response is normalized and
        returned without any network traffic.

        TODO(v2): bind this to OpenViking's stable session/message archive API
        once that contract is finalized. Until then the gateway records a
        skipped backend ref instead of inventing an unstable HTTP contract.
        """
        runtime_payload = self._build_ingest_payload(payload)
        if self._use_real_api:
            return await self._ingest_conversation_turn_real(runtime_payload)
        raw = {
            "status": "skipped",
            "session_id": runtime_payload.get("session_id"),
            "uri": f"viking://sessions/{runtime_payload.get('session_id')}",
            "metadata": {
                "reason": "openviking_v2_ingest_adapter_not_configured",
                "schema_version": "openviking.fixture.ingest.v2",
            },
        }
        return self._normalize_ingest_response(raw)

    @property
    def _use_real_api(self) -> bool:
        """Whether ingest should hit the network (only when ``mode == "real"``)."""
        # Real ingest is strictly gated by mode=real. The legacy `enabled`
        # field is retained for config compatibility, but must not trigger
        # network traffic by itself.
        return self.mode == "real"

    async def _ingest_conversation_turn_real(self, runtime_payload: dict[str, Any]) -> BackendWriteResult:
        """POST the ingest payload and convert every outcome to a result object.

        Never raises: configuration, HTTP, decoding, transport and unexpected
        errors are all reported as a failed ``BackendWriteResult``.
        """
        if not self.base_url:
            return self._failed_ingest_result(
                error_code="config_error",
                error_message="OpenViking real ingest is enabled but base_url is missing",
                retryable=False,
            )
        try:
            client = await self._get_client()
            response = await client.post(
                self._format_ingest_path(runtime_payload),
                json=runtime_payload,
            )
            if response.status_code >= 400:
                return self._failed_ingest_result(
                    error_code=f"http_{response.status_code}",
                    error_message=f"OpenViking ingest failed with HTTP {response.status_code}",
                    retryable=self._map_error(response),
                )
            try:
                raw = response.json()
            except (JSONDecodeError, ValueError):
                return self._failed_ingest_result(
                    error_code="invalid_json",
                    error_message="OpenViking ingest returned invalid JSON",
                    retryable=True,
                )
            if not isinstance(raw, dict):
                return self._failed_ingest_result(
                    error_code="unexpected_response",
                    error_message="OpenViking ingest returned an unexpected response shape",
                    retryable=True,
                )
            return self._normalize_ingest_response(raw)
        # TimeoutException must be listed before its base class RequestError.
        except httpx.TimeoutException as exc:
            return self._failed_ingest_result("timeout", self._safe_error_message(exc), retryable=self._map_error(exc))
        except httpx.RequestError as exc:
            return self._failed_ingest_result("network_error", self._safe_error_message(exc), retryable=self._map_error(exc))
        except Exception as exc:  # noqa: BLE001
            # Log only the exception class so the failure is visible without
            # writing exception text (which may echo payload data) to the log.
            logger.warning("OpenViking ingest failed unexpectedly: %s", self._safe_error_message(exc))
            return self._failed_ingest_result("unexpected_error", self._safe_error_message(exc), retryable=self._map_error(exc))

    async def commit_session_v2(self, payload: dict[str, Any]) -> BackendCommitResult:
        """v2 adapter placeholder for OpenViking session commit.

        Mapping spec: commit_session returns BackendCommitResult and should
        produce a native session/archive ref once the real API is stable.
        Currently builds a fixed ``success`` fixture for the payload's
        ``session_id`` and normalizes it; no request is sent.
        """
        session_id = self._build_commit_payload(payload).get("session_id")
        raw = {
            "status": "success",
            "session_id": session_id,
            "metadata": {
                "reason": "openviking_v2_commit_fixture",
                "schema_version": "openviking.fixture.commit.v2",
            },
            "result": {
                "refs": [
                    {
                        "type": "session_summary",
                        "id": f"ov_session_summary:{session_id}",
                        "uri": f"viking://sessions/{session_id}/summary",
                        "metadata": {"schema_version": "openviking.fixture.ref.v2"},
                    }
                ]
            },
        }
        return self._normalize_commit_response(raw)

    async def retrieve_context_v2(self, payload: dict[str, Any]) -> BackendRetrieveResult:
        """v2 adapter placeholder for OpenViking runtime context retrieval.

        Mapping spec: retrieve_context returns BackendRetrieveResult with
        runtime context items, not raw backend payload dumps. Currently
        returns a single fixture item; no request is sent.
        """
        session_id = payload.get("session_id")
        raw = {
            "status": "ok",
            "session_id": session_id,
            "metadata": {
                "reason": "openviking_v2_retrieve_fixture",
                "schema_version": "openviking.fixture.retrieve.v2",
            },
            "result": {
                "items": [
                    {
                        "text": "OpenViking fixture runtime context.",
                        "ref_id": f"ov_context:{session_id or 'unknown'}",
                        "score": 0.75,
                        "memory_type": "context_resource",
                        "metadata": {"schema_version": "openviking.fixture.retrieve.item.v2"},
                    }
                ]
            },
        }
        return self._normalize_retrieve_response(raw)

    def _build_ingest_payload(self, payload: dict[str, Any]) -> dict[str, Any]:
        """Return a shallow copy of ``payload`` for the ingest request."""
        # Runtime-only adapter payload. It may include conversation content for
        # the current request lifecycle; callers must not persist it to SQLite.
        return dict(payload)

    def _format_ingest_path(self, payload: dict[str, Any]) -> str:
        """Fill ``{session_id}`` in the ingest path template.

        A missing or empty session id becomes ``"unknown"``. The id is
        percent-encoded as a single path segment so characters such as ``/``
        or ``..`` sequences cannot alter the request route.
        """
        from urllib.parse import quote

        session_id = str(payload.get("session_id") or "unknown")
        return self.ingest_path.format(session_id=quote(session_id, safe=""))

    def _build_commit_payload(self, payload: dict[str, Any]) -> dict[str, Any]:
        """Return a shallow copy of ``payload`` for the commit operation."""
        return dict(payload)

    def _normalize_ingest_response(self, raw: dict[str, Any]) -> BackendWriteResult:
        """Delegate to ``normalize_openviking_ingest_response``."""
        return normalize_openviking_ingest_response(raw)

    def _normalize_commit_response(self, raw: dict[str, Any]) -> BackendCommitResult:
        """Delegate to ``normalize_openviking_commit_response``."""
        return normalize_openviking_commit_response(raw)

    def _normalize_retrieve_response(self, raw: dict[str, Any]) -> BackendRetrieveResult:
        """Delegate to ``normalize_openviking_retrieve_response``."""
        return normalize_openviking_retrieve_response(raw)

    def _map_error(self, exc_or_response: Any) -> bool:
        """Return whether a failed response or exception is retryable.

        Reads optional ``status_code`` / ``error_code`` attributes from the
        argument and passes them, with its string form, to
        ``map_backend_error_to_retryable``.
        """
        status_code = getattr(exc_or_response, "status_code", None)
        error_code = getattr(exc_or_response, "error_code", None)
        error_message = str(exc_or_response) if exc_or_response is not None else None
        return map_backend_error_to_retryable(
            BackendType.OPENVIKING,
            status_code=status_code,
            error_code=error_code,
            error_message=error_message,
        )

    def _failed_ingest_result(self, error_code: str, error_message: str, retryable: bool) -> BackendWriteResult:
        """Build a FAILED ``BackendWriteResult`` for the ingest-turn operation."""
        return BackendWriteResult(
            backend_type=BackendType.OPENVIKING,
            operation=BackendOperation.INGEST_TURN,
            status=BackendResultStatus.FAILED,
            retryable=retryable,
            error_code=error_code,
            error_message=error_message,
            metadata={"error_code": error_code},
        )

    def _safe_error_message(self, exc: Exception) -> str:
        """Return only the exception class name, never the exception text."""
        return exc.__class__.__name__

    async def search(
        self,
        query: str,
        namespace: Optional[str] = None,
        limit: Optional[int] = None,
        uri: Optional[str] = None,
    ) -> SearchResult:
        """Semantic search against OpenViking resources/memories.

        Args:
            query: Search text.
            namespace: Searched as ``viking://{namespace}`` when ``uri`` is unset.
            limit: Maximum hits; omitted from the request when falsy.
            uri: Explicit search root; takes precedence over ``namespace``.

        Returns:
            A ``SearchResult`` combining memory and resource hits. An empty
            result is returned when the request fails, the body is not valid
            JSON, or the envelope status is not ``"ok"``.
        """
        client = await self._get_client()

        payload: dict[str, Any] = {"query": query}
        if limit:
            payload["limit"] = limit

        if uri:
            payload["uri"] = uri
        elif namespace:
            payload["uri"] = f"viking://{namespace}"

        try:
            response = await client.post("/api/v1/search/search", json=payload)
            response.raise_for_status()
            data = response.json()
        except (httpx.HTTPError, ValueError) as e:
            # ValueError covers a non-JSON body (JSONDecodeError subclasses it).
            logger.error("搜索失败: %s", e)
            return SearchResult(results=[], total=0)

        result = self._result_section(data)
        if result is None:
            error = data.get("error") if isinstance(data, dict) else None
            logger.warning("搜索返回错误: %s", error)
            return SearchResult(results=[], total=0)

        # Null or missing lists are treated as empty rather than crashing.
        hits = self._dict_items(result.get("memories")) + self._dict_items(result.get("resources"))
        all_results = [
            {
                "uri": hit.get("uri"),
                "abstract": hit.get("abstract"),
                "score": hit.get("score"),
                "context_type": hit.get("context_type"),
            }
            for hit in hits
        ]
        return SearchResult(results=all_results, total=result.get("total", len(all_results)))

    async def add_memory(
        self,
        content: str,
        namespace: Optional[str] = None,
        memory_type: str = "general",
    ) -> dict[str, Any]:
        """Add memory via session commit flow.

        Creates a session, then commits one user message whose content is
        prefixed with ``[{namespace}/{memory_type}]``.

        Returns:
            The commit response JSON, or the session-creation response when
            its status is not ``"ok"``.

        Raises:
            httpx.HTTPError: When either request fails (logged, re-raised).
        """
        client = await self._get_client()
        ns = namespace or self.config.memory.default_namespace or "user/default/memories"

        try:
            response = await client.post("/api/v1/sessions", json={"mode": "interactive"})
            response.raise_for_status()
            session_data = response.json()

            if session_data.get("status") != "ok":
                return session_data

            session_id = session_data["result"]["session_id"]
            commit_response = await client.post(
                f"/api/v1/sessions/{session_id}/commit",
                json={
                    "messages": [
                        {
                            "role": "user",
                            "content": f"[{ns}/{memory_type}] {content}",
                        }
                    ]
                },
            )
            commit_response.raise_for_status()
            return commit_response.json()
        except httpx.HTTPError as e:
            logger.error("添加记忆失败: %s", e)
            raise

    async def _upload_temp_file(self, file_path: str | Path) -> str:
        """Upload a local file to the temp-upload endpoint.

        Returns:
            The ``temp_path`` from the response, or ``temp_file_id`` when no
            ``temp_path`` is present.

        Raises:
            httpx.HTTPStatusError: On a 4xx/5xx response.
            KeyError: When the response carries neither reference.
        """
        client = await self._get_client()
        file_path = Path(file_path)
        mime_type = mimetypes.guess_type(file_path.name)[0] or "application/octet-stream"

        with file_path.open("rb") as f:
            response = await client.post(
                "/api/v1/resources/temp_upload",
                files={"file": (file_path.name, f, mime_type)},
            )
        response.raise_for_status()
        data = response.json()
        # A non-dict envelope or null result falls through to the KeyError
        # below instead of surfacing as an unrelated TypeError.
        result = data.get("result") if isinstance(data, dict) else None
        if not isinstance(result, dict):
            result = {}
        if "temp_path" in result:
            return result["temp_path"]
        if "temp_file_id" in result:
            return result["temp_file_id"]
        raise KeyError(f"Unexpected temp upload response: {data}")

    async def add_resource(
        self,
        uri: str,
        content: str,
        resource_type: str = "text",
        wait: bool = False,
    ) -> dict[str, Any]:
        """Add a text/json resource by uploading a temporary file first.

        OpenViking HTTP API does not accept raw `uri + content` directly. The
        client must upload a temp file and then create the resource with `to`.

        Args:
            uri: Destination sent as ``to``; its last path component is used
                as ``source_name``.
            content: Text written to the temporary file (UTF-8).
            resource_type: Selects the temp-file suffix (``json``, ``text``,
                ``markdown``/``md``); unknown values use ``.txt``.
            wait: Forwarded as the ``wait`` field of the request.

        Returns:
            The decoded JSON body of the resource-creation response.

        Raises:
            httpx.HTTPError: When the upload or creation request fails.
        """
        client = await self._get_client()
        suffix_map = {
            "json": ".json",
            "text": ".txt",
            "markdown": ".md",
            "md": ".md",
        }
        suffix = suffix_map.get(resource_type, ".txt")

        tmp_path: Optional[Path] = None
        try:
            with tempfile.NamedTemporaryFile("w", encoding="utf-8", suffix=suffix, delete=False) as tmp:
                # Record the path before writing so a failed write (for
                # example an encoding error) still gets cleaned up below.
                tmp_path = Path(tmp.name)
                tmp.write(content)

            temp_ref = await self._upload_temp_file(tmp_path)
            payload = {
                "temp_path": temp_ref,
                "to": uri,
                "wait": wait,
                "source_name": Path(uri).name or tmp_path.name,
                "strict": False,
            }
            response = await client.post("/api/v1/resources", json=payload)
            if response.status_code >= 400:
                logger.error("添加资源失败响应: %s", response.text)
            response.raise_for_status()
            return response.json()
        except httpx.HTTPError as e:
            logger.error("添加资源失败: %s", e)
            raise
        finally:
            if tmp_path is not None:
                tmp_path.unlink(missing_ok=True)

    async def list_memories(
        self,
        namespace: Optional[str] = None,
        memory_type: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> list[MemoryEntry]:
        """List memories by running an empty-query search under a namespace.

        The search root is ``viking://{namespace}`` (default
        ``user/default/memories``), extended with ``/{memory_type}`` when
        given; ``limit`` defaults to 10. Returns ``[]`` when the request
        fails, the body is not valid JSON, or the status is not ``"ok"``.
        """
        client = await self._get_client()

        ns = namespace or "user/default/memories"
        if memory_type:
            ns = f"{ns}/{memory_type}"

        try:
            response = await client.post(
                "/api/v1/search/search",
                json={"query": "", "uri": f"viking://{ns}", "limit": limit or 10},
            )
            response.raise_for_status()
            data = response.json()
        except (httpx.HTTPError, ValueError) as e:
            logger.error("列出记忆失败: %s", e)
            return []

        result = self._result_section(data)
        if result is None:
            return []
        return [
            MemoryEntry(
                id=m.get("uri", ""),
                content=m.get("abstract", ""),
                namespace=ns,
                memory_type=memory_type or "general",
            )
            for m in self._dict_items(result.get("memories"))
        ]

    async def list_resources(
        self,
        namespace: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> list[ResourceEntry]:
        """List resources by running an empty-query search under a namespace.

        The search root is ``viking://{namespace}``, or ``viking://resources``
        when no namespace is given; ``limit`` defaults to 10. Returns ``[]``
        when the request fails, the body is not valid JSON, or the status is
        not ``"ok"``.
        """
        client = await self._get_client()

        uri = f"viking://{namespace}" if namespace else "viking://resources"
        try:
            response = await client.post(
                "/api/v1/search/search",
                json={"query": "", "uri": uri, "limit": limit or 10},
            )
            response.raise_for_status()
            data = response.json()
        except (httpx.HTTPError, ValueError) as e:
            logger.error("列出资源失败: %s", e)
            return []

        result = self._result_section(data)
        if result is None:
            return []
        return [
            ResourceEntry(
                uri=r.get("uri", ""),
                content=r.get("abstract", ""),
                resource_type="text",
            )
            for r in self._dict_items(result.get("resources"))
        ]
|
|
|
|
|
|
# Module-level shared client instance; created on first use by
# get_openviking_client() and reset to None by close_openviking_client().
_client: Optional[OpenVikingClient] = None
|
|
|
|
|
|
async def get_openviking_client() -> OpenVikingClient:
    """Return the module-wide OpenVikingClient, creating it on first call."""
    global _client
    if _client is not None:
        return _client
    # First call: build the client with default (config-driven) settings.
    _client = OpenVikingClient()
    return _client
|
|
|
|
|
|
async def close_openviking_client():
    """Close the module-wide client, if any, and clear the cached instance."""
    global _client
    if not _client:
        # Nothing was created (or it was already closed); nothing to do.
        return
    await _client.close()
    _client = None
|