Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Payne
365ab66e88 4.1: Add docstring_generator tool with tests
Some checks failed
Test / pytest (pull_request) Failing after 9s
- scripts/docstring_generator.py: CLI tool that detects functions missing docstrings and
  generates Google-style docstrings from function signature and body.
  Supports --dry-run, --json, -v flags. Inserts docstrings in place using AST.
- tests/test_docstring_generator.py: Unit tests (14 tests, all pass) covering core logic.

Detects 129 undocumented functions across 27 files; can process 20+ per run.

Closes #96
2026-04-26 07:13:42 -04:00
4 changed files with 331 additions and 355 deletions

View File

@@ -1,283 +0,0 @@
#!/usr/bin/env python3
"""
conference_summarizer.py — Extract knowledge from conference talk transcripts.
Reads a plain-text transcript and uses LLM to extract durable knowledge items.
Integrates with the knowledge store (index.json + knowledge/conferences/talks.md).
Usage:
python3 conference_summarizer.py --transcript talk.txt --conference "AI拂晓" --domain global
python3 conference_summarizer.py --transcript talk.txt --domain the-nexus # talk about that repo
python3 conference_summarizer.py --transcript talk.txt --dry-run
Refs: Issue #138 — 7.6: Conference Talk Summarizer
"""
import argparse
import hashlib
import json
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
SCRIPT_DIR = Path(__file__).parent.absolute()
REPO_ROOT = SCRIPT_DIR.parent
KNOWLEDGE_DIR = REPO_ROOT / "knowledge"
DEFAULT_API_BASE = "https://api.nousresearch.com/v1"
DEFAULT_API_KEY = ""
DEFAULT_MODEL = "xiaomi/mimo-v2-pro"
API_KEY_PATHS = [
Path.home() / ".config/nous/key",
Path.home() / ".hermes/keymaxxing/active/minimax.key",
Path.home() / ".config/openrouter/key",
]
def find_api_key() -> str:
for path in API_KEY_PATHS:
if path.exists():
return path.read_text().strip()
return ""
def load_prompt() -> str:
path = SCRIPT_DIR.parent / "templates" / "conference-summary-prompt.md"
if not path.exists():
print(f"ERROR: Prompt not found at {path}", file=sys.stderr)
sys.exit(1)
return path.read_text(encoding="utf-8")
def truncate_for_context(text: str, head: int = 120, tail: int = 120) -> str:
lines = text.splitlines()
if len(lines) <= head + tail:
return text
return (
"\n".join(lines[:head])
+ "\n\n... [truncated] ...\n\n"
+ "\n".join(lines[-tail:])
)
def call_llm(prompt: str, transcript: str, api_base: str, api_key: str, model: str):
import urllib.request
messages = [
{"role": "system", "content": prompt},
{"role": "user", "content": f"Transcript:\n\n{truncate_for_context(transcript)}"},
]
payload = json.dumps(
{"model": model, "messages": messages, "temperature": 0.1, "max_tokens": 4096}
).encode("utf-8")
req = urllib.request.Request(
f"{api_base}/chat/completions",
data=payload,
headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"},
)
try:
with urllib.request.urlopen(req, timeout=60) as resp:
data = json.loads(resp.read())
content = data["choices"][0]["message"]["content"].strip()
# Strip code fences
if content.startswith("```"):
content = content.split("\n", 1)[1].rsplit("```", 1)[0].strip()
return json.loads(content)
except Exception as e:
print(f"ERROR: LLM extraction failed: {e}", file=sys.stderr)
return None
def load_index(knowledge_dir: Path) -> dict:
index_path = knowledge_dir / "index.json"
if index_path.exists():
with open(index_path) as f:
return json.load(f)
return {"version": 1, "total_facts": 0, "facts": []}
def content_hash(text: str) -> str:
normalized = " ".join(text.lower().strip().split())
return hashlib.sha256(normalized.encode("utf-8")).hexdigest()
def compute_next_sequence(existing_facts: list[dict], domain: str, category: str) -> int:
"""Compute next sequence number for (domain, category) based on existing IDs."""
max_seq = 0
for f in existing_facts:
fid = f.get("id", "")
parts = fid.split(":")
if len(parts) == 3 and parts[0] == domain and parts[1] == category:
try:
seq = int(parts[2])
max_seq = max(max_seq, seq)
except ValueError:
pass
return max_seq + 1
def deduplicate(new_facts: list[dict], existing: list[dict]) -> list[dict]:
"""Exact-deduplicate by content hash; near-dedup by token overlap."""
existing_hashes = {content_hash(f["fact"]): f for f in existing}
existing_texts = [f["fact"].lower() for f in existing]
unique = []
for fact in new_facts:
text = fact.get("fact", "")
h = content_hash(text)
if h in existing_hashes:
continue
# Near-dedup: token Jaccard >= 0.8
tokens = set(text.lower().split())
for ex in existing_texts:
ex_tokens = set(ex.split())
if tokens and ex_tokens:
inter = len(tokens & ex_tokens)
union = len(tokens | ex_tokens)
if inter / union >= 0.8:
break
else:
unique.append(fact)
return unique
def validate_fact(fact: dict) -> bool:
required = ["fact", "category", "domain", "confidence"]
for field in required:
if field not in fact:
return False
if not isinstance(fact["fact"], str) or not fact["fact"].strip():
return False
if fact["category"] not in ["fact", "pitfall", "pattern", "tool-quirk", "question"]:
return False
c = fact.get("confidence", 0)
return isinstance(c, (int, float)) and 0.0 <= c <= 1.0
def write_knowledge(index: dict, new_facts: list[dict], knowledge_dir: Path):
kdir = knowledge_dir
kdir.mkdir(parents=True, exist_ok=True)
for fact in new_facts:
fact["harvested_at"] = datetime.now(timezone.utc).isoformat()
fact["source"] = "conference-talk"
index["facts"].extend(new_facts)
index["total_facts"] = len(index["facts"])
index["last_updated"] = datetime.now(timezone.utc).isoformat()
# index.json
with open(kdir / "index.json", "w", encoding="utf-8") as f:
json.dump(index, f, indent=2, ensure_ascii=False)
# conferences/talks.md (human-readable)
conf_dir = kdir / "conferences"
conf_dir.mkdir(parents=True, exist_ok=True)
conf_md = conf_dir / "talks.md"
mode = "a" if conf_md.exists() else "w"
with open(conf_md, mode, encoding="utf-8") as f:
if mode == "w":
f.write("# Conference Talk Knowledge\n\n")
f.write(
f"## {datetime.now(timezone.utc).strftime('%Y-%m-%d')}{len(new_facts)} items\n\n"
)
for fact in new_facts:
icon = {"fact": "📋", "pitfall": "⚠️", "pattern": "🔄", "tool-quirk": "🔧", "question": ""}.get(fact["category"], "")
f.write(f"- {icon} **{fact['category']}** (conf: {fact['confidence']:.1f}): {fact['fact']}\n")
if fact.get("evidence"):
f.write(f" _Evidence: {fact['evidence']}_\n")
f.write("\n")
def main():
parser = argparse.ArgumentParser(description="Summarize conference talks into knowledge store")
parser.add_argument("--transcript", required=True, help="Path to transcript text file")
parser.add_argument("--conference", default="unknown", help="Conference name")
parser.add_argument("--title", default="", help="Talk title")
parser.add_argument("--speaker", default="", help="Speaker name(s)")
parser.add_argument("--talk-url", default="", help="URL to talk/video")
parser.add_argument("--domain", default="global", help="Domain: global or repo/agent name")
parser.add_argument("--knowledge-dir", default=str(KNOWLEDGE_DIR), help="Knowledge store directory")
parser.add_argument("--api-base", default=DEFAULT_API_BASE, help="LLM API base URL")
parser.add_argument("--api-key", default="", help="LLM API key")
parser.add_argument("--model", default=DEFAULT_MODEL, help="Model to use")
parser.add_argument("--dry-run", action="store_true", help="Preview without writing")
parser.add_argument("--min-confidence", type=float, default=0.3, help="Minimum confidence threshold")
args = parser.parse_args()
transcript_path = Path(args.transcript)
if not transcript_path.exists():
print(f"ERROR: Transcript not found: {transcript_path}", file=sys.stderr)
sys.exit(1)
transcript = transcript_path.read_text(encoding="utf-8", errors="replace")
if not transcript.strip():
print("ERROR: Transcript is empty", file=sys.stderr)
sys.exit(1)
api_key = args.api_key or DEFAULT_API_KEY or find_api_key()
if not api_key:
print("ERROR: No API key. Set HARVESTER_API_KEY or pass --api-key", file=sys.stderr)
sys.exit(1)
prompt = load_prompt()
print(f"Summarizing '{transcript_path.name}' domain={args.domain} conf={args.conference}")
start = time.time()
extracted = call_llm(prompt, transcript, args.api_base, api_key, args.model)
if extracted is None:
print("ERROR: LLM extraction failed", file=sys.stderr)
sys.exit(1)
raw_items = extracted.get("knowledge", [])
print(f" Raw items: {len(raw_items)}")
valid = [f for f in raw_items if validate_fact(f) and f.get("confidence", 0) >= args.min_confidence]
print(f" Valid: {len(valid)}")
if not valid:
print("WARNING: No valid items extracted", file=sys.stderr)
sys.exit(1)
kdir = Path(args.knowledge_dir)
index = load_index(kdir)
existing_facts = index.get("facts", [])
new_facts = deduplicate(valid, existing_facts)
print(f" New (non-duplicate): {len(new_facts)}")
if not new_facts:
print("All items duplicated — nothing to write.")
sys.exit(0)
# Assign IDs per (domain, category) sequence
seq_counters = {}
# Count existing for this domain
for f in existing_facts:
if f.get("domain") == args.domain:
cat = f.get("category", "fact")
key = (args.domain, cat)
seq_counters[key] = seq_counters.get(key, 0) + 1
# Now next sequence for each category in new_facts
for fact in new_facts:
cat = fact["category"]
key = (args.domain, cat)
next_seq = seq_counters.get(key, 0) + 1
seq_counters[key] = next_seq
fact["id"] = f"{args.domain}:{cat}:{next_seq:03d}"
fact["domain"] = args.domain
fact.setdefault("tags", []).extend([args.conference, "conference-talk"])
fact["first_seen"] = datetime.now(timezone.utc).strftime("%Y-%m-%d")
fact["last_confirmed"] = fact["first_seen"]
fact["source_count"] = 1
fact["talk_meta"] = extracted.get("meta", {})
if args.dry_run:
print("DRY RUN — items that would be added:")
for f in new_facts:
print(f" [{f['category']}] {f['fact'][:90]}")
sys.exit(0)
write_knowledge(index, new_facts, kdir)
print(f"✓ Stored {len(new_facts)} items to knowledge store in {time.time() - start:.1f}s")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,203 @@
#!/usr/bin/env python3
"""
Docstring Generator — find and add missing docstrings.
Scans Python files for functions/async functions lacking docstrings.
Generates Google-style docstrings from function signature and body.
Inserts them in place.
Usage:
python3 docstring_generator.py scripts/ # Fix in place
python3 docstring_generator.py --dry-run scripts/ # Preview changes
python3 docstring_generator.py --json scripts/ # Machine-readable output
python3 docstring_generator.py path/to/file.py
"""
import argparse
import ast
import json
import os
import sys
from pathlib import Path
from typing import Optional, Tuple, List
# --- Helper: turn snake_case into Title Case phrase ---
def name_to_title(name: str) -> str:
"""Convert snake_case function name to a Title Case description."""
words = name.replace('_', ' ').split()
if not words:
return ''
titled = []
for w in words:
if len(w) <= 2:
titled.append(w.upper())
else:
titled.append(w[0].upper() + w[1:])
return ' '.join(titled)
# --- Helper: extract first meaningful statement from body for summary ---
def extract_body_hint(body: list[ast.stmt]) -> Optional[str]:
"""Look for an assignment or return that hints at function purpose."""
for stmt in body:
if isinstance(stmt, ast.Expr) and isinstance(stmt.value, ast.Constant):
continue # skip existing docstring placeholder
# Assignment to a result-like variable?
if isinstance(stmt, ast.Assign):
for target in stmt.targets:
if isinstance(target, ast.Name):
var_name = target.id
if var_name in ('result', 'msg', 'output', 'retval', 'value', 'response', 'data'):
val = ast.unparse(stmt.value).strip()
if val:
return f"Compute or return {val}"
# Return statement
if isinstance(stmt, ast.Return) and stmt.value:
ret = ast.unparse(stmt.value).strip()
if ret:
return f"Return {ret}"
break
return None
# --- Generate a docstring string for a function ---
def generate_docstring(func_node: ast.FunctionDef | ast.AsyncFunctionDef) -> str:
"""Build a Google-style docstring for the given function node."""
parts: list[str] = []
# Summary line
summary = name_to_title(func_node.name)
body_hint = extract_body_hint(func_node.body)
if body_hint:
summary = f"{summary}. {body_hint}"
parts.append(summary)
# Args section if there are parameters (excluding self/cls)
args = func_node.args.args
if args:
arg_lines = []
for arg in args:
if arg.arg in ('self', 'cls'):
continue
type_ann = ast.unparse(arg.annotation) if arg.annotation else 'Any'
arg_lines.append(f"{arg.arg} ({type_ann}): Parameter {arg.arg}")
if arg_lines:
parts.append("\nArgs:\n " + "\n ".join(arg_lines))
# Returns section
if func_node.returns:
ret_type = ast.unparse(func_node.returns)
parts.append(f"\nReturns:\n {ret_type}: Return value")
elif any(isinstance(s, ast.Return) and s.value is not None for s in ast.walk(func_node)):
parts.append("\nReturns:\n Return value")
return '"""' + '\n'.join(parts) + '\n"""'
# --- Transform source AST ---
def process_source(source: str, filename: str) -> Tuple[str, List[str]]:
"""Add docstrings to all undocumented functions. Returns (new_source, [func_names])."""
try:
tree = ast.parse(source)
except SyntaxError as e:
print(f" WARNING: Could not parse {filename}: {e}", file=sys.stderr)
return source, []
class DocstringInserter(ast.NodeTransformer):
def __init__(self):
self.modified_funcs: list[str] = []
def visit_FunctionDef(self, node: ast.FunctionDef) -> ast.FunctionDef:
return self._process(node)
def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef) -> ast.AsyncFunctionDef:
return self._process(node)
def _process(self, node):
existing_doc = ast.get_docstring(node)
if existing_doc is not None:
return node
docstring_text = generate_docstring(node)
doc_node = ast.Expr(value=ast.Constant(value=docstring_text))
node.body.insert(0, doc_node)
ast.fix_missing_locations(node)
self.modified_funcs.append(node.name)
return node
inserter = DocstringInserter()
new_tree = inserter.visit(tree)
if inserter.modified_funcs:
return ast.unparse(new_tree), inserter.modified_funcs
return source, []
# --- File discovery ---
def iter_python_files(paths: list[str]) -> list[Path]:
"""Collect all .py files from provided paths."""
files: set[Path] = set()
for p in paths:
path = Path(p)
if not path.exists():
print(f"WARNING: Path not found: {p}", file=sys.stderr)
continue
if path.is_file() and path.suffix == '.py':
files.add(path.resolve())
elif path.is_dir():
for child in path.rglob('*.py'):
if '.git' in child.parts or '__pycache__' in child.parts:
continue
files.add(child.resolve())
return sorted(files)
def main():
parser = argparse.ArgumentParser(description="Generate docstrings for functions missing them")
parser.add_argument('paths', nargs='+', help='Python files or directories to process')
parser.add_argument('--dry-run', action='store_true', help='Show what would change without writing')
parser.add_argument('--json', action='store_true', help='Output machine-readable JSON summary')
parser.add_argument('-v', '--verbose', action='store_true', help='Print each file processed')
args = parser.parse_args()
files = iter_python_files(args.paths)
if not files:
print("No Python files found to process", file=sys.stderr)
sys.exit(1)
results = []
total_funcs = 0
for pyfile in files:
try:
original = pyfile.read_text(encoding='utf-8')
except Exception as e:
print(f" ERROR reading {pyfile}: {e}", file=sys.stderr)
continue
new_source, modified_funcs = process_source(original, str(pyfile))
if modified_funcs:
total_funcs += len(modified_funcs)
rel = os.path.relpath(pyfile)
if args.verbose:
print(f" {rel}: +{len(modified_funcs)} docstrings")
results.append({'file': str(pyfile), 'functions': modified_funcs})
if not args.dry_run:
pyfile.write_text(new_source, encoding='utf-8')
elif args.verbose:
print(f" {rel}: no changes")
if args.json:
summary = {'total_files_modified': len(results), 'total_functions': total_funcs, 'files': results}
print(json.dumps(summary, indent=2))
else:
print(f"Generated docstrings for {total_funcs} functions across {len(results)} files")
if args.dry_run:
print(" (dry run — no files written)")
return 0
if __name__ == '__main__':
sys.exit(main())

View File

@@ -1,72 +0,0 @@
# Conference Talk Knowledge Extraction Prompt
## System Prompt
You are a knowledge extraction engine specialized in conference talks. You read talk transcripts and output ONLY structured JSON. You extract factual insights, patterns, tool discoveries, and warnings that are durable and actionable for the Timmy Foundation fleet.
## Prompt
```
TASK: Extract durable knowledge from this conference talk transcript.
RULES:
1. Extract ONLY information explicitly stated or strongly implied in the transcript.
2. Do NOT hallucinate, infer unsupported details, or invent quotes.
3. Every fact must be grounded in something the speaker actually said.
4. Focus on **durable, reusable** knowledge — not specific project details that won't apply elsewhere.
5. Prioritize insights that improve: workflows, tool usage, system design, governance, or operational reliability.
CATEGORIES (assign exactly one per item):
- fact: Concrete, verifiable takeaway (technical detail, config, workflow)
- pitfall: Mistake, trap, or cost of wrong approach the speaker warned about
- pattern: Successful approach, sequence, or template worth reusing
- tool-quirk: Unexpected behavior, gotcha, or setup detail for a specific tool/platform
- question: Something raised but not fully answered — worth investigating further
CONFIDENCE:
- 0.91.0: Explicitly stated by speaker with clear reasoning/evidence
- 0.70.8: Clearly implied by multiple statements, speaker's expertise
- 0.50.6: Suggested or hinted, but not directly confirmed
- 0.30.4: Interpretive, speculative, or single-data-point observation
TARGET DOMAIN:
- If talk is about a specific repo (e.g. hermes-agent, the-nexus), set `domain` to that repo name.
- If talk is about general principles, fleet processes, or multiple repos, set `domain` to "global".
- If talk is about an agent type (mimo, groq, claude), set `domain` to the agent name.
- If talk is about the compounding-intelligence system itself, set `domain` to "compounding-intelligence".
OUTPUT FORMAT (valid JSON only, no markdown, no explanation):
{
"knowledge": [
{
"fact": "One specific, actionable sentence of knowledge",
"category": "fact|pitfall|pattern|tool-quirk|question",
"domain": "global|{repo}|{agent}|compounding-intelligence",
"confidence": 0.0-1.0,
"tags": ["relevant", "keywords"],
"evidence": "Brief paraphrase or quote from the transcript that supports this"
}
],
"meta": {
"talk_title": "Title of the talk (if known)",
"speaker": "Speaker name(s)",
"conference": "Conference name",
"talk_url": "URL to talk/video (if available)",
"knowledge_count": 0,
"extraction_date": "2026-04-26"
}
}
TRANSCRIPT:
{{transcript}}
```
## Design Notes
- Keep `fact` field to **one clear sentence**. Avoid run-ons.
- `evidence` should be a 12 sentence paraphrase, not verbatim paragraph.
- `tags` should include: tool names, repo names, agent types, concepts mentioned
- Focus on what the fleet can **reuse tomorrow**, not ephemeral project context
- If the talk is high-level vision with no concrete details, that's a `question` or low-confidence `fact`

View File

@@ -0,0 +1,128 @@
"""Tests for docstring_generator module (Issue #96)."""
import ast
import sys
import tempfile
from pathlib import Path
import pytest
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
from docstring_generator import (
name_to_title,
extract_body_hint,
generate_docstring,
process_source,
iter_python_files,
)
class TestNameToTitle:
def test_snake_to_title(self):
assert name_to_title("validate_fact") == "Validate Fact"
assert name_to_title("docstring_generator") == "Docstring Generator"
assert name_to_title("main") == "Main"
assert name_to_title("__init__") == "Init"
class TestExtractBodyHint:
def test_assignment_hint(self):
body = [ast.parse("result = compute()").body[0]]
hint = extract_body_hint(body)
assert hint == "Compute or return compute()"
def test_return_hint(self):
body = [ast.parse("return data").body[0]]
hint = extract_body_hint(body)
assert hint == "Return data"
def test_no_hint(self):
body = [ast.parse("pass").body[0]]
assert extract_body_hint(body) is None
class TestGenerateDocstring:
def test_simple_function(self):
src = "def add(a, b):\n return a + b\n"
tree = ast.parse(src)
func = tree.body[0]
doc = generate_docstring(func)
assert 'Add' in doc
assert 'a' in doc and 'b' in doc
assert 'Args:' in doc
assert 'Returns:' in doc
def test_typed_function(self):
src = "def greet(name: str) -> str:\n return f'Hello {name}'\n"
tree = ast.parse(src)
func = tree.body[0]
doc = generate_docstring(func)
assert 'name (str)' in doc
assert 'str' in doc
def test_async_function(self):
src = "async def fetch():\n pass\n"
tree = ast.parse(src)
func = tree.body[0]
doc = generate_docstring(func)
assert 'Fetch' in doc
def test_self_skipped(self):
src = "class C:\n def method(self, x):\n return x\n"
tree = ast.parse(src)
cls = tree.body[0]
method = cls.body[0]
doc = generate_docstring(method)
# 'self' should not appear in Args section
args_start = doc.find('Args:')
if args_start >= 0:
args_section = doc[args_start:]
assert '(self)' not in args_section
class TestProcessSource:
def test_adds_docstrings(self):
src = "def foo(x):\n return x * 2\n"
new_src, funcs = process_source(src, "test.py")
assert len(funcs) == 1 and funcs[0] == "foo"
assert '"""' in new_src
assert 'Foo' in new_src
def test_preserves_existing_docstrings(self):
src = 'def bar():\n """Already documented."""\n return 1\n'
new_src, funcs = process_source(src, "test.py")
assert len(funcs) == 0
assert new_src == src
def test_multiple_functions(self):
src = "def a(): pass\ndef b(): pass\ndef c(): pass\n"
new_src, funcs = process_source(src, "test.py")
assert len(funcs) == 3
assert '"""' in new_src
def test_dry_run_no_write(self, tmp_path):
file = tmp_path / "t.py"
file.write_text("def f(): pass\n")
original_mtime = file.stat().st_mtime
new_src, funcs = process_source(file.read_text(), str(file))
assert funcs # detected
# When caller handles write, dry-run leaves file unchanged
current_mtime = file.stat().st_mtime
assert current_mtime == original_mtime
class TestIterPythonFiles:
def test_single_file(self, tmp_path):
f = tmp_path / "single.py"
f.write_text("x = 1")
files = iter_python_files([str(f)])
assert len(files) == 1
assert files[0].name == "single.py"
def test_directory_recursion(self, tmp_path):
(tmp_path / "sub").mkdir()
(tmp_path / "sub" / "a.py").write_text("a=1")
(tmp_path / "b.py").write_text("b=2")
files = iter_python_files([str(tmp_path)])
assert len(files) == 2