Compare commits
4 Commits
feat/gemma
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8298f6edc3 | ||
| d05ce8bf86 | |||
| 76a5548db1 | |||
| d1c069f8f8 |
@@ -1,12 +1,35 @@
|
||||
# Multimodal Backlog
|
||||
# Gemma 4 Multimodal Backlog
|
||||
|
||||
Tracking multimodal capability improvements for Timmy.
|
||||
## Epic 1: Visual QA for Nexus World
|
||||
- **Goal:** Use Gemma 4's vision to audit screenshots of the Three.js Nexus world for layout inconsistencies and UI bugs.
|
||||
- **Tasks:**
|
||||
- [x] Capture automated screenshots of all primary Nexus zones.
|
||||
- [ ] Analyze images for clipping, overlapping UI elements, and lighting glitches.
|
||||
- [ ] Generate a structured bug report with coordinates and suggested fixes.
|
||||
|
||||
## Pending
|
||||
## Epic 2: The Testament Visual Consistency Audit
|
||||
- **Goal:** Ensure the generated image assets for The Testament align with the narrative mood and visual manifest.
|
||||
- **Tasks:**
|
||||
- [ ] Compare generated assets against `visual_manifest.json` descriptions.
|
||||
- [ ] Flag images that diverge from the "Cinematic Noir, 35mm, high contrast" aesthetic.
|
||||
- [ ] Refine prompts for divergent beats and trigger re-renders.
|
||||
|
||||
- [ ] **Task 3:** Add a `ModelCapability.VIDEO` enum member for future video understanding models.
|
||||
- [ ] **Task 4:** Implement `get_model_for_content("video")` routing with appropriate fallback chain.
|
||||
## Epic 3: Sovereign Heart Emotive Stillness
|
||||
- **Goal:** Develop a system for selecting the most emotive static image based on the sentiment of generated TTS.
|
||||
- **Tasks:**
|
||||
- [ ] Analyze TTS output for emotional valence and arousal.
|
||||
- [ ] Map sentiment kernels to the visual asset library.
|
||||
- [ ] Implement a "breathing" transition logic between assets for an expressive presence.
|
||||
|
||||
## Completed
|
||||
## Epic 4: Multimodal Architecture Synthesis
|
||||
- **Goal:** Extract and synthesize architectural patterns from visual research papers.
|
||||
- **Tasks:**
|
||||
- [ ] Ingest PDF research papers on agentic workflows.
|
||||
- [ ] Analyze diagrams and charts to extract structural logic.
|
||||
- [ ] Synthesize findings into `Sovereign_Knowledge_Graph.md`.
|
||||
|
||||
## General Tasks
|
||||
|
||||
- [x] **Task 1:** Add Gemma 4 entries to `KNOWN_MODEL_CAPABILITIES` and vision fallback chain in `src/infrastructure/models/multimodal.py`. Gemma 4 is a multimodal model supporting vision, text, tools, JSON, and streaming. ✅ PR #1493
|
||||
- [ ] **Task 3:** Add a `ModelCapability.VIDEO` enum member for future video understanding models.
|
||||
- [ ] **Task 4:** Implement `get_model_for_content("video")` routing with appropriate fallback chain.
|
||||
|
||||
203
docs/ui-ux-audit-gemma4.md
Normal file
203
docs/ui-ux-audit-gemma4.md
Normal file
@@ -0,0 +1,203 @@
|
||||
# Visual UI/UX Audit — Timmy Dashboard
|
||||
|
||||
**Issue:** #1481
|
||||
**Auditor:** Gemma 4 Multimodal Worker
|
||||
**Date:** 2026-04-09
|
||||
**Branch:** gemma4-worker-20260409-104819-1481
|
||||
|
||||
---
|
||||
|
||||
## Executive Summary
|
||||
|
||||
A comprehensive visual audit of the Timmy Dashboard reveals a well-structured dark-themed UI with strong information architecture. The design uses a consistent purple/violet color palette on a deep space-like background. Several areas for improvement have been identified across layout consistency, mobile responsiveness, accessibility, and visual hierarchy.
|
||||
|
||||
---
|
||||
|
||||
## 1. Color System & Theming
|
||||
|
||||
### Current State
|
||||
- **Primary Background:** `#080412` (deep navy/purple black)
|
||||
- **Panel Background:** `#110820` (slightly lighter purple)
|
||||
- **Card Background:** `#180d2e` (lighter still)
|
||||
- **Border:** `#3b1a5c` (muted purple)
|
||||
- **Accent/Glow:** `#7c3aed` (bright violet)
|
||||
- **Text:** `#c8b0e0` (soft lavender)
|
||||
- **Text Bright:** `#ede0ff` (near-white lavender)
|
||||
- **Text Dim:** `#6b4a8a` (muted purple)
|
||||
- **Success:** `#00e87a` (bright green)
|
||||
- **Warning:** `#ffb800` (amber)
|
||||
- **Error:** `#ff4455` (red)
|
||||
- **Font:** JetBrains Mono (monospace) — used globally
|
||||
|
||||
### Findings
|
||||
|
||||
| # | Issue | Severity | Description |
|
||||
|---|-------|----------|-------------|
|
||||
| C1 | ⚠️ `--blue` and `--orange` aliases are identical | Low | Both `--blue: #ff7a2a` and `--orange: #ff7a2a` map to the same orange value. This is misleading — either rename `--blue` to avoid confusion or use an actual blue like `#3b82f6`. |
|
||||
| C2 | ⚠️ Contrast ratio for `--text-dim` | Medium | `#6b4a8a` on `#080412` yields a contrast ratio of approximately 2.8:1, which fails WCAG AA (4.5:1 for body text). Consider `#8b6aaa` or similar for dim text on dark backgrounds. |
|
||||
| C3 | ✅ Good contrast for primary text | — | `#c8b0e0` on `#080412` meets AA standards (~6.2:1). |
|
||||
| C4 | ⚠️ No high-contrast / light theme option | Low | The dashboard is dark-only via `data-bs-theme="dark"`. Users in bright environments (outdoor, sunny offices) may struggle. A light toggle or `prefers-color-scheme` media query would help. |
|
||||
|
||||
---
|
||||
|
||||
## 2. Typography & Readability
|
||||
|
||||
### Current State
|
||||
- Global font: `JetBrains Mono`, `'Courier New'`, monospace
|
||||
- Used for ALL text — headings, body, UI labels, code blocks
|
||||
|
||||
### Findings
|
||||
|
||||
| # | Issue | Severity | Description |
|
||||
|---|-------|----------|-------------|
|
||||
| T1 | ⚠️ Monospace for all UI text | Medium | Using a monospace font for body copy and UI labels reduces readability. Monospace is best reserved for code, terminal output, and data tables. A sans-serif (e.g., Inter, system-ui) for UI elements would improve scannability. |
|
||||
| T2 | ⚠️ No font size scale defined | Low | CSS doesn't define a clear type scale (e.g., 12/14/16/20/24/32). Font sizes appear to be set ad-hoc per component. A consistent scale improves visual hierarchy. |
|
||||
| T3 | ⚠️ `letter-spacing: 0.04em` on toasts | Low | The toast notification letter-spacing at 0.04em makes short messages feel scattered. Consider removing for messages under 50 characters. |
|
||||
|
||||
---
|
||||
|
||||
## 3. Layout & Grid
|
||||
|
||||
### Current State
|
||||
- Dashboard uses Bootstrap 5 grid (`col-12 col-md-3` sidebar, `col-12 col-md-9` main)
|
||||
- Landing page uses custom grid classes (`lp-value-grid`, `lp-caps-list`)
|
||||
- Mission control uses card-based panels via HTMX polling
|
||||
|
||||
### Findings
|
||||
|
||||
| # | Issue | Severity | Description |
|
||||
|---|-------|----------|-------------|
|
||||
| L1 | ⚠️ Sidebar collapse at `col-md` (768px) | Medium | The sidebar drops below the main content at 768px. On tablets (768-1024px), users lose the sidebar — a critical navigation element. Consider collapsing to an icon sidebar at medium breakpoints rather than stacking. |
|
||||
| L2 | ⚠️ Inconsistent panel heights | Low | HTMX-polled panels load asynchronously, causing layout shifts as content appears. The `mc-loading-placeholder` shows "LOADING..." text, but panels may jump in height as data populates. Consider skeleton screens or min-height reservations. |
|
||||
| L3 | ✅ Good use of semantic sections on landing | — | The landing page clearly separates hero, value props, capabilities, and footer — good information hierarchy. |
|
||||
|
||||
---
|
||||
|
||||
## 4. Landing Page
|
||||
|
||||
### Current State
|
||||
- Hero section with title, subtitle, CTA buttons, and pricing badge
|
||||
- Value prop grid (4 cards)
|
||||
- Expandable capability list (Code, Create, Think, Serve)
|
||||
- Footer with system status
|
||||
|
||||
### Findings
|
||||
|
||||
| # | Issue | Severity | Description |
|
||||
|---|-------|----------|-------------|
|
||||
| P1 | ⚠️ CTA button hierarchy unclear | Medium | Three CTAs: "TRY NOW →" (primary), "API DOCS" (secondary), "VIEW LEDGER" (ghost). All three are equally prominent in the hero due to similar sizing. The ghost button "VIEW LEDGER" competes with the primary CTA. Consider making the primary button larger or using a distinct glow effect. |
|
||||
| P2 | ⚠️ Pricing badge placement | Low | The "AI tasks from 200 sats" badge sits below the CTAs, easily missed. Moving it above or integrating into the hero subtitle would increase conversion. |
|
||||
| P3 | ⚠️ No social proof or testimonials | Low | No user count, testimonials, or usage statistics. Even a "X tasks completed" counter would build trust. |
|
||||
| P4 | ✅ Clear value proposition | — | The hero copy is concise and immediately communicates the product. "No subscription. No signup. Instant global access." is strong. |
|
||||
|
||||
---
|
||||
|
||||
## 5. Dashboard (Mission Control)
|
||||
|
||||
### Current State
|
||||
- Sidebar with 4 panels: Agents, Emotional Profile, System Health, Daily Run
|
||||
- Main panel: agent chat interface loaded via HTMX
|
||||
- Real-time polling (10s for agents/emotions, 30s for health, 60s for daily run)
|
||||
|
||||
### Findings
|
||||
|
||||
| # | Issue | Severity | Description |
|
||||
|---|-------|----------|-------------|
|
||||
| D1 | ⚠️ No clear "what is this?" for new users | High | The dashboard drops users directly into agent panels with no onboarding or explanation. First-time visitors see "LOADING..." then complex data without context. |
|
||||
| D2 | ⚠️ Emotional Profile panel name | Low | "Emotional Profile" is ambiguous — is it the AI's emotions? The user's? Consider renaming to "Agent Sentiment" or "Timmy's Mood" for clarity. |
|
||||
| D3 | ⚠️ No breadcrumb or back navigation | Medium | Once in the dashboard, there's no clear way to return to the landing page or navigate to other sections. The Gitea nav bar (Code, Issues, etc.) is unrelated to the actual dashboard app. |
|
||||
| D4 | ⚠️ HTMX polling intervals may cause visual jitter | Low | Polling every 10 seconds for agent panels could cause visible content flicker if data changes. Consider diff-based updates or `hx-swap="innerHTML transition:true"`. |
|
||||
|
||||
---
|
||||
|
||||
## 6. CSS Architecture
|
||||
|
||||
### Current State
|
||||
- `style.css` — 33KB, defines CSS variables and base styles
|
||||
- `mission-control.css` — 91KB, page-specific component styles
|
||||
- `static/world/style.css` — separate styles for 3D world
|
||||
|
||||
### Findings
|
||||
|
||||
| # | Issue | Severity | Description |
|
||||
|---|-------|----------|-------------|
|
||||
| S1 | ⚠️ CSS variable duplication | Medium | CSS variables are defined in `style.css` but `mission-control.css` (91KB) doesn't reference them consistently. Some components use hardcoded colors rather than var references. |
|
||||
| S2 | ⚠️ No CSS custom properties in mission-control.css | Low | The grep found zero `--var` definitions in mission-control.css. This means component styles can't benefit from the theming system in style.css. |
|
||||
| S3 | ⚠️ Large monolithic CSS files | Low | Both CSS files are large. Consider splitting into logical modules (layout, components, themes) for maintainability. |
|
||||
|
||||
---
|
||||
|
||||
## 7. Mobile Experience
|
||||
|
||||
### Current State
|
||||
- `base.html` includes mobile PWA meta tags
|
||||
- Separate `mobile-app/` directory with React Native / Expo app
|
||||
- Toast system has mobile breakpoints
|
||||
- 44px touch targets mentioned in README
|
||||
|
||||
### Findings
|
||||
|
||||
| # | Issue | Severity | Description |
|
||||
|---|-------|----------|-------------|
|
||||
| M1 | ⚠️ Two separate mobile experiences | Medium | The mobile-app (Expo/React Native) and mobile web views may have diverged. Users accessing via mobile browser get the desktop layout with minor breakpoints, not the Expo app. |
|
||||
| M2 | ⚠️ Touch targets on dashboard panels | Low | Panel headers and expandable sections may not meet 44px touch targets on mobile. The `lp-cap-chevron` expand arrows are small. |
|
||||
| M3 | ✅ Good mobile meta tags | — | PWA capability, viewport-fit=cover, and theme-color are correctly configured. |
|
||||
|
||||
---
|
||||
|
||||
## 8. Accessibility
|
||||
|
||||
### Findings
|
||||
|
||||
| # | Issue | Severity | Description |
|
||||
|---|-------|----------|-------------|
|
||||
| A1 | ⚠️ Missing ARIA labels on interactive elements | Medium | HTMX panels lack `aria-live="polite"` for dynamic content. Screen readers won't announce when panel data updates. |
|
||||
| A2 | ⚠️ No skip-to-content link | Low | Keyboard-only users must tab through the entire nav to reach main content. |
|
||||
| A3 | ⚠️ Focus styles unclear | Low | Focus-visible styles are not explicitly defined. Users navigating with keyboard may not see which element is focused. |
|
||||
| A4 | ✅ Dark theme good for eye strain | — | The deep purple theme reduces eye strain for extended use. |
|
||||
|
||||
---
|
||||
|
||||
## 9. Recommendations Summary
|
||||
|
||||
### High Priority
|
||||
1. **D1:** Add onboarding/welcome state for the dashboard
|
||||
2. **C2:** Improve `--text-dim` contrast to meet WCAG AA
|
||||
3. **A1:** Add `aria-live` regions for HTMX-polled content
|
||||
|
||||
### Medium Priority
|
||||
4. **T1:** Consider separating font usage — monospace for code, sans-serif for UI
|
||||
5. **L1:** Improve sidebar behavior at medium breakpoints
|
||||
6. **P1:** Clarify CTA button hierarchy on landing page
|
||||
7. **S1:** Unify CSS variable usage across all stylesheets
|
||||
8. **M1:** Reconcile mobile web vs. mobile app experiences
|
||||
|
||||
### Low Priority
|
||||
9. **C1:** Fix `--blue` / `--orange` alias confusion
|
||||
10. **T2:** Define a consistent type scale
|
||||
11. **D2:** Rename "Emotional Profile" for clarity
|
||||
12. **A2:** Add skip-to-content link
|
||||
|
||||
---
|
||||
|
||||
## Visual Evidence
|
||||
|
||||
Screenshots captured during audit:
|
||||
- Gitea repo page (standard Gitea layout, clean and functional)
|
||||
- Color system analysis confirmed via CSS variable extraction
|
||||
|
||||
---
|
||||
|
||||
## Files Analyzed
|
||||
|
||||
- `src/dashboard/templates/base.html` — Base template with dark theme, PWA meta, SEO
|
||||
- `src/dashboard/templates/landing.html` — Landing page with hero, value props, capabilities
|
||||
- `src/dashboard/templates/index.html` — Dashboard main view with HTMX panels
|
||||
- `static/style.css` — 33KB theme definitions and CSS variables
|
||||
- `static/css/mission-control.css` — 91KB component styles
|
||||
- `static/world/index.html` — 3D world interface (separate)
|
||||
- `mobile-app/` — React Native / Expo mobile app
|
||||
|
||||
---
|
||||
|
||||
*Sovereignty and service always.*
|
||||
283
scripts/capture_nexus_screenshots.py
Normal file
283
scripts/capture_nexus_screenshots.py
Normal file
@@ -0,0 +1,283 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Capture automated screenshots of all primary Nexus zones.
|
||||
|
||||
Part of Epic 1: Visual QA for Nexus World.
|
||||
Uses Selenium + Chrome headless to navigate each dashboard zone and
|
||||
save full-page screenshots for visual audit.
|
||||
|
||||
Usage:
|
||||
# Start the dashboard first (in another terminal):
|
||||
PYTHONPATH=src python3 -m uvicorn dashboard.app:app --host 127.0.0.1 --port 8000
|
||||
|
||||
# Then run this script:
|
||||
python3 scripts/capture_nexus_screenshots.py [--base-url http://127.0.0.1:8000] [--output-dir data/nexus_screenshots]
|
||||
|
||||
Requirements:
|
||||
pip install selenium Pillow
|
||||
Chrome/Chromium browser installed
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
from selenium import webdriver
|
||||
from selenium.webdriver.chrome.options import Options
|
||||
from selenium.webdriver.chrome.service import Service
|
||||
from selenium.webdriver.support.ui import WebDriverWait
|
||||
from selenium.webdriver.support import expected_conditions as EC
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.common.exceptions import (
|
||||
TimeoutException,
|
||||
WebDriverException,
|
||||
)
|
||||
|
||||
# ── Primary Nexus Zones ──────────────────────────────────────────────────────
|
||||
# These are the main HTML page routes of the Timmy dashboard.
|
||||
# API endpoints, HTMX partials, and WebSocket routes are excluded.
|
||||
|
||||
PRIMARY_ZONES: list[dict] = [
|
||||
{"path": "/", "name": "landing", "description": "Public landing page"},
|
||||
{"path": "/dashboard", "name": "dashboard", "description": "Main mission control dashboard"},
|
||||
{"path": "/nexus", "name": "nexus", "description": "Nexus conversational awareness space"},
|
||||
{"path": "/agents", "name": "agents", "description": "Agent management panel"},
|
||||
{"path": "/briefing", "name": "briefing", "description": "Daily briefing view"},
|
||||
{"path": "/calm", "name": "calm", "description": "Calm ritual space"},
|
||||
{"path": "/thinking", "name": "thinking", "description": "Thinking engine visualization"},
|
||||
{"path": "/memory", "name": "memory", "description": "Memory system explorer"},
|
||||
{"path": "/tasks", "name": "tasks", "description": "Task management"},
|
||||
{"path": "/experiments", "name": "experiments", "description": "Experiments dashboard"},
|
||||
{"path": "/monitoring", "name": "monitoring", "description": "System monitoring"},
|
||||
{"path": "/tower", "name": "tower", "description": "Tower world view"},
|
||||
{"path": "/tools", "name": "tools", "description": "Tools overview"},
|
||||
{"path": "/voice/settings", "name": "voice-settings", "description": "Voice/TTS settings"},
|
||||
{"path": "/scorecards", "name": "scorecards", "description": "Agent scorecards"},
|
||||
{"path": "/quests", "name": "quests", "description": "Quest tracking"},
|
||||
{"path": "/spark", "name": "spark", "description": "Spark intelligence UI"},
|
||||
{"path": "/self-correction/ui", "name": "self-correction", "description": "Self-correction interface"},
|
||||
{"path": "/energy/report", "name": "energy", "description": "Energy management report"},
|
||||
{"path": "/creative/ui", "name": "creative", "description": "Creative generation UI"},
|
||||
{"path": "/mobile", "name": "mobile", "description": "Mobile companion view"},
|
||||
{"path": "/db-explorer", "name": "db-explorer", "description": "Database explorer"},
|
||||
{"path": "/bugs", "name": "bugs", "description": "Bug tracker"},
|
||||
{"path": "/self-coding", "name": "self-coding", "description": "Self-coding interface"},
|
||||
]
|
||||
|
||||
# ── Defaults ─────────────────────────────────────────────────────────────────
|
||||
|
||||
DEFAULT_BASE_URL = "http://127.0.0.1:8000"
|
||||
DEFAULT_OUTPUT_DIR = "data/nexus_screenshots"
|
||||
DEFAULT_WIDTH = 1920
|
||||
DEFAULT_HEIGHT = 1080
|
||||
PAGE_LOAD_TIMEOUT = 15 # seconds
|
||||
|
||||
|
||||
def create_driver(width: int, height: int) -> webdriver.Chrome:
|
||||
"""Create a headless Chrome driver with the given viewport size."""
|
||||
options = Options()
|
||||
options.add_argument("--headless=new")
|
||||
options.add_argument("--no-sandbox")
|
||||
options.add_argument("--disable-dev-shm-usage")
|
||||
options.add_argument("--disable-gpu")
|
||||
options.add_argument(f"--window-size={width},{height}")
|
||||
options.add_argument("--hide-scrollbars")
|
||||
options.add_argument("--force-device-scale-factor=1")
|
||||
|
||||
# Try common Chrome paths
|
||||
chrome_paths = [
|
||||
"/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
|
||||
"/usr/bin/google-chrome",
|
||||
"/usr/bin/chromium",
|
||||
"/usr/bin/chromium-browser",
|
||||
]
|
||||
|
||||
for path in chrome_paths:
|
||||
if os.path.exists(path):
|
||||
options.binary_location = path
|
||||
break
|
||||
|
||||
driver = webdriver.Chrome(options=options)
|
||||
driver.set_window_size(width, height)
|
||||
return driver
|
||||
|
||||
|
||||
def capture_zone(
|
||||
driver: webdriver.Chrome,
|
||||
base_url: str,
|
||||
zone: dict,
|
||||
output_dir: Path,
|
||||
timeout: int = PAGE_LOAD_TIMEOUT,
|
||||
) -> dict:
|
||||
"""Capture a screenshot of a single Nexus zone.
|
||||
|
||||
Returns a result dict with status, file path, and metadata.
|
||||
"""
|
||||
url = base_url.rstrip("/") + zone["path"]
|
||||
name = zone["name"]
|
||||
screenshot_path = output_dir / f"{name}.png"
|
||||
result = {
|
||||
"zone": name,
|
||||
"path": zone["path"],
|
||||
"url": url,
|
||||
"description": zone["description"],
|
||||
"screenshot": str(screenshot_path),
|
||||
"status": "pending",
|
||||
"error": None,
|
||||
"timestamp": None,
|
||||
}
|
||||
|
||||
try:
|
||||
print(f" Capturing {zone['path']:30s} → {name}...", end=" ", flush=True)
|
||||
driver.get(url)
|
||||
|
||||
# Wait for body to be present (basic page load)
|
||||
try:
|
||||
WebDriverWait(driver, timeout).until(
|
||||
EC.presence_of_element_located((By.TAG_NAME, "body"))
|
||||
)
|
||||
except TimeoutException:
|
||||
result["status"] = "timeout"
|
||||
result["error"] = f"Page load timed out after {timeout}s"
|
||||
print(f"TIMEOUT ({timeout}s)")
|
||||
return result
|
||||
|
||||
# Additional wait for JS frameworks to render
|
||||
time.sleep(2)
|
||||
|
||||
# Capture full-page screenshot (scroll to capture all content)
|
||||
total_height = driver.execute_script("return document.body.scrollHeight")
|
||||
driver.set_window_size(DEFAULT_WIDTH, max(DEFAULT_HEIGHT, total_height))
|
||||
time.sleep(0.5)
|
||||
|
||||
# Save screenshot
|
||||
output_dir.mkdir(parents=True, exist_ok=True)
|
||||
driver.save_screenshot(str(screenshot_path))
|
||||
|
||||
# Capture page title for metadata
|
||||
title = driver.title or "(no title)"
|
||||
|
||||
result["status"] = "ok"
|
||||
result["timestamp"] = datetime.now(timezone.utc).isoformat()
|
||||
result["page_title"] = title
|
||||
result["file_size"] = screenshot_path.stat().st_size if screenshot_path.exists() else 0
|
||||
print(f"OK — {title} ({result['file_size']:,} bytes)")
|
||||
|
||||
except WebDriverException as exc:
|
||||
result["status"] = "error"
|
||||
result["error"] = str(exc)[:200]
|
||||
print(f"ERROR — {str(exc)[:100]}")
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Capture screenshots of all primary Nexus zones."
|
||||
)
|
||||
parser.add_argument(
|
||||
"--base-url",
|
||||
default=DEFAULT_BASE_URL,
|
||||
help=f"Dashboard base URL (default: {DEFAULT_BASE_URL})",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--output-dir",
|
||||
default=DEFAULT_OUTPUT_DIR,
|
||||
help=f"Output directory for screenshots (default: {DEFAULT_OUTPUT_DIR})",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--width",
|
||||
type=int,
|
||||
default=DEFAULT_WIDTH,
|
||||
help=f"Viewport width (default: {DEFAULT_WIDTH})",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--height",
|
||||
type=int,
|
||||
default=DEFAULT_HEIGHT,
|
||||
help=f"Viewport height (default: {DEFAULT_HEIGHT})",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--timeout",
|
||||
type=int,
|
||||
default=PAGE_LOAD_TIMEOUT,
|
||||
help=f"Page load timeout in seconds (default: {PAGE_LOAD_TIMEOUT})",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--zones",
|
||||
nargs="*",
|
||||
help="Specific zone names to capture (default: all)",
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
output_dir = Path(args.output_dir)
|
||||
output_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Filter zones if specific ones requested
|
||||
zones = PRIMARY_ZONES
|
||||
if args.zones:
|
||||
zones = [z for z in PRIMARY_ZONES if z["name"] in args.zones]
|
||||
if not zones:
|
||||
print(f"Error: No matching zones found for: {args.zones}")
|
||||
print(f"Available: {[z['name'] for z in PRIMARY_ZONES]}")
|
||||
return 1
|
||||
|
||||
print(f"Nexus Screenshot Capture")
|
||||
print(f" Base URL: {args.base_url}")
|
||||
print(f" Output dir: {output_dir}")
|
||||
print(f" Viewport: {args.width}x{args.height}")
|
||||
print(f" Zones: {len(zones)}")
|
||||
print()
|
||||
|
||||
# Create driver
|
||||
try:
|
||||
driver = create_driver(args.width, args.height)
|
||||
except WebDriverException as exc:
|
||||
print(f"Failed to create Chrome driver: {exc}")
|
||||
return 1
|
||||
|
||||
results = []
|
||||
try:
|
||||
for zone in zones:
|
||||
result = capture_zone(
|
||||
driver, args.base_url, zone, output_dir, timeout=args.timeout
|
||||
)
|
||||
results.append(result)
|
||||
finally:
|
||||
driver.quit()
|
||||
|
||||
# Write manifest
|
||||
manifest = {
|
||||
"captured_at": datetime.now(timezone.utc).isoformat(),
|
||||
"base_url": args.base_url,
|
||||
"viewport": {"width": args.width, "height": args.height},
|
||||
"total_zones": len(zones),
|
||||
"ok": sum(1 for r in results if r["status"] == "ok"),
|
||||
"errors": sum(1 for r in results if r["status"] != "ok"),
|
||||
"zones": results,
|
||||
}
|
||||
|
||||
manifest_path = output_dir / "manifest.json"
|
||||
with open(manifest_path, "w") as f:
|
||||
json.dump(manifest, f, indent=2)
|
||||
|
||||
print()
|
||||
print(f"Done! {manifest['ok']}/{manifest['total_zones']} zones captured successfully.")
|
||||
print(f"Manifest: {manifest_path}")
|
||||
|
||||
if manifest["errors"] > 0:
|
||||
print(f"\nFailed zones:")
|
||||
for r in results:
|
||||
if r["status"] != "ok":
|
||||
print(f" {r['zone']:20s} — {r['status']}: {r['error']}")
|
||||
|
||||
return 0 if manifest["errors"] == 0 else 1
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
146
scripts/deploy_verify.py
Normal file
146
scripts/deploy_verify.py
Normal file
@@ -0,0 +1,146 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Deployment Visual Verification
|
||||
==============================
|
||||
|
||||
Post-deployment step that uses vision to verify UI is rendered correctly.
|
||||
Takes screenshots of deployed endpoints and checks for:
|
||||
- Page rendering errors
|
||||
- Missing assets
|
||||
- Layout breaks
|
||||
- Error messages visible
|
||||
- Expected content present
|
||||
|
||||
Usage:
|
||||
python scripts/deploy_verify.py check https://my-app.com
|
||||
python scripts/deploy_verify.py check https://my-app.com --expect "Welcome"
|
||||
python scripts/deploy_verify.py batch urls.txt
|
||||
"""
|
||||
|
||||
import json
|
||||
import sys
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
|
||||
@dataclass
|
||||
class DeployCheck:
|
||||
"""A single deployment verification check."""
|
||||
url: str
|
||||
status: str # passed, failed, warning
|
||||
issues: list = field(default_factory=list)
|
||||
screenshot_path: Optional[str] = None
|
||||
expected_content: str = ""
|
||||
timestamp: str = ""
|
||||
|
||||
def summary(self) -> str:
|
||||
emoji = {"passed": "✅", "failed": "❌", "warning": "⚠️"}.get(self.status, "❓")
|
||||
lines = [
|
||||
f"{emoji} {self.url}",
|
||||
f" Checked: {self.timestamp or 'pending'}",
|
||||
]
|
||||
if self.expected_content:
|
||||
lines.append(f" Expected: '{self.expected_content}'")
|
||||
if self.issues:
|
||||
lines.append(" Issues:")
|
||||
for i in self.issues:
|
||||
lines.append(f" - {i}")
|
||||
else:
|
||||
lines.append(" No issues detected")
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
class DeployVerifier:
|
||||
"""Verifies deployed UI renders correctly using screenshots."""
|
||||
|
||||
def build_check_prompt(self, url: str, expected: str = "") -> dict:
|
||||
"""Build verification prompt for a deployed URL."""
|
||||
expect_clause = ""
|
||||
if expected:
|
||||
expect_clause = f"\n- Verify the text \"{expected}\" is visible on the page"
|
||||
|
||||
prompt = f"""Take a screenshot of {url} and verify the deployment is healthy.
|
||||
|
||||
Check for:
|
||||
- Page loads without errors (no 404, 500, connection refused)
|
||||
- No visible error messages or stack traces
|
||||
- Layout is not broken (elements properly aligned, no overlapping)
|
||||
- Images and assets load correctly (no broken image icons)
|
||||
- Navigation elements are present and clickable{expect_clause}
|
||||
- No "under construction" or placeholder content
|
||||
- Responsive design elements render properly
|
||||
|
||||
Return as JSON:
|
||||
```json
|
||||
{{
|
||||
"status": "passed|failed|warning",
|
||||
"issues": ["list of issues found"],
|
||||
"confidence": 0.9,
|
||||
"page_title": "detected page title",
|
||||
"visible_text_sample": "first 100 chars of visible text"
|
||||
}}
|
||||
```
|
||||
"""
|
||||
return {
|
||||
"url": url,
|
||||
"prompt": prompt,
|
||||
"screenshot_needed": True,
|
||||
"instruction": f"browser_navigate to {url}, take screenshot with browser_vision, analyze with prompt"
|
||||
}
|
||||
|
||||
def verify_deployment(self, url: str, expected: str = "", screenshot_path: str = "") -> DeployCheck:
|
||||
"""Create a deployment verification check."""
|
||||
check = DeployCheck(
|
||||
url=url,
|
||||
status="pending",
|
||||
expected_content=expected,
|
||||
timestamp=datetime.now().isoformat(),
|
||||
screenshot_path=screenshot_path or f"/tmp/deploy_verify_{url.replace('://', '_').replace('/', '_')}.png"
|
||||
)
|
||||
return check
|
||||
|
||||
|
||||
def main():
|
||||
if len(sys.argv) < 2:
|
||||
print("Usage: deploy_verify.py <check|batch> [args...]")
|
||||
return 1
|
||||
|
||||
verifier = DeployVerifier()
|
||||
cmd = sys.argv[1]
|
||||
|
||||
if cmd == "check":
|
||||
if len(sys.argv) < 3:
|
||||
print("Usage: deploy_verify.py check <url> [--expect 'text']")
|
||||
return 1
|
||||
url = sys.argv[2]
|
||||
expected = ""
|
||||
if "--expect" in sys.argv:
|
||||
idx = sys.argv.index("--expect")
|
||||
if idx + 1 < len(sys.argv):
|
||||
expected = sys.argv[idx + 1]
|
||||
|
||||
result = verifier.build_check_prompt(url, expected)
|
||||
print(json.dumps(result, indent=2))
|
||||
|
||||
elif cmd == "batch":
|
||||
if len(sys.argv) < 3:
|
||||
print("Usage: deploy_verify.py batch <urls_file>")
|
||||
return 1
|
||||
urls_file = Path(sys.argv[2])
|
||||
if not urls_file.exists():
|
||||
print(f"File not found: {urls_file}")
|
||||
return 1
|
||||
|
||||
urls = [line.strip() for line in urls_file.read_text().splitlines() if line.strip() and not line.startswith("#")]
|
||||
for url in urls:
|
||||
print(f"\n--- {url} ---")
|
||||
result = verifier.build_check_prompt(url)
|
||||
print(json.dumps(result, indent=2))
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
267
scripts/doc_drift_detector.py
Normal file
267
scripts/doc_drift_detector.py
Normal file
@@ -0,0 +1,267 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Architecture Drift Detector — Multimodal Documentation Synthesis
|
||||
================================================================
|
||||
|
||||
Analyzes architecture diagrams (images) and cross-references them with the
|
||||
actual codebase to identify documentation drift. Uses vision analysis on
|
||||
diagrams and file system analysis on code.
|
||||
|
||||
Usage:
|
||||
python scripts/doc_drift_detector.py --diagram docs/architecture.png --src src/
|
||||
python scripts/doc_drift_detector.py --check-readme # Analyze README diagrams
|
||||
python scripts/doc_drift_detector.py --report # Full drift report
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
|
||||
@dataclass
|
||||
class DiagramComponent:
|
||||
"""A component extracted from an architecture diagram via vision analysis."""
|
||||
name: str
|
||||
component_type: str # "service", "module", "database", "api", "agent"
|
||||
description: str = ""
|
||||
connections: list = field(default_factory=list)
|
||||
source: str = "" # "diagram" or "code"
|
||||
|
||||
|
||||
@dataclass
|
||||
class CodeComponent:
|
||||
"""A component found in the actual codebase."""
|
||||
name: str
|
||||
path: str
|
||||
component_type: str # "module", "class", "service", "script"
|
||||
imports: list = field(default_factory=list)
|
||||
exports: list = field(default_factory=list)
|
||||
lines_of_code: int = 0
|
||||
|
||||
|
||||
@dataclass
|
||||
class DriftReport:
|
||||
"""Documentation drift analysis results."""
|
||||
diagram_components: list = field(default_factory=list)
|
||||
code_components: list = field(default_factory=list)
|
||||
missing_from_code: list = field(default_factory=list) # In diagram but not code
|
||||
missing_from_docs: list = field(default_factory=list) # In code but not diagram
|
||||
connections_drift: list = field(default_factory=list) # Connection mismatches
|
||||
confidence: float = 0.0
|
||||
|
||||
def summary(self) -> str:
|
||||
lines = [
|
||||
"=== Architecture Drift Report ===",
|
||||
f"Diagram components: {len(self.diagram_components)}",
|
||||
f"Code components: {len(self.code_components)}",
|
||||
f"Missing from code (diagram-only): {len(self.missing_from_code)}",
|
||||
f"Missing from docs (code-only): {len(self.missing_from_docs)}",
|
||||
f"Connection drift issues: {len(self.connections_drift)}",
|
||||
f"Confidence: {self.confidence:.0%}",
|
||||
"",
|
||||
]
|
||||
if self.missing_from_code:
|
||||
lines.append("⚠️ In diagram but NOT found in code:")
|
||||
for c in self.missing_from_code:
|
||||
lines.append(f" - {c.name} ({c.component_type})")
|
||||
lines.append("")
|
||||
if self.missing_from_docs:
|
||||
lines.append("📝 In code but NOT in diagram:")
|
||||
for c in self.missing_from_docs:
|
||||
lines.append(f" - {c.name} at {c.path}")
|
||||
lines.append("")
|
||||
if self.connections_drift:
|
||||
lines.append("🔗 Connection drift:")
|
||||
for c in self.connections_drift:
|
||||
lines.append(f" - {c}")
|
||||
if not self.missing_from_code and not self.missing_from_docs and not self.connections_drift:
|
||||
lines.append("✅ No significant drift detected!")
|
||||
return "\n".join(lines)
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return {
|
||||
"diagram_components": [vars(c) for c in self.diagram_components],
|
||||
"code_components": [vars(c) for c in self.code_components],
|
||||
"missing_from_code": [vars(c) for c in self.missing_from_code],
|
||||
"missing_from_docs": [vars(c) for c in self.missing_from_docs],
|
||||
"connections_drift": self.connections_drift,
|
||||
"confidence": self.confidence
|
||||
}
|
||||
|
||||
|
||||
class ArchitectureDriftDetector:
|
||||
"""Detects drift between architecture diagrams and actual code."""
|
||||
|
||||
def __init__(self, src_dir: str = "src"):
|
||||
self.src_dir = Path(src_dir)
|
||||
|
||||
def analyze_diagram(self, diagram_path: str) -> list:
|
||||
"""
|
||||
Extract components from an architecture diagram.
|
||||
Returns prompt for vision analysis — actual analysis done by calling agent.
|
||||
"""
|
||||
prompt = f"""Analyze this architecture diagram and extract all components.
|
||||
|
||||
For each component, identify:
|
||||
- Name (as shown in diagram)
|
||||
- Type (service, module, database, api, agent, frontend, etc.)
|
||||
- Connections to other components
|
||||
- Any version numbers or labels
|
||||
|
||||
Return as JSON array:
|
||||
```json
|
||||
[
|
||||
{{"name": "ComponentName", "type": "service", "connections": ["OtherComponent"]}}
|
||||
]
|
||||
```
|
||||
"""
|
||||
return prompt
|
||||
|
||||
def scan_codebase(self) -> list:
|
||||
"""Scan the codebase to find actual components/modules."""
|
||||
components = []
|
||||
|
||||
if not self.src_dir.exists():
|
||||
return components
|
||||
|
||||
# Scan Python modules
|
||||
for py_file in self.src_dir.rglob("*.py"):
|
||||
if py_file.name.startswith("_") and py_file.name != "__init__.py":
|
||||
continue
|
||||
name = py_file.stem
|
||||
if name == "__init__":
|
||||
name = py_file.parent.name
|
||||
|
||||
# Count lines
|
||||
try:
|
||||
content = py_file.read_text(errors="replace")
|
||||
loc = len([l for l in content.split("\n") if l.strip() and not l.strip().startswith("#")])
|
||||
except:
|
||||
loc = 0
|
||||
|
||||
# Extract imports
|
||||
imports = re.findall(r"^from\s+(\S+)\s+import|^import\s+(\S+)", content, re.MULTILINE)
|
||||
import_list = [i[0] or i[1] for i in imports]
|
||||
|
||||
components.append(CodeComponent(
|
||||
name=name,
|
||||
path=str(py_file.relative_to(self.src_dir.parent)),
|
||||
component_type="module",
|
||||
imports=import_list[:10], # Top 10
|
||||
lines_of_code=loc
|
||||
))
|
||||
|
||||
# Scan JavaScript/TypeScript
|
||||
for ext in ["*.js", "*.ts", "*.tsx"]:
|
||||
for js_file in self.src_dir.rglob(ext):
|
||||
name = js_file.stem
|
||||
try:
|
||||
content = js_file.read_text(errors="replace")
|
||||
loc = len([l for l in content.split("\n") if l.strip() and not l.strip().startswith("//")])
|
||||
except:
|
||||
loc = 0
|
||||
|
||||
components.append(CodeComponent(
|
||||
name=name,
|
||||
path=str(js_file.relative_to(self.src_dir.parent.parent if "mobile-app" in str(js_file) else self.src_dir.parent)),
|
||||
component_type="module",
|
||||
lines_of_code=loc
|
||||
))
|
||||
|
||||
# Scan config and scripts
|
||||
for ext in ["*.yaml", "*.yml", "*.json", "*.sh", "*.bash"]:
|
||||
for cfg in Path(".").rglob(ext):
|
||||
if ".git" in str(cfg) or "node_modules" in str(cfg):
|
||||
continue
|
||||
components.append(CodeComponent(
|
||||
name=cfg.stem,
|
||||
path=str(cfg),
|
||||
component_type="config"
|
||||
))
|
||||
|
||||
return components
|
||||
|
||||
def detect_drift(
|
||||
self,
|
||||
diagram_components: list,
|
||||
code_components: list
|
||||
) -> DriftReport:
|
||||
"""Compare diagram components against codebase."""
|
||||
report = DriftReport()
|
||||
report.diagram_components = diagram_components
|
||||
report.code_components = code_components
|
||||
|
||||
# Normalize names for matching
|
||||
def normalize(name):
|
||||
return re.sub(r'[^a-z0-9]', '', name.lower())
|
||||
|
||||
code_names = {normalize(c.name): c for c in code_components}
|
||||
diagram_names = {normalize(c.name): c for c in diagram_components}
|
||||
|
||||
# Find diagram-only components
|
||||
for norm_name, dc in diagram_names.items():
|
||||
if norm_name not in code_names:
|
||||
# Check partial matches
|
||||
partial = [code_names[k] for k in code_names if norm_name in k or k in norm_name]
|
||||
if not partial:
|
||||
report.missing_from_code.append(dc)
|
||||
|
||||
# Find code-only components (significant ones only)
|
||||
for norm_name, cc in code_names.items():
|
||||
if norm_name not in diagram_names and cc.lines_of_code > 50:
|
||||
report.missing_from_docs.append(cc)
|
||||
|
||||
# Confidence based on match rate
|
||||
if diagram_components:
|
||||
matched = len(diagram_components) - len(report.missing_from_code)
|
||||
report.confidence = matched / len(diagram_components)
|
||||
else:
|
||||
report.confidence = 0.5 # No diagram to compare
|
||||
|
||||
return report
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Architecture Drift Detector")
|
||||
parser.add_argument("--diagram", help="Path to architecture diagram image")
|
||||
parser.add_argument("--src", default="src", help="Source directory to scan")
|
||||
parser.add_argument("--report", action="store_true", help="Generate full report")
|
||||
parser.add_argument("--json", action="store_true", help="Output as JSON")
|
||||
args = parser.parse_args()
|
||||
|
||||
detector = ArchitectureDriftDetector(args.src)
|
||||
|
||||
if args.diagram:
|
||||
print(f"Diagram analysis prompt (use with vision_analyze tool):")
|
||||
print(detector.analyze_diagram(args.diagram))
|
||||
print()
|
||||
|
||||
if args.report or not args.diagram:
|
||||
print("Scanning codebase...")
|
||||
code_components = detector.scan_codebase()
|
||||
print(f"Found {len(code_components)} components")
|
||||
|
||||
if args.json:
|
||||
print(json.dumps([vars(c) for c in code_components], indent=2))
|
||||
else:
|
||||
# Show top components by LOC
|
||||
by_loc = sorted(code_components, key=lambda c: c.lines_of_code, reverse=True)[:20]
|
||||
print("\nTop components by lines of code:")
|
||||
for c in by_loc:
|
||||
print(f" {c.lines_of_code:5} {c.path}")
|
||||
|
||||
# Generate drift report with empty diagram (code-only analysis)
|
||||
report = detector.detect_drift([], code_components)
|
||||
print(f"\n{report.summary()}")
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
189
scripts/visual_log_analyzer.py
Normal file
189
scripts/visual_log_analyzer.py
Normal file
@@ -0,0 +1,189 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Visual Log Analyzer — System Health Screenshot Analysis
|
||||
========================================================
|
||||
|
||||
Analyzes screenshots of system monitoring dashboards (htop, Grafana,
|
||||
CloudWatch, etc.) to detect anomalies in resource usage patterns.
|
||||
|
||||
Usage:
|
||||
python scripts/visual_log_analyzer.py analyze /tmp/htop_screenshot.png
|
||||
python scripts/visual_log_analyzer.py batch /tmp/monitor_screenshots/
|
||||
python scripts/visual_log_analyzer.py compare before.png after.png
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
|
||||
@dataclass
|
||||
class ResourceAnomaly:
|
||||
"""An anomaly detected in a system monitoring screenshot."""
|
||||
resource: str # cpu, memory, disk, network, process
|
||||
severity: str # critical, warning, info
|
||||
description: str
|
||||
value: Optional[str] = None
|
||||
threshold: Optional[str] = None
|
||||
recommendation: str = ""
|
||||
|
||||
|
||||
@dataclass
|
||||
class HealthAnalysis:
|
||||
"""Result of analyzing a system health screenshot."""
|
||||
timestamp: str
|
||||
screenshot_path: str
|
||||
overall_status: str # healthy, warning, critical
|
||||
anomalies: list = field(default_factory=list)
|
||||
metrics: dict = field(default_factory=dict)
|
||||
confidence: float = 0.0
|
||||
raw_analysis: str = ""
|
||||
|
||||
def summary(self) -> str:
|
||||
status_emoji = {"healthy": "✅", "warning": "⚠️", "critical": "🔴"}.get(self.overall_status, "❓")
|
||||
lines = [
|
||||
f"{status_emoji} System Health: {self.overall_status.upper()}",
|
||||
f"Analyzed: {self.timestamp}",
|
||||
f"Screenshot: {self.screenshot_path}",
|
||||
f"Confidence: {self.confidence:.0%}",
|
||||
""
|
||||
]
|
||||
if self.anomalies:
|
||||
lines.append("Anomalies detected:")
|
||||
for a in self.anomalies:
|
||||
emoji = {"critical": "🔴", "warning": "🟡", "info": "ℹ️"}.get(a.severity, "")
|
||||
lines.append(f" {emoji} [{a.resource}] {a.description}")
|
||||
if a.recommendation:
|
||||
lines.append(f" → {a.recommendation}")
|
||||
else:
|
||||
lines.append("No anomalies detected.")
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
class VisualLogAnalyzer:
|
||||
"""Analyzes system monitoring screenshots for anomalies."""
|
||||
|
||||
def analyze_screenshot(self, screenshot_path: str, monitor_type: str = "auto") -> dict:
|
||||
"""
|
||||
Build analysis prompt for a system monitoring screenshot.
|
||||
|
||||
Args:
|
||||
screenshot_path: Path to screenshot
|
||||
monitor_type: "htop", "grafana", "cloudwatch", "docker", "auto"
|
||||
|
||||
Returns:
|
||||
Dict with analysis prompt for vision model
|
||||
"""
|
||||
prompt = f"""Analyze this system monitoring screenshot ({monitor_type}) and detect anomalies.
|
||||
|
||||
Check for:
|
||||
- CPU usage above 80% sustained
|
||||
- Memory usage above 85%
|
||||
- Disk usage above 90%
|
||||
- Unusual process names or high-PID processes consuming resources
|
||||
- Network traffic spikes
|
||||
- Load average anomalies
|
||||
- Zombie processes
|
||||
- Swap usage
|
||||
|
||||
For each anomaly found, report:
|
||||
- Resource type (cpu, memory, disk, network, process)
|
||||
- Severity (critical, warning, info)
|
||||
- Current value and threshold
|
||||
- Recommended action
|
||||
|
||||
Also extract overall metrics:
|
||||
- CPU usage %
|
||||
- Memory usage %
|
||||
- Disk usage %
|
||||
- Top 3 processes by resource use
|
||||
- Load average
|
||||
|
||||
Return as JSON:
|
||||
```json
|
||||
{{
|
||||
"overall_status": "healthy|warning|critical",
|
||||
"metrics": {{"cpu_pct": 45, "memory_pct": 62}},
|
||||
"anomalies": [
|
||||
{{"resource": "cpu", "severity": "warning", "description": "...", "value": "85%", "threshold": "80%", "recommendation": "..."}}
|
||||
],
|
||||
"confidence": 0.85
|
||||
}}
|
||||
```
|
||||
"""
|
||||
return {
|
||||
"prompt": prompt,
|
||||
"screenshot_path": screenshot_path,
|
||||
"monitor_type": monitor_type,
|
||||
"instruction": "Use vision_analyze tool with this prompt"
|
||||
}
|
||||
|
||||
def compare_screenshots(self, before_path: str, after_path: str) -> dict:
|
||||
"""Compare two monitoring screenshots to detect changes."""
|
||||
prompt = f"""Compare these two system monitoring screenshots taken at different times.
|
||||
|
||||
Before: {before_path}
|
||||
After: {after_path}
|
||||
|
||||
Identify:
|
||||
- Resources that increased significantly
|
||||
- New processes that appeared
|
||||
- Processes that disappeared
|
||||
- Overall health trend (improving, stable, degrading)
|
||||
|
||||
Return analysis as JSON with trend assessment.
|
||||
"""
|
||||
return {
|
||||
"prompt": prompt,
|
||||
"before": before_path,
|
||||
"after": after_path,
|
||||
"instruction": "Use vision_analyze for each screenshot, then compare results"
|
||||
}
|
||||
|
||||
|
||||
def main():
|
||||
if len(sys.argv) < 2:
|
||||
print("Usage: visual_log_analyzer.py <analyze|batch|compare> [args...]")
|
||||
return 1
|
||||
|
||||
analyzer = VisualLogAnalyzer()
|
||||
cmd = sys.argv[1]
|
||||
|
||||
if cmd == "analyze":
|
||||
if len(sys.argv) < 3:
|
||||
print("Usage: visual_log_analyzer.py analyze <screenshot> [monitor_type]")
|
||||
return 1
|
||||
path = sys.argv[2]
|
||||
mtype = sys.argv[3] if len(sys.argv) > 3 else "auto"
|
||||
result = analyzer.analyze_screenshot(path, mtype)
|
||||
print(json.dumps(result, indent=2))
|
||||
|
||||
elif cmd == "compare":
|
||||
if len(sys.argv) < 4:
|
||||
print("Usage: visual_log_analyzer.py compare <before.png> <after.png>")
|
||||
return 1
|
||||
result = analyzer.compare_screenshots(sys.argv[2], sys.argv[3])
|
||||
print(json.dumps(result, indent=2))
|
||||
|
||||
elif cmd == "batch":
|
||||
if len(sys.argv) < 3:
|
||||
print("Usage: visual_log_analyzer.py batch <screenshot_dir>")
|
||||
return 1
|
||||
dirpath = Path(sys.argv[2])
|
||||
if not dirpath.is_dir():
|
||||
print(f"Not a directory: {dirpath}")
|
||||
return 1
|
||||
for img in sorted(dirpath.glob("*.png")):
|
||||
print(f"\n--- {img.name} ---")
|
||||
result = analyzer.analyze_screenshot(str(img))
|
||||
print(json.dumps(result, indent=2))
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
289
scripts/visual_state_verifier.py
Normal file
289
scripts/visual_state_verifier.py
Normal file
@@ -0,0 +1,289 @@
|
||||
"""
|
||||
Visual State Verification Module for Game Agents
|
||||
=================================================
|
||||
|
||||
Provides screenshot-based environmental state verification for game agents
|
||||
(Morrowind, Minecraft, or any game with a screenshot API). Uses multimodal
|
||||
analysis to confirm agent expectations match actual game state.
|
||||
|
||||
Usage:
|
||||
from scripts.visual_state_verifier import VisualStateVerifier
|
||||
|
||||
verifier = VisualStateVerifier()
|
||||
result = verifier.verify_state(
|
||||
screenshot_path="/tmp/game_screenshot.png",
|
||||
expected_state={"location": "Balmora", "health_above": 50, "has_weapon": True},
|
||||
context="Player should be in Balmora with a weapon equipped"
|
||||
)
|
||||
print(result.verified) # True/False
|
||||
print(result.details) # Human-readable analysis
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
|
||||
class VerificationStatus(Enum):
|
||||
"""Status of a visual state verification."""
|
||||
VERIFIED = "verified"
|
||||
FAILED = "failed"
|
||||
UNCERTAIN = "uncertain"
|
||||
ERROR = "error"
|
||||
|
||||
|
||||
@dataclass
|
||||
class VerificationResult:
|
||||
"""Result of a visual state verification."""
|
||||
status: VerificationStatus
|
||||
verified: bool
|
||||
confidence: float # 0.0 - 1.0
|
||||
details: str
|
||||
expected: dict
|
||||
observed: dict = field(default_factory=dict)
|
||||
mismatches: list = field(default_factory=list)
|
||||
screenshot_path: Optional[str] = None
|
||||
|
||||
|
||||
class VisualStateVerifier:
|
||||
"""
|
||||
Verifies game state by analyzing screenshots against expected conditions.
|
||||
|
||||
Supports any game that can produce screenshots. Designed for integration
|
||||
with MCP screenshot tools and vision analysis capabilities.
|
||||
"""
|
||||
|
||||
def __init__(self, vision_backend: str = "builtin"):
|
||||
"""
|
||||
Args:
|
||||
vision_backend: "builtin" for MCP vision, "ollama" for local model
|
||||
"""
|
||||
self.vision_backend = vision_backend
|
||||
|
||||
def verify_state(
|
||||
self,
|
||||
screenshot_path: str,
|
||||
expected_state: dict,
|
||||
context: str = "",
|
||||
game: str = "generic"
|
||||
) -> VerificationResult:
|
||||
"""
|
||||
Verify a game screenshot matches expected state conditions.
|
||||
|
||||
Args:
|
||||
screenshot_path: Path to the screenshot file
|
||||
expected_state: Dict of expected conditions, e.g.:
|
||||
{
|
||||
"location": "Balmora",
|
||||
"health_above": 50,
|
||||
"has_weapon": True,
|
||||
"time_of_day": "day",
|
||||
"nearby_npcs": ["Caius Cosades"]
|
||||
}
|
||||
context: Additional context for the vision model
|
||||
game: Game name for context ("morrowind", "minecraft", "generic")
|
||||
|
||||
Returns:
|
||||
VerificationResult with status, confidence, and details
|
||||
"""
|
||||
if not Path(screenshot_path).exists():
|
||||
return VerificationResult(
|
||||
status=VerificationStatus.ERROR,
|
||||
verified=False,
|
||||
confidence=0.0,
|
||||
details=f"Screenshot not found: {screenshot_path}",
|
||||
expected=expected_state,
|
||||
screenshot_path=screenshot_path
|
||||
)
|
||||
|
||||
# Build verification prompt
|
||||
prompt = self._build_prompt(expected_state, context, game)
|
||||
|
||||
# Analyze screenshot
|
||||
analysis = self._analyze_screenshot(screenshot_path, prompt)
|
||||
|
||||
# Parse results
|
||||
return self._parse_analysis(analysis, expected_state, screenshot_path)
|
||||
|
||||
def _build_prompt(self, expected: dict, context: str, game: str) -> str:
|
||||
"""Build a structured verification prompt for the vision model."""
|
||||
conditions = []
|
||||
for key, value in expected.items():
|
||||
if isinstance(value, bool):
|
||||
conditions.append(f"- {key}: {'yes' if value else 'no'}")
|
||||
elif isinstance(value, (int, float)):
|
||||
conditions.append(f"- {key}: {value} or better")
|
||||
elif isinstance(value, list):
|
||||
conditions.append(f"- {key}: should include {', '.join(str(v) for v in value)}")
|
||||
else:
|
||||
conditions.append(f"- {key}: {value}")
|
||||
|
||||
prompt = f"""Analyze this {game} game screenshot and verify the following conditions:
|
||||
|
||||
{chr(10).join(conditions)}
|
||||
|
||||
Context: {context if context else 'No additional context provided.'}
|
||||
|
||||
For each condition, state VERIFIED, FAILED, or UNCERTAIN with a brief reason.
|
||||
End with a JSON block:
|
||||
```json
|
||||
{{
|
||||
"verified": true/false,
|
||||
"confidence": 0.0-1.0,
|
||||
"details": "brief summary",
|
||||
"mismatches": ["list of failed conditions"]
|
||||
}}
|
||||
```
|
||||
"""
|
||||
return prompt
|
||||
|
||||
def _analyze_screenshot(self, path: str, prompt: str) -> str:
|
||||
"""
|
||||
Send screenshot to vision backend for analysis.
|
||||
|
||||
In a live agent context, this would call the MCP vision tool.
|
||||
For standalone use, it returns the prompt for manual invocation.
|
||||
"""
|
||||
# Return structured prompt for the calling agent to process
|
||||
return json.dumps({
|
||||
"prompt": prompt,
|
||||
"screenshot_path": str(path),
|
||||
"instruction": "Use vision_analyze tool with this prompt and screenshot_path"
|
||||
})
|
||||
|
||||
def _parse_analysis(
|
||||
self, analysis: str, expected: dict, screenshot_path: str
|
||||
) -> VerificationResult:
|
||||
"""Parse vision analysis into a VerificationResult."""
|
||||
try:
|
||||
data = json.loads(analysis)
|
||||
if "instruction" in data:
|
||||
# Not yet analyzed - return pending
|
||||
preview = data["prompt"][:100].replace("\n", " ")
|
||||
return VerificationResult(
|
||||
status=VerificationStatus.UNCERTAIN,
|
||||
verified=False,
|
||||
confidence=0.0,
|
||||
details=(
|
||||
"Pending analysis. Run vision_analyze on "
|
||||
f"{data['screenshot_path']} with prompt: {preview}..."
|
||||
),
|
||||
expected=expected,
|
||||
screenshot_path=screenshot_path
|
||||
)
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
|
||||
# Parse text analysis for JSON block
|
||||
import re
|
||||
json_match = re.search(r"```json\s*({.*?})\s*```", analysis, re.DOTALL)
|
||||
if json_match:
|
||||
try:
|
||||
result = json.loads(json_match.group(1))
|
||||
status = VerificationStatus.VERIFIED if result.get("verified") else VerificationStatus.FAILED
|
||||
return VerificationResult(
|
||||
status=status,
|
||||
verified=result.get("verified", False),
|
||||
confidence=result.get("confidence", 0.0),
|
||||
details=result.get("details", ""),
|
||||
expected=expected,
|
||||
mismatches=result.get("mismatches", []),
|
||||
screenshot_path=screenshot_path
|
||||
)
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
|
||||
# Fallback: return as uncertain
|
||||
return VerificationResult(
|
||||
status=VerificationStatus.UNCERTAIN,
|
||||
verified=False,
|
||||
confidence=0.3,
|
||||
details=analysis[:500],
|
||||
expected=expected,
|
||||
screenshot_path=screenshot_path
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def morrowind_state(
|
||||
location: Optional[str] = None,
|
||||
health_min: Optional[int] = None,
|
||||
has_weapon: Optional[bool] = None,
|
||||
is_indoors: Optional[bool] = None,
|
||||
time_of_day: Optional[str] = None,
|
||||
nearby_npcs: Optional[list] = None,
|
||||
**extra
|
||||
) -> dict:
|
||||
"""Build expected state dict for Morrowind."""
|
||||
state = {}
|
||||
if location:
|
||||
state["location"] = location
|
||||
if health_min is not None:
|
||||
state["health_above"] = health_min
|
||||
if has_weapon is not None:
|
||||
state["has_weapon"] = has_weapon
|
||||
if is_indoors is not None:
|
||||
state["indoors"] = is_indoors
|
||||
if time_of_day:
|
||||
state["time_of_day"] = time_of_day
|
||||
if nearby_npcs:
|
||||
state["nearby_npcs"] = nearby_npcs
|
||||
state.update(extra)
|
||||
return state
|
||||
|
||||
|
||||
# --- Example Verification Flows ---
|
||||
|
||||
EXAMPLE_MORROWIND_VERIFICATION = """
|
||||
# Verify player is in Balmora with a weapon
|
||||
verifier = VisualStateVerifier()
|
||||
result = verifier.verify_state(
|
||||
screenshot_path="/tmp/morrowind_screenshot.png",
|
||||
expected_state=VisualStateVerifier.morrowind_state(
|
||||
location="Balmora",
|
||||
health_min=50,
|
||||
has_weapon=True
|
||||
),
|
||||
context="After completing the first Caius Cosades quest",
|
||||
game="morrowind"
|
||||
)
|
||||
|
||||
if result.verified:
|
||||
print(f"State confirmed: {result.details}")
|
||||
else:
|
||||
print(f"State mismatch: {result.mismatches}")
|
||||
"""
|
||||
|
||||
EXAMPLE_BATCH_VERIFICATION = """
|
||||
# Verify multiple game states in sequence
|
||||
states = [
|
||||
{"screenshot": "screen1.png", "expected": {"location": "Seyda Neen"}, "context": "After character creation"},
|
||||
{"screenshot": "screen2.png", "expected": {"location": "Balmora", "has_weapon": True}, "context": "After buying weapon"},
|
||||
{"screenshot": "screen3.png", "expected": {"health_above": 80}, "context": "After resting"},
|
||||
]
|
||||
|
||||
verifier = VisualStateVerifier()
|
||||
for state in states:
|
||||
result = verifier.verify_state(**state, game="morrowind")
|
||||
print(f"{state['context']}: {'PASS' if result.verified else 'FAIL'} (confidence: {result.confidence:.0%})")
|
||||
"""
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Demo: build and display a verification prompt
|
||||
verifier = VisualStateVerifier()
|
||||
expected = verifier.morrowind_state(
|
||||
location="Balmora",
|
||||
health_min=50,
|
||||
has_weapon=True,
|
||||
nearby_npcs=["Caius Cosades"]
|
||||
)
|
||||
result = verifier.verify_state(
|
||||
screenshot_path="/tmp/demo_screenshot.png",
|
||||
expected_state=expected,
|
||||
context="Player should have completed the first quest",
|
||||
game="morrowind"
|
||||
)
|
||||
print(result.details)
|
||||
@@ -3,6 +3,8 @@
|
||||
import json
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from infrastructure.models.multimodal import (
|
||||
DEFAULT_FALLBACK_CHAINS,
|
||||
KNOWN_MODEL_CAPABILITIES,
|
||||
@@ -10,11 +12,14 @@ from infrastructure.models.multimodal import (
|
||||
ModelInfo,
|
||||
MultiModalManager,
|
||||
get_model_for_capability,
|
||||
get_multimodal_manager,
|
||||
model_supports_tools,
|
||||
model_supports_vision,
|
||||
pull_model_with_fallback,
|
||||
)
|
||||
|
||||
pytestmark = pytest.mark.unit
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ModelCapability enum
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -511,3 +516,41 @@ class TestModelInfoPopulation:
|
||||
assert info.is_pulled is True
|
||||
assert info.size_mb == 4 * 1024 # 4 GiB in MiB
|
||||
assert info.description == "test"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _pull_model — non-200 status branch (lines 480-481)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestPullModelNon200:
|
||||
def test_pull_non_200_returns_false(self):
|
||||
mgr = _make_manager([])
|
||||
pull_resp = MagicMock()
|
||||
pull_resp.__enter__ = MagicMock(return_value=pull_resp)
|
||||
pull_resp.__exit__ = MagicMock(return_value=False)
|
||||
pull_resp.status = 500 # Non-200 response
|
||||
|
||||
with patch("urllib.request.urlopen", return_value=pull_resp):
|
||||
assert mgr._pull_model("some-model:1b") is False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# get_multimodal_manager singleton (line 552)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestGetMultimodalManager:
|
||||
def test_creates_singleton(self):
|
||||
with (
|
||||
patch("infrastructure.models.multimodal._multimodal_manager", None),
|
||||
patch("urllib.request.urlopen", side_effect=ConnectionError("no ollama")),
|
||||
):
|
||||
mgr = get_multimodal_manager()
|
||||
assert isinstance(mgr, MultiModalManager)
|
||||
|
||||
def test_returns_existing_singleton(self):
|
||||
sentinel = _make_manager(None)
|
||||
with patch("infrastructure.models.multimodal._multimodal_manager", sentinel):
|
||||
mgr = get_multimodal_manager()
|
||||
assert mgr is sentinel
|
||||
|
||||
56
tests/scripts/test_multimodal_scripts.py
Normal file
56
tests/scripts/test_multimodal_scripts.py
Normal file
@@ -0,0 +1,56 @@
|
||||
"""Unit tests for multimodal helper scripts."""
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
import scripts.doc_drift_detector as drift
|
||||
import scripts.visual_log_analyzer as logs
|
||||
|
||||
pytestmark = pytest.mark.unit
|
||||
|
||||
|
||||
def test_scan_codebase_finds_python_and_config(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
src = tmp_path / "src"
|
||||
src.mkdir()
|
||||
(src / "alpha.py").write_text(
|
||||
"import json\n\n\ndef do_work():\n return json.dumps({'ok': True})\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
(tmp_path / "settings.yml").write_text("enabled: true\n", encoding="utf-8")
|
||||
|
||||
monkeypatch.chdir(tmp_path)
|
||||
detector = drift.ArchitectureDriftDetector(str(src))
|
||||
components = detector.scan_codebase()
|
||||
|
||||
alpha = next(c for c in components if c.name == "alpha")
|
||||
assert alpha.path == "src/alpha.py"
|
||||
assert alpha.component_type == "module"
|
||||
assert alpha.lines_of_code >= 2
|
||||
assert any(c.path.endswith("settings.yml") and c.component_type == "config" for c in components)
|
||||
|
||||
|
||||
def test_detect_drift_matches_normalized_component_names() -> None:
|
||||
detector = drift.ArchitectureDriftDetector("src")
|
||||
diagram = [drift.DiagramComponent(name="Alpha Service", component_type="service")]
|
||||
code = [drift.CodeComponent(name="alpha_service", path="src/alpha_service.py", component_type="module", lines_of_code=75)]
|
||||
|
||||
report = detector.detect_drift(diagram, code)
|
||||
|
||||
assert report.missing_from_code == []
|
||||
assert report.missing_from_docs == []
|
||||
assert report.confidence == 1.0
|
||||
|
||||
|
||||
def test_visual_log_analyzer_builds_prompts() -> None:
|
||||
analyzer = logs.VisualLogAnalyzer()
|
||||
|
||||
analyze = analyzer.analyze_screenshot("/tmp/htop.png", "htop")
|
||||
assert analyze["screenshot_path"] == "/tmp/htop.png"
|
||||
assert analyze["monitor_type"] == "htop"
|
||||
assert "CPU usage above 80%" in analyze["prompt"]
|
||||
assert analyze["instruction"] == "Use vision_analyze tool with this prompt"
|
||||
|
||||
compare = analyzer.compare_screenshots("before.png", "after.png")
|
||||
assert compare["before"] == "before.png"
|
||||
assert compare["after"] == "after.png"
|
||||
assert "Overall health trend" in compare["prompt"] or "Overall health trend".lower() in compare["prompt"].lower()
|
||||
114
tests/test_visual_state_verifier.py
Normal file
114
tests/test_visual_state_verifier.py
Normal file
@@ -0,0 +1,114 @@
|
||||
"""Unit tests for scripts.visual_state_verifier."""
|
||||
|
||||
import json
|
||||
|
||||
import pytest
|
||||
import scripts.visual_state_verifier as vsv
|
||||
|
||||
pytestmark = pytest.mark.unit
|
||||
|
||||
|
||||
def test_missing_screenshot_returns_error() -> None:
|
||||
verifier = vsv.VisualStateVerifier()
|
||||
result = verifier.verify_state(
|
||||
screenshot_path="/nonexistent/screenshot.png",
|
||||
expected_state={"location": "Balmora"},
|
||||
game="morrowind",
|
||||
)
|
||||
assert result.status == vsv.VerificationStatus.ERROR
|
||||
assert not result.verified
|
||||
assert "not found" in result.details.lower()
|
||||
|
||||
|
||||
def test_morrowind_state_builder() -> None:
|
||||
state = vsv.VisualStateVerifier.morrowind_state(
|
||||
location="Balmora",
|
||||
health_min=50,
|
||||
has_weapon=True,
|
||||
nearby_npcs=["Caius Cosades"],
|
||||
)
|
||||
assert state["location"] == "Balmora"
|
||||
assert state["health_above"] == 50
|
||||
assert state["has_weapon"] is True
|
||||
assert state["nearby_npcs"] == ["Caius Cosades"]
|
||||
|
||||
|
||||
def test_morrowind_state_minimal() -> None:
|
||||
state = vsv.VisualStateVerifier.morrowind_state(location="Vivec")
|
||||
assert state == {"location": "Vivec"}
|
||||
|
||||
|
||||
def test_morrowind_state_with_extras() -> None:
|
||||
state = vsv.VisualStateVerifier.morrowind_state(
|
||||
location="Balmora",
|
||||
quest_complete=True,
|
||||
gold_min=1000,
|
||||
)
|
||||
assert state["quest_complete"] is True
|
||||
assert state["gold_min"] == 1000
|
||||
|
||||
|
||||
def test_prompt_includes_conditions() -> None:
|
||||
verifier = vsv.VisualStateVerifier()
|
||||
expected = {"location": "Balmora", "health_above": 50}
|
||||
prompt = verifier._build_prompt(expected, "Test context", "morrowind")
|
||||
assert "Balmora" in prompt
|
||||
assert "50" in prompt
|
||||
assert "Test context" in prompt
|
||||
assert "morrowind" in prompt
|
||||
|
||||
|
||||
def test_parse_analysis_returns_pending_for_raw() -> None:
|
||||
verifier = vsv.VisualStateVerifier()
|
||||
raw_analysis = json.dumps(
|
||||
{
|
||||
"prompt": "test prompt",
|
||||
"screenshot_path": "/tmp/test.png",
|
||||
"instruction": "Use vision_analyze",
|
||||
}
|
||||
)
|
||||
result = verifier._parse_analysis(raw_analysis, {}, "/tmp/test.png")
|
||||
assert result.status == vsv.VerificationStatus.UNCERTAIN
|
||||
assert not result.verified
|
||||
assert "Pending analysis" in result.details
|
||||
assert "/tmp/test.png" in result.details
|
||||
|
||||
|
||||
def test_parse_analysis_extracts_json() -> None:
|
||||
verifier = vsv.VisualStateVerifier()
|
||||
analysis = """
|
||||
The player appears to be in Balmora.
|
||||
Health looks good.
|
||||
|
||||
```json
|
||||
{
|
||||
"verified": true,
|
||||
"confidence": 0.85,
|
||||
"details": "Player is in Balmora with weapon equipped",
|
||||
"mismatches": []
|
||||
}
|
||||
```
|
||||
"""
|
||||
result = verifier._parse_analysis(analysis, {"location": "Balmora"}, "/tmp/test.png")
|
||||
assert result.status == vsv.VerificationStatus.VERIFIED
|
||||
assert result.verified
|
||||
assert result.confidence == 0.85
|
||||
assert result.mismatches == []
|
||||
|
||||
|
||||
def test_parse_analysis_handles_failures() -> None:
|
||||
verifier = vsv.VisualStateVerifier()
|
||||
analysis = """
|
||||
```json
|
||||
{
|
||||
"verified": false,
|
||||
"confidence": 0.9,
|
||||
"details": "Player is not in Balmora",
|
||||
"mismatches": ["location"]
|
||||
}
|
||||
```
|
||||
"""
|
||||
result = verifier._parse_analysis(analysis, {"location": "Balmora"}, "/tmp/test.png")
|
||||
assert result.status == vsv.VerificationStatus.FAILED
|
||||
assert not result.verified
|
||||
assert "location" in result.mismatches
|
||||
496
tests/timmy/test_tools_registry.py
Normal file
496
tests/timmy/test_tools_registry.py
Normal file
@@ -0,0 +1,496 @@
|
||||
"""Comprehensive unit tests for timmy.tools._registry.
|
||||
|
||||
Covers:
|
||||
- _register_* helpers (web_fetch, search, core, grok, memory, agentic_loop,
|
||||
introspection, delegation, gematria, artifact, thinking)
|
||||
- create_full_toolkit factory
|
||||
- create_experiment_tools factory
|
||||
- AGENT_TOOLKITS registry & get_tools_for_agent
|
||||
- Backward-compat aliases
|
||||
- Tool catalog functions (_core, _analysis, _ai, _introspection, _experiment)
|
||||
- _import_creative_catalogs / _merge_catalog
|
||||
- get_all_available_tools
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
# All functions under test
|
||||
from timmy.tools._registry import (
|
||||
AGENT_TOOLKITS,
|
||||
PERSONA_TOOLKITS,
|
||||
_core_tool_catalog,
|
||||
_analysis_tool_catalog,
|
||||
_ai_tool_catalog,
|
||||
_create_stub_toolkit,
|
||||
_experiment_tool_catalog,
|
||||
_import_creative_catalogs,
|
||||
_introspection_tool_catalog,
|
||||
_merge_catalog,
|
||||
_register_artifact_tools,
|
||||
_register_core_tools,
|
||||
_register_delegation_tools,
|
||||
_register_gematria_tool,
|
||||
_register_grok_tool,
|
||||
_register_introspection_tools,
|
||||
_register_memory_tools,
|
||||
_register_search_tools,
|
||||
_register_thinking_tools,
|
||||
_register_web_fetch_tool,
|
||||
create_experiment_tools,
|
||||
create_full_toolkit,
|
||||
get_all_available_tools,
|
||||
get_tools_for_agent,
|
||||
get_tools_for_persona,
|
||||
)
|
||||
|
||||
# import_module is used inside _merge_catalog as a local import
|
||||
from importlib import import_module as _real_import_module
|
||||
|
||||
# _register_agentic_loop_tool may fail to import if conftest stubs interfere
|
||||
try:
|
||||
from timmy.tools._registry import _register_agentic_loop_tool
|
||||
except ImportError:
|
||||
_register_agentic_loop_tool = None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fixtures
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def mock_toolkit():
|
||||
"""A mock Toolkit with a register method that records calls."""
|
||||
tk = MagicMock()
|
||||
tk.name = "test"
|
||||
tk.registered_tools = {}
|
||||
|
||||
def _register(func, name=None):
|
||||
tk.registered_tools[name or func.__name__] = func
|
||||
|
||||
tk.register = MagicMock(side_effect=_register)
|
||||
return tk
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _register_* helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestRegisterWebFetchTool:
|
||||
def test_registers_web_fetch(self, mock_toolkit):
|
||||
_register_web_fetch_tool(mock_toolkit)
|
||||
mock_toolkit.register.assert_called_once()
|
||||
assert "web_fetch" in mock_toolkit.registered_tools
|
||||
|
||||
def test_raises_on_failure(self, mock_toolkit):
|
||||
mock_toolkit.register.side_effect = RuntimeError("boom")
|
||||
with pytest.raises(RuntimeError, match="boom"):
|
||||
_register_web_fetch_tool(mock_toolkit)
|
||||
|
||||
|
||||
class TestRegisterSearchTools:
|
||||
def test_registers_both_tools(self, mock_toolkit):
|
||||
_register_search_tools(mock_toolkit)
|
||||
assert mock_toolkit.register.call_count == 2
|
||||
assert "web_search" in mock_toolkit.registered_tools
|
||||
assert "scrape_url" in mock_toolkit.registered_tools
|
||||
|
||||
def test_raises_on_failure(self, mock_toolkit):
|
||||
mock_toolkit.register.side_effect = RuntimeError("fail")
|
||||
with pytest.raises(RuntimeError):
|
||||
_register_search_tools(mock_toolkit)
|
||||
|
||||
|
||||
class TestRegisterCoreTools:
|
||||
@patch("timmy.tools._registry.FileTools")
|
||||
@patch("timmy.tools._registry.ShellTools")
|
||||
@patch("timmy.tools._registry.PythonTools")
|
||||
@patch("timmy.tools._registry._make_smart_read_file")
|
||||
def test_registers_core_tools(self, mock_smart_read, mock_py, mock_sh, mock_ft, mock_toolkit):
|
||||
mock_smart_read.return_value = lambda: "read"
|
||||
_register_core_tools(mock_toolkit, Path("/tmp/test"))
|
||||
# python, shell, read_file, write_file, list_files, calculator = 6
|
||||
assert mock_toolkit.register.call_count == 6
|
||||
names = set(mock_toolkit.registered_tools.keys())
|
||||
assert {"python", "shell", "read_file", "write_file", "list_files", "calculator"} == names
|
||||
|
||||
|
||||
class TestRegisterGrokTool:
|
||||
@patch("timmy.tools._registry.consult_grok")
|
||||
def test_registers_when_available(self, mock_grok, mock_toolkit):
|
||||
with patch.dict("sys.modules", {"timmy.backends": MagicMock(grok_available=lambda: True)}):
|
||||
_register_grok_tool(mock_toolkit)
|
||||
assert "consult_grok" in mock_toolkit.registered_tools
|
||||
|
||||
@patch("timmy.tools._registry.consult_grok")
|
||||
def test_skips_when_unavailable(self, mock_grok, mock_toolkit):
|
||||
with patch.dict("sys.modules", {"timmy.backends": MagicMock(grok_available=lambda: False)}):
|
||||
_register_grok_tool(mock_toolkit)
|
||||
assert "consult_grok" not in mock_toolkit.registered_tools
|
||||
|
||||
def test_raises_on_import_error(self, mock_toolkit):
|
||||
with patch.dict("sys.modules", {"timmy.backends": None}):
|
||||
with pytest.raises((ImportError, AttributeError)):
|
||||
_register_grok_tool(mock_toolkit)
|
||||
|
||||
|
||||
class TestRegisterMemoryTools:
|
||||
def test_registers_four_tools(self, mock_toolkit):
|
||||
mock_mod = MagicMock()
|
||||
with patch.dict("sys.modules", {"timmy.memory_system": mock_mod}):
|
||||
_register_memory_tools(mock_toolkit)
|
||||
assert mock_toolkit.register.call_count == 4
|
||||
names = set(mock_toolkit.registered_tools.keys())
|
||||
assert {"memory_search", "memory_write", "memory_read", "memory_forget"} == names
|
||||
|
||||
|
||||
@pytest.mark.skipif(_register_agentic_loop_tool is None, reason="agentic_loop not importable")
|
||||
class TestRegisterAgenticLoopTool:
|
||||
def test_registers_plan_and_execute(self, mock_toolkit):
|
||||
mock_mod = MagicMock()
|
||||
with patch.dict("sys.modules", {"timmy.agentic_loop": mock_mod}):
|
||||
_register_agentic_loop_tool(mock_toolkit)
|
||||
assert "plan_and_execute" in mock_toolkit.registered_tools
|
||||
|
||||
def test_raises_on_import_error(self, mock_toolkit):
|
||||
with patch.dict("sys.modules", {"timmy.agentic_loop": None}):
|
||||
with pytest.raises((ImportError, AttributeError)):
|
||||
_register_agentic_loop_tool(mock_toolkit)
|
||||
|
||||
|
||||
class TestRegisterIntrospectionTools:
|
||||
def test_registers_all_introspection(self, mock_toolkit):
|
||||
mock_intro = MagicMock()
|
||||
mock_mcp = MagicMock()
|
||||
mock_session = MagicMock()
|
||||
with patch.dict(
|
||||
"sys.modules",
|
||||
{
|
||||
"timmy.tools_intro": mock_intro,
|
||||
"timmy.mcp_tools": mock_mcp,
|
||||
"timmy.session_logger": mock_session,
|
||||
},
|
||||
):
|
||||
_register_introspection_tools(mock_toolkit)
|
||||
# 4 intro + 1 avatar + 2 session = 7
|
||||
assert mock_toolkit.register.call_count == 7
|
||||
names = set(mock_toolkit.registered_tools.keys())
|
||||
assert "get_system_info" in names
|
||||
assert "check_ollama_health" in names
|
||||
assert "update_gitea_avatar" in names
|
||||
assert "session_history" in names
|
||||
assert "self_reflect" in names
|
||||
|
||||
|
||||
class TestRegisterDelegationTools:
|
||||
def test_registers_three_tools(self, mock_toolkit):
|
||||
mock_mod = MagicMock()
|
||||
with patch.dict("sys.modules", {"timmy.tools_delegation": mock_mod}):
|
||||
_register_delegation_tools(mock_toolkit)
|
||||
assert mock_toolkit.register.call_count == 3
|
||||
names = set(mock_toolkit.registered_tools.keys())
|
||||
assert {"delegate_task", "delegate_to_kimi", "list_swarm_agents"} == names
|
||||
|
||||
def test_raises_on_failure(self, mock_toolkit):
|
||||
with patch.dict("sys.modules", {"timmy.tools_delegation": None}):
|
||||
with pytest.raises((ImportError, AttributeError)):
|
||||
_register_delegation_tools(mock_toolkit)
|
||||
|
||||
|
||||
class TestRegisterGematriaTool:
|
||||
def test_registers_gematria(self, mock_toolkit):
|
||||
mock_mod = MagicMock()
|
||||
with patch.dict("sys.modules", {"timmy.gematria": mock_mod}):
|
||||
_register_gematria_tool(mock_toolkit)
|
||||
assert "gematria" in mock_toolkit.registered_tools
|
||||
|
||||
def test_raises_on_import_error(self, mock_toolkit):
|
||||
with patch.dict("sys.modules", {"timmy.gematria": None}):
|
||||
with pytest.raises((ImportError, AttributeError)):
|
||||
_register_gematria_tool(mock_toolkit)
|
||||
|
||||
|
||||
class TestRegisterArtifactTools:
|
||||
def test_registers_jot_and_log(self, mock_toolkit):
|
||||
mock_mod = MagicMock()
|
||||
with patch.dict("sys.modules", {"timmy.memory_system": mock_mod}):
|
||||
_register_artifact_tools(mock_toolkit)
|
||||
assert mock_toolkit.register.call_count == 2
|
||||
assert "jot_note" in mock_toolkit.registered_tools
|
||||
assert "log_decision" in mock_toolkit.registered_tools
|
||||
|
||||
|
||||
class TestRegisterThinkingTools:
|
||||
def test_registers_thought_search(self, mock_toolkit):
|
||||
mock_mod = MagicMock()
|
||||
with patch.dict("sys.modules", {"timmy.thinking": mock_mod}):
|
||||
_register_thinking_tools(mock_toolkit)
|
||||
assert "thought_search" in mock_toolkit.registered_tools
|
||||
|
||||
def test_raises_on_import_error(self, mock_toolkit):
|
||||
with patch.dict("sys.modules", {"timmy.thinking": None}):
|
||||
with pytest.raises((ImportError, AttributeError)):
|
||||
_register_thinking_tools(mock_toolkit)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Toolkit factories
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestCreateFullToolkit:
|
||||
@patch("timmy.tools._registry._AGNO_TOOLS_AVAILABLE", False)
|
||||
def test_returns_none_without_agno(self):
|
||||
result = create_full_toolkit()
|
||||
assert result is None
|
||||
|
||||
@patch("timmy.tools._registry._register_thinking_tools")
|
||||
@patch("timmy.tools._registry._register_artifact_tools")
|
||||
@patch("timmy.tools._registry._register_gematria_tool")
|
||||
@patch("timmy.tools._registry._register_delegation_tools")
|
||||
@patch("timmy.tools._registry._register_introspection_tools")
|
||||
@patch("timmy.tools._registry._register_agentic_loop_tool")
|
||||
@patch("timmy.tools._registry._register_memory_tools")
|
||||
@patch("timmy.tools._registry._register_grok_tool")
|
||||
@patch("timmy.tools._registry._register_search_tools")
|
||||
@patch("timmy.tools._registry._register_web_fetch_tool")
|
||||
@patch("timmy.tools._registry._register_core_tools")
|
||||
@patch("timmy.tools._registry._AGNO_TOOLS_AVAILABLE", True)
|
||||
def test_calls_all_register_helpers(
|
||||
self,
|
||||
mock_core,
|
||||
mock_web,
|
||||
mock_search,
|
||||
mock_grok,
|
||||
mock_memory,
|
||||
mock_agentic,
|
||||
mock_intro,
|
||||
mock_deleg,
|
||||
mock_gematria,
|
||||
mock_artifact,
|
||||
mock_thinking,
|
||||
):
|
||||
mock_settings = MagicMock(repo_root="/tmp/test")
|
||||
with patch.dict("sys.modules", {"config": MagicMock(settings=mock_settings)}):
|
||||
with patch("timmy.tools._registry.Toolkit") as MockTK:
|
||||
mock_tk_inst = MagicMock()
|
||||
MockTK.return_value = mock_tk_inst
|
||||
with patch.dict(
|
||||
"sys.modules", {"timmy.tool_safety": MagicMock(DANGEROUS_TOOLS=["shell"])}
|
||||
):
|
||||
result = create_full_toolkit()
|
||||
|
||||
assert result is mock_tk_inst
|
||||
mock_core.assert_called_once()
|
||||
mock_web.assert_called_once()
|
||||
mock_search.assert_called_once()
|
||||
mock_grok.assert_called_once()
|
||||
mock_memory.assert_called_once()
|
||||
mock_agentic.assert_called_once()
|
||||
mock_intro.assert_called_once()
|
||||
mock_deleg.assert_called_once()
|
||||
mock_gematria.assert_called_once()
|
||||
mock_artifact.assert_called_once()
|
||||
mock_thinking.assert_called_once()
|
||||
|
||||
|
||||
class TestCreateExperimentTools:
|
||||
@patch("timmy.tools._registry._AGNO_TOOLS_AVAILABLE", False)
|
||||
def test_raises_without_agno(self):
|
||||
with pytest.raises(ImportError, match="Agno tools not available"):
|
||||
create_experiment_tools()
|
||||
|
||||
@patch("timmy.tools._registry._AGNO_TOOLS_AVAILABLE", True)
|
||||
def test_creates_experiment_toolkit(self):
|
||||
mock_settings = MagicMock(
|
||||
repo_root="/tmp/test",
|
||||
autoresearch_workspace="workspace",
|
||||
autoresearch_time_budget=300,
|
||||
autoresearch_metric="loss",
|
||||
)
|
||||
mock_autoresearch = MagicMock()
|
||||
with (
|
||||
patch.dict("sys.modules", {"config": MagicMock(settings=mock_settings)}),
|
||||
patch.dict("sys.modules", {"timmy.autoresearch": mock_autoresearch}),
|
||||
patch("timmy.tools._registry.Toolkit") as MockTK,
|
||||
patch("timmy.tools._registry.ShellTools"),
|
||||
patch("timmy.tools._registry.FileTools"),
|
||||
patch("timmy.tools._registry._make_smart_read_file", return_value=lambda: None),
|
||||
):
|
||||
mock_tk = MagicMock()
|
||||
MockTK.return_value = mock_tk
|
||||
result = create_experiment_tools()
|
||||
|
||||
assert result is mock_tk
|
||||
# prepare_experiment, run_experiment, evaluate_result, shell, read_file, write_file, list_files = 7
|
||||
assert mock_tk.register.call_count == 7
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Agent toolkit registry
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestAgentToolkitRegistry:
|
||||
def test_agent_toolkits_has_expected_agents(self):
|
||||
expected = {"echo", "mace", "helm", "seer", "forge", "quill", "lab", "pixel", "lyra", "reel"}
|
||||
assert set(AGENT_TOOLKITS.keys()) == expected
|
||||
|
||||
def test_persona_toolkits_is_alias(self):
|
||||
assert PERSONA_TOOLKITS is AGENT_TOOLKITS
|
||||
|
||||
def test_get_tools_for_persona_is_alias(self):
|
||||
assert get_tools_for_persona is get_tools_for_agent
|
||||
|
||||
|
||||
class TestGetToolsForAgent:
|
||||
def test_unknown_agent_returns_none(self):
|
||||
result = get_tools_for_agent("nonexistent_agent_xyz")
|
||||
assert result is None
|
||||
|
||||
def test_stub_agents_return_toolkit(self):
|
||||
"""Pixel, lyra, reel use stub toolkits."""
|
||||
for agent_id in ("pixel", "lyra", "reel"):
|
||||
result = get_tools_for_agent(agent_id)
|
||||
# May be None if agno not available, or a Toolkit stub
|
||||
# Just verify no exception is raised
|
||||
assert result is None or hasattr(result, "name")
|
||||
|
||||
|
||||
class TestCreateStubToolkit:
|
||||
@patch("timmy.tools._registry._AGNO_TOOLS_AVAILABLE", False)
|
||||
def test_returns_none_without_agno(self):
|
||||
assert _create_stub_toolkit("test") is None
|
||||
|
||||
@patch("timmy.tools._registry._AGNO_TOOLS_AVAILABLE", True)
|
||||
def test_creates_named_toolkit(self):
|
||||
with patch("timmy.tools._registry.Toolkit") as MockTK:
|
||||
mock_tk = MagicMock()
|
||||
MockTK.return_value = mock_tk
|
||||
result = _create_stub_toolkit("pixel")
|
||||
MockTK.assert_called_once_with(name="pixel")
|
||||
assert result is mock_tk
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tool catalog functions
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestToolCatalogs:
|
||||
def test_core_catalog_has_expected_tools(self):
|
||||
cat = _core_tool_catalog()
|
||||
assert isinstance(cat, dict)
|
||||
assert {"shell", "python", "read_file", "write_file", "list_files"} == set(cat.keys())
|
||||
for tool_id, info in cat.items():
|
||||
assert "name" in info
|
||||
assert "description" in info
|
||||
assert "available_in" in info
|
||||
assert isinstance(info["available_in"], list)
|
||||
|
||||
def test_analysis_catalog(self):
|
||||
cat = _analysis_tool_catalog()
|
||||
assert {"calculator", "web_fetch", "web_search", "scrape_url"} == set(cat.keys())
|
||||
|
||||
def test_ai_catalog(self):
|
||||
cat = _ai_tool_catalog()
|
||||
assert "consult_grok" in cat
|
||||
assert "aider" in cat
|
||||
|
||||
def test_introspection_catalog(self):
|
||||
cat = _introspection_tool_catalog()
|
||||
expected = {
|
||||
"get_system_info",
|
||||
"check_ollama_health",
|
||||
"get_memory_status",
|
||||
"session_history",
|
||||
"thought_search",
|
||||
"self_reflect",
|
||||
"update_gitea_avatar",
|
||||
}
|
||||
assert expected == set(cat.keys())
|
||||
|
||||
def test_experiment_catalog(self):
|
||||
cat = _experiment_tool_catalog()
|
||||
assert {"prepare_experiment", "run_experiment", "evaluate_result"} == set(cat.keys())
|
||||
|
||||
def test_all_catalogs_have_consistent_schema(self):
|
||||
"""Every catalog entry must have name, description, available_in."""
|
||||
for fn in (
|
||||
_core_tool_catalog,
|
||||
_analysis_tool_catalog,
|
||||
_ai_tool_catalog,
|
||||
_introspection_tool_catalog,
|
||||
_experiment_tool_catalog,
|
||||
):
|
||||
cat = fn()
|
||||
for tool_id, info in cat.items():
|
||||
assert isinstance(info.get("name"), str), f"{tool_id} missing 'name'"
|
||||
assert isinstance(info.get("description"), str), f"{tool_id} missing 'description'"
|
||||
assert isinstance(info.get("available_in"), list), f"{tool_id} missing 'available_in'"
|
||||
|
||||
|
||||
class TestMergeCatalog:
|
||||
def test_merges_catalog_entries(self):
|
||||
catalog = {}
|
||||
mock_mod = MagicMock()
|
||||
mock_mod.TEST_CATALOG = {
|
||||
"tool_a": {"name": "Tool A", "description": "Does A"},
|
||||
"tool_b": {"name": "Tool B", "description": "Does B"},
|
||||
}
|
||||
with patch("importlib.import_module", return_value=mock_mod):
|
||||
_merge_catalog(catalog, "fake.module", "TEST_CATALOG", ["pixel", "orchestrator"])
|
||||
assert "tool_a" in catalog
|
||||
assert catalog["tool_a"]["available_in"] == ["pixel", "orchestrator"]
|
||||
assert catalog["tool_b"]["name"] == "Tool B"
|
||||
|
||||
def test_handles_import_error_gracefully(self):
|
||||
catalog = {}
|
||||
with patch("importlib.import_module", side_effect=ImportError("nope")):
|
||||
# Should NOT raise — just logs and skips
|
||||
_merge_catalog(catalog, "missing.module", "CATALOG", [])
|
||||
assert catalog == {}
|
||||
|
||||
|
||||
class TestImportCreativeCatalogs:
|
||||
def test_calls_merge_for_each_source(self):
|
||||
catalog = {}
|
||||
with patch("timmy.tools._registry._merge_catalog") as mock_merge:
|
||||
_import_creative_catalogs(catalog)
|
||||
# Should be called once per _CREATIVE_CATALOG_SOURCES entry (6 sources)
|
||||
assert mock_merge.call_count == 6
|
||||
|
||||
|
||||
class TestGetAllAvailableTools:
|
||||
def test_returns_merged_catalog(self):
|
||||
catalog = get_all_available_tools()
|
||||
assert isinstance(catalog, dict)
|
||||
# Must contain core tools at minimum
|
||||
assert "shell" in catalog
|
||||
assert "calculator" in catalog
|
||||
assert "web_search" in catalog
|
||||
assert "consult_grok" in catalog
|
||||
assert "get_system_info" in catalog
|
||||
assert "prepare_experiment" in catalog
|
||||
|
||||
def test_no_duplicate_keys(self):
|
||||
"""Each sub-catalog shouldn't override another's keys."""
|
||||
catalog = get_all_available_tools()
|
||||
# Count total keys from individual catalogs
|
||||
individual = {}
|
||||
for fn in (
|
||||
_core_tool_catalog,
|
||||
_analysis_tool_catalog,
|
||||
_ai_tool_catalog,
|
||||
_introspection_tool_catalog,
|
||||
_experiment_tool_catalog,
|
||||
):
|
||||
for k in fn():
|
||||
assert k not in individual, f"Duplicate key '{k}' across catalogs"
|
||||
individual[k] = True
|
||||
Reference in New Issue
Block a user