Add external resource registration

This commit is contained in:
2026-06-22 13:19:18 +08:00
parent e5cd87789f
commit 12c767cd68
4 changed files with 496 additions and 11 deletions

213
README.md
View File

@ -10,6 +10,8 @@ Memory Gateway 是一个轻量级 FastAPI 服务,用于在上游记忆服务
## 功能范围
- 上传用户资源文件、图片、音频、PDF、HTML、普通文档、纯文本。
- 登记外部资源:文件真实保存在 Beaver user_files/MinIO 等外部存储,
Gateway 只保存元数据和 URI 映射,不保存原始文件副本。
- 保存资源元数据到 SQLite。
- 为每个资源生成独立的上游记忆服务 `session_id`
- 调用上游记忆服务的 `add``flush` 完成资源记忆摄入,并对临时失败做轻量重试。
@ -285,6 +287,83 @@ curl -X POST http://127.0.0.1:8010/resources \
资源上传接口返回的 `uri` 始终是 `resource://{user_id}/{resource_id}`。按文件名
命中的记忆搜索结果会另外通过 `attachments[].internal_uri` 返回真实 URI。
### 3b. 登记外部资源
```http
POST /resources/external
Content-Type: application/json
```
当文件已经保存在 Beaver user_files/MinIO 等外部存储时,调用该接口登记资源。
Gateway 不会保存原始文件副本,也不会下载 `source_uri``ingest_uri`;它只写入
SQLite 元数据、建立附件映射,并把 `ingest_uri` 作为 content item URI 传给上游
记忆服务完成解析和索引。
请求参数:
| 参数 | 类型 | 必填 | 默认值 | 说明 |
|---|---|---|---|---|
| `user_id` | string | 是 | 无 | 用户 ID |
| `user_key` | string | 是 | 无 | 用户 key |
| `app_id` | string | 否 | `default` | 上游记忆服务 app scope |
| `project_id` | string | 否 | `default` | 上游记忆服务 project scope |
| `filename` | string | 是 | 无 | 文件名,用于检索附件匹配和上游解析 |
| `mime_type` | string | 否 | `null` | MIME 类型,如 `image/png``application/pdf` |
| `content_type` | string | 否 | 自动推断 | 上游 content type`image``audio``pdf``html``text``doc`。如果传入 MIME 类型Gateway 会按 MIME/扩展名重新推断 |
| `size_bytes` | integer | 否 | `null` | 文件大小 |
| `sha256` | string | 否 | `null` | 文件内容 sha256同用户同 app/project 下相同 sha256 的活跃资源会复用 |
| `source_uri` | string | 是 | 无 | 长期真实映射 URI`minio://bucket/users/u_123/outputs/chart.png` 或本地开发的 `file://...` |
| `ingest_uri` | string | 否 | `source_uri` | 上游记忆服务当次摄入可读取的 URI。MinIO 场景推荐传短期 presigned GET URL |
| `title` | string | 否 | `null` | 资源标题 |
| `description` | string | 否 | `null` | 资源描述 |
处理流程:
1. 生成 `resource_id`
2. 生成 `session_id = resource:{user_id}:{resource_id}`
3. 写入 `user_resources``uri = source_uri`,状态为 `ingesting`
4. 写入 `memory_attachments``internal_uri = source_uri``source = external_resource`
5. 构造上游 content item`{"type": "...", "name": filename, "uri": ingest_uri, "extras": {"resource_id": "...", "source": "external_resource"}}`
6. 调用上游记忆服务 `/api/v1/memory/add`
7. 调用上游记忆服务 `/api/v1/memory/flush`
8. 成功后状态改为 `extracted`,失败后状态改为 `failed` 并记录错误。
请求示例:
```bash
curl -X POST http://127.0.0.1:8010/resources/external \
-H 'Content-Type: application/json' \
-d '{
"user_id": "u_123",
"user_key": "uk_xxx",
"app_id": "default",
"project_id": "default",
"filename": "chart.png",
"mime_type": "image/png",
"size_bytes": 12345,
"sha256": "abc123...",
"source_uri": "minio://beaver-user-files/users/u_123/outputs/chart.png",
"ingest_uri": "https://minio.example/presigned/chart.png",
"title": "chart.png",
"description": "Beaver user file outputs/chart.png"
}'
```
响应示例:
```json
{
"resource_id": "r_xxx",
"session_id": "resource:u_123:r_xxx",
"uri": "resource://u_123/r_xxx",
"status": "extracted"
}
```
`source_uri` 是 Gateway 长期保存和搜索结果附件映射使用的真实 URI
`ingest_uri` 只用于本次上游解析。不要把短期 presigned URL 作为 `source_uri`
长期入库。
### 4. 查询资源列表
```http
@ -426,7 +505,121 @@ curl -X DELETE "http://127.0.0.1:8010/resources/r_xxx?user_id=u_123&user_key=uk_
}
```
### 7. 搜索记忆
### 7. 添加记忆
```http
POST /memories/add
Content-Type: application/json
```
该接口把一批消息追加到指定 `session_id`,并调用上游记忆服务
`/api/v1/memory/add`。如果消息 content item 中包含 `uri``base64` 附件,
Gateway 会额外登记附件映射,供后续 `/memories/search` 返回
`attachments[].internal_uri`
请求参数:
| 参数 | 类型 | 必填 | 默认值 | 说明 |
|---|---|---|---|---|
| `user_id` | string | 是 | 无 | 用户 ID |
| `user_key` | string | 是 | 无 | 用户 key |
| `session_id` | string | 是 | 无 | 记忆 session`chat:{conversation_id}``resource:{user_id}:{resource_id}``memory_edit:{user_id}` |
| `messages` | object[] | 是 | 无 | 追加到该 session 的消息数组,至少 1 条 |
| `app_id` | string | 否 | `default` | 上游记忆服务 app scope |
| `project_id` | string | 否 | `default` | 上游记忆服务 project scope |
`messages[]` 字段:
| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
| `sender_id` | string | 是 | 发送方 ID通常为当前用户 ID |
| `role` | string | 是 | `user``assistant``tool` |
| `timestamp` | integer | 是 | Unix 毫秒时间戳,必须大于 0 |
| `content` | string 或 object[] | 是 | 纯文本,或上游记忆服务 content item 数组 |
常见 content item
```json
{"type": "text", "text": "用户喜欢简洁的中文回答"}
```
```json
{
"type": "image",
"uri": "file:///home/tom/memory-gateway/tests/simple-multimodal-image.png",
"ext": "png",
"name": "simple-multimodal-image.png"
}
```
```json
{
"type": "audio",
"base64": "BASE64_DATA",
"ext": "wav",
"name": "tone.wav"
}
```
处理流程:
1. 校验 `user_id``user_key`
2. 检查 `messages` 基本结构。
3. 遍历 object[] content item`uri` 时登记该 URI只有 `base64` 时先校验并保存到 `MEMORY_GATEWAY_STORAGE_DIR/{user_id}/memory_attachments/{sha256}/`,再登记生成的 `file://` URI。
4. 将原始 `session_id``app_id``project_id``messages` 原样转发给上游记忆服务 `/api/v1/memory/add`
5. 上游 add 成功后写入附件映射;失败时不会保留本次新生成的 base64 附件文件。
请求示例:
```bash
curl -X POST http://127.0.0.1:8010/memories/add \
-H 'Content-Type: application/json' \
-d '{
"user_id": "u_123",
"user_key": "uk_xxx",
"session_id": "chat:c_456",
"app_id": "default",
"project_id": "default",
"messages": [
{
"sender_id": "u_123",
"role": "user",
"timestamp": 1781172177000,
"content": [
{
"type": "text",
"text": "记住:我偏好 concise 的中文说明"
},
{
"type": "image",
"uri": "file:///home/tom/memory-gateway/tests/simple-multimodal-image.png",
"ext": "png",
"name": "simple-multimodal-image.png"
}
]
}
]
}'
```
响应示例:
```json
{
"session_id": "chat:c_456",
"backend": {
"request_id": "add",
"data": {
"status": "accumulated"
}
}
}
```
`/memories/add` 只负责追加消息。需要让上游记忆服务对该 session 完成抽取和索引时,
继续调用 `/memories/flush`
### 8. 搜索记忆
```http
POST /memories/search
@ -506,17 +699,19 @@ curl -X POST http://127.0.0.1:8010/memories/search \
附件路径映射规则:
1. `/resources` 上传成功后,将资源真实 URI 与资源 session 写入
1. `/resources` 上传成功后,将 Gateway 本地保存的真实 URI 与资源 session 写入
`memory_attachments`。数据库初始化会自动回填已有 `user_resources`
2. `/memories/add` 中含 `uri` 的 content item 会直接登记 URI。
3. `/memories/add` 中只有 `base64` 的 content item 会保存到
2. `/resources/external` 登记成功后,将外部存储的 `source_uri` 与资源 session
写入 `memory_attachments`,但 Gateway 不保存原始文件副本。
3. `/memories/add` 中含 `uri` 的 content item 会直接登记 URI。
4. `/memories/add` 中只有 `base64` 的 content item 会保存到
`MEMORY_GATEWAY_STORAGE_DIR/{user_id}/memory_attachments/{sha256}/`,再登记
生成的 `file://` URI。相同用户、session、文件名和内容的重试会复用路径。
4. 搜索时根据当前用户和结果 `session_id` 查询附件,递归检查 `raw` 中的字符串
5. 搜索时根据当前用户和结果 `session_id` 查询附件,递归检查 `raw` 中的字符串
值。只有完整文件名出现时才返回对应附件;匹配不区分大小写。
5. `raw` 中键名为 `base64` 的内容不会参与匹配。未匹配时返回
6. `raw` 中键名为 `base64` 的内容不会参与匹配。未匹配时返回
`"attachments": []`
6. 历史 `/memories/add` 请求未保存在 Gateway 数据库中,无法自动补录映射;新
7. 历史 `/memories/add` 请求未保存在 Gateway 数据库中,无法自动补录映射;新
版本上线后的请求会建立映射。
`attachments[].internal_uri` 会按配置和调用方输入直接暴露服务器真实 URI调用
@ -553,7 +748,7 @@ curl -X POST http://127.0.0.1:8010/memories/search \
}
```
### 8. 修改记忆
### 9. 修改记忆
```http
PATCH /memories/{memory_id}
@ -594,7 +789,7 @@ curl -X PATCH http://127.0.0.1:8010/memories/mem_abc \
}
```
### 9. 删除记忆
### 10. 删除记忆
```http
DELETE /memories/{memory_id}

View File

@ -7,6 +7,7 @@ from datetime import datetime, timezone
from typing import Any, Literal
from urllib.parse import parse_qsl, quote, urlsplit, urlunsplit
import httpx
from fastapi import APIRouter, FastAPI, File, Form, HTTPException, Request, UploadFile
from pydantic import BaseModel, Field, field_validator
from starlette.responses import Response
@ -86,6 +87,22 @@ class FlushMemoryRequest(BaseModel):
project_id: str = "default"
class ExternalResourceRequest(BaseModel):
user_id: str = Field(min_length=1)
user_key: str = Field(min_length=1)
app_id: str = "default"
project_id: str = "default"
filename: str = Field(min_length=1)
mime_type: str | None = None
content_type: str | None = None
size_bytes: int | None = Field(default=None, ge=0)
sha256: str | None = None
source_uri: str = Field(min_length=1)
ingest_uri: str | None = None
title: str | None = None
description: str | None = None
class MemoryOverrideRequest(BaseModel):
user_id: str = Field(min_length=1)
user_key: str = Field(min_length=1)
@ -196,6 +213,13 @@ def _body_for_log(body: bytes, content_type: str | None) -> Any:
return {"content_type": content_type, "size_bytes": len(body)}
def _backend_http_error_detail(exc: httpx.HTTPStatusError) -> Any:
try:
return exc.response.json()
except ValueError:
return exc.response.text
def create_app(
*,
config: GatewayConfig | None = None,
@ -366,6 +390,26 @@ def create_app(
return {"resources": []}
return {"resources": [resource]}
@router.post("/resources/external")
async def register_external_resource(
request: ExternalResourceRequest,
) -> dict[str, Any]:
require_user(request.user_id, request.user_key)
return await service.register_external_resource(
user_id=request.user_id,
app_id=request.app_id,
project_id=request.project_id,
filename=request.filename,
mime_type=request.mime_type,
content_type=request.content_type,
size_bytes=request.size_bytes,
sha256=request.sha256,
source_uri=request.source_uri,
ingest_uri=request.ingest_uri,
title=request.title,
description=request.description,
)
@router.delete("/resources/{resource_id}")
async def delete_resource(
resource_id: str,
@ -412,6 +456,11 @@ def create_app(
project_id=request.project_id,
messages=[message.model_dump() for message in request.messages],
)
except httpx.HTTPStatusError as exc:
raise HTTPException(
status_code=exc.response.status_code,
detail=_backend_http_error_detail(exc),
) from exc
except UploadTooLarge as exc:
raise HTTPException(status_code=413, detail=str(exc)) from exc
except InvalidAttachment as exc:

View File

@ -51,6 +51,17 @@ def infer_content_type(filename: str | None, mime_type: str | None) -> str:
return "doc"
def normalize_content_type(
filename: str | None,
mime_type: str | None,
content_type: str | None,
) -> str:
requested = (content_type or "").strip().lower()
if requested in {"image", "audio", "pdf", "html", "text", "doc"}:
return requested
return infer_content_type(filename, mime_type or requested)
def _safe_filename(filename: str | None) -> str:
name = Path(filename or "upload.bin").name
return name or "upload.bin"
@ -301,6 +312,123 @@ class MemoryGatewayService:
item["base64"] = base64.b64encode(content).decode("ascii")
return item
async def register_external_resource(
self,
*,
user_id: str,
app_id: str,
project_id: str,
filename: str,
mime_type: str | None,
content_type: str | None,
size_bytes: int | None,
sha256: str | None,
source_uri: str,
ingest_uri: str | None,
title: str | None,
description: str | None,
) -> dict[str, Any]:
resource_id = new_resource_id()
session_id = resource_session_id(user_id, resource_id)
original_filename = _safe_filename(filename)
normalized_content_type = normalize_content_type(
original_filename,
mime_type,
content_type,
)
existing = None
if sha256:
existing = self.repository.find_active_resource_by_sha256(
user_id=user_id,
app_id=app_id,
project_id=project_id,
sha256=sha256,
)
if existing is not None:
self._register_resource_attachment(existing, source="external_resource")
return self._resource_summary(existing)
resource = self.repository.create_resource(
id=resource_id,
user_id=user_id,
app_id=app_id,
project_id=project_id,
session_id=session_id,
original_filename=original_filename,
mime_type=mime_type,
content_type=normalized_content_type,
uri=source_uri,
uri_public=False,
sha256=sha256,
size_bytes=size_bytes,
title=title,
description=description,
status="ingesting",
error_message=None,
)
self._register_resource_attachment(resource, source="external_resource")
try:
await self._retry_backend_call(
lambda: self.backend_client.add_memory(
self._build_external_add_payload(
resource=resource,
user_id=user_id,
app_id=app_id,
project_id=project_id,
filename=original_filename,
ingest_uri=ingest_uri or source_uri,
)
)
)
await self._retry_backend_call(
lambda: self.backend_client.flush_memory(session_id, app_id, project_id)
)
except Exception as exc:
failed = self.repository.update_resource_status(
resource_id,
"failed",
str(exc),
)
return self._resource_summary(failed or resource)
extracted = self.repository.update_resource_status(resource_id, "extracted")
return self._resource_summary(extracted or resource)
def _build_external_add_payload(
self,
*,
resource: dict[str, Any],
user_id: str,
app_id: str,
project_id: str,
filename: str,
ingest_uri: str,
) -> dict[str, Any]:
content_item = {
"type": str(resource["content_type"]),
"name": filename,
"ext": Path(filename).suffix.lstrip(".") or None,
"uri": ingest_uri,
"extras": {
"resource_id": resource["id"],
"source": "external_resource",
},
}
return {
"session_id": resource["session_id"],
"app_id": app_id,
"project_id": project_id,
"messages": [
{
"sender_id": user_id,
"role": "user",
"timestamp": current_timestamp_ms(),
"content": [content_item],
}
],
}
def _resource_file_path(self, resource: dict[str, Any]) -> Path:
uri = str(resource["uri"])
parsed = urlparse(uri)
@ -489,7 +617,12 @@ class MemoryGatewayService:
raise
return {"session_id": session_id, "backend": backend}
def _register_resource_attachment(self, resource: dict[str, Any]) -> None:
def _register_resource_attachment(
self,
resource: dict[str, Any],
*,
source: str = "resource_upload",
) -> None:
self.repository.create_attachment(
user_id=resource["user_id"],
app_id=resource["app_id"],
@ -499,7 +632,7 @@ class MemoryGatewayService:
content_type=resource["content_type"],
name=resource["original_filename"] or resource["id"],
internal_uri=resource["uri"],
source="resource_upload",
source=source,
sha256=resource["sha256"],
)

View File

@ -440,6 +440,61 @@ async def test_upload_resource_creates_attachment_mapping(
assert attachments[0]["source"] == "resource_upload"
@pytest.mark.asyncio
async def test_register_external_resource_does_not_copy_file_and_uses_ingest_uri(
config: GatewayConfig,
repo: MemoryRepository,
) -> None:
backend = FakeBackendClient()
async with app_client(config, backend) as client:
user_key = await create_user(client)
response = await client.post(
"/resources/external",
json={
"user_id": "u_123",
"user_key": user_key,
"app_id": "default",
"project_id": "default",
"filename": "chart.png",
"mime_type": "image/png",
"content_type": "image",
"size_bytes": 9,
"sha256": "sha-chart",
"source_uri": "minio://beaver-user-files/users/u_123/outputs/chart.png",
"ingest_uri": "http://minio.local/presigned/chart.png",
"title": "Chart",
"description": "Generated chart",
},
)
assert response.status_code == 200, response.text
body = response.json()
resource_id = body["resource_id"]
assert body["session_id"] == f"resource:u_123:{resource_id}"
assert body["uri"] == f"resource://u_123/{resource_id}"
assert body["status"] == "extracted"
assert not config.storage_dir.exists()
resource = repo.get_resource(resource_id)
assert resource is not None
assert resource["uri"] == "minio://beaver-user-files/users/u_123/outputs/chart.png"
assert resource["uri_public"] == 0
assert resource["sha256"] == "sha-chart"
assert resource["size_bytes"] == 9
content = backend.add_calls[0]["messages"][0]["content"][0]
assert content["type"] == "image"
assert content["uri"] == "http://minio.local/presigned/chart.png"
assert content["name"] == "chart.png"
assert content["extras"] == {"resource_id": resource_id, "source": "external_resource"}
assert "base64" not in content
attachments = repo.list_attachments_for_session("u_123", body["session_id"])
assert len(attachments) == 1
assert attachments[0]["internal_uri"] == "minio://beaver-user-files/users/u_123/outputs/chart.png"
assert attachments[0]["source"] == "external_resource"
@pytest.mark.asyncio
async def test_upload_resource_uses_current_timestamp(
config: GatewayConfig,
@ -704,6 +759,59 @@ async def test_add_memory_forwards_multimodal_payload_to_backend(
]
@pytest.mark.asyncio
async def test_add_memory_propagates_backend_http_error(
config: GatewayConfig,
) -> None:
backend = FakeBackendClient()
request = httpx.Request("POST", "http://backend.test/api/v1/memory/add")
response = httpx.Response(
415,
request=request,
json={"error": {"message": "LibreOffice conversion failed"}},
)
async def fail_add_memory(payload: dict[str, Any]) -> dict[str, Any]:
backend.add_calls.append(payload)
raise httpx.HTTPStatusError(
"Client error '415 Unsupported Media Type'",
request=request,
response=response,
)
backend.add_memory = fail_add_memory # type: ignore[method-assign]
async with app_client(config, backend) as client:
user_key = await create_user(client)
gateway_response = await client.post(
"/memories/add",
json={
"user_id": "u_123",
"user_key": user_key,
"session_id": "chat:c_backend_415",
"messages": [
{
"sender_id": "u_123",
"role": "user",
"timestamp": 1234567890123,
"content": [
{
"type": "doc",
"uri": "file:///tmp/bad.doc",
"ext": "doc",
"name": "bad.doc",
}
],
}
],
},
)
assert gateway_response.status_code == 415
assert gateway_response.json()["detail"] == {
"error": {"message": "LibreOffice conversion failed"}
}
@pytest.mark.asyncio
async def test_add_memory_creates_uri_attachment_mapping(
config: GatewayConfig,