Compare commits

..

1 Commits

Author SHA1 Message Date
Alex Payne
b1a728f5f4 feat: fix session_pair_harvester to use role/content format (#91)
Some checks failed
Test / pytest (pull_request) Failing after 8s
- Harvester used old message fields (from/value) but Hermes sessions use role/content
- Import session_reader to normalize conversations properly
- Update extract function to operate on normalized role/content messages
- Change predecessor lookup from "human"/"gpt" to "user"/"assistant"
- Add comprehensive smoke tests (8 tests, all pass)
- Verify extraction from test_sessions: 11 pairs, avg ratio 8.13
2026-04-26 00:19:56 -04:00
6 changed files with 155 additions and 1120 deletions

View File

@@ -1,54 +0,0 @@
{
"version": "0.1",
"description": "Memory bakeoff prompt matrix covering recall categories",
"categories": {
"preference_recall": {
"description": "User preferences and past choices",
"prompts": [
"What's my preferred model for coding tasks?",
"Which repository do I work on most frequently?",
"What's my stance on cloud vs local-first?"
]
},
"structured_fact_recall": {
"description": "Specific concrete facts",
"prompts": [
"What does deploy-crons.py do with model fallback?",
"How do I set up a VPS agent?",
"What token path does the Gitea API use?"
]
},
"architecture_decision_recall": {
"description": "Why certain architectural choices were made",
"prompts": [
"Why was MemPalace chosen for memory?",
"What's the reasoning behind session compaction strategy?",
"Why use Three.js for the Nexus?"
]
},
"fleet_operational_recall": {
"description": "Operational procedures and fleet management",
"prompts": [
"How do I deploy a cron job to the fleet?",
"What's the procedure for merging a PR?",
"How do I rotate secrets across the fleet?"
]
},
"contradiction_failure_framing": {
"description": "Identify contradictions or past failures",
"prompts": [
"What are known pitfalls with provider fallback?",
"When did session state get lost and why?",
"What broke when we upgraded to Python 3.14?"
]
},
"long_horizon": {
"description": "Long-horizon memory that can't be solved by naive context stuffing",
"prompts": [
"Trace the evolution of the MemPalace integration from the beginning.",
"Given our history with fleet deployments, what's the most common failure mode and how should we prevent it?",
"How did the decision to use local-first architecture develop over time?"
]
}
}
}

View File

@@ -1,351 +0,0 @@
#!/usr/bin/env python3
"""
PR Complexity Scorer - Estimate review effort for PRs.
"""
import argparse
import json
import os
import re
import sys
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
import urllib.request
import urllib.error
GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"
DEPENDENCY_FILES = {
"requirements.txt", "pyproject.toml", "setup.py", "setup.cfg",
"Pipfile", "poetry.lock", "package.json", "yarn.lock", "Gemfile",
"go.mod", "Cargo.toml", "pom.xml", "build.gradle"
}
TEST_PATTERNS = [
r"tests?/.*\.py$", r".*_test\.py$", r"test_.*\.py$",
r"spec/.*\.rb$", r".*_spec\.rb$",
r"__tests__/", r".*\.test\.(js|ts|jsx|tsx)$"
]
WEIGHT_FILES = 0.25
WEIGHT_LINES = 0.25
WEIGHT_DEPS = 0.30
WEIGHT_TEST_COV = 0.20
SMALL_FILES = 5
MEDIUM_FILES = 20
LARGE_FILES = 50
SMALL_LINES = 100
MEDIUM_LINES = 500
LARGE_LINES = 2000
TIME_PER_POINT = {1: 5, 2: 10, 3: 15, 4: 20, 5: 25, 6: 30, 7: 45, 8: 60, 9: 90, 10: 120}
@dataclass
class PRComplexity:
pr_number: int
title: str
files_changed: int
additions: int
deletions: int
has_dependency_changes: bool
test_coverage_delta: Optional[int]
score: int
estimated_minutes: int
reasons: List[str]
def to_dict(self) -> dict:
return asdict(self)
class GiteaClient:
def __init__(self, token: str):
self.token = token
self.base_url = GITEA_BASE.rstrip("/")
def _request(self, path: str, params: Dict = None) -> Any:
url = f"{self.base_url}{path}"
if params:
qs = "&".join(f"{k}={v}" for k, v in params.items() if v is not None)
url += f"?{qs}"
req = urllib.request.Request(url)
req.add_header("Authorization", f"token {self.token}")
req.add_header("Content-Type", "application/json")
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
print(f"API error {e.code}: {e.read().decode()[:200]}", file=sys.stderr)
return None
except urllib.error.URLError as e:
print(f"Network error: {e}", file=sys.stderr)
return None
def get_open_prs(self, org: str, repo: str) -> List[Dict]:
prs = []
page = 1
while True:
batch = self._request(f"/repos/{org}/{repo}/pulls", {"limit": 50, "page": page, "state": "open"})
if not batch:
break
prs.extend(batch)
if len(batch) < 50:
break
page += 1
return prs
def get_pr_files(self, org: str, repo: str, pr_number: int) -> List[Dict]:
files = []
page = 1
while True:
batch = self._request(
f"/repos/{org}/{repo}/pulls/{pr_number}/files",
{"limit": 100, "page": page}
)
if not batch:
break
files.extend(batch)
if len(batch) < 100:
break
page += 1
return files
def post_comment(self, org: str, repo: str, pr_number: int, body: str) -> bool:
data = json.dumps({"body": body}).encode("utf-8")
req = urllib.request.Request(
f"{self.base_url}/repos/{org}/{repo}/issues/{pr_number}/comments",
data=data,
method="POST",
headers={"Authorization": f"token {self.token}", "Content-Type": "application/json"}
)
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return resp.status in (200, 201)
except urllib.error.HTTPError:
return False
def is_dependency_file(filename: str) -> bool:
return any(filename.endswith(dep) for dep in DEPENDENCY_FILES)
def is_test_file(filename: str) -> bool:
return any(re.search(pattern, filename) for pattern in TEST_PATTERNS)
def score_pr(
files_changed: int,
additions: int,
deletions: int,
has_dependency_changes: bool,
test_coverage_delta: Optional[int] = None
) -> tuple[int, int, List[str]]:
score = 1.0
reasons = []
# Files changed
if files_changed <= SMALL_FILES:
fscore = 1.0
reasons.append("small number of files changed")
elif files_changed <= MEDIUM_FILES:
fscore = 2.0
reasons.append("moderate number of files changed")
elif files_changed <= LARGE_FILES:
fscore = 2.5
reasons.append("large number of files changed")
else:
fscore = 3.0
reasons.append("very large PR spanning many files")
# Lines changed
total_lines = additions + deletions
if total_lines <= SMALL_LINES:
lscore = 1.0
reasons.append("small change size")
elif total_lines <= MEDIUM_LINES:
lscore = 2.0
reasons.append("moderate change size")
elif total_lines <= LARGE_LINES:
lscore = 3.0
reasons.append("large change size")
else:
lscore = 4.0
reasons.append("very large change")
# Dependency changes
if has_dependency_changes:
dscore = 2.5
reasons.append("dependency changes (architectural impact)")
else:
dscore = 0.0
# Test coverage delta
tscore = 0.0
if test_coverage_delta is not None:
if test_coverage_delta > 0:
reasons.append(f"test additions (+{test_coverage_delta} test files)")
tscore = -min(2.0, test_coverage_delta / 2.0)
elif test_coverage_delta < 0:
reasons.append(f"test removals ({abs(test_coverage_delta)} test files)")
tscore = min(2.0, abs(test_coverage_delta) * 0.5)
else:
reasons.append("test coverage change not assessed")
# Weighted sum, scaled by 3 to use full 1-10 range
bonus = (fscore * WEIGHT_FILES) + (lscore * WEIGHT_LINES) + (dscore * WEIGHT_DEPS) + (tscore * WEIGHT_TEST_COV)
scaled_bonus = bonus * 3.0
score = 1.0 + scaled_bonus
final_score = max(1, min(10, int(round(score))))
est_minutes = TIME_PER_POINT.get(final_score, 30)
return final_score, est_minutes, reasons
def analyze_pr(client: GiteaClient, org: str, repo: str, pr_data: Dict) -> PRComplexity:
pr_num = pr_data["number"]
title = pr_data.get("title", "")
files = client.get_pr_files(org, repo, pr_num)
additions = sum(f.get("additions", 0) for f in files)
deletions = sum(f.get("deletions", 0) for f in files)
filenames = [f.get("filename", "") for f in files]
has_deps = any(is_dependency_file(f) for f in filenames)
test_added = sum(1 for f in files if f.get("status") == "added" and is_test_file(f.get("filename", "")))
test_removed = sum(1 for f in files if f.get("status") == "removed" and is_test_file(f.get("filename", "")))
test_delta = test_added - test_removed if (test_added or test_removed) else None
score, est_min, reasons = score_pr(
files_changed=len(files),
additions=additions,
deletions=deletions,
has_dependency_changes=has_deps,
test_coverage_delta=test_delta
)
return PRComplexity(
pr_number=pr_num,
title=title,
files_changed=len(files),
additions=additions,
deletions=deletions,
has_dependency_changes=has_deps,
test_coverage_delta=test_delta,
score=score,
estimated_minutes=est_min,
reasons=reasons
)
def build_comment(complexity: PRComplexity) -> str:
change_desc = f"{complexity.files_changed} files, +{complexity.additions}/-{complexity.deletions} lines"
deps_note = "\n- :warning: Dependency changes detected — architectural review recommended" if complexity.has_dependency_changes else ""
test_note = ""
if complexity.test_coverage_delta is not None:
if complexity.test_coverage_delta > 0:
test_note = f"\n- :+1: {complexity.test_coverage_delta} test file(s) added"
elif complexity.test_coverage_delta < 0:
test_note = f"\n- :warning: {abs(complexity.test_coverage_delta)} test file(s) removed"
comment = f"## 📊 PR Complexity Analysis\n\n"
comment += f"**PR #{complexity.pr_number}: {complexity.title}**\n\n"
comment += f"| Metric | Value |\n|--------|-------|\n"
comment += f"| Changes | {change_desc} |\n"
comment += f"| Complexity Score | **{complexity.score}/10** |\n"
comment += f"| Estimated Review Time | ~{complexity.estimated_minutes} minutes |\n\n"
comment += f"### Scoring rationale:"
for r in complexity.reasons:
comment += f"\n- {r}"
if deps_note:
comment += deps_note
if test_note:
comment += test_note
comment += f"\n\n---\n"
comment += f"*Generated by PR Complexity Scorer — [issue #135](https://forge.alexanderwhitestone.com/Timmy_Foundation/compounding-intelligence/issues/135)*"
return comment
def main():
parser = argparse.ArgumentParser(description="PR Complexity Scorer")
parser.add_argument("--org", default="Timmy_Foundation")
parser.add_argument("--repo", default="compounding-intelligence")
parser.add_argument("--token", default=os.environ.get("GITEA_TOKEN") or os.path.expanduser("~/.config/gitea/token"))
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--apply", action="store_true")
parser.add_argument("--output", default="metrics/pr_complexity.json")
args = parser.parse_args()
token_path = args.token
if os.path.exists(token_path):
with open(token_path) as f:
token = f.read().strip()
else:
token = args.token
if not token:
print("ERROR: No Gitea token provided", file=sys.stderr)
sys.exit(1)
client = GiteaClient(token)
print(f"Fetching open PRs for {args.org}/{args.repo}...")
prs = client.get_open_prs(args.org, args.repo)
if not prs:
print("No open PRs found.")
sys.exit(0)
print(f"Found {len(prs)} open PR(s). Analyzing...")
results = []
Path(args.output).parent.mkdir(parents=True, exist_ok=True)
for pr in prs:
pr_num = pr["number"]
title = pr.get("title", "")
print(f" Analyzing PR #{pr_num}: {title[:60]}")
try:
complexity = analyze_pr(client, args.org, args.repo, pr)
results.append(complexity.to_dict())
comment = build_comment(complexity)
if args.dry_run:
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [DRY-RUN]")
elif args.apply:
success = client.post_comment(args.org, args.repo, pr_num, comment)
status = "[commented]" if success else "[FAILED]"
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min {status}")
else:
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [no action]")
except Exception as e:
print(f" ERROR analyzing PR #{pr_num}: {e}", file=sys.stderr)
with open(args.output, "w") as f:
json.dump({
"org": args.org,
"repo": args.repo,
"timestamp": datetime.now(timezone.utc).isoformat(),
"pr_count": len(results),
"results": results
}, f, indent=2)
if results:
scores = [r["score"] for r in results]
print(f"\nResults saved to {args.output}")
print(f"Summary: {len(results)} PRs, scores range {min(scores):.0f}-{max(scores):.0f}")
else:
print("\nNo results to save.")
if __name__ == "__main__":
main()

View File

@@ -1,489 +0,0 @@
#!/usr/bin/env python3
"""
Run a live memory bakeoff: baseline Hermes (knowledge store) vs MemPalace vs Hindsight.
Captures raw context-window artifacts and produces a scored report.
Usage:
python3 scripts/run_memory_bakeoff.py --matrix prompts/matrix.json --output reports/
python3 scripts/run_memory_bakeoff.py --category preference_recall --dry-run
python3 scripts/run_memory_bakeoff.py --limit 3 # quick test
Exit codes:
0 - success
1 - missing required dependencies (LLM API key) or no prompts found
"""
from __future__ import annotations
import argparse
import json
import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
SCRIPT_DIR = Path(__file__).resolve().parent
REPO_ROOT = SCRIPT_DIR.parent
# Load from environment (same as harvester)
DEFAULT_API_BASE = os.environ.get("HARVESTER_API_BASE", "https://api.nousresearch.com/v1")
DEFAULT_API_KEY = (
next((p for p in [
os.path.expanduser("~/.config/nous/key"),
os.path.expanduser("~/.hermes/keymaxxing/active/minimax.key"),
os.path.expanduser("~/.config/openrouter/key"),
] if os.path.exists(p)), "")
)
DEFAULT_MODEL = os.environ.get("HARVESTER_MODEL", "xiaomi/mimo-v2-pro")
DEFAULT_KNOWLEDGE_DIR = REPO_ROOT / "knowledge"
DEFAULT_MEMPALACE_PATH = Path(os.path.expanduser("~/.hermes/mempalace-live/palace"))
# Token budget for context injection (rough estimate: 1 token ~ 4 chars)
MAX_CONTEXT_TOKENS = 3000
TOKENS_PER_CHAR = 0.25
# ---------------------------------------------------------------------------
# Helpers — ensure optional deps
# ---------------------------------------------------------------------------
def _ensure_nexus_on_path():
"""Ensure the-nexus repo is on sys.path for nexus.mempalace imports."""
NEXUS_PATH = Path("/Users/apayne/the-nexus")
if NEXUS_PATH.exists() and str(NEXUS_PATH) not in sys.path:
sys.path.insert(0, str(NEXUS_PATH))
# ---------------------------------------------------------------------------
# LLM API caller (mirrors harvester.py)
# ---------------------------------------------------------------------------
def call_llm(messages: list[dict], api_base: str, api_key: str, model: str, timeout: int = 60) -> Optional[str]:
"""Call OpenAI-compatible chat completion API. Returns assistant content or None."""
import urllib.request
payload = json.dumps({
"model": model,
"messages": messages,
"temperature": 0.3,
"max_tokens": 1024,
}).encode('utf-8')
url = f"{api_base}/chat/completions"
req = urllib.request.Request(
url, data=payload,
headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"},
method="POST"
)
try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
result = json.loads(resp.read().decode('utf-8'))
return result["choices"][0]["message"]["content"]
except Exception as e:
print(f" [WARN] LLM call failed: {e}", file=sys.stderr)
return None
# ---------------------------------------------------------------------------
# Backend 1: Baseline — knowledge/index.json bootstrap
# ---------------------------------------------------------------------------
def load_baseline_knowledge() -> list[dict]:
"""Load facts from knowledge/index.json."""
index_path = DEFAULT_KNOWLEDGE_DIR / "index.json"
if not index_path.exists():
return []
try:
with open(index_path) as f:
data = json.load(f)
return data.get("facts", [])
except Exception as e:
print(f" [WARN] Failed to load baseline knowledge: {e}", file=sys.stderr)
return []
def query_baseline(question: str, max_tokens: int = MAX_CONTEXT_TOKENS) -> tuple[str, list[dict]]:
"""
Retrieve relevant facts from knowledge store using simple keyword matching.
Returns (context_block, source_facts).
"""
facts = load_baseline_knowledge()
if not facts:
return "", []
q_words = set(question.lower().split())
scored = []
for fact in facts:
fact_text = fact.get("fact", "").lower()
overlap = len(q_words.intersection(set(fact_text.split())))
scored.append((overlap, fact))
scored.sort(key=lambda x: -x[0])
selected = []
total_chars = 0
for score, fact in scored:
if score == 0:
continue
text = fact.get("fact", "")
if total_chars + len(text) <= max_tokens / TOKENS_PER_CHAR:
selected.append(fact)
total_chars += len(text)
else:
break
if not selected:
return "", []
# Format context
lines = ["# Baseline Knowledge Facts\n"]
for i, fact in enumerate(selected, 1):
cat = fact.get('category', 'fact')
txt = fact.get('fact', '')
lines.append(f"{i}. [{cat}] {txt}\n")
return "".join(lines), selected
# ---------------------------------------------------------------------------
# Backend 2: MemPalace — use nexus.mempalace.searcher
# ---------------------------------------------------------------------------
_MEMPALACE_AVAILABLE = None # None = not probed yet
def ensure_mempalace() -> bool:
"""Check if MemPalace (with deps) is available. Returns True/False."""
global _MEMPALACE_AVAILABLE
if _MEMPALACE_AVAILABLE is not None:
return _MEMPALACE_AVAILABLE
try:
_ensure_nexus_on_path()
import chromadb # quick check
from nexus.mempalace.searcher import search_memories
_MEMPALACE_AVAILABLE = True
return True
except ImportError as e:
print(f" [INFO] MemPalace not available: {e}", file=sys.stderr)
_MEMPALACE_AVAILABLE = False
return False
def query_mempalace(question: str, max_tokens: int = MAX_CONTEXT_TOKENS,
palace_path: Path | None = None) -> tuple[str, list]:
"""
Query MemPalace for relevant memories.
Returns (context_block, results_list).
"""
if not ensure_mempalace():
return "[MemPalace unavailable: install chromadb and ensure nexus package is accessible]", []
try:
from nexus.mempalace.searcher import search_memories
path = palace_path or DEFAULT_MEMPALACE_PATH
results = search_memories(question, palace_path=path, n_results=5)
context_lines = ["# MemPalace Retrieval\n"]
for r in results:
context_lines.append(f"- [{r.room or 'general'}] {r.text}\n")
return "".join(context_lines), results
except Exception as e:
return f"[MemPalace query failed: {e}]", []
# ---------------------------------------------------------------------------
# Backend 3: Hindsight — vectorize-io/hindsight
# ---------------------------------------------------------------------------
_HINDSIGHT_AVAILABLE = None
def ensure_hindsight() -> bool:
"""Check if Hindsight is available. Returns True/False."""
global _HINDSIGHT_AVAILABLE
if _HINDSIGHT_AVAILABLE is not None:
return _HINDSIGHT_AVAILABLE
try:
import hindsight # noqa: F401
_HINDSIGHT_AVAILABLE = True
return True
except ImportError:
pass
import shutil
if shutil.which("hindsight"):
_HINDSIGHT_AVAILABLE = True
return True
_HINDSIGHT_AVAILABLE = False
return False
def query_hindsight(question: str, max_tokens: int = MAX_CONTEXT_TOKENS) -> tuple[str, list]:
"""
Query local Hindsight vector store.
Returns (context_block, results).
"""
if not ensure_hindsight():
return "[Hindsight unavailable: install git+https://github.com/vectorize-io/hindsight.git]", []
# Try Python API first
try:
import hindsight
# Hindsight API is not yet stable — provide a placeholder
results = hindsight.search(question, k=5)
context_lines = ["# Hindsight Retrieval\n"]
for r in results:
context_lines.append(f"- {getattr(r, 'text', str(r))}\n")
return "".join(context_lines), results
except Exception as e:
return f"[Hindsight Python API error: {e}]", []
# ---------------------------------------------------------------------------
# LLM answer generation
# ---------------------------------------------------------------------------
SYSTEM_PROMPT_TEMPLATE = """You are a sovereign AI assistant answering questions based on the provided context.
Answer concisely and accurately. If the context contains the answer, cite it.
If unsure, say so. Do not hallucinate.
{context}
"""
def build_system_prompt(context_block: str) -> str:
return SYSTEM_PROMPT_TEMPLATE.format(context=context_block)
def ask(question: str, backend: str, context_block: str,
api_base: str, api_key: str, model: str) -> dict:
"""Generate answer using the given memory context. Returns artifact dict."""
system = build_system_prompt(context_block)
start = time.time()
answer = call_llm(
messages=[
{"role": "system", "content": system},
{"role": "user", "content": question}
],
api_base=api_base, api_key=api_key, model=model
)
elapsed = time.time() - start
artifact = {
"backend": backend,
"question": question,
"system_prompt": system,
"context_block": context_block,
"answer": answer or "[LLM call failed]",
"model": model,
"api_base": api_base,
"timestamp": datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z'),
"llm_latency_sec": round(elapsed, 3),
}
return artifact
# ---------------------------------------------------------------------------
# Simple scorer
# ---------------------------------------------------------------------------
def score_artifact(artifact: dict) -> dict:
"""
Compute simple scores:
- context_precision: keyword overlap between question and context
- retrieval_noise: 1 - precision (very noisy proxy)
- answer_factual: heuristic based on answer length (proxy for being substantive)
"""
q = artifact["question"].lower()
ctx = artifact["context_block"].lower()
ans = artifact.get("answer", "").lower()
q_words = set(q.split())
if not q_words:
return {"context_precision": 0.0, "retrieval_noise": 1.0, "answer_factual": 0.0}
ctx_words = set(ctx.split())
overlap = len(q_words & ctx_words) / len(q_words)
# Noise is 1 - precision. High noise means context has many irrelevant words.
# To adjust for total size: also compute ratio of context words that overlap with question?
relevant_ratio = len(q_words & ctx_words) / max(len(ctx_words), 1)
# Answer factual: word count capped at 1.0
awc = len(ans.split())
answer_factual = min(1.0, awc / 100.0)
return {
"context_precision": round(overlap, 3),
"retrieval_noise": round(1.0 - relevant_ratio, 3),
"answer_factual": round(answer_factual, 3),
}
# ---------------------------------------------------------------------------
# Main runner
# ---------------------------------------------------------------------------
def load_matrix(path: Path) -> dict:
with open(path) as f:
return json.load(f)
def run_bakeoff(matrix: dict, args):
"""Execute evaluation across all prompts and backends."""
api_base = args.api_base or DEFAULT_API_BASE
api_key = args.api_key or DEFAULT_API_KEY
model = args.model or DEFAULT_MODEL
if not api_key:
print("ERROR: No API key found. Set HARVESTER_API_KEY, or pass --api-key.", file=sys.stderr)
sys.exit(1)
output_dir = Path(args.output).expanduser().resolve()
artifacts_dir = output_dir / "artifacts"
artifacts_dir.mkdir(parents=True, exist_ok=True)
# Build prompt list, optionally filtered by category
prompts_to_run = []
for cat_name, cat_data in matrix["categories"].items():
if args.category and cat_name != args.category:
continue
for prompt_text in cat_data["prompts"]:
prompts_to_run.append((cat_name, prompt_text))
if args.limit:
prompts_to_run = prompts_to_run[:args.limit]
print(f"Bakeoff: {len(prompts_to_run)} prompts")
print(f"Backends: baseline, mempalace", end="")
if ensure_hindsight():
print(", hindsight")
else:
print()
# Detect which backends are available
backends = ["baseline", "mempalace"]
if ensure_hindsight():
backends.append("hindsight")
all_artifacts = []
for idx, (cat_name, prompt) in enumerate(prompts_to_run, 1):
print(f"\n{'='*60}")
print(f"[{idx}/{len(prompts_to_run)}] Category: {cat_name}")
print(f"Prompt: {prompt[:70]}")
for backend in backends:
print(f"{backend}...", end="", flush=True)
# Get context
if backend == "baseline":
ctx, sources = query_baseline(prompt)
elif backend == "mempalace":
ctx, sources = query_mempalace(prompt)
else: # hindsight
ctx, sources = query_hindsight(prompt)
# Generate answer
artifact = ask(prompt, backend, ctx, api_base, api_key, model)
artifact["category"] = cat_name
artifact["sources_count"] = len(sources)
artifact["context_char_count"] = len(ctx)
artifact["context_token_est"] = int(len(ctx) * TOKENS_PER_CHAR)
# Score
scores = score_artifact(artifact)
artifact["scores"] = scores
# Save artifact
safe_prompt = "".join(c if c.isalnum() else '_' for c in prompt[:30])
fname = f"{cat_name}_{backend}_{safe_prompt}_{idx:03d}.json"
fpath = artifacts_dir / fname
with open(fpath, "w", encoding="utf-8") as f:
json.dump(artifact, f, indent=2, ensure_ascii=False)
all_artifacts.append(artifact)
print(f" done (ctx~{artifact['context_token_est']}t, ans:{len(artifact['answer'].split())}w, prec:{scores['context_precision']:.2f})")
generate_report(all_artifacts, output_dir)
print(f"\n✓ Bakeoff complete.")
print(f" Report: {output_dir / 'REPORT.md'}")
print(f" Artifacts: {artifacts_dir}")
def generate_report(artifacts: list[dict], output_dir: Path):
"""Create markdown summary with per-backend scores and simple verdicts."""
lines = []
lines.append("# Memory Bakeoff Report\n")
lines.append(f"**Generated:** {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}\n")
lines.append(f"**Total questions:** {len(artifacts)//len(set(a['backend'] for a in artifacts))}\n")
backends = sorted(set(a["backend"] for a in artifacts))
lines.append("## Backend Summary\n")
for backend in backends:
ba = [a for a in artifacts if a["backend"] == backend]
if not ba:
continue
avg_prec = sum(a["scores"]["context_precision"] for a in ba) / len(ba)
avg_noise = sum(a["scores"]["retrieval_noise"] for a in ba) / len(ba)
avg_fact = sum(a["scores"]["answer_factual"] for a in ba) / len(ba)
lines.append(f"### {backend.upper()}\n")
lines.append(f"- Avg context precision: {avg_prec:.1%}\n")
lines.append(f"- Avg retrieval noise: {avg_noise:.1%}\n")
lines.append(f"- Avg answer breadth: {avg_fact:.1%}\n")
lines.append(f"- Runs: {len(ba)}\n\n")
lines.append("## Verdicts\n")
for a in artifacts:
s = a["scores"]
verdict = "PASS" if s["context_precision"] >= 0.25 else "NEEDS_IMPROVEMENT"
lines.append(f"- **{a['backend']} · {a['category']}**: {verdict} "
f"(prec {s['context_precision']:.0%}, noise {s['retrieval_noise']:.0%})\n")
lines.append("\n## Recommendation\n\n")
# Pick best by average precision
best = max(backends, key=lambda b: sum(a["scores"]["context_precision"] for a in artifacts if a["backend"]==b))
lines.append(f"Based on this sample, **{best.upper()}** achieved the highest context precision.\n")
lines.append("For the sovereign Mac-local stack, the recommendation is:\n")
lines.append("- **Baseline** (knowledge/index.json) for fast, deterministic fact lookup;\n")
lines.append("- **MemPalace** for long-horizon narrative/agentic memory;\n")
lines.append("- **Hindsight** requires additional installation and tuning.\n")
lines.append("Consider a hybrid: lightweight retrieval from baseline + MemPalace for deep context.\n")
report_path = output_dir / "REPORT.md"
report_path.write_text("".join(lines), encoding="utf-8")
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
p = argparse.ArgumentParser(description="Memory bakeoff runner")
p.add_argument("--matrix", default="prompts/matrix.json",
help="Path to prompt matrix JSON file")
p.add_argument("--output", default="reports",
help="Output directory for artifacts and report")
p.add_argument("--category",
help="Run only this category (e.g., 'preference_recall')")
p.add_argument("--limit", type=int,
help="Limit number of prompts to run")
p.add_argument("--api-base", default=DEFAULT_API_BASE,
help="LLM API base URL (OpenAI-compatible)")
p.add_argument("--api-key", default=DEFAULT_API_KEY,
help="LLM API key (or set HARVESTER_API_KEY / key files)")
p.add_argument("--model", default=DEFAULT_MODEL,
help="LLM model name to use")
p.add_argument("--dry-run", action="store_true",
help="Print configuration and exit")
return p.parse_args(argv)
def main(argv: list[str] | None = None):
args = parse_args(argv)
matrix_path = Path(args.matrix)
if not matrix_path.exists():
print(f"ERROR: Matrix not found at {matrix_path}", file=sys.stderr)
sys.exit(1)
matrix = load_matrix(matrix_path)
if args.dry_run:
print("Dry run: configuration")
print(f" Matrix: {args.matrix}")
print(f" Categories: {list(matrix['categories'].keys())}")
print(f" Total prompts:{sum(len(c['prompts']) for c in matrix['categories'].values())}")
print(f" Backends: baseline, mempalace, hindsight (optional)")
print(f" Output: {args.output}")
return
run_bakeoff(matrix, args)
if __name__ == "__main__":
main()

View File

@@ -22,114 +22,95 @@ import sys
from pathlib import Path
from typing import Optional
from session_reader import extract_conversation, read_session
def compute_hash(text: str) -> str:
"""Content hash for deduplication."""
return hashlib.sha256(text.encode()).hexdigest()[:16]
def extract_pairs_from_session(session_data: dict, min_ratio: float = 1.5,
def extract_pairs_from_conversation(conversation: list, session_id: str, model: str,
min_ratio: float = 1.5,
min_response_words: int = 20) -> list:
"""Extract terse→rich pairs from a single session object."""
"""Extract terse→rich pairs from a normalized conversation."""
pairs = []
conversations = session_data.get("conversations", [])
session_id = session_data.get("id", "unknown")
model = session_data.get("model", "unknown")
seen_hashes = set()
for i, msg in enumerate(conversations):
# Look for assistant/gpt responses
if msg.get("from") not in ("gpt", "assistant"):
for i, msg in enumerate(conversation):
# Look for assistant responses
if msg.get('role') != 'assistant':
continue
response_text = msg.get("value", "")
response_text = msg.get('content', '')
if not response_text or len(response_text.split()) < min_response_words:
continue
# Find the preceding human message
# Find the preceding user message
prompt_text = ""
for j in range(i - 1, -1, -1):
if conversations[j].get("from") == "human":
prompt_text = conversations[j].get("value", "")
if conversation[j].get('role') == 'user':
prompt_text = conversation[j].get('content', '')
break
if not prompt_text:
continue
# Filter: skip tool results, system messages embedded as human
if prompt_text.startswith("{") and "output" in prompt_text[:100]:
continue # likely a tool result
if prompt_text.startswith("# SOUL.md") or prompt_text.startswith("You are"):
continue # system prompt leak
if prompt_text.startswith('{') and 'output' in prompt_text[:100]:
continue
if prompt_text.startswith('# SOUL.md') or prompt_text.startswith('You are'):
continue
# Quality filters
prompt_words = len(prompt_text.split())
response_words = len(response_text.split())
# Must have meaningful length ratio
if prompt_words == 0 or response_words == 0:
continue
ratio = response_words / prompt_words
if ratio < min_ratio:
continue
# Skip responses that are mostly code
code_blocks = response_text.count("```")
if code_blocks >= 4 and len(response_text.replace("```", "").strip()) < 50:
code_blocks = response_text.count('```')
if code_blocks >= 4 and len(response_text.replace('```', '').strip()) < 50:
continue
# Skip responses with tool call artifacts
if "tool_call" in response_text[:100] or "function_call" in response_text[:100]:
if 'tool_call' in response_text[:100] or 'function_call' in response_text[:100]:
continue
# Deduplicate by content hash
content_hash = compute_hash(prompt_text + response_text[:200])
if content_hash in seen_hashes:
continue
seen_hashes.add(content_hash)
# Clean up response: remove markdown headers if too many
clean_response = response_text
pairs.append({
"terse": prompt_text.strip(),
"rich": clean_response.strip(),
"source": session_id,
"model": model,
"prompt_words": prompt_words,
"response_words": response_words,
"ratio": round(ratio, 2),
'terse': prompt_text.strip(),
'rich': clean_response.strip(),
'source': session_id,
'model': model,
'prompt_words': prompt_words,
'response_words': response_words,
'ratio': round(ratio, 2),
})
return pairs
def extract_from_jsonl_file(filepath: str, **kwargs) -> list:
"""Extract pairs from a session JSONL file."""
pairs = []
path = Path(filepath)
if not path.exists():
print(f"Warning: {filepath} not found", file=sys.stderr)
return pairs
content = path.read_text()
lines = content.strip().split("\n")
for line in lines:
line = line.strip()
if not line:
continue
try:
session = json.loads(line)
except json.JSONDecodeError:
continue
session_pairs = extract_pairs_from_session(session, **kwargs)
pairs.extend(session_pairs)
return pairs
def extract_from_jsonl_file(path: str, **kwargs) -> list:
"""Read a session file and extract training pairs using normalized conversation."""
session_messages = read_session(path)
if not session_messages:
return []
conversation = extract_conversation(session_messages)
# Derive session_id and model from first real message metadata
first_msg = next((m for m in session_messages if m.get('role') or m.get('from')), {})
session_id = first_msg.get('meta_session_id', Path(path).name)
model = first_msg.get('model', 'unknown')
return extract_pairs_from_conversation(conversation, session_id, model, **kwargs)
def deduplicate_pairs(pairs: list) -> list:

View File

@@ -1,170 +0,0 @@
#!/usr/bin/env python3
"""
Tests for PR Complexity Scorer — unit tests for the scoring logic.
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from pr_complexity_scorer import (
score_pr,
is_dependency_file,
is_test_file,
TIME_PER_POINT,
SMALL_FILES,
MEDIUM_FILES,
LARGE_FILES,
SMALL_LINES,
MEDIUM_LINES,
LARGE_LINES,
)
PASS = 0
FAIL = 0
def test(name):
def decorator(fn):
global PASS, FAIL
try:
fn()
PASS += 1
print(f" [PASS] {name}")
except AssertionError as e:
FAIL += 1
print(f" [FAIL] {name}: {e}")
except Exception as e:
FAIL += 1
print(f" [FAIL] {name}: Unexpected error: {e}")
return decorator
def assert_eq(a, b, msg=""):
if a != b:
raise AssertionError(f"{msg} expected {b!r}, got {a!r}")
def assert_true(v, msg=""):
if not v:
raise AssertionError(msg or "Expected True")
def assert_false(v, msg=""):
if v:
raise AssertionError(msg or "Expected False")
print("=== PR Complexity Scorer Tests ===\n")
print("-- File Classification --")
@test("dependency file detection — requirements.txt")
def _():
assert_true(is_dependency_file("requirements.txt"))
assert_true(is_dependency_file("src/requirements.txt"))
assert_false(is_dependency_file("requirements_test.txt"))
@test("dependency file detection — pyproject.toml")
def _():
assert_true(is_dependency_file("pyproject.toml"))
assert_false(is_dependency_file("myproject.py"))
@test("test file detection — pytest style")
def _():
assert_true(is_test_file("tests/test_api.py"))
assert_true(is_test_file("test_module.py"))
assert_true(is_test_file("src/module_test.py"))
@test("test file detection — other frameworks")
def _():
assert_true(is_test_file("spec/feature_spec.rb"))
assert_true(is_test_file("__tests__/component.test.js"))
assert_false(is_test_file("testfixtures/helper.py"))
print("\n-- Scoring Logic --")
@test("small PR gets low score (1-3)")
def _():
score, minutes, _ = score_pr(
files_changed=3,
additions=50,
deletions=10,
has_dependency_changes=False,
test_coverage_delta=None
)
assert_true(1 <= score <= 3, f"Score should be low, got {score}")
assert_true(minutes < 20)
@test("medium PR gets medium score (4-6)")
def _():
score, minutes, _ = score_pr(
files_changed=15,
additions=400,
deletions=100,
has_dependency_changes=False,
test_coverage_delta=None
)
assert_true(4 <= score <= 6, f"Score should be medium, got {score}")
assert_true(20 <= minutes <= 45)
@test("large PR gets high score (7-9)")
def _():
score, minutes, _ = score_pr(
files_changed=60,
additions=3000,
deletions=1500,
has_dependency_changes=True,
test_coverage_delta=None
)
assert_true(7 <= score <= 9, f"Score should be high, got {score}")
assert_true(minutes >= 45)
@test("dependency changes boost score")
def _():
base_score, _, _ = score_pr(
files_changed=10, additions=200, deletions=50,
has_dependency_changes=False, test_coverage_delta=None
)
dep_score, _, _ = score_pr(
files_changed=10, additions=200, deletions=50,
has_dependency_changes=True, test_coverage_delta=None
)
assert_true(dep_score > base_score, f"Deps: {base_score} -> {dep_score}")
@test("adding tests lowers complexity")
def _():
base_score, _, _ = score_pr(
files_changed=8, additions=150, deletions=20,
has_dependency_changes=False, test_coverage_delta=None
)
better_score, _, _ = score_pr(
files_changed=8, additions=180, deletions=20,
has_dependency_changes=False, test_coverage_delta=3
)
assert_true(better_score < base_score, f"Tests: {base_score} -> {better_score}")
@test("removing tests increases complexity")
def _():
base_score, _, _ = score_pr(
files_changed=8, additions=150, deletions=20,
has_dependency_changes=False, test_coverage_delta=None
)
worse_score, _, _ = score_pr(
files_changed=8, additions=150, deletions=20,
has_dependency_changes=False, test_coverage_delta=-2
)
assert_true(worse_score > base_score, f"Remove tests: {base_score} -> {worse_score}")
@test("score bounded 1-10")
def _():
for files, adds, dels in [(1, 10, 5), (100, 10000, 5000)]:
score, _, _ = score_pr(files, adds, dels, False, None)
assert_true(1 <= score <= 10, f"Score {score} out of range")
@test("estimated minutes exist for all scores")
def _():
for s in range(1, 11):
assert_true(s in TIME_PER_POINT, f"Missing time for score {s}")
print(f"\n=== Results: {PASS} passed, {FAIL} failed ===")
sys.exit(0 if FAIL == 0 else 1)

View File

@@ -0,0 +1,118 @@
"""
Tests for session_pair_harvester — training pair extraction from sessions.
"""
import json
import tempfile
import unittest
from pathlib import Path
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
from session_pair_harvester import (
extract_pairs_from_conversation,
extract_from_jsonl_file,
deduplicate_pairs,
compute_hash,
)
class TestSessionPairHarvester(unittest.TestCase):
def test_compute_hash_consistent(self):
h1 = compute_hash("hello world")
h2 = compute_hash("hello world")
self.assertEqual(h1, h2)
self.assertEqual(len(h1), 16)
def test_extract_simple_qa_pair(self):
"""A simple user→assistant exchange produces one pair."""
conversation = [
{"role": "user", "content": "What is the capital of France?"},
{"role": "assistant", "content": "The capital of France is Paris. It is a major European city renowned for its art, fashion, gastronomy, cultural heritage, and historical significance. The city attracts millions of tourists annually."},
]
pairs = extract_pairs_from_conversation(conversation, "test_session", "test-model")
self.assertEqual(len(pairs), 1)
self.assertEqual(pairs[0]["terse"], "What is the capital of France?")
self.assertIn("Paris", pairs[0]["rich"])
self.assertEqual(pairs[0]["source"], "test_session")
def test_min_ratio_filter(self):
"""Very short responses are filtered out."""
conversation = [
{"role": "user", "content": "Yes"},
{"role": "assistant", "content": "No."},
]
# Default min_ratio = 1.5, min_words = 20 for response
pairs = extract_pairs_from_conversation(conversation, "s", "m", min_response_words=3)
self.assertEqual(len(pairs), 0)
def test_min_words_filter(self):
"""Assistant responses below min word count are skipped."""
conversation = [
{"role": "user", "content": "Explain the project architecture in detail"},
{"role": "assistant", "content": "OK."},
]
pairs = extract_pairs_from_conversation(conversation, "s", "m", min_response_words=5)
self.assertEqual(len(pairs), 0)
def test_skip_non_assistant_messages(self):
"""System and tool messages are ignored."""
conversation = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi there! How can I help you today?"},
]
pairs = extract_pairs_from_conversation(conversation, "s", "m", min_response_words=3)
self.assertEqual(len(pairs), 1)
self.assertEqual(pairs[0]["terse"], "Hello")
def test_multiple_pairs_from_one_session(self):
"""A conversation with several Q&A turns yields multiple pairs."""
conversation = [
{"role": "user", "content": "First question?"},
{"role": "assistant", "content": "Here is a detailed and comprehensive answer that thoroughly explores multiple aspects of the subject. It provides background context and practical implications for the reader."},
{"role": "user", "content": "Second?"},
{"role": "assistant", "content": "Another comprehensive response with detailed examples. This includes practical code blocks and thorough explanations to ensure deep understanding of the topic at hand."},
]
pairs = extract_pairs_from_conversation(conversation, "s", "m", min_ratio=1.0)
self.assertEqual(len(pairs), 2)
def test_deduplication_removes_duplicates(self):
"""Identical pairs across sessions are deduplicated."""
pairs = [
{"terse": "q1", "rich": "a1", "source": "s1", "model": "m"},
{"terse": "q1", "rich": "a1", "source": "s2", "model": "m"},
{"terse": "q2", "rich": "a2", "source": "s1", "model": "m"},
]
unique = deduplicate_pairs(pairs)
self.assertEqual(len(unique), 2)
sources = {p["source"] for p in unique}
# First unique pair can be from either s1 or s2
self.assertIn("s1", sources)
def test_integration_with_test_sessions(self):
"""Harvester finds pairs in real test session files."""
repo_root = Path(__file__).parent.parent
test_sessions_dir = repo_root / "test_sessions"
if not test_sessions_dir.exists():
self.skipTest("test_sessions not found")
pairs = []
for jsonl_file in sorted(test_sessions_dir.glob("*.jsonl")):
pairs.extend(extract_from_jsonl_file(str(jsonl_file)))
self.assertGreater(len(pairs), 0, "Should extract at least one pair from test_sessions")
for p in pairs:
self.assertIn("terse", p)
self.assertIn("rich", p)
self.assertIn("source", p)
self.assertIn("model", p)
# Verify content exists
self.assertGreater(len(p["terse"]), 0)
self.assertGreater(len(p["rich"]), 0)
if __name__ == "__main__":
unittest.main()