From 1d47665dd4fe80e972c0ad1d113c25f22cac0f98 Mon Sep 17 00:00:00 2001
From: Alexander Whitestone
Date: Wed, 15 Apr 2026 14:47:26 +0000
Subject: [PATCH] feat: add improvement proposal generator (#168)

---
 scripts/improvement_proposals.py | 451 +++++++++++++++++++++++++++++++
 1 file changed, 451 insertions(+)
 create mode 100644 scripts/improvement_proposals.py

diff --git a/scripts/improvement_proposals.py b/scripts/improvement_proposals.py
new file mode 100644
index 0000000..4027434
--- /dev/null
+++ b/scripts/improvement_proposals.py
@@ -0,0 +1,451 @@
+#!/usr/bin/env python3
+"""
+Improvement Proposal Generator for compounding-intelligence.
+
+Analyzes fleet session data to identify waste patterns and generates
+concrete improvement proposals with ROI estimates.
+
+Input: Session analytics JSON (from fleet metrics or the session database)
+Output: Markdown proposal document or JSON proposals (see --format)
+
+Usage:
+    python3 scripts/improvement_proposals.py --input analytics.json
+    python3 scripts/improvement_proposals.py --input analytics.json --format json
+    python3 scripts/improvement_proposals.py --input analytics.json --output proposals.md
+    python3 scripts/improvement_proposals.py --input analytics.json --threshold 5.0
+"""
+
+import argparse
+import json
+import re
+import sys
+from collections import defaultdict
+from dataclasses import asdict, dataclass, field
+from datetime import datetime, timezone
+from typing import Dict, List
+
+
+# ── Data types ─────────────────────────────────────────────────
+
+@dataclass
+class WastePattern:
+    """A detected waste pattern in fleet sessions."""
+    pattern_type: str  # "repeated_error", "manual_process", "slow_tool", "failed_retry"
+    description: str
+    occurrences: int
+    total_time_hours: float
+    affected_repos: List[str] = field(default_factory=list)
+    example_sessions: List[str] = field(default_factory=list)
+
+
+@dataclass
+class ImprovementProposal:
+    """A concrete improvement proposal with ROI estimate."""
+    title: str
+    problem: str
+    proposed_solution: str
+    estimated_weekly_hours_saved: float
+    estimated_monthly_hours_saved: float
+    implementation_hours: float
+    roi_weeks: float  # weeks to break even
+    priority: str  # "critical", "high", "medium", "low"
+    affected_area: str
+    supporting_evidence: List[str] = field(default_factory=list)
+
+
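+# The analyzer below expects session records shaped roughly like this
+# (an illustrative sketch, not a schema; fields beyond the ones read in
+# analyze_sessions() are ignored, and all values here are made up):
+#
+#   {
+#     "session_id": "sess-0142",
+#     "repo": "org/service-api",
+#     "duration_minutes": 38,
+#     "errors": [{"message": "ETIMEDOUT connecting to /var/run/agent.sock"}],
+#     "tool_calls": [{"tool": "bash", "latency_ms": 420}]
+#   }
+
+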
+# ── Session analysis ──────────────────────────────────────────
+
+def analyze_sessions(sessions: List[dict]) -> List[WastePattern]:
+    """
+    Analyze session data to find waste patterns.
+
+    Looks for:
+    - Repeated errors (the same normalized error across sessions)
+    - Manual processes (long sequences of near-identical tool calls)
+    - Slow tools (tools with high average latency)
+    - Failed retries (repeated consecutive calls to the same tool)
+    """
+    patterns = []
+
+    # Accumulators across all sessions
+    error_counts: Dict[str, List[dict]] = defaultdict(list)
+    tool_latencies: Dict[str, List[float]] = defaultdict(list)
+    retry_counts: Dict[str, int] = defaultdict(int)
+    manual_sequences: List[dict] = []
+
+    for session in sessions:
+        session_id = session.get("session_id", "unknown")
+        repo = session.get("repo", "global")
+        errors = session.get("errors", [])
+        tool_calls = session.get("tool_calls", [])
+        duration_min = session.get("duration_minutes", 0)
+
+        # 1. Repeated errors: bucket by normalized message
+        for err in errors:
+            err_key = _normalize_error(err.get("message", ""))
+            if err_key:
+                error_counts[err_key].append({
+                    "session_id": session_id,
+                    "repo": repo,
+                })
+
+        # 2. Tool latency tracking
+        for tc in tool_calls:
+            tool_name = tc.get("tool", "unknown")
+            latency = tc.get("latency_ms", 0)
+            if latency > 0:
+                tool_latencies[tool_name].append(latency)
+
+        # 3. Failed retries (same tool called 3+ times in a row)
+        prev_tool = None
+        streak = 0
+        for tc in tool_calls:
+            tool_name = tc.get("tool", "unknown")
+            if tool_name == prev_tool:
+                streak += 1
+            else:
+                if streak >= 3:
+                    retry_counts[prev_tool] += 1
+                streak = 1
+            prev_tool = tool_name
+        if streak >= 3:
+            retry_counts[prev_tool] += 1
+
+        # 4. Manual processes: more than 10 tool calls spanning at most
+        #    3 distinct tools suggests a loop a script could replace
+        if len(tool_calls) > 10:
+            unique_tools = {tc.get("tool") for tc in tool_calls}
+            if len(unique_tools) <= 3:
+                manual_sequences.append({
+                    "session_id": session_id,
+                    "repo": repo,
+                    "tool_count": len(tool_calls),
+                    "unique_tools": list(unique_tools),
+                    "duration_min": duration_min,
+                })
+
+    # Generate patterns from the collected data
+
+    # Repeated errors (appearing in 3+ sessions)
+    for err_key, occurrences in error_counts.items():
+        if len(occurrences) >= 3:
+            repos = list(set(o["repo"] for o in occurrences))
+            sessions_list = [o["session_id"] for o in occurrences[:5]]
+            # Estimated time wasted: ~5 min per error occurrence
+            hours = len(occurrences) * 5 / 60
+            patterns.append(WastePattern(
+                pattern_type="repeated_error",
+                description=f"Error: {err_key[:100]}",
+                occurrences=len(occurrences),
+                total_time_hours=round(hours, 1),
+                affected_repos=repos,
+                example_sessions=sessions_list,
+            ))
+
+    # Slow tools (average latency above 5000ms across 5+ calls)
+    for tool, latencies in tool_latencies.items():
+        if len(latencies) >= 5:
+            avg_ms = sum(latencies) / len(latencies)
+            if avg_ms > 5000:
+                hours = sum(latencies) / 1000 / 3600
+                patterns.append(WastePattern(
+                    pattern_type="slow_tool",
+                    description=f"Tool '{tool}' averages {avg_ms:.0f}ms latency",
+                    occurrences=len(latencies),
+                    total_time_hours=round(hours, 1),
+                    affected_repos=["global"],
+                    example_sessions=[],
+                ))
+
+    # Failed retries
+    for tool, count in retry_counts.items():
+        if count >= 2:
+            hours = count * 10 / 60  # ~10 min per failed retry sequence
+            patterns.append(WastePattern(
+                pattern_type="failed_retry",
+                description=f"Tool '{tool}' had {count} retry sequences (3+ consecutive calls)",
+                occurrences=count,
+                total_time_hours=round(hours, 1),
+                affected_repos=["global"],
+                example_sessions=[],
+            ))
+
+    # Manual processes
+    for seq in manual_sequences:
+        hours = seq["duration_min"] / 60
+        patterns.append(WastePattern(
+            pattern_type="manual_process",
+            description=(
+                f"Session {seq['session_id'][:12]}: {seq['tool_count']} tool calls "
+                f"with only {len(seq['unique_tools'])} unique tools"
+            ),
+            occurrences=1,
+            total_time_hours=round(hours, 1),
+            affected_repos=[seq["repo"]],
+            example_sessions=[seq["session_id"]],
+        ))
+
+    return sorted(patterns, key=lambda p: p.total_time_hours, reverse=True)
+
+
+def _normalize_error(message: str) -> str:
+    """Normalize an error message to a stable key by stripping variable
+    parts (paths, hex IDs, timestamps)."""
+    if not message:
+        return ""
+    msg = message.lower().strip()
+    msg = re.sub(r'/\S+', '/PATH', msg)
+    msg = re.sub(r'\b[0-9a-f]{8,}\b', 'HASH', msg)
+    msg = re.sub(r'\d{4}-\d{2}-\d{2}[tT]\d{2}:\d{2}', 'TIME', msg)
+    return msg[:150]
+
+
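+# Worked example for _normalize_error() above (illustrative input):
+#   "ETIMEDOUT connecting to /var/run/agent.sock at 2026-04-15T14:47"
+# lowercases, then collapses to the bucket key
+#   "etimedout connecting to /PATH at TIME"
+# so the same timeout lands in one error_counts bucket even when the
+# socket path or timestamp differs between sessions.
+
+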
+# ── Proposal generation ───────────────────────────────────────
+
+def generate_proposals(
+    patterns: List[WastePattern],
+    hourly_rate: float = 50.0,
+    implementation_overhead: float = 1.5,
+) -> List[ImprovementProposal]:
+    """
+    Generate improvement proposals from waste patterns.
+
+    Args:
+        patterns: Detected waste patterns
+        hourly_rate: Developer hourly rate (reserved for cost-based ROI;
+            the current calculation is purely time-based)
+        implementation_overhead: Multiplier applied to raw implementation
+            time estimates
+    """
+    proposals = []
+
+    # Group patterns by type
+    by_type: Dict[str, List[WastePattern]] = defaultdict(list)
+    for p in patterns:
+        by_type[p.pattern_type].append(p)
+
+    # 1. Repeated errors → create a fix/skill
+    for p in by_type.get("repeated_error", []):
+        weekly_hours = p.total_time_hours / 4  # monthly → weekly, assuming a ~1-month window
+        impl_hours = max(1.0, p.occurrences * 0.25) * implementation_overhead
+        roi_weeks = impl_hours / weekly_hours if weekly_hours > 0 else float('inf')
+
+        proposals.append(ImprovementProposal(
+            title=f"Fix repeated error: {p.description[:60]}",
+            problem=f"This error occurred {p.occurrences} times across {len(p.affected_repos)} repos, wasting ~{p.total_time_hours:.1f} hours.",
+            proposed_solution="Root-cause the error and create a permanent fix or mitigation skill.",
+            estimated_weekly_hours_saved=round(weekly_hours, 1),
+            estimated_monthly_hours_saved=round(p.total_time_hours, 1),
+            implementation_hours=round(impl_hours, 1),
+            roi_weeks=round(roi_weeks, 1),
+            priority=_priority_from_roi(roi_weeks),
+            affected_area="reliability",
+            supporting_evidence=[f"{p.occurrences} occurrences in sessions: {', '.join(p.example_sessions[:3])}"],
+        ))
+
+    # 2. Slow tools → optimize or replace
+    for p in by_type.get("slow_tool", []):
+        weekly_hours = p.total_time_hours / 4
+        impl_hours = 3.0 * implementation_overhead  # flat estimate for an optimization task
+        roi_weeks = impl_hours / weekly_hours if weekly_hours > 0 else float('inf')
+
+        proposals.append(ImprovementProposal(
+            title=f"Optimize slow tool: {p.description[:60]}",
+            problem=f"Tool has {p.occurrences} calls with high latency, wasting ~{p.total_time_hours:.1f} hours total.",
+            proposed_solution="Profile the tool, add caching, or replace it with a faster alternative.",
+            estimated_weekly_hours_saved=round(weekly_hours, 1),
+            estimated_monthly_hours_saved=round(p.total_time_hours, 1),
+            implementation_hours=round(impl_hours, 1),
+            roi_weeks=round(roi_weeks, 1),
+            priority=_priority_from_roi(roi_weeks),
+            affected_area="performance",
+            supporting_evidence=[f"{p.occurrences} slow calls detected"],
+        ))
+
+    # 3. Failed retries → add retry logic or validation
+    for p in by_type.get("failed_retry", []):
+        weekly_hours = p.total_time_hours / 4
+        impl_hours = 2.0 * implementation_overhead
+        roi_weeks = impl_hours / weekly_hours if weekly_hours > 0 else float('inf')
+
+        proposals.append(ImprovementProposal(
+            title=f"Reduce retries: {p.description[:60]}",
+            problem=f"Tool had {p.occurrences} retry sequences, wasting ~{p.total_time_hours:.1f} hours.",
+            proposed_solution="Add input validation, pre-flight checks, or automatic retry with backoff.",
+            estimated_weekly_hours_saved=round(weekly_hours, 1),
+            estimated_monthly_hours_saved=round(p.total_time_hours, 1),
+            implementation_hours=round(impl_hours, 1),
+            roi_weeks=round(roi_weeks, 1),
+            priority=_priority_from_roi(roi_weeks),
+            affected_area="reliability",
+            supporting_evidence=[f"{p.occurrences} retry sequences detected"],
+        ))
+
+    # 4. Manual processes → automate (one combined proposal)
+    manual_patterns = by_type.get("manual_process", [])
+    if manual_patterns:
+        total_manual_hours = sum(p.total_time_hours for p in manual_patterns)
+        weekly_hours = total_manual_hours / 4
+        impl_hours = len(manual_patterns) * 2.0 * implementation_overhead
+        roi_weeks = impl_hours / weekly_hours if weekly_hours > 0 else float('inf')
+
+        proposals.append(ImprovementProposal(
+            title=f"Automate {len(manual_patterns)} manual processes",
+            problem=f"{len(manual_patterns)} sessions had long manual tool sequences, wasting ~{total_manual_hours:.1f} hours.",
+            proposed_solution="Create composite skills or scripts that combine the repeated tool sequences into single operations.",
+            estimated_weekly_hours_saved=round(weekly_hours, 1),
+            estimated_monthly_hours_saved=round(total_manual_hours, 1),
+            implementation_hours=round(impl_hours, 1),
+            roi_weeks=round(roi_weeks, 1),
+            priority=_priority_from_roi(roi_weeks),
+            affected_area="automation",
+            supporting_evidence=[f"{len(manual_patterns)} manual sessions detected"],
+        ))
+
+    return sorted(proposals, key=lambda p: p.estimated_monthly_hours_saved, reverse=True)
+
+
+def _priority_from_roi(roi_weeks: float) -> str:
+    """Map ROI break-even time to a priority bucket."""
+    if roi_weeks <= 1:
+        return "critical"
+    elif roi_weeks <= 4:
+        return "high"
+    elif roi_weeks <= 12:
+        return "medium"
+    return "low"
+
+
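+# Worked example of the ROI arithmetic above (numbers are illustrative):
+# an error seen 12 times costs 12 * 5 min = 1.0h/month, i.e. 0.25h/week;
+# implementation is max(1.0, 12 * 0.25) * 1.5 = 4.5h, so break-even is
+# 4.5 / 0.25 = 18 weeks and _priority_from_roi() maps it to "low".
+
+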
+# ── Output formatting ─────────────────────────────────────────
+
+def format_proposals_markdown(
+    proposals: List[ImprovementProposal],
+    patterns: List[WastePattern],
+    generated_at: str,
+) -> str:
+    """Format proposals as a markdown document."""
+    lines = [
+        "# Improvement Proposals",
+        "",
+        f"Generated: {generated_at}",
+        f"Based on analysis of {sum(p.occurrences for p in patterns)} waste events across {len(set(r for p in patterns for r in p.affected_repos))} repos.",
+        "",
+        "---",
+        "",
+        "## Summary",
+        "",
+        "| Metric | Value |",
+        "|--------|-------|",
+        f"| Total proposals | {len(proposals)} |",
+        f"| Critical priority | {sum(1 for p in proposals if p.priority == 'critical')} |",
+        f"| Total monthly hours wasted | {sum(p.estimated_monthly_hours_saved for p in proposals):.1f}h |",
+        f"| Total weekly hours recoverable | {sum(p.estimated_weekly_hours_saved for p in proposals):.1f}h |",
+        f"| Implementation investment | {sum(p.implementation_hours for p in proposals):.1f}h |",
+        "",
+        "---",
+        "",
+    ]
+
+    for i, prop in enumerate(proposals, 1):
+        lines.extend([
+            f"## {i}. {prop.title}",
+            "",
+            f"**Priority:** {prop.priority.upper()}  ",
+            f"**Area:** {prop.affected_area}  ",
+            f"**ROI break-even:** {prop.roi_weeks:.1f} weeks",
+            "",
+            "### Problem",
+            "",
+            prop.problem,
+            "",
+            "### Proposed Solution",
+            "",
+            prop.proposed_solution,
+            "",
+            "### ROI Estimate",
+            "",
+            f"- Weekly hours saved: **{prop.estimated_weekly_hours_saved}h**",
+            f"- Monthly hours saved: **{prop.estimated_monthly_hours_saved}h**",
+            f"- Implementation time: **{prop.implementation_hours}h**",
+            f"- Break-even: **{prop.roi_weeks:.1f} weeks**",
+            "",
+        ])
+
+        if prop.supporting_evidence:
+            lines.extend([
+                "### Evidence",
+                "",
+            ])
+            for ev in prop.supporting_evidence:
+                lines.append(f"- {ev}")
+            lines.append("")
+
+        lines.extend(["---", ""])
+
+    # Waste pattern appendix (top 20 by hours; patterns arrive pre-sorted)
+    lines.extend([
+        "## Appendix: Detected Waste Patterns",
+        "",
+        "| Type | Description | Occurrences | Hours |",
+        "|------|-------------|-------------|-------|",
+    ])
+    for p in patterns[:20]:
+        lines.append(
+            f"| {p.pattern_type} | {p.description[:60]} | {p.occurrences} | {p.total_time_hours}h |"
+        )
+    lines.append("")
+
+    return "\n".join(lines)
+
+
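+# The rendered markdown looks roughly like this (values illustrative):
+#
+#   ## 1. Fix repeated error: Error: etimedout connecting to /PATH...
+#
+#   **Priority:** HIGH
+#   **Area:** reliability
+#   **ROI break-even:** 2.3 weeks
+
+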
+def format_proposals_json(proposals: List[ImprovementProposal]) -> str:
+    """Format proposals as a JSON array."""
+    return json.dumps(
+        [asdict(p) for p in proposals],
+        indent=2,
+        default=str,
+    )
+
+
+# ── Main ──────────────────────────────────────────────────────
+
+def main():
+    parser = argparse.ArgumentParser(description="Generate improvement proposals from session analytics")
+    parser.add_argument("--input", required=True, help="Path to session analytics JSON file")
+    parser.add_argument("--output", help="Output file path (default: stdout)")
+    parser.add_argument("--format", choices=["markdown", "json"], default="markdown", help="Output format")
+    parser.add_argument("--hourly-rate", type=float, default=50.0, help="Developer hourly rate for ROI calc")
+    parser.add_argument("--threshold", type=float, default=2.0, help="Minimum estimated monthly hours saved for a proposal to be included")
+
+    args = parser.parse_args()
+
+    with open(args.input) as f:
+        data = json.load(f)
+
+    # Accept either a bare list of sessions or {"sessions": [...]}
+    sessions = data if isinstance(data, list) else data.get("sessions", [])
+
+    if not sessions:
+        print("No sessions found in input file.", file=sys.stderr)
+        sys.exit(1)
+
+    # Analyze waste, then turn patterns into proposals
+    patterns = analyze_sessions(sessions)
+    proposals = generate_proposals(patterns, hourly_rate=args.hourly_rate)
+
+    # Drop proposals below the monthly-hours threshold
+    proposals = [p for p in proposals if p.estimated_monthly_hours_saved >= args.threshold]
+
+    generated_at = datetime.now(timezone.utc).isoformat()
+
+    if args.format == "markdown":
+        output = format_proposals_markdown(proposals, patterns, generated_at)
+    else:
+        output = format_proposals_json(proposals)
+
+    if args.output:
+        with open(args.output, "w") as f:
+            f.write(output)
+        print(f"Wrote {len(proposals)} proposals to {args.output}", file=sys.stderr)
+    else:
+        print(output)
+
+
+if __name__ == "__main__":
+    main()
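+
+
+# Smoke test (illustrative; the one-session input below is an assumption,
+# not a fixture shipped with the repo):
+#   $ echo '[{"session_id": "s1", "errors": [], "tool_calls": []}]' > analytics.json
+#   $ python3 scripts/improvement_proposals.py --input analytics.json
+# prints a proposal document with zero proposals, confirming the
+# analyze → propose → format pipeline runs end to end.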