Files
beaver_project/app-instance/backend/beaver/interfaces/channels/connections/sidecar_client.py

40 lines
1.7 KiB
Python

"""HTTP client for the generic external connector sidecar."""
from __future__ import annotations
from typing import Any
import httpx
class ConnectorSidecarClient:
def __init__(self, *, base_url: str, token: str, timeout_seconds: float = 20.0) -> None:
self.base_url = base_url.rstrip("/")
self.token = token
self.timeout_seconds = float(timeout_seconds)
async def get_connectors(self) -> list[dict[str, Any]]:
return await self._request("GET", "/connectors")
async def start_session(self, payload: dict[str, Any]) -> dict[str, Any]:
return await self._request("POST", "/connector-sessions", json=payload)
async def get_session(self, session_id: str) -> dict[str, Any]:
return await self._request("GET", f"/connector-sessions/{session_id}")
async def cancel_session(self, session_id: str) -> dict[str, Any]:
return await self._request("POST", f"/connector-sessions/{session_id}/cancel", json={})
async def logout(self, connection_id: str) -> dict[str, Any]:
return await self._request("POST", f"/connections/{connection_id}/logout", json={})
async def send(self, payload: dict[str, Any]) -> dict[str, Any]:
return await self._request("POST", "/send", json=payload)
async def _request(self, method: str, path: str, *, json: dict[str, Any] | None = None) -> Any:
headers = {"Authorization": f"Bearer {self.token}"} if self.token else {}
async with httpx.AsyncClient(timeout=self.timeout_seconds) as client:
response = await client.request(method, f"{self.base_url}{path}", json=json, headers=headers)
response.raise_for_status()
return response.json()