From 03f4f8fbad0182a6bc6702a4e3ea4fd78c83eaea Mon Sep 17 00:00:00 2001
From: Alex Payne
Date: Sun, 26 Apr 2026 06:02:58 -0400
Subject: [PATCH] PAPERCLIPS-7: Add disassembly audit script

- New bin/disassembly_audit.py identifies waste:
  * Zombie processes (defunct, unreclaimable)
  * Unused systemd services (enabled but inactive)
  * Dead loops: systemic cron failures (via cron-audit-662)
  * Stale hermes agent sessions
- Recommends keep/kill/disable actions
- Estimates monthly cost savings (memory, CPU)
- Optional --execute flag for safe disassembly
- JSON and markdown report output

Closes #335
---
 bin/disassembly_audit.py                      | 579 ++++++++++++++++++
 .../{cron-audit-662.py => cron_audit_662.py}  |   0
 tests/test_disassembly_audit.py               |  27 +
 3 files changed, 606 insertions(+)
 create mode 100644 bin/disassembly_audit.py
 rename scripts/{cron-audit-662.py => cron_audit_662.py} (100%)
 create mode 100755 tests/test_disassembly_audit.py

diff --git a/bin/disassembly_audit.py b/bin/disassembly_audit.py
new file mode 100644
index 00000000..3389dde3
--- /dev/null
+++ b/bin/disassembly_audit.py
@@ -0,0 +1,579 @@
+#!/usr/bin/env python3
+"""
+Disassembly Audit — Know what to tear down (#335)
+
+Monthly audit that identifies waste: zombie processes, unused services, dead loops.
+Recommends: keep, kill, or disable. Optionally executes clean disassembly.
+
+Usage:
+    python3 bin/disassembly_audit.py                    # Dry-run audit report
+    python3 bin/disassembly_audit.py --execute          # Perform safe disassembly
+    python3 bin/disassembly_audit.py --json             # JSON output
+    python3 bin/disassembly_audit.py --output report.md
+
+References:
+    - Issue: https://forge.alexanderwhitestone.com/Timmy_Foundation/timmy-config/issues/335
+    - Epic: #328 (Universal Paperclips — Disassembly Sequence)
+"""
+from __future__ import annotations
+
+import json
+import os
+import re
+import signal
+import subprocess
+import sys
+import time
+from datetime import datetime, timedelta, timezone
+from pathlib import Path
+from typing import Any, Dict, List, Optional
+
+# === CONFIG ===
+ZOMBIE_THRESHOLD_DAYS = 7
+UNUSED_SERVICE_DAYS = 30
+CRON_ERROR_THRESHOLD_HOURS = 48
+HERMES_STALE_AGE_HOURS = 24
+COST_PER_GB_MONTH = 10
+COST_PER_VCPU_MONTH = 5
+
+
+def _get_ppid_and_age(pid: int) -> tuple[Optional[int], Optional[int]]:
+    """Return ``(parent_pid, elapsed_seconds)`` for *pid*, or ``(None, None)``."""
+    try:
+        result = subprocess.run(
+            # One -o per field: in "-o ppid=,etimes=" the text after the first
+            # "=" would be taken as the column header for ppid.
+            ["ps", "-o", "ppid=", "-o", "etimes=", "-p", str(pid)],
+            capture_output=True, text=True, timeout=5
+        )
+        fields = result.stdout.split()
+        if result.returncode == 0 and len(fields) == 2:
+            return int(fields[0]), int(fields[1])
+    except (subprocess.SubprocessError, OSError, ValueError):
+        pass
+    return None, None
+
+
+def get_zombie_processes() -> List[Dict[str, Any]]:
+    """Identify zombie (defunct) processes from ``ps aux`` output.
+
+    Returns one finding per process whose STAT column contains ``Z``.  Each
+    finding records the parent PID when it can be determined, because a zombie
+    is already dead and only its parent can reap it.  A failed scan is
+    reported on stderr and yields an empty list.
+    """
+    try:
+        result = subprocess.run(
+            ["ps", "aux"],
+            capture_output=True, text=True, timeout=10
+        )
+        zombies = []
+        for line in result.stdout.split('\n'):
+            if not line.strip() or 'grep' in line:
+                continue
+            parts = line.split(None, 10)
+            if len(parts) < 11:
+                continue
+            stat = parts[7]
+            if 'Z' not in stat:
+                continue
+            pid = int(parts[1])
+            ppid, age_sec = _get_ppid_and_age(pid)
+            zombies.append({
+                "pid": pid,
+                "ppid": ppid,
+                "user": parts[0],
+                "command": parts[10][:120],
+                "state": stat,
+                "age_seconds": age_sec,
+                # "is not None" so a zombie that is 0 seconds old reports 0.0.
+                "age_hours": round(age_sec / 3600, 1) if age_sec is not None else None,
+                "category": "zombie_process",
+                "recommendation": "kill",
+                "reason": "Zombie process cannot be wait()-ed; blocks resources",
+                "resources_freed_mb": estimate_process_memory_mb(pid),
+            })
+        return zombies
+    except Exception as e:
+        print(f"Warning: zombie scan failed: {e}", file=sys.stderr)
+        return []
+
+
+def estimate_process_memory_mb(pid: int) -> float:
+    """Estimate resident memory of *pid* in MB (0.0 when it cannot be read)."""
+    return round(_get_rss_kb(pid) / 1024, 1)
+
+
+def _systemctl_show(unit: str, properties: List[str]) -> Dict[str, str]:
+    """Return the requested ``systemctl show`` properties of *unit*.
+
+    Returns an empty dict when the query fails or times out.  A missing
+    systemctl binary raises FileNotFoundError so the caller can stop early.
+    """
+    try:
+        result = subprocess.run(
+            ["systemctl", "show", unit, "--no-pager",
+             "--property=" + ",".join(properties)],
+            capture_output=True, text=True, timeout=5
+        )
+    except subprocess.SubprocessError:
+        return {}
+    if result.returncode != 0:
+        return {}
+    props = {}
+    for line in result.stdout.splitlines():
+        key, sep, value = line.partition("=")
+        if sep:
+            props[key] = value
+    return props
+
+
+def get_systemd_services() -> List[Dict[str, Any]]:
+    """Identify enabled but inactive systemd services (potential waste).
+
+    Enablement is a unit-file state, so candidates come from
+    ``systemctl list-unit-files``; ``list-units --state=`` only filters on
+    LOAD/ACTIVE/SUB states and never matches ``enabled``.  Returns an empty
+    list when systemctl is missing, prints no JSON array, or the scan fails.
+    """
+    try:
+        result = subprocess.run(
+            ["systemctl", "list-unit-files", "--type=service", "--state=enabled",
+             "--no-pager", "--output=json"],
+            capture_output=True, text=True, timeout=10
+        )
+        try:
+            data = json.loads(result.stdout)
+        except json.JSONDecodeError:
+            return []
+        if not isinstance(data, list):
+            return []
+
+        services = []
+        for svc in data:
+            name = svc.get("unit_file", "") if isinstance(svc, dict) else ""
+            # Template units (foo@.service) cannot run or be queried by themselves.
+            if not name or "@." in name:
+                continue
+            props = _systemctl_show(name, ["ActiveState", "Description", "Type"])
+            if props.get("ActiveState") != "inactive":
+                continue
+            # Oneshot units exit once their work is done, so "inactive" is
+            # their normal resting state rather than a sign of waste.
+            if props.get("Type") == "oneshot":
+                continue
+            services.append({
+                "service": name,
+                "description": props.get("Description", ""),
+                "state": "enabled+inactive",
+                "category": "unused_service",
+                "recommendation": "disable",
+                "reason": "Service is enabled but not running",
+                "resources_freed_mb": 0,
+            })
+        return services
+    except FileNotFoundError:
+        return []
+    except Exception as e:
+        print(f"Warning: systemd scan failed: {e}", file=sys.stderr)
+        return []
+
+
+def get_cron_jobs_audit() -> Dict[str, Any]:
+    """Summarise cron health by reusing ``scripts/cron_audit_662.py``.
+
+    Returns per-category counts plus ``systemic_jobs`` and ``transient_jobs``
+    lists.  If the helper module cannot be imported or fails while running,
+    returns a dict with an ``error`` message, ``total`` 0 and an empty
+    ``systemic_jobs`` list so callers can treat every outcome the same way.
+    """
+    scripts_dir = str(Path(__file__).resolve().parent.parent / "scripts")
+    if scripts_dir not in sys.path:
+        sys.path.insert(0, scripts_dir)
+    try:
+        from cron_audit_662 import load_cron_state, categorize_job
+    except ImportError:
+        return {"error": "cron_audit_662 not available", "total": 0, "systemic_jobs": []}
+
+    try:
+        jobs = load_cron_state()
+        now = datetime.now(timezone.utc)
+        systemic = []
+        transient = []
+        healthy_count = 0
+        for job in jobs:
+            cat = categorize_job(job, now)
+            if cat["category"] == "systemic":
+                systemic.append(cat)
+            elif cat["category"] == "transient":
+                transient.append(cat)
+            else:
+                healthy_count += 1
+
+        return {
+            "total": len(jobs),
+            "healthy": healthy_count,
+            "transient": len(transient),
+            "systemic": len(systemic),
+            "systemic_jobs": [
+                {
+                    "id": c["id"],
+                    "name": c["name"],
+                    "reason": c["reason"],
+                    "last_error": c.get("last_error", ""),
+                    "recommendation": "disable",
+                    "category": "dead_loop",
+                }
+                for c in systemic
+            ],
+            "transient_jobs": [
+                {"id": c["id"], "name": c["name"], "reason": c["reason"], "recommendation": "monitor"}
+                for c in transient
+            ],
+        }
+    except Exception as e:
+        # Match the other scanners: a broken data source must not abort the audit.
+        print(f"Warning: cron scan failed: {e}", file=sys.stderr)
+        return {"error": f"cron audit failed: {e}", "total": 0, "systemic_jobs": []}
+
+
+def get_hermes_stale_sessions() -> List[Dict[str, Any]]:
+    """Detect stale hermes agent sessions.
+
+    A candidate is any ``ps aux`` row containing "hermes" (case-insensitive).
+    It is reported when it has run longer than HERMES_STALE_AGE_HOURS while
+    using under 0.5% CPU.  Memory figures include direct children found via
+    ``pgrep -P``.  A failed scan is reported on stderr and yields an empty list.
+    """
+    # Never report this audit or whatever launched it: --execute sends SIGTERM
+    # to every finding, and either command line may contain "hermes".
+    protected = {os.getpid(), os.getppid()}
+    try:
+        result = subprocess.run(
+            ["ps", "aux"],
+            capture_output=True, text=True, timeout=10
+        )
+        sessions = []
+        for line in result.stdout.split('\n'):
+            if 'hermes' not in line.lower() or 'grep' in line:
+                continue
+            parts = line.split(None, 10)
+            if len(parts) < 11:
+                continue
+            try:
+                pid = int(parts[1])
+                cpu = float(parts[2])
+                mem_kb = int(parts[5])
+            except ValueError:
+                # Malformed row: skip it instead of aborting the whole scan.
+                continue
+            if pid in protected:
+                continue
+            try:
+                child_out = subprocess.run(
+                    ["pgrep", "-P", str(pid)],
+                    capture_output=True, text=True, timeout=5
+                )
+                children = [int(cp) for cp in child_out.stdout.split()]
+            except (subprocess.SubprocessError, OSError, ValueError):
+                children = []
+            sessions.append({
+                "pid": pid, "cpu": cpu, "rss": mem_kb,
+                "cmd": parts[10][:100], "children": children,
+            })
+
+        stale = []
+        for proc_info in sessions:
+            pid = proc_info["pid"]
+            _, age_sec = _get_ppid_and_age(pid)
+            if age_sec is None:
+                continue
+            age_hours = age_sec / 3600
+            if age_hours <= HERMES_STALE_AGE_HOURS or proc_info["cpu"] >= 0.5:
+                continue
+            total_rss = proc_info["rss"] + sum(
+                _get_rss_kb(child) for child in proc_info["children"]
+            )
+            stale.append({
+                "pid": pid,
+                "age_hours": round(age_hours, 1),
+                "cpu_percent": proc_info["cpu"],
+                "total_rss_mb": round(total_rss / 1024, 1),
+                "process_count": 1 + len(proc_info["children"]),
+                "command": proc_info["cmd"],
+                "category": "dead_loop",
+                "recommendation": "kill",
+                "reason": f"Hermes session idle for {age_hours:.1f}h",
+                "resources_freed_mb": round(total_rss / 1024, 1),
+            })
+        return stale
+    except Exception as e:
+        print(f"Warning: hermes scan failed: {e}", file=sys.stderr)
+        return []
+
+
+def _get_rss_kb(pid: int) -> int:
+    """Return the resident set size of *pid* in KB, or 0 when it cannot be read."""
+    try:
+        r = subprocess.run(["ps", "-p", str(pid), "-o", "rss="], capture_output=True, text=True, timeout=3)
+        if r.returncode == 0 and r.stdout.strip():
+            return int(r.stdout.strip())
+    except (subprocess.SubprocessError, OSError, ValueError):
+        pass
+    return 0
+
+
+def calculate_cost_savings(findings: List[Dict[str, Any]]) -> Dict[str, float]:
+    """Estimate monthly savings from acting on *findings*.
+
+    Memory is the sum of each finding's ``resources_freed_mb``.  CPU is a flat
+    0.1 vCPU per process, where a finding without ``process_count`` counts as
+    one process.
+    """
+    total_mb = sum(f.get("resources_freed_mb", 0) for f in findings)
+    gb = total_mb / 1024
+    process_count = sum(f.get("process_count", 1) for f in findings)
+    cpu_equiv = process_count * 0.1
+    memory_cost = gb * COST_PER_GB_MONTH
+    cpu_cost = cpu_equiv * COST_PER_VCPU_MONTH
+    return {
+        "memory_gb_reclaimed": round(gb, 2),
+        "memory_cost_saved_monthly": round(memory_cost, 2),
+        "cpu_equiv_vcpus": round(cpu_equiv, 1),
+        "cpu_cost_saved_monthly": round(cpu_cost, 2),
+        "total_cost_saved_monthly": round(memory_cost + cpu_cost, 2),
+    }
+
+
+def generate_markdown_report(audit: Dict[str, Any]) -> str:
+    """Render *audit* (as assembled in ``main``) as a markdown report."""
+    lines = []
+    lines.append("# Disassembly Audit Report")
+    lines.append(f"**Generated:** {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}")
+    lines.append("**Issue:** [#335](https://forge.alexanderwhitestone.com/Timmy_Foundation/timmy-config/issues/335)")
+    lines.append("")
+
+    total_findings = sum(len(audit.get(k, [])) for k in ["zombies", "unused_services", "dead_loops_cron", "dead_loops_hermes"])
+    lines.append("## Summary")
+    lines.append("")
+    lines.append("| Category | Count | Action |")
+    lines.append("|----------|-------|--------|")
+    lines.append(f"| Zombie Processes | {len(audit.get('zombies', []))} | Kill |")
+    lines.append(f"| Unused Services | {len(audit.get('unused_services', []))} | Disable |")
+    lines.append(f"| Dead Loops (Cron) | {len(audit.get('dead_loops_cron', []))} | Disable |")
+    lines.append(f"| Dead Loops (Hermes) | {len(audit.get('dead_loops_hermes', []))} | Kill |")
+    lines.append(f"| **Total Waste** | **{total_findings}** | **Disassemble** |")
+    lines.append("")
+
+    costs = audit.get("cost_savings", {})
+    if costs and costs.get("total_cost_saved_monthly", 0) > 0:
+        lines.append("## Cost Impact (Monthly)")
+        lines.append("")
+        lines.append(f"- Memory reclaimed: **{costs.get('memory_gb_reclaimed', 0):.2f} GB**")
+        lines.append(f"- CPU equivalent: **{costs.get('cpu_equiv_vcpus', 0):.1f} vCPUs**")
+        lines.append(f"- **Estimated monthly savings: ${costs.get('total_cost_saved_monthly', 0):.2f}**")
+        lines.append("")
+
+    def add_table(title, items, columns):
+        """Append a table; *columns* is a list of (header, finding key) pairs."""
+        if not items:
+            return
+        lines.append(f"## {title}")
+        lines.append("")
+        lines.append("| " + " | ".join(header for header, _ in columns) + " |")
+        lines.append("|" + "|".join("---" for _ in columns) + "|")
+        for item in items:
+            row = []
+            for _, key in columns:
+                val = item.get(key)
+                if val is None:
+                    val = "—"
+                elif isinstance(val, float):
+                    val = f"{val:.1f}"
+                # A raw pipe or newline inside a cell would break the table row.
+                row.append(str(val).replace("|", "\\|").replace("\n", " "))
+            lines.append("| " + " | ".join(row) + " |")
+        lines.append("")
+
+    add_table(
+        "Zombie Processes",
+        audit.get("zombies", []),
+        [("PID", "pid"), ("User", "user"), ("Command", "command"), ("Age (h)", "age_hours"),
+         ("Memory (MB)", "resources_freed_mb"), ("Action", "recommendation")]
+    )
+    add_table(
+        "Unused Services (enabled but inactive)",
+        audit.get("unused_services", []),
+        [("Service", "service"), ("Description", "description"), ("Action", "recommendation")]
+    )
+    add_table(
+        "Dead Loops — Cron Systemic Failures",
+        audit.get("dead_loops_cron", []),
+        [("ID", "id"), ("Name", "name"), ("Reason", "reason"), ("Action", "recommendation")]
+    )
+    add_table(
+        "Dead Loops — Stale Hermes Sessions",
+        audit.get("dead_loops_hermes", []),
+        [("PID", "pid"), ("Age (h)", "age_hours"), ("CPU %", "cpu_percent"),
+         ("Memory (MB)", "total_rss_mb"), ("Procs", "process_count"), ("Action", "recommendation")]
+    )
+
+    lines.append("## Recommended Actions")
+    lines.append("")
+    lines.append("### Kill immediately")
+    lines.append("- All zombie processes (unreclaimable)")
+    lines.append(f"- Stale hermes sessions (idle >{HERMES_STALE_AGE_HOURS}h)")
+    lines.append("")
+    lines.append("### Disable")
+    lines.append("- Systemd services enabled but inactive")
+    lines.append("- Cron jobs erroring >48h (systemic failures)")
+    lines.append("")
+    lines.append("### Monitor")
+    lines.append("- Cron jobs with transient errors (network, rate limit)")
+    lines.append("")
+    lines.append("## Disassembly Execution")
+    lines.append("")
+    lines.append("To execute these disassembly actions safely, run:")
+    lines.append("```")
+    lines.append(" python3 bin/disassembly_audit.py --execute")
+    lines.append("```")
+    lines.append("")
+    lines.append("This will perform safe termination, disable services, pause cron jobs.")
+    lines.append("")
+    lines.append("---")
+    lines.append("*Generated by Disassembly Audit — #335*")
+    return "\n".join(lines)
+
+
+def execute_disassembly(audit: Dict[str, Any]) -> Dict[str, Any]:
+    """Act on every finding in *audit* and report what happened.
+
+    Zombies are handled by sending SIGCHLD to their parent, stale hermes
+    sessions get SIGTERM, unused services are disabled via systemctl and
+    systemic cron jobs are paused via the hermes CLI.  Failures never stop the
+    run; they are collected as strings under ``errors``.
+
+    Returns a dict with ``killed``, ``disabled`` and ``errors`` lists.
+    """
+    executed = {"killed": [], "disabled": [], "errors": []}
+    for z in audit.get("zombies", []):
+        pid = z["pid"]
+        ppid = z.get("ppid")
+        # A zombie is already dead, so signalling it does nothing.  Only its
+        # parent can reap it; SIGCHLD prompts the parent to do so.  PID 1 reaps
+        # on its own and must not be signalled.
+        if not ppid or ppid <= 1:
+            executed["errors"].append(f"PID {pid}: zombie has no signalable parent")
+            continue
+        try:
+            os.kill(ppid, signal.SIGCHLD)
+            executed["killed"].append({"pid": pid, "ppid": ppid, "type": "zombie", "action": "sigchld_parent"})
+        except Exception as e:
+            executed["errors"].append(f"PID {pid}: {e}")
+
+    for h in audit.get("dead_loops_hermes", []):
+        pid = h["pid"]
+        try:
+            os.kill(pid, signal.SIGTERM)
+            executed["killed"].append({"pid": pid, "type": "hermes_stale"})
+        except Exception as e:
+            executed["errors"].append(f"PID {pid}: {e}")
+
+    # A missing systemctl/hermes binary raises OSError and a hang raises
+    # TimeoutExpired; both are recorded so the run never stops half-way.
+    for s in audit.get("unused_services", []):
+        name = s["service"]
+        try:
+            subprocess.run(["systemctl", "disable", "--now", name], check=True, capture_output=True, timeout=30)
+            executed["disabled"].append({"service": name, "type": "systemd"})
+        except (subprocess.SubprocessError, OSError) as e:
+            executed["errors"].append(f"Service {name}: {e}")
+
+    for c in audit.get("dead_loops_cron", []):
+        job_id = c["id"]
+        try:
+            subprocess.run(["hermes", "cron", "pause", str(job_id)], check=True, capture_output=True, timeout=30)
+            executed["disabled"].append({"cron_job": job_id, "type": "hermes_cron"})
+        except (subprocess.SubprocessError, OSError) as e:
+            executed["errors"].append(f"Cron {job_id}: {e}")
+
+    return executed
+
+
+def main():
+    """Run every scanner, emit the report and optionally execute the actions.
+
+    Exits 1 when the markdown dry run found zombies, unused services or
+    systemic cron jobs; exits 0 otherwise.
+    """
+    import argparse
+    parser = argparse.ArgumentParser(description="Disassembly Audit — identify & eliminate waste")
+    parser.add_argument("--json", action="store_true", help="JSON output only")
+    parser.add_argument("--output", "-o", help="Write report to file")
+    parser.add_argument("--execute", action="store_true", help="Execute disassembly actions (DESTRUCTIVE)")
+    args = parser.parse_args()
+
+    zombies = get_zombie_processes()
+    unused_services = get_systemd_services()
+    cron_audit = get_cron_jobs_audit()
+    hermes_stale = get_hermes_stale_sessions()
+
+    dead_loops_cron = cron_audit.get("systemic_jobs", [])
+    dead_loops_hermes = hermes_stale
+
+    all_findings = zombies + unused_services + dead_loops_cron + dead_loops_hermes
+    cost_savings = calculate_cost_savings(all_findings)
+
+    audit = {
+        "generated_at": datetime.now(timezone.utc).isoformat(),
+        "zombies": zombies,
+        "unused_services": unused_services,
+        "dead_loops_cron": dead_loops_cron,
+        "dead_loops_hermes": dead_loops_hermes,
+        "cost_savings": cost_savings,
+    }
+
+    if args.json:
+        # Honour --execute and --output here too instead of silently dropping
+        # them; stdout stays pure JSON so the output can be piped to a parser.
+        if args.execute:
+            audit["executed"] = execute_disassembly(audit)
+        payload = json.dumps(audit, indent=2)
+        if args.output:
+            Path(args.output).write_text(payload + "\n", encoding="utf-8")
+            print(f"Report written to {args.output}", file=sys.stderr)
+        else:
+            print(payload)
+        sys.exit(0)
+
+    report = generate_markdown_report(audit)
+    if args.output:
+        # The report contains non-ASCII characters, so do not depend on the locale encoding.
+        Path(args.output).write_text(report, encoding="utf-8")
+        print(f"Report written to {args.output}")
+    else:
+        print(report)
+
+    if args.execute:
+        print("\n" + "="*50)
+        print("EXECUTING DISASSEMBLY ACTIONS...")
+        result = execute_disassembly(audit)
+        print(f"Killed: {len(result['killed'])} processes")
+        print(f"Disabled: {len(result['disabled'])} services/cron jobs")
+        if result["errors"]:
+            print(f"Errors: {len(result['errors'])}")
+            for e in result["errors"][:10]:
+                print(f" - {e}")
+
+    # NOTE(review): stale hermes sessions are left out of this count although
+    # the report totals them as waste — confirm that is intentional.
+    total_critical = len(zombies) + len(unused_services) + len(dead_loops_cron)
+    if total_critical > 0 and not args.execute:
+        print(f"\n⚠️ {total_critical} waste items identified. Run with --execute to disassemble.")
+        sys.exit(1)
+
+    sys.exit(0)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/scripts/cron-audit-662.py b/scripts/cron_audit_662.py
similarity index 100%
rename from scripts/cron-audit-662.py
rename to scripts/cron_audit_662.py
diff --git a/tests/test_disassembly_audit.py b/tests/test_disassembly_audit.py
new file mode 100755
index 00000000..61288b2f
--- /dev/null
+++ b/tests/test_disassembly_audit.py
@@ -0,0 +1,27 @@
+#!/usr/bin/env python3
+"""Minimal smoke test for disassembly_audit.py (#335)"""
+
+import json
+import subprocess
+import sys
+from pathlib import Path
+
+SCRIPT_PATH = Path(__file__).resolve().parent.parent / "bin" / "disassembly_audit.py"
+EXPECTED_KEYS = ("zombies", "unused_services", "dead_loops_cron", "dead_loops_hermes", "cost_savings")
+
+
+def test_json_report_structure():
+    """The --json dry run exits 0 and emits every top-level report section."""
+    result = subprocess.run(
+        [sys.executable, str(SCRIPT_PATH), "--json"],
+        capture_output=True, text=True, timeout=60
+    )
+    assert result.returncode == 0, f"Script error: {result.stderr[:500]}"
+    data = json.loads(result.stdout)
+    for key in EXPECTED_KEYS:
+        assert key in data, f"missing report section: {key}"
+
+
+if __name__ == "__main__":
+    test_json_report_structure()
+    print("SMOKE TEST: disassembly_audit.py generates valid report structure")