from __future__ import annotations from typing import Any from fastapi import FastAPI, Header, HTTPException from fastapi.responses import JSONResponse from external_connector.models import ConnectorSessionRequest, SendRequest from external_connector.providers.base import ConnectorProvider def create_app(*, provider: ConnectorProvider, api_token: str) -> FastAPI: app = FastAPI(title="External Connector") def require_auth(authorization: str | None) -> None: if not api_token: raise HTTPException(status_code=503, detail="Connector API token is not configured") if authorization != f"Bearer {api_token}": raise HTTPException(status_code=401, detail="Invalid connector token") @app.get("/health") def health() -> dict[str, Any]: return provider.health() @app.get("/connectors") def connectors(authorization: str | None = Header(default=None)) -> list[dict[str, Any]]: require_auth(authorization) return provider.connectors() @app.post("/connector-sessions") def start_session( payload: ConnectorSessionRequest, authorization: str | None = Header(default=None), ) -> dict[str, Any]: require_auth(authorization) return provider.start_session(payload.model_dump(by_alias=True)) @app.get("/connector-sessions/{session_id}") def get_session(session_id: str, authorization: str | None = Header(default=None)) -> dict[str, Any]: require_auth(authorization) try: return provider.get_session(session_id) except KeyError: raise HTTPException(status_code=404, detail="Connector session not found") @app.post("/connector-sessions/{session_id}/cancel") def cancel_session(session_id: str, authorization: str | None = Header(default=None)) -> dict[str, Any]: require_auth(authorization) provider.cancel_session(session_id) return {"ok": True} @app.post("/connections/{connection_id}/logout") def logout(connection_id: str, authorization: str | None = Header(default=None)) -> dict[str, Any]: require_auth(authorization) provider.logout(connection_id) return {"ok": True} @app.post("/send", response_model=None) def send(payload: SendRequest, authorization: str | None = Header(default=None)) -> JSONResponse | dict[str, Any]: require_auth(authorization) result = dict(provider.send(payload.model_dump(by_alias=True))) status_code = int(result.pop("httpStatus", 200)) if status_code != 200: return JSONResponse(status_code=status_code, content=result) return result if hasattr(provider, "handle_event"): @app.post("/feishu/events", response_model=None) def feishu_events(payload: dict[str, Any]) -> JSONResponse | dict[str, Any]: result = dict(provider.handle_event(payload)) # type: ignore[attr-defined] status_code = int(result.pop("httpStatus", 200)) if status_code != 200: return JSONResponse(status_code=status_code, content=result) return result return app