Files
memory-gateway/tests/test_everos_integration.py

81 lines
2.2 KiB
Python

from __future__ import annotations
import os
from pathlib import Path
from uuid import uuid4
import httpx
import pytest
from core.api import create_app
from core.config import GatewayConfig
from core.everos_client import EverOSClient
# Module-level marker: pytest applies `pytestmark` to every test collected
# from this file, so all tests here carry the `integration` mark.
pytestmark = pytest.mark.integration
def _integration_enabled() -> bool:
    """Return True only when ``RUN_EVEROS_INTEGRATION`` is exactly ``"1"``.

    Any other value, or an unset variable, returns False.
    """
    return os.environ.get("RUN_EVEROS_INTEGRATION") == "1"
def _ingest_integration_enabled() -> bool:
    """Return True only when ``RUN_EVEROS_INGEST_INTEGRATION`` is exactly ``"1"``.

    Any other value, or an unset variable, returns False.
    """
    return os.environ.get("RUN_EVEROS_INGEST_INTEGRATION") == "1"
def _everos_base_url() -> str:
    """Return the EverOS base URL used by the tests in this module.

    Reads ``EVEROS_BASE_URL`` and strips surrounding whitespace. When the
    variable is unset, empty, or whitespace-only, the local default
    ``http://127.0.0.1:1995`` is returned instead.
    """
    # A defined-but-empty variable (common in CI templates) is never a usable
    # URL, so treat it the same as "not set" rather than passing "" along.
    configured = os.environ.get("EVEROS_BASE_URL", "").strip()
    return configured or "http://127.0.0.1:1995"
@pytest.mark.skipif(
    not _integration_enabled(),
    reason="set RUN_EVEROS_INTEGRATION=1 to run against a real EverOS service",
)
@pytest.mark.asyncio
async def test_real_everos_health_check() -> None:
    """Call ``health_check`` on an ``EverOSClient`` and expect a dict back.

    Skipped unless ``RUN_EVEROS_INTEGRATION=1``. The client is pointed at the
    address returned by ``_everos_base_url()``.
    """
    client = EverOSClient(_everos_base_url(), timeout=10)
    health = await client.health_check()
    # Include the payload in the failure so an unexpected response is visible
    # directly in the test report.
    assert isinstance(health, dict), f"unexpected health payload: {health!r}"
@pytest.mark.skipif(
    not _ingest_integration_enabled(),
    reason=(
        "set RUN_EVEROS_INGEST_INTEGRATION=1 to run real EverOS add/flush ingestion"
    ),
)
@pytest.mark.asyncio
async def test_gateway_uploads_text_resource_to_real_everos(tmp_path: Path) -> None:
    """Create a user and upload a text file through the gateway app.

    Skipped unless ``RUN_EVEROS_INGEST_INTEGRATION=1``. The gateway app is
    built in-process with its database and storage under ``tmp_path`` and
    driven through ``httpx.ASGITransport``; its EverOS base URL comes from
    ``_everos_base_url()``. The test passes when both requests return 200 and
    the upload response reports ``status == "extracted"``.
    """
    # NOTE(review): presumably a single ingest attempt makes EverOS failures
    # surface immediately instead of being retried — confirm in GatewayConfig.
    config = GatewayConfig(
        everos_base_url=_everos_base_url(),
        database_path=tmp_path / "gateway.sqlite3",
        storage_dir=tmp_path / "storage",
        everos_ingest_attempts=1,
        everos_timeout_seconds=30,
    )
    app = create_app(config=config)
    transport = httpx.ASGITransport(app=app)
    # Random suffix gives each run its own user id.
    user_id = f"it_{uuid4().hex}"

    async with httpx.AsyncClient(transport=transport, base_url="http://test") as client:
        created_user = await client.post("/users", json={"user_id": user_id})
        assert created_user.status_code == 200, created_user.text
        user_key = created_user.json()["user_key"]

        uploaded = await client.post(
            "/resources",
            data={"user_id": user_id, "user_key": user_key},
            files={
                "file": (
                    "real-everos.txt",
                    b"real everos integration",
                    "text/plain",
                )
            },
        )
        assert uploaded.status_code == 200, uploaded.text
        # Attach the response body here too, matching the asserts above, so a
        # wrong status value can be diagnosed from the failure output.
        assert uploaded.json()["status"] == "extracted", uploaded.text