# NOTE(review): the text below appears to be file-viewer page residue, not module content.
# Files / 69 lines / 2.5 KiB / Python

from __future__ import annotations
from external_connector.state import SidecarStateStore
def test_state_store_saves_and_loads_connector_sessions(tmp_path) -> None:
    """Create a session, update it, and check the fetched copy shows the update.

    The store is backed by ``state.json`` inside pytest's per-test temporary
    directory, so nothing leaks between tests.
    """
    state_file = tmp_path / "state.json"
    state_store = SidecarStateStore(state_file)

    created = state_store.create_session(
        kind="weixin",
        connection_id="conn_1",
        channel_id="weixin-main",
        display_name="Weixin Main",
        options={},
    )

    # Apply the post-login fields to the freshly created session.
    state_store.update_session(
        created.session_id,
        status="connected",
        account_id="weixin:me",
        display_name="Me",
    )
    fetched = state_store.get_session(created.session_id)

    # Generated session ids are expected to carry the "cs_" prefix.
    assert created.session_id.startswith("cs_")
    # The fetched session must show the updated status and account.
    assert fetched.status == "connected"
    assert fetched.account_id == "weixin:me"
def test_state_store_dedupes_send_results(tmp_path) -> None:
    """A completed send is not re-sent when the same request is begun again.

    The second ``begin_send`` for the same connection/request pair must report
    the already-completed outcome, including the provider message id recorded
    by ``complete_send``.
    """
    state_store = SidecarStateStore(tmp_path / "state.json")
    send_key = {"connection_id": "conn_1", "request_id": "out_1"}

    initial = state_store.begin_send(**send_key)
    state_store.complete_send(
        initial.dedupe_key,
        provider_message_id="provider-1",
    )
    repeated = state_store.begin_send(**send_key)

    # Only the first attempt is allowed to actually send.
    assert initial.should_send is True
    assert repeated.should_send is False
    # The repeat reports the stored completed result.
    assert repeated.status == "completed"
    assert repeated.http_status == 200
    assert repeated.provider_message_id == "provider-1"
def test_state_store_returns_conflict_for_active_send_processing(tmp_path) -> None:
    """A repeated ``begin_send`` while the first is unfinished reports a conflict.

    The store is built with ``send_processing_ttl_seconds=60`` and the first
    send is never completed or failed before the second call.
    """
    state_store = SidecarStateStore(
        tmp_path / "state.json",
        send_processing_ttl_seconds=60,
    )
    send_key = {"connection_id": "conn_1", "request_id": "out_1"}

    # First call starts the send; its result is not needed here.
    state_store.begin_send(**send_key)
    repeated = state_store.begin_send(**send_key)

    assert repeated.should_send is False
    assert repeated.status == "processing"
    # Expected conflict response: HTTP 409 with a 5-second retry hint.
    assert repeated.http_status == 409
    assert repeated.retry_after_seconds == 5
def test_state_store_retries_stale_send_processing(tmp_path) -> None:
    """With a zero processing TTL, a repeated ``begin_send`` may send again.

    The store is built with ``send_processing_ttl_seconds=0``; presumably this
    makes the first unfinished attempt count as stale immediately (confirm
    against ``SidecarStateStore``).
    """
    state_store = SidecarStateStore(
        tmp_path / "state.json",
        send_processing_ttl_seconds=0,
    )
    send_key = {"connection_id": "conn_1", "request_id": "out_1"}

    # First attempt is left unfinished on purpose.
    state_store.begin_send(**send_key)
    second_attempt = state_store.begin_send(**send_key)

    assert second_attempt.should_send is True
    assert second_attempt.status == "processing"
def test_state_store_retries_failed_send_immediately(tmp_path) -> None:
    """A send marked failed can be begun again straight away.

    The store uses ``send_processing_ttl_seconds=60``, yet after ``fail_send``
    the next ``begin_send`` for the same request must be allowed to send.
    """
    state_store = SidecarStateStore(
        tmp_path / "state.json",
        send_processing_ttl_seconds=60,
    )
    send_key = {"connection_id": "conn_1", "request_id": "out_1"}

    initial = state_store.begin_send(**send_key)
    # Record the failure against the first attempt's dedupe key.
    state_store.fail_send(initial.dedupe_key, error="provider rejected message")
    second_attempt = state_store.begin_send(**send_key)

    assert second_attempt.should_send is True
    assert second_attempt.status == "processing"