#!/usr/bin/env python3
"""
Visual Log Analyzer — System Health Screenshot Analysis
========================================================

Builds vision-model prompts for screenshots of system monitoring dashboards
(htop, Grafana, CloudWatch, etc.) so that anomalies in resource usage
patterns can be detected. This module does not run a vision model itself; it
emits the request (prompt + screenshot path) as JSON.

Usage:
    python scripts/visual_log_analyzer.py analyze /tmp/htop_screenshot.png
    python scripts/visual_log_analyzer.py batch /tmp/monitor_screenshots/
    python scripts/visual_log_analyzer.py compare before.png after.png
"""

import json
import os
import sys
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Optional


@dataclass
class ResourceAnomaly:
    """An anomaly detected in a system monitoring screenshot."""

    resource: str  # cpu, memory, disk, network, process
    severity: str  # critical, warning, info
    description: str
    value: Optional[str] = None
    threshold: Optional[str] = None
    recommendation: str = ""


@dataclass
class HealthAnalysis:
    """Result of analyzing a system health screenshot."""

    timestamp: str
    screenshot_path: str
    overall_status: str  # healthy, warning, critical
    anomalies: list = field(default_factory=list)
    metrics: dict = field(default_factory=dict)
    confidence: float = 0.0
    raw_analysis: str = ""

    def summary(self) -> str:
        """Return a multi-line, human-readable report of this analysis.

        The report starts with the overall status, timestamp, screenshot path
        and confidence (rendered as a whole percentage), followed by one line
        per anomaly (plus its recommendation when one is set), or a
        "No anomalies detected." line when the anomaly list is empty.

        Items in ``anomalies`` are read through the ``severity``, ``resource``,
        ``description`` and ``recommendation`` attributes.
        """
        # The warning/info emoji carry a variation selector (U+FE0F); they are
        # written as escapes so they cannot be corrupted by a re-encoding of
        # this file.
        status_emoji = {
            "healthy": "✅",
            "warning": "\u26a0\ufe0f",
            "critical": "🔴",
        }.get(self.overall_status, "❓")
        severity_emoji = {
            "critical": "🔴",
            "warning": "🟡",
            "info": "\u2139\ufe0f",
        }

        lines = [
            f"{status_emoji} System Health: {self.overall_status.upper()}",
            f"Analyzed: {self.timestamp}",
            f"Screenshot: {self.screenshot_path}",
            f"Confidence: {self.confidence:.0%}",
            "",
        ]

        if self.anomalies:
            lines.append("Anomalies detected:")
            for a in self.anomalies:
                # Unknown severities get no emoji rather than raising.
                emoji = severity_emoji.get(a.severity, "")
                lines.append(f"  {emoji} [{a.resource}] {a.description}")
                if a.recommendation:
                    lines.append(f"     → {a.recommendation}")
        else:
            lines.append("No anomalies detected.")

        return "\n".join(lines)


class VisualLogAnalyzer:
    """Builds vision-model requests for system monitoring screenshots.

    The methods do not open or inspect the image files; they only assemble
    the prompt and metadata that a vision tool needs.
    """

    def analyze_screenshot(self, screenshot_path: str, monitor_type: str = "auto") -> dict:
        """
        Build analysis prompt for a system monitoring screenshot.

        Args:
            screenshot_path: Path to screenshot
            monitor_type: "htop", "grafana", "cloudwatch", "docker", "auto"

        Returns:
            Dict with keys "prompt", "screenshot_path", "monitor_type" and
            "instruction", intended to be handed to a vision model.
        """
        # Doubled braces are literal braces in the f-string; only
        # {monitor_type} is interpolated.
        prompt = f"""Analyze this system monitoring screenshot ({monitor_type}) and detect anomalies.

Check for:
- CPU usage above 80% sustained
- Memory usage above 85%
- Disk usage above 90%
- Unusual process names or high-PID processes consuming resources
- Network traffic spikes
- Load average anomalies
- Zombie processes
- Swap usage

For each anomaly found, report:
- Resource type (cpu, memory, disk, network, process)
- Severity (critical, warning, info)
- Current value and threshold
- Recommended action

Also extract overall metrics:
- CPU usage %
- Memory usage %
- Disk usage %
- Top 3 processes by resource use
- Load average

Return as JSON:
```json
{{
  "overall_status": "healthy|warning|critical",
  "metrics": {{"cpu_pct": 45, "memory_pct": 62}},
  "anomalies": [
    {{"resource": "cpu", "severity": "warning", "description": "...", "value": "85%", "threshold": "80%", "recommendation": "..."}}
  ],
  "confidence": 0.85
}}
```
"""
        return {
            "prompt": prompt,
            "screenshot_path": screenshot_path,
            "monitor_type": monitor_type,
            "instruction": "Use vision_analyze tool with this prompt",
        }

    def compare_screenshots(self, before_path: str, after_path: str) -> dict:
        """Build a comparison prompt for two monitoring screenshots.

        Args:
            before_path: Path to the earlier screenshot
            after_path: Path to the later screenshot

        Returns:
            Dict with keys "prompt", "before", "after" and "instruction".
        """
        prompt = f"""Compare these two system monitoring screenshots taken at different times.

Before: {before_path}
After: {after_path}

Identify:
- Resources that increased significantly
- New processes that appeared
- Processes that disappeared
- Overall health trend (improving, stable, degrading)

Return analysis as JSON with trend assessment.
"""
        return {
            "prompt": prompt,
            "before": before_path,
            "after": after_path,
            "instruction": "Use vision_analyze for each screenshot, then compare results",
        }


def main(argv: Optional[list] = None) -> int:
    """Command-line entry point.

    Args:
        argv: Command-line arguments *without* the program name. When None
            (the default), ``sys.argv[1:]`` is used.

    Returns:
        Process exit code: 0 on success, 1 on a usage error, an unknown
        command, or a batch path that is not a directory.
    """
    usage = "Usage: visual_log_analyzer.py <command> [args...]"
    args = sys.argv[1:] if argv is None else list(argv)

    if not args:
        print(usage)
        return 1

    analyzer = VisualLogAnalyzer()
    cmd, params = args[0], args[1:]

    if cmd == "analyze":
        if not params:
            print("Usage: visual_log_analyzer.py analyze <screenshot> [monitor_type]")
            return 1
        mtype = params[1] if len(params) > 1 else "auto"
        result = analyzer.analyze_screenshot(params[0], mtype)
        print(json.dumps(result, indent=2))

    elif cmd == "compare":
        if len(params) < 2:
            print("Usage: visual_log_analyzer.py compare <before> <after>")
            return 1
        result = analyzer.compare_screenshots(params[0], params[1])
        print(json.dumps(result, indent=2))

    elif cmd == "batch":
        if not params:
            print("Usage: visual_log_analyzer.py batch <directory>")
            return 1
        dirpath = Path(params[0])
        if not dirpath.is_dir():
            print(f"Not a directory: {dirpath}")
            return 1
        # Sorted so output order is deterministic across runs.
        for img in sorted(dirpath.glob("*.png")):
            print(f"\n--- {img.name} ---")
            result = analyzer.analyze_screenshot(str(img))
            print(json.dumps(result, indent=2))

    else:
        # Previously an unrecognised command fell through silently and
        # reported success.
        print(f"Unknown command: {cmd}")
        print(usage)
        return 1

    return 0


if __name__ == "__main__":
    sys.exit(main())