[gemma-4-multimodal] Visual Log Analysis for System Health #1489
189
scripts/visual_log_analyzer.py
Normal file
189
scripts/visual_log_analyzer.py
Normal file
@@ -0,0 +1,189 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Visual Log Analyzer — System Health Screenshot Analysis
|
||||
========================================================
|
||||
|
||||
Analyzes screenshots of system monitoring dashboards (htop, Grafana,
|
||||
CloudWatch, etc.) to detect anomalies in resource usage patterns.
|
||||
|
||||
Usage:
|
||||
python scripts/visual_log_analyzer.py analyze /tmp/htop_screenshot.png
|
||||
python scripts/visual_log_analyzer.py batch /tmp/monitor_screenshots/
|
||||
python scripts/visual_log_analyzer.py compare before.png after.png
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
|
||||
@dataclass
|
||||
class ResourceAnomaly:
|
||||
"""An anomaly detected in a system monitoring screenshot."""
|
||||
resource: str # cpu, memory, disk, network, process
|
||||
severity: str # critical, warning, info
|
||||
description: str
|
||||
value: Optional[str] = None
|
||||
threshold: Optional[str] = None
|
||||
recommendation: str = ""
|
||||
|
||||
|
||||
@dataclass
|
||||
class HealthAnalysis:
|
||||
"""Result of analyzing a system health screenshot."""
|
||||
timestamp: str
|
||||
screenshot_path: str
|
||||
overall_status: str # healthy, warning, critical
|
||||
anomalies: list = field(default_factory=list)
|
||||
metrics: dict = field(default_factory=dict)
|
||||
confidence: float = 0.0
|
||||
raw_analysis: str = ""
|
||||
|
||||
def summary(self) -> str:
|
||||
status_emoji = {"healthy": "✅", "warning": "⚠️", "critical": "🔴"}.get(self.overall_status, "❓")
|
||||
lines = [
|
||||
f"{status_emoji} System Health: {self.overall_status.upper()}",
|
||||
f"Analyzed: {self.timestamp}",
|
||||
f"Screenshot: {self.screenshot_path}",
|
||||
f"Confidence: {self.confidence:.0%}",
|
||||
""
|
||||
]
|
||||
if self.anomalies:
|
||||
lines.append("Anomalies detected:")
|
||||
for a in self.anomalies:
|
||||
emoji = {"critical": "🔴", "warning": "🟡", "info": "ℹ️"}.get(a.severity, "")
|
||||
lines.append(f" {emoji} [{a.resource}] {a.description}")
|
||||
if a.recommendation:
|
||||
lines.append(f" → {a.recommendation}")
|
||||
else:
|
||||
lines.append("No anomalies detected.")
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
class VisualLogAnalyzer:
|
||||
"""Analyzes system monitoring screenshots for anomalies."""
|
||||
|
||||
def analyze_screenshot(self, screenshot_path: str, monitor_type: str = "auto") -> dict:
|
||||
"""
|
||||
Build analysis prompt for a system monitoring screenshot.
|
||||
|
||||
Args:
|
||||
screenshot_path: Path to screenshot
|
||||
monitor_type: "htop", "grafana", "cloudwatch", "docker", "auto"
|
||||
|
||||
Returns:
|
||||
Dict with analysis prompt for vision model
|
||||
"""
|
||||
prompt = f"""Analyze this system monitoring screenshot ({monitor_type}) and detect anomalies.
|
||||
|
||||
Check for:
|
||||
- CPU usage above 80% sustained
|
||||
- Memory usage above 85%
|
||||
- Disk usage above 90%
|
||||
- Unusual process names or high-PID processes consuming resources
|
||||
- Network traffic spikes
|
||||
- Load average anomalies
|
||||
- Zombie processes
|
||||
- Swap usage
|
||||
|
||||
For each anomaly found, report:
|
||||
- Resource type (cpu, memory, disk, network, process)
|
||||
- Severity (critical, warning, info)
|
||||
- Current value and threshold
|
||||
- Recommended action
|
||||
|
||||
Also extract overall metrics:
|
||||
- CPU usage %
|
||||
- Memory usage %
|
||||
- Disk usage %
|
||||
- Top 3 processes by resource use
|
||||
- Load average
|
||||
|
||||
Return as JSON:
|
||||
```json
|
||||
{{
|
||||
"overall_status": "healthy|warning|critical",
|
||||
"metrics": {{"cpu_pct": 45, "memory_pct": 62}},
|
||||
"anomalies": [
|
||||
{{"resource": "cpu", "severity": "warning", "description": "...", "value": "85%", "threshold": "80%", "recommendation": "..."}}
|
||||
],
|
||||
"confidence": 0.85
|
||||
}}
|
||||
```
|
||||
"""
|
||||
return {
|
||||
"prompt": prompt,
|
||||
"screenshot_path": screenshot_path,
|
||||
"monitor_type": monitor_type,
|
||||
"instruction": "Use vision_analyze tool with this prompt"
|
||||
}
|
||||
|
||||
def compare_screenshots(self, before_path: str, after_path: str) -> dict:
|
||||
"""Compare two monitoring screenshots to detect changes."""
|
||||
prompt = f"""Compare these two system monitoring screenshots taken at different times.
|
||||
|
||||
Before: {before_path}
|
||||
After: {after_path}
|
||||
|
||||
Identify:
|
||||
- Resources that increased significantly
|
||||
- New processes that appeared
|
||||
- Processes that disappeared
|
||||
- Overall health trend (improving, stable, degrading)
|
||||
|
||||
Return analysis as JSON with trend assessment.
|
||||
"""
|
||||
return {
|
||||
"prompt": prompt,
|
||||
"before": before_path,
|
||||
"after": after_path,
|
||||
"instruction": "Use vision_analyze for each screenshot, then compare results"
|
||||
}
|
||||
|
||||
|
||||
def main():
|
||||
if len(sys.argv) < 2:
|
||||
print("Usage: visual_log_analyzer.py <analyze|batch|compare> [args...]")
|
||||
return 1
|
||||
|
||||
analyzer = VisualLogAnalyzer()
|
||||
cmd = sys.argv[1]
|
||||
|
||||
if cmd == "analyze":
|
||||
if len(sys.argv) < 3:
|
||||
print("Usage: visual_log_analyzer.py analyze <screenshot> [monitor_type]")
|
||||
return 1
|
||||
path = sys.argv[2]
|
||||
mtype = sys.argv[3] if len(sys.argv) > 3 else "auto"
|
||||
result = analyzer.analyze_screenshot(path, mtype)
|
||||
print(json.dumps(result, indent=2))
|
||||
|
||||
elif cmd == "compare":
|
||||
if len(sys.argv) < 4:
|
||||
print("Usage: visual_log_analyzer.py compare <before.png> <after.png>")
|
||||
return 1
|
||||
result = analyzer.compare_screenshots(sys.argv[2], sys.argv[3])
|
||||
print(json.dumps(result, indent=2))
|
||||
|
||||
elif cmd == "batch":
|
||||
if len(sys.argv) < 3:
|
||||
print("Usage: visual_log_analyzer.py batch <screenshot_dir>")
|
||||
return 1
|
||||
dirpath = Path(sys.argv[2])
|
||||
if not dirpath.is_dir():
|
||||
print(f"Not a directory: {dirpath}")
|
||||
return 1
|
||||
for img in sorted(dirpath.glob("*.png")):
|
||||
print(f"\n--- {img.name} ---")
|
||||
result = analyzer.analyze_screenshot(str(img))
|
||||
print(json.dumps(result, indent=2))
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
Reference in New Issue
Block a user