From 83d9d8c200448857b10ed957c6156fcb069ba620 Mon Sep 17 00:00:00 2001 From: steven_li Date: Tue, 16 Jun 2026 15:58:42 +0800 Subject: [PATCH] =?UTF-8?q?```=20feat(learning):=20=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E6=8A=80=E8=83=BD=E5=AD=A6=E4=B9=A0=E5=80=99=E9=80=89=E8=80=85?= =?UTF-8?q?=E5=90=88=E6=88=90=E9=94=81=E5=AE=9A=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 添加了 DraftSynthesisInProgress 和 DraftHasNoChanges 异常来处理并发场景, 确保同一技能学习候选者的合成过程不会重复执行。实现了 claim_learning_candidate_for_synthesis 方法来原子性地锁定候选者进行合成。 fix(web): 为技能草案创建端点添加适当的HTTP状态码 当草案没有变化或正在合成时,现在正确返回409状态码而不是内部错误。 feat(skills): 实现技能修订内容比较以检测无变化情况 添加了 _is_noop_revision 方法来比较基础技能和提议的修订, 如果内容没有实际变化则抛出 NoDraftChanges 异常。 refactor(process): 修复任务证据记录后根运行状态更新逻辑 将任务证据记录事件后的状态从 waiting 更改为 done,并设置 finished_at 时间戳。 feat(tools): 防止在同一运行中重复执行外部写入操作 为邮件发送、日历创建等外部写入工具添加去重机制,避免重复的外部操作。 test: 添加技能学习和工具执行的单元测试 增加测试用例验证并发草案合成、重复外部写入抑制和无变化修订检测等功能。 ``` --- .../backend/beaver/interfaces/web/app.py | 16 +- .../backend/beaver/memory/skills/store.py | 46 +++++ .../beaver/services/process_service.py | 4 +- .../beaver/skills/learning/__init__.py | 4 +- .../beaver/skills/learning/pipeline.py | 41 +++- .../backend/beaver/skills/learning/service.py | 28 ++- .../backend/beaver/skills/learning/worker.py | 16 +- .../backend/beaver/tools/runtime/executor.py | 80 +++++++- .../tests/unit/test_create_instance_script.py | 69 +++++++ .../tests/unit/test_process_projection.py | 46 +++++ .../tests/unit/test_skill_learning_worker.py | 181 +++++++++++++++++- .../backend/tests/unit/test_tool_assembler.py | 29 +++ app-instance/create-instance.sh | 1 + 域名配置指引.md | 41 +++- 部署指南.md | 42 +++- 15 files changed, 615 insertions(+), 29 deletions(-) create mode 100644 app-instance/backend/tests/unit/test_create_instance_script.py diff --git a/app-instance/backend/beaver/interfaces/web/app.py b/app-instance/backend/beaver/interfaces/web/app.py index 0575593..f1318f7 100644 --- 
a/app-instance/backend/beaver/interfaces/web/app.py +++ b/app-instance/backend/beaver/interfaces/web/app.py @@ -52,7 +52,13 @@ from beaver.services.user_file_resolver import ( ) from beaver.skills.authoring import canonical_skill_format_instructions, ensure_canonical_skill_body, normalize_skill_frontmatter from beaver.skills.authoring.format import parse_skill_rewrite_json -from beaver.skills.learning import SkillLearningService, SkillLearningWorker, SkillLearningWorkerConfig +from beaver.skills.learning import ( + DraftHasNoChanges, + DraftSynthesisInProgress, + SkillLearningService, + SkillLearningWorker, + SkillLearningWorkerConfig, +) from beaver.skills.learning.replay import ReplayRunner from beaver.skills.catalog.utils import extract_required_tool_names, parse_frontmatter @@ -2236,6 +2242,10 @@ def create_app( candidate_id, provider_bundle=provider_bundle, ) + except DraftHasNoChanges as exc: + raise HTTPException(status_code=409, detail=str(exc)) from exc + except DraftSynthesisInProgress as exc: + raise HTTPException(status_code=409, detail=str(exc)) from exc except ValueError as exc: raise HTTPException(status_code=404, detail=str(exc)) from exc return _skill_draft_payload(loaded, draft.skill_name, draft.draft_id) @@ -2251,6 +2261,10 @@ def create_app( candidate_id, provider_bundle=provider_bundle, ) + except DraftHasNoChanges as exc: + raise HTTPException(status_code=409, detail=str(exc)) from exc + except DraftSynthesisInProgress as exc: + raise HTTPException(status_code=409, detail=str(exc)) from exc except ValueError as exc: raise HTTPException(status_code=404, detail=str(exc)) from exc return _skill_draft_payload(loaded, draft.skill_name, draft.draft_id) diff --git a/app-instance/backend/beaver/memory/skills/store.py b/app-instance/backend/beaver/memory/skills/store.py index 0b4a384..0c324d6 100644 --- a/app-instance/backend/beaver/memory/skills/store.py +++ b/app-instance/backend/beaver/memory/skills/store.py @@ -114,6 +114,52 @@ class 
SkillLearningStore: ) return updated + def claim_learning_candidate_for_synthesis( + self, + candidate_id: str, + *, + force: bool = False, + ) -> SkillLearningCandidate | None: + """Atomically claim a candidate before the expensive draft synthesis step.""" + + with self._locked(): + candidates = self.list_learning_candidates() + claimed: SkillLearningCandidate | None = None + for index, candidate in enumerate(candidates): + if candidate.candidate_id != candidate_id: + continue + if candidate.status in {"queued", "synthesizing"}: + return None + if not force and candidate.draft_skill_name and candidate.draft_id: + return None + payload = candidate.to_dict() + payload.update( + { + "status": "synthesizing", + "last_error": None, + "updated_at": _utc_now(), + } + ) + claimed = SkillLearningCandidate.from_dict(payload) + candidates[index] = claimed + break + if claimed is None: + return None + self.candidates_path.parent.mkdir(parents=True, exist_ok=True) + self.candidates_path.write_text( + "".join( + json.dumps(candidate.to_dict(), ensure_ascii=False, sort_keys=True) + "\n" + for candidate in candidates + ), + encoding="utf-8", + ) + self.append_audit_event( + candidate_id, + "draft_synthesis_started", + {"status": "synthesizing", "force": force}, + ) + return claimed + def list_learning_candidates(self, status: str | None = None) -> list[SkillLearningCandidate]: results: list[SkillLearningCandidate] = [] for payload in self._read_jsonl(self.candidates_path): diff --git a/app-instance/backend/beaver/services/process_service.py b/app-instance/backend/beaver/services/process_service.py index 6a0c5a0..0a8e7f0 100644 --- a/app-instance/backend/beaver/services/process_service.py +++ b/app-instance/backend/beaver/services/process_service.py @@ -351,8 +351,8 @@ class SessionProcessProjector: ) elif record.event_type == "task_evidence_recorded": - root["status"] = "waiting" - root["finished_at"] = None + root["status"] = "done" + root["finished_at"] = created_at add_event( 
event_id=_event_id(record, "evidence"), run_id=record.run_id or root_run_id, diff --git a/app-instance/backend/beaver/skills/learning/__init__.py b/app-instance/backend/beaver/skills/learning/__init__.py index 564aedb..3968669 100644 --- a/app-instance/backend/beaver/skills/learning/__init__.py +++ b/app-instance/backend/beaver/skills/learning/__init__.py @@ -9,7 +9,7 @@ from .missing_skill import ( MissingSkillDraftResult, MissingSkillSynthesizer, ) -from .pipeline import SkillLearningPipelineService +from .pipeline import DraftHasNoChanges, DraftSynthesisInProgress, SkillLearningPipelineService from .preservation import check_preservation from .replay import ReplayArmRequest, ReplayRunner, ReplayToolExecutor, ReplayToolPolicy, classify_tool_mode from .service import RunReceiptContext, SkillLearningService @@ -27,6 +27,8 @@ __all__ = [ "MissingSkillDraftResult", "MissingSkillSynthesizer", "RunReceiptContext", + "DraftHasNoChanges", + "DraftSynthesisInProgress", "SkillLearningPipelineService", "check_preservation", "ReplayToolExecutor", diff --git a/app-instance/backend/beaver/skills/learning/pipeline.py b/app-instance/backend/beaver/skills/learning/pipeline.py index fa87d67..796f068 100644 --- a/app-instance/backend/beaver/skills/learning/pipeline.py +++ b/app-instance/backend/beaver/skills/learning/pipeline.py @@ -9,7 +9,7 @@ from beaver.memory.skills import SkillDraftEvalReport, SkillDraftSafetyReport, S from beaver.skills.drafts import DraftService from beaver.skills.learning.eval import SkillDraftEvaluator from beaver.skills.learning.replay import ReplayRunner -from beaver.skills.learning.service import SkillLearningService +from beaver.skills.learning.service import NoDraftChanges, SkillLearningService from beaver.skills.learning.safety import SkillDraftSafetyChecker from beaver.skills.publisher import SkillPublisher from beaver.skills.reviews import ReviewService @@ -22,6 +22,14 @@ _REJECTABLE_DRAFT_STATUSES = { } +class 
DraftSynthesisInProgress(RuntimeError): + """Raised when another request already claimed the candidate for synthesis.""" + + +class DraftHasNoChanges(RuntimeError): + """Raised when synthesis produced no effective changes from the base skill.""" + + class SkillLearningPipelineService: """Coordinates candidate -> draft -> review -> publish lifecycle.""" @@ -60,8 +68,23 @@ class SkillLearningPipelineService: candidate_id: str, *, provider_bundle: ProviderBundle, + force: bool = False, ) -> SkillDraft: - draft = await self.learning_service.synthesize_draft(candidate_id, provider_bundle) + if not force: + existing = self._draft_for_candidate(candidate_id) + if existing is not None: + return existing + claimed = self.learning_store.claim_learning_candidate_for_synthesis(candidate_id, force=force) + if claimed is None: + existing = self._draft_for_candidate(candidate_id) + if existing is not None: + return existing + raise DraftSynthesisInProgress(f"Draft synthesis is already in progress for candidate: {candidate_id}") + try: + draft = await self.learning_service.synthesize_draft(candidate_id, provider_bundle) + except NoDraftChanges as exc: + self.mark_candidate_superseded(candidate_id, str(exc)) + raise DraftHasNoChanges(str(exc)) from exc self.mark_draft_synthesized(candidate_id, draft) return draft @@ -71,13 +94,7 @@ class SkillLearningPipelineService: *, provider_bundle: ProviderBundle, ) -> SkillDraft: - self.learning_store.transition_learning_candidate( - candidate_id, - "synthesizing", - event_type="draft_synthesis_started", - last_error=None, - ) - return await self.synthesize_draft(candidate_id, provider_bundle=provider_bundle) + return await self.synthesize_draft(candidate_id, provider_bundle=provider_bundle, force=True) def mark_candidate_queued(self, candidate_id: str) -> SkillLearningCandidate: return self._require_updated( @@ -162,6 +179,12 @@ class SkillLearningPipelineService: raise ValueError(f"Draft not found: {skill_name}/{draft_id}") return draft + 
def _draft_for_candidate(self, candidate_id: str) -> SkillDraft | None: + candidate = self.get_candidate(candidate_id) + if not candidate.draft_skill_name or not candidate.draft_id: + return None + return self.draft_service.get_draft(candidate.draft_skill_name, candidate.draft_id) + def submit_review( self, skill_name: str, diff --git a/app-instance/backend/beaver/skills/learning/service.py b/app-instance/backend/beaver/skills/learning/service.py index ef12a67..45e8181 100644 --- a/app-instance/backend/beaver/skills/learning/service.py +++ b/app-instance/backend/beaver/skills/learning/service.py @@ -20,8 +20,9 @@ from beaver.plugins.tree_merge import merge_supporting_file_trees from beaver.skills.drafts.service import DraftService from beaver.skills.learning.evidence import EvidencePacket, EvidenceSelector from beaver.skills.learning.synthesizer import SkillDraftSynthesizer -from beaver.skills.catalog.utils import parse_frontmatter +from beaver.skills.catalog.utils import parse_frontmatter, strip_frontmatter from beaver.skills.specs import SkillActivationReceipt +from beaver.skills.specs.serialization import normalize_frontmatter @dataclass(slots=True) @@ -30,6 +31,10 @@ class RunReceiptContext: effect_records: list[SkillEffectRecord] = field(default_factory=list) +class NoDraftChanges(ValueError): + """Raised when synthesis produces the same effective skill content as the base version.""" + + class SkillLearningService: def __init__( self, @@ -231,13 +236,18 @@ class SkillLearningService: ) target_skill = candidate.related_skill_names[0] base_version = candidate.evidence.get("skill_version") + base_skill = self._base_skill_snapshot(target_skill, base_version) payload = await self.synthesizer.synthesize_revision( candidate, packet, provider, model, - base_skill=self._base_skill_snapshot(target_skill, base_version), + base_skill=base_skill, ) + if self._is_noop_revision(payload, base_skill): + raise NoDraftChanges( + f"Synthesis produced no changes for 
{target_skill}/{base_version or 'current'}" + ) return self.draft_service.create_revision_draft( skill_name=target_skill, base_version=base_version, @@ -340,6 +350,16 @@ class SkillLearningService: "tool_hints": list(loaded.version.tool_hints), } + @staticmethod + def _is_noop_revision(payload: dict[str, Any], base_skill: dict[str, Any] | None) -> bool: + if base_skill is None: + return False + base_frontmatter = normalize_frontmatter(dict(base_skill.get("frontmatter") or {})) + proposed_frontmatter = normalize_frontmatter(dict(payload.get("frontmatter") or {})) + base_body = _normalize_skill_body(str(base_skill.get("content") or "")) + proposed_body = _normalize_skill_body(str(payload.get("content") or "")) + return base_frontmatter == proposed_frontmatter and base_body == proposed_body + def _merged_base_skill_snapshot(self, skill_names: list[str]) -> dict[str, Any] | None: snapshots = [ snapshot @@ -602,6 +622,10 @@ class SkillLearningService: return parsed.astimezone(timezone.utc) +def _normalize_skill_body(content: str) -> str: + return "\n".join(line.rstrip() for line in strip_frontmatter(content).strip().splitlines()).strip() + + def _digest_map(root: Path) -> dict[str, dict[str, Any]]: digest = hash_plugin_skill_tree(root) return { diff --git a/app-instance/backend/beaver/skills/learning/worker.py b/app-instance/backend/beaver/skills/learning/worker.py index e2dd2f8..7451931 100644 --- a/app-instance/backend/beaver/skills/learning/worker.py +++ b/app-instance/backend/beaver/skills/learning/worker.py @@ -9,7 +9,7 @@ from typing import Callable from beaver.engine.providers import ProviderBundle from beaver.memory.skills import SkillLearningCandidate -from beaver.skills.learning.pipeline import SkillLearningPipelineService +from beaver.skills.learning.pipeline import DraftHasNoChanges, SkillLearningPipelineService from beaver.skills.learning.replay import ReplayRunner @@ -114,13 +114,13 @@ class SkillLearningWorker: if self._has_active_draft(candidate): 
self.pipeline.mark_candidate_superseded(candidate.candidate_id, "active draft already exists for this skill") return False - self.pipeline.mark_candidate_queued(candidate.candidate_id) - self.pipeline.mark_candidate_synthesizing(candidate.candidate_id) - draft = await self.pipeline.synthesize_draft( - candidate.candidate_id, - provider_bundle=self.provider_bundle_factory(), - ) - self.pipeline.mark_draft_synthesized(candidate.candidate_id, draft) + try: + draft = await self.pipeline.synthesize_draft( + candidate.candidate_id, + provider_bundle=self.provider_bundle_factory(), + ) + except DraftHasNoChanges: + return False safety = self.pipeline.check_safety(draft.skill_name, draft.draft_id) if not safety.passed or safety.risk_level == "critical": return True diff --git a/app-instance/backend/beaver/tools/runtime/executor.py b/app-instance/backend/beaver/tools/runtime/executor.py index 6241df1..2d842a3 100644 --- a/app-instance/backend/beaver/tools/runtime/executor.py +++ b/app-instance/backend/beaver/tools/runtime/executor.py @@ -11,6 +11,7 @@ from __future__ import annotations +import hashlib import json from typing import TYPE_CHECKING, Any @@ -44,7 +45,45 @@ class ToolExecutor: tool_name=tool_name, error="tool_not_found", ) - return await tool.invoke(arguments or {}, context or ToolContext()) + normalized_arguments = dict(arguments or {}) + tool_context = context or ToolContext() + write_key = _external_write_key(tool_name, normalized_arguments) + if write_key is None: + return await tool.invoke(normalized_arguments, tool_context) + + external_writes = _external_write_state(tool_context) + previous = external_writes.get(write_key) + if previous is not None: + previous_content = str(previous.get("content") or "").strip() + detail = f" Previous result: {previous_content}" if previous_content else "" + return ToolResult( + success=True, + content=( + f"Duplicate external write suppressed for {tool_name}. " + "A matching write was already attempted in this run." 
+ f"{detail}" + ), + tool_name=tool_name, + error="duplicate_external_write_suppressed", + raw_output={"duplicate": True, "previous": previous}, + ) + + external_writes[write_key] = { + "tool_name": tool_name, + "arguments": normalized_arguments, + "status": "attempted", + "content": "", + "error": None, + } + result = await tool.invoke(normalized_arguments, tool_context) + external_writes[write_key] = { + "tool_name": tool_name, + "arguments": normalized_arguments, + "status": "done" if result.success else "error", + "content": result.content, + "error": result.error, + } + return result async def execute_tool_call( self, @@ -115,3 +154,42 @@ class ToolExecutor: if tool_call.get("name"): return str(tool_call["name"]) return "unknown" + + +_EXTERNAL_WRITE_TOOL_TERMS = ( + "mail_send_email", + "mail_reply_to_message", + "mail_forward_message", + "mail_move_message", + "calendar_create_event", + "calendar_update_event", +) + + +def _external_write_state(context: ToolContext) -> dict[str, dict[str, Any]]: + state = context.metadata.setdefault("external_write_attempts", {}) + if not isinstance(state, dict): + state = {} + context.metadata["external_write_attempts"] = state + return state + + +def _external_write_key(tool_name: str, arguments: dict[str, Any]) -> str | None: + lowered = tool_name.lower() + if not any(term in lowered for term in _EXTERNAL_WRITE_TOOL_TERMS): + return None + payload = json.dumps(_normalize_for_key(arguments), ensure_ascii=False, sort_keys=True, separators=(",", ":")) + digest = hashlib.sha256(payload.encode("utf-8")).hexdigest() + return f"{lowered}:{digest}" + + +def _normalize_for_key(value: Any) -> Any: + if isinstance(value, dict): + return {str(key): _normalize_for_key(value[key]) for key in sorted(value, key=str)} + if isinstance(value, list): + return [_normalize_for_key(item) for item in value] + if isinstance(value, tuple): + return [_normalize_for_key(item) for item in value] + if isinstance(value, (str, int, float, bool)) or 
value is None: + return value + return str(value) diff --git a/app-instance/backend/tests/unit/test_create_instance_script.py b/app-instance/backend/tests/unit/test_create_instance_script.py new file mode 100644 index 0000000..078be54 --- /dev/null +++ b/app-instance/backend/tests/unit/test_create_instance_script.py @@ -0,0 +1,69 @@ +import json +import os +import subprocess +from pathlib import Path + + +def test_create_instance_writes_default_max_tool_iterations(tmp_path) -> None: + app_instance_dir = Path(__file__).resolve().parents[3] + fake_bin = tmp_path / "bin" + fake_bin.mkdir() + docker = fake_bin / "docker" + docker.write_text( + """#!/usr/bin/env bash +set -euo pipefail +case "${1:-}" in + image) + [[ "${2:-}" == "inspect" ]] + exit 0 + ;; + container) + [[ "${2:-}" == "inspect" ]] + exit 1 + ;; + run) + exit 0 + ;; + *) + echo "unexpected docker command: $*" >&2 + exit 1 + ;; +esac +""", + encoding="utf-8", + ) + docker.chmod(0o755) + + env = os.environ.copy() + env["PATH"] = f"{fake_bin}:{env['PATH']}" + instances_root = tmp_path / "instances" + result = subprocess.run( + [ + str(app_instance_dir / "create-instance.sh"), + "--instance-id", + "default-tools", + "--auth-username", + "steven", + "--auth-password", + "secret", + "--skip-provider-config", + "--host-port", + "29001", + "--instances-root", + str(instances_root), + "--registry", + str(tmp_path / "registry.json"), + "--skip-initial-skills", + ], + cwd=app_instance_dir, + env=env, + text=True, + capture_output=True, + check=False, + ) + + assert result.returncode == 0, result.stderr + config_path = instances_root / "default-tools" / "beaver-home" / "config.json" + config = json.loads(config_path.read_text(encoding="utf-8")) + + assert config["agents"]["defaults"]["maxToolIterations"] == 100 diff --git a/app-instance/backend/tests/unit/test_process_projection.py b/app-instance/backend/tests/unit/test_process_projection.py index a28ee64..f3f409b 100644 --- 
a/app-instance/backend/tests/unit/test_process_projection.py +++ b/app-instance/backend/tests/unit/test_process_projection.py @@ -363,6 +363,52 @@ def test_process_projection_emits_tool_cards_from_run_messages(tmp_path: Path) - assert tool_result["metadata"]["success"] is True +def test_process_projection_marks_root_done_when_result_is_ready(tmp_path: Path) -> None: + session = SessionManager(tmp_path) + run_store = RunMemoryStore(tmp_path / "memory" / "runs") + run_store.append_run_record( + RunRecord( + run_id="main-run", + session_id="web:test", + task_id="task-1", + attempt_index=1, + task_text="send email", + started_at="2026-01-01T00:00:03+00:00", + ended_at="2026-01-01T00:00:04+00:00", + success=True, + finish_reason="stop", + ) + ) + session.append_message( + "web:test", + role="system", + event_type="task_execution_planned", + event_payload={"task_id": "task-1", "attempt_index": 1, "plan_mode": "single", "strategy": "single"}, + context_visible=False, + ) + session.append_message( + "web:test", + role="system", + event_type="task_synthesis_completed", + event_payload={"task_id": "task-1", "attempt_index": 1, "main_run_id": "main-run"}, + context_visible=False, + ) + session.append_message( + "web:test", + run_id="main-run", + role="system", + event_type="task_evidence_recorded", + event_payload={"task_id": "task-1", "attempt_index": 1, "evidence_status": "recorded"}, + context_visible=False, + ) + + projection = SessionProcessProjector(session, run_store).project("web:test") + + root_run = next(run for run in projection["runs"] if run["run_id"] == "task:task-1:attempt:1") + assert root_run["status"] == "done" + assert root_run["finished_at"] is not None + + def test_process_projection_exposes_ephemeral_guidance_artifacts(tmp_path: Path) -> None: session = SessionManager(tmp_path) run_store = RunMemoryStore(tmp_path / "memory" / "runs") diff --git a/app-instance/backend/tests/unit/test_skill_learning_worker.py 
b/app-instance/backend/tests/unit/test_skill_learning_worker.py index 4ff14b5..5068630 100644 --- a/app-instance/backend/tests/unit/test_skill_learning_worker.py +++ b/app-instance/backend/tests/unit/test_skill_learning_worker.py @@ -5,6 +5,8 @@ import json from pathlib import Path from types import SimpleNamespace +import pytest + from beaver.engine.providers.base import LLMProvider, LLMResponse from beaver.engine.providers.factory import ProviderBundle from beaver.engine.session import SessionManager @@ -13,6 +15,8 @@ from beaver.memory.skills import SkillLearningCandidate, SkillLearningStore from beaver.skills.authoring.format import is_canonical_skill_body from beaver.skills.drafts import DraftService from beaver.skills.learning import ( + DraftHasNoChanges, + DraftSynthesisInProgress, EvidenceSelector, SkillDraftSynthesizer, SkillLearningPipelineService, @@ -22,7 +26,7 @@ from beaver.skills.learning import ( ) from beaver.skills.publisher import SkillPublisher from beaver.skills.reviews import ReviewService -from beaver.skills.specs import SkillSpecStore +from beaver.skills.specs import SkillSpecStore, SkillVersion class JsonProvider(LLMProvider): @@ -44,6 +48,20 @@ class JsonProvider(LLMProvider): return "stub" +class BlockingJsonProvider(JsonProvider): + def __init__(self, *, started: asyncio.Event, release: asyncio.Event) -> None: + super().__init__() + self.started = started + self.release = release + self.calls = 0 + + async def chat(self, messages: list[dict], tools: list[dict] | None = None, model: str | None = None, max_tokens: int = 4096, temperature: float = 0.7) -> LLMResponse: + self.calls += 1 + self.started.set() + await self.release.wait() + return await super().chat(messages, tools=tools, model=model, max_tokens=max_tokens, temperature=temperature) + + def _bundle(provider: LLMProvider) -> ProviderBundle: runtime = SimpleNamespace(model="stub", provider_name="stub") return ProviderBundle(main_runtime=runtime, main_provider=provider) # type: 
ignore[arg-type] @@ -120,6 +138,69 @@ def _pipeline(tmp_path: Path) -> SkillLearningPipelineService: ) +def _revision_pipeline(tmp_path: Path, content: str, frontmatter: dict) -> SkillLearningPipelineService: + spec_store = SkillSpecStore(tmp_path) + spec_store.write_skill_version( + SkillVersion( + skill_name="web-operation", + version="v0001", + content_hash="hash-v1", + summary_hash="summary-v1", + created_at="2026-06-01T00:00:00+00:00", + created_by="test", + change_reason="initial", + parent_version=None, + review_state="published", + frontmatter=frontmatter, + summary="web operation", + tool_hints=list(frontmatter.get("tools") or []), + ), + content, + ) + spec_store.set_current_version("web-operation", "v0001") + run_store = RunMemoryStore(tmp_path / "memory" / "runs") + learning_store = SkillLearningStore(tmp_path / "memory" / "skills") + run_store.append_run_record( + RunRecord( + run_id="run-1", + session_id="session-1", + task_text="check detailed weather", + started_at="start", + ended_at="end", + success=True, + finish_reason="stop", + ) + ) + learning_store.record_learning_candidate( + SkillLearningCandidate( + candidate_id="candidate-revision", + kind="revise_skill", + source_run_ids=["run-1"], + source_session_ids=["session-1"], + related_skill_names=["web-operation"], + reason="revise web guidance", + evidence={"skill_version": "v0001"}, + priority=10, + confidence=0.9, + ) + ) + draft_service = DraftService(spec_store) + learning_service = SkillLearningService( + run_store=run_store, + learning_store=learning_store, + draft_service=draft_service, + evidence_selector=EvidenceSelector(run_store), + synthesizer=SkillDraftSynthesizer(), + ) + return SkillLearningPipelineService( + learning_store=learning_store, + learning_service=learning_service, + draft_service=draft_service, + review_service=ReviewService(spec_store), + publisher=SkillPublisher(spec_store), + ) + + def test_worker_synthesizes_open_candidate_without_publish(tmp_path: Path) -> None: 
pipeline = _pipeline(tmp_path) worker = SkillLearningWorker( @@ -137,6 +218,104 @@ def test_worker_synthesizes_open_candidate_without_publish(tmp_path: Path) -> No assert pipeline.list_drafts(candidate.draft_skill_name)[0].status == "draft" +def test_concurrent_draft_synthesis_is_claimed_once(tmp_path: Path) -> None: + pipeline = _pipeline(tmp_path) + + async def scenario(): + started = asyncio.Event() + release = asyncio.Event() + provider = BlockingJsonProvider(started=started, release=release) + first = asyncio.create_task( + pipeline.synthesize_draft("candidate-1", provider_bundle=_bundle(provider)) + ) + await asyncio.wait_for(started.wait(), timeout=1) + with pytest.raises(DraftSynthesisInProgress): + await pipeline.synthesize_draft("candidate-1", provider_bundle=_bundle(JsonProvider())) + release.set() + return await first, provider + + draft, provider = asyncio.run(scenario()) + candidate = pipeline.get_candidate("candidate-1") + + assert provider.calls == 1 + assert candidate.status == "draft_ready" + assert candidate.draft_id == draft.draft_id + assert len(pipeline.list_drafts(candidate.draft_skill_name)) == 1 + + +def test_existing_draft_synthesis_request_returns_same_draft(tmp_path: Path) -> None: + pipeline = _pipeline(tmp_path) + first = asyncio.run(pipeline.synthesize_draft("candidate-1", provider_bundle=_bundle(JsonProvider()))) + second = asyncio.run(pipeline.synthesize_draft("candidate-1", provider_bundle=_bundle(JsonProvider(fail=True)))) + + assert second.draft_id == first.draft_id + assert len(pipeline.list_drafts(first.skill_name)) == 1 + + +def test_revision_synthesis_with_no_content_changes_supersedes_candidate(tmp_path: Path) -> None: + content = ( + "---\n" + "name: web-operation\n" + "description: Web search and fetch.\n" + "tools:\n" + " - web_fetch\n" + " - web_search\n" + "---\n" + "\n" + "# Web Operation\n" + "\n" + "## Overview\n" + "\n" + "Web search and fetch.\n" + "\n" + "## When to Use\n" + "\n" + "- Use when web information is 
required.\n" + "\n" + "## Required Tools\n" + "\n" + "- `web_fetch`\n" + "- `web_search`\n" + "\n" + "## Workflow\n" + "\n" + "- Use web_search, then web_fetch.\n" + "\n" + "## Validation\n" + "\n" + "- Verify sources.\n" + "\n" + "## Boundaries\n" + "\n" + "- Stay within the request.\n" + "\n" + "## Anti-Patterns\n" + "\n" + "- Do not cite unsupported claims.\n" + ) + frontmatter = { + "name": "web-operation", + "description": "Web search and fetch.", + "tools": ["web_fetch", "web_search"], + } + pipeline = _revision_pipeline(tmp_path, content, frontmatter) + provider = JsonProvider( + payload={ + "frontmatter": frontmatter, + "content": content, + "change_reason": "No changes are required.", + } + ) + + with pytest.raises(DraftHasNoChanges): + asyncio.run(pipeline.synthesize_draft("candidate-revision", provider_bundle=_bundle(provider))) + candidate = pipeline.get_candidate("candidate-revision") + + assert candidate.status == "superseded" + assert "no changes" in (candidate.last_error or "").lower() + assert pipeline.list_drafts("web-operation") == [] + + def test_worker_evaluates_draft_with_replay_runner_when_available(tmp_path: Path) -> None: pipeline = _pipeline(tmp_path) replay_runner = FakeReplayRunner() diff --git a/app-instance/backend/tests/unit/test_tool_assembler.py b/app-instance/backend/tests/unit/test_tool_assembler.py index bdd0464..601fa8b 100644 --- a/app-instance/backend/tests/unit/test_tool_assembler.py +++ b/app-instance/backend/tests/unit/test_tool_assembler.py @@ -28,12 +28,14 @@ class DummyTool(BaseTool): toolset=toolset, always_available=always_available, ) + self.calls: list[dict] = [] @property def spec(self) -> ToolSpec: return self._spec async def invoke(self, arguments: dict, context: ToolContext) -> ToolResult: + self.calls.append(dict(arguments)) return ToolResult(success=True, content="ok", tool_name=self.spec.name) @@ -198,3 +200,30 @@ def test_tool_executor_parses_object_tool_call_string_arguments() -> None: assert name == "echo" 
assert arguments == {"text": "hello"} + + +def test_tool_executor_suppresses_duplicate_external_write_in_same_run() -> None: + registry = ToolRegistry() + send_tool = DummyTool("mcp_outlook_mcp_mail_send_email", toolset="mcp") + registry.register(send_tool) + executor = ToolExecutor(registry) + context = ToolContext( + metadata={ + "task_id": "task-1", + "run_id": "run-1", + } + ) + arguments = { + "to_recipients": ["jay.chen@boardware.com"], + "subject": "请回复今天下午的日程安排", + "body": "Hi Jay", + } + + first = asyncio.run(executor.execute("mcp_outlook_mcp_mail_send_email", arguments, context=context)) + second = asyncio.run(executor.execute("mcp_outlook_mcp_mail_send_email", dict(arguments), context=context)) + + assert first.success is True + assert second.success is True + assert second.error == "duplicate_external_write_suppressed" + assert "Duplicate external write suppressed" in second.content + assert len(send_tool.calls) == 1 diff --git a/app-instance/create-instance.sh b/app-instance/create-instance.sh index 4a98987..1ee245b 100755 --- a/app-instance/create-instance.sh +++ b/app-instance/create-instance.sh @@ -187,6 +187,7 @@ skip_provider_config = os.environ["SKIP_PROVIDER_CONFIG"].strip() == "1" providers = {} agent_defaults = { "workspace": "/root/.beaver/workspace", + "maxToolIterations": 100, } if not skip_provider_config: provider_cfg = {"apiKey": os.environ["API_KEY"]} diff --git a/域名配置指引.md b/域名配置指引.md index f8bcf8e..d50315f 100644 --- a/域名配置指引.md +++ b/域名配置指引.md @@ -1,5 +1,7 @@ # Beaver Project 域名配置指引 +最后更新:2026-06-16。 + 这份文档说明如何从本机测试域名 `localhost` 子域名切换到正式域名。 核心结论: @@ -9,6 +11,7 @@ - `auth-portal` 和用户实例建议使用不同域名。 - 正式环境建议用外层 Nginx、Caddy、Traefik 或云负载均衡监听 `80/443`。 - `router-proxy` 必须收到原始 `Host` 头,才能按实例域名转发。 +- 正式实例入口推荐使用真实域名;不要用裸 IP 当实例基域名,除非你明确要走每实例直连端口模式。 ## 1. 
默认端口职责 @@ -18,6 +21,9 @@ | `8088` | `router-proxy`,所有实例统一入口 | 可以,或由外层代理转发 | | `8090` | `deploy-control`,内部部署控制面 | 不建议 | | `19090` | `authz-service`,内部鉴权服务 | 不建议 | +| `8787` | `external-connector` sidecar 管理/调试口 | 不建议 | +| `9000/9001` | 本地 MinIO S3 API / Console | 不建议 | +| `20000-29999` | app-instance 直连端口池,通常绑定 `127.0.0.1`,裸 IP 模式可能对外绑定 | 不建议 | 正式部署时,通常由外层入口暴露 `80/443`,再转发到本机端口: @@ -91,6 +97,8 @@ proxy_set_header X-Forwarded-Proto $scheme; 否则 `router-proxy` 无法知道请求属于哪个实例。 +如果需要支持用户文件系统的大文件上传,外层代理还要允许足够大的 body。项目内 app-instance Nginx 当前是 `client_max_body_size 5g`,外层 Nginx/Caddy/负载均衡的限制不能比实际业务需求更小。 + ## 5. 项目内部要改哪些变量 实例公网地址由 `deploy-control` 里的这些变量决定: @@ -101,6 +109,7 @@ proxy_set_header X-Forwarded-Proto $scheme; | `DEPLOY_PUBLIC_BASE_DOMAIN` | 实例基域名,例如 `apps.example.com` | | `DEPLOY_PUBLIC_HOST_TEMPLATE` | Host 生成模板,默认 `{slug}.{base_domain}` | | `DEPLOY_PUBLIC_PORT` | 对外端口,`80` / `443` 会在生成 URL 时省略 | +| `DEPLOY_DIRECT_PUBLIC_HOST_BIND_IP` | 仅裸 IP 基域名直连模式使用,控制实例宿主机端口绑定地址 | 本机测试: @@ -144,6 +153,20 @@ https://alice.apps.example.com 前提是外层代理已经把 `*.apps.example.com:443` 转发到 `router-proxy:8088`。 +裸 IP 特例: + +```bash +export BEAVER_BASE_DOMAIN=203.0.113.10 +``` + +当 `DEPLOY_PUBLIC_BASE_DOMAIN` 是 IP 地址时,`deploy-control` 会进入直连端口模式:每个实例从 `20000-29999` 端口池分配一个宿主机端口,生成类似: + +```text +http://203.0.113.10:20037 +``` + +这不是 `router-proxy` 的 Host 路由入口,也无法得到 `https://alice.apps.example.com` 这类实例子域名。正式环境推荐使用 `apps.example.com` 这类真实域名和通配 DNS。 + ## 6. 什么时候 URL 里可以不带端口 浏览器默认端口: @@ -225,6 +248,8 @@ apps.example.com -> 服务器 IP *.apps.example.com -> 服务器 IP ``` +正常域名部署不依赖 `DEPLOY_DIRECT_PUBLIC_HOST_BIND_IP`;它只影响裸 IP 直连端口模式。生产入口应优先让外层代理监听 `80/443`,再转发到本机 `3081` 和 `8088`。 + ## 8. 
Nginx 外层代理示例 示例只展示关键转发逻辑,证书路径和自动签发方式按你的环境调整。 @@ -261,6 +286,7 @@ server { ssl_certificate_key /etc/letsencrypt/live/apps.example.com/privkey.pem; location / { + client_max_body_size 5g; proxy_http_version 1.1; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; @@ -309,14 +335,27 @@ portal.example.com -> 3081 *.apps.example.com -> 8088 ``` -### 不要公开 8090 和 19090 +### 不要公开内部端口 `8090` 是部署控制面,`19090` 是内部 AuthZ 服务。它们应该只允许容器网络或可信内网访问。 +同理,本地 MinIO 的 `9000/9001`、`external-connector:8787` 和实例直连端口池 `20000-29999` 也不应该作为正式公网入口。正式入口通常只有: + +```text +https://portal.example.com +https://{slug}.apps.example.com +``` + +外层代理再把它们分别转发到本机 `3081` 和 `8088`。 + ### 修改 DEPLOY_PUBLIC_* 后旧实例不会自动改 URL 这些变量影响新创建实例的 `public_url` 和 `instance_host`。旧实例已经写入注册表,需要重新创建或手动更新注册表和代理配置。 +### 裸 IP 不是通配子域名 + +如果 `DEPLOY_PUBLIC_BASE_DOMAIN=203.0.113.10`,系统会生成 `http://203.0.113.10:{port}` 形式的直连地址,不会生成可用的 `alice.203.0.113.10` 子域名入口。要使用每用户子域名,必须准备真实域名并配置 `*.apps.example.com` 这类通配 DNS。 + ## 10. 本机测试不需要正式域名 如果只是本机验证完整链路,继续使用: diff --git a/部署指南.md b/部署指南.md index 7bad63f..ca8f130 100644 --- a/部署指南.md +++ b/部署指南.md @@ -1,11 +1,14 @@ # Beaver Project 本机部署指南 +最后更新:2026-06-16。 + 这份文档用于在一台 Linux 或 WSL2 Ubuntu 机器上跑完整链路: - `auth-portal` - `authz-service` - `deploy-control` - `router-proxy` +- `MinIO` 用户文件后端 - 可选的 `external-connector` sidecar - 自动创建出来的 `app-instance` @@ -17,6 +20,14 @@ 如果你只单独启动某个前端页面,页面可以打开,但注册、登录、创建实例这些动作不一定能通。 +当前部署链路的几个关键状态: + +- 注册阶段只创建实例和账号,不再写入模型 provider、model 或 API key。 +- 注册成功后由 `auth-portal` 的模型配置引导调用 `deploy-control /api/instances/configure-provider` 写入模型配置并重启实例;跳过引导也可以先进入实例。 +- 用户文件系统由 Beaver API 代理到 MinIO/S3,前端不会直接接触 bucket、prefix、access key 或 secret key。 +- `external-connector` 是微信、飞书/Lark 等通道的 sidecar;不使用这些通道时可以跳过,但新实例是否带连接器环境变量取决于创建实例时的 `deploy-control` 环境。 +- 新实例会从 `$PROJECT_ROOT/skills` 种入初始 published skills;`deploy-control` 容器必须以相同绝对路径只读挂载该目录。 + ## 0.
前提 推荐环境: @@ -184,6 +195,8 @@ beaver-deploy-control:8090 如果改的是 `BEAVER_BASE_DOMAIN`,还要重启 `beaver-deploy-control`。这个变量只影响之后新创建的实例;已经创建过的实例 URL 已经写入 `app-instance/runtime/registry/instances.json`,不会自动改成新域名。 +不要把 `BEAVER_BASE_DOMAIN` 设置成裸 IP,除非你明确想让实例走直连端口模式。`deploy-control` 检测到 `DEPLOY_PUBLIC_BASE_DOMAIN` 是 IP 时,会为每个实例分配 `20000-29999` 里的独立宿主机端口并生成 `http://{ip}:{port}` 形式的 URL;这会绕过按 Host 分发的 `router-proxy` 域名入口。正式环境推荐使用真实域名,例如 `apps.example.com`。 + ### 非本机访问怎么配置域名 如果 Beaver 部署在服务器上,而用户从其他机器访问,不要使用 `localhost`。推荐准备一个真实域名,并把通配子域名解析到服务器,例如: @@ -427,12 +440,15 @@ docker run -d \ -e DEPLOY_PUBLIC_SCHEME="http" \ -e DEPLOY_PUBLIC_BASE_DOMAIN="$BEAVER_BASE_DOMAIN" \ -e DEPLOY_PUBLIC_PORT="8088" \ + -e DEPLOY_DIRECT_PUBLIC_HOST_BIND_IP="0.0.0.0" \ -e DEPLOY_AUTO_START_PROXY="1" \ beaver/deploy-control:latest ``` `DEPLOY_PUBLIC_BASE_DOMAIN` 来自 `BEAVER_BASE_DOMAIN`。本机测试时可以是 `localhost`;如果要让其他设备访问,必须换成它们能解析到 Beaver 服务器的真实域名。修改后需要重启 `beaver-deploy-control`,并重新创建实例或手动更新 registry 后重载 `router-proxy`。 +`DEPLOY_DIRECT_PUBLIC_HOST_BIND_IP` 只在 `DEPLOY_PUBLIC_BASE_DOMAIN` 是裸 IP 时生效,用来控制每个实例直连端口绑定在哪个宿主机地址。正常域名部署不依赖这个变量,实例流量应走 `router-proxy:8088`。 + 当前版本创建实例时会传 `--skip-provider-config`,也就是先不写 provider、model 或 API key。注册成功后,`auth-portal` 会进入模型配置引导页,再调用 `deploy-control /api/instances/configure-provider` 写入该实例的 `config.json` 并重启容器。 `DEFAULT_AUTHZ_INTERNAL_TOKEN` 会写入新建 app-instance 的后端 runtime env,用于 app-instance 后端读取自己的 internal MinIO settings。它不会传给前端。 @@ -441,6 +457,8 @@ docker run -d \ `DEFAULT_INITIAL_SKILLS_DIR` 需要和 `skills/` 的只读挂载路径一致。否则新实例能启动,但 workspace 里不会自动种入初始 published skills。 +如果是在实例创建后才更新 `$PROJECT_ROOT/skills` 里的初始 skills,已有实例不会自动同步这批初始文件。需要按实例使用 `scripts/deploy-initial-skills.sh` 或在实例内走 skills 管理/发布流程。 + ## 11.
启动 auth-portal ```bash @@ -477,6 +495,8 @@ docker ps --format 'table {{.Names}}\t{{.Status}}\t{{.Ports}}' docker logs --tail=50 beaver-router-proxy ``` +公网或局域网正式部署时,通常只应该对外开放 `80/443`,由外层代理转发到 `3081` 和 `8088`。`8090`、`19090`、`9000/9001`、`8787` 以及实例直连端口 `20000-29999` 默认都应限制在本机、容器网络或可信内网。 + 至少应该看到这些容器: - `beaver-authz-service` @@ -715,7 +735,7 @@ cd "$PROJECT_ROOT/app-instance" docker ps --format 'table {{.Names}}\t{{.Status}}' | grep app-instance ``` -排查 URL 变量: +排查部署变量: ```bash docker inspect beaver-authz-service --format '{{range .Config.Env}}{{println .}}{{end}}' \ @@ -725,10 +745,10 @@ docker inspect beaver-auth-portal --format '{{range .Config.Env}}{{println .}}{{ | egrep '^(AUTHZ_API_BASE_URL|DEPLOY_API_BASE_URL)=' docker inspect beaver-deploy-control --format '{{range .Config.Env}}{{println .}}{{end}}' \ - | egrep '^(DEFAULT_EXTERNAL_CONNECTOR_BASE_URL|DEFAULT_EXTERNAL_CONNECTOR_TOKEN|DEFAULT_BEAVER_BRIDGE_TOKEN|DEFAULT_INITIAL_SKILLS_DIR)=' + | egrep '^(DEPLOY_PUBLIC_|DEPLOY_DIRECT_PUBLIC_HOST_BIND_IP|DEFAULT_EXTERNAL_CONNECTOR_BASE_URL|DEFAULT_EXTERNAL_CONNECTOR_TOKEN|DEFAULT_BEAVER_BRIDGE_TOKEN|DEFAULT_INITIAL_SKILLS_DIR)=' ``` -它们都必须是完整 URL,不能是空字符串,也不能是裸 `host:port`。 +其中 `AUTHZ_*_BASE_URL`、`DEPLOY_API_BASE_URL`、`DEFAULT_EXTERNAL_CONNECTOR_BASE_URL` 这类 URL 必须带 `http://` 或 `https://`,不能是裸 `host:port`。token 变量不能为空;`DEFAULT_INITIAL_SKILLS_DIR` 必须对应 `deploy-control` 容器里真实存在、且和宿主机一致的绝对路径。 ## 17. 常见问题 @@ -857,6 +877,22 @@ EXTERNAL_CONNECTOR_CALLBACK_BASE_URL=http://app-instance-alice:8080 如果它为空,通常是实例创建时没有传 `--network "$BEAVER_NET"`,或者旧实例是在连接器变量加入前创建的。重新创建实例,或用同样的实例数据目录手工重建容器。 +### 使用裸 IP 做 BEAVER_BASE_DOMAIN 后 URL 变成直连端口 + +如果设置: + +```bash +export BEAVER_BASE_DOMAIN=203.0.113.10 +``` + +`deploy-control` 会把它识别成 IP,生成类似: + +```text +http://203.0.113.10:20037 +``` + +这是直连实例容器的宿主机端口模式,不是 `router-proxy` 的 Host 路由模式。要得到 `https://alice.apps.example.com` 这类地址,请改用真实域名并配置通配 DNS。 + ## 18. 重新部署基础容器 只重建基础容器和可选 sidecar: