docs: harden external connector implementation plans
This commit is contained in:
@ -20,7 +20,7 @@ Included:
|
||||
- Production `VendorCliProvider` with environment-driven command templates.
|
||||
- Service-level bearer authentication for Beaver-to-sidecar requests.
|
||||
- Connector session state persistence.
|
||||
- `/send` idempotency by `connectionId + requestId`.
|
||||
- `/send` idempotency by `connectionId + requestId`, including processing TTL retry semantics.
|
||||
- Dockerfile and local compose declaration.
|
||||
|
||||
Excluded:
|
||||
@ -28,9 +28,20 @@ Excluded:
|
||||
- Beaver backend bridge implementation.
|
||||
- Frontend UI.
|
||||
- Hardcoded vendor command strings in repo files.
|
||||
- Accepting command strings from frontend or sidecar HTTP request bodies.
|
||||
- Docker socket access.
|
||||
- Dynamic container creation.
|
||||
|
||||
## Vendor Command Safety Contract
|
||||
|
||||
`VendorCliProvider` may execute vendor install/send commands because the sidecar is a controlled deployment container, but command execution has fixed boundaries:
|
||||
|
||||
- Command templates only come from sidecar startup environment variables.
|
||||
- No frontend or HTTP API payload can supply or override command strings.
|
||||
- `cwd` is fixed to `CONNECTOR_HOME`; per-connection state paths are passed as formatted arguments only.
|
||||
- Every command uses a hard timeout from `CONNECTOR_COMMAND_TIMEOUT_SECONDS`, defaulting to 120 seconds.
|
||||
- stdout and stderr are redacted before being stored or returned.
|
||||
|
||||
## File Structure
|
||||
|
||||
- Create `external-connector/pyproject.toml`
|
||||
@ -138,7 +149,31 @@ def test_state_store_dedupes_send_results(tmp_path) -> None:
|
||||
|
||||
assert first.should_send is True
|
||||
assert duplicate.should_send is False
|
||||
assert duplicate.status == "completed"
|
||||
assert duplicate.http_status == 200
|
||||
assert duplicate.provider_message_id == "provider-1"
|
||||
|
||||
|
||||
def test_state_store_returns_conflict_for_active_send_processing(tmp_path) -> None:
|
||||
store = SidecarStateStore(tmp_path / "state.json", send_processing_ttl_seconds=60)
|
||||
|
||||
store.begin_send(connection_id="conn_1", request_id="out_1")
|
||||
duplicate = store.begin_send(connection_id="conn_1", request_id="out_1")
|
||||
|
||||
assert duplicate.should_send is False
|
||||
assert duplicate.status == "processing"
|
||||
assert duplicate.http_status == 409
|
||||
assert duplicate.retry_after_seconds == 5
|
||||
|
||||
|
||||
def test_state_store_retries_stale_send_processing(tmp_path) -> None:
|
||||
store = SidecarStateStore(tmp_path / "state.json", send_processing_ttl_seconds=0)
|
||||
|
||||
store.begin_send(connection_id="conn_1", request_id="out_1")
|
||||
retry = store.begin_send(connection_id="conn_1", request_id="out_1")
|
||||
|
||||
assert retry.should_send is True
|
||||
assert retry.status == "processing"
|
||||
```
|
||||
|
||||
- [ ] **Step 3: Run tests to verify failure**
|
||||
@ -218,12 +253,16 @@ class ConnectorSessionState:
|
||||
class SendBeginResult:
|
||||
should_send: bool
|
||||
dedupe_key: str
|
||||
status: str
|
||||
http_status: int
|
||||
retry_after_seconds: int | None = None
|
||||
provider_message_id: str | None = None
|
||||
|
||||
|
||||
class SidecarStateStore:
|
||||
def __init__(self, path: Path) -> None:
|
||||
def __init__(self, path: Path, *, send_processing_ttl_seconds: int = 60) -> None:
|
||||
self.path = Path(path)
|
||||
self.send_processing_ttl_seconds = int(send_processing_ttl_seconds)
|
||||
self._lock = Lock()
|
||||
|
||||
def create_session(
|
||||
@ -277,8 +316,12 @@ class SidecarStateStore:
|
||||
with self._lock:
|
||||
data = self._load()
|
||||
existing = data["sends"].get(dedupe_key)
|
||||
if isinstance(existing, dict) and existing.get("status") == "completed":
|
||||
return SendBeginResult(False, dedupe_key, str(existing.get("provider_message_id") or ""))
|
||||
if isinstance(existing, dict):
|
||||
status = str(existing.get("status") or "processing")
|
||||
if status == "completed":
|
||||
return SendBeginResult(False, dedupe_key, "completed", 200, None, str(existing.get("provider_message_id") or ""))
|
||||
if status == "processing" and not self._send_is_stale(existing):
|
||||
return SendBeginResult(False, dedupe_key, "processing", 409, 5)
|
||||
data["sends"][dedupe_key] = {
|
||||
"connection_id": connection_id,
|
||||
"request_id": request_id,
|
||||
@ -286,7 +329,7 @@ class SidecarStateStore:
|
||||
"updated_at": iso_now(),
|
||||
}
|
||||
self._save(data)
|
||||
return SendBeginResult(True, dedupe_key)
|
||||
return SendBeginResult(True, dedupe_key, "processing", 200)
|
||||
|
||||
def complete_send(self, dedupe_key: str, *, provider_message_id: str | None) -> None:
|
||||
with self._lock:
|
||||
@ -296,6 +339,11 @@ class SidecarStateStore:
|
||||
data["sends"][dedupe_key] = item
|
||||
self._save(data)
|
||||
|
||||
def _send_is_stale(self, item: dict[str, Any]) -> bool:
|
||||
updated_at = str(item.get("updated_at") or iso_now())
|
||||
updated = datetime.fromisoformat(updated_at.replace("Z", "+00:00"))
|
||||
return (datetime.now(timezone.utc) - updated).total_seconds() >= self.send_processing_ttl_seconds
|
||||
|
||||
def _load(self) -> dict[str, Any]:
|
||||
if not self.path.exists():
|
||||
return {"sessions": {}, "sends": {}}
|
||||
@ -565,6 +613,8 @@ class FakeProvider:
|
||||
def send(self, payload: dict[str, Any]) -> dict[str, Any]:
|
||||
begin = self.store.begin_send(connection_id=str(payload["connectionId"]), request_id=str(payload["requestId"]))
|
||||
if not begin.should_send:
|
||||
if begin.http_status == 409:
|
||||
return {"ok": False, "status": begin.status, "retryAfterSeconds": begin.retry_after_seconds, "httpStatus": 409}
|
||||
return {"ok": True, "providerMessageId": begin.provider_message_id}
|
||||
provider_message_id = f"fake_{uuid4().hex}"
|
||||
self.store.complete_send(begin.dedupe_key, provider_message_id=provider_message_id)
|
||||
@ -655,6 +705,31 @@ def test_sidecar_http_api_session_and_send(tmp_path) -> None:
|
||||
assert session.status_code == 200
|
||||
assert loaded.json()["sessionId"] == session_id
|
||||
assert sent.json()["ok"] is True
|
||||
|
||||
|
||||
def test_sidecar_http_api_returns_conflict_for_processing_send(tmp_path) -> None:
|
||||
store = SidecarStateStore(tmp_path / "state.json", send_processing_ttl_seconds=60)
|
||||
store.begin_send(connection_id="conn_1", request_id="out_1")
|
||||
app = create_app(provider=FakeProvider(store), api_token="sidecar-token")
|
||||
headers = {"Authorization": "Bearer sidecar-token"}
|
||||
|
||||
with TestClient(app) as client:
|
||||
response = client.post(
|
||||
"/send",
|
||||
headers=headers,
|
||||
json={
|
||||
"requestId": "out_1",
|
||||
"connectionId": "conn_1",
|
||||
"channelId": "weixin-main",
|
||||
"kind": "weixin",
|
||||
"target": {"peerId": "peer-1", "peerType": "dm", "threadId": None},
|
||||
"content": "hello",
|
||||
"metadata": {},
|
||||
},
|
||||
)
|
||||
|
||||
assert response.status_code == 409
|
||||
assert response.json()["retryAfterSeconds"] == 5
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Run tests to verify failure**
|
||||
@ -678,6 +753,7 @@ from __future__ import annotations
|
||||
from typing import Any
|
||||
|
||||
from fastapi import FastAPI, Header, HTTPException
|
||||
from fastapi.responses import JSONResponse
|
||||
|
||||
from external_connector.models import ConnectorSessionRequest, SendRequest
|
||||
from external_connector.providers.base import ConnectorProvider
|
||||
@ -725,9 +801,13 @@ def create_app(*, provider: ConnectorProvider, api_token: str) -> FastAPI:
|
||||
return {"ok": True}
|
||||
|
||||
@app.post("/send")
|
||||
def send(payload: SendRequest, authorization: str | None = Header(default=None)) -> dict[str, Any]:
|
||||
def send(payload: SendRequest, authorization: str | None = Header(default=None)) -> JSONResponse | dict[str, Any]:
|
||||
require_auth(authorization)
|
||||
return provider.send(payload.model_dump(by_alias=True))
|
||||
result = dict(provider.send(payload.model_dump(by_alias=True)))
|
||||
status_code = int(result.pop("httpStatus", 200))
|
||||
if status_code != 200:
|
||||
return JSONResponse(status_code=status_code, content=result)
|
||||
return result
|
||||
|
||||
return app
|
||||
```
|
||||
@ -810,9 +890,13 @@ from external_connector.state import SidecarStateStore
|
||||
class FakeRunner:
|
||||
def __init__(self) -> None:
|
||||
self.commands: list[list[str]] = []
|
||||
self.cwd: str | None = None
|
||||
self.timeout: float | None = None
|
||||
|
||||
def __call__(self, command: list[str], cwd: str) -> tuple[int, str, str]:
|
||||
def __call__(self, command: list[str], cwd: str, timeout: float) -> tuple[int, str, str]:
|
||||
self.commands.append(command)
|
||||
self.cwd = cwd
|
||||
self.timeout = timeout
|
||||
return 0, "connected account=weixin:me", ""
|
||||
|
||||
|
||||
@ -820,7 +904,7 @@ def test_vendor_cli_provider_uses_env_command_templates(tmp_path) -> None:
|
||||
runner = FakeRunner()
|
||||
provider = VendorCliProvider(
|
||||
store=SidecarStateStore(tmp_path / "state.json"),
|
||||
env={"WEIXIN_CONNECT_COMMAND": "vendor-weixin install --state {state_dir}"},
|
||||
env={"WEIXIN_CONNECT_COMMAND": "vendor-weixin install --state {state_dir}", "CONNECTOR_COMMAND_TIMEOUT_SECONDS": "30"},
|
||||
runner=runner,
|
||||
)
|
||||
|
||||
@ -837,10 +921,12 @@ def test_vendor_cli_provider_uses_env_command_templates(tmp_path) -> None:
|
||||
|
||||
assert session["status"] in {"waiting_for_user", "connected"}
|
||||
assert runner.commands[0][0] == "vendor-weixin"
|
||||
assert runner.cwd == str(tmp_path)
|
||||
assert runner.timeout == 30.0
|
||||
|
||||
|
||||
def test_vendor_cli_provider_redacts_sensitive_error(tmp_path) -> None:
|
||||
def runner(command: list[str], cwd: str) -> tuple[int, str, str]:
|
||||
def runner(command: list[str], cwd: str, timeout: float) -> tuple[int, str, str]:
|
||||
return 1, "", "failed secret-token appSecret=abc"
|
||||
|
||||
provider = VendorCliProvider(
|
||||
@ -894,11 +980,11 @@ from external_connector.providers.fake import _session_view
|
||||
from external_connector.state import SidecarStateStore
|
||||
|
||||
|
||||
Runner = Callable[[list[str], str], tuple[int, str, str]]
|
||||
Runner = Callable[[list[str], str, float], tuple[int, str, str]]
|
||||
|
||||
|
||||
def default_runner(command: list[str], cwd: str) -> tuple[int, str, str]:
|
||||
completed = subprocess.run(command, cwd=cwd, text=True, capture_output=True, check=False)
|
||||
def default_runner(command: list[str], cwd: str, timeout: float) -> tuple[int, str, str]:
|
||||
completed = subprocess.run(command, cwd=cwd, text=True, capture_output=True, check=False, timeout=timeout)
|
||||
return completed.returncode, completed.stdout, completed.stderr
|
||||
|
||||
|
||||
@ -915,6 +1001,7 @@ class VendorCliProvider:
|
||||
self.store = store
|
||||
self.env = env or os.environ
|
||||
self.runner = runner
|
||||
self.command_timeout_seconds = float(self.env.get("CONNECTOR_COMMAND_TIMEOUT_SECONDS") or 120)
|
||||
|
||||
def connectors(self) -> list[dict[str, Any]]:
|
||||
return [
|
||||
@ -935,9 +1022,18 @@ class VendorCliProvider:
|
||||
options=dict(payload.get("options") or {}),
|
||||
)
|
||||
command_template = self._command_template(kind)
|
||||
state_dir = str(Path(self.store.path).parent / kind / session.connection_id)
|
||||
connector_home = Path(self.store.path).parent
|
||||
state_dir = str(connector_home / kind / session.connection_id)
|
||||
Path(state_dir).mkdir(parents=True, exist_ok=True)
|
||||
command = shlex.split(command_template.format(state_dir=state_dir, connection_id=session.connection_id))
|
||||
code, stdout, stderr = self.runner(command, state_dir)
|
||||
try:
|
||||
code, stdout, stderr = self.runner(command, str(connector_home), self.command_timeout_seconds)
|
||||
except subprocess.TimeoutExpired:
|
||||
session = self.store.update_session(session.session_id, status="error", error="Provider command timed out")
|
||||
return _session_view(session)
|
||||
except Exception as exc:
|
||||
session = self.store.update_session(session.session_id, status="error", error=_redact(str(exc)))
|
||||
return _session_view(session)
|
||||
if code != 0:
|
||||
session = self.store.update_session(session.session_id, status="error", error=_redact(stderr or stdout))
|
||||
return _session_view(session)
|
||||
@ -964,6 +1060,8 @@ class VendorCliProvider:
|
||||
def send(self, payload: dict[str, Any]) -> dict[str, Any]:
|
||||
begin = self.store.begin_send(connection_id=str(payload["connectionId"]), request_id=str(payload["requestId"]))
|
||||
if not begin.should_send:
|
||||
if begin.http_status == 409:
|
||||
return {"ok": False, "status": begin.status, "retryAfterSeconds": begin.retry_after_seconds, "httpStatus": 409}
|
||||
return {"ok": True, "providerMessageId": begin.provider_message_id}
|
||||
provider_message_id = f"vendor_{payload['requestId']}"
|
||||
self.store.complete_send(begin.dedupe_key, provider_message_id=provider_message_id)
|
||||
@ -1061,6 +1159,7 @@ services:
|
||||
CONNECTOR_API_TOKEN: ${EXTERNAL_CONNECTOR_TOKEN}
|
||||
CONNECTOR_HOME: /var/lib/external-connector
|
||||
CONNECTOR_PROVIDER: ${CONNECTOR_PROVIDER:-vendor_cli}
|
||||
CONNECTOR_COMMAND_TIMEOUT_SECONDS: ${CONNECTOR_COMMAND_TIMEOUT_SECONDS:-120}
|
||||
WEIXIN_CONNECT_COMMAND: ${WEIXIN_CONNECT_COMMAND:-}
|
||||
FEISHU_CONNECT_COMMAND: ${FEISHU_CONNECT_COMMAND:-}
|
||||
volumes:
|
||||
@ -1083,6 +1182,7 @@ BEAVER_BRIDGE_TOKEN=
|
||||
BEAVER_BRIDGE_BASE_URL=http://app-instance:8080
|
||||
EXTERNAL_CONNECTOR_PORT=8787
|
||||
CONNECTOR_PROVIDER=vendor_cli
|
||||
CONNECTOR_COMMAND_TIMEOUT_SECONDS=120
|
||||
WEIXIN_CONNECT_COMMAND=
|
||||
FEISHU_CONNECT_COMMAND=
|
||||
```
|
||||
|
||||
Reference in New Issue
Block a user