update memory gateway

2026-04-30 16:09:28 +08:00
parent e6b1520bce
commit ba84b1ddb3
98 changed files with 1341 additions and 6783 deletions

View File

@@ -0,0 +1,116 @@
---
name: memory-gateway
description: Use this skill when an agent or harness needs reusable memory: search prior context, retrieve OpenViking resources, upload documents into knowledge, summarize arbitrary content with the Memory Gateway LLM, commit final conclusions, or cite related Obsidian notes. This skill is domain-neutral.
version: 2.0.0
metadata:
hermes:
tags: [memory, openviking, obsidian, knowledge, retrieval, summarization, document-ingestion, agent-context]
---
# Memory Gateway
Use this skill as a generic memory layer for any agent / harness. It connects Hermes to the local Memory Gateway at `http://127.0.0.1:1934`, which fronts OpenViking and an Obsidian vault.
## Trigger Rule
Use this skill when the user asks to:
- search prior memory or retrieve related context
- upload a document and make it reusable knowledge
- summarize content and store it as memory/resource
- commit final conclusions, decisions, lessons learned, or research notes
- cite related OpenViking resources or Obsidian notes
- prepare context for another agent or workflow
Do not assume any domain-specific workflow. Treat Memory Gateway as a reusable memory and knowledge entrypoint.
## Environment
Defaults:
- Memory Gateway URL: `http://127.0.0.1:1934`
- Obsidian vault: `/home/tom/memory-gateway/obsidian-vault`
- Default namespace: `memory-gateway`
Optional env vars:
- `MEMORY_GATEWAY_URL`
- `MEMORY_GATEWAY_API_KEY`
- `MEMORY_GATEWAY_OBSIDIAN_VAULT`
## Core Workflows
### 1. Retrieve Context
```bash
python /home/tom/.hermes/skills/memory-gateway/scripts/retrieve_memory.py \
--query "project decision memory gateway LLM summary" \
--uri viking://resources \
--limit 5
```
Use retrieval before answering when prior context may materially improve correctness.
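For programmatic use from another script, the same search can go through the shared helper in `scripts/_client.py`; a minimal sketch, assuming `scripts/` is on `sys.path` (the payload keys mirror what `retrieve_memory.py` sends):
```python
# Minimal retrieval sketch; mirrors the payload built by retrieve_memory.py.
import json
import sys

sys.path.insert(0, "/home/tom/.hermes/skills/memory-gateway/scripts")
from _client import post_json

result = post_json("/api/search", {
    "query": "project decision memory gateway LLM summary",
    "uri": "viking://resources",  # optional scope; "namespace" is the alternative
    "limit": 5,
})
print(json.dumps(result, ensure_ascii=False, indent=2))
```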
### 2. Summarize And Commit
```bash
python /home/tom/.hermes/skills/memory-gateway/scripts/commit_summary.py \
--title "Project decision summary" \
--namespace memory-gateway \
--memory-type decision \
--tag project --tag decision \
--persist-as resource \
--text "<final conclusion or reusable knowledge>"
```
This calls `POST /api/summary`, which summarizes the content with the configured LLM and writes the result to OpenViking unless `persist_as` is `none`.
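The payload fields below are exactly the ones `commit_summary.py` sends; a minimal sketch, with response handling left generic because the response schema is gateway-defined:
```python
# Sketch of the /api/summary payload assembled by commit_summary.py.
from _client import post_json  # assumes scripts/ is on sys.path

payload = {
    "content": "<final conclusion or reusable knowledge>",
    "title": "Project decision summary",
    "summary": None,               # optional summary hint
    "namespace": "memory-gateway",
    "memory_type": "decision",
    "tags": ["project", "decision"],
    "source": "hermes:memory-gateway",
    "resource_uri": None,
    "persist_as": "resource",      # one of: memory, resource, both, none
}
print(post_json("/api/summary", payload))
```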
### 3. Upload Document As Knowledge
```bash
python /home/tom/.hermes/skills/memory-gateway/scripts/upload_knowledge.py \
--file /path/to/document.pdf \
--title "Design Notes" \
--namespace memory-gateway \
--knowledge-type design_doc \
--tags project,design,reference \
--persist-as resource
```
This calls `POST /api/knowledge/upload`: document -> MarkItDown Markdown -> Obsidian note -> LLM summary -> OpenViking resource.
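The response should include where the resource was stored; a hedged sketch of wrapping the script and surfacing that URI (`resource_uri` is an assumed response key, not a confirmed field name):
```python
# Hedged wrapper: run upload_knowledge.py and report the stored resource URI.
import json
import subprocess
import sys

proc = subprocess.run(
    [
        sys.executable,
        "/home/tom/.hermes/skills/memory-gateway/scripts/upload_knowledge.py",
        "--file", "/path/to/document.pdf",
        "--title", "Design Notes",
    ],
    capture_output=True, text=True, check=True,
)
result = json.loads(proc.stdout)
# "resource_uri" is an assumption; fall back to the raw response if absent.
print(result.get("resource_uri", result))
```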
### 4. Search Obsidian Notes
```bash
python /home/tom/.hermes/skills/memory-gateway/scripts/search_obsidian.py \
--query "design notes memory gateway" \
--limit 5
```
## Output Template
When using this skill, answer with:
```markdown
## Answer
<direct answer or synthesis>
## Memory / Resource References
- `<title or URI>` (`<viking://...>`) — why it matters
## Obsidian References
- `<note.md>` (`<relative path>`) — why it matters
## Suggested Memory Commit
- commit: yes/no
- namespace:
- memory_type:
- tags:
- resource_uri: if committed
```
## Guardrails
- Do not store raw noisy data as long-term memory when a concise summary is enough.
- Prefer LLM summaries and structured artifacts over full chat transcripts.
- Do not commit secrets, credentials, tokens, private keys, or unnecessary personal data.
- If content is sensitive, summarize and redact before committing.
- If retrieval quality looks noisy, state that and cite only useful results.
- Always report whether a commit/upload actually succeeded and include the returned resource URI when available; a failure-handling sketch follows this list.
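Because `post_json` raises `urllib.error.URLError` on transport failure, callers can distinguish an unreachable gateway from an empty result; a minimal sketch using only the standard library:
```python
# Hedged failure-handling pattern around the shared helper.
import urllib.error

from _client import post_json  # assumes scripts/ is on sys.path

try:
    result = post_json("/api/search", {"query": "design notes", "limit": 3})
except urllib.error.URLError as exc:
    # Say explicitly that the gateway is unavailable; never fake a result.
    print(f"Memory Gateway unavailable: {exc}")
else:
    print(result)
```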

View File

@@ -0,0 +1,19 @@
from __future__ import annotations
import json
import os
import urllib.request
from typing import Any
DEFAULT_GATEWAY_URL = os.environ.get("MEMORY_GATEWAY_URL", "http://127.0.0.1:1934")
DEFAULT_GATEWAY_API_KEY = os.environ.get("MEMORY_GATEWAY_API_KEY", "")
def post_json(path: str, payload: dict[str, Any], gateway_url: str = DEFAULT_GATEWAY_URL, api_key: str = DEFAULT_GATEWAY_API_KEY, timeout: int = 120) -> dict[str, Any]:
data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
req = urllib.request.Request(gateway_url.rstrip("/") + path, data=data, method="POST")
req.add_header("Content-Type", "application/json")
if api_key:
req.add_header("X-API-Key", api_key)
with urllib.request.urlopen(req, timeout=timeout) as resp:
return json.loads(resp.read().decode("utf-8"))
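# Example usage (assumes a gateway is listening on MEMORY_GATEWAY_URL):
#   result = post_json("/api/search", {"query": "design notes", "limit": 3})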

View File

@@ -0,0 +1,53 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
from _client import DEFAULT_GATEWAY_API_KEY, DEFAULT_GATEWAY_URL, post_json
def load_text(args: argparse.Namespace) -> str:
if args.file:
return Path(args.file).read_text(encoding="utf-8")
if args.text:
return args.text
return sys.stdin.read().strip()
def main() -> None:
parser = argparse.ArgumentParser(description="Summarize arbitrary content with the Gateway LLM and commit it as memory/resource.")
parser.add_argument("--text", help="Text to summarize; stdin is used if omitted")
parser.add_argument("--file", help="File containing text to summarize")
parser.add_argument("--title", default="")
parser.add_argument("--summary", default="", help="Optional summary hint")
parser.add_argument("--namespace", default="memory-gateway")
parser.add_argument("--memory-type", default="summary")
parser.add_argument("--tag", action="append", default=[])
parser.add_argument("--source", default="hermes:memory-gateway")
parser.add_argument("--resource-uri", default="")
parser.add_argument("--persist-as", choices=["memory", "resource", "both", "none"], default="resource")
parser.add_argument("--gateway-url", default=DEFAULT_GATEWAY_URL)
parser.add_argument("--api-key", default=DEFAULT_GATEWAY_API_KEY)
args = parser.parse_args()
content = load_text(args)
if not content:
parser.error("No content provided via --text, --file, or stdin")
payload = {
"content": content,
"title": args.title or None,
"summary": args.summary or None,
"namespace": args.namespace,
"memory_type": args.memory_type,
"tags": args.tag,
"source": args.source,
"resource_uri": args.resource_uri or None,
"persist_as": args.persist_as,
}
print(json.dumps(post_json("/api/summary", payload, args.gateway_url, args.api_key), ensure_ascii=False, indent=2))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,29 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
from _client import DEFAULT_GATEWAY_API_KEY, DEFAULT_GATEWAY_URL, post_json
def main() -> None:
parser = argparse.ArgumentParser(description="Retrieve memory/resources from Memory Gateway.")
parser.add_argument("--query", required=True, help="Search query")
parser.add_argument("--uri", default="", help="Optional OpenViking URI scope, e.g. viking://resources/project")
parser.add_argument("--namespace", default="", help="Optional namespace if URI is not provided")
parser.add_argument("--limit", type=int, default=5)
parser.add_argument("--gateway-url", default=DEFAULT_GATEWAY_URL)
parser.add_argument("--api-key", default=DEFAULT_GATEWAY_API_KEY)
args = parser.parse_args()
payload = {"query": args.query, "limit": args.limit}
if args.uri:
payload["uri"] = args.uri
if args.namespace:
payload["namespace"] = args.namespace
result = post_json("/api/search", payload, args.gateway_url, args.api_key)
print(json.dumps(result, ensure_ascii=False, indent=2))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,55 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import os
import re
from pathlib import Path
DEFAULT_VAULT = os.environ.get("MEMORY_GATEWAY_OBSIDIAN_VAULT", "/home/tom/memory-gateway/obsidian-vault")
def tokenize(query: str) -> list[str]:
return [t.lower() for t in re.split(r"[^\w\u4e00-\u9fff.-]+", query) if len(t.strip()) > 1]
def main() -> None:
parser = argparse.ArgumentParser(description="Search local Obsidian Markdown notes by keyword.")
parser.add_argument("--query", required=True)
parser.add_argument("--vault-root", default=DEFAULT_VAULT)
parser.add_argument("--limit", type=int, default=5)
args = parser.parse_args()
root = Path(args.vault_root)
tokens = tokenize(args.query)
results = []
for file in root.rglob("*.md"):
try:
text = file.read_text(encoding="utf-8")
except UnicodeDecodeError:
continue
haystack = (file.name + "\n" + text).lower()
matched = [token for token in tokens if token in haystack]
if not matched:
continue
summary = ""
for line in text.splitlines():
line = line.strip("# -\t")
if len(line) > 30:
summary = line[:240]
break
results.append({
"score": len(matched) * 10 + min(len(matched), 10),
"file_name": file.name,
"relative_path": str(file.relative_to(root)),
"absolute_path": str(file),
"matched_terms": matched,
"summary": summary,
})
results.sort(key=lambda item: item["score"], reverse=True)
print(json.dumps({"query": args.query, "vault_root": str(root), "matched_docs": results[:args.limit]}, ensure_ascii=False, indent=2))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,65 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import mimetypes
import urllib.request
from pathlib import Path
from _client import DEFAULT_GATEWAY_API_KEY, DEFAULT_GATEWAY_URL
def multipart_upload(url: str, fields: dict[str, str], file_path: Path, api_key: str = "") -> dict:
boundary = "----memorygatewayboundary"
body = bytearray()
for name, value in fields.items():
if value == "":
continue
body.extend(f"--{boundary}\r\n".encode())
body.extend(f'Content-Disposition: form-data; name="{name}"\r\n\r\n{value}\r\n'.encode())
body.extend(f"--{boundary}\r\n".encode())
mime = mimetypes.guess_type(file_path.name)[0] or "application/octet-stream"
body.extend(f'Content-Disposition: form-data; name="file"; filename="{file_path.name}"\r\n'.encode())
body.extend(f"Content-Type: {mime}\r\n\r\n".encode())
body.extend(file_path.read_bytes())
body.extend(b"\r\n")
body.extend(f"--{boundary}--\r\n".encode())
req = urllib.request.Request(url, data=bytes(body), method="POST")
req.add_header("Content-Type", f"multipart/form-data; boundary={boundary}")
if api_key:
req.add_header("X-API-Key", api_key)
with urllib.request.urlopen(req, timeout=180) as resp:
return json.loads(resp.read().decode("utf-8"))
def main() -> None:
parser = argparse.ArgumentParser(description="Upload a document, convert to Markdown, save to Obsidian, summarize with LLM, and commit to OpenViking.")
parser.add_argument("--file", required=True)
parser.add_argument("--title", default="")
parser.add_argument("--namespace", default="memory-gateway")
parser.add_argument("--knowledge-type", default="knowledge")
parser.add_argument("--tags", default="")
parser.add_argument("--source", default="")
parser.add_argument("--obsidian-dir", default="")
parser.add_argument("--resource-uri", default="")
parser.add_argument("--persist-as", choices=["memory", "resource", "both", "none"], default="resource")
parser.add_argument("--gateway-url", default=DEFAULT_GATEWAY_URL)
parser.add_argument("--api-key", default=DEFAULT_GATEWAY_API_KEY)
args = parser.parse_args()
fields = {
"title": args.title,
"namespace": args.namespace,
"knowledge_type": args.knowledge_type,
"tags": args.tags,
"source": args.source,
"obsidian_dir": args.obsidian_dir,
"resource_uri": args.resource_uri,
"persist_as": args.persist_as,
}
result = multipart_upload(args.gateway_url.rstrip("/") + "/api/knowledge/upload", fields, Path(args.file), args.api_key)
print(json.dumps(result, ensure_ascii=False, indent=2))
if __name__ == "__main__":
main()

View File

@@ -1,314 +0,0 @@
---
name: soc-memory-poc
description: Load this skill whenever Hermes is handling SOC alert triage, phishing investigation, suspicious O365 login analysis, historical case lookup, Obsidian note lookup, case-note generation, or committing high-value SOC findings into the SOC Memory POC. It provides a strict triage workflow using the SOC Memory Gateway for search/write operations, local Obsidian vault search, and local SOC Memory POC scripts for Obsidian case note generation.
version: 1.3.0
metadata:
hermes:
tags: [soc, memory, openviking, obsidian, incident-response, case-triage, phishing, o365]
related_skills: [hermes-agent]
---
# SOC Memory POC
Use this skill for SOC case workflows only. It is the default procedure for phishing-style alerts, suspicious O365 / Entra ID login cases, historical case comparison, Obsidian knowledge lookup, and case-note generation.
## Mandatory Trigger Rule
Load this skill immediately when the user asks Hermes to do any of the following:
- investigate or triage a SOC alert
- find similar phishing or O365 suspicious-login cases
- retrieve related KB or playbook context before concluding a case
- check whether Obsidian already has a related case note or knowledge note
- generate an Obsidian case note from a normalized case
- commit a normalized case or knowledge artifact into the SOC memory system
If the task is clearly SOC triage related, do not proceed without using this skill.
## What This Skill Connects To
This skill assumes:
- SOC Memory POC root: `/home/tom/soc_memory_poc`
- Memory Gateway URL: `http://127.0.0.1:1934`
- Gateway API key: empty by default unless configured otherwise
- Obsidian vault root: `/home/tom/soc_memory_poc/obsidian-vault`
Override with environment variables when needed:
- `SOC_MEMORY_POC_ROOT`
- `SOC_MEMORY_GATEWAY_URL`
- `SOC_MEMORY_GATEWAY_API_KEY`
Capabilities:
- search SOC case / knowledge context through the Memory Gateway
- search existing Obsidian notes by case ID, scenario, keywords, or tags
- commit normalized case / knowledge JSON through the Memory Gateway
- generate Obsidian case notes from normalized case JSON
## Triage Workflow
Follow this order unless the user explicitly asks for something narrower.
### Preferred Path For Structured Alerts (Scheme A)
If the user provides a structured alert summary with fields like user, host, sender, subject, attachment, URL, IP, alert type, or known facts, do **not** manually improvise the final answer from memory search results alone.
Use the deterministic triage helper first:
```bash
python /home/tom/.hermes/skills/soc-memory-poc/scripts/triage_alert.py \
--scenario phishing \
--alert-type mail_suspicious_attachment \
--user alice@corp.example \
--host FIN-LAPTOP-12 \
--sender billing@vendor-payments.com \
--subject "Invoice overdue notice" \
--attachment invoice_review.html \
--url https://vendor-payments-login.com/review \
--ip 198.51.100.20 \
--summary "Invoice-themed phishing email with HTML attachment and credential harvesting link" \
--fact "DMARC failed" \
--fact "User may have clicked the link"
```
This script performs:
- case retrieval from the SOC Memory Gateway
- knowledge retrieval from the SOC Memory Gateway
- Obsidian note lookup from the local vault
- final markdown rendering with all required sections populated
For Scheme A, prefer returning the script output with only light cleanup. Do not drop the `关联 Memory Retrieval` or `关联 Obsidian 文档` sections.
### Preferred Path For Freeform Alerts Or Raw Email Content
If the user does **not** provide neatly separated fields, or pastes raw email content / ticket text / freeform alert text, do not force them into Scheme A manually.
Use the unified triage helper:
```bash
python /home/tom/.hermes/skills/soc-memory-poc/scripts/triage_email.py --text "From: billing@vendor-payments.com
To: alice@corp.example
Subject: Invoice overdue notice
Attachment: invoice_review.html
User clicked the link after opening the HTML attachment. DMARC failed. Review at https://vendor-payments-login.com/review from IP 198.51.100.20 on host FIN-LAPTOP-12."
```
Or point it at a file:
```bash
python /home/tom/.hermes/skills/soc-memory-poc/scripts/triage_email.py --file /path/to/raw_email.txt
```
This helper will:
- infer the most likely scenario and alert type
- extract sender, user, subject, attachment, URL, IP, and host when possible
- carry over important facts like DMARC failure, user click, MFA fatigue, inbox rule, or OAuth consent
- run the deterministic triage pipeline so the final answer still contains `关联 Memory Retrieval` and `关联 Obsidian 文档`
For non-structured input, prefer this helper over freehand reasoning.
For all SOC triage inputs, `triage_email.py` is the preferred single entrypoint. It accepts raw text, a file, or optional structured overrides, then calls the deterministic retrieval pipeline.
### Phase 1: Ground The Case
First identify:
- scenario: `phishing`, `o365_suspicious_login`, or another SOC scenario
- likely alert type
- short case summary in one sentence
- key observables if available: sender, URL, domain, IP, mailbox, user, hash
Do not start by writing memory. Start by grounding the case.
### Phase 2: Retrieve Memory Context Before Judging
Before concluding the case, search both related history and related knowledge.
1. Search similar historical cases.
2. Search KB / playbook context.
3. Compare the current case against what comes back.
Run these separately for better precision.
Case search example:
```bash
python /home/tom/.hermes/skills/soc-memory-poc/scripts/search_context.py \
--query "invoice phishing html attachment credential harvesting" \
--kind case --limit 5
```
Knowledge search example:
```bash
python /home/tom/.hermes/skills/soc-memory-poc/scripts/search_context.py \
--query "invoice phishing html attachment credential harvesting" \
--kind knowledge --limit 5
```
O365 example:
```bash
python /home/tom/.hermes/skills/soc-memory-poc/scripts/search_context.py \
--query "impossible travel MFA fatigue inbox rule oauth consent" \
--kind knowledge --limit 5
```
Search scopes:
- `case` -> `viking://resources/soc-memory-poc/case`
- `knowledge` -> `viking://resources/soc-memory-poc/knowledge`
- `all` -> `viking://resources/soc-memory-poc`
### Phase 3: Retrieve Obsidian References
After memory retrieval, look for related notes in the Obsidian vault so the final answer can reference existing human-readable documentation.
Example:
```bash
python /home/tom/.hermes/skills/soc-memory-poc/scripts/search_obsidian_docs.py \
--query "invoice phishing html attachment credential harvesting" \
--scenario phishing \
--limit 5
```
Use this to surface:
- existing case notes
- related scenario notes
- notes whose names, tags, or content closely match the current case
When reporting Obsidian references, include at least:
- note title or file name
- relative path under `obsidian-vault/`
- why the note is relevant
### Phase 4: Produce The Triage Output
After retrieval, synthesize a result that includes:
- likely verdict or current assessment
- strongest evidence
- closest matching historical cases
- most relevant KB / playbook guidance
- related Obsidian notes
- recommended next investigation or response actions
Do not just paste raw search output. Summarize why the returned items matter.
## Final Output Template
Unless the user asks for a different format, use this structure for final SOC triage answers:
### 研判结果
- one short paragraph with the likely verdict / current assessment
### 关键证据
- 2 to 5 flat bullets with the strongest evidence
### 关联 Memory Retrieval
- one flat bullet per retrieved case / knowledge item
- include: ID + short relevance reason
- example: `CASE-2026-0001`: same invoice lure + HTML attachment + credential harvesting flow
### 关联 Obsidian 文档
- one flat bullet per note
- include: note name + relative path + one-line relevance reason
- example: `CASE-2026-0001 - Finance user ...md` (`02_Cases/phishing/...`) — already documents a near-identical phishing pattern
### 建议动作
- 2 to 5 flat bullets with next investigation or response steps
If no Obsidian note matches, explicitly say `未找到直接关联的 Obsidian 文档`.
### Phase 5: Generate Case Note When The Case Is Mature Enough
If the task includes documenting the result, or the case already has a normalized JSON artifact, generate an Obsidian case note.
Example:
```bash
python /home/tom/.hermes/skills/soc-memory-poc/scripts/generate_case_note.py \
--input /home/tom/soc_memory_poc/evaluation/datasets/normalized_cases/CASE-2026-0001.json \
--enrich-from-openviking \
--top-k 3
```
This writes under `obsidian-vault/02_Cases/<scenario>/`.
Use `--enrich-from-openviking` by default when the gateway is available.
### Phase 6: Commit Only High-Value Artifacts
If Hermes has a normalized case or knowledge JSON that is worth preserving, commit it through the Gateway.
Example:
```bash
python /home/tom/.hermes/skills/soc-memory-poc/scripts/commit_case_memory.py \
--input /home/tom/soc_memory_poc/evaluation/datasets/normalized_cases/CASE-2026-0001.json
```
Only commit normalized, reusable artifacts. Do not commit raw logs, raw tool traces, or ad hoc chat text.
## Recommended Defaults By Scenario
### Phishing
Default order:
1. search `case`
2. search `knowledge`
3. search related Obsidian notes
4. assess sender auth, lure type, landing page, user interaction
5. generate case note if the case is already structured
6. commit only if the case artifact is normalized and high value
Good query ingredients:
- lure theme
- attachment type
- credential harvesting
- fake M365 login
- sender domain
- landing URL pattern
### O365 Suspicious Login
Default order:
1. search `case`
2. search `knowledge`
3. search related Obsidian notes
4. assess impossible travel, MFA fatigue, inbox rule abuse, OAuth consent, legacy auth
5. generate case note if the case is already structured
6. commit only if the case artifact is normalized and high value
Good query ingredients:
- impossible travel
- MFA fatigue
- inbox rule
- foreign login
- OAuth consent
- legacy protocol
## Failure Handling
If Gateway search fails:
- say explicitly that the SOC Memory Gateway is unavailable
- do not pretend retrieval succeeded
- continue with local reasoning only if the user still wants that
If Obsidian search fails:
- say explicitly that Obsidian references could not be retrieved
- do not invent note names or paths
If note generation fails:
- report the failing path or command
- do not claim the note was written
If commit fails:
- report the URI or file that failed
- do not claim the memory was stored
## Guardrails
- Search `case` and `knowledge` separately before concluding a triage result.
- Search Obsidian notes after memory retrieval so final output can point to human-readable references.
- Prefer narrow, scenario-specific queries over vague long prompts.
- Do not dump raw investigative process into memory.
- Generate case notes from normalized case JSON, not from freeform chat.
- Commit only high-value, reusable artifacts.
- When Gateway results look noisy, explain that retrieval quality may still need SOC-specific reranking.

View File

@@ -1,66 +0,0 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import os
import urllib.error
import urllib.request
from pathlib import Path
from typing import Any
DEFAULT_GATEWAY_URL = os.environ.get("SOC_MEMORY_GATEWAY_URL", "http://127.0.0.1:1934")
DEFAULT_GATEWAY_API_KEY = os.environ.get("SOC_MEMORY_GATEWAY_API_KEY", "")
def load_item(path: str | Path) -> dict[str, Any]:
with Path(path).open("r", encoding="utf-8") as f:
return json.load(f)
def build_resource_uri(item: dict[str, Any]) -> str:
memory_type = item.get("memory_type")
item_id = item["id"]
if memory_type == "case":
scenario = item.get("scenario", "general")
return f"viking://resources/soc-memory-poc/case/{scenario}/{item_id}.json"
if memory_type == "knowledge":
doc_type = item.get("doc_type", "general")
return f"viking://resources/soc-memory-poc/knowledge/{doc_type}/{item_id}.json"
raise SystemExit(f"Unsupported memory_type: {memory_type}")
def post_json(url: str, payload: dict[str, Any], api_key: str = "") -> dict[str, Any]:
data = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(url, data=data, method="POST")
req.add_header("Content-Type", "application/json")
if api_key:
req.add_header("X-API-Key", api_key)
with urllib.request.urlopen(req, timeout=60) as resp:
return json.loads(resp.read().decode("utf-8"))
def main() -> None:
parser = argparse.ArgumentParser(description="Commit a normalized SOC case / knowledge JSON through the Memory Gateway.")
parser.add_argument("--input", required=True, help="Normalized JSON file path")
parser.add_argument("--gateway-url", default=DEFAULT_GATEWAY_URL, help="Memory Gateway base URL")
parser.add_argument("--api-key", default=DEFAULT_GATEWAY_API_KEY, help="Gateway API key if required")
args = parser.parse_args()
item = load_item(args.input)
payload = {
"uri": build_resource_uri(item),
"content": json.dumps(item, ensure_ascii=False, indent=2),
"resource_type": "json",
}
try:
result = post_json(args.gateway_url.rstrip("/") + "/api/resource", payload, api_key=args.api_key)
except urllib.error.URLError as exc:
raise SystemExit(f"Gateway resource commit failed: {exc}") from exc
print(json.dumps(result, ensure_ascii=False, indent=2))
if __name__ == "__main__":
main()

View File

@@ -1,48 +0,0 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import os
import subprocess
import sys
from pathlib import Path
DEFAULT_POC_ROOT = os.environ.get("SOC_MEMORY_POC_ROOT", "/home/tom/soc_memory_poc")
def main() -> None:
parser = argparse.ArgumentParser(description="Generate an Obsidian case note from a normalized SOC case JSON file.")
parser.add_argument("--input", required=True, help="Normalized case JSON path")
parser.add_argument("--output-dir", default=None, help="Override Obsidian output directory")
parser.add_argument("--enrich-from-openviking", action="store_true", help="Enrich with OpenViking recommendations")
parser.add_argument("--top-k", type=int, default=3, help="Recommendation count per type")
parser.add_argument("--poc-root", default=DEFAULT_POC_ROOT, help="SOC Memory POC root")
args = parser.parse_args()
poc_root = Path(args.poc_root)
script_path = poc_root / "skills" / "summarize_case_skill" / "generate_case_note.py"
if not script_path.exists():
raise SystemExit(f"SOC Memory POC summarize script not found: {script_path}")
output_dir = args.output_dir or str(poc_root / "obsidian-vault" / "02_Cases")
cmd = [
sys.executable,
str(script_path),
"--input",
args.input,
"--output-dir",
output_dir,
"--top-k",
str(args.top_k),
]
if args.enrich_from_openviking:
cmd.append("--enrich-from-openviking")
env = os.environ.copy()
existing = env.get("PYTHONPATH", "")
env["PYTHONPATH"] = str(poc_root) + (os.pathsep + existing if existing else "")
subprocess.run(cmd, check=True, env=env)
if __name__ == "__main__":
main()

View File

@@ -1,85 +0,0 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import os
import urllib.error
import urllib.request
from typing import Any
DEFAULT_GATEWAY_URL = os.environ.get("SOC_MEMORY_GATEWAY_URL", "http://127.0.0.1:1934")
DEFAULT_GATEWAY_API_KEY = os.environ.get("SOC_MEMORY_GATEWAY_API_KEY", "")
URI_PREFIXES = {
"case": "viking://resources/soc-memory-poc/case",
"knowledge": "viking://resources/soc-memory-poc/knowledge",
"all": "viking://resources/soc-memory-poc",
}
def post_json(url: str, payload: dict[str, Any], api_key: str = "") -> dict[str, Any]:
data = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(url, data=data, method="POST")
req.add_header("Content-Type", "application/json")
if api_key:
req.add_header("X-API-Key", api_key)
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode("utf-8"))
def canonicalize_uri(uri: str) -> str:
if ".json/" in uri:
return uri.split(".json/", 1)[0] + ".json"
return uri
def filter_results(results: list[dict[str, Any]], prefix: str) -> list[dict[str, Any]]:
deduped: dict[str, dict[str, Any]] = {}
for item in results:
uri = item.get("uri") or ""
canonical = canonicalize_uri(uri)
if not canonical.startswith(prefix):
continue
score = item.get("score") or 0
payload = dict(item)
payload["uri"] = canonical
if canonical not in deduped or score > (deduped[canonical].get("score") or 0):
deduped[canonical] = payload
return sorted(deduped.values(), key=lambda entry: entry.get("score") or 0, reverse=True)
def main() -> None:
parser = argparse.ArgumentParser(description="Search SOC Memory Gateway for case / knowledge context.")
parser.add_argument("--query", required=True, help="Search query")
parser.add_argument("--kind", choices=["case", "knowledge", "all"], default="all", help="SOC resource scope")
parser.add_argument("--limit", type=int, default=5, help="Max results")
parser.add_argument("--gateway-url", default=DEFAULT_GATEWAY_URL, help="Memory Gateway base URL")
parser.add_argument("--api-key", default=DEFAULT_GATEWAY_API_KEY, help="Gateway API key if required")
args = parser.parse_args()
prefix = URI_PREFIXES[args.kind]
payload = {
"query": args.query,
"limit": max(args.limit * 5, 10),
"uri": prefix,
}
try:
result = post_json(args.gateway_url.rstrip("/") + "/api/search", payload, api_key=args.api_key)
except urllib.error.URLError as exc:
raise SystemExit(f"Gateway search failed: {exc}") from exc
raw_results = result.get("results", [])
filtered = filter_results(raw_results, prefix)
output = {
"query": args.query,
"kind": args.kind,
"uri_prefix": prefix,
"results": filtered[: args.limit],
"total": len(filtered),
}
print(json.dumps(output, ensure_ascii=False, indent=2))
if __name__ == "__main__":
main()

View File

@@ -1,205 +0,0 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import os
import re
from pathlib import Path
from typing import Any
DEFAULT_POC_ROOT = os.environ.get("SOC_MEMORY_POC_ROOT", "/home/tom/soc_memory_poc")
DEFAULT_VAULT_ROOT = str(Path(DEFAULT_POC_ROOT) / "obsidian-vault")
TOKEN_RE = re.compile(r"[A-Za-z0-9_./:-]+")
SKIP_DIRS = {"05_Templates"}
SKIP_FILES = {"README.md"}
def tokenize(text: str) -> list[str]:
lowered = (text or "").lower()
tokens = TOKEN_RE.findall(lowered)
return [token for token in tokens if len(token) >= 3]
def parse_frontmatter(text: str) -> tuple[dict[str, str], str]:
if not text.startswith("---\n"):
return {}, text
parts = text.split("\n---\n", 1)
if len(parts) != 2:
return {}, text
raw_frontmatter = parts[0].splitlines()[1:]
body = parts[1]
data: dict[str, str] = {}
for line in raw_frontmatter:
if ":" not in line:
continue
key, value = line.split(":", 1)
data[key.strip()] = value.strip()
return data, body
def extract_title(body: str, fallback: str) -> str:
for line in body.splitlines():
if line.startswith("# "):
return line[2:].strip()
return fallback
def extract_section_text(body: str, heading: str) -> str:
lines = body.splitlines()
marker = f"## {heading}"
collecting = False
collected: list[str] = []
for line in lines:
if line.strip() == marker:
collecting = True
continue
if collecting and line.startswith("## "):
break
if collecting:
stripped = line.strip()
if stripped:
collected.append(stripped)
return " ".join(collected[:4]).strip()
def extract_tags(body: str) -> list[str]:
tags: list[str] = []
in_tag_section = False
for line in body.splitlines():
if line.strip() == "## 标签":
in_tag_section = True
continue
if in_tag_section and line.startswith("## "):
break
if in_tag_section:
for token in re.findall(r"#[^\s,]+", line):
tags.append(token)
return tags
def score_doc(query: str, tokens: list[str], doc: dict[str, Any]) -> tuple[int, list[str]]:
score = 0
matched: list[str] = []
path_text = f"{doc['relative_path']} {doc['file_name']}".lower()
title_text = doc["title"].lower()
summary_text = doc.get("summary", "").lower()
body_text = doc.get("body", "").lower()
frontmatter_text = " ".join(f"{k}:{v}" for k, v in doc.get("frontmatter", {}).items()).lower()
tags_text = " ".join(doc.get("tags", [])).lower()
if query and query.lower() in body_text:
score += 8
matched.append(query.lower())
case_id = doc.get("frontmatter", {}).get("case_id", "")
if case_id and case_id.lower() in query.lower():
score += 80
matched.append(case_id.lower())
scenario = doc.get("frontmatter", {}).get("scenario", "")
if scenario and scenario.lower() in query.lower():
score += 20
matched.append(scenario.lower())
for token in tokens:
token_hit = False
if token in title_text:
score += 12
token_hit = True
elif token in summary_text:
score += 7
token_hit = True
elif token in path_text:
score += 6
token_hit = True
elif token in frontmatter_text:
score += 5
token_hit = True
elif token in tags_text:
score += 4
token_hit = True
elif token in body_text:
score += 1
token_hit = True
if token_hit and token not in matched:
matched.append(token)
return score, matched[:8]
def load_docs(vault_root: str | Path) -> list[dict[str, Any]]:
vault_root = Path(vault_root)
docs: list[dict[str, Any]] = []
for path in sorted(vault_root.rglob("*.md")):
rel = path.relative_to(vault_root)
if any(part in SKIP_DIRS for part in rel.parts):
continue
if path.name in SKIP_FILES:
continue
text = path.read_text(encoding="utf-8")
frontmatter, body = parse_frontmatter(text)
docs.append(
{
"file_name": path.name,
"relative_path": str(rel),
"absolute_path": str(path),
"category": rel.parts[0] if rel.parts else "",
"directory": str(rel.parent),
"frontmatter": frontmatter,
"title": extract_title(body, path.stem),
"summary": extract_section_text(body, "告警摘要") or extract_section_text(body, "Summary"),
"tags": extract_tags(body),
"body": body,
}
)
return docs
def main() -> None:
parser = argparse.ArgumentParser(description="Search Obsidian SOC notes and return matching document references.")
parser.add_argument("--query", required=True, help="Search query")
parser.add_argument("--vault-root", default=DEFAULT_VAULT_ROOT, help="Obsidian vault root")
parser.add_argument("--limit", type=int, default=5, help="Maximum results")
parser.add_argument("--scenario", default="", help="Optional scenario filter")
args = parser.parse_args()
docs = load_docs(args.vault_root)
tokens = tokenize(args.query)
results: list[dict[str, Any]] = []
for doc in docs:
scenario = doc.get("frontmatter", {}).get("scenario", "")
if args.scenario and scenario != args.scenario:
continue
score, matched_terms = score_doc(args.query, tokens, doc)
if score <= 0:
continue
results.append(
{
"score": score,
"title": doc["title"],
"file_name": doc["file_name"],
"relative_path": doc["relative_path"],
"directory": doc["directory"],
"category": doc["category"],
"scenario": scenario,
"summary": doc.get("summary", ""),
"tags": doc.get("tags", []),
"matched_terms": matched_terms,
}
)
results.sort(key=lambda item: item["score"], reverse=True)
payload = {
"query": args.query,
"vault_root": str(Path(args.vault_root)),
"matched_docs": results[: args.limit],
}
print(json.dumps(payload, ensure_ascii=False, indent=2))
if __name__ == "__main__":
main()

View File

@@ -1,282 +0,0 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import os
import urllib.error
import urllib.request
from pathlib import Path
from typing import Any
DEFAULT_GATEWAY_URL = os.environ.get("SOC_MEMORY_GATEWAY_URL", "http://127.0.0.1:1934")
DEFAULT_GATEWAY_API_KEY = os.environ.get("SOC_MEMORY_GATEWAY_API_KEY", "")
DEFAULT_POC_ROOT = os.environ.get("SOC_MEMORY_POC_ROOT", "/home/tom/soc_memory_poc")
DEFAULT_VAULT_ROOT = str(Path(DEFAULT_POC_ROOT) / "obsidian-vault")
CASE_URI = "viking://resources/soc-memory-poc/case"
KNOWLEDGE_URI = "viking://resources/soc-memory-poc/knowledge"
def post_json(url: str, payload: dict[str, Any], api_key: str = "") -> dict[str, Any]:
data = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(url, data=data, method="POST")
req.add_header("Content-Type", "application/json")
if api_key:
req.add_header("X-API-Key", api_key)
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode("utf-8"))
def canonicalize_uri(uri: str) -> str:
if ".json/" in uri:
return uri.split(".json/", 1)[0] + ".json"
return uri
def filter_results(results: list[dict[str, Any]], prefix: str) -> list[dict[str, Any]]:
deduped: dict[str, dict[str, Any]] = {}
for item in results:
uri = item.get("uri") or ""
canonical = canonicalize_uri(uri)
if not canonical.startswith(prefix):
continue
score = item.get("score") or 0
payload = dict(item)
payload["uri"] = canonical
if canonical not in deduped or score > (deduped[canonical].get("score") or 0):
deduped[canonical] = payload
return sorted(deduped.values(), key=lambda entry: entry.get("score") or 0, reverse=True)
def gateway_search(query: str, uri: str, limit: int, gateway_url: str, api_key: str) -> list[dict[str, Any]]:
payload = {"query": query, "limit": max(limit * 5, 10), "uri": uri}
raw = post_json(gateway_url.rstrip("/") + "/api/search", payload, api_key=api_key)
return filter_results(raw.get("results", []), uri)[:limit]
def obsidian_search(query: str, scenario: str, limit: int, vault_root: str) -> dict[str, Any]:
from search_obsidian_docs import load_docs, score_doc, tokenize
docs = load_docs(vault_root)
tokens = tokenize(query)
results: list[dict[str, Any]] = []
for doc in docs:
doc_scenario = doc.get("frontmatter", {}).get("scenario", "")
if scenario and doc_scenario != scenario:
continue
score, matched_terms = score_doc(query, tokens, doc)
if score <= 0:
continue
results.append(
{
"score": score,
"title": doc["title"],
"file_name": doc["file_name"],
"relative_path": doc["relative_path"],
"directory": doc["directory"],
"absolute_path": str(Path(vault_root) / doc["relative_path"]),
"summary": doc.get("summary", ""),
"matched_terms": matched_terms,
}
)
results.sort(key=lambda item: item["score"], reverse=True)
return {"matched_docs": results[:limit]}
def build_query(args: argparse.Namespace) -> str:
parts = [
args.scenario,
args.alert_type,
args.user,
args.host,
args.sender,
args.subject,
args.attachment,
args.url,
args.ip,
args.summary,
]
parts.extend(args.fact)
return " ".join(part.strip() for part in parts if part and part.strip())
def bullet(lines: list[str], fallback: str) -> str:
if not lines:
return f"- {fallback}"
return "\n".join(f"- {line}" for line in lines)
def top_results(items: list[dict[str, Any]], limit: int = 3) -> list[dict[str, Any]]:
return items[:limit]
def has_fact(args: argparse.Namespace, needle: str) -> bool:
haystacks = [args.summary, args.subject, args.alert_type, *args.fact]
lowered = needle.lower()
return any(lowered in (item or "").lower() for item in haystacks)
def summarize_evidence(args: argparse.Namespace) -> list[str]:
evidence: list[str] = []
if args.subject:
evidence.append(f"邮件主题/诱饵:{args.subject}")
if args.attachment:
evidence.append(f"恶意附件:{args.attachment}")
if args.url:
evidence.append(f"可疑链接:{args.url}")
if args.sender:
evidence.append(f"发件人:{args.sender}")
if args.ip:
evidence.append(f"相关 IP{args.ip}")
for fact in args.fact[:4]:
evidence.append(fact)
return evidence[:6]
def uri_to_id(uri: str) -> str:
return uri.rsplit('/', 1)[-1].replace('.json', '')
def infer_assessment(args: argparse.Namespace, case_results: list[dict[str, Any]]) -> str:
top_case = case_results[0] if case_results else None
if args.scenario == "phishing":
if args.url and args.attachment and (has_fact(args, "dmarc failed") or has_fact(args, "clicked")):
base = "当前告警高度符合凭证收割型钓鱼攻击特征,属于高可信 True Positive且存在凭证泄露风险。"
elif args.url or args.attachment:
base = "当前告警具备明显钓鱼迹象,尤其是附件与落地页组合,倾向于高风险钓鱼事件。"
else:
base = "当前告警呈现出邮件钓鱼模式,但仍需补充落地页、附件和用户交互证据进一步确认。"
elif args.scenario == "o365_suspicious_login":
if has_fact(args, "impossible travel") and (has_fact(args, "mfa fatigue") or has_fact(args, "inbox rule") or has_fact(args, "oauth")):
base = "当前告警高度符合 O365 账号接管链路,属于高可信身份威胁事件。"
else:
base = "当前告警表现为异常身份登录需要结合登录轨迹、MFA 和邮箱规则进一步确认是否账号接管。"
else:
base = "当前告警具备明显的可疑特征,需要结合历史案例和关联知识继续判断。"
if top_case:
return base + f" 最相近的历史案例为 `{uri_to_id(top_case.get('uri', ''))}`,说明当前 case 与既有攻击模式存在明显重合。"
return base
def format_memory_results(case_results: list[dict[str, Any]], knowledge_results: list[dict[str, Any]]) -> str:
lines: list[str] = []
for item in top_results(case_results, 2):
uri = item.get("uri", "")
abstract = (item.get("abstract") or "").strip()
snippet = abstract[:140] + "..." if len(abstract) > 140 else abstract
lines.append(f"`{uri_to_id(uri)}`{uri})— {snippet}")
for item in top_results(knowledge_results, 2):
uri = item.get("uri", "")
abstract = (item.get("abstract") or "").strip()
snippet = abstract[:140] + "..." if len(abstract) > 140 else abstract
lines.append(f"`{uri_to_id(uri)}`{uri})— {snippet}")
return bullet(lines, "未检索到直接关联的 Memory 条目")
def format_obsidian_results(obsidian_docs: list[dict[str, Any]]) -> str:
lines = []
for doc in top_results(obsidian_docs, 3):
reason = doc.get("summary") or ", ".join(doc.get("matched_terms", [])) or "与当前场景相关"
lines.append(
f"`{doc['file_name']}` — `obsidian-vault/{doc['relative_path']}` "
f"absolute: `{doc['absolute_path']}`)— {reason}"
)
return bullet(lines, "未找到直接关联的 Obsidian 文档")
def recommend_actions(args: argparse.Namespace, case_results: list[dict[str, Any]]) -> list[str]:
actions: list[str] = []
if args.scenario == "phishing":
actions.extend([
"检查用户是否已点击链接或提交凭据,必要时立即重置账号并撤销会话。",
"搜索同主题、同发件人、同 URL 或同附件的邮件是否已投递给其他用户。",
"封锁相关域名、URL 和可疑 IP并保留附件样本用于沙箱分析。",
"如邮件面向财务或高价值角色,优先排查是否存在 BEC 或后续横向利用。",
])
elif args.scenario == "o365_suspicious_login":
actions.extend([
"复核登录日志、MFA 记录和后续邮箱规则 / OAuth 变更。",
"若确认账号接管迹象,立即重置凭据并撤销所有活跃会话。",
"检查同源 IP、同设备指纹和同时间窗口内的其他用户活动。",
"对邮箱转发、隐藏规则、恶意 OAuth 授权进行专项排查。",
])
else:
actions.append("基于当前高风险迹象继续扩充调查和处置。")
if case_results:
actions.append("对照最相近历史案例,复用已有 IOC 和调查路径。")
return actions[:5]
def main() -> None:
parser = argparse.ArgumentParser(description="Run a structured SOC triage using memory retrieval and Obsidian lookup.")
parser.add_argument("--scenario", required=True, help="Scenario, e.g. phishing or o365_suspicious_login")
parser.add_argument("--alert-type", default="", help="Alert type")
parser.add_argument("--user", default="", help="Target user")
parser.add_argument("--host", default="", help="Target host")
parser.add_argument("--sender", default="", help="Sender email")
parser.add_argument("--subject", default="", help="Email subject or short title")
parser.add_argument("--attachment", default="", help="Attachment name")
parser.add_argument("--url", default="", help="Suspicious URL")
parser.add_argument("--ip", default="", help="Relevant IP")
parser.add_argument("--summary", default="", help="One-sentence alert summary")
parser.add_argument("--fact", action="append", default=[], help="Additional known fact; repeatable")
parser.add_argument("--gateway-url", default=DEFAULT_GATEWAY_URL, help="Memory Gateway URL")
parser.add_argument("--api-key", default=DEFAULT_GATEWAY_API_KEY, help="Memory Gateway API key")
parser.add_argument("--vault-root", default=DEFAULT_VAULT_ROOT, help="Obsidian vault root")
parser.add_argument("--limit", type=int, default=5, help="Search limit")
args = parser.parse_args()
query = build_query(args)
case_results: list[dict[str, Any]] = []
knowledge_results: list[dict[str, Any]] = []
obsidian_docs: list[dict[str, Any]] = []
memory_error = ""
obsidian_error = ""
try:
case_results = gateway_search(query, CASE_URI, args.limit, args.gateway_url, args.api_key)
knowledge_results = gateway_search(query, KNOWLEDGE_URI, args.limit, args.gateway_url, args.api_key)
except urllib.error.URLError as exc:
memory_error = f"Memory Gateway 不可用:{exc}"
try:
obsidian_resp = obsidian_search(query, args.scenario, args.limit, args.vault_root)
obsidian_docs = obsidian_resp.get("matched_docs", [])
except Exception as exc: # noqa: BLE001
obsidian_error = f"Obsidian 检索失败:{exc}"
lines = [
"## 研判结果",
infer_assessment(args, case_results),
"",
"## 关键证据",
bullet(summarize_evidence(args), "当前输入只提供了有限证据,需要继续补充调查信息"),
"",
"## 关联 Memory Retrieval",
]
if memory_error:
lines.append(f"- {memory_error}")
else:
lines.append(format_memory_results(case_results, knowledge_results))
lines.extend([
"",
"## 关联 Obsidian 文档",
])
if obsidian_error:
lines.append(f"- {obsidian_error}")
else:
lines.append(format_obsidian_results(obsidian_docs))
lines.extend([
"",
"## 建议动作",
bullet(recommend_actions(args, case_results), "继续补充告警细节后再执行更精确的响应动作"),
])
print("\n".join(lines))
if __name__ == "__main__":
main()

View File

@@ -1,201 +0,0 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import os
import re
import subprocess
import sys
from pathlib import Path
SCRIPT_DIR = Path(__file__).resolve().parent
TRIAGE_ALERT = SCRIPT_DIR / "triage_alert.py"
EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
URL_RE = re.compile(r"https?://[^\s<>\"]+")
IP_RE = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")
HOST_RE = re.compile(r"\b[A-Z]{2,}(?:-[A-Z0-9]+)+\b")
ATTACHMENT_RE = re.compile(r"\b[\w.-]+\.(?:html|htm|pdf|zip|docx|xlsx|eml)\b", re.IGNORECASE)
HEADER_RE = re.compile(
r"^(From|To|Subject|Attachment|URL|IP|Host|User|Alert type|Scenario)\s*:\s*(.+)$",
re.IGNORECASE | re.MULTILINE,
)
def first_nonempty(*values: str) -> str:
for value in values:
if value and value.strip():
return value.strip()
return ""
def load_text(args: argparse.Namespace) -> str:
if args.file:
return Path(args.file).read_text(encoding="utf-8")
if args.text:
return args.text
data = sys.stdin.read()
if data.strip():
return data
return ""
def find_header(text: str, name: str) -> str:
for key, value in HEADER_RE.findall(text):
if key.lower() == name.lower():
return value.strip()
return ""
def unique_matches(pattern: re.Pattern[str], text: str) -> list[str]:
seen: list[str] = []
for match in pattern.findall(text):
if match not in seen:
seen.append(match)
return seen
def infer_scenario(text: str, explicit_scenario: str = "", explicit_alert_type: str = "") -> tuple[str, str]:
if explicit_scenario:
return explicit_scenario, explicit_alert_type
lowered = text.lower()
if any(token in lowered for token in ["impossible travel", "mfa fatigue", "oauth consent", "inbox rule", "entra", "azuread", "sign-in", "signin"]):
alert_type = explicit_alert_type or ("azuread_impossible_travel" if "impossible travel" in lowered else "o365_suspicious_login")
return "o365_suspicious_login", alert_type
if any(token in lowered for token in ["phishing", "invoice", "attachment", "credential harvest", "fake microsoft 365", "dmarc", "mail_suspicious", "wire transfer"]):
if explicit_alert_type:
return "phishing", explicit_alert_type
if "wire transfer" in lowered or "executive impersonation" in lowered or "bec" in lowered:
return "phishing", "mail_bec_impersonation"
if "link" in lowered and "attachment" not in lowered:
return "phishing", "mail_suspicious_link"
return "phishing", "mail_suspicious_attachment"
return "phishing", explicit_alert_type
def collect_facts(text: str, provided: list[str]) -> list[str]:
facts: list[str] = []
for fact in provided:
if fact and fact not in facts:
facts.append(fact)
lowered = text.lower()
fact_patterns = [
("DMARC failed", ["dmarc failed"]),
("SPF failed", ["spf failed"]),
("User may have clicked the link", ["clicked", "user clicked"]),
("Credential submission suspected", ["submitted credentials", "credential submission", "entered credentials"]),
("Impossible travel observed", ["impossible travel"]),
("MFA fatigue observed", ["mfa fatigue", "repeated mfa"]),
("Inbox rule creation observed", ["inbox rule"]),
("OAuth consent activity observed", ["oauth consent"]),
]
for label, needles in fact_patterns:
if any(needle in lowered for needle in needles) and label not in facts:
facts.append(label)
for line in text.splitlines():
stripped = line.strip("-* \t")
if not stripped or len(stripped) > 160:
continue
lower = stripped.lower()
if any(word in lower for word in ["dmarc", "spf", "clicked", "credential", "impossible travel", "mfa", "inbox rule", "oauth"]):
if stripped not in facts:
facts.append(stripped)
return facts[:8]
def build_summary(text: str, subject: str, provided_summary: str = "") -> str:
if provided_summary:
return provided_summary[:240]
if subject:
return subject[:180]
for line in text.splitlines():
stripped = line.strip()
if len(stripped) >= 20 and ":" not in stripped[:20]:
return stripped[:240]
return text.strip()[:240]
def parse_input(args: argparse.Namespace) -> dict[str, str | list[str]]:
text = load_text(args)
scenario, alert_type = infer_scenario(text, args.scenario, args.alert_type)
emails = unique_matches(EMAIL_RE, text)
urls = unique_matches(URL_RE, text)
ips = unique_matches(IP_RE, text)
hosts = unique_matches(HOST_RE, text)
attachments = unique_matches(ATTACHMENT_RE, text)
sender = first_nonempty(args.sender, find_header(text, "From"), emails[0] if emails else "")
user = first_nonempty(args.user, find_header(text, "User"), find_header(text, "To"), emails[1] if len(emails) > 1 else "")
subject = first_nonempty(args.subject, find_header(text, "Subject"))
attachment = first_nonempty(args.attachment, find_header(text, "Attachment"), attachments[0] if attachments else "")
url = first_nonempty(args.url, find_header(text, "URL"), urls[0] if urls else "")
ip = first_nonempty(args.ip, find_header(text, "IP"), ips[0] if ips else "")
host = first_nonempty(args.host, find_header(text, "Host"), hosts[0] if hosts else "")
summary = build_summary(text, subject, args.summary)
facts = collect_facts(text, args.fact)
return {
"scenario": scenario,
"alert_type": alert_type,
"user": user,
"host": host,
"sender": sender,
"subject": subject,
"attachment": attachment,
"url": url,
"ip": ip,
"summary": summary,
"facts": facts,
}
def run_triage(parsed: dict[str, str | list[str]], limit: int) -> None:
cmd = [
sys.executable,
str(TRIAGE_ALERT),
"--scenario", str(parsed["scenario"]),
"--alert-type", str(parsed["alert_type"]),
"--user", str(parsed["user"]),
"--host", str(parsed["host"]),
"--sender", str(parsed["sender"]),
"--subject", str(parsed["subject"]),
"--attachment", str(parsed["attachment"]),
"--url", str(parsed["url"]),
"--ip", str(parsed["ip"]),
"--summary", str(parsed["summary"]),
"--limit", str(limit),
]
for fact in parsed["facts"]:
cmd.extend(["--fact", str(fact)])
subprocess.run(cmd, check=True, env=os.environ.copy())
def main() -> None:
parser = argparse.ArgumentParser(description="Unified SOC alert/email triage entrypoint with memory and Obsidian retrieval.")
parser.add_argument("--text", help="Raw email, ticket text, or freeform alert text")
parser.add_argument("--file", help="Path to a raw email/ticket/alert text file")
parser.add_argument("--scenario", default="", help="Optional scenario override")
parser.add_argument("--alert-type", default="", help="Optional alert type override")
parser.add_argument("--user", default="", help="Optional user override")
parser.add_argument("--host", default="", help="Optional host override")
parser.add_argument("--sender", default="", help="Optional sender override")
parser.add_argument("--subject", default="", help="Optional subject override")
parser.add_argument("--attachment", default="", help="Optional attachment override")
parser.add_argument("--url", default="", help="Optional URL override")
parser.add_argument("--ip", default="", help="Optional IP override")
parser.add_argument("--summary", default="", help="Optional summary override")
parser.add_argument("--fact", action="append", default=[], help="Additional known fact; repeatable")
parser.add_argument("--limit", type=int, default=5, help="Search limit")
args = parser.parse_args()
parsed = parse_input(args)
run_triage(parsed, args.limit)
if __name__ == "__main__":
main()

View File

@@ -1,13 +0,0 @@
#!/usr/bin/env python3
from __future__ import annotations
import os
import subprocess
import sys
from pathlib import Path
SCRIPT_DIR = Path(__file__).resolve().parent
TRIAGE_EMAIL = SCRIPT_DIR / "triage_email.py"
if __name__ == "__main__":
subprocess.run([sys.executable, str(TRIAGE_EMAIL), *sys.argv[1:]], check=True, env=os.environ.copy())