"""SkillHub marketplace client and installer.""" from __future__ import annotations from dataclasses import dataclass from datetime import datetime, timezone import posixpath from typing import Any import httpx from beaver.skills.catalog.utils import parse_frontmatter, strip_frontmatter from beaver.skills.specs import SkillSpec, SkillSpecStore, SkillVersion from beaver.skills.specs.serialization import canonical_hash, normalize_frontmatter, summarize_skill_content SKILLHUB_BASE_URL = "https://skillhub.bwgdi.com" SKILLHUB_API_BASE = f"{SKILLHUB_BASE_URL}/api/web" @dataclass(slots=True) class SkillHubService: store: SkillSpecStore timeout_seconds: int = 30 async def search( self, *, q: str = "", sort: str = "relevance", page: int = 0, size: int = 12, namespace: str | None = None, ) -> dict[str, Any]: params = { "q": q, "sort": sort, "page": str(max(0, page)), "size": str(max(1, min(size, 50))), } if namespace: params["namespace"] = namespace.removeprefix("@") data = await self._get_json("/skills", params=params) payload = _unwrap(data) if not isinstance(payload, dict): payload = {} items = [self._with_install_state(item) for item in list(payload.get("items") or [])] return { "items": items, "total": int(payload.get("total") or len(items)), "page": int(payload.get("page") or page), "size": int(payload.get("size") or size), } async def detail(self, namespace: str, slug: str) -> dict[str, Any]: data = await self._get_json(f"/skills/{namespace.removeprefix('@')}/{slug}") payload = _unwrap(data) item = self._with_install_state(payload if isinstance(payload, dict) else {}) return item async def version(self, namespace: str, slug: str, version: str) -> dict[str, Any]: namespace = namespace.removeprefix("@") detail = _unwrap(await self._get_json(f"/skills/{namespace}/{slug}/versions/{version}")) files = _unwrap(await self._get_json(f"/skills/{namespace}/{slug}/versions/{version}/files")) if not isinstance(detail, dict): detail = {} if not isinstance(files, list): files = [] return {"detail": detail, "files": files} async def versions(self, namespace: str, slug: str) -> dict[str, Any]: namespace = namespace.removeprefix("@") payload = _unwrap(await self._get_json(f"/skills/{namespace}/{slug}/versions")) if not isinstance(payload, dict): payload = {} items = payload.get("items") return { "items": items if isinstance(items, list) else [], "total": int(payload.get("total") or len(items or [])), "page": int(payload.get("page") or 0), "size": int(payload.get("size") or len(items or [])), } async def file_content(self, namespace: str, slug: str, version: str, file_path: str) -> dict[str, Any]: namespace = namespace.removeprefix("@") safe_path = _safe_posix_path(file_path) content = await self._get_text( f"/skills/{namespace}/{slug}/versions/{version}/file", params={"path": safe_path}, ) return { "filePath": safe_path, "content": content, "contentType": _guess_content_type(safe_path), "isBinary": False, "fileSize": len(content.encode("utf-8")), } async def install(self, namespace: str, slug: str, version: str | None = None) -> dict[str, Any]: namespace = namespace.removeprefix("@") skill = await self.detail(namespace, slug) selected_version = version or _published_version(skill) if not selected_version: raise ValueError("SkillHub skill has no published version") version_payload = await self.version(namespace, slug, selected_version) files = list(version_payload.get("files") or []) contents: dict[str, str] = {} for item in files: file_path = _safe_posix_path(str(item.get("filePath") or item.get("path") or "")) contents[file_path] = await self._get_text( f"/skills/{namespace}/{slug}/versions/{selected_version}/file", params={"path": file_path}, ) skill_content = contents.get("SKILL.md") if not skill_content: raise ValueError("SkillHub version does not contain SKILL.md") frontmatter, body = parse_frontmatter(skill_content) skill_name = str(frontmatter.get("name") or skill.get("slug") or slug).strip() if not skill_name or "/" in skill_name or "\\" in skill_name or skill_name in {".", ".."}: raise ValueError(f"Unsafe skill name from SkillHub: {skill_name}") normalized_frontmatter = normalize_frontmatter( { **frontmatter, "name": skill_name, "description": frontmatter.get("description") or skill.get("summary") or skill_name, } ) rendered = _render_skill_content(normalized_frontmatter, body) content_hash = canonical_hash(rendered) existing = self.store.read_published_skill(skill_name) existing_spec = self.store.get_skill_spec(skill_name) if existing is not None and existing.version.content_hash == content_hash: return { "ok": True, "skill_name": skill_name, "version": existing.version.version, "source": "skillhub", "namespace": namespace, "slug": slug, "installed_path": str(self.store.root / skill_name), "already_installed": True, } next_version = self._next_version(skill_name) now = datetime.now(timezone.utc).isoformat() skill_version = SkillVersion( skill_name=skill_name, version=next_version, content_hash=content_hash, summary_hash=canonical_hash(strip_frontmatter(rendered).strip()), created_at=now, created_by="skillhub", change_reason=f"Install SkillHub {namespace}/{slug}@{selected_version}", parent_version=existing.version.version if existing is not None else None, review_state="published", frontmatter=normalized_frontmatter, summary=summarize_skill_content(body), tool_hints=self.store._extract_tool_hints(normalized_frontmatter), provenance={ "source": "skillhub", "namespace": namespace, "slug": slug, "skillhub_version": selected_version, "source_url": f"{SKILLHUB_BASE_URL}/space/{namespace}/{slug}", }, ) self.store.write_skill_version(skill_version, rendered) for file_path, content in contents.items(): if file_path == "SKILL.md": continue target = self.store.root / skill_name / "versions" / next_version / file_path target.parent.mkdir(parents=True, exist_ok=True) target.write_text(content, encoding="utf-8") spec = existing_spec or SkillSpec( name=skill_name, display_name=str(skill.get("displayName") or skill_name), description=str(normalized_frontmatter.get("description") or skill_name), created_at=now, updated_at=now, current_version=next_version, status="active", tags=[], owners=["skillhub"], source_kind="skillhub", lineage=[], ) spec.current_version = next_version spec.updated_at = now spec.status = "active" spec.source_kind = "skillhub" if "skillhub" not in spec.owners: spec.owners.append("skillhub") self.store.write_skill_spec(spec) self.store.set_current_version(skill_name, next_version) published = self.store.read_index("published") if skill_name not in published: published.append(skill_name) self.store.update_index("published", published) return { "ok": True, "skill_name": skill_name, "version": next_version, "source": "skillhub", "namespace": namespace, "slug": slug, "installed_path": str(self.store.root / skill_name), "already_installed": False, } async def _get_json(self, path: str, *, params: dict[str, str] | None = None) -> dict[str, Any]: async with httpx.AsyncClient(timeout=self.timeout_seconds, follow_redirects=True, trust_env=False) as client: response = await client.get(f"{SKILLHUB_API_BASE}{path}", params=params) response.raise_for_status() data = response.json() return data if isinstance(data, dict) else {} async def _get_text(self, path: str, *, params: dict[str, str]) -> str: async with httpx.AsyncClient(timeout=self.timeout_seconds, follow_redirects=True, trust_env=False) as client: response = await client.get(f"{SKILLHUB_API_BASE}{path}", params=params) response.raise_for_status() return response.text def _with_install_state(self, item: dict[str, Any]) -> dict[str, Any]: result = dict(item) slug = str(result.get("slug") or result.get("displayName") or "") namespace = str(result.get("namespace") or "").removeprefix("@") installed = self.store.get_skill_spec(slug) or self._find_installed_skillhub_spec(namespace, slug) result["installed"] = installed is not None and installed.status == "active" result["installed_version"] = installed.current_version if installed is not None else None return result def _find_installed_skillhub_spec(self, namespace: str, slug: str) -> SkillSpec | None: for spec in self.store.list_skill_specs(): loaded = self.store.read_published_skill(spec.name) provenance = loaded.version.provenance if loaded is not None else {} if provenance.get("source") == "skillhub" and provenance.get("namespace") == namespace and provenance.get("slug") == slug: return spec return None def _next_version(self, skill_name: str) -> str: versions = [item for item in self.store.list_versions(skill_name) if item.startswith("v")] numbers = [int(item[1:]) for item in versions if item[1:].isdigit()] return f"v{(max(numbers) if numbers else 0) + 1:04d}" def _unwrap(payload: dict[str, Any]) -> Any: if "data" in payload: return payload["data"] return payload def _published_version(item: dict[str, Any]) -> str | None: for key in ("publishedVersion", "headlineVersion"): value = item.get(key) if isinstance(value, dict) and value.get("version"): return str(value["version"]) return None def _safe_posix_path(value: str) -> str: cleaned = posixpath.normpath(value.replace("\\", "/")).lstrip("/") if cleaned in {"", ".", ".."} or cleaned.startswith("../") or "/../" in cleaned: raise ValueError(f"Unsafe SkillHub file path: {value}") return cleaned def _render_skill_content(frontmatter: dict[str, Any], body: str) -> str: lines = ["---"] for key, value in normalize_frontmatter(frontmatter).items(): if isinstance(value, list): lines.append(f"{key}:") for item in value: lines.append(f" - {item}") else: lines.append(f"{key}: {value}") lines.extend(["---", "", body.strip()]) return "\n".join(lines).rstrip() + "\n" def _guess_content_type(file_path: str) -> str: lower = file_path.lower() if lower.endswith(".md"): return "text/markdown" if lower.endswith(".json"): return "application/json" if lower.endswith((".txt", ".yaml", ".yml", ".toml", ".csv", ".log")): return "text/plain" return "text/plain"