feat(knowledge): add Conference Talk Summarizer
Issue #138 — 7.6: Conference Talk Summarizer.

Adds a complete pipeline for ingesting conference talk transcripts
into the compounding-intelligence knowledge store.

### New files

- scripts/conference_summarizer.py
  - Reads plain-text transcript files
  - Calls LLM (mimo-v2-pro default) to extract knowledge items
  - Deduplicates against existing store
  - Assigns IDs following {domain}:{category}:{NNN} schema
  - Writes to knowledge/index.json and knowledge/conferences/talks.md (see the example item after this list)
  - Supports --dry-run, --domain, --conference tags

- templates/conference-summary-prompt.md
  - Specialized prompt for conference talk knowledge extraction
  - Mirrors harvester prompt structure but tuned for talk context
  - Categories: fact, pitfall, pattern, tool-quirk, question
  - Evidence required per item
  - Domain tagging (global|repo|agent|compounding-intelligence)
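
The LLM is expected to return a JSON object with a knowledge array (plus an
optional meta block); after validation, dedup, and ID assignment, a stored
entry in knowledge/index.json looks roughly like this sketch (values invented,
field names taken from the script):

  {
    "id": "global:pattern:001",
    "fact": "Long-running agents benefit from periodic context compaction.",
    "category": "pattern",
    "domain": "global",
    "confidence": 0.8,
    "evidence": "Speaker showed token spend dropping after compaction.",
    "tags": ["AI拂晓 2026", "conference-talk"],
    "first_seen": "2026-04-26",
    "last_confirmed": "2026-04-26",
    "source_count": 1,
    "source": "conference-talk",
    "harvested_at": "2026-04-26T11:18:00+00:00"
  }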

### Acceptance criteria

- Finds talk transcripts — accepts any plain-text transcript file
- Generates summary — LLM produces structured knowledge items
- Extracts key takeaways — fact/pattern/pitfall/tool-quirk/question
- Stores in knowledge base — writes to index.json + conferences/talks.md
- Weekly — script can be scheduled via cron (see the example below)
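
For the weekly criterion, a crontab entry along these lines would do the job
(schedule and paths are illustrative, not part of this change):

  # Mondays at 09:00: summarize the transcript saved by a separate fetch step
  0 9 * * 1 cd /path/to/compounding-intelligence && python3 scripts/conference_summarizer.py --transcript /path/to/latest-talk.txt --domain global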

### Usage example

  python3 scripts/conference_summarizer.py \
    --transcript ~/Downloads/ai拂晓-2026-04-10.txt \
    --conference "AI拂晓 2026" \
    --title "Scaling Autonomous Agents" \
    --speaker "Alexander" \
    --domain global \
    --dry-run

Run without --dry-run to actually write to the knowledge store.
The API key is resolved from --api-key, the HARVESTER_API_KEY environment
variable, or one of the configured key files (e.g. ~/.config/nous/key).
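
For reference, a --dry-run invocation like the one above prints its progress
counters and the candidate items without touching the store, roughly like
this (counts and items illustrative):

  Summarizing 'ai拂晓-2026-04-10.txt' domain=global conf=AI拂晓 2026
    Raw items: 12
    Valid: 10
    New (non-duplicate): 8
  DRY RUN — items that would be added:
    [pattern] Long-running agents benefit from periodic context compaction.
    [pitfall] ...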

Closes #138

scripts/conference_summarizer.py

#!/usr/bin/env python3
"""
conference_summarizer.py — Extract knowledge from conference talk transcripts.
Reads a plain-text transcript and uses LLM to extract durable knowledge items.
Integrates with the knowledge store (index.json + knowledge/conferences/talks.md).
Usage:
python3 conference_summarizer.py --transcript talk.txt --conference "AI拂晓" --domain global
python3 conference_summarizer.py --transcript talk.txt --domain the-nexus # talk about that repo
python3 conference_summarizer.py --transcript talk.txt --dry-run
Refs: Issue #138 — 7.6: Conference Talk Summarizer
"""

import argparse
import hashlib
import json
import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path

SCRIPT_DIR = Path(__file__).parent.absolute()
REPO_ROOT = SCRIPT_DIR.parent
KNOWLEDGE_DIR = REPO_ROOT / "knowledge"

DEFAULT_API_BASE = "https://api.nousresearch.com/v1"
DEFAULT_API_KEY = ""
DEFAULT_MODEL = "xiaomi/mimo-v2-pro"

API_KEY_PATHS = [
    Path.home() / ".config/nous/key",
    Path.home() / ".hermes/keymaxxing/active/minimax.key",
    Path.home() / ".config/openrouter/key",
]


def find_api_key() -> str:
    # Prefer the HARVESTER_API_KEY environment variable (as documented),
    # then fall back to the known key-file locations.
    env_key = os.environ.get("HARVESTER_API_KEY", "")
    if env_key:
        return env_key.strip()
    for path in API_KEY_PATHS:
        if path.exists():
            return path.read_text().strip()
    return ""


def load_prompt() -> str:
    path = SCRIPT_DIR.parent / "templates" / "conference-summary-prompt.md"
    if not path.exists():
        print(f"ERROR: Prompt not found at {path}", file=sys.stderr)
        sys.exit(1)
    return path.read_text(encoding="utf-8")


def truncate_for_context(text: str, head: int = 120, tail: int = 120) -> str:
    lines = text.splitlines()
    if len(lines) <= head + tail:
        return text
    return (
        "\n".join(lines[:head])
        + "\n\n... [truncated] ...\n\n"
        + "\n".join(lines[-tail:])
    )


def call_llm(prompt: str, transcript: str, api_base: str, api_key: str, model: str):
    import urllib.request

    messages = [
        {"role": "system", "content": prompt},
        {"role": "user", "content": f"Transcript:\n\n{truncate_for_context(transcript)}"},
    ]
    payload = json.dumps(
        {"model": model, "messages": messages, "temperature": 0.1, "max_tokens": 4096}
    ).encode("utf-8")
    req = urllib.request.Request(
        f"{api_base}/chat/completions",
        data=payload,
        headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"},
    )
    try:
        with urllib.request.urlopen(req, timeout=60) as resp:
            data = json.loads(resp.read())
        content = data["choices"][0]["message"]["content"].strip()
        # Strip code fences
        if content.startswith("```"):
            content = content.split("\n", 1)[1].rsplit("```", 1)[0].strip()
        return json.loads(content)
    except Exception as e:
        print(f"ERROR: LLM extraction failed: {e}", file=sys.stderr)
        return None


def load_index(knowledge_dir: Path) -> dict:
    index_path = knowledge_dir / "index.json"
    if index_path.exists():
        with open(index_path) as f:
            return json.load(f)
    return {"version": 1, "total_facts": 0, "facts": []}


def content_hash(text: str) -> str:
    normalized = " ".join(text.lower().strip().split())
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()


def compute_next_sequence(existing_facts: list[dict], domain: str, category: str) -> int:
    """Compute next sequence number for (domain, category) based on existing IDs."""
    max_seq = 0
    for f in existing_facts:
        fid = f.get("id", "")
        parts = fid.split(":")
        if len(parts) == 3 and parts[0] == domain and parts[1] == category:
            try:
                seq = int(parts[2])
                max_seq = max(max_seq, seq)
            except ValueError:
                pass
    return max_seq + 1


def deduplicate(new_facts: list[dict], existing: list[dict]) -> list[dict]:
    """Exact-deduplicate by content hash; near-dedup by token overlap."""
    existing_hashes = {content_hash(f["fact"]): f for f in existing}
    existing_texts = [f["fact"].lower() for f in existing]
    unique = []
    for fact in new_facts:
        text = fact.get("fact", "")
        h = content_hash(text)
        if h in existing_hashes:
            continue
        # Near-dedup: token Jaccard >= 0.8 against any existing fact
        tokens = set(text.lower().split())
        for ex in existing_texts:
            ex_tokens = set(ex.split())
            if tokens and ex_tokens:
                inter = len(tokens & ex_tokens)
                union = len(tokens | ex_tokens)
                if inter / union >= 0.8:
                    break
        else:
            # for/else: only reached when no near-duplicate triggered the break
            unique.append(fact)
    return unique


def validate_fact(fact: dict) -> bool:
    required = ["fact", "category", "domain", "confidence"]
    for field in required:
        if field not in fact:
            return False
    if not isinstance(fact["fact"], str) or not fact["fact"].strip():
        return False
    if fact["category"] not in ["fact", "pitfall", "pattern", "tool-quirk", "question"]:
        return False
    c = fact.get("confidence", 0)
    return isinstance(c, (int, float)) and 0.0 <= c <= 1.0


def write_knowledge(index: dict, new_facts: list[dict], knowledge_dir: Path):
    kdir = knowledge_dir
    kdir.mkdir(parents=True, exist_ok=True)
    for fact in new_facts:
        fact["harvested_at"] = datetime.now(timezone.utc).isoformat()
        fact["source"] = "conference-talk"
    index["facts"].extend(new_facts)
    index["total_facts"] = len(index["facts"])
    index["last_updated"] = datetime.now(timezone.utc).isoformat()

    # index.json
    with open(kdir / "index.json", "w", encoding="utf-8") as f:
        json.dump(index, f, indent=2, ensure_ascii=False)

    # conferences/talks.md (human-readable)
    conf_dir = kdir / "conferences"
    conf_dir.mkdir(parents=True, exist_ok=True)
    conf_md = conf_dir / "talks.md"
    mode = "a" if conf_md.exists() else "w"
    with open(conf_md, mode, encoding="utf-8") as f:
        if mode == "w":
            f.write("# Conference Talk Knowledge\n\n")
        f.write(
            f"## {datetime.now(timezone.utc).strftime('%Y-%m-%d')} - {len(new_facts)} items\n\n"
        )
        for fact in new_facts:
            icon = {
                "fact": "📋", "pitfall": "⚠️", "pattern": "🔄",
                "tool-quirk": "🔧", "question": "❓",
            }.get(fact["category"], "")
            f.write(f"- {icon} **{fact['category']}** (conf: {fact['confidence']:.1f}): {fact['fact']}\n")
            if fact.get("evidence"):
                f.write(f"  _Evidence: {fact['evidence']}_\n")
        f.write("\n")


def main():
    parser = argparse.ArgumentParser(description="Summarize conference talks into knowledge store")
    parser.add_argument("--transcript", required=True, help="Path to transcript text file")
    parser.add_argument("--conference", default="unknown", help="Conference name")
    parser.add_argument("--title", default="", help="Talk title")
    parser.add_argument("--speaker", default="", help="Speaker name(s)")
    parser.add_argument("--talk-url", default="", help="URL to talk/video")
    parser.add_argument("--domain", default="global", help="Domain: global or repo/agent name")
    parser.add_argument("--knowledge-dir", default=str(KNOWLEDGE_DIR), help="Knowledge store directory")
    parser.add_argument("--api-base", default=DEFAULT_API_BASE, help="LLM API base URL")
    parser.add_argument("--api-key", default="", help="LLM API key")
    parser.add_argument("--model", default=DEFAULT_MODEL, help="Model to use")
    parser.add_argument("--dry-run", action="store_true", help="Preview without writing")
    parser.add_argument("--min-confidence", type=float, default=0.3, help="Minimum confidence threshold")
    args = parser.parse_args()

    transcript_path = Path(args.transcript)
    if not transcript_path.exists():
        print(f"ERROR: Transcript not found: {transcript_path}", file=sys.stderr)
        sys.exit(1)
    transcript = transcript_path.read_text(encoding="utf-8", errors="replace")
    if not transcript.strip():
        print("ERROR: Transcript is empty", file=sys.stderr)
        sys.exit(1)

    api_key = args.api_key or DEFAULT_API_KEY or find_api_key()
    if not api_key:
        print("ERROR: No API key. Set HARVESTER_API_KEY or pass --api-key", file=sys.stderr)
        sys.exit(1)

    prompt = load_prompt()
    print(f"Summarizing '{transcript_path.name}' domain={args.domain} conf={args.conference}")
    start = time.time()
    extracted = call_llm(prompt, transcript, args.api_base, api_key, args.model)
    if extracted is None:
        print("ERROR: LLM extraction failed", file=sys.stderr)
        sys.exit(1)

    raw_items = extracted.get("knowledge", [])
    print(f"  Raw items: {len(raw_items)}")
    valid = [f for f in raw_items if validate_fact(f) and f.get("confidence", 0) >= args.min_confidence]
    print(f"  Valid: {len(valid)}")
    if not valid:
        print("WARNING: No valid items extracted", file=sys.stderr)
        sys.exit(1)

    kdir = Path(args.knowledge_dir)
    index = load_index(kdir)
    existing_facts = index.get("facts", [])
    new_facts = deduplicate(valid, existing_facts)
    print(f"  New (non-duplicate): {len(new_facts)}")
    if not new_facts:
        print("All items duplicated — nothing to write.")
        sys.exit(0)

    # Assign IDs per (domain, category) sequence, continuing from existing IDs
    seq_counters = {}
    for fact in new_facts:
        cat = fact["category"]
        key = (args.domain, cat)
        if key not in seq_counters:
            # Seed from the highest existing sequence for this (domain, category)
            seq_counters[key] = compute_next_sequence(existing_facts, args.domain, cat) - 1
        seq_counters[key] += 1
        fact["id"] = f"{args.domain}:{cat}:{seq_counters[key]:03d}"
        fact["domain"] = args.domain
        fact.setdefault("tags", []).extend([args.conference, "conference-talk"])
        fact["first_seen"] = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        fact["last_confirmed"] = fact["first_seen"]
        fact["source_count"] = 1
        fact["talk_meta"] = extracted.get("meta", {})

    if args.dry_run:
        print("DRY RUN — items that would be added:")
        for f in new_facts:
            print(f"  [{f['category']}] {f['fact'][:90]}")
        sys.exit(0)

    write_knowledge(index, new_facts, kdir)
    print(f"✓ Stored {len(new_facts)} items to knowledge store in {time.time() - start:.1f}s")


if __name__ == "__main__":
    main()