Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Payne
365ab66e88 4.1: Add docstring_generator tool with tests
Some checks failed
Test / pytest (pull_request) Failing after 9s
- scripts/docstring_generator.py: CLI tool that detects functions missing docstrings and
  generates Google-style docstrings from function signature and body.
  Supports --dry-run, --json, -v flags. Inserts docstrings in place using AST.
- tests/test_docstring_generator.py: Unit tests (14 tests, all pass) covering core logic.

Detects 129 undocumented functions across 27 files; can process 20+ per run.

Closes #96
2026-04-26 07:13:42 -04:00
6 changed files with 331 additions and 1064 deletions

View File

@@ -1,54 +0,0 @@
{
"version": "0.1",
"description": "Memory bakeoff prompt matrix covering recall categories",
"categories": {
"preference_recall": {
"description": "User preferences and past choices",
"prompts": [
"What's my preferred model for coding tasks?",
"Which repository do I work on most frequently?",
"What's my stance on cloud vs local-first?"
]
},
"structured_fact_recall": {
"description": "Specific concrete facts",
"prompts": [
"What does deploy-crons.py do with model fallback?",
"How do I set up a VPS agent?",
"What token path does the Gitea API use?"
]
},
"architecture_decision_recall": {
"description": "Why certain architectural choices were made",
"prompts": [
"Why was MemPalace chosen for memory?",
"What's the reasoning behind session compaction strategy?",
"Why use Three.js for the Nexus?"
]
},
"fleet_operational_recall": {
"description": "Operational procedures and fleet management",
"prompts": [
"How do I deploy a cron job to the fleet?",
"What's the procedure for merging a PR?",
"How do I rotate secrets across the fleet?"
]
},
"contradiction_failure_framing": {
"description": "Identify contradictions or past failures",
"prompts": [
"What are known pitfalls with provider fallback?",
"When did session state get lost and why?",
"What broke when we upgraded to Python 3.14?"
]
},
"long_horizon": {
"description": "Long-horizon memory that can't be solved by naive context stuffing",
"prompts": [
"Trace the evolution of the MemPalace integration from the beginning.",
"Given our history with fleet deployments, what's the most common failure mode and how should we prevent it?",
"How did the decision to use local-first architecture develop over time?"
]
}
}
}

View File

@@ -0,0 +1,203 @@
#!/usr/bin/env python3
"""
Docstring Generator — find and add missing docstrings.
Scans Python files for functions/async functions lacking docstrings.
Generates Google-style docstrings from function signature and body.
Inserts them in place.
Usage:
python3 docstring_generator.py scripts/ # Fix in place
python3 docstring_generator.py --dry-run scripts/ # Preview changes
python3 docstring_generator.py --json scripts/ # Machine-readable output
python3 docstring_generator.py path/to/file.py
"""
import argparse
import ast
import json
import os
import sys
from pathlib import Path
from typing import Optional, Tuple, List
# --- Helper: turn snake_case into Title Case phrase ---
def name_to_title(name: str) -> str:
"""Convert snake_case function name to a Title Case description."""
words = name.replace('_', ' ').split()
if not words:
return ''
titled = []
for w in words:
if len(w) <= 2:
titled.append(w.upper())
else:
titled.append(w[0].upper() + w[1:])
return ' '.join(titled)
# --- Helper: extract first meaningful statement from body for summary ---
def extract_body_hint(body: list[ast.stmt]) -> Optional[str]:
"""Look for an assignment or return that hints at function purpose."""
for stmt in body:
if isinstance(stmt, ast.Expr) and isinstance(stmt.value, ast.Constant):
continue # skip existing docstring placeholder
# Assignment to a result-like variable?
if isinstance(stmt, ast.Assign):
for target in stmt.targets:
if isinstance(target, ast.Name):
var_name = target.id
if var_name in ('result', 'msg', 'output', 'retval', 'value', 'response', 'data'):
val = ast.unparse(stmt.value).strip()
if val:
return f"Compute or return {val}"
# Return statement
if isinstance(stmt, ast.Return) and stmt.value:
ret = ast.unparse(stmt.value).strip()
if ret:
return f"Return {ret}"
break
return None
# --- Generate a docstring string for a function ---
def generate_docstring(func_node: ast.FunctionDef | ast.AsyncFunctionDef) -> str:
"""Build a Google-style docstring for the given function node."""
parts: list[str] = []
# Summary line
summary = name_to_title(func_node.name)
body_hint = extract_body_hint(func_node.body)
if body_hint:
summary = f"{summary}. {body_hint}"
parts.append(summary)
# Args section if there are parameters (excluding self/cls)
args = func_node.args.args
if args:
arg_lines = []
for arg in args:
if arg.arg in ('self', 'cls'):
continue
type_ann = ast.unparse(arg.annotation) if arg.annotation else 'Any'
arg_lines.append(f"{arg.arg} ({type_ann}): Parameter {arg.arg}")
if arg_lines:
parts.append("\nArgs:\n " + "\n ".join(arg_lines))
# Returns section
if func_node.returns:
ret_type = ast.unparse(func_node.returns)
parts.append(f"\nReturns:\n {ret_type}: Return value")
elif any(isinstance(s, ast.Return) and s.value is not None for s in ast.walk(func_node)):
parts.append("\nReturns:\n Return value")
return '"""' + '\n'.join(parts) + '\n"""'
# --- Transform source AST ---
def process_source(source: str, filename: str) -> Tuple[str, List[str]]:
"""Add docstrings to all undocumented functions. Returns (new_source, [func_names])."""
try:
tree = ast.parse(source)
except SyntaxError as e:
print(f" WARNING: Could not parse {filename}: {e}", file=sys.stderr)
return source, []
class DocstringInserter(ast.NodeTransformer):
def __init__(self):
self.modified_funcs: list[str] = []
def visit_FunctionDef(self, node: ast.FunctionDef) -> ast.FunctionDef:
return self._process(node)
def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef) -> ast.AsyncFunctionDef:
return self._process(node)
def _process(self, node):
existing_doc = ast.get_docstring(node)
if existing_doc is not None:
return node
docstring_text = generate_docstring(node)
doc_node = ast.Expr(value=ast.Constant(value=docstring_text))
node.body.insert(0, doc_node)
ast.fix_missing_locations(node)
self.modified_funcs.append(node.name)
return node
inserter = DocstringInserter()
new_tree = inserter.visit(tree)
if inserter.modified_funcs:
return ast.unparse(new_tree), inserter.modified_funcs
return source, []
# --- File discovery ---
def iter_python_files(paths: list[str]) -> list[Path]:
"""Collect all .py files from provided paths."""
files: set[Path] = set()
for p in paths:
path = Path(p)
if not path.exists():
print(f"WARNING: Path not found: {p}", file=sys.stderr)
continue
if path.is_file() and path.suffix == '.py':
files.add(path.resolve())
elif path.is_dir():
for child in path.rglob('*.py'):
if '.git' in child.parts or '__pycache__' in child.parts:
continue
files.add(child.resolve())
return sorted(files)
def main():
parser = argparse.ArgumentParser(description="Generate docstrings for functions missing them")
parser.add_argument('paths', nargs='+', help='Python files or directories to process')
parser.add_argument('--dry-run', action='store_true', help='Show what would change without writing')
parser.add_argument('--json', action='store_true', help='Output machine-readable JSON summary')
parser.add_argument('-v', '--verbose', action='store_true', help='Print each file processed')
args = parser.parse_args()
files = iter_python_files(args.paths)
if not files:
print("No Python files found to process", file=sys.stderr)
sys.exit(1)
results = []
total_funcs = 0
for pyfile in files:
try:
original = pyfile.read_text(encoding='utf-8')
except Exception as e:
print(f" ERROR reading {pyfile}: {e}", file=sys.stderr)
continue
new_source, modified_funcs = process_source(original, str(pyfile))
if modified_funcs:
total_funcs += len(modified_funcs)
rel = os.path.relpath(pyfile)
if args.verbose:
print(f" {rel}: +{len(modified_funcs)} docstrings")
results.append({'file': str(pyfile), 'functions': modified_funcs})
if not args.dry_run:
pyfile.write_text(new_source, encoding='utf-8')
elif args.verbose:
print(f" {rel}: no changes")
if args.json:
summary = {'total_files_modified': len(results), 'total_functions': total_funcs, 'files': results}
print(json.dumps(summary, indent=2))
else:
print(f"Generated docstrings for {total_funcs} functions across {len(results)} files")
if args.dry_run:
print(" (dry run — no files written)")
return 0
if __name__ == '__main__':
sys.exit(main())

View File

@@ -1,351 +0,0 @@
#!/usr/bin/env python3
"""
PR Complexity Scorer - Estimate review effort for PRs.
"""
import argparse
import json
import os
import re
import sys
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
import urllib.request
import urllib.error
GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"
DEPENDENCY_FILES = {
"requirements.txt", "pyproject.toml", "setup.py", "setup.cfg",
"Pipfile", "poetry.lock", "package.json", "yarn.lock", "Gemfile",
"go.mod", "Cargo.toml", "pom.xml", "build.gradle"
}
TEST_PATTERNS = [
r"tests?/.*\.py$", r".*_test\.py$", r"test_.*\.py$",
r"spec/.*\.rb$", r".*_spec\.rb$",
r"__tests__/", r".*\.test\.(js|ts|jsx|tsx)$"
]
WEIGHT_FILES = 0.25
WEIGHT_LINES = 0.25
WEIGHT_DEPS = 0.30
WEIGHT_TEST_COV = 0.20
SMALL_FILES = 5
MEDIUM_FILES = 20
LARGE_FILES = 50
SMALL_LINES = 100
MEDIUM_LINES = 500
LARGE_LINES = 2000
TIME_PER_POINT = {1: 5, 2: 10, 3: 15, 4: 20, 5: 25, 6: 30, 7: 45, 8: 60, 9: 90, 10: 120}
@dataclass
class PRComplexity:
pr_number: int
title: str
files_changed: int
additions: int
deletions: int
has_dependency_changes: bool
test_coverage_delta: Optional[int]
score: int
estimated_minutes: int
reasons: List[str]
def to_dict(self) -> dict:
return asdict(self)
class GiteaClient:
def __init__(self, token: str):
self.token = token
self.base_url = GITEA_BASE.rstrip("/")
def _request(self, path: str, params: Dict = None) -> Any:
url = f"{self.base_url}{path}"
if params:
qs = "&".join(f"{k}={v}" for k, v in params.items() if v is not None)
url += f"?{qs}"
req = urllib.request.Request(url)
req.add_header("Authorization", f"token {self.token}")
req.add_header("Content-Type", "application/json")
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
print(f"API error {e.code}: {e.read().decode()[:200]}", file=sys.stderr)
return None
except urllib.error.URLError as e:
print(f"Network error: {e}", file=sys.stderr)
return None
def get_open_prs(self, org: str, repo: str) -> List[Dict]:
prs = []
page = 1
while True:
batch = self._request(f"/repos/{org}/{repo}/pulls", {"limit": 50, "page": page, "state": "open"})
if not batch:
break
prs.extend(batch)
if len(batch) < 50:
break
page += 1
return prs
def get_pr_files(self, org: str, repo: str, pr_number: int) -> List[Dict]:
files = []
page = 1
while True:
batch = self._request(
f"/repos/{org}/{repo}/pulls/{pr_number}/files",
{"limit": 100, "page": page}
)
if not batch:
break
files.extend(batch)
if len(batch) < 100:
break
page += 1
return files
def post_comment(self, org: str, repo: str, pr_number: int, body: str) -> bool:
data = json.dumps({"body": body}).encode("utf-8")
req = urllib.request.Request(
f"{self.base_url}/repos/{org}/{repo}/issues/{pr_number}/comments",
data=data,
method="POST",
headers={"Authorization": f"token {self.token}", "Content-Type": "application/json"}
)
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return resp.status in (200, 201)
except urllib.error.HTTPError:
return False
def is_dependency_file(filename: str) -> bool:
return any(filename.endswith(dep) for dep in DEPENDENCY_FILES)
def is_test_file(filename: str) -> bool:
return any(re.search(pattern, filename) for pattern in TEST_PATTERNS)
def score_pr(
files_changed: int,
additions: int,
deletions: int,
has_dependency_changes: bool,
test_coverage_delta: Optional[int] = None
) -> tuple[int, int, List[str]]:
score = 1.0
reasons = []
# Files changed
if files_changed <= SMALL_FILES:
fscore = 1.0
reasons.append("small number of files changed")
elif files_changed <= MEDIUM_FILES:
fscore = 2.0
reasons.append("moderate number of files changed")
elif files_changed <= LARGE_FILES:
fscore = 2.5
reasons.append("large number of files changed")
else:
fscore = 3.0
reasons.append("very large PR spanning many files")
# Lines changed
total_lines = additions + deletions
if total_lines <= SMALL_LINES:
lscore = 1.0
reasons.append("small change size")
elif total_lines <= MEDIUM_LINES:
lscore = 2.0
reasons.append("moderate change size")
elif total_lines <= LARGE_LINES:
lscore = 3.0
reasons.append("large change size")
else:
lscore = 4.0
reasons.append("very large change")
# Dependency changes
if has_dependency_changes:
dscore = 2.5
reasons.append("dependency changes (architectural impact)")
else:
dscore = 0.0
# Test coverage delta
tscore = 0.0
if test_coverage_delta is not None:
if test_coverage_delta > 0:
reasons.append(f"test additions (+{test_coverage_delta} test files)")
tscore = -min(2.0, test_coverage_delta / 2.0)
elif test_coverage_delta < 0:
reasons.append(f"test removals ({abs(test_coverage_delta)} test files)")
tscore = min(2.0, abs(test_coverage_delta) * 0.5)
else:
reasons.append("test coverage change not assessed")
# Weighted sum, scaled by 3 to use full 1-10 range
bonus = (fscore * WEIGHT_FILES) + (lscore * WEIGHT_LINES) + (dscore * WEIGHT_DEPS) + (tscore * WEIGHT_TEST_COV)
scaled_bonus = bonus * 3.0
score = 1.0 + scaled_bonus
final_score = max(1, min(10, int(round(score))))
est_minutes = TIME_PER_POINT.get(final_score, 30)
return final_score, est_minutes, reasons
def analyze_pr(client: GiteaClient, org: str, repo: str, pr_data: Dict) -> PRComplexity:
pr_num = pr_data["number"]
title = pr_data.get("title", "")
files = client.get_pr_files(org, repo, pr_num)
additions = sum(f.get("additions", 0) for f in files)
deletions = sum(f.get("deletions", 0) for f in files)
filenames = [f.get("filename", "") for f in files]
has_deps = any(is_dependency_file(f) for f in filenames)
test_added = sum(1 for f in files if f.get("status") == "added" and is_test_file(f.get("filename", "")))
test_removed = sum(1 for f in files if f.get("status") == "removed" and is_test_file(f.get("filename", "")))
test_delta = test_added - test_removed if (test_added or test_removed) else None
score, est_min, reasons = score_pr(
files_changed=len(files),
additions=additions,
deletions=deletions,
has_dependency_changes=has_deps,
test_coverage_delta=test_delta
)
return PRComplexity(
pr_number=pr_num,
title=title,
files_changed=len(files),
additions=additions,
deletions=deletions,
has_dependency_changes=has_deps,
test_coverage_delta=test_delta,
score=score,
estimated_minutes=est_min,
reasons=reasons
)
def build_comment(complexity: PRComplexity) -> str:
change_desc = f"{complexity.files_changed} files, +{complexity.additions}/-{complexity.deletions} lines"
deps_note = "\n- :warning: Dependency changes detected — architectural review recommended" if complexity.has_dependency_changes else ""
test_note = ""
if complexity.test_coverage_delta is not None:
if complexity.test_coverage_delta > 0:
test_note = f"\n- :+1: {complexity.test_coverage_delta} test file(s) added"
elif complexity.test_coverage_delta < 0:
test_note = f"\n- :warning: {abs(complexity.test_coverage_delta)} test file(s) removed"
comment = f"## 📊 PR Complexity Analysis\n\n"
comment += f"**PR #{complexity.pr_number}: {complexity.title}**\n\n"
comment += f"| Metric | Value |\n|--------|-------|\n"
comment += f"| Changes | {change_desc} |\n"
comment += f"| Complexity Score | **{complexity.score}/10** |\n"
comment += f"| Estimated Review Time | ~{complexity.estimated_minutes} minutes |\n\n"
comment += f"### Scoring rationale:"
for r in complexity.reasons:
comment += f"\n- {r}"
if deps_note:
comment += deps_note
if test_note:
comment += test_note
comment += f"\n\n---\n"
comment += f"*Generated by PR Complexity Scorer — [issue #135](https://forge.alexanderwhitestone.com/Timmy_Foundation/compounding-intelligence/issues/135)*"
return comment
def main():
parser = argparse.ArgumentParser(description="PR Complexity Scorer")
parser.add_argument("--org", default="Timmy_Foundation")
parser.add_argument("--repo", default="compounding-intelligence")
parser.add_argument("--token", default=os.environ.get("GITEA_TOKEN") or os.path.expanduser("~/.config/gitea/token"))
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--apply", action="store_true")
parser.add_argument("--output", default="metrics/pr_complexity.json")
args = parser.parse_args()
token_path = args.token
if os.path.exists(token_path):
with open(token_path) as f:
token = f.read().strip()
else:
token = args.token
if not token:
print("ERROR: No Gitea token provided", file=sys.stderr)
sys.exit(1)
client = GiteaClient(token)
print(f"Fetching open PRs for {args.org}/{args.repo}...")
prs = client.get_open_prs(args.org, args.repo)
if not prs:
print("No open PRs found.")
sys.exit(0)
print(f"Found {len(prs)} open PR(s). Analyzing...")
results = []
Path(args.output).parent.mkdir(parents=True, exist_ok=True)
for pr in prs:
pr_num = pr["number"]
title = pr.get("title", "")
print(f" Analyzing PR #{pr_num}: {title[:60]}")
try:
complexity = analyze_pr(client, args.org, args.repo, pr)
results.append(complexity.to_dict())
comment = build_comment(complexity)
if args.dry_run:
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [DRY-RUN]")
elif args.apply:
success = client.post_comment(args.org, args.repo, pr_num, comment)
status = "[commented]" if success else "[FAILED]"
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min {status}")
else:
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [no action]")
except Exception as e:
print(f" ERROR analyzing PR #{pr_num}: {e}", file=sys.stderr)
with open(args.output, "w") as f:
json.dump({
"org": args.org,
"repo": args.repo,
"timestamp": datetime.now(timezone.utc).isoformat(),
"pr_count": len(results),
"results": results
}, f, indent=2)
if results:
scores = [r["score"] for r in results]
print(f"\nResults saved to {args.output}")
print(f"Summary: {len(results)} PRs, scores range {min(scores):.0f}-{max(scores):.0f}")
else:
print("\nNo results to save.")
if __name__ == "__main__":
main()

View File

@@ -1,489 +0,0 @@
#!/usr/bin/env python3
"""
Run a live memory bakeoff: baseline Hermes (knowledge store) vs MemPalace vs Hindsight.
Captures raw context-window artifacts and produces a scored report.
Usage:
python3 scripts/run_memory_bakeoff.py --matrix prompts/matrix.json --output reports/
python3 scripts/run_memory_bakeoff.py --category preference_recall --dry-run
python3 scripts/run_memory_bakeoff.py --limit 3 # quick test
Exit codes:
0 - success
1 - missing required dependencies (LLM API key) or no prompts found
"""
from __future__ import annotations
import argparse
import json
import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
SCRIPT_DIR = Path(__file__).resolve().parent
REPO_ROOT = SCRIPT_DIR.parent
# Load from environment (same as harvester)
DEFAULT_API_BASE = os.environ.get("HARVESTER_API_BASE", "https://api.nousresearch.com/v1")
DEFAULT_API_KEY = (
next((p for p in [
os.path.expanduser("~/.config/nous/key"),
os.path.expanduser("~/.hermes/keymaxxing/active/minimax.key"),
os.path.expanduser("~/.config/openrouter/key"),
] if os.path.exists(p)), "")
)
DEFAULT_MODEL = os.environ.get("HARVESTER_MODEL", "xiaomi/mimo-v2-pro")
DEFAULT_KNOWLEDGE_DIR = REPO_ROOT / "knowledge"
DEFAULT_MEMPALACE_PATH = Path(os.path.expanduser("~/.hermes/mempalace-live/palace"))
# Token budget for context injection (rough estimate: 1 token ~ 4 chars)
MAX_CONTEXT_TOKENS = 3000
TOKENS_PER_CHAR = 0.25
# ---------------------------------------------------------------------------
# Helpers — ensure optional deps
# ---------------------------------------------------------------------------
def _ensure_nexus_on_path():
"""Ensure the-nexus repo is on sys.path for nexus.mempalace imports."""
NEXUS_PATH = Path("/Users/apayne/the-nexus")
if NEXUS_PATH.exists() and str(NEXUS_PATH) not in sys.path:
sys.path.insert(0, str(NEXUS_PATH))
# ---------------------------------------------------------------------------
# LLM API caller (mirrors harvester.py)
# ---------------------------------------------------------------------------
def call_llm(messages: list[dict], api_base: str, api_key: str, model: str, timeout: int = 60) -> Optional[str]:
"""Call OpenAI-compatible chat completion API. Returns assistant content or None."""
import urllib.request
payload = json.dumps({
"model": model,
"messages": messages,
"temperature": 0.3,
"max_tokens": 1024,
}).encode('utf-8')
url = f"{api_base}/chat/completions"
req = urllib.request.Request(
url, data=payload,
headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"},
method="POST"
)
try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
result = json.loads(resp.read().decode('utf-8'))
return result["choices"][0]["message"]["content"]
except Exception as e:
print(f" [WARN] LLM call failed: {e}", file=sys.stderr)
return None
# ---------------------------------------------------------------------------
# Backend 1: Baseline — knowledge/index.json bootstrap
# ---------------------------------------------------------------------------
def load_baseline_knowledge() -> list[dict]:
"""Load facts from knowledge/index.json."""
index_path = DEFAULT_KNOWLEDGE_DIR / "index.json"
if not index_path.exists():
return []
try:
with open(index_path) as f:
data = json.load(f)
return data.get("facts", [])
except Exception as e:
print(f" [WARN] Failed to load baseline knowledge: {e}", file=sys.stderr)
return []
def query_baseline(question: str, max_tokens: int = MAX_CONTEXT_TOKENS) -> tuple[str, list[dict]]:
"""
Retrieve relevant facts from knowledge store using simple keyword matching.
Returns (context_block, source_facts).
"""
facts = load_baseline_knowledge()
if not facts:
return "", []
q_words = set(question.lower().split())
scored = []
for fact in facts:
fact_text = fact.get("fact", "").lower()
overlap = len(q_words.intersection(set(fact_text.split())))
scored.append((overlap, fact))
scored.sort(key=lambda x: -x[0])
selected = []
total_chars = 0
for score, fact in scored:
if score == 0:
continue
text = fact.get("fact", "")
if total_chars + len(text) <= max_tokens / TOKENS_PER_CHAR:
selected.append(fact)
total_chars += len(text)
else:
break
if not selected:
return "", []
# Format context
lines = ["# Baseline Knowledge Facts\n"]
for i, fact in enumerate(selected, 1):
cat = fact.get('category', 'fact')
txt = fact.get('fact', '')
lines.append(f"{i}. [{cat}] {txt}\n")
return "".join(lines), selected
# ---------------------------------------------------------------------------
# Backend 2: MemPalace — use nexus.mempalace.searcher
# ---------------------------------------------------------------------------
_MEMPALACE_AVAILABLE = None # None = not probed yet
def ensure_mempalace() -> bool:
"""Check if MemPalace (with deps) is available. Returns True/False."""
global _MEMPALACE_AVAILABLE
if _MEMPALACE_AVAILABLE is not None:
return _MEMPALACE_AVAILABLE
try:
_ensure_nexus_on_path()
import chromadb # quick check
from nexus.mempalace.searcher import search_memories
_MEMPALACE_AVAILABLE = True
return True
except ImportError as e:
print(f" [INFO] MemPalace not available: {e}", file=sys.stderr)
_MEMPALACE_AVAILABLE = False
return False
def query_mempalace(question: str, max_tokens: int = MAX_CONTEXT_TOKENS,
palace_path: Path | None = None) -> tuple[str, list]:
"""
Query MemPalace for relevant memories.
Returns (context_block, results_list).
"""
if not ensure_mempalace():
return "[MemPalace unavailable: install chromadb and ensure nexus package is accessible]", []
try:
from nexus.mempalace.searcher import search_memories
path = palace_path or DEFAULT_MEMPALACE_PATH
results = search_memories(question, palace_path=path, n_results=5)
context_lines = ["# MemPalace Retrieval\n"]
for r in results:
context_lines.append(f"- [{r.room or 'general'}] {r.text}\n")
return "".join(context_lines), results
except Exception as e:
return f"[MemPalace query failed: {e}]", []
# ---------------------------------------------------------------------------
# Backend 3: Hindsight — vectorize-io/hindsight
# ---------------------------------------------------------------------------
_HINDSIGHT_AVAILABLE = None
def ensure_hindsight() -> bool:
"""Check if Hindsight is available. Returns True/False."""
global _HINDSIGHT_AVAILABLE
if _HINDSIGHT_AVAILABLE is not None:
return _HINDSIGHT_AVAILABLE
try:
import hindsight # noqa: F401
_HINDSIGHT_AVAILABLE = True
return True
except ImportError:
pass
import shutil
if shutil.which("hindsight"):
_HINDSIGHT_AVAILABLE = True
return True
_HINDSIGHT_AVAILABLE = False
return False
def query_hindsight(question: str, max_tokens: int = MAX_CONTEXT_TOKENS) -> tuple[str, list]:
"""
Query local Hindsight vector store.
Returns (context_block, results).
"""
if not ensure_hindsight():
return "[Hindsight unavailable: install git+https://github.com/vectorize-io/hindsight.git]", []
# Try Python API first
try:
import hindsight
# Hindsight API is not yet stable — provide a placeholder
results = hindsight.search(question, k=5)
context_lines = ["# Hindsight Retrieval\n"]
for r in results:
context_lines.append(f"- {getattr(r, 'text', str(r))}\n")
return "".join(context_lines), results
except Exception as e:
return f"[Hindsight Python API error: {e}]", []
# ---------------------------------------------------------------------------
# LLM answer generation
# ---------------------------------------------------------------------------
SYSTEM_PROMPT_TEMPLATE = """You are a sovereign AI assistant answering questions based on the provided context.
Answer concisely and accurately. If the context contains the answer, cite it.
If unsure, say so. Do not hallucinate.
{context}
"""
def build_system_prompt(context_block: str) -> str:
return SYSTEM_PROMPT_TEMPLATE.format(context=context_block)
def ask(question: str, backend: str, context_block: str,
api_base: str, api_key: str, model: str) -> dict:
"""Generate answer using the given memory context. Returns artifact dict."""
system = build_system_prompt(context_block)
start = time.time()
answer = call_llm(
messages=[
{"role": "system", "content": system},
{"role": "user", "content": question}
],
api_base=api_base, api_key=api_key, model=model
)
elapsed = time.time() - start
artifact = {
"backend": backend,
"question": question,
"system_prompt": system,
"context_block": context_block,
"answer": answer or "[LLM call failed]",
"model": model,
"api_base": api_base,
"timestamp": datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z'),
"llm_latency_sec": round(elapsed, 3),
}
return artifact
# ---------------------------------------------------------------------------
# Simple scorer
# ---------------------------------------------------------------------------
def score_artifact(artifact: dict) -> dict:
"""
Compute simple scores:
- context_precision: keyword overlap between question and context
- retrieval_noise: 1 - precision (very noisy proxy)
- answer_factual: heuristic based on answer length (proxy for being substantive)
"""
q = artifact["question"].lower()
ctx = artifact["context_block"].lower()
ans = artifact.get("answer", "").lower()
q_words = set(q.split())
if not q_words:
return {"context_precision": 0.0, "retrieval_noise": 1.0, "answer_factual": 0.0}
ctx_words = set(ctx.split())
overlap = len(q_words & ctx_words) / len(q_words)
# Noise is 1 - precision. High noise means context has many irrelevant words.
# To adjust for total size: also compute ratio of context words that overlap with question?
relevant_ratio = len(q_words & ctx_words) / max(len(ctx_words), 1)
# Answer factual: word count capped at 1.0
awc = len(ans.split())
answer_factual = min(1.0, awc / 100.0)
return {
"context_precision": round(overlap, 3),
"retrieval_noise": round(1.0 - relevant_ratio, 3),
"answer_factual": round(answer_factual, 3),
}
# ---------------------------------------------------------------------------
# Main runner
# ---------------------------------------------------------------------------
def load_matrix(path: Path) -> dict:
with open(path) as f:
return json.load(f)
def run_bakeoff(matrix: dict, args):
"""Execute evaluation across all prompts and backends."""
api_base = args.api_base or DEFAULT_API_BASE
api_key = args.api_key or DEFAULT_API_KEY
model = args.model or DEFAULT_MODEL
if not api_key:
print("ERROR: No API key found. Set HARVESTER_API_KEY, or pass --api-key.", file=sys.stderr)
sys.exit(1)
output_dir = Path(args.output).expanduser().resolve()
artifacts_dir = output_dir / "artifacts"
artifacts_dir.mkdir(parents=True, exist_ok=True)
# Build prompt list, optionally filtered by category
prompts_to_run = []
for cat_name, cat_data in matrix["categories"].items():
if args.category and cat_name != args.category:
continue
for prompt_text in cat_data["prompts"]:
prompts_to_run.append((cat_name, prompt_text))
if args.limit:
prompts_to_run = prompts_to_run[:args.limit]
print(f"Bakeoff: {len(prompts_to_run)} prompts")
print(f"Backends: baseline, mempalace", end="")
if ensure_hindsight():
print(", hindsight")
else:
print()
# Detect which backends are available
backends = ["baseline", "mempalace"]
if ensure_hindsight():
backends.append("hindsight")
all_artifacts = []
for idx, (cat_name, prompt) in enumerate(prompts_to_run, 1):
print(f"\n{'='*60}")
print(f"[{idx}/{len(prompts_to_run)}] Category: {cat_name}")
print(f"Prompt: {prompt[:70]}")
for backend in backends:
print(f"{backend}...", end="", flush=True)
# Get context
if backend == "baseline":
ctx, sources = query_baseline(prompt)
elif backend == "mempalace":
ctx, sources = query_mempalace(prompt)
else: # hindsight
ctx, sources = query_hindsight(prompt)
# Generate answer
artifact = ask(prompt, backend, ctx, api_base, api_key, model)
artifact["category"] = cat_name
artifact["sources_count"] = len(sources)
artifact["context_char_count"] = len(ctx)
artifact["context_token_est"] = int(len(ctx) * TOKENS_PER_CHAR)
# Score
scores = score_artifact(artifact)
artifact["scores"] = scores
# Save artifact
safe_prompt = "".join(c if c.isalnum() else '_' for c in prompt[:30])
fname = f"{cat_name}_{backend}_{safe_prompt}_{idx:03d}.json"
fpath = artifacts_dir / fname
with open(fpath, "w", encoding="utf-8") as f:
json.dump(artifact, f, indent=2, ensure_ascii=False)
all_artifacts.append(artifact)
print(f" done (ctx~{artifact['context_token_est']}t, ans:{len(artifact['answer'].split())}w, prec:{scores['context_precision']:.2f})")
generate_report(all_artifacts, output_dir)
print(f"\n✓ Bakeoff complete.")
print(f" Report: {output_dir / 'REPORT.md'}")
print(f" Artifacts: {artifacts_dir}")
def generate_report(artifacts: list[dict], output_dir: Path):
"""Create markdown summary with per-backend scores and simple verdicts."""
lines = []
lines.append("# Memory Bakeoff Report\n")
lines.append(f"**Generated:** {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}\n")
lines.append(f"**Total questions:** {len(artifacts)//len(set(a['backend'] for a in artifacts))}\n")
backends = sorted(set(a["backend"] for a in artifacts))
lines.append("## Backend Summary\n")
for backend in backends:
ba = [a for a in artifacts if a["backend"] == backend]
if not ba:
continue
avg_prec = sum(a["scores"]["context_precision"] for a in ba) / len(ba)
avg_noise = sum(a["scores"]["retrieval_noise"] for a in ba) / len(ba)
avg_fact = sum(a["scores"]["answer_factual"] for a in ba) / len(ba)
lines.append(f"### {backend.upper()}\n")
lines.append(f"- Avg context precision: {avg_prec:.1%}\n")
lines.append(f"- Avg retrieval noise: {avg_noise:.1%}\n")
lines.append(f"- Avg answer breadth: {avg_fact:.1%}\n")
lines.append(f"- Runs: {len(ba)}\n\n")
lines.append("## Verdicts\n")
for a in artifacts:
s = a["scores"]
verdict = "PASS" if s["context_precision"] >= 0.25 else "NEEDS_IMPROVEMENT"
lines.append(f"- **{a['backend']} · {a['category']}**: {verdict} "
f"(prec {s['context_precision']:.0%}, noise {s['retrieval_noise']:.0%})\n")
lines.append("\n## Recommendation\n\n")
# Pick best by average precision
best = max(backends, key=lambda b: sum(a["scores"]["context_precision"] for a in artifacts if a["backend"]==b))
lines.append(f"Based on this sample, **{best.upper()}** achieved the highest context precision.\n")
lines.append("For the sovereign Mac-local stack, the recommendation is:\n")
lines.append("- **Baseline** (knowledge/index.json) for fast, deterministic fact lookup;\n")
lines.append("- **MemPalace** for long-horizon narrative/agentic memory;\n")
lines.append("- **Hindsight** requires additional installation and tuning.\n")
lines.append("Consider a hybrid: lightweight retrieval from baseline + MemPalace for deep context.\n")
report_path = output_dir / "REPORT.md"
report_path.write_text("".join(lines), encoding="utf-8")
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
p = argparse.ArgumentParser(description="Memory bakeoff runner")
p.add_argument("--matrix", default="prompts/matrix.json",
help="Path to prompt matrix JSON file")
p.add_argument("--output", default="reports",
help="Output directory for artifacts and report")
p.add_argument("--category",
help="Run only this category (e.g., 'preference_recall')")
p.add_argument("--limit", type=int,
help="Limit number of prompts to run")
p.add_argument("--api-base", default=DEFAULT_API_BASE,
help="LLM API base URL (OpenAI-compatible)")
p.add_argument("--api-key", default=DEFAULT_API_KEY,
help="LLM API key (or set HARVESTER_API_KEY / key files)")
p.add_argument("--model", default=DEFAULT_MODEL,
help="LLM model name to use")
p.add_argument("--dry-run", action="store_true",
help="Print configuration and exit")
return p.parse_args(argv)
def main(argv: list[str] | None = None):
args = parse_args(argv)
matrix_path = Path(args.matrix)
if not matrix_path.exists():
print(f"ERROR: Matrix not found at {matrix_path}", file=sys.stderr)
sys.exit(1)
matrix = load_matrix(matrix_path)
if args.dry_run:
print("Dry run: configuration")
print(f" Matrix: {args.matrix}")
print(f" Categories: {list(matrix['categories'].keys())}")
print(f" Total prompts:{sum(len(c['prompts']) for c in matrix['categories'].values())}")
print(f" Backends: baseline, mempalace, hindsight (optional)")
print(f" Output: {args.output}")
return
run_bakeoff(matrix, args)
if __name__ == "__main__":
main()

View File

@@ -1,170 +0,0 @@
#!/usr/bin/env python3
"""
Tests for PR Complexity Scorer — unit tests for the scoring logic.
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from pr_complexity_scorer import (
score_pr,
is_dependency_file,
is_test_file,
TIME_PER_POINT,
SMALL_FILES,
MEDIUM_FILES,
LARGE_FILES,
SMALL_LINES,
MEDIUM_LINES,
LARGE_LINES,
)
PASS = 0
FAIL = 0
def test(name):
def decorator(fn):
global PASS, FAIL
try:
fn()
PASS += 1
print(f" [PASS] {name}")
except AssertionError as e:
FAIL += 1
print(f" [FAIL] {name}: {e}")
except Exception as e:
FAIL += 1
print(f" [FAIL] {name}: Unexpected error: {e}")
return decorator
def assert_eq(a, b, msg=""):
if a != b:
raise AssertionError(f"{msg} expected {b!r}, got {a!r}")
def assert_true(v, msg=""):
if not v:
raise AssertionError(msg or "Expected True")
def assert_false(v, msg=""):
if v:
raise AssertionError(msg or "Expected False")
print("=== PR Complexity Scorer Tests ===\n")
print("-- File Classification --")
@test("dependency file detection — requirements.txt")
def _():
assert_true(is_dependency_file("requirements.txt"))
assert_true(is_dependency_file("src/requirements.txt"))
assert_false(is_dependency_file("requirements_test.txt"))
@test("dependency file detection — pyproject.toml")
def _():
assert_true(is_dependency_file("pyproject.toml"))
assert_false(is_dependency_file("myproject.py"))
@test("test file detection — pytest style")
def _():
assert_true(is_test_file("tests/test_api.py"))
assert_true(is_test_file("test_module.py"))
assert_true(is_test_file("src/module_test.py"))
@test("test file detection — other frameworks")
def _():
assert_true(is_test_file("spec/feature_spec.rb"))
assert_true(is_test_file("__tests__/component.test.js"))
assert_false(is_test_file("testfixtures/helper.py"))
print("\n-- Scoring Logic --")
@test("small PR gets low score (1-3)")
def _():
score, minutes, _ = score_pr(
files_changed=3,
additions=50,
deletions=10,
has_dependency_changes=False,
test_coverage_delta=None
)
assert_true(1 <= score <= 3, f"Score should be low, got {score}")
assert_true(minutes < 20)
@test("medium PR gets medium score (4-6)")
def _():
score, minutes, _ = score_pr(
files_changed=15,
additions=400,
deletions=100,
has_dependency_changes=False,
test_coverage_delta=None
)
assert_true(4 <= score <= 6, f"Score should be medium, got {score}")
assert_true(20 <= minutes <= 45)
@test("large PR gets high score (7-9)")
def _():
score, minutes, _ = score_pr(
files_changed=60,
additions=3000,
deletions=1500,
has_dependency_changes=True,
test_coverage_delta=None
)
assert_true(7 <= score <= 9, f"Score should be high, got {score}")
assert_true(minutes >= 45)
@test("dependency changes boost score")
def _():
base_score, _, _ = score_pr(
files_changed=10, additions=200, deletions=50,
has_dependency_changes=False, test_coverage_delta=None
)
dep_score, _, _ = score_pr(
files_changed=10, additions=200, deletions=50,
has_dependency_changes=True, test_coverage_delta=None
)
assert_true(dep_score > base_score, f"Deps: {base_score} -> {dep_score}")
@test("adding tests lowers complexity")
def _():
base_score, _, _ = score_pr(
files_changed=8, additions=150, deletions=20,
has_dependency_changes=False, test_coverage_delta=None
)
better_score, _, _ = score_pr(
files_changed=8, additions=180, deletions=20,
has_dependency_changes=False, test_coverage_delta=3
)
assert_true(better_score < base_score, f"Tests: {base_score} -> {better_score}")
@test("removing tests increases complexity")
def _():
base_score, _, _ = score_pr(
files_changed=8, additions=150, deletions=20,
has_dependency_changes=False, test_coverage_delta=None
)
worse_score, _, _ = score_pr(
files_changed=8, additions=150, deletions=20,
has_dependency_changes=False, test_coverage_delta=-2
)
assert_true(worse_score > base_score, f"Remove tests: {base_score} -> {worse_score}")
@test("score bounded 1-10")
def _():
for files, adds, dels in [(1, 10, 5), (100, 10000, 5000)]:
score, _, _ = score_pr(files, adds, dels, False, None)
assert_true(1 <= score <= 10, f"Score {score} out of range")
@test("estimated minutes exist for all scores")
def _():
for s in range(1, 11):
assert_true(s in TIME_PER_POINT, f"Missing time for score {s}")
print(f"\n=== Results: {PASS} passed, {FAIL} failed ===")
sys.exit(0 if FAIL == 0 else 1)

View File

@@ -0,0 +1,128 @@
"""Tests for docstring_generator module (Issue #96)."""
import ast
import sys
import tempfile
from pathlib import Path
import pytest
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
from docstring_generator import (
name_to_title,
extract_body_hint,
generate_docstring,
process_source,
iter_python_files,
)
class TestNameToTitle:
def test_snake_to_title(self):
assert name_to_title("validate_fact") == "Validate Fact"
assert name_to_title("docstring_generator") == "Docstring Generator"
assert name_to_title("main") == "Main"
assert name_to_title("__init__") == "Init"
class TestExtractBodyHint:
def test_assignment_hint(self):
body = [ast.parse("result = compute()").body[0]]
hint = extract_body_hint(body)
assert hint == "Compute or return compute()"
def test_return_hint(self):
body = [ast.parse("return data").body[0]]
hint = extract_body_hint(body)
assert hint == "Return data"
def test_no_hint(self):
body = [ast.parse("pass").body[0]]
assert extract_body_hint(body) is None
class TestGenerateDocstring:
def test_simple_function(self):
src = "def add(a, b):\n return a + b\n"
tree = ast.parse(src)
func = tree.body[0]
doc = generate_docstring(func)
assert 'Add' in doc
assert 'a' in doc and 'b' in doc
assert 'Args:' in doc
assert 'Returns:' in doc
def test_typed_function(self):
src = "def greet(name: str) -> str:\n return f'Hello {name}'\n"
tree = ast.parse(src)
func = tree.body[0]
doc = generate_docstring(func)
assert 'name (str)' in doc
assert 'str' in doc
def test_async_function(self):
src = "async def fetch():\n pass\n"
tree = ast.parse(src)
func = tree.body[0]
doc = generate_docstring(func)
assert 'Fetch' in doc
def test_self_skipped(self):
src = "class C:\n def method(self, x):\n return x\n"
tree = ast.parse(src)
cls = tree.body[0]
method = cls.body[0]
doc = generate_docstring(method)
# 'self' should not appear in Args section
args_start = doc.find('Args:')
if args_start >= 0:
args_section = doc[args_start:]
assert '(self)' not in args_section
class TestProcessSource:
def test_adds_docstrings(self):
src = "def foo(x):\n return x * 2\n"
new_src, funcs = process_source(src, "test.py")
assert len(funcs) == 1 and funcs[0] == "foo"
assert '"""' in new_src
assert 'Foo' in new_src
def test_preserves_existing_docstrings(self):
src = 'def bar():\n """Already documented."""\n return 1\n'
new_src, funcs = process_source(src, "test.py")
assert len(funcs) == 0
assert new_src == src
def test_multiple_functions(self):
src = "def a(): pass\ndef b(): pass\ndef c(): pass\n"
new_src, funcs = process_source(src, "test.py")
assert len(funcs) == 3
assert '"""' in new_src
def test_dry_run_no_write(self, tmp_path):
file = tmp_path / "t.py"
file.write_text("def f(): pass\n")
original_mtime = file.stat().st_mtime
new_src, funcs = process_source(file.read_text(), str(file))
assert funcs # detected
# When caller handles write, dry-run leaves file unchanged
current_mtime = file.stat().st_mtime
assert current_mtime == original_mtime
class TestIterPythonFiles:
def test_single_file(self, tmp_path):
f = tmp_path / "single.py"
f.write_text("x = 1")
files = iter_python_files([str(f)])
assert len(files) == 1
assert files[0].name == "single.py"
def test_directory_recursion(self, tmp_path):
(tmp_path / "sub").mkdir()
(tmp_path / "sub" / "a.py").write_text("a=1")
(tmp_path / "b.py").write_text("b=2")
files = iter_python_files([str(tmp_path)])
assert len(files) == 2