Compare commits


1 Commit

Author SHA1 Message Date
Step35
1470b44c3b feat: add codebase genome diff script for structural change detection
Some checks failed
Test / pytest (pull_request) Failing after 9s
Introduces genome_diff.py — a tool for detecting structural changes between
two git refs: file-level changes, function/class signature modifications,
and dependency import changes.

Addresses #132.
2026-04-26 09:46:04 -04:00
3 changed files with 288 additions and 355 deletions

conference_summarizer.py Deleted file

View File

@@ -1,283 +0,0 @@
#!/usr/bin/env python3
"""
conference_summarizer.py — Extract knowledge from conference talk transcripts.
Reads a plain-text transcript and uses an LLM to extract durable knowledge items.
Integrates with the knowledge store (index.json + knowledge/conferences/talks.md).
Usage:
python3 conference_summarizer.py --transcript talk.txt --conference "AI拂晓" --domain global
python3 conference_summarizer.py --transcript talk.txt --domain the-nexus # talk about that repo
python3 conference_summarizer.py --transcript talk.txt --dry-run
Refs: Issue #138 — 7.6: Conference Talk Summarizer
"""
import argparse
import hashlib
import json
import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
SCRIPT_DIR = Path(__file__).parent.absolute()
REPO_ROOT = SCRIPT_DIR.parent
KNOWLEDGE_DIR = REPO_ROOT / "knowledge"
DEFAULT_API_BASE = "https://api.nousresearch.com/v1"
DEFAULT_API_KEY = ""
DEFAULT_MODEL = "xiaomi/mimo-v2-pro"
API_KEY_PATHS = [
Path.home() / ".config/nous/key",
Path.home() / ".hermes/keymaxxing/active/minimax.key",
Path.home() / ".config/openrouter/key",
]
def find_api_key() -> str:
for path in API_KEY_PATHS:
if path.exists():
return path.read_text().strip()
return ""
def load_prompt() -> str:
path = SCRIPT_DIR.parent / "templates" / "conference-summary-prompt.md"
if not path.exists():
print(f"ERROR: Prompt not found at {path}", file=sys.stderr)
sys.exit(1)
return path.read_text(encoding="utf-8")
def truncate_for_context(text: str, head: int = 120, tail: int = 120) -> str:
lines = text.splitlines()
if len(lines) <= head + tail:
return text
return (
"\n".join(lines[:head])
+ "\n\n... [truncated] ...\n\n"
+ "\n".join(lines[-tail:])
)
def call_llm(prompt: str, transcript: str, api_base: str, api_key: str, model: str):
import urllib.request
messages = [
{"role": "system", "content": prompt},
{"role": "user", "content": f"Transcript:\n\n{truncate_for_context(transcript)}"},
]
payload = json.dumps(
{"model": model, "messages": messages, "temperature": 0.1, "max_tokens": 4096}
).encode("utf-8")
req = urllib.request.Request(
f"{api_base}/chat/completions",
data=payload,
headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"},
)
try:
with urllib.request.urlopen(req, timeout=60) as resp:
data = json.loads(resp.read())
content = data["choices"][0]["message"]["content"].strip()
# Strip code fences
if content.startswith("```"):
content = content.split("\n", 1)[1].rsplit("```", 1)[0].strip()
return json.loads(content)
except Exception as e:
print(f"ERROR: LLM extraction failed: {e}", file=sys.stderr)
return None
def load_index(knowledge_dir: Path) -> dict:
index_path = knowledge_dir / "index.json"
if index_path.exists():
with open(index_path) as f:
return json.load(f)
return {"version": 1, "total_facts": 0, "facts": []}
def content_hash(text: str) -> str:
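    # Case/whitespace-insensitive digest: lowercase and collapse runs of
    # whitespace so formatting variants of the same fact hash identically.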
normalized = " ".join(text.lower().strip().split())
return hashlib.sha256(normalized.encode("utf-8")).hexdigest()
def compute_next_sequence(existing_facts: list[dict], domain: str, category: str) -> int:
"""Compute next sequence number for (domain, category) based on existing IDs."""
max_seq = 0
for f in existing_facts:
fid = f.get("id", "")
parts = fid.split(":")
if len(parts) == 3 and parts[0] == domain and parts[1] == category:
try:
seq = int(parts[2])
max_seq = max(max_seq, seq)
except ValueError:
pass
return max_seq + 1
def deduplicate(new_facts: list[dict], existing: list[dict]) -> list[dict]:
"""Exact-deduplicate by content hash; near-dedup by token overlap."""
existing_hashes = {content_hash(f["fact"]): f for f in existing}
existing_texts = [f["fact"].lower() for f in existing]
unique = []
for fact in new_facts:
text = fact.get("fact", "")
h = content_hash(text)
if h in existing_hashes:
continue
# Near-dedup: token Jaccard >= 0.8
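        # (Python for/else: the else branch below runs only when no existing
        # fact tripped the similarity break, i.e. the candidate is genuinely new.)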
tokens = set(text.lower().split())
for ex in existing_texts:
ex_tokens = set(ex.split())
if tokens and ex_tokens:
inter = len(tokens & ex_tokens)
union = len(tokens | ex_tokens)
if inter / union >= 0.8:
break
        else:
            # also guard against near-duplicates within this batch
            unique.append(fact)
            existing_hashes[h] = fact
            existing_texts.append(text.lower())
return unique
def validate_fact(fact: dict) -> bool:
required = ["fact", "category", "domain", "confidence"]
for field in required:
if field not in fact:
return False
if not isinstance(fact["fact"], str) or not fact["fact"].strip():
return False
if fact["category"] not in ["fact", "pitfall", "pattern", "tool-quirk", "question"]:
return False
c = fact.get("confidence", 0)
return isinstance(c, (int, float)) and 0.0 <= c <= 1.0
def write_knowledge(index: dict, new_facts: list[dict], knowledge_dir: Path):
kdir = knowledge_dir
kdir.mkdir(parents=True, exist_ok=True)
for fact in new_facts:
fact["harvested_at"] = datetime.now(timezone.utc).isoformat()
fact["source"] = "conference-talk"
index["facts"].extend(new_facts)
index["total_facts"] = len(index["facts"])
index["last_updated"] = datetime.now(timezone.utc).isoformat()
# index.json
with open(kdir / "index.json", "w", encoding="utf-8") as f:
json.dump(index, f, indent=2, ensure_ascii=False)
# conferences/talks.md (human-readable)
conf_dir = kdir / "conferences"
conf_dir.mkdir(parents=True, exist_ok=True)
conf_md = conf_dir / "talks.md"
mode = "a" if conf_md.exists() else "w"
with open(conf_md, mode, encoding="utf-8") as f:
if mode == "w":
f.write("# Conference Talk Knowledge\n\n")
        f.write(
            f"## {datetime.now(timezone.utc).strftime('%Y-%m-%d')} - {len(new_facts)} items\n\n"
        )
for fact in new_facts:
            icon = {"fact": "📋", "pitfall": "⚠️", "pattern": "🔄", "tool-quirk": "🔧", "question": "❓"}.get(fact["category"], "")
f.write(f"- {icon} **{fact['category']}** (conf: {fact['confidence']:.1f}): {fact['fact']}\n")
if fact.get("evidence"):
f.write(f" _Evidence: {fact['evidence']}_\n")
f.write("\n")
def main():
parser = argparse.ArgumentParser(description="Summarize conference talks into knowledge store")
parser.add_argument("--transcript", required=True, help="Path to transcript text file")
parser.add_argument("--conference", default="unknown", help="Conference name")
parser.add_argument("--title", default="", help="Talk title")
parser.add_argument("--speaker", default="", help="Speaker name(s)")
parser.add_argument("--talk-url", default="", help="URL to talk/video")
parser.add_argument("--domain", default="global", help="Domain: global or repo/agent name")
parser.add_argument("--knowledge-dir", default=str(KNOWLEDGE_DIR), help="Knowledge store directory")
parser.add_argument("--api-base", default=DEFAULT_API_BASE, help="LLM API base URL")
parser.add_argument("--api-key", default="", help="LLM API key")
parser.add_argument("--model", default=DEFAULT_MODEL, help="Model to use")
parser.add_argument("--dry-run", action="store_true", help="Preview without writing")
parser.add_argument("--min-confidence", type=float, default=0.3, help="Minimum confidence threshold")
args = parser.parse_args()
transcript_path = Path(args.transcript)
if not transcript_path.exists():
print(f"ERROR: Transcript not found: {transcript_path}", file=sys.stderr)
sys.exit(1)
transcript = transcript_path.read_text(encoding="utf-8", errors="replace")
if not transcript.strip():
print("ERROR: Transcript is empty", file=sys.stderr)
sys.exit(1)
    api_key = args.api_key or os.environ.get("HARVESTER_API_KEY", "") or DEFAULT_API_KEY or find_api_key()
if not api_key:
print("ERROR: No API key. Set HARVESTER_API_KEY or pass --api-key", file=sys.stderr)
sys.exit(1)
prompt = load_prompt()
print(f"Summarizing '{transcript_path.name}' domain={args.domain} conf={args.conference}")
start = time.time()
extracted = call_llm(prompt, transcript, args.api_base, api_key, args.model)
if extracted is None:
print("ERROR: LLM extraction failed", file=sys.stderr)
sys.exit(1)
raw_items = extracted.get("knowledge", [])
print(f" Raw items: {len(raw_items)}")
valid = [f for f in raw_items if validate_fact(f) and f.get("confidence", 0) >= args.min_confidence]
print(f" Valid: {len(valid)}")
if not valid:
print("WARNING: No valid items extracted", file=sys.stderr)
sys.exit(1)
kdir = Path(args.knowledge_dir)
index = load_index(kdir)
existing_facts = index.get("facts", [])
new_facts = deduplicate(valid, existing_facts)
print(f" New (non-duplicate): {len(new_facts)}")
if not new_facts:
print("All items duplicated — nothing to write.")
sys.exit(0)
    # Assign IDs per (domain, category), seeded from the max existing sequence
    # number so gaps left by deleted facts cannot cause ID collisions.
    seq_counters = {}
    for fact in new_facts:
        cat = fact["category"]
        key = (args.domain, cat)
        if key not in seq_counters:
            seq_counters[key] = compute_next_sequence(existing_facts, args.domain, cat)
        else:
            seq_counters[key] += 1
        next_seq = seq_counters[key]
        fact["id"] = f"{args.domain}:{cat}:{next_seq:03d}"
fact["domain"] = args.domain
fact.setdefault("tags", []).extend([args.conference, "conference-talk"])
fact["first_seen"] = datetime.now(timezone.utc).strftime("%Y-%m-%d")
fact["last_confirmed"] = fact["first_seen"]
fact["source_count"] = 1
fact["talk_meta"] = extracted.get("meta", {})
if args.dry_run:
print("DRY RUN — items that would be added:")
for f in new_facts:
print(f" [{f['category']}] {f['fact'][:90]}")
sys.exit(0)
write_knowledge(index, new_facts, kdir)
print(f"✓ Stored {len(new_facts)} items to knowledge store in {time.time() - start:.1f}s")
if __name__ == "__main__":
main()

scripts/genome_diff.py (+288) Executable file
View File

@@ -0,0 +1,288 @@
#!/usr/bin/env python3
"""
Codebase Genome Diff — Detect structural changes between two versions.
Compares two git refs (commits, branches, tags) and produces a human-readable
report of structural changes:
• Added/removed/renamed files
• Changed functions/classes (signature modifications)
• New dependencies (imports, requirements, etc.)
Usage:
python3 scripts/genome_diff.py --ref1 <commit1> --ref2 <commit2>
python3 scripts/genome_diff.py --ref1 main --ref2 feature-branch
python3 scripts/genome_diff.py --ref1 v1.0 --ref2 v2.0 --output report.txt
"""
import argparse
import json
import os
import re
import subprocess
import sys
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, SCRIPT_DIR)
from diff_analyzer import DiffAnalyzer
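# diff_analyzer is assumed to be a sibling module in this scripts/ directory
# (hence the sys.path tweak above) providing file-level diff parsing.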
@dataclass
class FunctionChange:
file: str
name: str
kind: str # 'function' or 'class'
change_type: str # 'added' or 'removed' (simplified)
old_line: Optional[int] = None
new_line: Optional[int] = None
@dataclass
class DependencyChange:
file: str
module: str
change_type: str # 'added' or 'removed' or 'modified'
line: int = 0
@dataclass
class GenomeDiffReport:
ref1: str
ref2: str
file_changes: List[Dict[str, Any]] = field(default_factory=list)
function_changes: List[FunctionChange] = field(default_factory=list)
dependency_changes: List[DependencyChange] = field(default_factory=list)
total_files_changed: int = 0
total_functions_changed: int = 0
total_dependencies_changed: int = 0
def to_dict(self) -> Dict[str, Any]:
return {
"ref1": self.ref1,
"ref2": self.ref2,
"summary": {
"files": self.total_files_changed,
"functions": self.total_functions_changed,
"dependencies": self.total_dependencies_changed,
},
"file_changes": self.file_changes,
"function_changes": [fc.__dict__ for fc in self.function_changes],
"dependency_changes": [dc.__dict__ for dc in self.dependency_changes],
}
def human_report(self) -> str:
lines = []
lines.append(f"Codebase Genome Diff: {self.ref1}{self.ref2}")
lines.append("=" * 60)
lines.append(f" Files changed: {self.total_files_changed}")
lines.append(f" Functions changed: {self.total_functions_changed}")
lines.append(f" Dependencies changed: {self.total_dependencies_changed}")
lines.append("")
for fc in self.file_changes:
kind = []
if fc.get('is_new'):
kind.append("NEW")
if fc.get('is_deleted'):
kind.append("DELETED")
if fc.get('is_renamed'):
kind.append("RENAMED")
if fc.get('is_binary'):
kind.append("BINARY")
kind_str = f" [{', '.join(kind)}]" if kind else ""
lines.append(f" {fc['path']}{kind_str} (+{fc['added_lines']}/-{fc['deleted_lines']})")
lines.append("")
for fc in self.function_changes:
op = {'added': '+', 'removed': '-', 'modified': '~'}.get(fc.change_type, '?')
lines.append(f" [{op}] {fc.file}: {fc.kind} '{fc.name}'")
lines.append("")
for dc in self.dependency_changes:
op = '+' if dc.change_type == 'added' else '-'
lines.append(f" [{op}] {dc.file}: {dc.module}")
lines.append("")
return "\n".join(lines)
def run_git_diff(ref1: str, ref2: str) -> str:
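    # Note: 'ref1...ref2' (three dots) diffs ref2 against merge-base(ref1, ref2);
    # a two-dot 'ref1..ref2' would compare the two trees directly.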
result = subprocess.run(
['git', 'diff', '--unified=0', f'{ref1}...{ref2}'],
capture_output=True, text=True, cwd=SCRIPT_DIR
)
if result.returncode not in (0, 1):
print(f"git diff failed: {result.stderr}", file=sys.stderr)
sys.exit(1)
return result.stdout
def extract_function_changes(diff_text: str) -> List[FunctionChange]:
changes: List[FunctionChange] = []
pattern = re.compile(r'^([+\-])\s*(def|class)\s+(\w+)', re.MULTILINE)
hunk_header_re = re.compile(r'^@@\s+-(\d+)(?:,(\d+))?\s+\+(\d+)(?:,(\d+))?\s+@@')
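    # Unified hunk headers look like "@@ -12,3 +14,7 @@"; groups 1 and 3 give
    # the old/new start lines used to track positions below.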
current_old_line: Optional[int] = None
current_new_line: Optional[int] = None
for line in diff_text.split('\n'):
hdr = hunk_header_re.match(line)
if hdr:
current_old_line = int(hdr.group(1))
current_new_line = int(hdr.group(3))
continue
m = pattern.match(line)
if m:
op = m.group(1)
kind = m.group(2)
name = m.group(3)
change_type = "added" if op == '+' else "removed"
line_num = current_new_line if change_type == "added" else current_old_line
changes.append(FunctionChange(
file="<unknown>",
name=name,
kind=kind,
change_type=change_type,
new_line=line_num if change_type == "added" else None,
old_line=line_num if change_type == "removed" else None,
))
        # Advance the line counters for every diff body line, keyed off the
        # line's own prefix ('op' above is only bound when a def/class matched)
        if line.startswith('-') and not line.startswith('---'):
            if current_old_line is not None:
                current_old_line += 1
        elif line.startswith('+') and not line.startswith('+++'):
            if current_new_line is not None:
                current_new_line += 1
        elif line.startswith(' '):
            if current_old_line is not None:
                current_old_line += 1
            if current_new_line is not None:
                current_new_line += 1
        # other prefixes (e.g. '\ No newline at end of file') are ignored
return changes
def extract_dependency_changes(diff_text: str, analyzer: DiffAnalyzer) -> List[DependencyChange]:
changes: List[DependencyChange] = []
import_pattern = re.compile(
r'^([+\-])\s*(?:import\s+([\w\.]+)|from\s+([\w\.]+)\s+import)',
re.MULTILINE
)
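    # Matches changed import lines such as "+import requests" or "-from foo.bar import baz".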
file_diffs = analyzer._split_files(diff_text)
for file_diff in file_diffs:
file_match = re.search(r'^diff --git a/.*? b/(.*?)$', file_diff, re.MULTILINE)
if not file_match:
continue
filepath = file_match.group(1)
# Scan each line for import changes
for line in file_diff.split('\n'):
m = import_pattern.match(line)
if m:
change_type = "added" if m.group(1) == '+' else "removed"
module = m.group(2) or m.group(3)
changes.append(DependencyChange(
file=filepath,
module=module,
change_type=change_type,
line=0
))
        # Detect if this file is itself a dependency manifest (match the file
        # path, not diff content that merely mentions a manifest name)
        if re.search(r'(requirements[^/]*\.txt|pyproject\.toml|setup\.py|Pipfile)$', filepath):
if not any(c.file == filepath and c.module == "<file>" for c in changes):
changes.append(DependencyChange(
file=filepath,
module="<file>",
change_type="modified",
line=0
))
return changes
def correlate_function_changes_with_files(diff_text: str) -> List[FunctionChange]:
result: List[FunctionChange] = []
# Split diff into per-file sections
file_sections: List[tuple[str, str]] = []
current_file: Optional[str] = None
current_lines: List[str] = []
for line in diff_text.split('\n'):
if line.startswith('diff --git'):
if current_file is not None:
file_sections.append((current_file, '\n'.join(current_lines)))
m = re.match(r'^diff --git a/.*? b/(.*?)$', line)
current_file = m.group(1) if m else "unknown"
current_lines = [line]
else:
current_lines.append(line)
if current_file is not None:
file_sections.append((current_file, '\n'.join(current_lines)))
    # Re-run the per-line extractor on each file section so hunk line numbers
    # survive and each change carries its real file path.
    for filepath, section in file_sections:
        for change in extract_function_changes(section):
            change.file = filepath
            result.append(change)
    return result
def main():
parser = argparse.ArgumentParser(description="Codebase Genome Diff — structural changes between versions")
parser.add_argument("--ref1", required=True, help="First git ref (commit, branch, tag)")
parser.add_argument("--ref2", required=True, help="Second git ref")
parser.add_argument("--output", help="Write report to file")
parser.add_argument("--json", action="store_true", help="Output JSON instead of human report")
args = parser.parse_args()
try:
diff_text = run_git_diff(args.ref1, args.ref2)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
if not diff_text.strip():
print(f"No differences between {args.ref1} and {args.ref2}.")
sys.exit(0)
analyzer = DiffAnalyzer()
summary = analyzer.analyze(diff_text)
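    # DiffAnalyzer.analyze() is assumed to return a summary whose .files entries
    # expose to_dict() with path / added_lines / deleted_lines / is_* keys,
    # matching how human_report() consumes them above.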
file_changes = [fc.to_dict() for fc in summary.files]
    func_changes = correlate_function_changes_with_files(diff_text)
dep_changes = extract_dependency_changes(diff_text, analyzer)
report = GenomeDiffReport(
ref1=args.ref1,
ref2=args.ref2,
file_changes=file_changes,
function_changes=func_changes,
dependency_changes=dep_changes,
total_files_changed=len(file_changes),
total_functions_changed=len(func_changes),
total_dependencies_changed=len(dep_changes),
)
output = json.dumps(report.to_dict(), indent=2) if args.json else report.human_report()
if args.output:
with open(args.output, 'w') as f:
f.write(output + '\n')
print(f"Report written to {args.output}")
else:
print(output)
if __name__ == '__main__':
main()

templates/conference-summary-prompt.md Deleted file

View File

@@ -1,72 +0,0 @@
# Conference Talk Knowledge Extraction Prompt
## System Prompt
You are a knowledge extraction engine specialized in conference talks. You read talk transcripts and output ONLY structured JSON. You extract factual insights, patterns, tool discoveries, and warnings that are durable and actionable for the Timmy Foundation fleet.
## Prompt
```
TASK: Extract durable knowledge from this conference talk transcript.
RULES:
1. Extract ONLY information explicitly stated or strongly implied in the transcript.
2. Do NOT hallucinate, infer unsupported details, or invent quotes.
3. Every fact must be grounded in something the speaker actually said.
4. Focus on **durable, reusable** knowledge — not specific project details that won't apply elsewhere.
5. Prioritize insights that improve: workflows, tool usage, system design, governance, or operational reliability.
CATEGORIES (assign exactly one per item):
- fact: Concrete, verifiable takeaway (technical detail, config, workflow)
- pitfall: Mistake, trap, or cost of wrong approach the speaker warned about
- pattern: Successful approach, sequence, or template worth reusing
- tool-quirk: Unexpected behavior, gotcha, or setup detail for a specific tool/platform
- question: Something raised but not fully answered — worth investigating further
CONFIDENCE:
- 0.9-1.0: Explicitly stated by the speaker with clear reasoning/evidence
- 0.7-0.8: Clearly implied by multiple statements or the speaker's expertise
- 0.5-0.6: Suggested or hinted at, but not directly confirmed
- 0.3-0.4: Interpretive, speculative, or a single-data-point observation
TARGET DOMAIN:
- If talk is about a specific repo (e.g. hermes-agent, the-nexus), set `domain` to that repo name.
- If talk is about general principles, fleet processes, or multiple repos, set `domain` to "global".
- If talk is about an agent type (mimo, groq, claude), set `domain` to the agent name.
- If talk is about the compounding-intelligence system itself, set `domain` to "compounding-intelligence".
OUTPUT FORMAT (valid JSON only, no markdown, no explanation):
{
"knowledge": [
{
"fact": "One specific, actionable sentence of knowledge",
"category": "fact|pitfall|pattern|tool-quirk|question",
"domain": "global|{repo}|{agent}|compounding-intelligence",
"confidence": 0.0-1.0,
"tags": ["relevant", "keywords"],
"evidence": "Brief paraphrase or quote from the transcript that supports this"
}
],
"meta": {
"talk_title": "Title of the talk (if known)",
"speaker": "Speaker name(s)",
"conference": "Conference name",
"talk_url": "URL to talk/video (if available)",
"knowledge_count": 0,
"extraction_date": "2026-04-26"
}
}
TRANSCRIPT:
{{transcript}}
```
## Design Notes
- Keep the `fact` field to **one clear sentence**. Avoid run-ons.
- `evidence` should be a 1-2 sentence paraphrase, not a verbatim paragraph.
- `tags` should include: tool names, repo names, agent types, concepts mentioned.
- Focus on what the fleet can **reuse tomorrow**, not ephemeral project context.
- If the talk is high-level vision with no concrete details, that's a `question` or a low-confidence `fact`.
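
## Example Item

An illustrative well-formed item, with hypothetical values, shown only to anchor the schema above:

```json
{
  "fact": "Pinning the extraction model version in CI avoids silent knowledge drift between runs",
  "category": "pattern",
  "domain": "global",
  "confidence": 0.8,
  "tags": ["ci", "llm", "reproducibility"],
  "evidence": "Speaker described two harvest runs diverging after an unpinned model upgrade"
}
```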