From 1d47665dd4fe80e972c0ad1d113c25f22cac0f98 Mon Sep 17 00:00:00 2001 From: Alexander Whitestone Date: Wed, 15 Apr 2026 14:47:26 +0000 Subject: [PATCH 1/2] feat: add improvement proposal generator (#168) --- scripts/improvement_proposals.py | 451 +++++++++++++++++++++++++++++++ 1 file changed, 451 insertions(+) create mode 100644 scripts/improvement_proposals.py diff --git a/scripts/improvement_proposals.py b/scripts/improvement_proposals.py new file mode 100644 index 0000000..4027434 --- /dev/null +++ b/scripts/improvement_proposals.py @@ -0,0 +1,451 @@ +#!/usr/bin/env python3 +""" +Improvement Proposal Generator for compounding-intelligence. + +Analyzes fleet session data to identify waste patterns and generates +concrete improvement proposals with ROI estimates. + +Input: Session analytics JSON (from fleet metrics or session database) +Output: Markdown proposal document + JSON proposals + +Usage: + python3 scripts/improvement_proposals.py --input analytics.json + python3 scripts/improvement_proposals.py --input analytics.json --format json + python3 scripts/improvement_proposals.py --input analytics.json --output proposals.md + python3 scripts/improvement_proposals.py --input analytics.json --threshold 5.0 +""" + +import argparse +import json +import os +import sys +from collections import defaultdict +from dataclasses import dataclass, field, asdict +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional + + +# ── Data types ───────────────────────────────────────────────── + +@dataclass +class WastePattern: + """A detected waste pattern in fleet sessions.""" + pattern_type: str # "repeated_error", "manual_process", "slow_tool", "failed_retry" + description: str + occurrences: int + total_time_hours: float + affected_repos: List[str] = field(default_factory=list) + example_sessions: List[str] = field(default_factory=list) + + +@dataclass +class ImprovementProposal: + """A concrete improvement proposal with ROI estimate.""" + title: str + problem: str + proposed_solution: str + estimated_weekly_hours_saved: float + estimated_monthly_hours_saved: float + implementation_hours: float + roi_weeks: float # weeks to break even + priority: str # "critical", "high", "medium", "low" + affected_area: str + supporting_evidence: List[str] = field(default_factory=list) + + +# ── Session analysis ────────────────────────────────────────── + +def analyze_sessions(sessions: List[dict]) -> List[WastePattern]: + """ + Analyze session data to find waste patterns. + + Looks for: + - Repeated errors (same error across sessions) + - Manual processes (long sequences of similar actions) + - Slow tools (tools with high latency) + - Failed retries (multiple attempts at same task) + """ + patterns = [] + + # Track error frequency across sessions + error_counts: Dict[str, List[dict]] = defaultdict(list) + tool_latencies: Dict[str, List[float]] = defaultdict(list) + retry_counts: Dict[str, int] = defaultdict(int) + manual_sequences: List[dict] = [] + + for session in sessions: + session_id = session.get("session_id", "unknown") + repo = session.get("repo", "global") + messages = session.get("messages", []) + errors = session.get("errors", []) + tool_calls = session.get("tool_calls", []) + duration_min = session.get("duration_minutes", 0) + + # 1. Repeated errors + for err in errors: + err_key = _normalize_error(err.get("message", "")) + if err_key: + error_counts[err_key].append({ + "session_id": session_id, + "repo": repo, + }) + + # 2. Tool latency tracking + for tc in tool_calls: + tool_name = tc.get("tool", "unknown") + latency = tc.get("latency_ms", 0) + if latency > 0: + tool_latencies[tool_name].append(latency) + + # 3. Failed retries (same tool called 3+ times in sequence) + prev_tool = None + streak = 0 + for tc in tool_calls: + tool_name = tc.get("tool", "unknown") + if tool_name == prev_tool: + streak += 1 + else: + if streak >= 3: + retry_counts[prev_tool] += 1 + streak = 1 + prev_tool = tool_name + if streak >= 3: + retry_counts[prev_tool] += 1 + + # 4. Manual processes (10+ sequential tool calls with no automation) + if len(tool_calls) > 10: + tool_sequence = [tc.get("tool") for tc in tool_calls] + unique_tools = set(tool_sequence) + if len(unique_tools) <= 3 and len(tool_calls) > 10: + manual_sequences.append({ + "session_id": session_id, + "repo": repo, + "tool_count": len(tool_calls), + "unique_tools": list(unique_tools), + "duration_min": duration_min, + }) + + # Generate patterns from collected data + + # Repeated errors (appearing in 3+ sessions) + for err_key, occurrences in error_counts.items(): + if len(occurrences) >= 3: + repos = list(set(o["repo"] for o in occurrences)) + sessions_list = [o["session_id"] for o in occurrences[:5]] + # Estimate time wasted: 5 min per error occurrence + hours = len(occurrences) * 5 / 60 + patterns.append(WastePattern( + pattern_type="repeated_error", + description=f"Error: {err_key[:100]}", + occurrences=len(occurrences), + total_time_hours=round(hours, 1), + affected_repos=repos, + example_sessions=sessions_list, + )) + + # Slow tools (avg latency > 5000ms across 5+ calls) + for tool, latencies in tool_latencies.items(): + if len(latencies) >= 5: + avg_ms = sum(latencies) / len(latencies) + if avg_ms > 5000: + hours = sum(latencies) / 1000 / 3600 + patterns.append(WastePattern( + pattern_type="slow_tool", + description=f"Tool '{tool}' averages {avg_ms:.0f}ms latency", + occurrences=len(latencies), + total_time_hours=round(hours, 1), + affected_repos=["global"], + example_sessions=[], + )) + + # Failed retries + for tool, count in retry_counts.items(): + if count >= 2: + hours = count * 10 / 60 # ~10 min per failed retry sequence + patterns.append(WastePattern( + pattern_type="failed_retry", + description=f"Tool '{tool}' had {count} retry sequences (3+ consecutive calls)", + occurrences=count, + total_time_hours=round(hours, 1), + affected_repos=["global"], + example_sessions=[], + )) + + # Manual processes + for seq in manual_sequences: + hours = seq["duration_min"] / 60 + patterns.append(WastePattern( + pattern_type="manual_process", + description=f"Session {seq['session_id'][:12]}: {seq['tool_count']} tool calls with only {len(seq['unique_tools'])} unique tools", + occurrences=1, + total_time_hours=round(hours, 1), + affected_repos=[seq["repo"]], + example_sessions=[seq["session_id"]], + )) + + return sorted(patterns, key=lambda p: p.total_time_hours, reverse=True) + + +def _normalize_error(message: str) -> str: + """Normalize error message to a common key.""" + if not message: + return "" + msg = message.lower().strip() + # Remove variable parts (paths, IDs, timestamps) + import re + msg = re.sub(r'/\S+', '/PATH', msg) + msg = re.sub(r'\b[0-9a-f]{8,}\b', 'HASH', msg) + msg = re.sub(r'\d{4}-\d{2}-\d{2}[tT]\d{2}:\d{2}', 'TIME', msg) + return msg[:150] + + +# ── Proposal generation ─────────────────────────────────────── + +def generate_proposals( + patterns: List[WastePattern], + hourly_rate: float = 50.0, + implementation_overhead: float = 1.5, +) -> List[ImprovementProposal]: + """ + Generate improvement proposals from waste patterns. + + Args: + patterns: Detected waste patterns + hourly_rate: Developer hourly rate for ROI calculation + implementation_overhead: Multiplier for implementation time estimate + """ + proposals = [] + + # Group patterns by type + by_type: Dict[str, List[WastePattern]] = defaultdict(list) + for p in patterns: + by_type[p.pattern_type].append(p) + + # 1. Repeated errors → Create fix/skill + for p in by_type.get("repeated_error", []): + weekly_hours = p.total_time_hours / 4 # monthly → weekly + impl_hours = max(1.0, p.occurrences * 0.25) * implementation_overhead + roi_weeks = impl_hours / weekly_hours if weekly_hours > 0 else float('inf') + + proposals.append(ImprovementProposal( + title=f"Fix repeated error: {p.description[:60]}", + problem=f"This error occurred {p.occurrences} times across {len(p.affected_repos)} repos, wasting ~{p.total_time_hours:.1f} hours.", + proposed_solution="Root-cause the error and create a permanent fix or mitigation skill.", + estimated_weekly_hours_saved=round(weekly_hours, 1), + estimated_monthly_hours_saved=round(p.total_time_hours, 1), + implementation_hours=round(impl_hours, 1), + roi_weeks=round(roi_weeks, 1), + priority=_priority_from_roi(roi_weeks), + affected_area="reliability", + supporting_evidence=[f"{p.occurrences} occurrences in sessions: {', '.join(p.example_sessions[:3])}"], + )) + + # 2. Slow tools → Optimize or replace + for p in by_type.get("slow_tool", []): + weekly_hours = p.total_time_hours / 4 + impl_hours = 3.0 * implementation_overhead # optimization task + roi_weeks = impl_hours / weekly_hours if weekly_hours > 0 else float('inf') + + proposals.append(ImprovementProposal( + title=f"Optimize slow tool: {p.description[:60]}", + problem=f"Tool has {p.occurrences} calls with high latency, wasting ~{p.total_time_hours:.1f} hours total.", + proposed_solution="Profile the tool, add caching, or replace with a faster alternative.", + estimated_weekly_hours_saved=round(weekly_hours, 1), + estimated_monthly_hours_saved=round(p.total_time_hours, 1), + implementation_hours=round(impl_hours, 1), + roi_weeks=round(roi_weeks, 1), + priority=_priority_from_roi(roi_weeks), + affected_area="performance", + supporting_evidence=[f"{p.occurrences} slow calls detected"], + )) + + # 3. Failed retries → Add retry logic or validation + for p in by_type.get("failed_retry", []): + weekly_hours = p.total_time_hours / 4 + impl_hours = 2.0 * implementation_overhead + roi_weeks = impl_hours / weekly_hours if weekly_hours > 0 else float('inf') + + proposals.append(ImprovementProposal( + title=f"Reduce retries for tool '{p.description[:50]}'", + problem=f"Tool had {p.occurrences} retry sequences, wasting ~{p.total_time_hours:.1f} hours.", + proposed_solution="Add input validation, pre-flight checks, or automatic retry with backoff.", + estimated_weekly_hours_saved=round(weekly_hours, 1), + estimated_monthly_hours_saved=round(p.total_time_hours, 1), + implementation_hours=round(impl_hours, 1), + roi_weeks=round(roi_weeks, 1), + priority=_priority_from_roi(roi_weeks), + affected_area="reliability", + supporting_evidence=[f"{p.occurrences} retry sequences detected"], + )) + + # 4. Manual processes → Automate + total_manual_hours = sum(p.total_time_hours for p in by_type.get("manual_process", [])) + manual_patterns = by_type.get("manual_process", []) + if manual_patterns: + weekly_hours = total_manual_hours / 4 + impl_hours = len(manual_patterns) * 2.0 * implementation_overhead + roi_weeks = impl_hours / weekly_hours if weekly_hours > 0 else float('inf') + + proposals.append(ImprovementProposal( + title=f"Automate {len(manual_patterns)} manual processes", + problem=f"{len(manual_patterns)} sessions had long manual tool sequences, wasting ~{total_manual_hours:.1f} hours.", + proposed_solution="Create composite skills or scripts that combine the repeated tool sequences into single operations.", + estimated_weekly_hours_saved=round(weekly_hours, 1), + estimated_monthly_hours_saved=round(total_manual_hours, 1), + implementation_hours=round(impl_hours, 1), + roi_weeks=round(roi_weeks, 1), + priority=_priority_from_roi(roi_weeks), + affected_area="automation", + supporting_evidence=[f"{len(manual_patterns)} manual sessions detected"], + )) + + return sorted(proposals, key=lambda p: p.estimated_monthly_hours_saved, reverse=True) + + +def _priority_from_roi(roi_weeks: float) -> str: + """Determine priority from ROI break-even time.""" + if roi_weeks <= 1: + return "critical" + elif roi_weeks <= 4: + return "high" + elif roi_weeks <= 12: + return "medium" + return "low" + + +# ── Output formatting ───────────────────────────────────────── + +def format_proposals_markdown( + proposals: List[ImprovementProposal], + patterns: List[WastePattern], + generated_at: str, +) -> str: + """Format proposals as a markdown document.""" + lines = [ + "# Improvement Proposals", + "", + f"Generated: {generated_at}", + f"Based on analysis of {sum(p.occurrences for p in patterns)} waste events across {len(set(r for p in patterns for r in p.affected_repos))} repos.", + "", + "---", + "", + "## Summary", + "", + f"| Metric | Value |", + f"|--------|-------|", + f"| Total proposals | {len(proposals)} |", + f"| Critical priority | {sum(1 for p in proposals if p.priority == 'critical')} |", + f"| Total monthly hours wasted | {sum(p.estimated_monthly_hours_saved for p in proposals):.1f}h |", + f"| Total weekly hours recoverable | {sum(p.estimated_weekly_hours_saved for p in proposals):.1f}h |", + f"| Implementation investment | {sum(p.implementation_hours for p in proposals):.1f}h |", + "", + "---", + "", + ] + + for i, prop in enumerate(proposals, 1): + lines.extend([ + f"## {i}. {prop.title}", + "", + f"**Priority:** {prop.priority.upper()} ", + f"**Area:** {prop.affected_area} ", + f"**ROI break-even:** {prop.roi_weeks:.1f} weeks", + "", + "### Problem", + "", + prop.problem, + "", + "### Proposed Solution", + "", + prop.proposed_solution, + "", + "### ROI Estimate", + "", + f"- Weekly hours saved: **{prop.estimated_weekly_hours_saved}h**", + f"- Monthly hours saved: **{prop.estimated_monthly_hours_saved}h**", + f"- Implementation time: **{prop.implementation_hours}h**", + f"- Break-even: **{prop.roi_weeks:.1f} weeks**", + "", + ]) + + if prop.supporting_evidence: + lines.extend([ + "### Evidence", + "", + ]) + for ev in prop.supporting_evidence: + lines.append(f"- {ev}") + lines.append("") + + lines.extend(["---", ""]) + + # Waste pattern appendix + lines.extend([ + "## Appendix: Detected Waste Patterns", + "", + "| Type | Description | Occurrences | Hours |", + "|------|-------------|-------------|-------|", + ]) + for p in patterns[:20]: + lines.append( + f"| {p.pattern_type} | {p.description[:60]} | {p.occurrences} | {p.total_time_hours}h |" + ) + lines.append("") + + return "\n".join(lines) + + +def format_proposals_json(proposals: List[ImprovementProposal]) -> str: + """Format proposals as JSON.""" + return json.dumps( + [asdict(p) for p in proposals], + indent=2, + default=str, + ) + + +# ── Main ────────────────────────────────────────────────────── + +def main(): + parser = argparse.ArgumentParser(description="Generate improvement proposals from session analytics") + parser.add_argument("--input", required=True, help="Path to session analytics JSON file") + parser.add_argument("--output", help="Output file path (default: stdout)") + parser.add_argument("--format", choices=["markdown", "json"], default="markdown", help="Output format") + parser.add_argument("--hourly-rate", type=float, default=50.0, help="Developer hourly rate for ROI calc") + parser.add_argument("--threshold", type=float, default=2.0, help="Min monthly hours to include proposal") + + args = parser.parse_args() + + with open(args.input) as f: + data = json.load(f) + + sessions = data if isinstance(data, list) else data.get("sessions", []) + + if not sessions: + print("No sessions found in input file.", file=sys.stderr) + sys.exit(1) + + # Analyze + patterns = analyze_sessions(sessions) + proposals = generate_proposals(patterns, hourly_rate=args.hourly_rate) + + # Filter by threshold + proposals = [p for p in proposals if p.estimated_monthly_hours_saved >= args.threshold] + + generated_at = datetime.now(timezone.utc).isoformat() + + if args.format == "markdown": + output = format_proposals_markdown(proposals, patterns, generated_at) + else: + output = format_proposals_json(proposals) + + if args.output: + with open(args.output, "w") as f: + f.write(output) + print(f"Wrote {len(proposals)} proposals to {args.output}", file=sys.stderr) + else: + print(output) + + +if __name__ == "__main__": + main() From f1175df79deb7308ec48d63f7894c2281b7f2ba0 Mon Sep 17 00:00:00 2001 From: Alexander Whitestone Date: Wed, 15 Apr 2026 14:47:30 +0000 Subject: [PATCH 2/2] test: add improvement proposal generator tests (#168) --- scripts/test_improvement_proposals.py | 282 ++++++++++++++++++++++++++ 1 file changed, 282 insertions(+) create mode 100644 scripts/test_improvement_proposals.py diff --git a/scripts/test_improvement_proposals.py b/scripts/test_improvement_proposals.py new file mode 100644 index 0000000..46d0631 --- /dev/null +++ b/scripts/test_improvement_proposals.py @@ -0,0 +1,282 @@ +#!/usr/bin/env python3 +"""Tests for scripts/improvement_proposals.py — 15 tests.""" + +import json +import os +import sys +import tempfile + +sys.path.insert(0, os.path.dirname(__file__) or ".") +import importlib.util + +spec = importlib.util.spec_from_file_location( + "ip", os.path.join(os.path.dirname(__file__) or ".", "improvement_proposals.py") +) +mod = importlib.util.module_from_spec(spec) +spec.loader.exec_module(mod) + +analyze_sessions = mod.analyze_sessions +generate_proposals = mod.generate_proposals +format_proposals_markdown = mod.format_proposals_markdown +format_proposals_json = mod.format_proposals_json +_normalize_error = mod._normalize_error + + +# ── Helper to build test sessions ───────────────────────────── + +def _make_session(session_id, repo="test-repo", errors=None, tool_calls=None, duration=30): + return { + "session_id": session_id, + "repo": repo, + "errors": [{"message": e} for e in (errors or [])], + "tool_calls": tool_calls or [], + "duration_minutes": duration, + "messages": [], + } + + +def _make_tool_calls(repeats): + """Create tool call list with repeated tools.""" + calls = [] + for tool, count in repeats: + for _ in range(count): + calls.append({"tool": tool, "latency_ms": 100}) + return calls + + +# ── Tests ───────────────────────────────────────────────────── + +def test_empty_sessions(): + patterns = analyze_sessions([]) + assert patterns == [] + print("PASS: test_empty_sessions") + + +def test_no_patterns_on_clean_sessions(): + sessions = [ + _make_session("s1", tool_calls=[{"tool": "read_file", "latency_ms": 50}]), + _make_session("s2", tool_calls=[{"tool": "write_file", "latency_ms": 80}]), + ] + patterns = analyze_sessions(sessions) + # No repeated errors, no slow tools, no retries + assert len(patterns) == 0 + print("PASS: test_no_patterns_on_clean_sessions") + + +def test_repeated_error_detection(): + """Same error across 3+ sessions triggers pattern.""" + sessions = [ + _make_session(f"s{i}", errors=["ModuleNotFoundError: No module named bannerlord_trace"]) + for i in range(4) + ] + patterns = analyze_sessions(sessions) + repeated = [p for p in patterns if p.pattern_type == "repeated_error"] + assert len(repeated) == 1 + assert repeated[0].occurrences == 4 + print("PASS: test_repeated_error_detection") + + +def test_repeated_error_threshold(): + """2 occurrences should NOT trigger (threshold is 3).""" + sessions = [ + _make_session("s1", errors=["TimeoutError: connection timed out"]), + _make_session("s2", errors=["TimeoutError: connection timed out"]), + ] + patterns = analyze_sessions(sessions) + repeated = [p for p in patterns if p.pattern_type == "repeated_error"] + assert len(repeated) == 0 + print("PASS: test_repeated_error_threshold") + + +def test_slow_tool_detection(): + """Tool with avg latency > 5000ms across 5+ calls.""" + calls = [{"tool": "git_push", "latency_ms": 8000} for _ in range(10)] + sessions = [_make_session("s1", tool_calls=calls)] + patterns = analyze_sessions(sessions) + slow = [p for p in patterns if p.pattern_type == "slow_tool"] + assert len(slow) == 1 + assert "git_push" in slow[0].description + print("PASS: test_slow_tool_detection") + + +def test_fast_tool_not_flagged(): + """Tool under 5000ms avg should not trigger.""" + calls = [{"tool": "read_file", "latency_ms": 50} for _ in range(10)] + sessions = [_make_session("s1", tool_calls=calls)] + patterns = analyze_sessions(sessions) + slow = [p for p in patterns if p.pattern_type == "slow_tool"] + assert len(slow) == 0 + print("PASS: test_fast_tool_not_flagged") + + +def test_failed_retry_detection(): + """3+ consecutive calls to same tool triggers retry pattern.""" + calls = _make_tool_calls([("execute_code", 5)]) + sessions = [_make_session("s1", tool_calls=calls)] + sessions.extend([ + _make_session(f"s{i}", tool_calls=_make_tool_calls([("execute_code", 4)])) + for i in range(2, 5) + ]) + patterns = analyze_sessions(sessions) + retries = [p for p in patterns if p.pattern_type == "failed_retry"] + assert len(retries) >= 1 + print("PASS: test_failed_retry_detection") + + +def test_manual_process_detection(): + """10+ tool calls with <= 3 unique tools.""" + calls = _make_tool_calls([("terminal", 8), ("read_file", 5)]) + sessions = [_make_session("s1", tool_calls=calls, duration=25)] + patterns = analyze_sessions(sessions) + manual = [p for p in patterns if p.pattern_type == "manual_process"] + assert len(manual) == 1 + print("PASS: test_manual_process_detection") + + +def test_generate_proposals_from_patterns(): + """Proposals generated from waste patterns.""" + sessions = [ + _make_session(f"s{i}", errors=["Error: push timeout"]) + for i in range(5) + ] + patterns = analyze_sessions(sessions) + proposals = generate_proposals(patterns) + assert len(proposals) >= 1 + assert proposals[0].estimated_monthly_hours_saved > 0 + assert proposals[0].priority in ("critical", "high", "medium", "low") + print("PASS: test_generate_proposals_from_patterns") + + +def test_proposal_roi_positive(): + """ROI weeks should be a positive number for recoverable time.""" + patterns = [mod.WastePattern( + pattern_type="repeated_error", + description="Test error", + occurrences=10, + total_time_hours=5.0, + affected_repos=["test"], + )] + proposals = generate_proposals(patterns) + assert len(proposals) == 1 + assert proposals[0].roi_weeks > 0 + assert proposals[0].roi_weeks < 100 + print("PASS: test_proposal_roi_positive") + + +def test_proposals_sorted_by_impact(): + """Proposals should be sorted by monthly hours saved (descending).""" + sessions = [ + _make_session("s1", errors=["Minor warning"] * 3, duration=5), + _make_session("s2", errors=["Critical failure: deploy crashed"] * 5, duration=60), + ] + # Add more sessions to cross threshold + for i in range(3, 7): + sessions.append(_make_session(f"s{i}", errors=["Critical failure: deploy crashed"])) + + patterns = analyze_sessions(sessions) + proposals = generate_proposals(patterns) + if len(proposals) >= 2: + for i in range(len(proposals) - 1): + assert proposals[i].estimated_monthly_hours_saved >= proposals[i + 1].estimated_monthly_hours_saved + print("PASS: test_proposals_sorted_by_impact") + + +def test_format_markdown(): + """Markdown output should contain expected sections.""" + patterns = [mod.WastePattern( + pattern_type="repeated_error", description="Test", occurrences=5, + total_time_hours=2.5, affected_repos=["repo"], + )] + proposals = generate_proposals(patterns) + md = format_proposals_markdown(proposals, patterns, "2026-04-15T00:00:00Z") + assert "# Improvement Proposals" in md + assert "## Summary" in md + assert "### Problem" in md + assert "### ROI Estimate" in md + assert "## Appendix" in md + print("PASS: test_format_markdown") + + +def test_format_json(): + """JSON output should be valid and parseable.""" + patterns = [mod.WastePattern( + pattern_type="slow_tool", description="Slow", occurrences=10, + total_time_hours=3.0, affected_repos=["global"], + )] + proposals = generate_proposals(patterns) + output = format_proposals_json(proposals) + parsed = json.loads(output) + assert isinstance(parsed, list) + assert len(parsed) == len(proposals) + assert "title" in parsed[0] + assert "roi_weeks" in parsed[0] + print("PASS: test_format_json") + + +def test_normalize_error(): + """Error normalization should remove paths and hashes.""" + err1 = _normalize_error("Failed to clone /Users/apayne/repo with token abc123def456") + assert "/PATH" in err1 + assert "HASH" in err1 + assert "/Users/apayne" not in err1 + + err2 = _normalize_error("") + assert err2 == "" + + err3 = _normalize_error("Simple error message") + assert "simple error" in err3 + print("PASS: test_normalize_error") + + +def test_cli_integration(): + """End-to-end test: write input JSON, run script, check output.""" + import subprocess + + sessions = [ + _make_session(f"s{i}", errors=["Connection refused: port 8080"]) + for i in range(5) + ] + + with tempfile.TemporaryDirectory() as tmpdir: + input_path = os.path.join(tmpdir, "analytics.json") + output_path = os.path.join(tmpdir, "proposals.md") + + with open(input_path, "w") as f: + json.dump({"sessions": sessions}, f) + + script = os.path.join(os.path.dirname(__file__) or ".", "improvement_proposals.py") + result = subprocess.run( + [sys.executable, script, "--input", input_path, "--output", output_path], + capture_output=True, text=True, timeout=10, + ) + + assert result.returncode == 0, f"CLI failed: {result.stderr}" + assert os.path.exists(output_path) + + with open(output_path) as f: + content = f.read() + assert "# Improvement Proposals" in content + print("PASS: test_cli_integration") + + +def run_all(): + test_empty_sessions() + test_no_patterns_on_clean_sessions() + test_repeated_error_detection() + test_repeated_error_threshold() + test_slow_tool_detection() + test_fast_tool_not_flagged() + test_failed_retry_detection() + test_manual_process_detection() + test_generate_proposals_from_patterns() + test_proposal_roi_positive() + test_proposals_sorted_by_impact() + test_format_markdown() + test_format_json() + test_normalize_error() + test_cli_integration() + print("\nAll 15 tests passed!") + + +if __name__ == "__main__": + run_all()