diff --git a/README.md b/README.md index cfd90eb..5228c08 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,8 @@ Memory Gateway 是一个轻量级 FastAPI 服务,用于在上游记忆服务 ## 功能范围 - 上传用户资源:文件、图片、音频、PDF、HTML、普通文档、纯文本。 +- 登记外部资源:文件真实保存在 Beaver user_files/MinIO 等外部存储, + Gateway 只保存元数据和 URI 映射,不保存原始文件副本。 - 保存资源元数据到 SQLite。 - 为每个资源生成独立的上游记忆服务 `session_id`。 - 调用上游记忆服务的 `add` 和 `flush` 完成资源记忆摄入,并对临时失败做轻量重试。 @@ -285,6 +287,83 @@ curl -X POST http://127.0.0.1:8010/resources \ 资源上传接口返回的 `uri` 始终是 `resource://{user_id}/{resource_id}`。按文件名 命中的记忆搜索结果会另外通过 `attachments[].internal_uri` 返回真实 URI。 +### 3b. 登记外部资源 + +```http +POST /resources/external +Content-Type: application/json +``` + +当文件已经保存在 Beaver user_files/MinIO 等外部存储时,调用该接口登记资源。 +Gateway 不会保存原始文件副本,也不会下载 `source_uri` 或 `ingest_uri`;它只写入 +SQLite 元数据、建立附件映射,并把 `ingest_uri` 作为 content item URI 传给上游 +记忆服务完成解析和索引。 + +请求参数: + +| 参数 | 类型 | 必填 | 默认值 | 说明 | +|---|---|---|---|---| +| `user_id` | string | 是 | 无 | 用户 ID | +| `user_key` | string | 是 | 无 | 用户 key | +| `app_id` | string | 否 | `default` | 上游记忆服务 app scope | +| `project_id` | string | 否 | `default` | 上游记忆服务 project scope | +| `filename` | string | 是 | 无 | 文件名,用于检索附件匹配和上游解析 | +| `mime_type` | string | 否 | `null` | MIME 类型,如 `image/png`、`application/pdf` | +| `content_type` | string | 否 | 自动推断 | 上游 content type:`image`、`audio`、`pdf`、`html`、`text` 或 `doc`。如果传入 MIME 类型,Gateway 会按 MIME/扩展名重新推断 | +| `size_bytes` | integer | 否 | `null` | 文件大小 | +| `sha256` | string | 否 | `null` | 文件内容 sha256;同用户同 app/project 下相同 sha256 的活跃资源会复用 | +| `source_uri` | string | 是 | 无 | 长期真实映射 URI,如 `minio://bucket/users/u_123/outputs/chart.png` 或本地开发的 `file://...` | +| `ingest_uri` | string | 否 | `source_uri` | 上游记忆服务当次摄入可读取的 URI。MinIO 场景推荐传短期 presigned GET URL | +| `title` | string | 否 | `null` | 资源标题 | +| `description` | string | 否 | `null` | 资源描述 | + +处理流程: + +1. 生成 `resource_id`。 +2. 生成 `session_id = resource:{user_id}:{resource_id}`。 +3. 写入 `user_resources`,`uri = source_uri`,状态为 `ingesting`。 +4. 写入 `memory_attachments`,`internal_uri = source_uri`,`source = external_resource`。 +5. 构造上游 content item:`{"type": "...", "name": filename, "uri": ingest_uri, "extras": {"resource_id": "...", "source": "external_resource"}}`。 +6. 调用上游记忆服务 `/api/v1/memory/add`。 +7. 调用上游记忆服务 `/api/v1/memory/flush`。 +8. 成功后状态改为 `extracted`,失败后状态改为 `failed` 并记录错误。 + +请求示例: + +```bash +curl -X POST http://127.0.0.1:8010/resources/external \ + -H 'Content-Type: application/json' \ + -d '{ + "user_id": "u_123", + "user_key": "uk_xxx", + "app_id": "default", + "project_id": "default", + "filename": "chart.png", + "mime_type": "image/png", + "size_bytes": 12345, + "sha256": "abc123...", + "source_uri": "minio://beaver-user-files/users/u_123/outputs/chart.png", + "ingest_uri": "https://minio.example/presigned/chart.png", + "title": "chart.png", + "description": "Beaver user file outputs/chart.png" + }' +``` + +响应示例: + +```json +{ + "resource_id": "r_xxx", + "session_id": "resource:u_123:r_xxx", + "uri": "resource://u_123/r_xxx", + "status": "extracted" +} +``` + +`source_uri` 是 Gateway 长期保存和搜索结果附件映射使用的真实 URI; +`ingest_uri` 只用于本次上游解析。不要把短期 presigned URL 作为 `source_uri` +长期入库。 + ### 4. 查询资源列表 ```http @@ -426,7 +505,121 @@ curl -X DELETE "http://127.0.0.1:8010/resources/r_xxx?user_id=u_123&user_key=uk_ } ``` -### 7. 搜索记忆 +### 7. 添加记忆 + +```http +POST /memories/add +Content-Type: application/json +``` + +该接口把一批消息追加到指定 `session_id`,并调用上游记忆服务 +`/api/v1/memory/add`。如果消息 content item 中包含 `uri` 或 `base64` 附件, +Gateway 会额外登记附件映射,供后续 `/memories/search` 返回 +`attachments[].internal_uri`。 + +请求参数: + +| 参数 | 类型 | 必填 | 默认值 | 说明 | +|---|---|---|---|---| +| `user_id` | string | 是 | 无 | 用户 ID | +| `user_key` | string | 是 | 无 | 用户 key | +| `session_id` | string | 是 | 无 | 记忆 session,如 `chat:{conversation_id}`、`resource:{user_id}:{resource_id}` 或 `memory_edit:{user_id}` | +| `messages` | object[] | 是 | 无 | 追加到该 session 的消息数组,至少 1 条 | +| `app_id` | string | 否 | `default` | 上游记忆服务 app scope | +| `project_id` | string | 否 | `default` | 上游记忆服务 project scope | + +`messages[]` 字段: + +| 字段 | 类型 | 必填 | 说明 | +|---|---|---|---| +| `sender_id` | string | 是 | 发送方 ID,通常为当前用户 ID | +| `role` | string | 是 | `user`、`assistant` 或 `tool` | +| `timestamp` | integer | 是 | Unix 毫秒时间戳,必须大于 0 | +| `content` | string 或 object[] | 是 | 纯文本,或上游记忆服务 content item 数组 | + +常见 content item: + +```json +{"type": "text", "text": "用户喜欢简洁的中文回答"} +``` + +```json +{ + "type": "image", + "uri": "file:///home/tom/memory-gateway/tests/simple-multimodal-image.png", + "ext": "png", + "name": "simple-multimodal-image.png" +} +``` + +```json +{ + "type": "audio", + "base64": "BASE64_DATA", + "ext": "wav", + "name": "tone.wav" +} +``` + +处理流程: + +1. 校验 `user_id` 和 `user_key`。 +2. 检查 `messages` 基本结构。 +3. 遍历 object[] content item:含 `uri` 时登记该 URI;只有 `base64` 时先校验并保存到 `MEMORY_GATEWAY_STORAGE_DIR/{user_id}/memory_attachments/{sha256}/`,再登记生成的 `file://` URI。 +4. 将原始 `session_id`、`app_id`、`project_id`、`messages` 原样转发给上游记忆服务 `/api/v1/memory/add`。 +5. 上游 add 成功后写入附件映射;失败时不会保留本次新生成的 base64 附件文件。 + +请求示例: + +```bash +curl -X POST http://127.0.0.1:8010/memories/add \ + -H 'Content-Type: application/json' \ + -d '{ + "user_id": "u_123", + "user_key": "uk_xxx", + "session_id": "chat:c_456", + "app_id": "default", + "project_id": "default", + "messages": [ + { + "sender_id": "u_123", + "role": "user", + "timestamp": 1781172177000, + "content": [ + { + "type": "text", + "text": "记住:我偏好 concise 的中文说明" + }, + { + "type": "image", + "uri": "file:///home/tom/memory-gateway/tests/simple-multimodal-image.png", + "ext": "png", + "name": "simple-multimodal-image.png" + } + ] + } + ] + }' +``` + +响应示例: + +```json +{ + "session_id": "chat:c_456", + "backend": { + "request_id": "add", + "data": { + "status": "accumulated" + } + } +} +``` + +`/memories/add` 只负责追加消息。需要让上游记忆服务对该 session 完成抽取和索引时, +继续调用 `/memories/flush`。 + +### 8. 搜索记忆 ```http POST /memories/search @@ -506,17 +699,19 @@ curl -X POST http://127.0.0.1:8010/memories/search \ 附件路径映射规则: -1. `/resources` 上传成功后,将资源真实 URI 与资源 session 写入 +1. `/resources` 上传成功后,将 Gateway 本地保存的真实 URI 与资源 session 写入 `memory_attachments`。数据库初始化会自动回填已有 `user_resources`。 -2. `/memories/add` 中含 `uri` 的 content item 会直接登记 URI。 -3. `/memories/add` 中只有 `base64` 的 content item 会保存到 +2. `/resources/external` 登记成功后,将外部存储的 `source_uri` 与资源 session + 写入 `memory_attachments`,但 Gateway 不保存原始文件副本。 +3. `/memories/add` 中含 `uri` 的 content item 会直接登记 URI。 +4. `/memories/add` 中只有 `base64` 的 content item 会保存到 `MEMORY_GATEWAY_STORAGE_DIR/{user_id}/memory_attachments/{sha256}/`,再登记 生成的 `file://` URI。相同用户、session、文件名和内容的重试会复用路径。 -4. 搜索时根据当前用户和结果 `session_id` 查询附件,递归检查 `raw` 中的字符串 +5. 搜索时根据当前用户和结果 `session_id` 查询附件,递归检查 `raw` 中的字符串 值。只有完整文件名出现时才返回对应附件;匹配不区分大小写。 -5. `raw` 中键名为 `base64` 的内容不会参与匹配。未匹配时返回 +6. `raw` 中键名为 `base64` 的内容不会参与匹配。未匹配时返回 `"attachments": []`。 -6. 历史 `/memories/add` 请求未保存在 Gateway 数据库中,无法自动补录映射;新 +7. 历史 `/memories/add` 请求未保存在 Gateway 数据库中,无法自动补录映射;新 版本上线后的请求会建立映射。 `attachments[].internal_uri` 会按配置和调用方输入直接暴露服务器真实 URI,调用 @@ -553,7 +748,7 @@ curl -X POST http://127.0.0.1:8010/memories/search \ } ``` -### 8. 修改记忆 +### 9. 修改记忆 ```http PATCH /memories/{memory_id} @@ -594,7 +789,7 @@ curl -X PATCH http://127.0.0.1:8010/memories/mem_abc \ } ``` -### 9. 删除记忆 +### 10. 删除记忆 ```http DELETE /memories/{memory_id} diff --git a/core/api.py b/core/api.py index 639c82d..b99fbcf 100644 --- a/core/api.py +++ b/core/api.py @@ -7,6 +7,7 @@ from datetime import datetime, timezone from typing import Any, Literal from urllib.parse import parse_qsl, quote, urlsplit, urlunsplit +import httpx from fastapi import APIRouter, FastAPI, File, Form, HTTPException, Request, UploadFile from pydantic import BaseModel, Field, field_validator from starlette.responses import Response @@ -86,6 +87,22 @@ class FlushMemoryRequest(BaseModel): project_id: str = "default" +class ExternalResourceRequest(BaseModel): + user_id: str = Field(min_length=1) + user_key: str = Field(min_length=1) + app_id: str = "default" + project_id: str = "default" + filename: str = Field(min_length=1) + mime_type: str | None = None + content_type: str | None = None + size_bytes: int | None = Field(default=None, ge=0) + sha256: str | None = None + source_uri: str = Field(min_length=1) + ingest_uri: str | None = None + title: str | None = None + description: str | None = None + + class MemoryOverrideRequest(BaseModel): user_id: str = Field(min_length=1) user_key: str = Field(min_length=1) @@ -196,6 +213,13 @@ def _body_for_log(body: bytes, content_type: str | None) -> Any: return {"content_type": content_type, "size_bytes": len(body)} +def _backend_http_error_detail(exc: httpx.HTTPStatusError) -> Any: + try: + return exc.response.json() + except ValueError: + return exc.response.text + + def create_app( *, config: GatewayConfig | None = None, @@ -366,6 +390,26 @@ def create_app( return {"resources": []} return {"resources": [resource]} + @router.post("/resources/external") + async def register_external_resource( + request: ExternalResourceRequest, + ) -> dict[str, Any]: + require_user(request.user_id, request.user_key) + return await service.register_external_resource( + user_id=request.user_id, + app_id=request.app_id, + project_id=request.project_id, + filename=request.filename, + mime_type=request.mime_type, + content_type=request.content_type, + size_bytes=request.size_bytes, + sha256=request.sha256, + source_uri=request.source_uri, + ingest_uri=request.ingest_uri, + title=request.title, + description=request.description, + ) + @router.delete("/resources/{resource_id}") async def delete_resource( resource_id: str, @@ -412,6 +456,11 @@ def create_app( project_id=request.project_id, messages=[message.model_dump() for message in request.messages], ) + except httpx.HTTPStatusError as exc: + raise HTTPException( + status_code=exc.response.status_code, + detail=_backend_http_error_detail(exc), + ) from exc except UploadTooLarge as exc: raise HTTPException(status_code=413, detail=str(exc)) from exc except InvalidAttachment as exc: diff --git a/core/service.py b/core/service.py index 086b545..9e62a82 100644 --- a/core/service.py +++ b/core/service.py @@ -51,6 +51,17 @@ def infer_content_type(filename: str | None, mime_type: str | None) -> str: return "doc" +def normalize_content_type( + filename: str | None, + mime_type: str | None, + content_type: str | None, +) -> str: + requested = (content_type or "").strip().lower() + if requested in {"image", "audio", "pdf", "html", "text", "doc"}: + return requested + return infer_content_type(filename, mime_type or requested) + + def _safe_filename(filename: str | None) -> str: name = Path(filename or "upload.bin").name return name or "upload.bin" @@ -301,6 +312,123 @@ class MemoryGatewayService: item["base64"] = base64.b64encode(content).decode("ascii") return item + async def register_external_resource( + self, + *, + user_id: str, + app_id: str, + project_id: str, + filename: str, + mime_type: str | None, + content_type: str | None, + size_bytes: int | None, + sha256: str | None, + source_uri: str, + ingest_uri: str | None, + title: str | None, + description: str | None, + ) -> dict[str, Any]: + resource_id = new_resource_id() + session_id = resource_session_id(user_id, resource_id) + original_filename = _safe_filename(filename) + normalized_content_type = normalize_content_type( + original_filename, + mime_type, + content_type, + ) + existing = None + if sha256: + existing = self.repository.find_active_resource_by_sha256( + user_id=user_id, + app_id=app_id, + project_id=project_id, + sha256=sha256, + ) + if existing is not None: + self._register_resource_attachment(existing, source="external_resource") + return self._resource_summary(existing) + + resource = self.repository.create_resource( + id=resource_id, + user_id=user_id, + app_id=app_id, + project_id=project_id, + session_id=session_id, + original_filename=original_filename, + mime_type=mime_type, + content_type=normalized_content_type, + uri=source_uri, + uri_public=False, + sha256=sha256, + size_bytes=size_bytes, + title=title, + description=description, + status="ingesting", + error_message=None, + ) + self._register_resource_attachment(resource, source="external_resource") + + try: + await self._retry_backend_call( + lambda: self.backend_client.add_memory( + self._build_external_add_payload( + resource=resource, + user_id=user_id, + app_id=app_id, + project_id=project_id, + filename=original_filename, + ingest_uri=ingest_uri or source_uri, + ) + ) + ) + await self._retry_backend_call( + lambda: self.backend_client.flush_memory(session_id, app_id, project_id) + ) + except Exception as exc: + failed = self.repository.update_resource_status( + resource_id, + "failed", + str(exc), + ) + return self._resource_summary(failed or resource) + + extracted = self.repository.update_resource_status(resource_id, "extracted") + return self._resource_summary(extracted or resource) + + def _build_external_add_payload( + self, + *, + resource: dict[str, Any], + user_id: str, + app_id: str, + project_id: str, + filename: str, + ingest_uri: str, + ) -> dict[str, Any]: + content_item = { + "type": str(resource["content_type"]), + "name": filename, + "ext": Path(filename).suffix.lstrip(".") or None, + "uri": ingest_uri, + "extras": { + "resource_id": resource["id"], + "source": "external_resource", + }, + } + return { + "session_id": resource["session_id"], + "app_id": app_id, + "project_id": project_id, + "messages": [ + { + "sender_id": user_id, + "role": "user", + "timestamp": current_timestamp_ms(), + "content": [content_item], + } + ], + } + def _resource_file_path(self, resource: dict[str, Any]) -> Path: uri = str(resource["uri"]) parsed = urlparse(uri) @@ -489,7 +617,12 @@ class MemoryGatewayService: raise return {"session_id": session_id, "backend": backend} - def _register_resource_attachment(self, resource: dict[str, Any]) -> None: + def _register_resource_attachment( + self, + resource: dict[str, Any], + *, + source: str = "resource_upload", + ) -> None: self.repository.create_attachment( user_id=resource["user_id"], app_id=resource["app_id"], @@ -499,7 +632,7 @@ class MemoryGatewayService: content_type=resource["content_type"], name=resource["original_filename"] or resource["id"], internal_uri=resource["uri"], - source="resource_upload", + source=source, sha256=resource["sha256"], ) diff --git a/tests/test_gateway.py b/tests/test_gateway.py index 2215469..92e7799 100644 --- a/tests/test_gateway.py +++ b/tests/test_gateway.py @@ -440,6 +440,61 @@ async def test_upload_resource_creates_attachment_mapping( assert attachments[0]["source"] == "resource_upload" +@pytest.mark.asyncio +async def test_register_external_resource_does_not_copy_file_and_uses_ingest_uri( + config: GatewayConfig, + repo: MemoryRepository, +) -> None: + backend = FakeBackendClient() + async with app_client(config, backend) as client: + user_key = await create_user(client) + response = await client.post( + "/resources/external", + json={ + "user_id": "u_123", + "user_key": user_key, + "app_id": "default", + "project_id": "default", + "filename": "chart.png", + "mime_type": "image/png", + "content_type": "image", + "size_bytes": 9, + "sha256": "sha-chart", + "source_uri": "minio://beaver-user-files/users/u_123/outputs/chart.png", + "ingest_uri": "http://minio.local/presigned/chart.png", + "title": "Chart", + "description": "Generated chart", + }, + ) + + assert response.status_code == 200, response.text + body = response.json() + resource_id = body["resource_id"] + assert body["session_id"] == f"resource:u_123:{resource_id}" + assert body["uri"] == f"resource://u_123/{resource_id}" + assert body["status"] == "extracted" + assert not config.storage_dir.exists() + + resource = repo.get_resource(resource_id) + assert resource is not None + assert resource["uri"] == "minio://beaver-user-files/users/u_123/outputs/chart.png" + assert resource["uri_public"] == 0 + assert resource["sha256"] == "sha-chart" + assert resource["size_bytes"] == 9 + + content = backend.add_calls[0]["messages"][0]["content"][0] + assert content["type"] == "image" + assert content["uri"] == "http://minio.local/presigned/chart.png" + assert content["name"] == "chart.png" + assert content["extras"] == {"resource_id": resource_id, "source": "external_resource"} + assert "base64" not in content + + attachments = repo.list_attachments_for_session("u_123", body["session_id"]) + assert len(attachments) == 1 + assert attachments[0]["internal_uri"] == "minio://beaver-user-files/users/u_123/outputs/chart.png" + assert attachments[0]["source"] == "external_resource" + + @pytest.mark.asyncio async def test_upload_resource_uses_current_timestamp( config: GatewayConfig, @@ -704,6 +759,59 @@ async def test_add_memory_forwards_multimodal_payload_to_backend( ] +@pytest.mark.asyncio +async def test_add_memory_propagates_backend_http_error( + config: GatewayConfig, +) -> None: + backend = FakeBackendClient() + request = httpx.Request("POST", "http://backend.test/api/v1/memory/add") + response = httpx.Response( + 415, + request=request, + json={"error": {"message": "LibreOffice conversion failed"}}, + ) + + async def fail_add_memory(payload: dict[str, Any]) -> dict[str, Any]: + backend.add_calls.append(payload) + raise httpx.HTTPStatusError( + "Client error '415 Unsupported Media Type'", + request=request, + response=response, + ) + + backend.add_memory = fail_add_memory # type: ignore[method-assign] + async with app_client(config, backend) as client: + user_key = await create_user(client) + gateway_response = await client.post( + "/memories/add", + json={ + "user_id": "u_123", + "user_key": user_key, + "session_id": "chat:c_backend_415", + "messages": [ + { + "sender_id": "u_123", + "role": "user", + "timestamp": 1234567890123, + "content": [ + { + "type": "doc", + "uri": "file:///tmp/bad.doc", + "ext": "doc", + "name": "bad.doc", + } + ], + } + ], + }, + ) + + assert gateway_response.status_code == 415 + assert gateway_response.json()["detail"] == { + "error": {"message": "LibreOffice conversion failed"} + } + + @pytest.mark.asyncio async def test_add_memory_creates_uri_attachment_mapping( config: GatewayConfig,