[gemma-4-multimodal] Visual Log Analysis for System Health #1489

Open
Rockachopa wants to merge 1 commits from gemma4-worker-20260409-105442-1484 into main

View File

@@ -0,0 +1,189 @@
#!/usr/bin/env python3
"""
Visual Log Analyzer — System Health Screenshot Analysis
========================================================
Analyzes screenshots of system monitoring dashboards (htop, Grafana,
CloudWatch, etc.) to detect anomalies in resource usage patterns.
Usage:
python scripts/visual_log_analyzer.py analyze /tmp/htop_screenshot.png
python scripts/visual_log_analyzer.py batch /tmp/monitor_screenshots/
python scripts/visual_log_analyzer.py compare before.png after.png
"""
import json
import os
import sys
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Optional
@dataclass
class ResourceAnomaly:
"""An anomaly detected in a system monitoring screenshot."""
resource: str # cpu, memory, disk, network, process
severity: str # critical, warning, info
description: str
value: Optional[str] = None
threshold: Optional[str] = None
recommendation: str = ""
@dataclass
class HealthAnalysis:
"""Result of analyzing a system health screenshot."""
timestamp: str
screenshot_path: str
overall_status: str # healthy, warning, critical
anomalies: list = field(default_factory=list)
metrics: dict = field(default_factory=dict)
confidence: float = 0.0
raw_analysis: str = ""
def summary(self) -> str:
status_emoji = {"healthy": "", "warning": "⚠️", "critical": "🔴"}.get(self.overall_status, "")
lines = [
f"{status_emoji} System Health: {self.overall_status.upper()}",
f"Analyzed: {self.timestamp}",
f"Screenshot: {self.screenshot_path}",
f"Confidence: {self.confidence:.0%}",
""
]
if self.anomalies:
lines.append("Anomalies detected:")
for a in self.anomalies:
emoji = {"critical": "🔴", "warning": "🟡", "info": ""}.get(a.severity, "")
lines.append(f" {emoji} [{a.resource}] {a.description}")
if a.recommendation:
lines.append(f"{a.recommendation}")
else:
lines.append("No anomalies detected.")
return "\n".join(lines)
class VisualLogAnalyzer:
"""Analyzes system monitoring screenshots for anomalies."""
def analyze_screenshot(self, screenshot_path: str, monitor_type: str = "auto") -> dict:
"""
Build analysis prompt for a system monitoring screenshot.
Args:
screenshot_path: Path to screenshot
monitor_type: "htop", "grafana", "cloudwatch", "docker", "auto"
Returns:
Dict with analysis prompt for vision model
"""
prompt = f"""Analyze this system monitoring screenshot ({monitor_type}) and detect anomalies.
Check for:
- CPU usage above 80% sustained
- Memory usage above 85%
- Disk usage above 90%
- Unusual process names or high-PID processes consuming resources
- Network traffic spikes
- Load average anomalies
- Zombie processes
- Swap usage
For each anomaly found, report:
- Resource type (cpu, memory, disk, network, process)
- Severity (critical, warning, info)
- Current value and threshold
- Recommended action
Also extract overall metrics:
- CPU usage %
- Memory usage %
- Disk usage %
- Top 3 processes by resource use
- Load average
Return as JSON:
```json
{{
"overall_status": "healthy|warning|critical",
"metrics": {{"cpu_pct": 45, "memory_pct": 62}},
"anomalies": [
{{"resource": "cpu", "severity": "warning", "description": "...", "value": "85%", "threshold": "80%", "recommendation": "..."}}
],
"confidence": 0.85
}}
```
"""
return {
"prompt": prompt,
"screenshot_path": screenshot_path,
"monitor_type": monitor_type,
"instruction": "Use vision_analyze tool with this prompt"
}
def compare_screenshots(self, before_path: str, after_path: str) -> dict:
"""Compare two monitoring screenshots to detect changes."""
prompt = f"""Compare these two system monitoring screenshots taken at different times.
Before: {before_path}
After: {after_path}
Identify:
- Resources that increased significantly
- New processes that appeared
- Processes that disappeared
- Overall health trend (improving, stable, degrading)
Return analysis as JSON with trend assessment.
"""
return {
"prompt": prompt,
"before": before_path,
"after": after_path,
"instruction": "Use vision_analyze for each screenshot, then compare results"
}
def main():
if len(sys.argv) < 2:
print("Usage: visual_log_analyzer.py <analyze|batch|compare> [args...]")
return 1
analyzer = VisualLogAnalyzer()
cmd = sys.argv[1]
if cmd == "analyze":
if len(sys.argv) < 3:
print("Usage: visual_log_analyzer.py analyze <screenshot> [monitor_type]")
return 1
path = sys.argv[2]
mtype = sys.argv[3] if len(sys.argv) > 3 else "auto"
result = analyzer.analyze_screenshot(path, mtype)
print(json.dumps(result, indent=2))
elif cmd == "compare":
if len(sys.argv) < 4:
print("Usage: visual_log_analyzer.py compare <before.png> <after.png>")
return 1
result = analyzer.compare_screenshots(sys.argv[2], sys.argv[3])
print(json.dumps(result, indent=2))
elif cmd == "batch":
if len(sys.argv) < 3:
print("Usage: visual_log_analyzer.py batch <screenshot_dir>")
return 1
dirpath = Path(sys.argv[2])
if not dirpath.is_dir():
print(f"Not a directory: {dirpath}")
return 1
for img in sorted(dirpath.glob("*.png")):
print(f"\n--- {img.name} ---")
result = analyzer.analyze_screenshot(str(img))
print(json.dumps(result, indent=2))
return 0
if __name__ == "__main__":
sys.exit(main())