from __future__ import annotations from dataclasses import dataclass from typing import Any import httpx @dataclass(frozen=True) class BackendRegistrationResult: backend_id: str client_id: str client_secret: str created_at: str frontend_base_url: str | None = None class AuthzClient: def __init__(self, base_url: str, timeout_seconds: int = 10): self.base_url = base_url.rstrip("/") self.timeout_seconds = timeout_seconds async def _request( self, method: str, path: str, *, json_body: dict[str, Any] | None = None, headers: dict[str, str] | None = None, ) -> Any: # Internal AuthZ calls should not inherit shell proxy env vars. async with httpx.AsyncClient( timeout=self.timeout_seconds, follow_redirects=True, trust_env=False, ) as client: response = await client.request( method, f"{self.base_url}{path}", json=json_body, headers=headers, ) response.raise_for_status() if not response.content: return None return response.json() async def register_backend( self, *, name: str, base_url: str, frontend_base_url: str | None = None, backend_id: str | None = None, ) -> BackendRegistrationResult: payload = {"name": name, "base_url": base_url} if backend_id: payload["backend_id"] = backend_id if frontend_base_url: payload["frontend_base_url"] = frontend_base_url data = await self._request("POST", "/backends/register", json_body=payload) return BackendRegistrationResult( backend_id=str(data["backend_id"]), client_id=str(data["client_id"]), client_secret=str(data["client_secret"]), created_at=str(data["created_at"]), frontend_base_url=str(data.get("frontend_base_url") or "").strip() or None, ) async def register_user( self, *, username: str, password: str, email: str | None = None, backend_name: str | None = None, backend_id: str | None = None, base_url: str | None = None, frontend_base_url: str | None = None, ) -> dict[str, Any]: payload: dict[str, Any] = { "username": username, "password": password, } if email: payload["email"] = email backend_payload: dict[str, Any] = {} if backend_name: payload["name"] = backend_name payload["backend_name"] = backend_name backend_payload["name"] = backend_name if backend_id: payload["backend_id"] = backend_id backend_payload["backend_id"] = backend_id if base_url: payload["base_url"] = base_url payload["public_base_url"] = base_url backend_payload["base_url"] = base_url if frontend_base_url: payload["frontend_base_url"] = frontend_base_url backend_payload["frontend_base_url"] = frontend_base_url if backend_payload: payload["backend"] = backend_payload data = await self._request("POST", "/oauth/register", json_body=payload) return data if isinstance(data, dict) else {} async def list_backends(self) -> list[dict[str, Any]]: data = await self._request("GET", "/backends") return data if isinstance(data, list) else [] async def get_backend(self, backend_id: str) -> dict[str, Any]: data = await self._request("GET", f"/backends/{backend_id}") return data if isinstance(data, dict) else {} async def update_backend( self, backend_id: str, *, name: str | None = None, base_url: str | None = None, frontend_base_url: str | None = None, ) -> dict[str, Any]: payload: dict[str, Any] = {} if name: payload["name"] = name if base_url: payload["base_url"] = base_url if frontend_base_url: payload["frontend_base_url"] = frontend_base_url data = await self._request("PUT", f"/backends/{backend_id}", json_body=payload) return data if isinstance(data, dict) else {} async def disable_backend(self, backend_id: str) -> dict[str, Any]: data = await self._request("POST", f"/backends/{backend_id}/disable") return data if isinstance(data, dict) else {} async def enable_backend(self, backend_id: str) -> dict[str, Any]: data = await self._request("POST", f"/backends/{backend_id}/enable") return data if isinstance(data, dict) else {} async def rotate_secret(self, backend_id: str) -> dict[str, Any]: data = await self._request("POST", f"/backends/{backend_id}/rotate-secret") return data if isinstance(data, dict) else {} async def get_permissions(self, backend_id: str) -> dict[str, Any]: data = await self._request("GET", f"/backends/{backend_id}/permissions") return data if isinstance(data, dict) else {} async def set_permissions(self, backend_id: str, payload: dict[str, Any]) -> dict[str, Any]: data = await self._request("POST", f"/backends/{backend_id}/permissions", json_body=payload) return data if isinstance(data, dict) else {} async def get_outlook_settings(self, backend_id: str) -> dict[str, Any]: data = await self._request("GET", f"/backends/{backend_id}/settings/outlook") return data if isinstance(data, dict) else {} async def set_outlook_settings(self, backend_id: str, payload: dict[str, Any]) -> dict[str, Any]: data = await self._request("POST", f"/backends/{backend_id}/settings/outlook", json_body=payload) return data if isinstance(data, dict) else {} async def delete_outlook_settings(self, backend_id: str) -> dict[str, Any]: data = await self._request("DELETE", f"/backends/{backend_id}/settings/outlook") return data if isinstance(data, dict) else {} async def list_channel_settings(self, backend_id: str) -> dict[str, Any]: data = await self._request("GET", f"/backends/{backend_id}/settings/channels") return data if isinstance(data, dict) else {} async def get_channel_settings(self, backend_id: str, channel_id: str) -> dict[str, Any]: data = await self._request("GET", f"/backends/{backend_id}/settings/channels/{channel_id}") return data if isinstance(data, dict) else {} async def set_channel_settings( self, backend_id: str, channel_id: str, payload: dict[str, Any], ) -> dict[str, Any]: data = await self._request( "POST", f"/backends/{backend_id}/settings/channels/{channel_id}", json_body=payload, ) return data if isinstance(data, dict) else {} async def delete_channel_settings(self, backend_id: str, channel_id: str) -> dict[str, Any]: data = await self._request("DELETE", f"/backends/{backend_id}/settings/channels/{channel_id}") return data if isinstance(data, dict) else {} async def issue_token( self, *, client_id: str, client_secret: str, audience: str, scopes: list[str], ) -> dict[str, Any]: data = await self._request( "POST", "/oauth/token", json_body={ "grant_type": "client_credentials", "client_id": client_id, "client_secret": client_secret, "aud": audience, "scopes": scopes, }, ) return data if isinstance(data, dict) else {}