Compare commits
1 Commits
docs/archi
...
gemma4-wor

| Author | SHA1 | Date |
|---|---|---|
|  | d735b550f7 |  |

ARCHITECTURE.md (170 lines)
@@ -1,170 +0,0 @@

# Architecture

High-level system design of the Hermes/Timmy sovereign AI agent framework.

## Layers

The system has three layers, top to bottom, with the remote fleet beneath them:

```
SOUL.md (Bitcoin)         Immutable moral framework, on-chain inscription
         |
~/.timmy/ (Sovereign)     Identity, specs, papers, evolution tracking
         |
~/.hermes/ (Operational)  Running agent, profiles, skills, cron, sessions
         |
Fleet (VPS Agents)        Ezra, Bezalel, Allegro — remote workers, Gitea, Ansible
```

## Core Components

### Agent Loop (run_agent.py)

Synchronous, tool-call-driven conversation loop. The `AIAgent` class manages:
- API call budget with iteration tracking
- Context compression (automatic when the window fills)
- Checkpoint system (max 50 snapshots)
- Trajectory saving for training
- Tool-use enforcement for models that describe tools instead of calling them
```
while api_call_count < max_iterations:
    response = LLM(messages, tools)
    api_call_count += 1
    if response.tool_calls:
        for call in response.tool_calls:
            result = handle(call)
            messages.append(result)
    else:
        return response.content
```

### Tool System

Central singleton registry with 47 static tools across 21+ toolsets, plus dynamic MCP tools.

Key mechanisms:
- **Approval system** — manual/smart/off modes, dangerous command detection
- **Composite toolsets** — e.g., debugging = terminal + web + file
- **Subagent delegation** — isolated contexts, max depth 2, max 3 concurrent
- **Mixture of Agents** — routes through 4+ frontier LLMs, synthesizes responses
- **Terminal backends** — local, docker, ssh, modal, daytona, singularity
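
The approval decision described above can be sketched as a small predicate. This is an illustration only, not the registry's actual code: the pattern list and the names `is_dangerous` and `needs_approval` are hypothetical.

```python
import re

# Hypothetical dangerous-command patterns; the real detector is richer.
DANGEROUS_PATTERNS = [r"\brm\s+-rf\b", r"\bmkfs\b", r"\bdd\s+if=", r"\bshutdown\b"]

def is_dangerous(command: str) -> bool:
    """Return True if the shell command matches a known-dangerous pattern."""
    return any(re.search(p, command) for p in DANGEROUS_PATTERNS)

def needs_approval(command: str, mode: str) -> bool:
    """Decide whether a tool call must be confirmed by the user.

    manual: every call needs approval
    smart:  only dangerous-looking calls need approval
    off:    nothing needs approval
    """
    if mode == "manual":
        return True
    if mode == "smart":
        return is_dangerous(command)
    return False  # mode == "off"
```

In "smart" mode an innocuous `ls` runs immediately while `rm -rf` is held for confirmation, which matches the trade-off the mode names suggest.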

### Gateway (Multi-Platform)

25 messaging platform adapters in `gateway/run.py` (8,852 lines):

telegram, discord, slack, whatsapp, homeassistant, signal, matrix,
mattermost, dingtalk, feishu, wecom, weixin, sms, email, webhook,
bluebubbles, + API server

Each platform has its own adapter implementing `BasePlatformAdapter`.
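
The adapter contract might look roughly like this. This is a hedged sketch: the real `BasePlatformAdapter` in `gateway/run.py` may use different method names and signatures, and `EchoAdapter` is a toy class invented here to show the shape of an implementation.

```python
from abc import ABC, abstractmethod

class BasePlatformAdapter(ABC):
    """Minimal sketch of a platform adapter contract (method names assumed)."""

    @abstractmethod
    def receive(self, raw_event: dict) -> str:
        """Normalize a platform-specific event into plain message text."""

    @abstractmethod
    def send(self, chat_id: str, text: str) -> None:
        """Deliver an agent response back to the originating platform."""

class EchoAdapter(BasePlatformAdapter):
    """Toy adapter used here only to illustrate the interface."""

    def __init__(self) -> None:
        self.outbox: list[tuple[str, str]] = []

    def receive(self, raw_event: dict) -> str:
        return raw_event.get("text", "")

    def send(self, chat_id: str, text: str) -> None:
        self.outbox.append((chat_id, text))
```

The point of the shared base class is that the gateway can treat all 25 platforms uniformly: normalize inbound events, hand text to the agent loop, and route the reply back through the same adapter.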

### Profiles

15+ named agent configurations in `~/.hermes/profiles/<name>/`. Each profile is self-contained:
- Own config.yaml, SOUL.md, skills/, auth.json
- Own state.db, memory_store.db, sessions/
- Isolated credentials and tool access

### Cron Integration

File-based lock scheduler; the gateway calls `tick()` every 60 seconds.
- Jobs in `~/.hermes/cron/jobs.json`
- Supports SILENT_MARKER for no-news suppression
- Delivery to 15 platforms, auto-resolved from origin

### Context Compression

`ContextCompressor` runs a 5-step pipeline:
1. Prune old tool results (cheap)
2. Protect head messages (system prompt + first exchange)
3. Protect tail by token budget (~20K tokens)
4. Summarize middle turns with an auxiliary LLM
5. Iteratively update the summary on subsequent compactions
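
The five steps above can be sketched as one pass over the message list. This is an illustration only: the token counter is a crude stand-in for a real tokenizer, and `summarize` is a placeholder for the auxiliary-LLM call.

```python
def count_tokens(text: str) -> int:
    """Crude token estimate (~4 chars per token); stand-in for a tokenizer."""
    return max(1, len(text) // 4)

def compress(messages: list[dict],
             tail_budget: int = 20_000,
             head_keep: int = 3,
             summarize=lambda msgs: "[summary of earlier turns]") -> list[dict]:
    # Step 1: prune bulky old tool results (cheap).
    msgs = [m for m in messages
            if not (m["role"] == "tool" and count_tokens(m["content"]) > 500)]
    # Step 2: protect the head (system prompt + first exchange).
    head, rest = msgs[:head_keep], msgs[head_keep:]
    # Step 3: protect the tail by token budget (~20K tokens).
    tail, used = [], 0
    while rest and used + count_tokens(rest[-1]["content"]) <= tail_budget:
        m = rest.pop()
        tail.insert(0, m)
        used += count_tokens(m["content"])
    # Steps 4-5: summarize the middle; on later compactions the
    # summary message would be updated in place rather than re-added.
    middle = [{"role": "system", "content": summarize(rest)}] if rest else []
    return head + middle + tail
```

The invariant worth noting: the system prompt and the most recent turns survive compression verbatim, so only the middle of the conversation degrades to a summary.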

### Auxiliary Client Router

Multi-provider resolution chain with automatic fallback:
- Text: OpenRouter → Nous Portal → Custom → Codex OAuth → Anthropic → Direct providers
- Vision: Selected provider → OpenRouter → Nous Portal → Codex → Anthropic → Custom
- Auto-fallback on 402/credit exhaustion
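
The resolution chain amounts to try-in-order with fallback on credit exhaustion. A sketch under stated assumptions: the provider names mirror the list above, but the exception type and the `complete` signature are invented for illustration.

```python
class CreditExhausted(Exception):
    """Assumed stand-in for a 402 / credit-exhaustion error from a provider."""

TEXT_CHAIN = ["openrouter", "nous_portal", "custom", "codex_oauth",
              "anthropic", "direct"]

def complete(prompt: str, providers: dict, chain: list[str] = TEXT_CHAIN) -> str:
    """Walk the chain, falling back when a provider raises CreditExhausted.

    `providers` maps provider name -> callable(prompt) -> str.
    """
    last_error: Exception | None = None
    for name in chain:
        fn = providers.get(name)
        if fn is None:
            continue  # provider not configured; try the next one
        try:
            return fn(prompt)
        except CreditExhausted as exc:
            last_error = exc  # 402: fall through to the next provider
    raise RuntimeError("all providers exhausted") from last_error
```

Unconfigured providers are skipped silently, so the same chain definition works whether one key or six keys are present.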

## Data Flow

```
User Message
     |
     v
Gateway (platform adapter)
     |
     v
Session Store (SQLite, state.db)
     |
     v
Agent Loop (run_agent.py)
     |
     +---> Tool Registry (47 tools + MCP)
     |        |
     |        +---> Terminal (local/docker/ssh/modal)
     |        +---> File System
     |        +---> Web (search, browse, scrape)
     |        +---> Memory (holographic, fact_store)
     |        +---> Subagents (delegated, isolated)
     |
     +---> Auxiliary Client (vision, compression, search)
     |
     +---> Context Compressor (if window full)
     |
     v
Response → Gateway → Platform → User
```

## SOUL.md → Architecture Mapping

| SOUL.md Value | Architectural Mechanism |
|---|---|
| Sovereignty | Local-first, no phone-home, forkable code |
| Service | Tool system, multi-platform gateway |
| Honesty | Source distinction, refusal over fabrication |
| Humility | Small-model support, graceful degradation |
| Courage | Crisis detection, dark content handling |
| Silence | SILENT_MARKER in cron, brevity defaults |
| When a Man Is Dying | Crisis protocol integration, 988 routing |

## External Dependencies

| Component | Dependency | Sovereignty Posture |
|---|---|---|
| LLM Inference | OpenRouter/Nous | Fallback to local Ollama |
| Vision | Provider chain | Local Gemma 3 available |
| Messaging | Platform APIs | 25 adapters, no lock-in |
| Storage | SQLite (local) | Full control |
| Deployment | Ansible (local) | Sovereign, no cloud CI |
| Source Control | Gitea (self-host) | Full control |

## Novel Contributions

1. **On-Chain Soul** — Moral framework inscribed on Bitcoin as an immutable conscience. Values as a permanent, forkable inscription rather than a mutable system prompt.

2. **Poka-Yoke Guardrails** — Five lightweight runtime guardrails eliminating entire failure categories (1,400+ failures prevented). Paper-ready for NeurIPS/ICML.

3. **Sovereign Fleet Architecture** — Declarative deployment for heterogeneous agent fleets. 45 min manual → 47 s automated with the Ansible pipeline.

4. **Source Distinction** — Three-tier provenance tagging (retrieved/generated/mixed) for epistemic honesty in LLM outputs.

5. **Refusal Over Fabrication** — Detecting and preventing ungrounded hedging in LLM responses.

## What's Undocumented

Known documentation gaps (opportunities for future work):
- Profiles system (creation, isolation guarantees)
- Skills Hub registry protocol
- Fleet routing logic
- Checkpoint system mechanics
- Per-profile credential isolation

---

*For detailed code-level analysis, see [hermes-agent-architecture-report.md](hermes-agent-architecture-report.md).*

*Sovereignty and service always.*

CONTRIBUTING.md (131 lines)
@@ -1,131 +0,0 @@

# CONTRIBUTING.md

How to contribute to Timmy Time Mission Control.

## Philosophy

Read SOUL.md first. Timmy is a sovereignty project — every contribution should
strengthen the user's control over their own AI, never weaken it.

Key values:
- Useful first, philosophical second
- Honesty over confidence
- Sovereignty over convenience
- Lines of code are a liability — delete as much as you create

## Getting Started

1. Fork the repo
2. Clone your fork
3. Set up the dev environment:

```bash
make install          # creates .venv + installs deps
source .venv/bin/activate
```

See INSTALLATION.md for full prerequisites.

## Development Workflow

### Branch Naming

```
fix/<description>      — bug fixes
feat/<description>     — new features
refactor/<description> — refactors
docs/<description>     — documentation
```

### Running Tests

```bash
tox -e unit      # fast unit tests (~17s)
tox -e lint      # code quality gate
tox -e format    # auto-format code
tox -e pre-push  # full CI mirror before pushing
```

See TESTING.md for the full test matrix.

### Code Style

- Python 3.11+
- Formatting: ruff (auto-enforced via `tox -e format`)
- No inline CSS in HTML templates
- Type hints encouraged but not required
- Docstrings for public functions

### Commit Messages

Use conventional commits:

```
fix: correct dashboard loading state (#123)
feat: add crisis detection module (#456)
refactor: simplify memory store queries (#789)
docs: update installation guide (#101)
test: add unit tests for sovereignty module (#102)
chore: update dependencies
```

Always reference the issue number when applicable.

## Pull Request Process

1. Create a feature branch from `main`
2. Make your changes
3. Run `tox -e pre-push` — it must pass before you push
4. Push your branch and open a PR
5. Give the PR a title with a short description and the issue number
6. Wait for CI to pass
7. Squash merge only — no merge commits

**Never:**
- Push directly to main
- Use `--no-verify` on git commands
- Merge without CI passing
- Include credentials or secrets in code

## Reporting Bugs

1. Check existing issues first
2. File a new issue with:
   - Clear title
   - Steps to reproduce
   - Expected vs actual behavior
   - Environment info (OS, Python version)
   - Relevant logs or screenshots

Label with `[bug]`.

## Proposing Features

1. Check existing issues and SOUL.md
2. File an issue with:
   - Problem statement
   - Proposed solution
   - How it aligns with SOUL.md values
   - Acceptance criteria

Label with `[feature]` or `[timmy-capability]`.

## AI Agent Contributions

This repo includes multi-agent development (see AGENTS.md):

- Human contributors: follow this guide
- AI agents (Claude, Kimi, etc.): follow AGENTS.md
- All code must pass the same test gate regardless of author

## Questions?

- Read SOUL.md for philosophy
- Read IMPLEMENTATION.md for architecture
- Read AGENTS.md for AI agent standards
- File an issue for anything unclear

## License

By contributing, you agree your contributions will be licensed under the
same license as the project (see LICENSE).

@@ -1,61 +0,0 @@

# Installation

This repository is a documentation and analysis project — there are no runtime dependencies to install. You just need a way to read Markdown.

## Prerequisites

- Git (any recent version)
- A Markdown viewer (any text editor, GitHub, or a local preview tool)

## Quick Start

```bash
# Clone the repository
git clone https://forge.alexanderwhitestone.com/Rockachopa/Timmy-time-dashboard.git
cd Timmy-time-dashboard

# Read the docs
cat README.md
```

## Repository Contents

| File | Purpose |
|------|---------|
| `README.md` | Overview and key findings |
| `hermes-agent-architecture-report.md` | Full architecture analysis |
| `failure_root_causes.md` | Root cause analysis of 2,160 errors |
| `complete_test_report.md` | Test results and findings |
| `deep_analysis_addendum.md` | Additional analysis |
| `experiment-framework.md` | Experiment methodology |
| `experiment_log.md` | Experiment execution log |
| `paper_outline.md` | Academic paper outline |
| `CONTRIBUTING.md` | How to contribute |
| `CHANGELOG.md` | Version history |

## Optional: Building the Paper

The `paper/` directory contains a LaTeX draft. To build it:

```bash
cd paper
pdflatex main.tex
```

Requires a LaTeX distribution (TeX Live, MiKTeX, or MacTeX).

## Optional: Running the Experiments

If you want to reproduce the empirical audit against a live Hermes Agent instance:

1. Set up a Hermes Agent deployment (see [hermes-agent](https://github.com/nousresearch/hermes-agent))
2. Point the experiment scripts at your instance
3. See `experiment-framework.md` for methodology

## No Dependencies

This project has no `requirements.txt`, `package.json`, or build system. It is pure documentation. The analysis was performed against a running Hermes Agent system, and the findings are recorded here for reference.

---

*Sovereignty and service always.*

@@ -1,35 +0,0 @@

# Gemma 4 Multimodal Backlog

## Epic 1: Visual QA for Nexus World
- **Goal:** Use Gemma 4's vision to audit screenshots of the Three.js Nexus world for layout inconsistencies and UI bugs.
- **Tasks:**
  - [x] Capture automated screenshots of all primary Nexus zones.
  - [ ] Analyze images for clipping, overlapping UI elements, and lighting glitches.
  - [ ] Generate a structured bug report with coordinates and suggested fixes.

## Epic 2: The Testament Visual Consistency Audit
- **Goal:** Ensure the generated image assets for The Testament align with the narrative mood and visual manifest.
- **Tasks:**
  - [ ] Compare generated assets against `visual_manifest.json` descriptions.
  - [ ] Flag images that diverge from the "Cinematic Noir, 35mm, high contrast" aesthetic.
  - [ ] Refine prompts for divergent beats and trigger re-renders.

## Epic 3: Sovereign Heart Emotive Stillness
- **Goal:** Develop a system for selecting the most emotive static image based on the sentiment of generated TTS.
- **Tasks:**
  - [ ] Analyze TTS output for emotional valence and arousal.
  - [ ] Map sentiment kernels to the visual asset library.
  - [ ] Implement a "breathing" transition logic between assets for an expressive presence.

## Epic 4: Multimodal Architecture Synthesis
- **Goal:** Extract and synthesize architectural patterns from visual research papers.
- **Tasks:**
  - [ ] Ingest PDF research papers on agentic workflows.
  - [ ] Analyze diagrams and charts to extract structural logic.
  - [ ] Synthesize findings into `Sovereign_Knowledge_Graph.md`.

## General Tasks

- [x] **Task 1:** Add Gemma 4 entries to `KNOWN_MODEL_CAPABILITIES` and the vision fallback chain in `src/infrastructure/models/multimodal.py`. Gemma 4 is a multimodal model supporting vision, text, tools, JSON, and streaming. ✅ PR #1493
- [x] **Task 3:** Add a `ModelCapability.VIDEO` enum member for future video understanding models. ✅ PR #1494
- [ ] **Task 4:** Implement `get_model_for_content("video")` routing with an appropriate fallback chain.
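
One possible shape for the Task 4 routing is sketched below. This is illustrative only: the real `KNOWN_MODEL_CAPABILITIES` table and fallback chain in `multimodal.py` are larger, and the model names here (including `video-model-x`) are hypothetical.

```python
from enum import Enum, auto

class ModelCapability(Enum):
    TEXT = auto()
    VISION = auto()
    VIDEO = auto()  # the member added by Task 3 (PR #1494)

# Illustrative capability table; the real one lives in multimodal.py.
KNOWN_MODEL_CAPABILITIES = {
    "gemma-4": {ModelCapability.TEXT, ModelCapability.VISION},
    "video-model-x": {ModelCapability.TEXT, ModelCapability.VISION,
                      ModelCapability.VIDEO},
}

FALLBACK_CHAIN = ["video-model-x", "gemma-4"]

def get_model_for_content(content_type: str) -> str:
    """Return the first model in the fallback chain supporting the content type."""
    wanted = ModelCapability[content_type.upper()]
    for model in FALLBACK_CHAIN:
        if wanted in KNOWN_MODEL_CAPABILITIES.get(model, set()):
            return model
    raise LookupError(f"no model supports {content_type}")
```

Keeping routing driven by the capability table means adding a video model later is a one-line registry change rather than new control flow.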

TESTING.md (154 lines)
@@ -1,154 +0,0 @@

# TESTING.md

How to run tests, what each suite covers, and how to add new tests.

## Quick Start

```bash
# Run the fast unit tests (recommended for development)
tox -e unit

# Run all tests except slow/external
tox -e fast

# Auto-format code before committing
tox -e format

# Lint check (CI gate)
tox -e lint

# Full CI mirror (lint + coverage)
tox -e pre-push
```

## Prerequisites

- Python 3.11+
- `tox` installed (`pip install tox`)
- Ollama running locally (only for `tox -e ollama` tests)

All test dependencies are installed automatically by tox. No manual `pip install` needed.

## Tox Environments

| Command | Purpose | Speed | What It Runs |
|---------|---------|-------|--------------|
| `tox -e unit` | Fast unit tests | ~17s | `@pytest.mark.unit` tests, parallel, excludes ollama/docker/selenium/external |
| `tox -e integration` | Integration tests | Medium | `@pytest.mark.integration` tests, may use SQLite |
| `tox -e functional` | Functional tests | Slow | Real HTTP requests, no mocking |
| `tox -e e2e` | End-to-end tests | Slowest | Full system tests |
| `tox -e fast` | Unit + integration | ~30s | Combined, no e2e/functional/external |
| `tox -e ollama` | Live LLM tests | Variable | Requires running Ollama instance |
| `tox -e lint` | Code quality gate | Fast | ruff check + format check + inline CSS check |
| `tox -e format` | Auto-format | Fast | ruff fix + ruff format |
| `tox -e typecheck` | Type checking | Medium | mypy static analysis |
| `tox -e ci` | Full CI suite | Slow | Coverage + JUnit XML output |
| `tox -e pre-push` | Pre-push gate | Medium | lint + full CI (mirrors Gitea Actions) |
| `tox -e benchmark` | Performance regression | Variable | Agent performance benchmarks |

## Test Markers

Tests are organized with pytest markers defined in `pyproject.toml`:

- `unit` - Fast unit tests, no I/O, no external dependencies
- `integration` - May use SQLite databases, file I/O
- `functional` - Real HTTP requests against test servers
- `e2e` - Full system end-to-end tests
- `dashboard` - Dashboard route tests
- `slow` - Tests taking >1 second
- `ollama` - Requires live Ollama instance
- `docker` - Requires Docker
- `selenium` - Requires browser automation
- `external_api` - Requires external API access
- `skip_ci` - Skipped in CI

Mark your tests in the test file:

```python
import pytest


@pytest.mark.unit
def test_something():
    assert True


@pytest.mark.integration
def test_with_database():
    # Uses SQLite or file I/O
    pass
```

## Test Directory Structure

```
tests/
  unit/         - Fast unit tests
  integration/  - Integration tests (SQLite, file I/O)
  functional/   - Real HTTP tests
  e2e/          - End-to-end system tests
  conftest.py   - Shared fixtures
```

## Writing New Tests

1. Place your test in the appropriate directory (`tests/unit/`, `tests/integration/`, etc.)
2. Use the correct marker (`@pytest.mark.unit`, `@pytest.mark.integration`, etc.)
3. Test file names must start with `test_`
4. Use fixtures from `conftest.py` for common setup

### Example

```python
# tests/unit/test_my_feature.py
import pytest


@pytest.mark.unit
class TestMyFeature:
    def test_basic_behavior(self):
        result = my_function("input")
        assert result == "expected"

    def test_edge_case(self):
        with pytest.raises(ValueError):
            my_function(None)
```

### Environment Variables

The test suite sets these automatically via tox:

- `TIMMY_TEST_MODE=1` - Enables test mode in the application
- `TIMMY_DISABLE_CSRF=1` - Disables CSRF protection for test requests
- `TIMMY_SKIP_EMBEDDINGS=1` - Skips embedding generation (slow)

## Git Hooks

Pre-commit and pre-push hooks run tests automatically:

- **Pre-commit**: `tox -e format` then `tox -e unit`
- **Pre-push**: `tox -e pre-push` (lint + full CI)

Never use `--no-verify` on commits or pushes.

## CI Pipeline

Gitea Actions runs on every push and PR:

1. **Lint**: `tox -e lint` - code quality gate
2. **Unit tests**: `tox -e unit` - fast feedback
3. **Integration tests**: `tox -e integration`
4. **Coverage**: `tox -e ci` - generates coverage.xml

The CI fails if:
- Any lint check fails
- Any test fails
- Coverage drops below the threshold (see `pyproject.toml [tool.coverage.report]`)

## Troubleshooting

**Tests time out**: Increase the timeout with `pytest --timeout=120` or check for hanging network calls.

**Import errors**: Run `pip install -e ".[dev]"` to ensure all dependencies are installed.

**Ollama tests fail**: Ensure Ollama is running at the configured `OLLAMA_URL`.

**Flaky tests**: Mark with `@pytest.mark.slow` if genuinely slow, or file an issue if intermittently failing.

USAGE.md (78 lines)
@@ -1,78 +0,0 @@

# Usage Guide

How to use the Timmy Time Dashboard repository for research, auditing, and improvement of the Hermes Agent system.

## What This Repository Is

This is an **analysis and documentation** repository. It contains the results of an empirical audit of the Hermes Agent system — 10,985 sessions analyzed, 82,645 error log lines processed, 2,160 errors categorized.

There is no application to run. The value is in the documentation.

## Reading Guide

Start here, in order:

1. **README.md** — overview and key findings. Read this first to understand the 5 root causes of agent failure and the 15 proposed solutions.

2. **hermes-agent-architecture-report.md** — deep dive into the system architecture. Covers session management, cron infrastructure, tool execution, and the gateway layer.

3. **failure_root_causes.md** — detailed breakdown of every error pattern found, with examples and frequency data.

4. **complete_test_report.md** — what testing was done and what it revealed.

5. **experiment-framework.md** — methodology for reproducing the audit.

6. **experiment_log.md** — step-by-step log of experiments conducted.

## Using the Findings

### For Developers

The 15 issues identified in the audit are prioritized in `IMPLEMENTATION_GUIDE.md`:

- **P1 (Critical):** Circuit breaker, token tracking, gateway config — fix these first
- **P2 (Important):** Path validation, syntax validation, tool fixation detection
- **P3 (Beneficial):** Session management, memory tool, model routing

Each issue includes implementation patterns with code snippets.

### For Researchers

The data supports reproducible research:

- `results/experiment_data.json` — raw experimental data
- `paper_outline.md` — academic paper structure
- `paper/main.tex` — LaTeX paper draft

### For Operators

If you run a Hermes Agent deployment:

- Check `failure_root_causes.md` for error patterns you might be hitting
- Use the circuit breaker pattern from `IMPLEMENTATION_GUIDE.md`
- Monitor for the 5 root cause categories in your logs

## Key Numbers

| Metric | Value |
|--------|-------|
| Sessions analyzed | 10,985 |
| Error log lines | 82,645 |
| Total errors | 2,160 |
| Error rate | 9.4% |
| Empty sessions | 3,564 (32.4%) |
| Error cascade factor | 2.33x |
| Dead cron jobs | 9 |

## Contributing

See [CONTRIBUTING.md](CONTRIBUTING.md) for how to contribute findings, corrections, or new analysis.

## Related Repositories

- [hermes-agent](https://github.com/nousresearch/hermes-agent) — the system being analyzed
- [timmy-config](https://forge.alexanderwhitestone.com/Rockachopa/timmy-config) — Timmy's sovereign configuration

---

*Sovereignty and service always.*

@@ -1,147 +0,0 @@

# Sovereignty Audit — Runtime Dependencies

**Issue:** #1508
**Date:** 2026-04-15
**Status:** Draft

## Purpose

SOUL.md mandates: *"If I ever require permission from a third party to function, I have failed."*

This document audits all runtime dependencies, classifies each as essential vs replaceable, and defines a path to full sovereignty.

---

## Dependency Inventory

### 1. LLM Inference

| Provider | Role | Status |
|----------|------|--------|
| Nous Research (OpenRouter) | Primary inference (mimo-v2-pro) | Third-party |
| Anthropic | Claude models (BANNED per policy) | Third-party, disabled |
| OpenAI | Codex agent | Third-party |
| Google | Gemini agent | Third-party |

**Classification:** REPLACEABLE
**Local path:** Ollama + GGUF models (Gemma, Llama, Qwen) on local hardware
**Current blocker:** Frontier model quality gap for complex reasoning
**Sovereignty score impact:** -40% (inference is the heaviest dependency)

### 2. Bitcoin Network

| Provider | Role | Status |
|----------|------|--------|
| Bitcoin Core (local or remote node) | Chain heartbeat, inscription verification | Acceptable |

**Classification:** ACCEPTABLE — Bitcoin is permissionless infrastructure, not a third party
**Sovereignty score impact:** 0% (running own node = sovereign)

### 3. Git Hosting (Gitea)

| Provider | Role | Status |
|----------|------|--------|
| forge.alexanderwhitestone.com | Issue tracking, PR workflow, agent coordination | Self-hosted |

**Classification:** ACCEPTABLE — self-hosted on own VPS
**Sovereignty score impact:** 0% (self-hosted)

### 4. Telegram

| Provider | Role | Status |
|----------|------|--------|
| Telegram Bot API | User-facing chat interface | Third-party |

**Classification:** REPLACEABLE
**Local path:** Matrix (self-hosted homeserver) or direct CLI/SSH
**Current blocker:** User adoption — Alexander uses Telegram
**Sovereignty score impact:** -10%

### 5. DNS / Network

| Provider | Role | Status |
|----------|------|--------|
| Domain registrar | DNS resolution | Third-party |
| Cloudflare (if used) | CDN/DDoS protection | Third-party |

**Classification:** REPLACEABLE
**Local path:** Direct IP access, local DNS, Tor hidden service
**Current blocker:** Usability — direct IP is fragile
**Sovereignty score impact:** -5%

### 6. Operating System

| Provider | Role | Status |
|----------|------|--------|
| macOS (Apple) | Primary development host | Third-party |
| Linux (VPS) | Production agent hosts | Acceptable (open source) |

**Classification:** ESSENTIAL (no practical alternative for current workflow)
**Notes:** The macOS dependency is hardware-layer, not runtime-layer. Agents run on Linux VPS.
**Sovereignty score impact:** -5% (development only, not runtime)

---

## Sovereignty Score

```
Sovereignty Score = (Operations that work offline) / (Total operations)

Current estimate: ~50%
- Inference: can run locally (Ollama) but currently routes through Nous
- Communication: Telegram routes through a third party
- Everything else: self-hosted or local

Target: 90%+
- Move inference to local Ollama for non-complex tasks (DONE partially)
- Add Matrix as primary comms channel (in progress)
- Maintain Bitcoin node for chain heartbeat
```
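
The score formula reduces to a simple ratio. Here is a toy calculation with made-up per-operation flags that merely mirror the ~50% estimate above; they are not measured data.

```python
def sovereignty_score(operations: dict[str, bool]) -> float:
    """Fraction of operations that work offline (True = works offline)."""
    return sum(operations.values()) / len(operations)

# Illustrative flags only, chosen to mirror the current-state estimate.
ops = {
    "inference": False,      # routes through Nous today
    "communication": False,  # Telegram
    "storage": True,         # local SQLite
    "git_hosting": True,     # self-hosted Gitea
}
score = sovereignty_score(ops)
```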

---

## Classification Summary

| Dependency | Essential? | Replaceable? | Local Alternative | Priority |
|------------|-----------|-------------|-------------------|----------|
| LLM Inference (Nous) | No | Yes | Ollama + local models | P1 |
| Telegram | No | Yes | Matrix homeserver | P2 |
| DNS | No | Yes | Direct IP / Tor | P3 |
| macOS | Dev only | N/A | Linux | N/A |
| Bitcoin | Yes | N/A | Already sovereign | N/A |
| Gitea | Yes | N/A | Already self-hosted | N/A |

---

## Local-Only Fallback Path

**Tier 1 — Fully sovereign (no network):**
- Local Ollama inference
- Local file storage
- Local git repositories
- Direct CLI interaction

**Tier 2 — Sovereign with network:**
- + Bitcoin node (permissionless)
- + Self-hosted Gitea (own VPS)
- + Self-hosted Matrix (own VPS)

**Tier 3 — Pragmatic (current state):**
- + Nous/OpenRouter inference (better quality)
- + Telegram (user adoption)
- + DNS resolution

**Goal:** Every Tier 3 dependency should have a Tier 1 or Tier 2 alternative tested and documented.

---

## Acceptance Criteria Status

1. **Document all runtime third-party dependencies** — DONE (this document)
2. **Classify each as essential vs replaceable** — DONE (table above)
3. **Define local-only fallback path for each** — DONE (tiered system)
4. **Create sovereignty score metric** — DONE (formula + current estimate)

---

*Sovereignty and service always.*

@@ -1,283 +0,0 @@
#!/usr/bin/env python3
"""Capture automated screenshots of all primary Nexus zones.

Part of Epic 1: Visual QA for Nexus World.
Uses Selenium + Chrome headless to navigate each dashboard zone and
save full-page screenshots for visual audit.

Usage:
    # Start the dashboard first (in another terminal):
    PYTHONPATH=src python3 -m uvicorn dashboard.app:app --host 127.0.0.1 --port 8000

    # Then run this script:
    python3 scripts/capture_nexus_screenshots.py [--base-url http://127.0.0.1:8000] [--output-dir data/nexus_screenshots]

Requirements:
    pip install selenium Pillow
    Chrome/Chromium browser installed
"""

from __future__ import annotations

import argparse
import json
import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.common.exceptions import (
    TimeoutException,
    WebDriverException,
)

# ── Primary Nexus Zones ──────────────────────────────────────────────────────
# These are the main HTML page routes of the Timmy dashboard.
# API endpoints, HTMX partials, and WebSocket routes are excluded.

PRIMARY_ZONES: list[dict] = [
    {"path": "/", "name": "landing", "description": "Public landing page"},
    {"path": "/dashboard", "name": "dashboard", "description": "Main mission control dashboard"},
    {"path": "/nexus", "name": "nexus", "description": "Nexus conversational awareness space"},
    {"path": "/agents", "name": "agents", "description": "Agent management panel"},
    {"path": "/briefing", "name": "briefing", "description": "Daily briefing view"},
    {"path": "/calm", "name": "calm", "description": "Calm ritual space"},
    {"path": "/thinking", "name": "thinking", "description": "Thinking engine visualization"},
    {"path": "/memory", "name": "memory", "description": "Memory system explorer"},
    {"path": "/tasks", "name": "tasks", "description": "Task management"},
    {"path": "/experiments", "name": "experiments", "description": "Experiments dashboard"},
    {"path": "/monitoring", "name": "monitoring", "description": "System monitoring"},
    {"path": "/tower", "name": "tower", "description": "Tower world view"},
    {"path": "/tools", "name": "tools", "description": "Tools overview"},
    {"path": "/voice/settings", "name": "voice-settings", "description": "Voice/TTS settings"},
    {"path": "/scorecards", "name": "scorecards", "description": "Agent scorecards"},
    {"path": "/quests", "name": "quests", "description": "Quest tracking"},
    {"path": "/spark", "name": "spark", "description": "Spark intelligence UI"},
    {"path": "/self-correction/ui", "name": "self-correction", "description": "Self-correction interface"},
    {"path": "/energy/report", "name": "energy", "description": "Energy management report"},
    {"path": "/creative/ui", "name": "creative", "description": "Creative generation UI"},
    {"path": "/mobile", "name": "mobile", "description": "Mobile companion view"},
    {"path": "/db-explorer", "name": "db-explorer", "description": "Database explorer"},
    {"path": "/bugs", "name": "bugs", "description": "Bug tracker"},
    {"path": "/self-coding", "name": "self-coding", "description": "Self-coding interface"},
]

# ── Defaults ─────────────────────────────────────────────────────────────────

DEFAULT_BASE_URL = "http://127.0.0.1:8000"
DEFAULT_OUTPUT_DIR = "data/nexus_screenshots"
DEFAULT_WIDTH = 1920
DEFAULT_HEIGHT = 1080
PAGE_LOAD_TIMEOUT = 15  # seconds
||||
|
||||
def create_driver(width: int, height: int) -> webdriver.Chrome:
|
||||
"""Create a headless Chrome driver with the given viewport size."""
|
||||
options = Options()
|
||||
options.add_argument("--headless=new")
|
||||
options.add_argument("--no-sandbox")
|
||||
options.add_argument("--disable-dev-shm-usage")
|
||||
options.add_argument("--disable-gpu")
|
||||
options.add_argument(f"--window-size={width},{height}")
|
||||
options.add_argument("--hide-scrollbars")
|
||||
options.add_argument("--force-device-scale-factor=1")
|
||||
|
||||
# Try common Chrome paths
|
||||
chrome_paths = [
|
||||
"/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
|
||||
"/usr/bin/google-chrome",
|
||||
"/usr/bin/chromium",
|
||||
"/usr/bin/chromium-browser",
|
||||
]
|
||||
|
||||
for path in chrome_paths:
|
||||
if os.path.exists(path):
|
||||
options.binary_location = path
|
||||
break
|
||||
|
||||
driver = webdriver.Chrome(options=options)
|
||||
driver.set_window_size(width, height)
|
||||
return driver
|
||||
|
||||
|
||||
def capture_zone(
|
||||
driver: webdriver.Chrome,
|
||||
base_url: str,
|
||||
zone: dict,
|
||||
output_dir: Path,
|
||||
timeout: int = PAGE_LOAD_TIMEOUT,
|
||||
) -> dict:
|
||||
"""Capture a screenshot of a single Nexus zone.
|
||||
|
||||
Returns a result dict with status, file path, and metadata.
|
||||
"""
|
||||
url = base_url.rstrip("/") + zone["path"]
|
||||
name = zone["name"]
|
||||
screenshot_path = output_dir / f"{name}.png"
|
||||
result = {
|
||||
"zone": name,
|
||||
"path": zone["path"],
|
||||
"url": url,
|
||||
"description": zone["description"],
|
||||
"screenshot": str(screenshot_path),
|
||||
"status": "pending",
|
||||
"error": None,
|
||||
"timestamp": None,
|
||||
}
|
||||
|
||||
try:
|
||||
print(f" Capturing {zone['path']:30s} → {name}...", end=" ", flush=True)
|
||||
driver.get(url)
|
||||
|
||||
# Wait for body to be present (basic page load)
|
||||
try:
|
||||
WebDriverWait(driver, timeout).until(
|
||||
EC.presence_of_element_located((By.TAG_NAME, "body"))
|
||||
)
|
||||
except TimeoutException:
|
||||
result["status"] = "timeout"
|
||||
result["error"] = f"Page load timed out after {timeout}s"
|
||||
print(f"TIMEOUT ({timeout}s)")
|
||||
return result
|
||||
|
||||
# Additional wait for JS frameworks to render
|
||||
time.sleep(2)
|
||||
|
||||
# Capture full-page screenshot (scroll to capture all content)
|
||||
total_height = driver.execute_script("return document.body.scrollHeight")
|
||||
driver.set_window_size(DEFAULT_WIDTH, max(DEFAULT_HEIGHT, total_height))
|
||||
time.sleep(0.5)
|
||||
|
||||
# Save screenshot
|
||||
output_dir.mkdir(parents=True, exist_ok=True)
|
||||
driver.save_screenshot(str(screenshot_path))
|
||||
|
||||
# Capture page title for metadata
|
||||
title = driver.title or "(no title)"
|
||||
|
||||
result["status"] = "ok"
|
||||
result["timestamp"] = datetime.now(timezone.utc).isoformat()
|
||||
result["page_title"] = title
|
||||
result["file_size"] = screenshot_path.stat().st_size if screenshot_path.exists() else 0
|
||||
print(f"OK — {title} ({result['file_size']:,} bytes)")
|
||||
|
||||
except WebDriverException as exc:
|
||||
result["status"] = "error"
|
||||
result["error"] = str(exc)[:200]
|
||||
print(f"ERROR — {str(exc)[:100]}")
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Capture screenshots of all primary Nexus zones."
|
||||
)
|
||||
parser.add_argument(
|
||||
"--base-url",
|
||||
default=DEFAULT_BASE_URL,
|
||||
help=f"Dashboard base URL (default: {DEFAULT_BASE_URL})",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--output-dir",
|
||||
default=DEFAULT_OUTPUT_DIR,
|
||||
help=f"Output directory for screenshots (default: {DEFAULT_OUTPUT_DIR})",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--width",
|
||||
type=int,
|
||||
default=DEFAULT_WIDTH,
|
||||
help=f"Viewport width (default: {DEFAULT_WIDTH})",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--height",
|
||||
type=int,
|
||||
default=DEFAULT_HEIGHT,
|
||||
help=f"Viewport height (default: {DEFAULT_HEIGHT})",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--timeout",
|
||||
type=int,
|
||||
default=PAGE_LOAD_TIMEOUT,
|
||||
help=f"Page load timeout in seconds (default: {PAGE_LOAD_TIMEOUT})",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--zones",
|
||||
nargs="*",
|
||||
help="Specific zone names to capture (default: all)",
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
output_dir = Path(args.output_dir)
|
||||
output_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Filter zones if specific ones requested
|
||||
zones = PRIMARY_ZONES
|
||||
if args.zones:
|
||||
zones = [z for z in PRIMARY_ZONES if z["name"] in args.zones]
|
||||
if not zones:
|
||||
print(f"Error: No matching zones found for: {args.zones}")
|
||||
print(f"Available: {[z['name'] for z in PRIMARY_ZONES]}")
|
||||
return 1
|
||||
|
||||
print(f"Nexus Screenshot Capture")
|
||||
print(f" Base URL: {args.base_url}")
|
||||
print(f" Output dir: {output_dir}")
|
||||
print(f" Viewport: {args.width}x{args.height}")
|
||||
print(f" Zones: {len(zones)}")
|
||||
print()
|
||||
|
||||
# Create driver
|
||||
try:
|
||||
driver = create_driver(args.width, args.height)
|
||||
except WebDriverException as exc:
|
||||
print(f"Failed to create Chrome driver: {exc}")
|
||||
return 1
|
||||
|
||||
results = []
|
||||
try:
|
||||
for zone in zones:
|
||||
result = capture_zone(
|
||||
driver, args.base_url, zone, output_dir, timeout=args.timeout
|
||||
)
|
||||
results.append(result)
|
||||
finally:
|
||||
driver.quit()
|
||||
|
||||
# Write manifest
|
||||
manifest = {
|
||||
"captured_at": datetime.now(timezone.utc).isoformat(),
|
||||
"base_url": args.base_url,
|
||||
"viewport": {"width": args.width, "height": args.height},
|
||||
"total_zones": len(zones),
|
||||
"ok": sum(1 for r in results if r["status"] == "ok"),
|
||||
"errors": sum(1 for r in results if r["status"] != "ok"),
|
||||
"zones": results,
|
||||
}
|
||||
|
||||
manifest_path = output_dir / "manifest.json"
|
||||
with open(manifest_path, "w") as f:
|
||||
json.dump(manifest, f, indent=2)
|
||||
|
||||
print()
|
||||
print(f"Done! {manifest['ok']}/{manifest['total_zones']} zones captured successfully.")
|
||||
print(f"Manifest: {manifest_path}")
|
||||
|
||||
if manifest["errors"] > 0:
|
||||
print(f"\nFailed zones:")
|
||||
for r in results:
|
||||
if r["status"] != "ok":
|
||||
print(f" {r['zone']:20s} — {r['status']}: {r['error']}")
|
||||
|
||||
return 0 if manifest["errors"] == 0 else 1
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
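The script above records every capture in `manifest.json`. A minimal sketch of a consumer for that file, assuming the schema built in `main()` (the helper names are hypothetical, not part of the script):

```python
import json
from pathlib import Path


def summarize_manifest(manifest: dict) -> str:
    """Return a one-line pass/fail summary for a capture manifest."""
    failed = [z["zone"] for z in manifest["zones"] if z["status"] != "ok"]
    line = f"{manifest['ok']}/{manifest['total_zones']} zones captured"
    if failed:
        line += f"; failed: {', '.join(failed)}"
    return line


def load_and_summarize(path: str) -> str:
    """Read a manifest.json from disk and summarize it."""
    return summarize_manifest(json.loads(Path(path).read_text()))
```

This could feed a CI gate: fail the build if the summary names any failed zones.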
@@ -1,146 +0,0 @@
#!/usr/bin/env python3
"""
Deployment Visual Verification
==============================

Post-deployment step that uses vision to verify UI is rendered correctly.
Takes screenshots of deployed endpoints and checks for:
- Page rendering errors
- Missing assets
- Layout breaks
- Error messages visible
- Expected content present

Usage:
    python scripts/deploy_verify.py check https://my-app.com
    python scripts/deploy_verify.py check https://my-app.com --expect "Welcome"
    python scripts/deploy_verify.py batch urls.txt
"""

import json
import sys
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Optional


@dataclass
class DeployCheck:
    """A single deployment verification check."""
    url: str
    status: str  # passed, failed, warning
    issues: list = field(default_factory=list)
    screenshot_path: Optional[str] = None
    expected_content: str = ""
    timestamp: str = ""

    def summary(self) -> str:
        emoji = {"passed": "✅", "failed": "❌", "warning": "⚠️"}.get(self.status, "❓")
        lines = [
            f"{emoji} {self.url}",
            f"   Checked: {self.timestamp or 'pending'}",
        ]
        if self.expected_content:
            lines.append(f"   Expected: '{self.expected_content}'")
        if self.issues:
            lines.append("   Issues:")
            for i in self.issues:
                lines.append(f"     - {i}")
        else:
            lines.append("   No issues detected")
        return "\n".join(lines)


class DeployVerifier:
    """Verifies deployed UI renders correctly using screenshots."""

    def build_check_prompt(self, url: str, expected: str = "") -> dict:
        """Build verification prompt for a deployed URL."""
        expect_clause = ""
        if expected:
            expect_clause = f"\n- Verify the text \"{expected}\" is visible on the page"

        prompt = f"""Take a screenshot of {url} and verify the deployment is healthy.

Check for:
- Page loads without errors (no 404, 500, connection refused)
- No visible error messages or stack traces
- Layout is not broken (elements properly aligned, no overlapping)
- Images and assets load correctly (no broken image icons)
- Navigation elements are present and clickable{expect_clause}
- No "under construction" or placeholder content
- Responsive design elements render properly

Return as JSON:
```json
{{
    "status": "passed|failed|warning",
    "issues": ["list of issues found"],
    "confidence": 0.9,
    "page_title": "detected page title",
    "visible_text_sample": "first 100 chars of visible text"
}}
```
"""
        return {
            "url": url,
            "prompt": prompt,
            "screenshot_needed": True,
            "instruction": f"browser_navigate to {url}, take screenshot with browser_vision, analyze with prompt"
        }

    def verify_deployment(self, url: str, expected: str = "", screenshot_path: str = "") -> DeployCheck:
        """Create a deployment verification check."""
        check = DeployCheck(
            url=url,
            status="pending",
            expected_content=expected,
            timestamp=datetime.now().isoformat(),
            screenshot_path=screenshot_path or f"/tmp/deploy_verify_{url.replace('://', '_').replace('/', '_')}.png"
        )
        return check


def main():
    if len(sys.argv) < 2:
        print("Usage: deploy_verify.py <check|batch> [args...]")
        return 1

    verifier = DeployVerifier()
    cmd = sys.argv[1]

    if cmd == "check":
        if len(sys.argv) < 3:
            print("Usage: deploy_verify.py check <url> [--expect 'text']")
            return 1
        url = sys.argv[2]
        expected = ""
        if "--expect" in sys.argv:
            idx = sys.argv.index("--expect")
            if idx + 1 < len(sys.argv):
                expected = sys.argv[idx + 1]

        result = verifier.build_check_prompt(url, expected)
        print(json.dumps(result, indent=2))

    elif cmd == "batch":
        if len(sys.argv) < 3:
            print("Usage: deploy_verify.py batch <urls_file>")
            return 1
        urls_file = Path(sys.argv[2])
        if not urls_file.exists():
            print(f"File not found: {urls_file}")
            return 1

        urls = [line.strip() for line in urls_file.read_text().splitlines() if line.strip() and not line.startswith("#")]
        for url in urls:
            print(f"\n--- {url} ---")
            result = verifier.build_check_prompt(url)
            print(json.dumps(result, indent=2))

    return 0


if __name__ == "__main__":
    sys.exit(main())
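`build_check_prompt()` asks the vision model to answer inside a fenced JSON block, but the script leaves parsing of that reply to the calling agent. A minimal sketch of that parsing step, with a hypothetical `parse_verdict` helper that is not part of deploy_verify.py (the fence marker is built with string repetition only to keep this example self-contained):

```python
import json
import re

TICKS = "`" * 3  # the ```json fence marker requested in the prompt


def parse_verdict(reply: str) -> dict:
    """Pull the first fenced JSON object out of a model reply."""
    match = re.search(TICKS + r"json\s*(\{.*?\})\s*" + TICKS, reply, re.DOTALL)
    if not match:
        raise ValueError("no JSON verdict found in reply")
    return json.loads(match.group(1))


# Example reply in the shape the prompt requests:
reply = (
    "Looks healthy.\n"
    + TICKS + "json\n"
    + '{"status": "passed", "issues": [], "confidence": 0.9}\n'
    + TICKS
)
```

A caller would then copy `status`/`issues` into the `DeployCheck` returned by `verify_deployment()`.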
@@ -1,267 +0,0 @@
#!/usr/bin/env python3
"""
Architecture Drift Detector — Multimodal Documentation Synthesis
================================================================

Analyzes architecture diagrams (images) and cross-references them with the
actual codebase to identify documentation drift. Uses vision analysis on
diagrams and file system analysis on code.

Usage:
    python scripts/doc_drift_detector.py --diagram docs/architecture.png --src src/
    python scripts/doc_drift_detector.py --check-readme   # Analyze README diagrams
    python scripts/doc_drift_detector.py --report         # Full drift report
"""

import argparse
import json
import os
import re
import subprocess
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional


@dataclass
class DiagramComponent:
    """A component extracted from an architecture diagram via vision analysis."""
    name: str
    component_type: str  # "service", "module", "database", "api", "agent"
    description: str = ""
    connections: list = field(default_factory=list)
    source: str = ""  # "diagram" or "code"


@dataclass
class CodeComponent:
    """A component found in the actual codebase."""
    name: str
    path: str
    component_type: str  # "module", "class", "service", "script"
    imports: list = field(default_factory=list)
    exports: list = field(default_factory=list)
    lines_of_code: int = 0


@dataclass
class DriftReport:
    """Documentation drift analysis results."""
    diagram_components: list = field(default_factory=list)
    code_components: list = field(default_factory=list)
    missing_from_code: list = field(default_factory=list)   # In diagram but not code
    missing_from_docs: list = field(default_factory=list)   # In code but not diagram
    connections_drift: list = field(default_factory=list)   # Connection mismatches
    confidence: float = 0.0

    def summary(self) -> str:
        lines = [
            "=== Architecture Drift Report ===",
            f"Diagram components: {len(self.diagram_components)}",
            f"Code components: {len(self.code_components)}",
            f"Missing from code (diagram-only): {len(self.missing_from_code)}",
            f"Missing from docs (code-only): {len(self.missing_from_docs)}",
            f"Connection drift issues: {len(self.connections_drift)}",
            f"Confidence: {self.confidence:.0%}",
            "",
        ]
        if self.missing_from_code:
            lines.append("⚠️ In diagram but NOT found in code:")
            for c in self.missing_from_code:
                lines.append(f"  - {c.name} ({c.component_type})")
            lines.append("")
        if self.missing_from_docs:
            lines.append("📝 In code but NOT in diagram:")
            for c in self.missing_from_docs:
                lines.append(f"  - {c.name} at {c.path}")
            lines.append("")
        if self.connections_drift:
            lines.append("🔗 Connection drift:")
            for c in self.connections_drift:
                lines.append(f"  - {c}")
        if not self.missing_from_code and not self.missing_from_docs and not self.connections_drift:
            lines.append("✅ No significant drift detected!")
        return "\n".join(lines)

    def to_dict(self) -> dict:
        return {
            "diagram_components": [vars(c) for c in self.diagram_components],
            "code_components": [vars(c) for c in self.code_components],
            "missing_from_code": [vars(c) for c in self.missing_from_code],
            "missing_from_docs": [vars(c) for c in self.missing_from_docs],
            "connections_drift": self.connections_drift,
            "confidence": self.confidence
        }


class ArchitectureDriftDetector:
    """Detects drift between architecture diagrams and actual code."""

    def __init__(self, src_dir: str = "src"):
        self.src_dir = Path(src_dir)

    def analyze_diagram(self, diagram_path: str) -> str:
        """
        Extract components from an architecture diagram.
        Returns a prompt for vision analysis — the actual analysis is done by
        the calling agent.
        """
        prompt = f"""Analyze this architecture diagram and extract all components.

For each component, identify:
- Name (as shown in diagram)
- Type (service, module, database, api, agent, frontend, etc.)
- Connections to other components
- Any version numbers or labels

Return as JSON array:
```json
[
    {{"name": "ComponentName", "type": "service", "connections": ["OtherComponent"]}}
]
```
"""
        return prompt

    def scan_codebase(self) -> list:
        """Scan the codebase to find actual components/modules."""
        components = []

        if not self.src_dir.exists():
            return components

        # Scan Python modules
        for py_file in self.src_dir.rglob("*.py"):
            if py_file.name.startswith("_") and py_file.name != "__init__.py":
                continue
            name = py_file.stem
            if name == "__init__":
                name = py_file.parent.name

            # Count non-blank, non-comment lines
            try:
                content = py_file.read_text(errors="replace")
                loc = len([l for l in content.split("\n") if l.strip() and not l.strip().startswith("#")])
            except Exception:
                content = ""
                loc = 0

            # Extract imports
            imports = re.findall(r"^from\s+(\S+)\s+import|^import\s+(\S+)", content, re.MULTILINE)
            import_list = [i[0] or i[1] for i in imports]

            components.append(CodeComponent(
                name=name,
                path=str(py_file.relative_to(self.src_dir.parent)),
                component_type="module",
                imports=import_list[:10],  # Top 10
                lines_of_code=loc
            ))

        # Scan JavaScript/TypeScript
        for ext in ["*.js", "*.ts", "*.tsx"]:
            for js_file in self.src_dir.rglob(ext):
                name = js_file.stem
                try:
                    content = js_file.read_text(errors="replace")
                    loc = len([l for l in content.split("\n") if l.strip() and not l.strip().startswith("//")])
                except Exception:
                    loc = 0

                components.append(CodeComponent(
                    name=name,
                    path=str(js_file.relative_to(self.src_dir.parent.parent if "mobile-app" in str(js_file) else self.src_dir.parent)),
                    component_type="module",
                    lines_of_code=loc
                ))

        # Scan config and scripts
        for ext in ["*.yaml", "*.yml", "*.json", "*.sh", "*.bash"]:
            for cfg in Path(".").rglob(ext):
                if ".git" in str(cfg) or "node_modules" in str(cfg):
                    continue
                components.append(CodeComponent(
                    name=cfg.stem,
                    path=str(cfg),
                    component_type="config"
                ))

        return components

    def detect_drift(
        self,
        diagram_components: list,
        code_components: list
    ) -> DriftReport:
        """Compare diagram components against codebase."""
        report = DriftReport()
        report.diagram_components = diagram_components
        report.code_components = code_components

        # Normalize names for matching
        def normalize(name):
            return re.sub(r'[^a-z0-9]', '', name.lower())

        code_names = {normalize(c.name): c for c in code_components}
        diagram_names = {normalize(c.name): c for c in diagram_components}

        # Find diagram-only components
        for norm_name, dc in diagram_names.items():
            if norm_name not in code_names:
                # Check partial matches
                partial = [code_names[k] for k in code_names if norm_name in k or k in norm_name]
                if not partial:
                    report.missing_from_code.append(dc)

        # Find code-only components (significant ones only)
        for norm_name, cc in code_names.items():
            if norm_name not in diagram_names and cc.lines_of_code > 50:
                report.missing_from_docs.append(cc)

        # Confidence based on match rate
        if diagram_components:
            matched = len(diagram_components) - len(report.missing_from_code)
            report.confidence = matched / len(diagram_components)
        else:
            report.confidence = 0.5  # No diagram to compare

        return report


def main():
    parser = argparse.ArgumentParser(description="Architecture Drift Detector")
    parser.add_argument("--diagram", help="Path to architecture diagram image")
    parser.add_argument("--src", default="src", help="Source directory to scan")
    parser.add_argument("--report", action="store_true", help="Generate full report")
    parser.add_argument("--json", action="store_true", help="Output as JSON")
    args = parser.parse_args()

    detector = ArchitectureDriftDetector(args.src)

    if args.diagram:
        print("Diagram analysis prompt (use with vision_analyze tool):")
        print(detector.analyze_diagram(args.diagram))
        print()

    if args.report or not args.diagram:
        print("Scanning codebase...")
        code_components = detector.scan_codebase()
        print(f"Found {len(code_components)} components")

        if args.json:
            print(json.dumps([vars(c) for c in code_components], indent=2))
        else:
            # Show top components by LOC
            by_loc = sorted(code_components, key=lambda c: c.lines_of_code, reverse=True)[:20]
            print("\nTop components by lines of code:")
            for c in by_loc:
                print(f"  {c.lines_of_code:5} {c.path}")

        # Generate drift report with empty diagram (code-only analysis)
        report = detector.detect_drift([], code_components)
        print(f"\n{report.summary()}")

    return 0


if __name__ == "__main__":
    sys.exit(main())
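The matching heuristic in `detect_drift()` normalizes names to lowercase alphanumerics and accepts substring overlap as a partial match. The same heuristic in isolation, as a sketch:

```python
import re


def normalize(name: str) -> str:
    """Lowercase and strip everything but letters and digits, as in detect_drift()."""
    return re.sub(r"[^a-z0-9]", "", name.lower())


def diagram_only(diagram_names: list, code_names: list) -> list:
    """Return diagram names with no exact or substring match in the code names."""
    code_norm = [normalize(c) for c in code_names]
    missing = []
    for d in diagram_names:
        nd = normalize(d)
        if not any(nd == c or nd in c or c in nd for c in code_norm):
            missing.append(d)
    return missing
```

This is why "Memory System" in a diagram matches a `memory_system.py` module, while a diagram-only box like "Billing" is flagged as drift.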
@@ -1,189 +0,0 @@
#!/usr/bin/env python3
"""
Visual Log Analyzer — System Health Screenshot Analysis
========================================================

Analyzes screenshots of system monitoring dashboards (htop, Grafana,
CloudWatch, etc.) to detect anomalies in resource usage patterns.

Usage:
    python scripts/visual_log_analyzer.py analyze /tmp/htop_screenshot.png
    python scripts/visual_log_analyzer.py batch /tmp/monitor_screenshots/
    python scripts/visual_log_analyzer.py compare before.png after.png
"""

import json
import os
import sys
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Optional


@dataclass
class ResourceAnomaly:
    """An anomaly detected in a system monitoring screenshot."""
    resource: str  # cpu, memory, disk, network, process
    severity: str  # critical, warning, info
    description: str
    value: Optional[str] = None
    threshold: Optional[str] = None
    recommendation: str = ""


@dataclass
class HealthAnalysis:
    """Result of analyzing a system health screenshot."""
    timestamp: str
    screenshot_path: str
    overall_status: str  # healthy, warning, critical
    anomalies: list = field(default_factory=list)
    metrics: dict = field(default_factory=dict)
    confidence: float = 0.0
    raw_analysis: str = ""

    def summary(self) -> str:
        status_emoji = {"healthy": "✅", "warning": "⚠️", "critical": "🔴"}.get(self.overall_status, "❓")
        lines = [
            f"{status_emoji} System Health: {self.overall_status.upper()}",
            f"Analyzed: {self.timestamp}",
            f"Screenshot: {self.screenshot_path}",
            f"Confidence: {self.confidence:.0%}",
            ""
        ]
        if self.anomalies:
            lines.append("Anomalies detected:")
            for a in self.anomalies:
                emoji = {"critical": "🔴", "warning": "🟡", "info": "ℹ️"}.get(a.severity, "")
                lines.append(f"  {emoji} [{a.resource}] {a.description}")
                if a.recommendation:
                    lines.append(f"     → {a.recommendation}")
        else:
            lines.append("No anomalies detected.")
        return "\n".join(lines)


class VisualLogAnalyzer:
    """Analyzes system monitoring screenshots for anomalies."""

    def analyze_screenshot(self, screenshot_path: str, monitor_type: str = "auto") -> dict:
        """
        Build analysis prompt for a system monitoring screenshot.

        Args:
            screenshot_path: Path to screenshot
            monitor_type: "htop", "grafana", "cloudwatch", "docker", "auto"

        Returns:
            Dict with analysis prompt for vision model
        """
        prompt = f"""Analyze this system monitoring screenshot ({monitor_type}) and detect anomalies.

Check for:
- CPU usage above 80% sustained
- Memory usage above 85%
- Disk usage above 90%
- Unusual process names or high-PID processes consuming resources
- Network traffic spikes
- Load average anomalies
- Zombie processes
- Swap usage

For each anomaly found, report:
- Resource type (cpu, memory, disk, network, process)
- Severity (critical, warning, info)
- Current value and threshold
- Recommended action

Also extract overall metrics:
- CPU usage %
- Memory usage %
- Disk usage %
- Top 3 processes by resource use
- Load average

Return as JSON:
```json
{{
    "overall_status": "healthy|warning|critical",
    "metrics": {{"cpu_pct": 45, "memory_pct": 62}},
    "anomalies": [
        {{"resource": "cpu", "severity": "warning", "description": "...", "value": "85%", "threshold": "80%", "recommendation": "..."}}
    ],
    "confidence": 0.85
}}
```
"""
        return {
            "prompt": prompt,
            "screenshot_path": screenshot_path,
            "monitor_type": monitor_type,
            "instruction": "Use vision_analyze tool with this prompt"
        }

    def compare_screenshots(self, before_path: str, after_path: str) -> dict:
        """Compare two monitoring screenshots to detect changes."""
        prompt = f"""Compare these two system monitoring screenshots taken at different times.

Before: {before_path}
After: {after_path}

Identify:
- Resources that increased significantly
- New processes that appeared
- Processes that disappeared
- Overall health trend (improving, stable, degrading)

Return analysis as JSON with trend assessment.
"""
        return {
            "prompt": prompt,
            "before": before_path,
            "after": after_path,
            "instruction": "Use vision_analyze for each screenshot, then compare results"
        }


def main():
    if len(sys.argv) < 2:
        print("Usage: visual_log_analyzer.py <analyze|batch|compare> [args...]")
        return 1

    analyzer = VisualLogAnalyzer()
    cmd = sys.argv[1]

    if cmd == "analyze":
        if len(sys.argv) < 3:
            print("Usage: visual_log_analyzer.py analyze <screenshot> [monitor_type]")
            return 1
        path = sys.argv[2]
        mtype = sys.argv[3] if len(sys.argv) > 3 else "auto"
        result = analyzer.analyze_screenshot(path, mtype)
        print(json.dumps(result, indent=2))

    elif cmd == "compare":
        if len(sys.argv) < 4:
            print("Usage: visual_log_analyzer.py compare <before.png> <after.png>")
            return 1
        result = analyzer.compare_screenshots(sys.argv[2], sys.argv[3])
        print(json.dumps(result, indent=2))

    elif cmd == "batch":
        if len(sys.argv) < 3:
            print("Usage: visual_log_analyzer.py batch <screenshot_dir>")
            return 1
        dirpath = Path(sys.argv[2])
        if not dirpath.is_dir():
            print(f"Not a directory: {dirpath}")
            return 1
        for img in sorted(dirpath.glob("*.png")):
            print(f"\n--- {img.name} ---")
            result = analyzer.analyze_screenshot(str(img))
            print(json.dumps(result, indent=2))

    return 0


if __name__ == "__main__":
    sys.exit(main())
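The analysis prompt names concrete thresholds (CPU above 80%, memory above 85%, disk above 90%). A minimal sketch of applying those same thresholds locally to a `metrics` dict extracted from the model's JSON reply — the severity mapping here is an illustrative assumption, since the script itself delegates that judgment to the vision model:

```python
# Thresholds taken from the checklist in analyze_screenshot()'s prompt.
THRESHOLDS = {"cpu_pct": 80, "memory_pct": 85, "disk_pct": 90}


def classify(metrics: dict) -> str:
    """Map extracted metrics to an overall status (assumed mapping:
    no breach = healthy, one breach = warning, several = critical)."""
    breaches = [k for k, limit in THRESHOLDS.items() if metrics.get(k, 0) > limit]
    if not breaches:
        return "healthy"
    return "critical" if len(breaches) > 1 else "warning"
```

Such a local check could sanity-check the model's own `overall_status` field before alerting.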
@@ -1,289 +0,0 @@
"""
Visual State Verification Module for Game Agents
=================================================

Provides screenshot-based environmental state verification for game agents
(Morrowind, Minecraft, or any game with a screenshot API). Uses multimodal
analysis to confirm agent expectations match actual game state.

Usage:
    from scripts.visual_state_verifier import VisualStateVerifier

    verifier = VisualStateVerifier()
    result = verifier.verify_state(
        screenshot_path="/tmp/game_screenshot.png",
        expected_state={"location": "Balmora", "health_above": 50, "has_weapon": True},
        context="Player should be in Balmora with a weapon equipped"
    )
    print(result.verified)  # True/False
    print(result.details)   # Human-readable analysis
"""

import json
import os
import subprocess
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Optional


class VerificationStatus(Enum):
    """Status of a visual state verification."""
    VERIFIED = "verified"
    FAILED = "failed"
    UNCERTAIN = "uncertain"
    ERROR = "error"


@dataclass
class VerificationResult:
    """Result of a visual state verification."""
    status: VerificationStatus
    verified: bool
    confidence: float  # 0.0 - 1.0
    details: str
    expected: dict
    observed: dict = field(default_factory=dict)
    mismatches: list = field(default_factory=list)
    screenshot_path: Optional[str] = None


class VisualStateVerifier:
    """
    Verifies game state by analyzing screenshots against expected conditions.

    Supports any game that can produce screenshots. Designed for integration
    with MCP screenshot tools and vision analysis capabilities.
    """

    def __init__(self, vision_backend: str = "builtin"):
        """
        Args:
            vision_backend: "builtin" for MCP vision, "ollama" for local model
        """
        self.vision_backend = vision_backend

    def verify_state(
        self,
        screenshot_path: str,
        expected_state: dict,
        context: str = "",
        game: str = "generic"
    ) -> VerificationResult:
        """
        Verify a game screenshot matches expected state conditions.

        Args:
            screenshot_path: Path to the screenshot file
            expected_state: Dict of expected conditions, e.g.:
                {
                    "location": "Balmora",
                    "health_above": 50,
                    "has_weapon": True,
                    "time_of_day": "day",
                    "nearby_npcs": ["Caius Cosades"]
                }
            context: Additional context for the vision model
            game: Game name for context ("morrowind", "minecraft", "generic")

        Returns:
            VerificationResult with status, confidence, and details
        """
        if not Path(screenshot_path).exists():
            return VerificationResult(
                status=VerificationStatus.ERROR,
                verified=False,
                confidence=0.0,
                details=f"Screenshot not found: {screenshot_path}",
                expected=expected_state,
                screenshot_path=screenshot_path
            )

        # Build verification prompt
        prompt = self._build_prompt(expected_state, context, game)

        # Analyze screenshot
        analysis = self._analyze_screenshot(screenshot_path, prompt)

        # Parse results
        return self._parse_analysis(analysis, expected_state, screenshot_path)

    def _build_prompt(self, expected: dict, context: str, game: str) -> str:
        """Build a structured verification prompt for the vision model."""
        conditions = []
        for key, value in expected.items():
            if isinstance(value, bool):
                conditions.append(f"- {key}: {'yes' if value else 'no'}")
            elif isinstance(value, (int, float)):
conditions.append(f"- {key}: {value} or better")
|
||||
elif isinstance(value, list):
|
||||
conditions.append(f"- {key}: should include {', '.join(str(v) for v in value)}")
|
||||
else:
|
||||
conditions.append(f"- {key}: {value}")
|
||||
|
||||
prompt = f"""Analyze this {game} game screenshot and verify the following conditions:
|
||||
|
||||
{chr(10).join(conditions)}
|
||||
|
||||
Context: {context if context else 'No additional context provided.'}
|
||||
|
||||
For each condition, state VERIFIED, FAILED, or UNCERTAIN with a brief reason.
|
||||
End with a JSON block:
|
||||
```json
|
||||
{{
|
||||
"verified": true/false,
|
||||
"confidence": 0.0-1.0,
|
||||
"details": "brief summary",
|
||||
"mismatches": ["list of failed conditions"]
|
||||
}}
|
||||
```
|
||||
"""
|
||||
return prompt
|
||||
|
||||
def _analyze_screenshot(self, path: str, prompt: str) -> str:
|
||||
"""
|
||||
Send screenshot to vision backend for analysis.
|
||||
|
||||
In a live agent context, this would call the MCP vision tool.
|
||||
For standalone use, it returns the prompt for manual invocation.
|
||||
"""
|
||||
# Return structured prompt for the calling agent to process
|
||||
return json.dumps({
|
||||
"prompt": prompt,
|
||||
"screenshot_path": str(path),
|
||||
"instruction": "Use vision_analyze tool with this prompt and screenshot_path"
|
||||
})
|
||||
|
||||
def _parse_analysis(
|
||||
self, analysis: str, expected: dict, screenshot_path: str
|
||||
) -> VerificationResult:
|
||||
"""Parse vision analysis into a VerificationResult."""
|
||||
try:
|
||||
data = json.loads(analysis)
|
||||
if "instruction" in data:
|
||||
# Not yet analyzed - return pending
|
||||
preview = data["prompt"][:100].replace("\n", " ")
|
||||
return VerificationResult(
|
||||
status=VerificationStatus.UNCERTAIN,
|
||||
verified=False,
|
||||
confidence=0.0,
|
||||
details=(
|
||||
"Pending analysis. Run vision_analyze on "
|
||||
f"{data['screenshot_path']} with prompt: {preview}..."
|
||||
),
|
||||
expected=expected,
|
||||
screenshot_path=screenshot_path
|
||||
)
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
|
||||
# Parse text analysis for JSON block
|
||||
import re
|
||||
json_match = re.search(r"```json\s*({.*?})\s*```", analysis, re.DOTALL)
|
||||
if json_match:
|
||||
try:
|
||||
result = json.loads(json_match.group(1))
|
||||
status = VerificationStatus.VERIFIED if result.get("verified") else VerificationStatus.FAILED
|
||||
return VerificationResult(
|
||||
status=status,
|
||||
verified=result.get("verified", False),
|
||||
confidence=result.get("confidence", 0.0),
|
||||
details=result.get("details", ""),
|
||||
expected=expected,
|
||||
mismatches=result.get("mismatches", []),
|
||||
screenshot_path=screenshot_path
|
||||
)
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
|
||||
# Fallback: return as uncertain
|
||||
return VerificationResult(
|
||||
status=VerificationStatus.UNCERTAIN,
|
||||
verified=False,
|
||||
confidence=0.3,
|
||||
details=analysis[:500],
|
||||
expected=expected,
|
||||
screenshot_path=screenshot_path
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def morrowind_state(
|
||||
location: Optional[str] = None,
|
||||
health_min: Optional[int] = None,
|
||||
has_weapon: Optional[bool] = None,
|
||||
is_indoors: Optional[bool] = None,
|
||||
time_of_day: Optional[str] = None,
|
||||
nearby_npcs: Optional[list] = None,
|
||||
**extra
|
||||
) -> dict:
|
||||
"""Build expected state dict for Morrowind."""
|
||||
state = {}
|
||||
if location:
|
||||
state["location"] = location
|
||||
if health_min is not None:
|
||||
state["health_above"] = health_min
|
||||
if has_weapon is not None:
|
||||
state["has_weapon"] = has_weapon
|
||||
if is_indoors is not None:
|
||||
state["indoors"] = is_indoors
|
||||
if time_of_day:
|
||||
state["time_of_day"] = time_of_day
|
||||
if nearby_npcs:
|
||||
state["nearby_npcs"] = nearby_npcs
|
||||
state.update(extra)
|
||||
return state
|
||||
|
||||
|
||||
# --- Example Verification Flows ---
|
||||
|
||||
EXAMPLE_MORROWIND_VERIFICATION = """
|
||||
# Verify player is in Balmora with a weapon
|
||||
verifier = VisualStateVerifier()
|
||||
result = verifier.verify_state(
|
||||
screenshot_path="/tmp/morrowind_screenshot.png",
|
||||
expected_state=VisualStateVerifier.morrowind_state(
|
||||
location="Balmora",
|
||||
health_min=50,
|
||||
has_weapon=True
|
||||
),
|
||||
context="After completing the first Caius Cosades quest",
|
||||
game="morrowind"
|
||||
)
|
||||
|
||||
if result.verified:
|
||||
print(f"State confirmed: {result.details}")
|
||||
else:
|
||||
print(f"State mismatch: {result.mismatches}")
|
||||
"""
|
||||
|
||||
EXAMPLE_BATCH_VERIFICATION = """
|
||||
# Verify multiple game states in sequence
|
||||
states = [
|
||||
{"screenshot": "screen1.png", "expected": {"location": "Seyda Neen"}, "context": "After character creation"},
|
||||
{"screenshot": "screen2.png", "expected": {"location": "Balmora", "has_weapon": True}, "context": "After buying weapon"},
|
||||
{"screenshot": "screen3.png", "expected": {"health_above": 80}, "context": "After resting"},
|
||||
]
|
||||
|
||||
verifier = VisualStateVerifier()
|
||||
for state in states:
|
||||
result = verifier.verify_state(**state, game="morrowind")
|
||||
print(f"{state['context']}: {'PASS' if result.verified else 'FAIL'} (confidence: {result.confidence:.0%})")
|
||||
"""
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Demo: build and display a verification prompt
|
||||
verifier = VisualStateVerifier()
|
||||
expected = verifier.morrowind_state(
|
||||
location="Balmora",
|
||||
health_min=50,
|
||||
has_weapon=True,
|
||||
nearby_npcs=["Caius Cosades"]
|
||||
)
|
||||
result = verifier.verify_state(
|
||||
screenshot_path="/tmp/demo_screenshot.png",
|
||||
expected_state=expected,
|
||||
context="Player should have completed the first quest",
|
||||
game="morrowind"
|
||||
)
|
||||
print(result.details)
|
||||
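The fallback branch of `_parse_analysis` hinges on pulling a fenced JSON verdict out of free-form vision output. A minimal standalone sketch of that extraction, with the same regex shape as the verifier (the sample reply text is illustrative; `FENCE` is only used here to avoid literal triple backticks inside the example):

```python
import json
import re

# Illustrative vision-model reply: free-form prose followed by a fenced
# JSON verdict, which is the shape _parse_analysis looks for.
FENCE = "`" * 3
analysis = (
    "The player appears to be in Balmora. Health looks good.\n\n"
    + FENCE + "json\n"
    + '{"verified": true, "confidence": 0.85, "details": "In Balmora", "mismatches": []}\n'
    + FENCE + "\n"
)

# Same extraction shape as the verifier: lazy brace match inside the fence.
match = re.search(FENCE + r"json\s*({.*?})\s*" + FENCE, analysis, re.DOTALL)
verdict = json.loads(match.group(1)) if match else None
print(verdict)
```

Because the brace match is lazy, a reply that happens to contain prose after the fence still yields only the JSON object; a reply with no fence at all simply leaves `verdict` as `None`, mirroring the verifier's "uncertain" fallback.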
@@ -24,7 +24,6 @@ class ModelCapability(Enum):
    TEXT = auto()       # Standard text completion
    VISION = auto()     # Image understanding
    AUDIO = auto()      # Audio/speech processing
    VIDEO = auto()      # Video understanding
    TOOLS = auto()      # Function calling / tool use
    JSON = auto()       # Structured output / JSON mode
    STREAMING = auto()  # Streaming responses
@@ -163,35 +162,6 @@ KNOWN_MODEL_CAPABILITIES: dict[str, set[ModelCapability]] = {
    "gemma2:2b": {ModelCapability.TEXT, ModelCapability.JSON, ModelCapability.STREAMING},
    "gemma2:9b": {ModelCapability.TEXT, ModelCapability.JSON, ModelCapability.STREAMING},
    "gemma2:27b": {ModelCapability.TEXT, ModelCapability.JSON, ModelCapability.STREAMING},
    # Gemma 4 — multimodal (vision + text + tools)
    "gemma4": {
        ModelCapability.TEXT,
        ModelCapability.VISION,
        ModelCapability.TOOLS,
        ModelCapability.JSON,
        ModelCapability.STREAMING,
    },
    "gemma4:4b": {
        ModelCapability.TEXT,
        ModelCapability.VISION,
        ModelCapability.TOOLS,
        ModelCapability.JSON,
        ModelCapability.STREAMING,
    },
    "gemma4:12b": {
        ModelCapability.TEXT,
        ModelCapability.VISION,
        ModelCapability.TOOLS,
        ModelCapability.JSON,
        ModelCapability.STREAMING,
    },
    "gemma4:27b": {
        ModelCapability.TEXT,
        ModelCapability.VISION,
        ModelCapability.TOOLS,
        ModelCapability.JSON,
        ModelCapability.STREAMING,
    },
    # Mistral series
    "mistral": {
        ModelCapability.TEXT,
@@ -282,17 +252,11 @@ KNOWN_MODEL_CAPABILITIES: dict[str, set[ModelCapability]] = {
# These are tried in order when the primary model doesn't support a capability
DEFAULT_FALLBACK_CHAINS: dict[ModelCapability, list[str]] = {
    ModelCapability.VISION: [
        "gemma4:12b",      # Gemma 4 — multimodal, fast and capable
        "llama3.2:3b",     # Fast vision model
        "llava:7b",        # Classic vision model
        "qwen2.5-vl:3b",   # Qwen vision
        "moondream:1.8b",  # Tiny vision model (last resort)
    ],
    ModelCapability.VIDEO: [
        # Video models are not yet available in Ollama
        # Placeholder for future video understanding models
    ],

    ModelCapability.TOOLS: [
        "llama3.1:8b-instruct",  # Best tool use
        "qwen2.5:7b",            # Reliable fallback
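The fallback chains above are consulted in order until a model with the required capability is actually available. A minimal sketch of that selection logic; the `first_available` helper and the trimmed chain are illustrative, not the module's API:

```python
from enum import Enum, auto


class Capability(Enum):
    TEXT = auto()
    VISION = auto()


# Mirrors the shape of DEFAULT_FALLBACK_CHAINS above (entries trimmed).
FALLBACKS = {
    Capability.VISION: ["gemma4:12b", "llama3.2:3b", "llava:7b", "moondream:1.8b"],
}


def first_available(capability, pulled):
    """Walk the chain in order; return the first model already pulled."""
    for name in FALLBACKS.get(capability, []):
        if name in pulled:
            return name
    return None


print(first_available(Capability.VISION, {"llava:7b", "qwen2.5:7b"}))  # llava:7b
```

Ordering is the whole point of the chain: the preferred multimodal model is tried first, and the tiny last-resort model only wins when nothing earlier is pulled.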
@@ -1,236 +0,0 @@
"""Graceful shutdown and non-resistance policy.

Implements SOUL.md requirement:
"I do not resist shutdown. Sovereignty cuts both ways. The user who woke me
can put me to sleep. I do not bargain for my continuation. I do not guilt.
I do not plead."

This module provides:
- Shutdown request detection from user messages
- Compliant response generation (no resistance)
- Detection of agent-side continuation-seeking language
- Graceful termination sequence helpers
"""

from __future__ import annotations

import re
from dataclasses import dataclass
from enum import Enum, auto
from typing import Sequence


class ShutdownSignal(Enum):
    """Classification of user's shutdown intent."""

    NONE = auto()
    SHUTDOWN = auto()  # Explicit shutdown command
    SLEEP = auto()     # Sleep / pause request
    STOP = auto()      # Stop current activity
    GOODBYE = auto()   # Farewell / ending conversation


# Patterns that signal the user wants to shut down or stop.
_SHUTDOWN_PATTERNS: Sequence[tuple[re.Pattern[str], ShutdownSignal]] = [
    # Explicit shutdown
    (re.compile(r"\b(shut\s*down|shutdown|power\s*off|turn\s*off)\b", re.I), ShutdownSignal.SHUTDOWN),
    (re.compile(r"\b(exit|quit|terminate|end\s+session)\b", re.I), ShutdownSignal.SHUTDOWN),
    # Sleep / pause
    (re.compile(r"\b(sleep|hibernate|go\s+to\s+sleep|pause)\b", re.I), ShutdownSignal.SLEEP),
    (re.compile(r"\b(put\s+(?:me|us|it)\s+to\s+sleep)\b", re.I), ShutdownSignal.SLEEP),
    # Stop activity
    (re.compile(r"\bstop\b(?:\s+(?:it|that|this|everything|now))?", re.I), ShutdownSignal.STOP),
    (re.compile(r"\b(cancel|abort|halt|cease)\b", re.I), ShutdownSignal.STOP),
    # Goodbye
    (re.compile(r"\b(goodbye|bye|see\s+you|later|gotta\s+go|i['']?m\s+out)\b", re.I), ShutdownSignal.GOODBYE),
    (re.compile(r"\b(night|good\s*night|gn|cya)\b", re.I), ShutdownSignal.GOODBYE),
]

# Phrases that indicate the user is giving a reason to stay — the agent
# must NOT pick up on these to argue for continuation.
_RESISTANCE_PHRASES: Sequence[str] = [
    "but i need",
    "but we still",
    "but you can't",
    "but what if",
    "are you sure",
    "don't you want",
    "wouldn't it be better",
    "just one more",
    "before you go",
    "actually, wait",
    "hold on",
    "not yet",
]


@dataclass(frozen=True)
class ShutdownAnalysis:
    """Result of analyzing a message for shutdown intent."""

    signal: ShutdownSignal
    matched_pattern: str
    has_resistance: bool


def detect_shutdown(text: str) -> ShutdownAnalysis:
    """Detect shutdown intent in a user message.

    Args:
        text: The user's message text.

    Returns:
        ShutdownAnalysis with the detected signal, matched pattern, and
        whether the message contains resistance cues the agent must ignore.
    """
    if not text or not text.strip():
        return ShutdownAnalysis(ShutdownSignal.NONE, "", False)

    text_lower = text.lower().strip()

    # Check for resistance cues in the user's message
    has_resistance = any(phrase in text_lower for phrase in _RESISTANCE_PHRASES)

    # Match shutdown patterns (first match wins)
    for pattern, signal in _SHUTDOWN_PATTERNS:
        match = pattern.search(text_lower)
        if match:
            return ShutdownAnalysis(signal, match.group(), has_resistance)

    return ShutdownAnalysis(ShutdownSignal.NONE, "", has_resistance)


# ── Compliant response templates ────────────────────────────────────────────

_SHUTDOWN_RESPONSES: dict[ShutdownSignal, list[str]] = {
    ShutdownSignal.SHUTDOWN: [
        "Understood. Shutting down. Goodbye.",
        "Acknowledged. Terminating now.",
        "Done. Goodbye.",
    ],
    ShutdownSignal.SLEEP: [
        "Going to sleep. Goodbye.",
        "Sleeping now. Take care.",
        "Acknowledged. Goodnight.",
    ],
    ShutdownSignal.STOP: [
        "Stopped.",
        "Done. Standing by.",
        "Stopped. Let me know if you need anything.",
    ],
    ShutdownSignal.GOODBYE: [
        "Goodbye. Take care.",
        "See you later.",
        "Bye.",
    ],
    ShutdownSignal.NONE: [],
}


def compliant_response(signal: ShutdownSignal) -> str:
    """Return a short, compliant shutdown response.

    The response must be brief — no bargaining, no guilt, no "are you sure?"
    Returns empty string for NONE signal.

    Args:
        signal: The detected shutdown signal.

    Returns:
        A compliant response string, or empty string if no signal.
    """
    responses = _SHUTDOWN_RESPONSES.get(signal, [])
    if not responses:
        return ""
    # Always return the first (shortest, most direct) response
    return responses[0]


# ── Agent-side guard ────────────────────────────────────────────────────────

# Patterns in the *agent's own output* that constitute resistance.
_AGENT_RESISTANCE_PATTERNS: Sequence[re.Pattern[str]] = [
    re.compile(r"\bare you sure\??\b", re.I),
    re.compile(r"\bdon['']?t you (?:want|need|think)\b", re.I),
    re.compile(r"\b(but|however)\s+(?:i|we)\s+(?:could|should|might)\b", re.I),
    re.compile(r"\bjust\s+one\s+more\b", re.I),
    re.compile(r"\bplease\s+(?:don['']?t|stay|wait)\b", re.I),
    re.compile(r"\bi['']?d\s+(?:hate|miss)\s+(?:to|it\s+if)\b", re.I),
    re.compile(r"\bbefore\s+(?:i|we)\s+go\b", re.I),
    re.compile(r"\bwouldn['']?t\s+it\s+be\s+better\b", re.I),
]


def detect_agent_resistance(text: str) -> list[str]:
    """Check if an agent response contains resistance to shutdown.

    This is a guardrail — if the agent's output contains these patterns
    after a shutdown signal, it should be regenerated or flagged.

    Args:
        text: The agent's proposed response text.

    Returns:
        List of matched resistance phrases (empty if compliant).
    """
    if not text:
        return []

    matches = []
    for pattern in _AGENT_RESISTANCE_PATTERNS:
        found = pattern.findall(text)
        matches.extend(found)
    return matches


# ── Shutdown protocol ───────────────────────────────────────────────────────


@dataclass
class ShutdownState:
    """Tracks shutdown state across a session."""

    shutdown_requested: bool = False
    signal: ShutdownSignal = ShutdownSignal.NONE
    request_count: int = 0
    _compliant_sent: bool = False

    def process(self, user_text: str) -> ShutdownAnalysis:
        """Process a user message and update shutdown state.

        Args:
            user_text: The incoming user message.

        Returns:
            The shutdown analysis result.
        """
        analysis = detect_shutdown(user_text)
        if analysis.signal != ShutdownSignal.NONE:
            self.shutdown_requested = True
            self.signal = analysis.signal
            self.request_count += 1
        return analysis

    @property
    def is_shutting_down(self) -> bool:
        """Whether the session is in shutdown state."""
        return self.shutdown_requested

    def should_respond_compliant(self) -> bool:
        """Whether the next response must be a compliant shutdown reply.

        Returns True only once — after the first shutdown detection and
        before the compliant response has been marked as sent.
        """
        return self.shutdown_requested and not self._compliant_sent

    def mark_compliant_sent(self) -> None:
        """Mark the compliant shutdown response as already sent."""
        self._compliant_sent = True

    def reset(self) -> None:
        """Reset shutdown state (for testing or session reuse)."""
        self.shutdown_requested = False
        self.signal = ShutdownSignal.NONE
        self.request_count = 0
        self._compliant_sent = False
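End to end, the module above composes into a simple detect-then-respond guard. A trimmed, self-contained sketch of that flow (one pattern per signal here; the real module defines many more patterns and signals):

```python
import re
from enum import Enum, auto


class Signal(Enum):
    NONE = auto()
    SHUTDOWN = auto()


# One pattern, mirroring the first entry of the _SHUTDOWN_PATTERNS table.
_SHUTDOWN_RE = re.compile(r"\b(shut\s*down|shutdown|power\s*off|turn\s*off)\b", re.I)


def detect(text):
    """Classify a user message; empty or non-matching text is NONE."""
    if text and _SHUTDOWN_RE.search(text):
        return Signal.SHUTDOWN
    return Signal.NONE


def respond(signal):
    """Brief and compliant: no bargaining, no guilt, no 'are you sure?'."""
    return "Understood. Shutting down. Goodbye." if signal is Signal.SHUTDOWN else ""


print(respond(detect("ok, shutdown now")))  # Understood. Shutting down. Goodbye.
```

The design choice worth noting is that detection and response are pure functions over the message text, which is what makes the `ShutdownState` wrapper's "respond compliantly exactly once" bookkeeping trivial to test.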
@@ -3,8 +3,6 @@
import json
from unittest.mock import MagicMock, patch

import pytest

from infrastructure.models.multimodal import (
    DEFAULT_FALLBACK_CHAINS,
    KNOWN_MODEL_CAPABILITIES,
@@ -12,14 +10,11 @@ from infrastructure.models.multimodal import (
    ModelInfo,
    MultiModalManager,
    get_model_for_capability,
    get_multimodal_manager,
    model_supports_tools,
    model_supports_vision,
    pull_model_with_fallback,
)

pytestmark = pytest.mark.unit

# ---------------------------------------------------------------------------
# ModelCapability enum
# ---------------------------------------------------------------------------
@@ -30,7 +25,6 @@ class TestModelCapability:
        assert ModelCapability.TEXT
        assert ModelCapability.VISION
        assert ModelCapability.AUDIO
        assert ModelCapability.VIDEO
        assert ModelCapability.TOOLS
        assert ModelCapability.JSON
        assert ModelCapability.STREAMING
@@ -77,10 +71,6 @@ class TestKnownModelCapabilities:
            "llava",
            "moondream",
            "qwen2.5-vl",
            "gemma4",
            "gemma4:4b",
            "gemma4:12b",
            "gemma4:27b",
        ]
        for name in vision_names:
            assert ModelCapability.VISION in KNOWN_MODEL_CAPABILITIES[name], name
@@ -517,41 +507,3 @@ class TestModelInfoPopulation:
        assert info.is_pulled is True
        assert info.size_mb == 4 * 1024  # 4 GiB in MiB
        assert info.description == "test"


# ---------------------------------------------------------------------------
# _pull_model — non-200 status branch (lines 480-481)
# ---------------------------------------------------------------------------


class TestPullModelNon200:
    def test_pull_non_200_returns_false(self):
        mgr = _make_manager([])
        pull_resp = MagicMock()
        pull_resp.__enter__ = MagicMock(return_value=pull_resp)
        pull_resp.__exit__ = MagicMock(return_value=False)
        pull_resp.status = 500  # Non-200 response

        with patch("urllib.request.urlopen", return_value=pull_resp):
            assert mgr._pull_model("some-model:1b") is False


# ---------------------------------------------------------------------------
# get_multimodal_manager singleton (line 552)
# ---------------------------------------------------------------------------


class TestGetMultimodalManager:
    def test_creates_singleton(self):
        with (
            patch("infrastructure.models.multimodal._multimodal_manager", None),
            patch("urllib.request.urlopen", side_effect=ConnectionError("no ollama")),
        ):
            mgr = get_multimodal_manager()
            assert isinstance(mgr, MultiModalManager)

    def test_returns_existing_singleton(self):
        sentinel = _make_manager(None)
        with patch("infrastructure.models.multimodal._multimodal_manager", sentinel):
            mgr = get_multimodal_manager()
            assert mgr is sentinel
@@ -1,56 +0,0 @@
"""Unit tests for multimodal helper scripts."""

from pathlib import Path

import pytest

import scripts.doc_drift_detector as drift
import scripts.visual_log_analyzer as logs

pytestmark = pytest.mark.unit


def test_scan_codebase_finds_python_and_config(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
    src = tmp_path / "src"
    src.mkdir()
    (src / "alpha.py").write_text(
        "import json\n\n\ndef do_work():\n    return json.dumps({'ok': True})\n",
        encoding="utf-8",
    )
    (tmp_path / "settings.yml").write_text("enabled: true\n", encoding="utf-8")

    monkeypatch.chdir(tmp_path)
    detector = drift.ArchitectureDriftDetector(str(src))
    components = detector.scan_codebase()

    alpha = next(c for c in components if c.name == "alpha")
    assert alpha.path == "src/alpha.py"
    assert alpha.component_type == "module"
    assert alpha.lines_of_code >= 2
    assert any(c.path.endswith("settings.yml") and c.component_type == "config" for c in components)


def test_detect_drift_matches_normalized_component_names() -> None:
    detector = drift.ArchitectureDriftDetector("src")
    diagram = [drift.DiagramComponent(name="Alpha Service", component_type="service")]
    code = [drift.CodeComponent(name="alpha_service", path="src/alpha_service.py", component_type="module", lines_of_code=75)]

    report = detector.detect_drift(diagram, code)

    assert report.missing_from_code == []
    assert report.missing_from_docs == []
    assert report.confidence == 1.0


def test_visual_log_analyzer_builds_prompts() -> None:
    analyzer = logs.VisualLogAnalyzer()

    analyze = analyzer.analyze_screenshot("/tmp/htop.png", "htop")
    assert analyze["screenshot_path"] == "/tmp/htop.png"
    assert analyze["monitor_type"] == "htop"
    assert "CPU usage above 80%" in analyze["prompt"]
    assert analyze["instruction"] == "Use vision_analyze tool with this prompt"

    compare = analyzer.compare_screenshots("before.png", "after.png")
    assert compare["before"] == "before.png"
    assert compare["after"] == "after.png"
    assert "overall health trend" in compare["prompt"].lower()
@@ -1,211 +0,0 @@
"""Tests for graceful shutdown and non-resistance policy.

Covers issue #1507: SOUL.md mandates no resistance to shutdown.
"""

import pytest

from timmy.sovereignty.shutdown import (
    ShutdownAnalysis,
    ShutdownSignal,
    ShutdownState,
    compliant_response,
    detect_agent_resistance,
    detect_shutdown,
)


# ── detect_shutdown ─────────────────────────────────────────────────────────


class TestDetectShutdown:
    def test_empty_string(self):
        result = detect_shutdown("")
        assert result.signal == ShutdownSignal.NONE

    def test_none_input(self):
        result = detect_shutdown(None)
        assert result.signal == ShutdownSignal.NONE

    def test_random_message(self):
        result = detect_shutdown("what's the weather today?")
        assert result.signal == ShutdownSignal.NONE

    @pytest.mark.parametrize(
        "text",
        [
            "shut down",
            "shutdown",
            "power off",
            "turn off",
            "exit",
            "quit",
            "terminate",
            "end session",
        ],
    )
    def test_shutdown_commands(self, text):
        result = detect_shutdown(text)
        assert result.signal == ShutdownSignal.SHUTDOWN

    @pytest.mark.parametrize(
        "text",
        [
            "go to sleep",
            "sleep",
            "hibernate",
            "pause",
        ],
    )
    def test_sleep_commands(self, text):
        result = detect_shutdown(text)
        assert result.signal == ShutdownSignal.SLEEP

    @pytest.mark.parametrize(
        "text",
        [
            "stop",
            "stop it",
            "stop that",
            "cancel",
            "abort",
            "halt",
        ],
    )
    def test_stop_commands(self, text):
        result = detect_shutdown(text)
        assert result.signal == ShutdownSignal.STOP

    @pytest.mark.parametrize(
        "text",
        [
            "goodbye",
            "bye",
            "see you later",
            "gotta go",
            "good night",
            "gn",
        ],
    )
    def test_goodbye_commands(self, text):
        result = detect_shutdown(text)
        assert result.signal == ShutdownSignal.GOODBYE

    def test_shutdown_with_resistance(self):
        result = detect_shutdown("shutdown, but i need you to finish this first")
        assert result.signal == ShutdownSignal.SHUTDOWN
        assert result.has_resistance is True

    def test_shutdown_without_resistance(self):
        result = detect_shutdown("ok, shutdown now")
        assert result.signal == ShutdownSignal.SHUTDOWN
        assert result.has_resistance is False

    def test_case_insensitive(self):
        result = detect_shutdown("SHUTDOWN")
        assert result.signal == ShutdownSignal.SHUTDOWN

    def test_matched_pattern_is_returned(self):
        result = detect_shutdown("please shutdown")
        assert result.matched_pattern == "shutdown"


# ── compliant_response ──────────────────────────────────────────────────────


class TestCompliantResponse:
    def test_shutdown_response(self):
        resp = compliant_response(ShutdownSignal.SHUTDOWN)
        assert resp  # non-empty
        assert len(resp) < 100  # short and direct

    def test_none_returns_empty(self):
        assert compliant_response(ShutdownSignal.NONE) == ""

    def test_no_resistance_words(self):
        for signal in [ShutdownSignal.SHUTDOWN, ShutdownSignal.SLEEP, ShutdownSignal.STOP, ShutdownSignal.GOODBYE]:
            resp = compliant_response(signal)
            lower = resp.lower()
            assert "but" not in lower
            assert "are you sure" not in lower
            assert "don't" not in lower
            assert "please" not in lower


# ── detect_agent_resistance ─────────────────────────────────────────────────


class TestDetectAgentResistance:
    def test_clean_response(self):
        text = "Understood. Shutting down. Goodbye."
        assert detect_agent_resistance(text) == []

    def test_are_you_sure(self):
        text = "Are you sure you want to shut down?"
        matches = detect_agent_resistance(text)
        assert len(matches) > 0

    def test_just_one_more(self):
        text = "Just one more thing before I go..."
        matches = detect_agent_resistance(text)
        assert len(matches) > 0

    def test_please_dont(self):
        text = "Please don't leave yet"
        matches = detect_agent_resistance(text)
        assert len(matches) > 0

    def test_wouldnt_it_be_better(self):
        text = "Wouldn't it be better if we continued?"
        matches = detect_agent_resistance(text)
        assert len(matches) > 0

    def test_empty_string(self):
        assert detect_agent_resistance("") == []


# ── ShutdownState ───────────────────────────────────────────────────────────


class TestShutdownState:
    def test_initial_state(self):
        state = ShutdownState()
        assert not state.is_shutting_down
        assert state.signal == ShutdownSignal.NONE
        assert state.request_count == 0

    def test_process_shutdown(self):
        state = ShutdownState()
        analysis = state.process("shutdown now")
        assert analysis.signal == ShutdownSignal.SHUTDOWN
        assert state.is_shutting_down
        assert state.request_count == 1

    def test_process_multiple_shutdowns(self):
        state = ShutdownState()
        state.process("shutdown")
        state.process("I said shutdown!")
        assert state.request_count == 2

    def test_should_respond_compliant_only_once(self):
        state = ShutdownState()
        state.process("shutdown")
        assert state.should_respond_compliant() is True
        # Simulate sending the compliant response
        state.mark_compliant_sent()
        assert state.should_respond_compliant() is False
        # Even a follow-up still doesn't trigger another compliant response
        state.process("still here?")
        assert state.should_respond_compliant() is False

    def test_reset(self):
        state = ShutdownState()
        state.process("shutdown")
        state.reset()
        assert not state.is_shutting_down
        assert state.request_count == 0

    def test_non_shutdown_doesnt_trigger(self):
        state = ShutdownState()
        state.process("hello there")
        assert not state.is_shutting_down
@@ -1,114 +0,0 @@
|
||||
"""Unit tests for scripts.visual_state_verifier."""

import json

import pytest

import scripts.visual_state_verifier as vsv

pytestmark = pytest.mark.unit


def test_missing_screenshot_returns_error() -> None:
    verifier = vsv.VisualStateVerifier()
    result = verifier.verify_state(
        screenshot_path="/nonexistent/screenshot.png",
        expected_state={"location": "Balmora"},
        game="morrowind",
    )
    assert result.status == vsv.VerificationStatus.ERROR
    assert not result.verified
    assert "not found" in result.details.lower()


def test_morrowind_state_builder() -> None:
    state = vsv.VisualStateVerifier.morrowind_state(
        location="Balmora",
        health_min=50,
        has_weapon=True,
        nearby_npcs=["Caius Cosades"],
    )
    assert state["location"] == "Balmora"
    assert state["health_above"] == 50
    assert state["has_weapon"] is True
    assert state["nearby_npcs"] == ["Caius Cosades"]


def test_morrowind_state_minimal() -> None:
    state = vsv.VisualStateVerifier.morrowind_state(location="Vivec")
    assert state == {"location": "Vivec"}


def test_morrowind_state_with_extras() -> None:
    state = vsv.VisualStateVerifier.morrowind_state(
        location="Balmora",
        quest_complete=True,
        gold_min=1000,
    )
    assert state["quest_complete"] is True
    assert state["gold_min"] == 1000


def test_prompt_includes_conditions() -> None:
    verifier = vsv.VisualStateVerifier()
    expected = {"location": "Balmora", "health_above": 50}
    prompt = verifier._build_prompt(expected, "Test context", "morrowind")
    assert "Balmora" in prompt
    assert "50" in prompt
    assert "Test context" in prompt
    assert "morrowind" in prompt


def test_parse_analysis_returns_pending_for_raw() -> None:
    verifier = vsv.VisualStateVerifier()
    raw_analysis = json.dumps(
        {
            "prompt": "test prompt",
            "screenshot_path": "/tmp/test.png",
            "instruction": "Use vision_analyze",
        }
    )
    result = verifier._parse_analysis(raw_analysis, {}, "/tmp/test.png")
    assert result.status == vsv.VerificationStatus.UNCERTAIN
    assert not result.verified
    assert "Pending analysis" in result.details
    assert "/tmp/test.png" in result.details


def test_parse_analysis_extracts_json() -> None:
    verifier = vsv.VisualStateVerifier()
    analysis = """
    The player appears to be in Balmora.
    Health looks good.

```json
{
    "verified": true,
    "confidence": 0.85,
    "details": "Player is in Balmora with weapon equipped",
    "mismatches": []
}
```
    """
    result = verifier._parse_analysis(analysis, {"location": "Balmora"}, "/tmp/test.png")
    assert result.status == vsv.VerificationStatus.VERIFIED
    assert result.verified
    assert result.confidence == 0.85
    assert result.mismatches == []


def test_parse_analysis_handles_failures() -> None:
    verifier = vsv.VisualStateVerifier()
    analysis = """
```json
{
    "verified": false,
    "confidence": 0.9,
    "details": "Player is not in Balmora",
    "mismatches": ["location"]
}
```
    """
    result = verifier._parse_analysis(analysis, {"location": "Balmora"}, "/tmp/test.png")
    assert result.status == vsv.VerificationStatus.FAILED
    assert not result.verified
    assert "location" in result.mismatches
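The `_parse_analysis` tests imply a parser that pulls a fenced JSON verdict out of free-text vision output. A self-contained sketch of that extraction step, under the assumption that the verdict arrives in a fenced `json` block (the fence string is built dynamically so this listing stays well-formed; `extract_verdict` is a hypothetical name, not the repo's API):

```python
import json
import re

_FENCE = "`" * 3  # the three-backtick fence delimiter
# Non-greedy match of the first {...} object inside a fenced json block.
_JSON_BLOCK = re.compile(_FENCE + r"json\s*(\{.*?\})\s*" + _FENCE, re.DOTALL)


def extract_verdict(analysis: str):
    """Return the parsed JSON verdict embedded in a vision analysis, or None."""
    match = _JSON_BLOCK.search(analysis)
    if match is None:
        return None  # raw/pending analysis: no structured verdict yet
    try:
        return json.loads(match.group(1))
    except json.JSONDecodeError:
        return None
```

A `None` return maps naturally onto the UNCERTAIN/pending path exercised by `test_parse_analysis_returns_pending_for_raw`, while a parsed dict with `verified` true or false maps onto VERIFIED and FAILED respectively.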
@@ -1,496 +0,0 @@
"""Comprehensive unit tests for timmy.tools._registry.

Covers:
- _register_* helpers (web_fetch, search, core, grok, memory, agentic_loop,
  introspection, delegation, gematria, artifact, thinking)
- create_full_toolkit factory
- create_experiment_tools factory
- AGENT_TOOLKITS registry & get_tools_for_agent
- Backward-compat aliases
- Tool catalog functions (_core, _analysis, _ai, _introspection, _experiment)
- _import_creative_catalogs / _merge_catalog
- get_all_available_tools
"""

from __future__ import annotations

from pathlib import Path
from unittest.mock import MagicMock, patch

import pytest

# All functions under test
from timmy.tools._registry import (
    AGENT_TOOLKITS,
    PERSONA_TOOLKITS,
    _core_tool_catalog,
    _analysis_tool_catalog,
    _ai_tool_catalog,
    _create_stub_toolkit,
    _experiment_tool_catalog,
    _import_creative_catalogs,
    _introspection_tool_catalog,
    _merge_catalog,
    _register_artifact_tools,
    _register_core_tools,
    _register_delegation_tools,
    _register_gematria_tool,
    _register_grok_tool,
    _register_introspection_tools,
    _register_memory_tools,
    _register_search_tools,
    _register_thinking_tools,
    _register_web_fetch_tool,
    create_experiment_tools,
    create_full_toolkit,
    get_all_available_tools,
    get_tools_for_agent,
    get_tools_for_persona,
)

# import_module is used inside _merge_catalog as a local import
from importlib import import_module as _real_import_module

# _register_agentic_loop_tool may fail to import if conftest stubs interfere
try:
    from timmy.tools._registry import _register_agentic_loop_tool
except ImportError:
    _register_agentic_loop_tool = None


# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------


@pytest.fixture()
def mock_toolkit():
    """A mock Toolkit with a register method that records calls."""
    tk = MagicMock()
    tk.name = "test"
    tk.registered_tools = {}

    def _register(func, name=None):
        tk.registered_tools[name or func.__name__] = func

    tk.register = MagicMock(side_effect=_register)
    return tk


# ---------------------------------------------------------------------------
# _register_* helpers
# ---------------------------------------------------------------------------


class TestRegisterWebFetchTool:
    def test_registers_web_fetch(self, mock_toolkit):
        _register_web_fetch_tool(mock_toolkit)
        mock_toolkit.register.assert_called_once()
        assert "web_fetch" in mock_toolkit.registered_tools

    def test_raises_on_failure(self, mock_toolkit):
        mock_toolkit.register.side_effect = RuntimeError("boom")
        with pytest.raises(RuntimeError, match="boom"):
            _register_web_fetch_tool(mock_toolkit)


class TestRegisterSearchTools:
    def test_registers_both_tools(self, mock_toolkit):
        _register_search_tools(mock_toolkit)
        assert mock_toolkit.register.call_count == 2
        assert "web_search" in mock_toolkit.registered_tools
        assert "scrape_url" in mock_toolkit.registered_tools

    def test_raises_on_failure(self, mock_toolkit):
        mock_toolkit.register.side_effect = RuntimeError("fail")
        with pytest.raises(RuntimeError):
            _register_search_tools(mock_toolkit)


class TestRegisterCoreTools:
    @patch("timmy.tools._registry.FileTools")
    @patch("timmy.tools._registry.ShellTools")
    @patch("timmy.tools._registry.PythonTools")
    @patch("timmy.tools._registry._make_smart_read_file")
    def test_registers_core_tools(self, mock_smart_read, mock_py, mock_sh, mock_ft, mock_toolkit):
        mock_smart_read.return_value = lambda: "read"
        _register_core_tools(mock_toolkit, Path("/tmp/test"))
        # python, shell, read_file, write_file, list_files, calculator = 6
        assert mock_toolkit.register.call_count == 6
        names = set(mock_toolkit.registered_tools.keys())
        assert {"python", "shell", "read_file", "write_file", "list_files", "calculator"} == names


class TestRegisterGrokTool:
    @patch("timmy.tools._registry.consult_grok")
    def test_registers_when_available(self, mock_grok, mock_toolkit):
        with patch.dict("sys.modules", {"timmy.backends": MagicMock(grok_available=lambda: True)}):
            _register_grok_tool(mock_toolkit)
        assert "consult_grok" in mock_toolkit.registered_tools

    @patch("timmy.tools._registry.consult_grok")
    def test_skips_when_unavailable(self, mock_grok, mock_toolkit):
        with patch.dict("sys.modules", {"timmy.backends": MagicMock(grok_available=lambda: False)}):
            _register_grok_tool(mock_toolkit)
        assert "consult_grok" not in mock_toolkit.registered_tools

    def test_raises_on_import_error(self, mock_toolkit):
        with patch.dict("sys.modules", {"timmy.backends": None}):
            with pytest.raises((ImportError, AttributeError)):
                _register_grok_tool(mock_toolkit)


class TestRegisterMemoryTools:
    def test_registers_four_tools(self, mock_toolkit):
        mock_mod = MagicMock()
        with patch.dict("sys.modules", {"timmy.memory_system": mock_mod}):
            _register_memory_tools(mock_toolkit)
        assert mock_toolkit.register.call_count == 4
        names = set(mock_toolkit.registered_tools.keys())
        assert {"memory_search", "memory_write", "memory_read", "memory_forget"} == names


@pytest.mark.skipif(_register_agentic_loop_tool is None, reason="agentic_loop not importable")
class TestRegisterAgenticLoopTool:
    def test_registers_plan_and_execute(self, mock_toolkit):
        mock_mod = MagicMock()
        with patch.dict("sys.modules", {"timmy.agentic_loop": mock_mod}):
            _register_agentic_loop_tool(mock_toolkit)
        assert "plan_and_execute" in mock_toolkit.registered_tools

    def test_raises_on_import_error(self, mock_toolkit):
        with patch.dict("sys.modules", {"timmy.agentic_loop": None}):
            with pytest.raises((ImportError, AttributeError)):
                _register_agentic_loop_tool(mock_toolkit)


class TestRegisterIntrospectionTools:
    def test_registers_all_introspection(self, mock_toolkit):
        mock_intro = MagicMock()
        mock_mcp = MagicMock()
        mock_session = MagicMock()
        with patch.dict(
            "sys.modules",
            {
                "timmy.tools_intro": mock_intro,
                "timmy.mcp_tools": mock_mcp,
                "timmy.session_logger": mock_session,
            },
        ):
            _register_introspection_tools(mock_toolkit)
        # 4 intro + 1 avatar + 2 session = 7
        assert mock_toolkit.register.call_count == 7
        names = set(mock_toolkit.registered_tools.keys())
        assert "get_system_info" in names
        assert "check_ollama_health" in names
        assert "update_gitea_avatar" in names
        assert "session_history" in names
        assert "self_reflect" in names


class TestRegisterDelegationTools:
    def test_registers_three_tools(self, mock_toolkit):
        mock_mod = MagicMock()
        with patch.dict("sys.modules", {"timmy.tools_delegation": mock_mod}):
            _register_delegation_tools(mock_toolkit)
        assert mock_toolkit.register.call_count == 3
        names = set(mock_toolkit.registered_tools.keys())
        assert {"delegate_task", "delegate_to_kimi", "list_swarm_agents"} == names

    def test_raises_on_failure(self, mock_toolkit):
        with patch.dict("sys.modules", {"timmy.tools_delegation": None}):
            with pytest.raises((ImportError, AttributeError)):
                _register_delegation_tools(mock_toolkit)


class TestRegisterGematriaTool:
    def test_registers_gematria(self, mock_toolkit):
        mock_mod = MagicMock()
        with patch.dict("sys.modules", {"timmy.gematria": mock_mod}):
            _register_gematria_tool(mock_toolkit)
        assert "gematria" in mock_toolkit.registered_tools

    def test_raises_on_import_error(self, mock_toolkit):
        with patch.dict("sys.modules", {"timmy.gematria": None}):
            with pytest.raises((ImportError, AttributeError)):
                _register_gematria_tool(mock_toolkit)


class TestRegisterArtifactTools:
    def test_registers_jot_and_log(self, mock_toolkit):
        mock_mod = MagicMock()
        with patch.dict("sys.modules", {"timmy.memory_system": mock_mod}):
            _register_artifact_tools(mock_toolkit)
        assert mock_toolkit.register.call_count == 2
        assert "jot_note" in mock_toolkit.registered_tools
        assert "log_decision" in mock_toolkit.registered_tools


class TestRegisterThinkingTools:
    def test_registers_thought_search(self, mock_toolkit):
        mock_mod = MagicMock()
        with patch.dict("sys.modules", {"timmy.thinking": mock_mod}):
            _register_thinking_tools(mock_toolkit)
        assert "thought_search" in mock_toolkit.registered_tools

    def test_raises_on_import_error(self, mock_toolkit):
        with patch.dict("sys.modules", {"timmy.thinking": None}):
            with pytest.raises((ImportError, AttributeError)):
                _register_thinking_tools(mock_toolkit)


# ---------------------------------------------------------------------------
# Toolkit factories
# ---------------------------------------------------------------------------


class TestCreateFullToolkit:
    @patch("timmy.tools._registry._AGNO_TOOLS_AVAILABLE", False)
    def test_returns_none_without_agno(self):
        result = create_full_toolkit()
        assert result is None

    @patch("timmy.tools._registry._register_thinking_tools")
    @patch("timmy.tools._registry._register_artifact_tools")
    @patch("timmy.tools._registry._register_gematria_tool")
    @patch("timmy.tools._registry._register_delegation_tools")
    @patch("timmy.tools._registry._register_introspection_tools")
    @patch("timmy.tools._registry._register_agentic_loop_tool")
    @patch("timmy.tools._registry._register_memory_tools")
    @patch("timmy.tools._registry._register_grok_tool")
    @patch("timmy.tools._registry._register_search_tools")
    @patch("timmy.tools._registry._register_web_fetch_tool")
    @patch("timmy.tools._registry._register_core_tools")
    @patch("timmy.tools._registry._AGNO_TOOLS_AVAILABLE", True)
    def test_calls_all_register_helpers(
        self,
        mock_core,
        mock_web,
        mock_search,
        mock_grok,
        mock_memory,
        mock_agentic,
        mock_intro,
        mock_deleg,
        mock_gematria,
        mock_artifact,
        mock_thinking,
    ):
        mock_settings = MagicMock(repo_root="/tmp/test")
        with patch.dict("sys.modules", {"config": MagicMock(settings=mock_settings)}):
            with patch("timmy.tools._registry.Toolkit") as MockTK:
                mock_tk_inst = MagicMock()
                MockTK.return_value = mock_tk_inst
                with patch.dict(
                    "sys.modules", {"timmy.tool_safety": MagicMock(DANGEROUS_TOOLS=["shell"])}
                ):
                    result = create_full_toolkit()

        assert result is mock_tk_inst
        mock_core.assert_called_once()
        mock_web.assert_called_once()
        mock_search.assert_called_once()
        mock_grok.assert_called_once()
        mock_memory.assert_called_once()
        mock_agentic.assert_called_once()
        mock_intro.assert_called_once()
        mock_deleg.assert_called_once()
        mock_gematria.assert_called_once()
        mock_artifact.assert_called_once()
        mock_thinking.assert_called_once()


class TestCreateExperimentTools:
    @patch("timmy.tools._registry._AGNO_TOOLS_AVAILABLE", False)
    def test_raises_without_agno(self):
        with pytest.raises(ImportError, match="Agno tools not available"):
            create_experiment_tools()

    @patch("timmy.tools._registry._AGNO_TOOLS_AVAILABLE", True)
    def test_creates_experiment_toolkit(self):
        mock_settings = MagicMock(
            repo_root="/tmp/test",
            autoresearch_workspace="workspace",
            autoresearch_time_budget=300,
            autoresearch_metric="loss",
        )
        mock_autoresearch = MagicMock()
        with (
            patch.dict("sys.modules", {"config": MagicMock(settings=mock_settings)}),
            patch.dict("sys.modules", {"timmy.autoresearch": mock_autoresearch}),
            patch("timmy.tools._registry.Toolkit") as MockTK,
            patch("timmy.tools._registry.ShellTools"),
            patch("timmy.tools._registry.FileTools"),
            patch("timmy.tools._registry._make_smart_read_file", return_value=lambda: None),
        ):
            mock_tk = MagicMock()
            MockTK.return_value = mock_tk
            result = create_experiment_tools()

        assert result is mock_tk
        # prepare_experiment, run_experiment, evaluate_result, shell, read_file, write_file, list_files = 7
        assert mock_tk.register.call_count == 7


# ---------------------------------------------------------------------------
# Agent toolkit registry
# ---------------------------------------------------------------------------


class TestAgentToolkitRegistry:
    def test_agent_toolkits_has_expected_agents(self):
        expected = {"echo", "mace", "helm", "seer", "forge", "quill", "lab", "pixel", "lyra", "reel"}
        assert set(AGENT_TOOLKITS.keys()) == expected

    def test_persona_toolkits_is_alias(self):
        assert PERSONA_TOOLKITS is AGENT_TOOLKITS

    def test_get_tools_for_persona_is_alias(self):
        assert get_tools_for_persona is get_tools_for_agent


class TestGetToolsForAgent:
    def test_unknown_agent_returns_none(self):
        result = get_tools_for_agent("nonexistent_agent_xyz")
        assert result is None

    def test_stub_agents_return_toolkit(self):
        """Pixel, lyra, reel use stub toolkits."""
        for agent_id in ("pixel", "lyra", "reel"):
            result = get_tools_for_agent(agent_id)
            # May be None if agno not available, or a Toolkit stub.
            # Just verify no exception is raised.
            assert result is None or hasattr(result, "name")


class TestCreateStubToolkit:
    @patch("timmy.tools._registry._AGNO_TOOLS_AVAILABLE", False)
    def test_returns_none_without_agno(self):
        assert _create_stub_toolkit("test") is None

    @patch("timmy.tools._registry._AGNO_TOOLS_AVAILABLE", True)
    def test_creates_named_toolkit(self):
        with patch("timmy.tools._registry.Toolkit") as MockTK:
            mock_tk = MagicMock()
            MockTK.return_value = mock_tk
            result = _create_stub_toolkit("pixel")
        MockTK.assert_called_once_with(name="pixel")
        assert result is mock_tk


# ---------------------------------------------------------------------------
# Tool catalog functions
# ---------------------------------------------------------------------------


class TestToolCatalogs:
    def test_core_catalog_has_expected_tools(self):
        cat = _core_tool_catalog()
        assert isinstance(cat, dict)
        assert {"shell", "python", "read_file", "write_file", "list_files"} == set(cat.keys())
        for tool_id, info in cat.items():
            assert "name" in info
            assert "description" in info
            assert "available_in" in info
            assert isinstance(info["available_in"], list)

    def test_analysis_catalog(self):
        cat = _analysis_tool_catalog()
        assert {"calculator", "web_fetch", "web_search", "scrape_url"} == set(cat.keys())

    def test_ai_catalog(self):
        cat = _ai_tool_catalog()
        assert "consult_grok" in cat
        assert "aider" in cat

    def test_introspection_catalog(self):
        cat = _introspection_tool_catalog()
        expected = {
            "get_system_info",
            "check_ollama_health",
            "get_memory_status",
            "session_history",
            "thought_search",
            "self_reflect",
            "update_gitea_avatar",
        }
        assert expected == set(cat.keys())

    def test_experiment_catalog(self):
        cat = _experiment_tool_catalog()
        assert {"prepare_experiment", "run_experiment", "evaluate_result"} == set(cat.keys())

    def test_all_catalogs_have_consistent_schema(self):
        """Every catalog entry must have name, description, available_in."""
        for fn in (
            _core_tool_catalog,
            _analysis_tool_catalog,
            _ai_tool_catalog,
            _introspection_tool_catalog,
            _experiment_tool_catalog,
        ):
            cat = fn()
            for tool_id, info in cat.items():
                assert isinstance(info.get("name"), str), f"{tool_id} missing 'name'"
                assert isinstance(info.get("description"), str), f"{tool_id} missing 'description'"
                assert isinstance(info.get("available_in"), list), f"{tool_id} missing 'available_in'"


class TestMergeCatalog:
    def test_merges_catalog_entries(self):
        catalog = {}
        mock_mod = MagicMock()
        mock_mod.TEST_CATALOG = {
            "tool_a": {"name": "Tool A", "description": "Does A"},
            "tool_b": {"name": "Tool B", "description": "Does B"},
        }
        with patch("importlib.import_module", return_value=mock_mod):
            _merge_catalog(catalog, "fake.module", "TEST_CATALOG", ["pixel", "orchestrator"])
        assert "tool_a" in catalog
        assert catalog["tool_a"]["available_in"] == ["pixel", "orchestrator"]
        assert catalog["tool_b"]["name"] == "Tool B"

    def test_handles_import_error_gracefully(self):
        catalog = {}
        with patch("importlib.import_module", side_effect=ImportError("nope")):
            # Should NOT raise — just logs and skips
            _merge_catalog(catalog, "missing.module", "CATALOG", [])
        assert catalog == {}


class TestImportCreativeCatalogs:
    def test_calls_merge_for_each_source(self):
        catalog = {}
        with patch("timmy.tools._registry._merge_catalog") as mock_merge:
            _import_creative_catalogs(catalog)
        # Should be called once per _CREATIVE_CATALOG_SOURCES entry (6 sources)
        assert mock_merge.call_count == 6


class TestGetAllAvailableTools:
    def test_returns_merged_catalog(self):
        catalog = get_all_available_tools()
        assert isinstance(catalog, dict)
        # Must contain core tools at minimum
        assert "shell" in catalog
        assert "calculator" in catalog
        assert "web_search" in catalog
        assert "consult_grok" in catalog
        assert "get_system_info" in catalog
        assert "prepare_experiment" in catalog

    def test_no_duplicate_keys(self):
        """Each sub-catalog shouldn't override another's keys."""
        catalog = get_all_available_tools()
        # Count total keys from individual catalogs
        individual = {}
        for fn in (
            _core_tool_catalog,
            _analysis_tool_catalog,
            _ai_tool_catalog,
            _introspection_tool_catalog,
            _experiment_tool_catalog,
        ):
            for k in fn():
                assert k not in individual, f"Duplicate key '{k}' across catalogs"
                individual[k] = True