Files

688 lines
30 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import base64
import json
import os
import subprocess
import threading
import time
from collections.abc import Callable
from io import BytesIO
from pathlib import Path
from typing import Any
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit
from uuid import uuid4
import httpx
import qrcode
import qrcode.image.svg
from external_connector.providers.fake import _session_view
from external_connector.state import ConnectorSessionState, SidecarStateStore
BridgePoster = Callable[[str, dict[str, Any], dict[str, str]], None]
ReceiverStart = Callable[[ConnectorSessionState], object]
class FeishuBotProvider:
provider_id = "feishu_bot"
def __init__(
self,
*,
store: SidecarStateStore,
http_client: Any | None = None,
bridge_base_url: str,
public_base_url: str | None = None,
bridge_token: str,
bridge_post: BridgePoster | None = None,
api_base_url: str = "",
start_receivers: bool = True,
receiver_start: ReceiverStart | None = None,
) -> None:
self.store = store
self.http = http_client or httpx.Client(timeout=30)
self.bridge_base_url = bridge_base_url.rstrip("/")
self.public_base_url = (public_base_url or bridge_base_url).rstrip("/")
self.bridge_token = bridge_token
self.bridge_post = bridge_post or self._default_bridge_post
self.api_base_url = api_base_url.rstrip("/")
self.start_receivers = start_receivers
self.receiver_start = receiver_start or self._start_receiver_process
self._receiver_processes: dict[str, object] = {}
self._receiver_lock = threading.Lock()
self._start_existing_connected_receivers()
def connectors(self) -> list[dict[str, Any]]:
return [
{
"kind": "feishu",
"displayName": "Feishu/Lark",
"authType": "qr_or_bot_app",
"providerId": self.provider_id,
"capabilities": ["receive_text", "send_text", "groups"],
}
]
def health(self) -> dict[str, Any]:
return {"ok": True, "providerId": self.provider_id}
def start_session(self, payload: dict[str, Any]) -> dict[str, Any]:
kind = str(payload["kind"])
if kind != "feishu":
raise KeyError(f"Unsupported connector kind: {kind}")
options = dict(payload.get("options") or {})
session = self.store.create_session(
kind=kind,
connection_id=str(payload["connectionId"]),
channel_id=str(payload["channelId"]),
display_name=str(payload["displayName"]),
options=options,
)
metadata = {
"eventCallbackPath": "/feishu/events",
"eventCallbackUrl": f"{self.public_base_url}/feishu/events",
"bridgeBaseUrl": str(payload.get("callbackBaseUrl") or self.bridge_base_url),
}
app_id = str(options.get("appId") or options.get("app_id") or "").strip()
app_secret = str(options.get("appSecret") or options.get("app_secret") or "").strip()
verification_token = str(options.get("verificationToken") or options.get("verification_token") or "").strip()
mode = str(options.get("mode") or "create").strip().lower()
domain = _domain(options)
metadata.update(_policy_metadata(options))
if not app_id or not app_secret:
if mode != "link":
return self._start_registration_session(session, metadata=metadata, domain=domain)
session = self.store.update_session(
session.session_id,
status="waiting_for_user",
instructions=_link_instructions(metadata["eventCallbackUrl"]),
metadata=metadata,
)
return _session_view(session)
token_data = self._tenant_token(app_id, app_secret, domain=domain)
metadata.update(
{
"appId": app_id,
"appSecret": app_secret,
"verificationToken": verification_token,
"tenantAccessToken": token_data["token"],
"tenantTokenExpiresAt": token_data["expires_at"],
"domain": domain,
}
)
session = self.store.update_session(
session.session_id,
status="connected",
account_id=f"feishu:{app_id}",
metadata=metadata,
instructions=_connected_instructions(),
)
self._ensure_receiver(session)
return _session_view(session)
def get_session(self, session_id: str) -> dict[str, Any]:
session = self.store.get_session(session_id)
if session.kind != "feishu":
raise KeyError(session_id)
if session.kind == "feishu" and session.status in {"qr_ready", "scanned", "confirmed", "waiting_for_user"} and session.metadata.get("deviceCode"):
session = self._poll_registration_session(session)
if session.status == "connected":
self._ensure_receiver(session)
return _session_view(session)
def cancel_session(self, session_id: str) -> None:
session = self.store.get_session(session_id)
if session.kind != "feishu":
raise KeyError(session_id)
self.store.update_session(session_id, status="cancelled")
self._stop_receiver(session.connection_id)
def logout(self, connection_id: str) -> None:
try:
session = self.store.find_session_by_connection_id(connection_id)
except KeyError:
return None
self._stop_receiver(connection_id)
self.store.update_session(session.session_id, status="cancelled")
return None
def send(self, payload: dict[str, Any]) -> dict[str, Any]:
begin = self.store.begin_send(connection_id=str(payload["connectionId"]), request_id=str(payload["requestId"]))
if not begin.should_send:
if begin.http_status == 409:
return {"ok": False, "status": begin.status, "retryAfterSeconds": begin.retry_after_seconds, "httpStatus": 409}
return {"ok": True, "providerMessageId": begin.provider_message_id}
session = self.store.find_session_by_connection_id(str(payload["connectionId"]))
token = self._tenant_token_for_session(session)
target = dict(payload.get("target") or {})
metadata = dict(session.metadata)
api_base = _open_api_base_url(str(metadata.get("domain") or "feishu"), self.api_base_url)
peer_type = str(target.get("peerType") or "dm")
receive_id_type = "chat_id" if peer_type == "group" else "open_id"
receive_id = str(target.get("threadId") or target.get("peerId") or "") if receive_id_type == "chat_id" else str(target.get("peerId") or "")
chunks = _text_chunks(str(payload.get("content") or ""), _positive_int(metadata.get("maxMessageChars"), default=20000))
provider_message_ids: list[str] = []
try:
for chunk in chunks:
response = self.http.post(
f"{api_base}/open-apis/im/v1/messages?receive_id_type={receive_id_type}",
json={
"receive_id": receive_id,
"msg_type": "text",
"content": json.dumps({"text": chunk}, ensure_ascii=False),
},
headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
timeout=20,
)
response.raise_for_status()
data = dict(response.json())
if int(data.get("code") or 0) != 0:
error = str(data.get("msg") or data)
self.store.fail_send(begin.dedupe_key, error=error)
return {"ok": False, "error": error, "httpStatus": 502}
provider_message_ids.append(str((data.get("data") or {}).get("message_id") or f"feishu_{payload['requestId']}"))
except Exception as exc:
error = str(exc)
self.store.fail_send(begin.dedupe_key, error=error)
return {"ok": False, "error": error, "httpStatus": 502}
provider_message_id = ",".join(provider_message_ids) or f"feishu_{payload['requestId']}"
self.store.complete_send(begin.dedupe_key, provider_message_id=provider_message_id)
return {"ok": True, "providerMessageId": provider_message_id}
def handle_event(self, payload: dict[str, Any]) -> dict[str, Any]:
challenge = payload.get("challenge")
if challenge:
token = str(payload.get("token") or (payload.get("header") or {}).get("token") or "")
if not token or self._session_for_verification_token(token) is None:
return {"ok": False, "error": "verification token is required", "httpStatus": 401}
return {"challenge": challenge}
header = dict(payload.get("header") or {})
event = dict(payload.get("event") or {})
app_id = str(header.get("app_id") or "")
session = self._session_for_app_id(app_id)
expected_token = str(session.metadata.get("verificationToken") or "")
received_token = str(header.get("token") or payload.get("token") or "")
if not expected_token or received_token != expected_token:
return {"ok": False, "error": "invalid verification token", "httpStatus": 401}
ignored = _ignore_reason(event)
if ignored:
return {"ok": True, "ignored": ignored}
bridge_event = _bridge_event_from_feishu(session, header, event)
if bridge_event is None:
return {"ok": True, "ignored": "empty_or_oversized"}
self.bridge_post(
f"{_bridge_base_url(session, self.bridge_base_url)}/api/channel-connector-bridge/events",
bridge_event,
{"Authorization": f"Bearer {self.bridge_token}"},
)
return {"ok": True}
def _tenant_token_for_session(self, session: ConnectorSessionState) -> str:
metadata = dict(session.metadata)
expires_at = float(metadata.get("tenantTokenExpiresAt") or 0)
token = str(metadata.get("tenantAccessToken") or "")
if token and expires_at - time.time() > 60:
return token
app_id = str(metadata.get("appId") or "")
app_secret = str(metadata.get("appSecret") or "")
token_data = self._tenant_token(app_id, app_secret, domain=str(metadata.get("domain") or "feishu"))
metadata.update({"tenantAccessToken": token_data["token"], "tenantTokenExpiresAt": token_data["expires_at"]})
self.store.update_session(session.session_id, metadata=metadata)
return str(token_data["token"])
def _tenant_token(self, app_id: str, app_secret: str, *, domain: str = "feishu") -> dict[str, Any]:
response = self.http.post(
f"{_open_api_base_url(domain, self.api_base_url)}/open-apis/auth/v3/tenant_access_token/internal",
json={"app_id": app_id, "app_secret": app_secret},
timeout=20,
)
response.raise_for_status()
data = dict(response.json())
if int(data.get("code") or 0) != 0:
raise RuntimeError(str(data.get("msg") or data))
return {"token": str(data["tenant_access_token"]), "expires_at": time.time() + int(data.get("expire") or 7200)}
def _session_for_app_id(self, app_id: str) -> ConnectorSessionState:
sessions = self.store.list_sessions()
for session in sorted(sessions, key=lambda item: item.updated_at, reverse=True):
if session.kind == "feishu" and session.status == "connected" and session.metadata.get("appId") == app_id:
return session
raise KeyError(app_id)
def _session_for_verification_token(self, token: str) -> ConnectorSessionState | None:
sessions = self.store.list_sessions()
for session in sorted(sessions, key=lambda item: item.updated_at, reverse=True):
if (
session.kind == "feishu"
and session.status == "connected"
and token
and session.metadata.get("verificationToken") == token
):
return session
return None
def _default_bridge_post(self, url: str, payload: dict[str, Any], headers: dict[str, str]) -> None:
response = self.http.post(url, json=payload, headers=headers, timeout=20)
response.raise_for_status()
def _start_registration_session(
self,
session: ConnectorSessionState,
*,
metadata: dict[str, Any],
domain: str,
) -> dict[str, Any]:
try:
init_data = self._registration_post(domain, {"action": "init"})
supported = init_data.get("supported_auth_methods") or []
if "client_secret" not in supported:
session = self.store.update_session(
session.session_id,
status="error",
error="Current Feishu/Lark environment does not support client_secret bot registration",
metadata=metadata,
)
return _session_view(session)
begin_data = self._registration_post(
domain,
{
"action": "begin",
"archetype": "PersonalAgent",
"auth_method": "client_secret",
"request_user_info": "open_id",
},
)
except Exception as exc:
session = self.store.update_session(
session.session_id,
status="error",
error=str(exc),
metadata=metadata,
)
return _session_view(session)
qr_code = _with_onboard_from(str(begin_data.get("verification_uri_complete") or ""))
if not qr_code:
session = self.store.update_session(
session.session_id,
status="error",
error="Feishu/Lark registration did not return a QR URL",
metadata=metadata,
)
return _session_view(session)
metadata.update(
{
"domain": domain,
"registrationBaseUrl": _registration_base_url(domain),
"deviceCode": str(begin_data.get("device_code") or ""),
"pollIntervalSeconds": int(begin_data.get("interval") or 5),
"expiresAt": time.time() + int(begin_data.get("expire_in") or 600),
}
)
session = self.store.update_session(
session.session_id,
status="qr_ready",
qr_code=qr_code,
qr_image=_qr_svg_data_uri(qr_code),
instructions=_create_instructions(metadata["eventCallbackUrl"]),
metadata=metadata,
)
return _session_view(session)
def _poll_registration_session(self, session: ConnectorSessionState) -> ConnectorSessionState:
metadata = dict(session.metadata)
expires_at = float(metadata.get("expiresAt") or 0)
if expires_at and time.time() > expires_at:
return self.store.update_session(session.session_id, status="expired", metadata=metadata)
domain = str(metadata.get("domain") or "feishu")
device_code = str(metadata.get("deviceCode") or "")
if not device_code:
return session
try:
poll_data = self._registration_post(domain, {"action": "poll", "device_code": device_code})
except Exception as exc:
return self.store.update_session(session.session_id, status="error", error=str(exc), metadata=metadata)
user_info = dict(poll_data.get("user_info") or {})
if user_info.get("tenant_brand") == "lark":
domain = "lark"
metadata["domain"] = domain
app_id = str(poll_data.get("client_id") or "").strip()
app_secret = str(poll_data.get("client_secret") or "").strip()
if app_id and app_secret:
token_data = self._tenant_token(app_id, app_secret, domain=domain)
metadata.update(
{
"appId": app_id,
"appSecret": app_secret,
"tenantAccessToken": token_data["token"],
"tenantTokenExpiresAt": token_data["expires_at"],
"registrationOpenId": str(user_info.get("open_id") or ""),
}
)
return self.store.update_session(
session.session_id,
status="connected",
account_id=f"feishu:{app_id}",
metadata=metadata,
instructions=_connected_instructions(),
)
error = str(poll_data.get("error") or "")
if error == "authorization_pending":
return session
if error == "slow_down":
metadata["pollIntervalSeconds"] = int(metadata.get("pollIntervalSeconds") or 5) + 5
return self.store.update_session(session.session_id, metadata=metadata)
if error == "expired_token":
return self.store.update_session(session.session_id, status="expired", metadata=metadata)
if error == "access_denied":
return self.store.update_session(session.session_id, status="error", error="Feishu/Lark authorization was denied", metadata=metadata)
if error:
description = str(poll_data.get("error_description") or error)
return self.store.update_session(session.session_id, status="error", error=description, metadata=metadata)
return session
def _registration_post(self, domain: str, values: dict[str, str]) -> dict[str, Any]:
response = self.http.post(
f"{_registration_base_url(domain)}/oauth/v1/app/registration",
data=urlencode(values),
headers={"Content-Type": "application/x-www-form-urlencoded"},
timeout=20,
)
response.raise_for_status()
return dict(response.json())
def _start_existing_connected_receivers(self) -> None:
if not self.start_receivers:
return None
for session in self.store.list_sessions():
if session.kind == "feishu" and session.status == "connected":
self._ensure_receiver(session)
return None
def _ensure_receiver(self, session: ConnectorSessionState) -> None:
if not self.start_receivers or not _has_receiver_material(session):
return None
with self._receiver_lock:
existing = self._receiver_processes.get(session.connection_id)
if existing is not None and _receiver_is_alive(existing):
return None
receiver = self.receiver_start(session)
self._receiver_processes[session.connection_id] = receiver
return None
def _stop_receiver(self, connection_id: str) -> None:
with self._receiver_lock:
receiver = self._receiver_processes.pop(connection_id, None)
if receiver is None:
return None
terminate = getattr(receiver, "terminate", None)
if callable(terminate):
terminate()
wait = getattr(receiver, "wait", None)
if callable(wait):
try:
wait(timeout=5)
except subprocess.TimeoutExpired:
kill = getattr(receiver, "kill", None)
if callable(kill):
kill()
return None
def _start_receiver_process(self, session: ConnectorSessionState) -> subprocess.Popen[bytes]:
metadata = dict(session.metadata)
script = Path(__file__).resolve().parents[1] / "node" / "feishu_ws_receiver.js"
env = os.environ.copy()
env.update(
{
"FEISHU_APP_ID": str(metadata.get("appId") or ""),
"FEISHU_APP_SECRET": str(metadata.get("appSecret") or ""),
"FEISHU_DOMAIN": str(metadata.get("domain") or "feishu"),
"FEISHU_CONNECTION_ID": session.connection_id,
"FEISHU_CHANNEL_ID": session.channel_id,
"FEISHU_ACCOUNT_ID": str(session.account_id or ""),
"FEISHU_REQUIRE_MENTION_IN_GROUPS": _env_bool(metadata.get("requireMentionInGroups"), default=True),
"FEISHU_RESPOND_TO_MENTION_ALL": _env_bool(metadata.get("respondToMentionAll"), default=False),
"FEISHU_DM_MODE": str(metadata.get("dmMode") or "open"),
"FEISHU_ALLOW_FROM": ",".join(_string_list(metadata.get("allowFrom"))),
"FEISHU_GROUP_ALLOW_FROM": ",".join(_string_list(metadata.get("groupAllowFrom"))),
"FEISHU_MAX_MESSAGE_CHARS": str(_positive_int(metadata.get("maxMessageChars"), default=20000)),
"FEISHU_TEXT_BATCH_DELAY_MS": str(_positive_int(metadata.get("textBatchDelayMs"), default=0)),
"FEISHU_TEXT_BATCH_MAX_MESSAGES": str(_positive_int(metadata.get("textBatchMaxMessages"), default=10)),
"FEISHU_TEXT_BATCH_MAX_CHARS": str(_positive_int(metadata.get("textBatchMaxChars"), default=20000)),
"BEAVER_BRIDGE_BASE_URL": _bridge_base_url(session, self.bridge_base_url),
"BEAVER_BRIDGE_TOKEN": self.bridge_token,
}
)
return subprocess.Popen(["node", str(script)], env=env, cwd=str(script.parent))
def _bridge_base_url(session: ConnectorSessionState, fallback: str) -> str:
return str(session.metadata.get("bridgeBaseUrl") or fallback).rstrip("/")
def _bridge_event_from_feishu(session: ConnectorSessionState, header: dict[str, Any], event: dict[str, Any]) -> dict[str, Any] | None:
message = dict(event.get("message") or {})
sender = dict(event.get("sender") or {})
sender_id = dict(sender.get("sender_id") or {})
is_group = message.get("chat_type") == "group"
sender_open_id = str(sender_id.get("open_id") or sender_id.get("user_id") or "")
chat_id = str(message.get("chat_id") or "")
peer_id = chat_id if is_group else sender_open_id
message_id = str(message.get("message_id") or uuid4().hex)
event_id = str(header.get("event_id") or f"{session.channel_id}:{message_id}")
content = _extract_text(message).strip()
max_chars = _positive_int(session.metadata.get("maxMessageChars"), default=20000)
if not content or len(content) > max_chars:
return None
return {
"eventId": event_id,
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"deliveryAttempt": 1,
"connectionId": session.connection_id,
"channelId": session.channel_id,
"kind": "feishu",
"accountId": session.account_id,
"peerId": peer_id,
"peerType": "group" if is_group else "dm",
"userId": sender_open_id,
"threadId": chat_id or None,
"messageId": message_id,
"messageType": str(message.get("message_type") or "text"),
"content": content,
"metadata": {
"chatId": message.get("chat_id"),
"rawMessageType": message.get("message_type"),
"senderType": sender.get("sender_type"),
"mentions": message.get("mentions") if isinstance(message.get("mentions"), list) else [],
},
}
def _has_receiver_material(session: ConnectorSessionState) -> bool:
metadata = dict(session.metadata)
return bool(
session.status == "connected"
and str(metadata.get("appId") or "").strip()
and str(metadata.get("appSecret") or "").strip()
and session.connection_id
and session.channel_id
)
def _receiver_is_alive(receiver: object) -> bool:
poll = getattr(receiver, "poll", None)
if callable(poll):
return poll() is None
return True
def _extract_text(message: dict[str, Any]) -> str:
content = message.get("content")
if isinstance(content, str):
try:
parsed = json.loads(content)
except json.JSONDecodeError:
return content
text = parsed.get("text")
if text is not None:
return str(text)
return content
return ""
def _text_chunks(text: str, max_chars: int) -> list[str]:
cleaned = str(text or "")
if not cleaned:
return [""]
size = max(1, int(max_chars))
return [cleaned[index : index + size] for index in range(0, len(cleaned), size)]
def _ignore_reason(event: dict[str, Any]) -> str:
sender = dict(event.get("sender") or {})
sender_type = str(sender.get("sender_type") or "").strip().lower()
if sender_type and sender_type != "user":
return f"sender_type:{sender_type}"
message = dict(event.get("message") or {})
content = _extract_text(message).strip()
if content.startswith("/feishu"):
return "feishu_command"
return ""
def _policy_metadata(options: dict[str, Any]) -> dict[str, Any]:
metadata: dict[str, Any] = {}
for key in ("allowFrom", "allow_from"):
items = _string_list(options.get(key))
if items:
metadata["allowFrom"] = items
break
for key in ("groupAllowFrom", "group_allow_from"):
items = _string_list(options.get(key))
if items:
metadata["groupAllowFrom"] = items
break
if "requireMentionInGroups" in options or "require_mention_in_groups" in options:
metadata["requireMentionInGroups"] = _bool(options.get("requireMentionInGroups", options.get("require_mention_in_groups")))
else:
metadata["requireMentionInGroups"] = True
if "respondToMentionAll" in options or "respond_to_mention_all" in options:
metadata["respondToMentionAll"] = _bool(options.get("respondToMentionAll", options.get("respond_to_mention_all")))
dm_mode = str(options.get("dmMode") or options.get("dm_mode") or "open").strip()
metadata["dmMode"] = dm_mode if dm_mode in {"open", "allowlist", "pair", "disabled"} else "open"
for key, metadata_key, default in (
("maxMessageChars", "maxMessageChars", 20000),
("textBatchDelayMs", "textBatchDelayMs", 0),
("textBatchMaxMessages", "textBatchMaxMessages", 10),
("textBatchMaxChars", "textBatchMaxChars", 20000),
):
alt_key = _camel_to_snake(key)
if key in options or alt_key in options:
metadata[metadata_key] = _positive_int(options.get(key, options.get(alt_key)), default=default)
return metadata
def _string_list(value: Any) -> list[str]:
if isinstance(value, str):
raw_items = value.replace("\n", ",").split(",")
elif isinstance(value, list):
raw_items = value
else:
raw_items = []
return [str(item).strip() for item in raw_items if str(item).strip()]
def _bool(value: Any) -> bool:
if isinstance(value, bool):
return value
return str(value).strip().lower() in {"1", "true", "yes", "on"}
def _positive_int(value: Any, *, default: int) -> int:
try:
number = int(value)
except (TypeError, ValueError):
return default
return number if number > 0 else default
def _env_bool(value: Any, *, default: bool) -> str:
if value is None:
return "true" if default else "false"
return "true" if _bool(value) else "false"
def _camel_to_snake(value: str) -> str:
result: list[str] = []
for char in value:
if char.isupper() and result:
result.append("_")
result.append(char.lower())
return "".join(result)
def _domain(options: dict[str, Any]) -> str:
domain = str(options.get("domain") or "feishu").strip().lower()
return "lark" if domain == "lark" else "feishu"
def _registration_base_url(domain: str) -> str:
return "https://accounts.larksuite.com" if domain == "lark" else "https://accounts.feishu.cn"
def _open_api_base_url(domain: str, configured_base_url: str) -> str:
if configured_base_url:
return configured_base_url.rstrip("/")
return "https://open.larksuite.com" if domain == "lark" else "https://open.feishu.cn"
def _with_onboard_from(value: str) -> str:
if not value:
return ""
parts = urlsplit(value)
query = dict(parse_qsl(parts.query, keep_blank_values=True))
query.setdefault("from", "onboard")
return urlunsplit((parts.scheme, parts.netloc, parts.path, urlencode(query), parts.fragment))
def _qr_svg_data_uri(value: str) -> str:
image = qrcode.make(value, image_factory=qrcode.image.svg.SvgPathImage)
buffer = BytesIO()
image.save(buffer)
return "data:image/svg+xml;base64," + base64.b64encode(buffer.getvalue()).decode("ascii")
def _create_instructions(event_callback_url: str) -> list[str]:
return [
"使用飞书客户端扫描二维码,选择一键创建飞书机器人。",
"创建完成后,点击打开机器人,在飞书中向机器人发送任意消息即可开始对话。",
"在飞书开放平台启用接收消息事件 im.message.receive_v1并选择长连接事件通道。",
f"如果使用 HTTP 回调模式,事件请求 URL 可设为 {event_callback_url}",
"如需用户身份授权,在飞书对话中发送 /feishu auth。",
"建议发送:学习一下我安装的新飞书插件,列出有哪些能力。",
"验证安装:在飞书对话中发送 /feishu start返回版本号代表安装成功。",
"如果 Windows 设备扫码异常,通常是终端二维码分辨率问题,可换用 Cmder 或使用本窗口二维码。",
]
def _link_instructions(event_callback_url: str) -> list[str]:
return [
"选择关联已有机器人时,请输入正确的 App ID 和 App Secret。",
"如果提示无效的 App ID 或 App Secret请回到飞书开放平台复制最新应用凭证。",
"在飞书开放平台启用接收消息事件 im.message.receive_v1并选择长连接事件通道。",
f"如果使用 HTTP 回调模式,事件请求 URL 可设为 {event_callback_url}",
]
def _connected_instructions() -> list[str]:
return [
"飞书机器人凭据已连接。",
"sidecar 会通过飞书长连接保持应用在线。",
"请确认飞书开放平台已启用接收消息事件 im.message.receive_v1。",
"在飞书对话中发送 /feishu start 验证插件返回版本号;发送任意消息验证 Beaver 是否收到。",
]