145 lines
5.4 KiB
Python
145 lines
5.4 KiB
Python
"""Bridge event dedupe store for external connector retries."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from dataclasses import asdict, dataclass
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from threading import Lock
|
|
from typing import Any
|
|
|
|
|
|
def _iso_now() -> str:
|
|
return datetime.now(timezone.utc).isoformat()
|
|
|
|
|
|
def _parse_iso(value: str) -> datetime:
|
|
return datetime.fromisoformat(value.replace("Z", "+00:00"))
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class ConnectorMessageDedupeRecord:
|
|
dedupe_key: str
|
|
connection_id: str
|
|
event_id: str
|
|
status: str
|
|
first_seen_at: str
|
|
updated_at: str
|
|
delivery_attempts: int
|
|
message_id: str | None = None
|
|
last_error: str | None = None
|
|
|
|
def to_dict(self) -> dict[str, Any]:
|
|
return asdict(self)
|
|
|
|
@classmethod
|
|
def from_dict(cls, data: dict[str, Any]) -> "ConnectorMessageDedupeRecord":
|
|
return cls(
|
|
dedupe_key=str(data.get("dedupe_key") or ""),
|
|
connection_id=str(data.get("connection_id") or ""),
|
|
event_id=str(data.get("event_id") or ""),
|
|
status=str(data.get("status") or "processing"),
|
|
first_seen_at=str(data.get("first_seen_at") or _iso_now()),
|
|
updated_at=str(data.get("updated_at") or _iso_now()),
|
|
delivery_attempts=int(data.get("delivery_attempts") or 0),
|
|
message_id=str(data["message_id"]) if data.get("message_id") is not None else None,
|
|
last_error=str(data["last_error"]) if data.get("last_error") is not None else None,
|
|
)
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class DedupeBeginResult:
|
|
should_process: bool
|
|
dedupe_key: str
|
|
status: str
|
|
http_status: int
|
|
retry_after_seconds: int | None
|
|
record: ConnectorMessageDedupeRecord
|
|
|
|
|
|
class MessageDedupeStore:
    """JSON-file-backed dedupe store for connector event deliveries.

    Records live under a top-level ``"records"`` object in the file at
    ``path``, keyed by ``"{connection_id}:{event_id}"``. Each public method
    reloads the file, applies its change and rewrites the file while holding
    a ``threading.Lock``, which serializes callers that share this instance.
    """

    def __init__(self, path: Path, *, processing_ttl_seconds: int = 60) -> None:
        """Create a store persisted at ``path``.

        Args:
            path: Location of the JSON file; it does not need to exist yet.
            processing_ttl_seconds: Age, in seconds, after which a record
                still marked ``"processing"`` may be claimed again.
        """
        self.path = Path(path)
        self.processing_ttl_seconds = int(processing_ttl_seconds)
        self._lock = Lock()

    def begin(self, *, connection_id: str, event_id: str, delivery_attempt: int) -> DedupeBeginResult:
        """Claim an event for processing, or report why it must be skipped.

        Args:
            connection_id: Identifier of the connection the event came from.
            event_id: Identifier of the event within that connection.
            delivery_attempt: Attempt number reported by the sender.

        Returns:
            A ``DedupeBeginResult``. ``should_process`` is ``False`` with
            HTTP 200 when the event is already completed, and ``False`` with
            HTTP 409 and a 5 second retry hint when another attempt is still
            processing within the TTL. Otherwise the record is created or
            re-claimed as ``"processing"``, persisted, and ``should_process``
            is ``True``.
        """
        dedupe_key = f"{connection_id}:{event_id}"
        now = _iso_now()
        with self._lock:
            data = self._load()
            raw = data["records"].get(dedupe_key)
            if isinstance(raw, dict):
                record = ConnectorMessageDedupeRecord.from_dict(raw)
                if record.status == "completed":
                    return DedupeBeginResult(False, dedupe_key, record.status, 200, None, record)
                if record.status == "processing" and not self._is_stale(record, now):
                    return DedupeBeginResult(False, dedupe_key, record.status, 409, 5, record)
                # Any other status, or a stale "processing" claim: take it over.
                record.status = "processing"
                record.updated_at = now
                # Count this attempt, but never fall behind the sender's own counter.
                record.delivery_attempts = max(record.delivery_attempts + 1, int(delivery_attempt))
                record.last_error = None
            else:
                record = ConnectorMessageDedupeRecord(
                    dedupe_key=dedupe_key,
                    connection_id=connection_id,
                    event_id=event_id,
                    status="processing",
                    first_seen_at=now,
                    updated_at=now,
                    delivery_attempts=max(1, int(delivery_attempt)),
                )
            data["records"][dedupe_key] = record.to_dict()
            self._save(data)
            return DedupeBeginResult(True, dedupe_key, record.status, 200, None, record)

    def complete(self, dedupe_key: str, *, message_id: str | None) -> ConnectorMessageDedupeRecord:
        """Mark the record as ``"completed"`` and clear its last error.

        Raises:
            KeyError: If no record exists for ``dedupe_key``.
        """
        return self._mark(dedupe_key, status="completed", message_id=message_id, error=None)

    def fail(self, dedupe_key: str, *, error: str) -> ConnectorMessageDedupeRecord:
        """Mark the record as ``"failed"`` and store ``error`` on it.

        Raises:
            KeyError: If no record exists for ``dedupe_key``.
        """
        return self._mark(dedupe_key, status="failed", message_id=None, error=error)

    def _mark(
        self,
        dedupe_key: str,
        *,
        status: str,
        message_id: str | None,
        error: str | None,
    ) -> ConnectorMessageDedupeRecord:
        """Set a record's status and error, refresh ``updated_at``, persist it.

        A falsy ``message_id`` leaves any previously stored message id in
        place.

        Raises:
            KeyError: If no record exists for ``dedupe_key``.
        """
        with self._lock:
            data = self._load()
            raw = data["records"].get(dedupe_key)
            if not isinstance(raw, dict):
                raise KeyError(dedupe_key)
            record = ConnectorMessageDedupeRecord.from_dict(raw)
            record.status = status
            record.updated_at = _iso_now()
            record.message_id = message_id or record.message_id
            record.last_error = error
            data["records"][dedupe_key] = record.to_dict()
            self._save(data)
            return record

    def _is_stale(self, record: ConnectorMessageDedupeRecord, now: str) -> bool:
        """Return True when ``record`` was last updated at least the TTL ago."""
        age = (_parse_iso(now) - _parse_iso(record.updated_at)).total_seconds()
        return age >= self.processing_ttl_seconds

    def _load(self) -> dict[str, Any]:
        """Read the store file, falling back to an empty store when unusable.

        A missing or unreadable file, bytes that are not valid UTF-8,
        malformed JSON, and a document without a dict under ``"records"``
        all yield ``{"records": {}}``.
        """
        try:
            # Reading directly avoids a check-then-read race; a missing file
            # raises FileNotFoundError, which is an OSError.
            data = json.loads(self.path.read_text(encoding="utf-8"))
        except (OSError, UnicodeDecodeError, json.JSONDecodeError):
            # UnicodeDecodeError is not an OSError, so it is listed explicitly;
            # without it a byte-corrupted file would crash every caller.
            return {"records": {}}
        if not isinstance(data, dict) or not isinstance(data.get("records"), dict):
            return {"records": {}}
        return data

    def _save(self, data: dict[str, Any]) -> None:
        """Write ``data`` as indented JSON, creating parent directories.

        The content goes to a sibling ``<name>.tmp`` file first and is then
        moved over the target, so the target is swapped rather than
        rewritten in place.
        """
        self.path.parent.mkdir(parents=True, exist_ok=True)
        tmp_path = self.path.with_name(f"{self.path.name}.tmp")
        tmp_path.write_text(json.dumps(data, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
        tmp_path.replace(self.path)
|