#!/usr/bin/env python3
"""Search an Obsidian SOC vault and print matching document references as JSON."""
from __future__ import annotations

import argparse
import json
import os
import re
from pathlib import Path
from typing import Any

# Root of the proof-of-concept workspace; overridable via environment variable.
DEFAULT_POC_ROOT = os.environ.get("SOC_MEMORY_POC_ROOT", "/home/tom/soc_memory_poc")
DEFAULT_VAULT_ROOT = str(Path(DEFAULT_POC_ROOT) / "obsidian-vault")

# Word-ish tokens: letters/digits plus separators common in case IDs, paths, IOCs.
TOKEN_RE = re.compile(r"[A-Za-z0-9_./:-]+")
SKIP_DIRS = {"05_Templates"}
SKIP_FILES = {"README.md"}


def tokenize(text: str) -> list[str]:
    """Lowercase *text* and return all tokens of length >= 3."""
    lowered = (text or "").lower()
    return [token for token in TOKEN_RE.findall(lowered) if len(token) >= 3]


def parse_frontmatter(text: str) -> tuple[dict[str, str], str]:
    """Split a note into (frontmatter dict, body).

    Only flat ``key: value`` lines are parsed. Notes without a leading
    ``---`` fence (or without a closing fence) return an empty dict and
    the unmodified text.
    """
    if not text.startswith("---\n"):
        return {}, text
    parts = text.split("\n---\n", 1)
    if len(parts) != 2:
        # Opening fence without a closing one: treat everything as body.
        return {}, text
    raw_frontmatter = parts[0].splitlines()[1:]
    body = parts[1]
    data: dict[str, str] = {}
    for line in raw_frontmatter:
        if ":" not in line:
            continue
        key, value = line.split(":", 1)
        data[key.strip()] = value.strip()
    return data, body


def extract_title(body: str, fallback: str) -> str:
    """Return the first H1 heading of *body*, or *fallback* when none exists."""
    for line in body.splitlines():
        if line.startswith("# "):
            return line[2:].strip()
    return fallback


def extract_section_text(body: str, heading: str) -> str:
    """Return up to the first four non-empty lines under ``## <heading>``."""
    marker = f"## {heading}"
    collecting = False
    collected: list[str] = []
    for line in body.splitlines():
        if line.strip() == marker:
            collecting = True
            continue
        if collecting and line.startswith("## "):
            break  # next section starts; stop collecting
        if collecting:
            stripped = line.strip()
            if stripped:
                collected.append(stripped)
    return " ".join(collected[:4]).strip()


def extract_tags(body: str) -> list[str]:
    """Collect ``#tag`` tokens from the note's tag section.

    The section heading literal is Chinese for "Tags" and is part of the
    note format — it must not be translated.
    """
    tags: list[str] = []
    in_tag_section = False
    for line in body.splitlines():
        if line.strip() == "## 标签":
            in_tag_section = True
            continue
        if in_tag_section and line.startswith("## "):
            break
        if in_tag_section:
            tags.extend(re.findall(r"#[^\s,]+", line))
    return tags


def score_doc(query: str, tokens: list[str], doc: dict[str, Any]) -> tuple[int, list[str]]:
    """Score *doc* against the raw *query* and its *tokens*.

    Returns ``(score, matched_terms)`` where matched_terms is capped at 8.
    A zero score means no match at all.
    """
    score = 0
    matched: list[str] = []
    # Hoisted invariants: the original recomputed these per comparison.
    query_lower = query.lower()
    frontmatter = doc.get("frontmatter", {})
    path_text = f"{doc['relative_path']} {doc['file_name']}".lower()
    title_text = doc["title"].lower()
    summary_text = doc.get("summary", "").lower()
    body_text = doc.get("body", "").lower()
    frontmatter_text = " ".join(f"{k}:{v}" for k, v in frontmatter.items()).lower()
    tags_text = " ".join(doc.get("tags", [])).lower()

    # Exact phrase match in the body is a strong signal.
    if query and query_lower in body_text:
        score += 8
        matched.append(query_lower)
    # Case-ID mentioned in the query dominates the ranking.
    case_id = frontmatter.get("case_id", "")
    if case_id and case_id.lower() in query_lower:
        score += 80
        matched.append(case_id.lower())
    scenario = frontmatter.get("scenario", "")
    if scenario and scenario.lower() in query_lower:
        score += 20
        matched.append(scenario.lower())

    # Each token scores only against the highest-weight field containing it
    # (same first-match-wins semantics as the original elif chain).
    field_weights = (
        (title_text, 12),
        (summary_text, 7),
        (path_text, 6),
        (frontmatter_text, 5),
        (tags_text, 4),
        (body_text, 1),
    )
    for token in tokens:
        for field_text, weight in field_weights:
            if token in field_text:
                score += weight
                if token not in matched:
                    matched.append(token)
                break
    return score, matched[:8]


def load_docs(vault_root: str | Path) -> list[dict[str, Any]]:
    """Load every markdown note under *vault_root* into search-ready dicts.

    Skips template directories and README files. Each dict carries both the
    parsed metadata (frontmatter, title, summary, tags) and the raw body.
    """
    vault_root = Path(vault_root)
    docs: list[dict[str, Any]] = []
    for path in sorted(vault_root.rglob("*.md")):
        rel = path.relative_to(vault_root)
        if any(part in SKIP_DIRS for part in rel.parts):
            continue
        if path.name in SKIP_FILES:
            continue
        text = path.read_text(encoding="utf-8")
        frontmatter, body = parse_frontmatter(text)
        docs.append(
            {
                "file_name": path.name,
                "relative_path": str(rel),
                "absolute_path": str(path),
                "category": rel.parts[0] if rel.parts else "",
                "directory": str(rel.parent),
                "frontmatter": frontmatter,
                "title": extract_title(body, path.stem),
                # Notes may use either the Chinese or English summary heading.
                "summary": extract_section_text(body, "告警摘要") or extract_section_text(body, "Summary"),
                "tags": extract_tags(body),
                "body": body,
            }
        )
    return docs


def main() -> None:
    """CLI entry point: search the vault and print ranked matches as JSON."""
    parser = argparse.ArgumentParser(
        description="Search Obsidian SOC notes and return matching document references."
    )
    parser.add_argument("--query", required=True, help="Search query")
    parser.add_argument("--vault-root", default=DEFAULT_VAULT_ROOT, help="Obsidian vault root")
    parser.add_argument("--limit", type=int, default=5, help="Maximum results")
    parser.add_argument("--scenario", default="", help="Optional scenario filter")
    args = parser.parse_args()

    docs = load_docs(args.vault_root)
    tokens = tokenize(args.query)
    results: list[dict[str, Any]] = []
    for doc in docs:
        scenario = doc.get("frontmatter", {}).get("scenario", "")
        if args.scenario and scenario != args.scenario:
            continue
        score, matched_terms = score_doc(args.query, tokens, doc)
        if score <= 0:
            continue
        results.append(
            {
                "score": score,
                "title": doc["title"],
                "file_name": doc["file_name"],
                "relative_path": doc["relative_path"],
                "directory": doc["directory"],
                "category": doc["category"],
                "scenario": scenario,
                "summary": doc.get("summary", ""),
                "tags": doc.get("tags", []),
                "matched_terms": matched_terms,
            }
        )
    results.sort(key=lambda item: item["score"], reverse=True)
    payload = {
        "query": args.query,
        "vault_root": str(Path(args.vault_root)),
        "matched_docs": results[: args.limit],
    }
    print(json.dumps(payload, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main()