Compare commits


2 Commits

6 changed files with 324 additions and 993 deletions

View File

@@ -1,451 +0,0 @@
#!/usr/bin/env python3
"""
Improvement Proposal Generator for compounding-intelligence.
Analyzes fleet session data to identify waste patterns and generates
concrete improvement proposals with ROI estimates.
Input: Session analytics JSON (from fleet metrics or session database)
Output: Markdown proposal document + JSON proposals
Usage:
python3 scripts/improvement_proposals.py --input analytics.json
python3 scripts/improvement_proposals.py --input analytics.json --format json
python3 scripts/improvement_proposals.py --input analytics.json --output proposals.md
python3 scripts/improvement_proposals.py --input analytics.json --threshold 5.0
"""
import argparse
import json
import re
import sys
from collections import defaultdict
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from typing import Dict, List
# ── Data types ─────────────────────────────────────────────────
@dataclass
class WastePattern:
"""A detected waste pattern in fleet sessions."""
pattern_type: str # "repeated_error", "manual_process", "slow_tool", "failed_retry"
description: str
occurrences: int
total_time_hours: float
affected_repos: List[str] = field(default_factory=list)
example_sessions: List[str] = field(default_factory=list)
@dataclass
class ImprovementProposal:
"""A concrete improvement proposal with ROI estimate."""
title: str
problem: str
proposed_solution: str
estimated_weekly_hours_saved: float
estimated_monthly_hours_saved: float
implementation_hours: float
roi_weeks: float # weeks to break even
priority: str # "critical", "high", "medium", "low"
affected_area: str
supporting_evidence: List[str] = field(default_factory=list)
# ── Session analysis ──────────────────────────────────────────
def analyze_sessions(sessions: List[dict]) -> List[WastePattern]:
"""
Analyze session data to find waste patterns.
Looks for:
- Repeated errors (same error across sessions)
- Manual processes (long sequences of similar actions)
- Slow tools (tools with high latency)
- Failed retries (multiple attempts at same task)
"""
patterns = []
# Track error frequency across sessions
error_counts: Dict[str, List[dict]] = defaultdict(list)
tool_latencies: Dict[str, List[float]] = defaultdict(list)
retry_counts: Dict[str, int] = defaultdict(int)
manual_sequences: List[dict] = []
for session in sessions:
session_id = session.get("session_id", "unknown")
repo = session.get("repo", "global")
errors = session.get("errors", [])
tool_calls = session.get("tool_calls", [])
duration_min = session.get("duration_minutes", 0)
# 1. Repeated errors
for err in errors:
err_key = _normalize_error(err.get("message", ""))
if err_key:
error_counts[err_key].append({
"session_id": session_id,
"repo": repo,
})
# 2. Tool latency tracking
for tc in tool_calls:
tool_name = tc.get("tool", "unknown")
latency = tc.get("latency_ms", 0)
if latency > 0:
tool_latencies[tool_name].append(latency)
# 3. Failed retries (same tool called 3+ times in sequence)
prev_tool = None
streak = 0
for tc in tool_calls:
tool_name = tc.get("tool", "unknown")
if tool_name == prev_tool:
streak += 1
else:
if streak >= 3:
retry_counts[prev_tool] += 1
streak = 1
prev_tool = tool_name
if streak >= 3:
retry_counts[prev_tool] += 1
# 4. Manual processes (more than 10 tool calls using at most 3 unique tools)
if len(tool_calls) > 10:
tool_sequence = [tc.get("tool") for tc in tool_calls]
unique_tools = set(tool_sequence)
if len(unique_tools) <= 3:
manual_sequences.append({
"session_id": session_id,
"repo": repo,
"tool_count": len(tool_calls),
"unique_tools": list(unique_tools),
"duration_min": duration_min,
})
# Generate patterns from collected data
# Repeated errors (3+ recorded occurrences, possibly within one session)
for err_key, occurrences in error_counts.items():
if len(occurrences) >= 3:
repos = list(set(o["repo"] for o in occurrences))
sessions_list = [o["session_id"] for o in occurrences[:5]]
# Estimate time wasted: 5 min per error occurrence
hours = len(occurrences) * 5 / 60
patterns.append(WastePattern(
pattern_type="repeated_error",
description=f"Error: {err_key[:100]}",
occurrences=len(occurrences),
total_time_hours=round(hours, 1),
affected_repos=repos,
example_sessions=sessions_list,
))
# Slow tools (avg latency > 5000ms across 5+ calls)
for tool, latencies in tool_latencies.items():
if len(latencies) >= 5:
avg_ms = sum(latencies) / len(latencies)
if avg_ms > 5000:
hours = sum(latencies) / 1000 / 3600 # total time spent in the tool, treated as recoverable waste
patterns.append(WastePattern(
pattern_type="slow_tool",
description=f"Tool '{tool}' averages {avg_ms:.0f}ms latency",
occurrences=len(latencies),
total_time_hours=round(hours, 1),
affected_repos=["global"],
example_sessions=[],
))
# Failed retries
for tool, count in retry_counts.items():
if count >= 2:
hours = count * 10 / 60 # ~10 min per failed retry sequence
patterns.append(WastePattern(
pattern_type="failed_retry",
description=f"Tool '{tool}' had {count} retry sequences (3+ consecutive calls)",
occurrences=count,
total_time_hours=round(hours, 1),
affected_repos=["global"],
example_sessions=[],
))
# Manual processes
for seq in manual_sequences:
hours = seq["duration_min"] / 60
patterns.append(WastePattern(
pattern_type="manual_process",
description=f"Session {seq['session_id'][:12]}: {seq['tool_count']} tool calls with only {len(seq['unique_tools'])} unique tools",
occurrences=1,
total_time_hours=round(hours, 1),
affected_repos=[seq["repo"]],
example_sessions=[seq["session_id"]],
))
return sorted(patterns, key=lambda p: p.total_time_hours, reverse=True)
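# Note: the thresholds applied above are heuristics: repeated errors need
# 3+ recorded occurrences, slow tools need 5+ calls averaging over 5000 ms,
# failed retries need 2+ streaks of 3+ consecutive calls, and manual
# processes need more than 10 calls spanning at most 3 unique tools.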
def _normalize_error(message: str) -> str:
"""Normalize error message to a common key."""
if not message:
return ""
msg = message.lower().strip()
# Remove variable parts (paths, IDs, timestamps)
msg = re.sub(r'/\S+', '/PATH', msg)
msg = re.sub(r'\b[0-9a-f]{8,}\b', 'HASH', msg)
msg = re.sub(r'\d{4}-\d{2}-\d{2}[tT]\d{2}:\d{2}', 'TIME', msg)
return msg[:150]
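# Example (hypothetical input):
#   _normalize_error("Failed to clone /tmp/work/repo with token deadbeef1234")
#   -> "failed to clone /PATH with token HASH"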
# ── Proposal generation ───────────────────────────────────────
def generate_proposals(
patterns: List[WastePattern],
hourly_rate: float = 50.0,
implementation_overhead: float = 1.5,
) -> List[ImprovementProposal]:
"""
Generate improvement proposals from waste patterns.
Args:
patterns: Detected waste patterns
hourly_rate: Developer hourly rate (accepted but currently unused; ROI is expressed in hours)
implementation_overhead: Multiplier for implementation time estimate
"""
proposals = []
# Group patterns by type
by_type: Dict[str, List[WastePattern]] = defaultdict(list)
for p in patterns:
by_type[p.pattern_type].append(p)
# 1. Repeated errors → Create fix/skill
for p in by_type.get("repeated_error", []):
weekly_hours = p.total_time_hours / 4 # monthly → weekly (assumes ~1 month of session data)
impl_hours = max(1.0, p.occurrences * 0.25) * implementation_overhead
roi_weeks = impl_hours / weekly_hours if weekly_hours > 0 else float('inf')
proposals.append(ImprovementProposal(
title=f"Fix repeated error: {p.description[:60]}",
problem=f"This error occurred {p.occurrences} times across {len(p.affected_repos)} repos, wasting ~{p.total_time_hours:.1f} hours.",
proposed_solution="Root-cause the error and create a permanent fix or mitigation skill.",
estimated_weekly_hours_saved=round(weekly_hours, 1),
estimated_monthly_hours_saved=round(p.total_time_hours, 1),
implementation_hours=round(impl_hours, 1),
roi_weeks=round(roi_weeks, 1),
priority=_priority_from_roi(roi_weeks),
affected_area="reliability",
supporting_evidence=[f"{p.occurrences} occurrences in sessions: {', '.join(p.example_sessions[:3])}"],
))
# 2. Slow tools → Optimize or replace
for p in by_type.get("slow_tool", []):
weekly_hours = p.total_time_hours / 4
impl_hours = 3.0 * implementation_overhead # optimization task
roi_weeks = impl_hours / weekly_hours if weekly_hours > 0 else float('inf')
proposals.append(ImprovementProposal(
title=f"Optimize slow tool: {p.description[:60]}",
problem=f"Tool has {p.occurrences} calls with high latency, wasting ~{p.total_time_hours:.1f} hours total.",
proposed_solution="Profile the tool, add caching, or replace with a faster alternative.",
estimated_weekly_hours_saved=round(weekly_hours, 1),
estimated_monthly_hours_saved=round(p.total_time_hours, 1),
implementation_hours=round(impl_hours, 1),
roi_weeks=round(roi_weeks, 1),
priority=_priority_from_roi(roi_weeks),
affected_area="performance",
supporting_evidence=[f"{p.occurrences} slow calls detected"],
))
# 3. Failed retries → Add retry logic or validation
for p in by_type.get("failed_retry", []):
weekly_hours = p.total_time_hours / 4
impl_hours = 2.0 * implementation_overhead
roi_weeks = impl_hours / weekly_hours if weekly_hours > 0 else float('inf')
proposals.append(ImprovementProposal(
title=f"Reduce retries for tool '{p.description[:50]}'",
problem=f"Tool had {p.occurrences} retry sequences, wasting ~{p.total_time_hours:.1f} hours.",
proposed_solution="Add input validation, pre-flight checks, or automatic retry with backoff.",
estimated_weekly_hours_saved=round(weekly_hours, 1),
estimated_monthly_hours_saved=round(p.total_time_hours, 1),
implementation_hours=round(impl_hours, 1),
roi_weeks=round(roi_weeks, 1),
priority=_priority_from_roi(roi_weeks),
affected_area="reliability",
supporting_evidence=[f"{p.occurrences} retry sequences detected"],
))
# 4. Manual processes → Automate
total_manual_hours = sum(p.total_time_hours for p in by_type.get("manual_process", []))
manual_patterns = by_type.get("manual_process", [])
if manual_patterns:
weekly_hours = total_manual_hours / 4
impl_hours = len(manual_patterns) * 2.0 * implementation_overhead
roi_weeks = impl_hours / weekly_hours if weekly_hours > 0 else float('inf')
proposals.append(ImprovementProposal(
title=f"Automate {len(manual_patterns)} manual processes",
problem=f"{len(manual_patterns)} sessions had long manual tool sequences, wasting ~{total_manual_hours:.1f} hours.",
proposed_solution="Create composite skills or scripts that combine the repeated tool sequences into single operations.",
estimated_weekly_hours_saved=round(weekly_hours, 1),
estimated_monthly_hours_saved=round(total_manual_hours, 1),
implementation_hours=round(impl_hours, 1),
roi_weeks=round(roi_weeks, 1),
priority=_priority_from_roi(roi_weeks),
affected_area="automation",
supporting_evidence=[f"{len(manual_patterns)} manual sessions detected"],
))
return sorted(proposals, key=lambda p: p.estimated_monthly_hours_saved, reverse=True)
def _priority_from_roi(roi_weeks: float) -> str:
"""Determine priority from ROI break-even time."""
if roi_weeks <= 1:
return "critical"
elif roi_weeks <= 4:
return "high"
elif roi_weeks <= 12:
return "medium"
return "low"
# ── Output formatting ─────────────────────────────────────────
def format_proposals_markdown(
proposals: List[ImprovementProposal],
patterns: List[WastePattern],
generated_at: str,
) -> str:
"""Format proposals as a markdown document."""
lines = [
"# Improvement Proposals",
"",
f"Generated: {generated_at}",
f"Based on analysis of {sum(p.occurrences for p in patterns)} waste events across {len(set(r for p in patterns for r in p.affected_repos))} repos.",
"",
"---",
"",
"## Summary",
"",
f"| Metric | Value |",
f"|--------|-------|",
f"| Total proposals | {len(proposals)} |",
f"| Critical priority | {sum(1 for p in proposals if p.priority == 'critical')} |",
f"| Total monthly hours wasted | {sum(p.estimated_monthly_hours_saved for p in proposals):.1f}h |",
f"| Total weekly hours recoverable | {sum(p.estimated_weekly_hours_saved for p in proposals):.1f}h |",
f"| Implementation investment | {sum(p.implementation_hours for p in proposals):.1f}h |",
"",
"---",
"",
]
for i, prop in enumerate(proposals, 1):
lines.extend([
f"## {i}. {prop.title}",
"",
f"**Priority:** {prop.priority.upper()} ",
f"**Area:** {prop.affected_area} ",
f"**ROI break-even:** {prop.roi_weeks:.1f} weeks",
"",
"### Problem",
"",
prop.problem,
"",
"### Proposed Solution",
"",
prop.proposed_solution,
"",
"### ROI Estimate",
"",
f"- Weekly hours saved: **{prop.estimated_weekly_hours_saved}h**",
f"- Monthly hours saved: **{prop.estimated_monthly_hours_saved}h**",
f"- Implementation time: **{prop.implementation_hours}h**",
f"- Break-even: **{prop.roi_weeks:.1f} weeks**",
"",
])
if prop.supporting_evidence:
lines.extend([
"### Evidence",
"",
])
for ev in prop.supporting_evidence:
lines.append(f"- {ev}")
lines.append("")
lines.extend(["---", ""])
# Waste pattern appendix
lines.extend([
"## Appendix: Detected Waste Patterns",
"",
"| Type | Description | Occurrences | Hours |",
"|------|-------------|-------------|-------|",
])
for p in patterns[:20]:
lines.append(
f"| {p.pattern_type} | {p.description[:60]} | {p.occurrences} | {p.total_time_hours}h |"
)
lines.append("")
return "\n".join(lines)
def format_proposals_json(proposals: List[ImprovementProposal]) -> str:
"""Format proposals as JSON."""
return json.dumps(
[asdict(p) for p in proposals],
indent=2,
default=str,
)
# ── Main ──────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(description="Generate improvement proposals from session analytics")
parser.add_argument("--input", required=True, help="Path to session analytics JSON file")
parser.add_argument("--output", help="Output file path (default: stdout)")
parser.add_argument("--format", choices=["markdown", "json"], default="markdown", help="Output format")
parser.add_argument("--hourly-rate", type=float, default=50.0, help="Developer hourly rate for ROI calc")
parser.add_argument("--threshold", type=float, default=2.0, help="Min monthly hours to include proposal")
args = parser.parse_args()
with open(args.input) as f:
data = json.load(f)
sessions = data if isinstance(data, list) else data.get("sessions", [])
if not sessions:
print("No sessions found in input file.", file=sys.stderr)
sys.exit(1)
# Analyze
patterns = analyze_sessions(sessions)
proposals = generate_proposals(patterns, hourly_rate=args.hourly_rate)
# Filter by threshold
proposals = [p for p in proposals if p.estimated_monthly_hours_saved >= args.threshold]
generated_at = datetime.now(timezone.utc).isoformat()
if args.format == "markdown":
output = format_proposals_markdown(proposals, patterns, generated_at)
else:
output = format_proposals_json(proposals)
if args.output:
with open(args.output, "w") as f:
f.write(output)
print(f"Wrote {len(proposals)} proposals to {args.output}", file=sys.stderr)
else:
print(output)
if __name__ == "__main__":
main()

View File

@@ -1,131 +0,0 @@
#!/usr/bin/env python3
"""
Knowledge Store Staleness Detector — Detect stale knowledge entries by comparing source file hashes.
Usage:
python3 scripts/knowledge_staleness_check.py --index knowledge/index.json
python3 scripts/knowledge_staleness_check.py --index knowledge/index.json --json
python3 scripts/knowledge_staleness_check.py --index knowledge/index.json --fix
"""
import argparse
import hashlib
import json
import os
from typing import Dict, List, Any, Optional
def compute_file_hash(filepath: str) -> Optional[str]:
"""Compute SHA-256 hash of a file. Returns None if file doesn't exist."""
try:
with open(filepath, "rb") as f:
return "sha256:" + hashlib.sha256(f.read()).hexdigest()
except (FileNotFoundError, IsADirectoryError, PermissionError):
return None
def check_staleness(index_path: str, repo_root: str = ".") -> List[Dict[str, Any]]:
"""Check all entries in knowledge index for staleness.
Returns list of entries with staleness info:
- status: "fresh" | "stale" | "missing_source" | "no_hash"
- current_hash: computed hash (if source exists)
- stored_hash: hash from index
"""
with open(index_path) as f:
data = json.load(f)
facts = data.get("facts", [])
results = []
for entry in facts:
source_file = entry.get("source_file")
stored_hash = entry.get("source_hash")
if not source_file:
results.append({**entry, "status": "no_source", "current_hash": None})
continue
full_path = os.path.join(repo_root, source_file)
current_hash = compute_file_hash(full_path)
if current_hash is None:
results.append({**entry, "status": "missing_source", "current_hash": None})
elif not stored_hash:
results.append({**entry, "status": "no_hash", "current_hash": current_hash})
elif current_hash != stored_hash:
results.append({**entry, "status": "stale", "current_hash": current_hash})
else:
results.append({**entry, "status": "fresh", "current_hash": current_hash})
return results
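# Each result is the original entry plus status fields, e.g. (hypothetical):
#   {"fact": "...", "source_file": "a.py", "source_hash": "sha256:...",
#    "status": "stale", "current_hash": "sha256:..."}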
def fix_hashes(index_path: str, repo_root: str = ".") -> int:
"""Add hashes to entries missing them. Returns count of fixed entries."""
with open(index_path) as f:
data = json.load(f)
fixed = 0
for entry in data.get("facts", []):
if entry.get("source_hash"):
continue
source_file = entry.get("source_file")
if not source_file:
continue
full_path = os.path.join(repo_root, source_file)
h = compute_file_hash(full_path)
if h:
entry["source_hash"] = h
fixed += 1
with open(index_path, "w") as f:
json.dump(data, f, indent=2)
return fixed
def main():
parser = argparse.ArgumentParser(description="Check knowledge store staleness")
parser.add_argument("--index", required=True, help="Path to knowledge/index.json")
parser.add_argument("--repo", default=".", help="Repo root for source file resolution")
parser.add_argument("--json", action="store_true", help="Output as JSON")
parser.add_argument("--fix", action="store_true", help="Add hashes to entries missing them")
args = parser.parse_args()
if args.fix:
fixed = fix_hashes(args.index, args.repo)
print(f"Fixed {fixed} entries with missing hashes.")
return
results = check_staleness(args.index, args.repo)
if args.json:
print(json.dumps(results, indent=2))
else:
stale = [r for r in results if r["status"] != "fresh"]
fresh = [r for r in results if r["status"] == "fresh"]
print(f"Knowledge Store Staleness Check")
print(f" Total entries: {len(results)}")
print(f" Fresh: {len(fresh)}")
print(f" Stale/Issues: {len(stale)}")
print()
if stale:
print("Issues found:")
for r in stale:
status = r["status"]
fact = r.get("fact", "?")[:60]
source = r.get("source_file", "?")
print(f" [{status}] {source}: {fact}")
else:
print("All entries are fresh!")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,234 @@
#!/usr/bin/env python3
"""
Session Transcript → Training Pair Harvester
Scans Hermes session JSONL files for Q&A patterns and extracts
terse→rich training pairs. Outputs JSONL matching the timmy-config
training pairs spec.
Usage:
python3 scripts/session_pair_harvester.py ~/.hermes/sessions/
python3 scripts/session_pair_harvester.py session.jsonl --output pairs.jsonl
python3 scripts/session_pair_harvester.py --dir ~/.hermes/sessions/ --min-ratio 2.0
Output format:
{"terse": "user short prompt", "rich": "ai detailed response", "source": "session_id", "model": "..."}
"""
import argparse
import hashlib
import json
import sys
from pathlib import Path
def compute_hash(text: str) -> str:
"""Content hash for deduplication."""
return hashlib.sha256(text.encode()).hexdigest()[:16]
def extract_pairs_from_session(session_data: dict, min_ratio: float = 1.5,
min_response_words: int = 20) -> list:
"""Extract terse→rich pairs from a single session object."""
pairs = []
conversations = session_data.get("conversations", [])
session_id = session_data.get("id", "unknown")
model = session_data.get("model", "unknown")
seen_hashes = set()
for i, msg in enumerate(conversations):
# Look for assistant/gpt responses
if msg.get("from") not in ("gpt", "assistant"):
continue
response_text = msg.get("value", "")
if not response_text or len(response_text.split()) < min_response_words:
continue
# Find the preceding human message
prompt_text = ""
for j in range(i - 1, -1, -1):
if conversations[j].get("from") == "human":
prompt_text = conversations[j].get("value", "")
break
if not prompt_text:
continue
# Filter: skip tool results, system messages embedded as human
if prompt_text.startswith("{") and "output" in prompt_text[:100]:
continue # likely a tool result
if prompt_text.startswith("# SOUL.md") or prompt_text.startswith("You are"):
continue # system prompt leak
# Quality filters
prompt_words = len(prompt_text.split())
response_words = len(response_text.split())
# Must have meaningful length ratio
if prompt_words == 0 or response_words == 0:
continue
ratio = response_words / prompt_words
if ratio < min_ratio:
continue
# Skip degenerate responses: several code fences with almost no other text
code_blocks = response_text.count("```")
if code_blocks >= 4 and len(response_text.replace("```", "").strip()) < 50:
continue
# Skip responses with tool call artifacts
if "tool_call" in response_text[:100] or "function_call" in response_text[:100]:
continue
# Deduplicate by content hash
content_hash = compute_hash(prompt_text + response_text[:200])
if content_hash in seen_hashes:
continue
seen_hashes.add(content_hash)
# Keep the response as-is; markdown header cleanup could be added here
clean_response = response_text
pairs.append({
"terse": prompt_text.strip(),
"rich": clean_response.strip(),
"source": session_id,
"model": model,
"prompt_words": prompt_words,
"response_words": response_words,
"ratio": round(ratio, 2),
})
return pairs
def extract_from_jsonl_file(filepath: str, **kwargs) -> list:
"""Extract pairs from a session JSONL file."""
pairs = []
path = Path(filepath)
if not path.exists():
print(f"Warning: {filepath} not found", file=sys.stderr)
return pairs
content = path.read_text()
lines = content.strip().split("\n")
for line in lines:
line = line.strip()
if not line:
continue
try:
session = json.loads(line)
except json.JSONDecodeError:
continue
session_pairs = extract_pairs_from_session(session, **kwargs)
pairs.extend(session_pairs)
return pairs
def deduplicate_pairs(pairs: list) -> list:
"""Remove duplicate pairs across files."""
seen = set()
unique = []
for pair in pairs:
key = compute_hash(pair["terse"] + pair["rich"][:200])
if key not in seen:
seen.add(key)
unique.append(pair)
return unique
def main():
parser = argparse.ArgumentParser(description="Harvest training pairs from session transcripts")
parser.add_argument("input", nargs="?", help="Session JSONL file or directory")
parser.add_argument("--dir", "-d", help="Directory to scan for session files")
parser.add_argument("--output", "-o", default="harvested_pairs.jsonl", help="Output file")
parser.add_argument("--min-ratio", type=float, default=1.5, help="Min response/prompt word ratio")
parser.add_argument("--min-words", type=int, default=20, help="Min response word count")
parser.add_argument("--dry-run", action="store_true", help="Print stats without writing")
args = parser.parse_args()
all_pairs = []
files_scanned = 0
scan_dir = args.dir or args.input
if not scan_dir:
parser.print_help()
sys.exit(1)
scan_path = Path(scan_dir)
if scan_path.is_dir():
jsonl_files = sorted(scan_path.rglob("*.jsonl"))
print(f"Scanning {len(jsonl_files)} files in {scan_dir}...", file=sys.stderr)
for fpath in jsonl_files:
pairs = extract_from_jsonl_file(
str(fpath),
min_ratio=args.min_ratio,
min_response_words=args.min_words
)
all_pairs.extend(pairs)
files_scanned += 1
else:
pairs = extract_from_jsonl_file(
str(scan_path),
min_ratio=args.min_ratio,
min_response_words=args.min_words
)
all_pairs.extend(pairs)
files_scanned = 1
# Deduplicate
unique_pairs = deduplicate_pairs(all_pairs)
# Stats
if unique_pairs:
avg_prompt = sum(p["prompt_words"] for p in unique_pairs) / len(unique_pairs)
avg_response = sum(p["response_words"] for p in unique_pairs) / len(unique_pairs)
avg_ratio = sum(p["ratio"] for p in unique_pairs) / len(unique_pairs)
else:
avg_prompt = avg_response = avg_ratio = 0
stats = {
"files_scanned": files_scanned,
"raw_pairs": len(all_pairs),
"unique_pairs": len(unique_pairs),
"duplicates_removed": len(all_pairs) - len(unique_pairs),
"avg_prompt_words": round(avg_prompt, 1),
"avg_response_words": round(avg_response, 1),
"avg_ratio": round(avg_ratio, 2),
}
print(json.dumps(stats, indent=2), file=sys.stderr)
if args.dry_run:
# Print sample pairs
for pair in unique_pairs[:3]:
print(f"\n--- Source: {pair['source']} (ratio: {pair['ratio']}) ---", file=sys.stderr)
print(f"TERSE: {pair['terse'][:100]}...", file=sys.stderr)
print(f"RICH: {pair['rich'][:150]}...", file=sys.stderr)
return
# Write output
output_path = Path(args.output)
with open(output_path, "w") as f:
for pair in unique_pairs:
# Strip internal fields for output
output = {
"terse": pair["terse"],
"rich": pair["rich"],
"source": pair["source"],
"model": pair["model"],
}
f.write(json.dumps(output) + "\n")
print(f"\nWrote {len(unique_pairs)} pairs to {output_path}", file=sys.stderr)
if __name__ == "__main__":
main()

View File

@@ -1,282 +0,0 @@
#!/usr/bin/env python3
"""Tests for scripts/improvement_proposals.py — 15 tests."""
import json
import os
import sys
import tempfile
sys.path.insert(0, os.path.dirname(__file__) or ".")
import importlib.util
spec = importlib.util.spec_from_file_location(
"ip", os.path.join(os.path.dirname(__file__) or ".", "improvement_proposals.py")
)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
analyze_sessions = mod.analyze_sessions
generate_proposals = mod.generate_proposals
format_proposals_markdown = mod.format_proposals_markdown
format_proposals_json = mod.format_proposals_json
_normalize_error = mod._normalize_error
# ── Helper to build test sessions ─────────────────────────────
def _make_session(session_id, repo="test-repo", errors=None, tool_calls=None, duration=30):
return {
"session_id": session_id,
"repo": repo,
"errors": [{"message": e} for e in (errors or [])],
"tool_calls": tool_calls or [],
"duration_minutes": duration,
"messages": [],
}
def _make_tool_calls(repeats):
"""Create tool call list with repeated tools."""
calls = []
for tool, count in repeats:
for _ in range(count):
calls.append({"tool": tool, "latency_ms": 100})
return calls
# ── Tests ─────────────────────────────────────────────────────
def test_empty_sessions():
patterns = analyze_sessions([])
assert patterns == []
print("PASS: test_empty_sessions")
def test_no_patterns_on_clean_sessions():
sessions = [
_make_session("s1", tool_calls=[{"tool": "read_file", "latency_ms": 50}]),
_make_session("s2", tool_calls=[{"tool": "write_file", "latency_ms": 80}]),
]
patterns = analyze_sessions(sessions)
# No repeated errors, no slow tools, no retries
assert len(patterns) == 0
print("PASS: test_no_patterns_on_clean_sessions")
def test_repeated_error_detection():
"""Same error across 3+ sessions triggers pattern."""
sessions = [
_make_session(f"s{i}", errors=["ModuleNotFoundError: No module named bannerlord_trace"])
for i in range(4)
]
patterns = analyze_sessions(sessions)
repeated = [p for p in patterns if p.pattern_type == "repeated_error"]
assert len(repeated) == 1
assert repeated[0].occurrences == 4
print("PASS: test_repeated_error_detection")
def test_repeated_error_threshold():
"""2 occurrences should NOT trigger (threshold is 3)."""
sessions = [
_make_session("s1", errors=["TimeoutError: connection timed out"]),
_make_session("s2", errors=["TimeoutError: connection timed out"]),
]
patterns = analyze_sessions(sessions)
repeated = [p for p in patterns if p.pattern_type == "repeated_error"]
assert len(repeated) == 0
print("PASS: test_repeated_error_threshold")
def test_slow_tool_detection():
"""Tool with avg latency > 5000ms across 5+ calls."""
calls = [{"tool": "git_push", "latency_ms": 8000} for _ in range(10)]
sessions = [_make_session("s1", tool_calls=calls)]
patterns = analyze_sessions(sessions)
slow = [p for p in patterns if p.pattern_type == "slow_tool"]
assert len(slow) == 1
assert "git_push" in slow[0].description
print("PASS: test_slow_tool_detection")
def test_fast_tool_not_flagged():
"""Tool under 5000ms avg should not trigger."""
calls = [{"tool": "read_file", "latency_ms": 50} for _ in range(10)]
sessions = [_make_session("s1", tool_calls=calls)]
patterns = analyze_sessions(sessions)
slow = [p for p in patterns if p.pattern_type == "slow_tool"]
assert len(slow) == 0
print("PASS: test_fast_tool_not_flagged")
def test_failed_retry_detection():
"""3+ consecutive calls to same tool triggers retry pattern."""
calls = _make_tool_calls([("execute_code", 5)])
sessions = [_make_session("s1", tool_calls=calls)]
sessions.extend([
_make_session(f"s{i}", tool_calls=_make_tool_calls([("execute_code", 4)]))
for i in range(2, 5)
])
patterns = analyze_sessions(sessions)
retries = [p for p in patterns if p.pattern_type == "failed_retry"]
assert len(retries) >= 1
print("PASS: test_failed_retry_detection")
def test_manual_process_detection():
"""10+ tool calls with <= 3 unique tools."""
calls = _make_tool_calls([("terminal", 8), ("read_file", 5)])
sessions = [_make_session("s1", tool_calls=calls, duration=25)]
patterns = analyze_sessions(sessions)
manual = [p for p in patterns if p.pattern_type == "manual_process"]
assert len(manual) == 1
print("PASS: test_manual_process_detection")
def test_generate_proposals_from_patterns():
"""Proposals generated from waste patterns."""
sessions = [
_make_session(f"s{i}", errors=["Error: push timeout"])
for i in range(5)
]
patterns = analyze_sessions(sessions)
proposals = generate_proposals(patterns)
assert len(proposals) >= 1
assert proposals[0].estimated_monthly_hours_saved > 0
assert proposals[0].priority in ("critical", "high", "medium", "low")
print("PASS: test_generate_proposals_from_patterns")
def test_proposal_roi_positive():
"""ROI weeks should be a positive number for recoverable time."""
patterns = [mod.WastePattern(
pattern_type="repeated_error",
description="Test error",
occurrences=10,
total_time_hours=5.0,
affected_repos=["test"],
)]
proposals = generate_proposals(patterns)
assert len(proposals) == 1
assert proposals[0].roi_weeks > 0
assert proposals[0].roi_weeks < 100
print("PASS: test_proposal_roi_positive")
def test_proposals_sorted_by_impact():
"""Proposals should be sorted by monthly hours saved (descending)."""
sessions = [
_make_session("s1", errors=["Minor warning"] * 3, duration=5),
_make_session("s2", errors=["Critical failure: deploy crashed"] * 5, duration=60),
]
# Add more sessions to cross threshold
for i in range(3, 7):
sessions.append(_make_session(f"s{i}", errors=["Critical failure: deploy crashed"]))
patterns = analyze_sessions(sessions)
proposals = generate_proposals(patterns)
if len(proposals) >= 2:
for i in range(len(proposals) - 1):
assert proposals[i].estimated_monthly_hours_saved >= proposals[i + 1].estimated_monthly_hours_saved
print("PASS: test_proposals_sorted_by_impact")
def test_format_markdown():
"""Markdown output should contain expected sections."""
patterns = [mod.WastePattern(
pattern_type="repeated_error", description="Test", occurrences=5,
total_time_hours=2.5, affected_repos=["repo"],
)]
proposals = generate_proposals(patterns)
md = format_proposals_markdown(proposals, patterns, "2026-04-15T00:00:00Z")
assert "# Improvement Proposals" in md
assert "## Summary" in md
assert "### Problem" in md
assert "### ROI Estimate" in md
assert "## Appendix" in md
print("PASS: test_format_markdown")
def test_format_json():
"""JSON output should be valid and parseable."""
patterns = [mod.WastePattern(
pattern_type="slow_tool", description="Slow", occurrences=10,
total_time_hours=3.0, affected_repos=["global"],
)]
proposals = generate_proposals(patterns)
output = format_proposals_json(proposals)
parsed = json.loads(output)
assert isinstance(parsed, list)
assert len(parsed) == len(proposals)
assert "title" in parsed[0]
assert "roi_weeks" in parsed[0]
print("PASS: test_format_json")
def test_normalize_error():
"""Error normalization should remove paths and hashes."""
err1 = _normalize_error("Failed to clone /Users/apayne/repo with token abc123def456")
assert "/PATH" in err1
assert "HASH" in err1
assert "/Users/apayne" not in err1
err2 = _normalize_error("")
assert err2 == ""
err3 = _normalize_error("Simple error message")
assert "simple error" in err3
print("PASS: test_normalize_error")
def test_cli_integration():
"""End-to-end test: write input JSON, run script, check output."""
import subprocess
sessions = [
_make_session(f"s{i}", errors=["Connection refused: port 8080"])
for i in range(5)
]
with tempfile.TemporaryDirectory() as tmpdir:
input_path = os.path.join(tmpdir, "analytics.json")
output_path = os.path.join(tmpdir, "proposals.md")
with open(input_path, "w") as f:
json.dump({"sessions": sessions}, f)
script = os.path.join(os.path.dirname(__file__) or ".", "improvement_proposals.py")
result = subprocess.run(
[sys.executable, script, "--input", input_path, "--output", output_path],
capture_output=True, text=True, timeout=10,
)
assert result.returncode == 0, f"CLI failed: {result.stderr}"
assert os.path.exists(output_path)
with open(output_path) as f:
content = f.read()
assert "# Improvement Proposals" in content
print("PASS: test_cli_integration")
def run_all():
test_empty_sessions()
test_no_patterns_on_clean_sessions()
test_repeated_error_detection()
test_repeated_error_threshold()
test_slow_tool_detection()
test_fast_tool_not_flagged()
test_failed_retry_detection()
test_manual_process_detection()
test_generate_proposals_from_patterns()
test_proposal_roi_positive()
test_proposals_sorted_by_impact()
test_format_markdown()
test_format_json()
test_normalize_error()
test_cli_integration()
print("\nAll 15 tests passed!")
if __name__ == "__main__":
run_all()

View File

@@ -1,129 +0,0 @@
#!/usr/bin/env python3
"""Tests for scripts/knowledge_staleness_check.py — 8 tests."""
import json
import os
import sys
import tempfile
sys.path.insert(0, os.path.dirname(__file__) or ".")
import importlib.util
spec = importlib.util.spec_from_file_location("ks", os.path.join(os.path.dirname(__file__) or ".", "knowledge_staleness_check.py"))
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
check_staleness = mod.check_staleness
fix_hashes = mod.fix_hashes
compute_file_hash = mod.compute_file_hash
def test_fresh_entry():
with tempfile.TemporaryDirectory() as tmpdir:
src = os.path.join(tmpdir, "source.py")
with open(src, "w") as f:
f.write("print('hello')")
h = compute_file_hash(src)
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "hello", "source_file": "source.py", "source_hash": h}]}, f)
results = check_staleness(idx, tmpdir)
assert results[0]["status"] == "fresh"
print("PASS: test_fresh_entry")
def test_stale_entry():
with tempfile.TemporaryDirectory() as tmpdir:
src = os.path.join(tmpdir, "source.py")
with open(src, "w") as f:
f.write("original content")
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "old", "source_file": "source.py", "source_hash": "sha256:wrong"}]}, f)
# Now change the source
with open(src, "w") as f:
f.write("modified content")
results = check_staleness(idx, tmpdir)
assert results[0]["status"] == "stale"
print("PASS: test_stale_entry")
def test_missing_source():
with tempfile.TemporaryDirectory() as tmpdir:
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "gone", "source_file": "nonexistent.py", "source_hash": "sha256:abc"}]}, f)
results = check_staleness(idx, tmpdir)
assert results[0]["status"] == "missing_source"
print("PASS: test_missing_source")
def test_no_hash():
with tempfile.TemporaryDirectory() as tmpdir:
src = os.path.join(tmpdir, "source.py")
with open(src, "w") as f:
f.write("content")
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "no hash", "source_file": "source.py"}]}, f)
results = check_staleness(idx, tmpdir)
assert results[0]["status"] == "no_hash"
assert results[0]["current_hash"].startswith("sha256:")
print("PASS: test_no_hash")
def test_no_source_field():
with tempfile.TemporaryDirectory() as tmpdir:
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "orphan"}]}, f)
results = check_staleness(idx, tmpdir)
assert results[0]["status"] == "no_source"
print("PASS: test_no_source_field")
def test_fix_hashes():
with tempfile.TemporaryDirectory() as tmpdir:
src = os.path.join(tmpdir, "source.py")
with open(src, "w") as f:
f.write("content for hashing")
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "needs hash", "source_file": "source.py"}]}, f)
fixed = fix_hashes(idx, tmpdir)
assert fixed == 1
# Verify hash was added
with open(idx) as f:
data = json.load(f)
assert data["facts"][0]["source_hash"].startswith("sha256:")
print("PASS: test_fix_hashes")
def test_empty_index():
with tempfile.TemporaryDirectory() as tmpdir:
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": []}, f)
results = check_staleness(idx, tmpdir)
assert results == []
print("PASS: test_empty_index")
def test_compute_hash_nonexistent():
h = compute_file_hash("/nonexistent/path/file.py")
assert h is None
print("PASS: test_compute_hash_nonexistent")
def run_all():
test_fresh_entry()
test_stale_entry()
test_missing_source()
test_no_hash()
test_no_source_field()
test_fix_hashes()
test_empty_index()
test_compute_hash_nonexistent()
print("\nAll 8 tests passed!")
if __name__ == "__main__":
run_all()

View File

@@ -0,0 +1,90 @@
#!/usr/bin/env python3
"""Tests for session_pair_harvester."""
import os
import sys
sys.path.insert(0, os.path.dirname(__file__))
from session_pair_harvester import extract_pairs_from_session, deduplicate_pairs, compute_hash
def test_basic_extraction():
session = {
"id": "test_001",
"model": "test-model",
"conversations": [
{"from": "system", "value": "You are helpful."},
{"from": "human", "value": "What is Python?"},
{"from": "gpt", "value": "Python is a high-level programming language known for its readability and versatility. It supports multiple paradigms including procedural, object-oriented, and functional programming. Python is widely used in web development, data science, machine learning, and automation."},
]
}
pairs = extract_pairs_from_session(session, min_ratio=1.5, min_response_words=10)
assert len(pairs) == 1
assert pairs[0]["terse"] == "What is Python?"
assert "programming language" in pairs[0]["rich"]
assert pairs[0]["source"] == "test_001"
print("PASS: test_basic_extraction")
def test_filters_short_responses():
session = {
"id": "test_002",
"model": "test",
"conversations": [
{"from": "human", "value": "Hi"},
{"from": "gpt", "value": "Hello!"},
]
}
pairs = extract_pairs_from_session(session, min_ratio=1.5, min_response_words=20)
assert len(pairs) == 0
print("PASS: test_filters_short_responses")
def test_skips_tool_results():
session = {
"id": "test_003",
"model": "test",
"conversations": [
{"from": "human", "value": '{"output": "file content", "exit_code": 0}'},
{"from": "gpt", "value": "The file was read successfully. Now let me analyze the content and provide a detailed summary of what was found in the file system."},
]
}
pairs = extract_pairs_from_session(session, min_ratio=1.5, min_response_words=10)
assert len(pairs) == 0
print("PASS: test_skips_tool_results")
def test_deduplication():
pairs = [
{"terse": "What is X?", "rich": "X is Y.", "source": "s1", "model": "m"},
{"terse": "What is X?", "rich": "X is Y.", "source": "s2", "model": "m"},
{"terse": "What is Z?", "rich": "Z is W.", "source": "s1", "model": "m"},
]
unique = deduplicate_pairs(pairs)
assert len(unique) == 2
print("PASS: test_deduplication")
def test_ratio_filter():
session = {
"id": "test_005",
"model": "test",
"conversations": [
{"from": "human", "value": "Explain quantum computing in detail with examples and applications"},
{"from": "gpt", "value": "OK."},
]
}
pairs = extract_pairs_from_session(session, min_ratio=1.5, min_response_words=10)
assert len(pairs) == 0 # response too short relative to prompt
print("PASS: test_ratio_filter")
if __name__ == "__main__":
test_basic_extraction()
test_filters_short_responses()
test_skips_tool_results()
test_deduplication()
test_ratio_filter()
print("\nAll tests passed.")