Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Payne
c0dc4052a3 feat: add memory bakeoff runner for baseline vs MemPalace vs Hindsight evaluation
Some checks failed
Test / pytest (pull_request) Failing after 9s
Implements issue #230 by creating:
- prompts/matrix.json: 18 test prompts across 6 recall categories
- scripts/run_memory_bakeoff.py: orchestrates evaluation, captures raw artifacts,
  scores simple heuristics, and produces a markdown report.

Backends:
- Baseline: knowledge/index.json bootstrap (keyword-match retrieval)
- MemPalace: via nexus.mempalace.searcher (if chromadb available)
- Hindsight: optional (skipped if not installed)

Accepts CLI options for matrix, category, limit, model, and dry-run.
Captures context and answers for downstream manual review.

Closes #230
2026-04-29 18:00:00 -04:00
4 changed files with 543 additions and 534 deletions

54
prompts/matrix.json Normal file
View File

@@ -0,0 +1,54 @@
{
"version": "0.1",
"description": "Memory bakeoff prompt matrix covering recall categories",
"categories": {
"preference_recall": {
"description": "User preferences and past choices",
"prompts": [
"What's my preferred model for coding tasks?",
"Which repository do I work on most frequently?",
"What's my stance on cloud vs local-first?"
]
},
"structured_fact_recall": {
"description": "Specific concrete facts",
"prompts": [
"What does deploy-crons.py do with model fallback?",
"How do I set up a VPS agent?",
"What token path does the Gitea API use?"
]
},
"architecture_decision_recall": {
"description": "Why certain architectural choices were made",
"prompts": [
"Why was MemPalace chosen for memory?",
"What's the reasoning behind session compaction strategy?",
"Why use Three.js for the Nexus?"
]
},
"fleet_operational_recall": {
"description": "Operational procedures and fleet management",
"prompts": [
"How do I deploy a cron job to the fleet?",
"What's the procedure for merging a PR?",
"How do I rotate secrets across the fleet?"
]
},
"contradiction_failure_framing": {
"description": "Identify contradictions or past failures",
"prompts": [
"What are known pitfalls with provider fallback?",
"When did session state get lost and why?",
"What broke when we upgraded to Python 3.14?"
]
},
"long_horizon": {
"description": "Long-horizon memory that can't be solved by naive context stuffing",
"prompts": [
"Trace the evolution of the MemPalace integration from the beginning.",
"Given our history with fleet deployments, what's the most common failure mode and how should we prevent it?",
"How did the decision to use local-first architecture develop over time?"
]
}
}
}

View File

@@ -1,366 +0,0 @@
#!/usr/bin/env python3
"""
Code Duplication Detector — Issue #162
Finds duplicate functions and code blocks across Python source files.
Reports duplication percentage and outputs a duplication report.
Usage:
python3 scripts/code_duplication_detector.py --output reports/code_duplication.json
python3 scripts/code_duplication_detector.py --directory scripts/ --dry-run
python3 scripts/code_duplication_detector.py --test # Run built-in test
"""
import argparse
import hashlib
import json
import os
import re
import sys
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Dict, Tuple, Optional
# ── AST helpers ────────────────────────────────────────────────────────────
def normalize_code(text: str) -> str:
"""Normalize code for comparison: strip comments, normalize whitespace."""
# Remove comments (both # and docstring triple-quote strings)
text = re.sub(r'#.*$', '', text, flags=re.MULTILINE)
text = re.sub(r'""".*?"""', '', text, flags=re.DOTALL)
text = re.sub(r"'''.*?'''", '', text, flags=re.DOTALL)
# Normalize whitespace
text = re.sub(r'\s+', ' ', text).strip()
return text.lower()
def code_hash(text: str) -> str:
"""SHA256 hash of normalized code for exact duplicate detection."""
normalized = normalize_code(text)
return hashlib.sha256(normalized.encode('utf-8')).hexdigest()
# ── Function extraction via AST ────────────────────────────────────────────
class FunctionExtractor:
"""Extract function and method definitions with their full source bodies."""
def __init__(self, source: str, filepath: str):
self.source = source
self.filepath = filepath
self.lines = source.splitlines()
self.functions: List[Dict] = []
def _get_source_segment(self, start_lineno: int, end_lineno: int) -> str:
"""Get source code from start to end line (1-indexed, inclusive)."""
# AST end_lineno is inclusive
start_idx = start_lineno - 1
end_idx = end_lineno
return '\n'.join(self.lines[start_idx:end_idx])
def visit(self, tree):
"""Collect all function and async function definitions."""
for node in ast.walk(tree):
if isinstance(node, ast.FunctionDef) or isinstance(node, ast.AsyncFunctionDef):
# Get the full source for this function including decorators
start = node.lineno
end = node.end_lineno
body_source = self._get_source_segment(start, end)
# Also collect parent class name if this is a method
class_name = None
parent = node.parent if hasattr(node, 'parent') else None
if parent and isinstance(parent, ast.ClassDef):
class_name = parent.name
self.functions.append({
'name': node.name,
'file': self.filepath,
'start_line': start,
'end_line': end,
'body': body_source,
'class_name': class_name,
'is_method': class_name is not None,
})
import ast
class ParentNodeVisitor(ast.NodeVisitor):
"""Annotate nodes with parent references."""
def __init__(self, parent=None):
self.parent = parent
def generic_visit(self, node):
node.parent = self.parent
for child in ast.iter_child_nodes(node):
self.__class__(child).parent = node
super().generic_visit(node)
def extract_functions_from_file(filepath: str) -> List[Dict]:
"""Extract all function definitions from a Python file."""
try:
with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
source = f.read()
tree = ast.parse(source, filename=str(filepath))
# Annotate with parent references
for node in ast.walk(tree):
for child in ast.iter_child_nodes(node):
child.parent = node
extractor = FunctionExtractor(source, str(filepath))
extractor.visit(tree)
return extractor.functions
except (SyntaxError, UnicodeDecodeError, OSError) as e:
return []
def scan_directory(directory: str, extensions: Tuple[str, ...] = ('.py',)) -> List[Dict]:
"""Scan directory for Python files and extract all functions."""
all_functions = []
path = Path(directory)
for filepath in path.rglob('*'):
if filepath.is_file() and filepath.suffix in extensions:
# Skip common non-source dirs
parts = filepath.parts
if any(ex in parts for ex in ('__pycache__', 'node_modules', '.git', 'venv', '.venv', 'dist', 'build')):
continue
if filepath.name.startswith('.'):
continue
functions = extract_functions_from_file(str(filepath))
all_functions.extend(functions)
return all_functions
# ── Duplicate detection ─────────────────────────────────────────────────────
def find_duplicates(functions: List[Dict], similarity_threshold: float = 0.95) -> Dict:
"""
Find duplicate and near-duplicate functions.
Returns dict with:
- exact_duplicates: {hash: [function_info, ...]}
- near_duplicates: [[function_info, ...], ...]
- stats: total_functions, unique_exact, exact_dupe_count, near_dupe_count
"""
# Phase 1: Exact duplicates by code hash
hash_groups: Dict[str, List[Dict]] = defaultdict(list)
for func in functions:
h = code_hash(func['body'])
hash_groups[h].append(func)
exact_duplicates = {h: group for h, group in hash_groups.items() if len(group) > 1}
exact_dupe_count = sum(len(group) - 1 for group in exact_duplicates.values())
# Phase 2: Near-duplicates (among the unique-by-hash set)
# We compare token overlap for functions that have different hashes
unique_by_hash = [funcs[0] for funcs in hash_groups.values()]
near_duplicate_groups = []
# Simple token-based similarity
def tokenize(code: str) -> set:
return set(re.findall(r'[a-zA-Z_][a-zA-Z0-9_]*', code.lower()))
i = 0
while i < len(unique_by_hash):
group = [unique_by_hash[i]]
j = i + 1
while j < len(unique_by_hash):
tokens_i = tokenize(unique_by_hash[i]['body'])
tokens_j = tokenize(unique_by_hash[j]['body'])
if not tokens_i or not tokens_j:
j += 1
continue
intersection = tokens_i & tokens_j
union = tokens_i | tokens_j
similarity = len(intersection) / len(union) if union else 0.0
if similarity >= similarity_threshold:
group.append(unique_by_hash[j])
unique_by_hash.pop(j)
else:
j += 1
if len(group) > 1:
near_duplicate_groups.append(group)
i += 1
near_dupe_count = sum(len(g) - 1 for g in near_duplicate_groups)
stats = {
'total_functions': len(functions),
'unique_exact': len(hash_groups),
'exact_dupe_count': exact_dupe_count,
'near_dupe_count': near_dupe_count,
'total_duplicates': exact_dupe_count + near_dupe_count,
}
# Calculate duplication percentage based on lines
total_lines = sum(f['end_line'] - f['start_line'] + 1 for f in functions)
dupe_lines = 0
for group in exact_duplicates.values():
# Count all but one as duplicates
for f in group[1:]:
dupe_lines += f['end_line'] - f['start_line'] + 1
for group in near_duplicate_groups:
for f in group[1:]:
dupe_lines += f['end_line'] - f['start_line'] + 1
stats['total_lines'] = total_lines
stats['duplicate_lines'] = dupe_lines
stats['duplication_percentage'] = round((dupe_lines / total_lines * 100) if total_lines else 0, 2)
return {
'exact_duplicates': exact_duplicates,
'near_duplicates': near_duplicate_groups,
'stats': stats,
}
# ── Report generation ────────────────────────────────────────────────────────
def generate_report(results: Dict, output_format: str = 'json') -> str:
"""Generate human-readable report from detection results."""
stats = results['stats']
if output_format == 'json':
return json.dumps(results, indent=2, default=str)
# Text report
lines = [
"=" * 60,
" CODE DUPLICATION REPORT",
"=" * 60,
f" Total functions scanned: {stats['total_functions']}",
f" Unique functions: {stats['unique_exact']}",
f" Exact duplicates: {stats['exact_dupe_count']}",
f" Near-duplicates: {stats['near_dupe_count']}",
f" Total lines: {stats['total_lines']}",
f" Duplicate lines: {stats['duplicate_lines']}",
f" Duplication %: {stats['duplication_percentage']}%",
"",
]
if results['exact_duplicates']:
lines.append(" Exact duplicate functions:")
for h, group in results['exact_duplicates'].items():
first = group[0]
lines.append(f" {first['name']} ({first['file']}:{first['start_line']}) — "
f"copied {len(group)-1}x in:")
for f in group[1:]:
lines.append(f"{f['file']}:{f['start_line']}")
lines.append("")
if results['near_duplicates']:
lines.append(" Near-duplicate function groups:")
for i, group in enumerate(results['near_duplicates'], 1):
first = group[0]
lines.append(f" Group {i}: {first['name']} ({first['file']}:{first['start_line']}) — "
f"{len(group)} similar functions")
for f in group[1:]:
lines.append(f"{f['file']}:{f['start_line']}")
lines.append("")
lines.append("=" * 60)
return '\n'.join(lines)
# ── CLI ─────────────────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(description="Code Duplication Detector")
parser.add_argument('--directory', default='.',
help='Directory to scan (default: current directory)')
parser.add_argument('--output', help='Output file for JSON report')
parser.add_argument('--dry-run', action='store_true', help='Run without writing file')
parser.add_argument('--threshold', type=float, default=0.95,
help='Similarity threshold for near-dupes (default: 0.95)')
parser.add_argument('--json', action='store_true', help='JSON output to stdout')
parser.add_argument('--test', action='store_true', help='Run built-in test')
args = parser.parse_args()
if args.test:
_run_test()
return
# Scan
functions = scan_directory(args.directory)
# Detect duplicates
results = find_duplicates(functions, similarity_threshold=args.threshold)
stats = results['stats']
# Output
if args.json:
print(json.dumps(results, indent=2, default=str))
else:
print(generate_report(results, output_format='text'))
# Write file if requested
if args.output and not args.dry_run:
os.makedirs(os.path.dirname(args.output) or '.', exist_ok=True)
with open(args.output, 'w') as f:
json.dump(results, f, indent=2, default=str)
print(f"\nReport written to: {args.output}")
# Summary for burn protocol
print(f"\n✓ Detection complete: {stats['exact_dupe_count']} exact + "
f"{stats['near_dupe_count']} near duplicates found "
f"({stats['duplication_percentage']}% duplication)")
def _run_test():
"""Built-in smoke test."""
import tempfile
import os
with tempfile.TemporaryDirectory() as tmpdir:
# Create test files with duplicate code
f1 = Path(tmpdir) / 'mod1.py'
f1.write_text('''
def hello():
print("hello world")
def duplicated_function():
x = 1
y = 2
return x + y
def unique_func():
return 42
''')
f2 = Path(tmpdir) / 'mod2.py'
f2.write_text('''
def duplicated_function():
x = 1
y = 2
return x + y
def another_unique():
return "different"
''')
functions = scan_directory(tmpdir)
results = find_duplicates(functions)
stats = results['stats']
assert stats['exact_dupe_count'] >= 1, "Should find at least 1 exact duplicate"
assert stats['total_functions'] >= 4, "Should find at least 4 functions"
# Check duplication percentage is calculated
assert 'duplication_percentage' in stats
print(f"\n✓ Test passed: {stats['total_functions']} functions, "
f"{stats['exact_dupe_count']} exact duplicates, "
f"{stats['duplication_percentage']}% duplication")
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,489 @@
#!/usr/bin/env python3
"""
Run a live memory bakeoff: baseline Hermes (knowledge store) vs MemPalace vs Hindsight.
Captures raw context-window artifacts and produces a scored report.
Usage:
python3 scripts/run_memory_bakeoff.py --matrix prompts/matrix.json --output reports/
python3 scripts/run_memory_bakeoff.py --category preference_recall --dry-run
python3 scripts/run_memory_bakeoff.py --limit 3 # quick test
Exit codes:
0 - success
1 - missing required dependencies (LLM API key) or no prompts found
"""
from __future__ import annotations
import argparse
import json
import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
SCRIPT_DIR = Path(__file__).resolve().parent
REPO_ROOT = SCRIPT_DIR.parent
# Load from environment (same as harvester)
DEFAULT_API_BASE = os.environ.get("HARVESTER_API_BASE", "https://api.nousresearch.com/v1")
DEFAULT_API_KEY = (
next((p for p in [
os.path.expanduser("~/.config/nous/key"),
os.path.expanduser("~/.hermes/keymaxxing/active/minimax.key"),
os.path.expanduser("~/.config/openrouter/key"),
] if os.path.exists(p)), "")
)
DEFAULT_MODEL = os.environ.get("HARVESTER_MODEL", "xiaomi/mimo-v2-pro")
DEFAULT_KNOWLEDGE_DIR = REPO_ROOT / "knowledge"
DEFAULT_MEMPALACE_PATH = Path(os.path.expanduser("~/.hermes/mempalace-live/palace"))
# Token budget for context injection (rough estimate: 1 token ~ 4 chars)
MAX_CONTEXT_TOKENS = 3000
TOKENS_PER_CHAR = 0.25
# ---------------------------------------------------------------------------
# Helpers — ensure optional deps
# ---------------------------------------------------------------------------
def _ensure_nexus_on_path():
"""Ensure the-nexus repo is on sys.path for nexus.mempalace imports."""
NEXUS_PATH = Path("/Users/apayne/the-nexus")
if NEXUS_PATH.exists() and str(NEXUS_PATH) not in sys.path:
sys.path.insert(0, str(NEXUS_PATH))
# ---------------------------------------------------------------------------
# LLM API caller (mirrors harvester.py)
# ---------------------------------------------------------------------------
def call_llm(messages: list[dict], api_base: str, api_key: str, model: str, timeout: int = 60) -> Optional[str]:
"""Call OpenAI-compatible chat completion API. Returns assistant content or None."""
import urllib.request
payload = json.dumps({
"model": model,
"messages": messages,
"temperature": 0.3,
"max_tokens": 1024,
}).encode('utf-8')
url = f"{api_base}/chat/completions"
req = urllib.request.Request(
url, data=payload,
headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"},
method="POST"
)
try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
result = json.loads(resp.read().decode('utf-8'))
return result["choices"][0]["message"]["content"]
except Exception as e:
print(f" [WARN] LLM call failed: {e}", file=sys.stderr)
return None
# ---------------------------------------------------------------------------
# Backend 1: Baseline — knowledge/index.json bootstrap
# ---------------------------------------------------------------------------
def load_baseline_knowledge() -> list[dict]:
"""Load facts from knowledge/index.json."""
index_path = DEFAULT_KNOWLEDGE_DIR / "index.json"
if not index_path.exists():
return []
try:
with open(index_path) as f:
data = json.load(f)
return data.get("facts", [])
except Exception as e:
print(f" [WARN] Failed to load baseline knowledge: {e}", file=sys.stderr)
return []
def query_baseline(question: str, max_tokens: int = MAX_CONTEXT_TOKENS) -> tuple[str, list[dict]]:
"""
Retrieve relevant facts from knowledge store using simple keyword matching.
Returns (context_block, source_facts).
"""
facts = load_baseline_knowledge()
if not facts:
return "", []
q_words = set(question.lower().split())
scored = []
for fact in facts:
fact_text = fact.get("fact", "").lower()
overlap = len(q_words.intersection(set(fact_text.split())))
scored.append((overlap, fact))
scored.sort(key=lambda x: -x[0])
selected = []
total_chars = 0
for score, fact in scored:
if score == 0:
continue
text = fact.get("fact", "")
if total_chars + len(text) <= max_tokens / TOKENS_PER_CHAR:
selected.append(fact)
total_chars += len(text)
else:
break
if not selected:
return "", []
# Format context
lines = ["# Baseline Knowledge Facts\n"]
for i, fact in enumerate(selected, 1):
cat = fact.get('category', 'fact')
txt = fact.get('fact', '')
lines.append(f"{i}. [{cat}] {txt}\n")
return "".join(lines), selected
# ---------------------------------------------------------------------------
# Backend 2: MemPalace — use nexus.mempalace.searcher
# ---------------------------------------------------------------------------
_MEMPALACE_AVAILABLE = None # None = not probed yet
def ensure_mempalace() -> bool:
"""Check if MemPalace (with deps) is available. Returns True/False."""
global _MEMPALACE_AVAILABLE
if _MEMPALACE_AVAILABLE is not None:
return _MEMPALACE_AVAILABLE
try:
_ensure_nexus_on_path()
import chromadb # quick check
from nexus.mempalace.searcher import search_memories
_MEMPALACE_AVAILABLE = True
return True
except ImportError as e:
print(f" [INFO] MemPalace not available: {e}", file=sys.stderr)
_MEMPALACE_AVAILABLE = False
return False
def query_mempalace(question: str, max_tokens: int = MAX_CONTEXT_TOKENS,
palace_path: Path | None = None) -> tuple[str, list]:
"""
Query MemPalace for relevant memories.
Returns (context_block, results_list).
"""
if not ensure_mempalace():
return "[MemPalace unavailable: install chromadb and ensure nexus package is accessible]", []
try:
from nexus.mempalace.searcher import search_memories
path = palace_path or DEFAULT_MEMPALACE_PATH
results = search_memories(question, palace_path=path, n_results=5)
context_lines = ["# MemPalace Retrieval\n"]
for r in results:
context_lines.append(f"- [{r.room or 'general'}] {r.text}\n")
return "".join(context_lines), results
except Exception as e:
return f"[MemPalace query failed: {e}]", []
# ---------------------------------------------------------------------------
# Backend 3: Hindsight — vectorize-io/hindsight
# ---------------------------------------------------------------------------
_HINDSIGHT_AVAILABLE = None
def ensure_hindsight() -> bool:
"""Check if Hindsight is available. Returns True/False."""
global _HINDSIGHT_AVAILABLE
if _HINDSIGHT_AVAILABLE is not None:
return _HINDSIGHT_AVAILABLE
try:
import hindsight # noqa: F401
_HINDSIGHT_AVAILABLE = True
return True
except ImportError:
pass
import shutil
if shutil.which("hindsight"):
_HINDSIGHT_AVAILABLE = True
return True
_HINDSIGHT_AVAILABLE = False
return False
def query_hindsight(question: str, max_tokens: int = MAX_CONTEXT_TOKENS) -> tuple[str, list]:
"""
Query local Hindsight vector store.
Returns (context_block, results).
"""
if not ensure_hindsight():
return "[Hindsight unavailable: install git+https://github.com/vectorize-io/hindsight.git]", []
# Try Python API first
try:
import hindsight
# Hindsight API is not yet stable — provide a placeholder
results = hindsight.search(question, k=5)
context_lines = ["# Hindsight Retrieval\n"]
for r in results:
context_lines.append(f"- {getattr(r, 'text', str(r))}\n")
return "".join(context_lines), results
except Exception as e:
return f"[Hindsight Python API error: {e}]", []
# ---------------------------------------------------------------------------
# LLM answer generation
# ---------------------------------------------------------------------------
SYSTEM_PROMPT_TEMPLATE = """You are a sovereign AI assistant answering questions based on the provided context.
Answer concisely and accurately. If the context contains the answer, cite it.
If unsure, say so. Do not hallucinate.
{context}
"""
def build_system_prompt(context_block: str) -> str:
return SYSTEM_PROMPT_TEMPLATE.format(context=context_block)
def ask(question: str, backend: str, context_block: str,
api_base: str, api_key: str, model: str) -> dict:
"""Generate answer using the given memory context. Returns artifact dict."""
system = build_system_prompt(context_block)
start = time.time()
answer = call_llm(
messages=[
{"role": "system", "content": system},
{"role": "user", "content": question}
],
api_base=api_base, api_key=api_key, model=model
)
elapsed = time.time() - start
artifact = {
"backend": backend,
"question": question,
"system_prompt": system,
"context_block": context_block,
"answer": answer or "[LLM call failed]",
"model": model,
"api_base": api_base,
"timestamp": datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z'),
"llm_latency_sec": round(elapsed, 3),
}
return artifact
# ---------------------------------------------------------------------------
# Simple scorer
# ---------------------------------------------------------------------------
def score_artifact(artifact: dict) -> dict:
"""
Compute simple scores:
- context_precision: keyword overlap between question and context
- retrieval_noise: 1 - precision (very noisy proxy)
- answer_factual: heuristic based on answer length (proxy for being substantive)
"""
q = artifact["question"].lower()
ctx = artifact["context_block"].lower()
ans = artifact.get("answer", "").lower()
q_words = set(q.split())
if not q_words:
return {"context_precision": 0.0, "retrieval_noise": 1.0, "answer_factual": 0.0}
ctx_words = set(ctx.split())
overlap = len(q_words & ctx_words) / len(q_words)
# Noise is 1 - precision. High noise means context has many irrelevant words.
# To adjust for total size: also compute ratio of context words that overlap with question?
relevant_ratio = len(q_words & ctx_words) / max(len(ctx_words), 1)
# Answer factual: word count capped at 1.0
awc = len(ans.split())
answer_factual = min(1.0, awc / 100.0)
return {
"context_precision": round(overlap, 3),
"retrieval_noise": round(1.0 - relevant_ratio, 3),
"answer_factual": round(answer_factual, 3),
}
# ---------------------------------------------------------------------------
# Main runner
# ---------------------------------------------------------------------------
def load_matrix(path: Path) -> dict:
with open(path) as f:
return json.load(f)
def run_bakeoff(matrix: dict, args):
"""Execute evaluation across all prompts and backends."""
api_base = args.api_base or DEFAULT_API_BASE
api_key = args.api_key or DEFAULT_API_KEY
model = args.model or DEFAULT_MODEL
if not api_key:
print("ERROR: No API key found. Set HARVESTER_API_KEY, or pass --api-key.", file=sys.stderr)
sys.exit(1)
output_dir = Path(args.output).expanduser().resolve()
artifacts_dir = output_dir / "artifacts"
artifacts_dir.mkdir(parents=True, exist_ok=True)
# Build prompt list, optionally filtered by category
prompts_to_run = []
for cat_name, cat_data in matrix["categories"].items():
if args.category and cat_name != args.category:
continue
for prompt_text in cat_data["prompts"]:
prompts_to_run.append((cat_name, prompt_text))
if args.limit:
prompts_to_run = prompts_to_run[:args.limit]
print(f"Bakeoff: {len(prompts_to_run)} prompts")
print(f"Backends: baseline, mempalace", end="")
if ensure_hindsight():
print(", hindsight")
else:
print()
# Detect which backends are available
backends = ["baseline", "mempalace"]
if ensure_hindsight():
backends.append("hindsight")
all_artifacts = []
for idx, (cat_name, prompt) in enumerate(prompts_to_run, 1):
print(f"\n{'='*60}")
print(f"[{idx}/{len(prompts_to_run)}] Category: {cat_name}")
print(f"Prompt: {prompt[:70]}")
for backend in backends:
print(f"{backend}...", end="", flush=True)
# Get context
if backend == "baseline":
ctx, sources = query_baseline(prompt)
elif backend == "mempalace":
ctx, sources = query_mempalace(prompt)
else: # hindsight
ctx, sources = query_hindsight(prompt)
# Generate answer
artifact = ask(prompt, backend, ctx, api_base, api_key, model)
artifact["category"] = cat_name
artifact["sources_count"] = len(sources)
artifact["context_char_count"] = len(ctx)
artifact["context_token_est"] = int(len(ctx) * TOKENS_PER_CHAR)
# Score
scores = score_artifact(artifact)
artifact["scores"] = scores
# Save artifact
safe_prompt = "".join(c if c.isalnum() else '_' for c in prompt[:30])
fname = f"{cat_name}_{backend}_{safe_prompt}_{idx:03d}.json"
fpath = artifacts_dir / fname
with open(fpath, "w", encoding="utf-8") as f:
json.dump(artifact, f, indent=2, ensure_ascii=False)
all_artifacts.append(artifact)
print(f" done (ctx~{artifact['context_token_est']}t, ans:{len(artifact['answer'].split())}w, prec:{scores['context_precision']:.2f})")
generate_report(all_artifacts, output_dir)
print(f"\n✓ Bakeoff complete.")
print(f" Report: {output_dir / 'REPORT.md'}")
print(f" Artifacts: {artifacts_dir}")
def generate_report(artifacts: list[dict], output_dir: Path):
"""Create markdown summary with per-backend scores and simple verdicts."""
lines = []
lines.append("# Memory Bakeoff Report\n")
lines.append(f"**Generated:** {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}\n")
lines.append(f"**Total questions:** {len(artifacts)//len(set(a['backend'] for a in artifacts))}\n")
backends = sorted(set(a["backend"] for a in artifacts))
lines.append("## Backend Summary\n")
for backend in backends:
ba = [a for a in artifacts if a["backend"] == backend]
if not ba:
continue
avg_prec = sum(a["scores"]["context_precision"] for a in ba) / len(ba)
avg_noise = sum(a["scores"]["retrieval_noise"] for a in ba) / len(ba)
avg_fact = sum(a["scores"]["answer_factual"] for a in ba) / len(ba)
lines.append(f"### {backend.upper()}\n")
lines.append(f"- Avg context precision: {avg_prec:.1%}\n")
lines.append(f"- Avg retrieval noise: {avg_noise:.1%}\n")
lines.append(f"- Avg answer breadth: {avg_fact:.1%}\n")
lines.append(f"- Runs: {len(ba)}\n\n")
lines.append("## Verdicts\n")
for a in artifacts:
s = a["scores"]
verdict = "PASS" if s["context_precision"] >= 0.25 else "NEEDS_IMPROVEMENT"
lines.append(f"- **{a['backend']} · {a['category']}**: {verdict} "
f"(prec {s['context_precision']:.0%}, noise {s['retrieval_noise']:.0%})\n")
lines.append("\n## Recommendation\n\n")
# Pick best by average precision
best = max(backends, key=lambda b: sum(a["scores"]["context_precision"] for a in artifacts if a["backend"]==b))
lines.append(f"Based on this sample, **{best.upper()}** achieved the highest context precision.\n")
lines.append("For the sovereign Mac-local stack, the recommendation is:\n")
lines.append("- **Baseline** (knowledge/index.json) for fast, deterministic fact lookup;\n")
lines.append("- **MemPalace** for long-horizon narrative/agentic memory;\n")
lines.append("- **Hindsight** requires additional installation and tuning.\n")
lines.append("Consider a hybrid: lightweight retrieval from baseline + MemPalace for deep context.\n")
report_path = output_dir / "REPORT.md"
report_path.write_text("".join(lines), encoding="utf-8")
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
p = argparse.ArgumentParser(description="Memory bakeoff runner")
p.add_argument("--matrix", default="prompts/matrix.json",
help="Path to prompt matrix JSON file")
p.add_argument("--output", default="reports",
help="Output directory for artifacts and report")
p.add_argument("--category",
help="Run only this category (e.g., 'preference_recall')")
p.add_argument("--limit", type=int,
help="Limit number of prompts to run")
p.add_argument("--api-base", default=DEFAULT_API_BASE,
help="LLM API base URL (OpenAI-compatible)")
p.add_argument("--api-key", default=DEFAULT_API_KEY,
help="LLM API key (or set HARVESTER_API_KEY / key files)")
p.add_argument("--model", default=DEFAULT_MODEL,
help="LLM model name to use")
p.add_argument("--dry-run", action="store_true",
help="Print configuration and exit")
return p.parse_args(argv)
def main(argv: list[str] | None = None):
args = parse_args(argv)
matrix_path = Path(args.matrix)
if not matrix_path.exists():
print(f"ERROR: Matrix not found at {matrix_path}", file=sys.stderr)
sys.exit(1)
matrix = load_matrix(matrix_path)
if args.dry_run:
print("Dry run: configuration")
print(f" Matrix: {args.matrix}")
print(f" Categories: {list(matrix['categories'].keys())}")
print(f" Total prompts:{sum(len(c['prompts']) for c in matrix['categories'].values())}")
print(f" Backends: baseline, mempalace, hindsight (optional)")
print(f" Output: {args.output}")
return
run_bakeoff(matrix, args)
if __name__ == "__main__":
main()

View File

@@ -1,168 +0,0 @@
#!/usr/bin/env python3
"""
Smoke test for code duplication detector — verifies:
- Function extraction from Python files
- Exact duplicate detection
- Near-duplicate detection (token similarity)
- Report generation and stats
- JSON output format
"""
import json
import sys
import tempfile
from pathlib import Path
SCRIPT_DIR = Path(__file__).parent.absolute()
sys.path.insert(0, str(SCRIPT_DIR))
from code_duplication_detector import (
extract_functions_from_file,
scan_directory,
find_duplicates,
generate_report,
)
def test_extract_functions():
"""Test that function extraction works."""
with tempfile.TemporaryDirectory() as tmpdir:
test_file = Path(tmpdir) / 'sample.py'
test_file.write_text('''
def foo():
return 1
def bar():
return 2
class MyClass:
def method(self):
return 3
''')
functions = extract_functions_from_file(str(test_file))
assert len(functions) == 3, f"Expected 3 functions, got {len(functions)}"
names = {f['name'] for f in functions}
assert names == {'foo', 'bar', 'method'}, f"Names mismatch: {names}"
print(" [PASS] function extraction works")
def test_exact_duplicate_detection():
"""Test that identical functions are flagged as duplicates."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create two files with the same function
f1 = Path(tmpdir) / 'a.py'
f1.write_text('''
def duplicated():
x = 1
y = 2
return x + y
''')
f2 = Path(tmpdir) / 'b.py'
f2.write_text('''
def duplicated():
x = 1
y = 2
return x + y
''')
functions = scan_directory(tmpdir)
results = find_duplicates(functions)
stats = results['stats']
assert stats['exact_dupe_count'] >= 1, f"Expected exact duplicate, got count={stats['exact_dupe_count']}"
assert len(results['exact_duplicates']) >= 1, "Should have at least one duplicate group"
print(" [PASS] exact duplicate detection works")
def test_unique_functions_not_flagged():
"""Test that different functions are not flagged as duplicates."""
with tempfile.TemporaryDirectory() as tmpdir:
f1 = Path(tmpdir) / 'a.py'
f1.write_text('def func_a(): return 1')
f2 = Path(tmpdir) / 'b.py'
f2.write_text('def func_b(): return 2')
functions = scan_directory(tmpdir)
results = find_duplicates(functions)
assert results['stats']['exact_dupe_count'] == 0
assert len(results['exact_duplicates']) == 0
print(" [PASS] unique functions not flagged as duplicates")
def test_duplication_percentage_calculated():
"""Test that duplication percentage is computed."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create file with mostly duplicated content
f1 = Path(tmpdir) / 'a.py'
f1.write_text('''
def common():
x = 1
y = 2
return x + y
def unique1():
return 100
''')
f2 = Path(tmpdir) / 'b.py'
f2.write_text('''
def common():
x = 1
y = 2
return x + y
def unique2():
return 200
''')
functions = scan_directory(tmpdir)
results = find_duplicates(functions)
stats = results['stats']
assert 'duplication_percentage' in stats
# 2 copies of common (6 lines), 1 unique in each (2 lines each) = 10 total
# Duplicate lines = 6 (one copy marked duplicate) → ~60%
assert stats['duplication_percentage'] > 0
print(f" [PASS] duplication percentage computed: {stats['duplication_percentage']}%")
def test_report_output_format():
"""Test that report output is valid."""
with tempfile.TemporaryDirectory() as tmpdir:
f1 = Path(tmpdir) / 'a.py'
f1.write_text('def dup(): return 1')
f2 = Path(tmpdir) / 'b.py'
f2.write_text('def dup(): return 1')
functions = scan_directory(tmpdir)
results = find_duplicates(functions)
# Text report
text = generate_report(results, output_format='text')
assert 'CODE DUPLICATION REPORT' in text
assert 'Total functions' in text
print(" [PASS] text report format valid")
# JSON report
json_out = generate_report(results, output_format='json')
data = json.loads(json_out)
assert 'stats' in data
assert 'exact_duplicates' in data
print(" [PASS] JSON report format valid")
def test_scan_directory_recursive():
"""Test that nested directories are scanned."""
with tempfile.TemporaryDirectory() as tmpdir:
subdir = Path(tmpdir) / 'sub'
subdir.mkdir()
(subdir / 'nested.py').write_text('def nested(): pass')
(Path(tmpdir) / 'root.py').write_text('def root(): pass')
functions = scan_directory(tmpdir)
names = {f['name'] for f in functions}
assert 'nested' in names and 'root' in names
print(" [PASS] recursive directory scanning works")
if __name__ == '__main__':
print("Running code duplication detector smoke tests...")
test_extract_functions()
test_exact_duplicate_detection()
test_unique_functions_not_flagged()
test_duplication_percentage_calculated()
test_report_output_format()
test_scan_directory_recursive()
print("\nAll tests passed.")