Add resource upload APIs
This commit is contained in:
@ -7,6 +7,7 @@ from .auth import verify_api_key
|
||||
from .schemas import (
|
||||
MessageIngestRequest,
|
||||
ProfileRequest,
|
||||
ResourceUploadRequest,
|
||||
SearchRequest,
|
||||
SessionContextRequest,
|
||||
SessionUserRequest,
|
||||
@ -54,6 +55,31 @@ async def ingest_messages(
|
||||
raise user_auth_error(exc) from exc
|
||||
|
||||
|
||||
@router.post("/resources")
|
||||
async def upload_resource(
|
||||
request: ResourceUploadRequest,
|
||||
service: MemorySystemService = Depends(get_service),
|
||||
):
|
||||
try:
|
||||
return await service.upload_resource(request)
|
||||
except PermissionError as exc:
|
||||
raise user_auth_error(exc) from exc
|
||||
|
||||
|
||||
@router.delete("/resources")
|
||||
async def delete_resource(
|
||||
user_id: str = Query(min_length=1),
|
||||
user_key: str = Query(min_length=1),
|
||||
uri: str = Query(min_length=1),
|
||||
recursive: bool = Query(default=True),
|
||||
service: MemorySystemService = Depends(get_service),
|
||||
):
|
||||
try:
|
||||
return await service.delete_resource(user_id, user_key, uri, recursive=recursive)
|
||||
except PermissionError as exc:
|
||||
raise user_auth_error(exc) from exc
|
||||
|
||||
|
||||
@router.post("/sessions/{session_id}/commit")
|
||||
async def commit_session(
|
||||
session_id: str,
|
||||
|
||||
@ -3,6 +3,7 @@ from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
@ -136,6 +137,59 @@ class OpenVikingMemorySystemClient:
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
async def upload_temp_file(self, credential: OpenVikingCredential | str, path: str | Path) -> dict[str, Any]:
|
||||
file_path = Path(path)
|
||||
async with self._credential_client(credential, json_content_type=False) as client:
|
||||
with file_path.open("rb") as file_obj:
|
||||
response = await client.post(
|
||||
"/api/v1/resources/temp_upload",
|
||||
files={"file": (file_path.name, file_obj)},
|
||||
)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
async def add_resource(
|
||||
self,
|
||||
credential: OpenVikingCredential | str,
|
||||
*,
|
||||
to: str,
|
||||
reason: str | None = None,
|
||||
wait: bool = True,
|
||||
directly_upload_media: bool = True,
|
||||
path: str | None = None,
|
||||
temp_file_id: str | None = None,
|
||||
) -> dict[str, Any]:
|
||||
payload: dict[str, Any] = {
|
||||
"to": to,
|
||||
"wait": wait,
|
||||
"directly_upload_media": directly_upload_media,
|
||||
}
|
||||
if reason is not None:
|
||||
payload["reason"] = reason
|
||||
if temp_file_id is not None:
|
||||
payload["temp_file_id"] = temp_file_id
|
||||
else:
|
||||
payload["path"] = path
|
||||
|
||||
async with self._credential_client(credential) as client:
|
||||
response = await client.post("/api/v1/resources", json=payload)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
async def delete_resource(
|
||||
self,
|
||||
credential: OpenVikingCredential | str,
|
||||
uri: str,
|
||||
recursive: bool = True,
|
||||
) -> dict[str, Any]:
|
||||
async with self._credential_client(credential) as client:
|
||||
response = await client.delete(
|
||||
"/api/v1/fs",
|
||||
params={"uri": uri, "recursive": str(recursive).lower()},
|
||||
)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
async def find(self, credential: OpenVikingCredential | str, query: str, limit: int) -> dict[str, Any]:
|
||||
user_id = credential.user_id if isinstance(credential, OpenVikingCredential) else None
|
||||
target_uri = f"viking://user/{user_id}/memories/" if user_id else "viking://user/memories/"
|
||||
@ -198,11 +252,15 @@ class OpenVikingMemorySystemClient:
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
def _credential_client(self, credential: OpenVikingCredential | str) -> httpx.AsyncClient:
|
||||
def _credential_client(
|
||||
self,
|
||||
credential: OpenVikingCredential | str,
|
||||
json_content_type: bool = True,
|
||||
) -> httpx.AsyncClient:
|
||||
if isinstance(credential, str):
|
||||
return self._client(credential)
|
||||
return self._client(credential, json_content_type=json_content_type)
|
||||
if credential.user_key_auth:
|
||||
return self._client(credential.api_key)
|
||||
return self._client(credential.api_key, json_content_type=json_content_type)
|
||||
headers = {}
|
||||
if credential.account_id:
|
||||
headers["X-OpenViking-Account"] = credential.account_id
|
||||
@ -210,10 +268,17 @@ class OpenVikingMemorySystemClient:
|
||||
headers["X-OpenViking-User"] = credential.user_id
|
||||
if credential.agent_id:
|
||||
headers["X-OpenViking-Agent"] = credential.agent_id
|
||||
return self._client(credential.api_key, headers)
|
||||
return self._client(credential.api_key, headers, json_content_type=json_content_type)
|
||||
|
||||
def _client(self, api_key: str, extra_headers: dict[str, str] | None = None) -> httpx.AsyncClient:
|
||||
headers = {"Content-Type": "application/json", "X-API-Key": api_key}
|
||||
def _client(
|
||||
self,
|
||||
api_key: str,
|
||||
extra_headers: dict[str, str] | None = None,
|
||||
json_content_type: bool = True,
|
||||
) -> httpx.AsyncClient:
|
||||
headers = {"X-API-Key": api_key}
|
||||
if json_content_type:
|
||||
headers["Content-Type"] = "application/json"
|
||||
if extra_headers:
|
||||
headers.update(extra_headers)
|
||||
return httpx.AsyncClient(
|
||||
|
||||
@ -54,6 +54,16 @@ class ProfileRequest(BaseModel):
|
||||
level: int = Field(default=2, ge=0)
|
||||
|
||||
|
||||
class ResourceUploadRequest(BaseModel):
|
||||
user_id: str = Field(min_length=1)
|
||||
user_key: str = Field(min_length=1)
|
||||
path: str = Field(min_length=1)
|
||||
to: str = Field(min_length=1)
|
||||
reason: str | None = None
|
||||
wait: bool = True
|
||||
directly_upload_media: bool = True
|
||||
|
||||
|
||||
class BackendStatus(BaseModel):
|
||||
status: OperationStatus
|
||||
result: Any = None
|
||||
@ -104,3 +114,9 @@ class ProfileResponse(BaseModel):
|
||||
profile: Any = None
|
||||
items: list[dict[str, Any]] = Field(default_factory=list)
|
||||
backends: dict[str, BackendStatus]
|
||||
|
||||
|
||||
class ResourceMutationResponse(BaseModel):
|
||||
status: OperationStatus
|
||||
resource: Any = None
|
||||
backends: dict[str, BackendStatus]
|
||||
|
||||
@ -3,6 +3,7 @@ from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from typing import Any, Awaitable, Callable
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from .clients import EverOSMemorySystemClient, OpenVikingMemorySystemClient
|
||||
from .schemas import (
|
||||
@ -13,6 +14,8 @@ from .schemas import (
|
||||
MessageIngestRequest,
|
||||
MessageIngestResponse,
|
||||
ProfileResponse,
|
||||
ResourceMutationResponse,
|
||||
ResourceUploadRequest,
|
||||
SearchRequest,
|
||||
SearchResponse,
|
||||
SessionContextRequest,
|
||||
@ -30,6 +33,49 @@ class MemorySystemService:
|
||||
account = backends["openviking"].result if backends["openviking"].status == "success" else None
|
||||
return AccountResponse(status=self._aggregate_status(backends), account=account, backends=backends)
|
||||
|
||||
async def upload_resource(self, request: ResourceUploadRequest) -> ResourceMutationResponse:
|
||||
credential = self.openviking.credential_for_user(request.user_id, request.user_key)
|
||||
|
||||
async def upload_openviking() -> dict[str, Any]:
|
||||
if self._is_remote_url(request.path):
|
||||
return await self.openviking.add_resource(
|
||||
credential,
|
||||
path=request.path,
|
||||
to=request.to,
|
||||
reason=request.reason,
|
||||
wait=request.wait,
|
||||
directly_upload_media=request.directly_upload_media,
|
||||
)
|
||||
|
||||
temp_upload = await self.openviking.upload_temp_file(credential, request.path)
|
||||
temp_file_id = self._temp_file_id_from_result(temp_upload)
|
||||
return await self.openviking.add_resource(
|
||||
credential,
|
||||
temp_file_id=temp_file_id,
|
||||
to=request.to,
|
||||
reason=request.reason,
|
||||
wait=request.wait,
|
||||
directly_upload_media=request.directly_upload_media,
|
||||
)
|
||||
|
||||
backends = {"openviking": await self._capture(upload_openviking)}
|
||||
resource = backends["openviking"].result if backends["openviking"].status == "success" else None
|
||||
return ResourceMutationResponse(status=self._aggregate_status(backends), resource=resource, backends=backends)
|
||||
|
||||
async def delete_resource(
|
||||
self,
|
||||
user_id: str,
|
||||
user_key: str,
|
||||
uri: str,
|
||||
recursive: bool = True,
|
||||
) -> ResourceMutationResponse:
|
||||
credential = self.openviking.credential_for_user(user_id, user_key)
|
||||
backends = {
|
||||
"openviking": await self._capture(lambda: self.openviking.delete_resource(credential, uri, recursive)),
|
||||
}
|
||||
resource = backends["openviking"].result if backends["openviking"].status == "success" else None
|
||||
return ResourceMutationResponse(status=self._aggregate_status(backends), resource=resource, backends=backends)
|
||||
|
||||
async def ingest_messages(self, request: MessageIngestRequest) -> MessageIngestResponse:
|
||||
messages = self._messages_from_request(request)
|
||||
if not messages:
|
||||
@ -204,6 +250,16 @@ class MemorySystemService:
|
||||
messages.append({"role": "assistant", "content": request.assistant_message})
|
||||
return messages
|
||||
|
||||
def _is_remote_url(self, path: str) -> bool:
|
||||
return urlparse(path).scheme in {"http", "https"}
|
||||
|
||||
def _temp_file_id_from_result(self, result: Any) -> str:
|
||||
data = result.get("result") if isinstance(result, dict) and isinstance(result.get("result"), dict) else result
|
||||
temp_file_id = data.get("temp_file_id") if isinstance(data, dict) else None
|
||||
if not temp_file_id:
|
||||
raise ValueError("OpenViking temp upload response missing temp_file_id")
|
||||
return str(temp_file_id)
|
||||
|
||||
async def _run_backends(self, **calls: Callable[[], Awaitable[Any]]) -> dict[str, BackendStatus]:
|
||||
names = list(calls)
|
||||
results = await asyncio.gather(*(self._capture(calls[name]) for name in names))
|
||||
|
||||
Reference in New Issue
Block a user