Compare commits

...

41 Commits

Author SHA1 Message Date
8a14bbb3e0 Merge pull request '[loop-cycle-5] fix: warmup model on cold load (#82)' (#95) from fix/warmup-cold-model into main
All checks were successful
Tests / lint (push) Successful in 3s
Tests / test (push) Successful in 56s
2026-03-14 18:26:48 -04:00
d1a8b16cd7 Merge pull request '[loop-cycle-5] test: skip voice_loop tests when numpy missing (#48)' (#94) from fix/skip-voice-tests-no-numpy into main
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
2026-03-14 18:26:40 -04:00
bf30d26dd1 test: skip voice_loop tests gracefully when numpy unavailable
All checks were successful
Tests / lint (pull_request) Successful in 5s
Tests / test (pull_request) Successful in 49s
Wrap numpy and voice_loop imports in try/except with pytestmark skipif.
Tests skip cleanly instead of ImportError when numpy not in dev deps.

Closes #48
2026-03-14 18:24:56 -04:00
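The guard described in that commit can be sketched as follows. The helper name `optional_import` is illustrative, not the project's actual code; in the real test module the boolean feeds `pytest.mark.skipif`.

```python
# Illustrative sketch of the import-guard pattern; helper name is hypothetical.
def optional_import(name):
    """Return (module, available) instead of raising ImportError."""
    try:
        return __import__(name), True
    except ImportError:
        return None, False

np, HAS_NUMPY = optional_import("numpy")
# In the real test module this feeds the module-level skip marker:
#   pytestmark = pytest.mark.skipif(not HAS_NUMPY, reason="numpy not in dev deps")
```

With the marker in place, collection succeeds either way: tests run when numpy is present and skip with a clear reason when it is not.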
86956bd057 fix: warmup model on cold load to prevent first-request disconnect
Some checks failed
Tests / lint (pull_request) Successful in 3s
Tests / test (pull_request) Failing after 16s
Add _warmup_model() that sends a minimal generation request (1 token)
before returning the Agent. 60s timeout handles cold VRAM loads.
Warns but does not abort if warmup fails.

Closes #82
2026-03-14 18:24:00 -04:00
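A minimal sketch of the warmup described above. `generate` is an injected callable here so the sketch stays offline; the real `_warmup_model()` talks to the agent's model directly, and the 1-token/60-second values come from the commit message.

```python
import logging

log = logging.getLogger("warmup-sketch")

def warmup_model(generate, timeout_s=60):
    """Send a minimal 1-token request so the first real user request
    does not hit a cold model."""
    try:
        # The cold VRAM load happens during this throwaway request.
        generate(prompt="hi", max_tokens=1, timeout=timeout_s)
        return True
    except Exception as exc:
        # Warn but never abort startup: a failed warmup just means the
        # first user request pays the load cost instead.
        log.warning("model warmup failed: %s", exc)
        return False
```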
23ed2b2791 Merge pull request '[loop-cycle-4] fix: prune dead web_search tool (#87)' (#93) from fix/prune-dead-web-search into main
Some checks failed
Tests / lint (push) Successful in 2s
Tests / test (push) Failing after 14s
2026-03-14 18:15:25 -04:00
b3a1e0ce36 fix: prune dead web_search tool — ddgs never installed (#87)
Some checks failed
Tests / lint (pull_request) Successful in 5s
Tests / test (pull_request) Failing after 17s
Remove DuckDuckGoTools import, all web_search registrations across 4 toolkit
factories, catalog entry, safety classification, prompt references, and
session regex. Total: -41 lines of dead code.

consult_grok is functional (grok_enabled=True, API key set) and opt-in,
so it stays — but Timmy never calls it autonomously, which is correct
sovereign behavior (no cloud calls unless user permits).

Closes #87
2026-03-14 18:13:51 -04:00
7ff012883a Merge pull request '[loop-cycle-3] fix: model introspection prefix-match collision (#77)' (#91) from fix/model-introspection-prefix-match into main
Some checks failed
Tests / lint (push) Successful in 3s
Tests / test (push) Failing after 12s
2026-03-14 18:04:40 -04:00
7132b42ff3 fix: model introspection uses exact match, queries /api/ps first
Some checks failed
Tests / lint (pull_request) Successful in 3s
Tests / test (pull_request) Failing after 13s
_get_ollama_model() used prefix match (startswith) on /api/tags,
causing qwen3:30b to match qwen3.5:latest. Now:
1. Queries /api/ps (loaded models) first — most accurate
2. Falls back to /api/tags with exact name match
3. Reports actual running model, not just configured one

Updated test_get_system_info_contains_model to not assume model==config.

Fixes #77. 5 regression tests added.
2026-03-14 18:03:59 -04:00
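The resolution order above can be sketched like this; `loaded` stands in for `/api/ps` results and `available` for `/api/tags`, with the function name and signature being illustrative.

```python
def resolve_running_model(configured, loaded, available):
    """Exact-match resolution: prefer what is actually loaded, then an
    exact name in the catalog. No prefix matching, so 'qwen3:30b' can
    no longer accidentally match 'qwen3.5:latest'."""
    if loaded:
        return loaded[0]          # a model resident in VRAM is ground truth
    if configured in available:   # exact equality, never startswith
        return configured
    return None
```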
1f09323e09 Merge pull request '[loop-cycle-2] test: regression tests for confirmation warning spam (#79)' (#90) from fix/confirmation-warning-spam into main
Some checks failed
Tests / lint (push) Successful in 3s
Tests / test (push) Failing after 15s
2026-03-14 17:55:16 -04:00
74e426c63b [loop-cycle-2] fix: suppress confirmation tool WARNING spam (#79) (#89)
Some checks failed
Tests / test (push) Has been cancelled
Tests / lint (push) Has been cancelled
2026-03-14 17:54:58 -04:00
586c8e3a75 fix: remove unused variable lint warning
Some checks failed
Tests / lint (pull_request) Successful in 3s
Tests / test (pull_request) Failing after 15s
2026-03-14 17:54:27 -04:00
e09ca203dc Merge pull request '[loop-cycle-1] feat: tool allowlist for autonomous operation (#69)' (#88) from fix/tool-allowlist-autonomous into main 2026-03-14 17:53:16 -04:00
09fcf956ec Merge pull request '[loop-cycle-1] feat: tool allowlist for autonomous operation (#69)' (#88) from fix/tool-allowlist-autonomous into main
Some checks failed
Tests / lint (push) Successful in 3s
Tests / test (push) Failing after 13s
2026-03-14 17:41:56 -04:00
d28e2f4a7e [loop-cycle-1] feat: tool allowlist for autonomous operation (#69)
Some checks failed
Tests / lint (pull_request) Successful in 4s
Tests / test (pull_request) Failing after 13s
Add config/allowlist.yaml — YAML-driven gate that auto-approves bounded
tool calls when no human is present.

When Timmy runs with --autonomous or stdin is not a terminal, tool calls
are checked against allowlist: matched → auto-approved, else → rejected.

Changes:
  - config/allowlist.yaml: shell prefixes, deny patterns, path rules
  - tool_safety.py: is_allowlisted() checks tools against YAML rules
  - cli.py: --autonomous flag, _is_interactive() detection
  - 44 new allowlist tests, 8 updated CLI tests

Closes #69
2026-03-14 17:39:48 -04:00
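The gate can be sketched as a default-deny matcher. The `rules` dict shape (`deny`, `allow` keys with glob patterns) is a hypothetical rendering of what config/allowlist.yaml might contain, not its actual schema.

```python
import fnmatch

def is_allowlisted(tool, args, rules):
    """Default-deny gate sketch: deny patterns win, then allow patterns;
    anything not listed is auto-rejected."""
    call = f"{tool} {args}".strip()
    if any(fnmatch.fnmatch(call, pat) for pat in rules.get("deny", [])):
        return False
    if any(fnmatch.fnmatch(call, pat) for pat in rules.get("allow", [])):
        return True
    return False  # unlisted tool calls are rejected when no human is present
```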
0b0251f702 Merge pull request '[loop-cycle-13] fix: configurable model fallback chains (#53)' (#76) from fix/configurable-fallback-models into main
Some checks failed
Tests / lint (push) Successful in 3s
Tests / test (push) Failing after 13s
2026-03-14 17:28:34 -04:00
94cd1a9840 fix: make model fallback chains configurable (#53)
Some checks failed
Tests / lint (pull_request) Successful in 4s
Tests / test (pull_request) Failing after 13s
Move hardcoded model fallback lists from module-level constants into
settings.fallback_models and settings.vision_fallback_models (pydantic
Settings fields). Can now be overridden via env vars
FALLBACK_MODELS / VISION_FALLBACK_MODELS or config/providers.yaml.

Removed:
- OLLAMA_MODEL_PRIMARY / OLLAMA_MODEL_FALLBACK from config.py
- DEFAULT_MODEL_FALLBACKS / VISION_MODEL_FALLBACKS from agent.py

get_effective_ollama_model() and _resolve_model_with_fallback() now
walk the configurable chains instead of hardcoded constants.

5 new tests guard the configurable behavior and prevent regression
to hardcoded constants.
2026-03-14 17:26:47 -04:00
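The chain walk described above reduces to a few lines; names here are illustrative, with `fallback_chain` standing in for `settings.fallback_models`.

```python
def resolve_model_with_fallback(preferred, fallback_chain, available):
    """Walk the configured chain instead of hardcoded constants."""
    for name in [preferred, *fallback_chain]:
        if name in available:
            return name
    raise RuntimeError(f"none of {[preferred, *fallback_chain]} is available")
```

Because the chain is plain data, overriding it via an env var or providers.yaml changes behavior with no code edit.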
f097784de8 Merge pull request '[loop-cycle-12] fix: brevity tuning — Timmy speaks plainly (#71)' (#75) from fix/brevity-tuning into main
Some checks failed
Tests / lint (push) Successful in 37s
Tests / test (push) Failing after 38s
2026-03-14 17:18:06 -04:00
061c8f6628 fix: brevity tuning — plain text prompts, markdown=False, front-loaded brevity
Some checks failed
Tests / lint (pull_request) Successful in 15s
Tests / test (pull_request) Failing after 24s
Closes #71: Timmy was responding with elaborate markdown formatting
(tables, headers, emoji, bullet lists) for simple questions.

Root causes fixed:
1. Agno Agent markdown=True flag explicitly told the model to format
   responses as markdown. Set to False in both agent.py and agents/base.py.
2. SYSTEM_PROMPT_FULL used ## and ### markdown headers, bold (**), and
   numbered lists — teaching by example that markdown is expected.
   Rewritten to plain text with labeled sections.
3. Brevity instructions were buried at the bottom of the full prompt.
   Moved to immediately after the opening line as 'VOICE AND BREVITY'
   with explicit override priority.
4. Orchestrator prompt in agents.yaml was silent on response style.
   Added 'Voice: brief, plain, direct' with concrete examples.

The full prompt is now 41 lines shorter (124 → 83). The prompt itself
practices the brevity it preaches.

SOUL.md alignment:
- 'Brevity is a kindness' — now front-loaded in both base and agent prompt
- 'I do not fill silence with noise' — explicit in both tiers
- 'I speak plainly. I prefer short sentences.' — structural enforcement

4 new tests guard against regression:
- test_full_prompt_brevity_first: brevity section before tools/memory
- test_full_prompt_no_markdown_headers: no ## or ### in prompt text
- test_full_prompt_plain_text_brevity: 'plain text' instruction present
- test_lite_prompt_brevity: lite tier also instructs brevity
2026-03-14 17:15:56 -04:00
3c671de446 Merge pull request '[loop-cycle-9] fix: thinking engine skips MCP tools to avoid cancel-scope errors (#72)' (#74) from fix/thinking-mcp-cancel-scope into main
Some checks failed
Tests / lint (push) Successful in 3s
Tests / test (push) Failing after 15s
2026-03-14 16:51:07 -04:00
rockachopa
927e25cc40 Merge pull request 'fix: replace print() with proper logging (#29, #51)' (#59) from fix/print-to-logging into main
Some checks failed
Tests / lint (pull_request) Successful in 3s
Tests / test (pull_request) Failing after 18s
2026-03-14 16:50:04 -04:00
rockachopa
2d2b566e58 Merge pull request 'fix: replace print() with proper logging (#29, #51)' (#59) from fix/print-to-logging into main
Some checks failed
Tests / lint (push) Successful in 3s
Tests / test (push) Failing after 13s
2026-03-14 16:34:48 -04:00
64fd1d9829 voice: reinforce brevity at top of system prompt
Some checks failed
Tests / lint (push) Successful in 3s
Tests / test (push) Failing after 14s
2026-03-14 16:32:47 -04:00
f0b0e2f202 fix: WebSocket 403 spam and missing /swarm endpoints
Some checks failed
Tests / lint (push) Successful in 3s
Tests / test (push) Failing after 16s
- CSRF middleware now skips WebSocket upgrade requests (they don't carry tokens)
- Added /swarm/live WebSocket endpoint wired to ws_manager singleton
- Added /swarm/agents/sidebar HTMX partial (was 404 on every dashboard poll)

Stops hundreds of 403 Forbidden + 404 log lines per minute.
2026-03-14 16:29:59 -04:00
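The WebSocket exemption can be sketched as a minimal ASGI-style middleware. The `check` callable and the response shape are illustrative, not the project's actual middleware.

```python
class CSRFMiddleware:
    """Sketch of the fix: WebSocket upgrade scopes carry no CSRF token,
    so they bypass the check entirely instead of producing 403 spam."""

    def __init__(self, app, check):
        self.app = app
        self.check = check  # returns True when a valid token is present

    async def __call__(self, scope, receive, send):
        if scope["type"] == "websocket":
            # No token to verify on an upgrade request; pass straight through.
            return await self.app(scope, receive, send)
        if scope.get("method") in {"POST", "PUT", "DELETE"} and not self.check(scope):
            await send({"type": "http.response.start", "status": 403, "headers": []})
            await send({"type": "http.response.body", "body": b"csrf token missing"})
            return
        await self.app(scope, receive, send)
```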
b30b5c6b57 [loop-cycle-6] Break thinking rumination loop — semantic dedup (#38)
Some checks failed
Tests / lint (push) Successful in 3s
Tests / test (push) Failing after 25s
Add post-generation similarity check to ThinkingEngine.think_once().

Problem: Timmy's thinking engine generates repetitive thoughts because
small local models ignore 'don't repeat' instructions in the prompt.
The same observation ('still no chat messages', 'Alexander's name is in
profile') would appear 14+ times in a single day's journal.

Fix: After generating a thought, compare it against the last 5 thoughts
using SequenceMatcher. If similarity >= 0.6, retry with a new seed up to
2 times. If all retries produce repetitive content, discard rather than
store. Uses stdlib difflib — no new dependencies.

Changes:
- thinking.py: Add _is_too_similar() method with SequenceMatcher
- thinking.py: Wrap generation in retry loop with dedup check
- test_thinking.py: 7 new tests covering exact match, near match,
  different thoughts, retry behavior, and max-retry discard

+96/-20 lines in thinking.py, +87 lines in tests.
2026-03-14 16:21:16 -04:00
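The dedup check reduces to stdlib `difflib`, as the commit says. This is a sketch with the threshold and window from the message; `generate` is an injected callable rather than the real model call.

```python
from difflib import SequenceMatcher

def is_too_similar(candidate, recent, threshold=0.6):
    """True if candidate repeats any of the recent thoughts."""
    return any(
        SequenceMatcher(None, candidate.lower(), prior.lower()).ratio() >= threshold
        for prior in recent
    )

def think_once(generate, recent, max_retries=2):
    """Retry generation with a new seed on repetition; discard rather
    than store if every retry is still repetitive."""
    for attempt in range(max_retries + 1):
        thought = generate(seed=attempt)
        if not is_too_similar(thought, recent[-5:]):
            return thought
    return None
```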
rockachopa
0d61b709da Merge pull request '[loop-cycle-5] Persist chat history in SQLite (#46)' (#63) from fix/issue-46-chat-persistence into main
Some checks failed
Tests / lint (push) Successful in 3s
Tests / test (push) Failing after 14s
2026-03-14 16:10:55 -04:00
79edfd1106 feat: persist chat history in SQLite — survives server restarts
Some checks failed
Tests / lint (pull_request) Successful in 2s
Tests / test (pull_request) Failing after 13s
Replace in-memory MessageLog with SQLite-backed implementation.
Same API surface (append/all/clear/len) so zero caller changes needed.

- data/chat.db stores messages with role, content, timestamp, source
- Lazy DB connection (opened on first use, not at import time)
- Retention policy: oldest messages pruned when count > 500
- New .recent(limit) method for efficient last-N queries
- Thread-safe with explicit locking
- WAL mode for concurrent read performance
- Test isolation: conftest redirects DB to tmp_path per test
- 8 new tests: persistence, retention, concurrency, source field

Closes #46
2026-03-14 16:09:26 -04:00
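The core of the SQLite-backed log can be sketched as below. This is a compressed illustration of the described design (same `append`/`recent`/`__len__` surface, WAL, retention pruning, explicit locking), using an in-memory DB instead of data/chat.db; the retention cap of 500 comes from the commit message.

```python
import sqlite3
import threading
import time

class MessageLog:
    """SQLite-backed message log sketch; survives restarts when given a
    real file path instead of :memory:."""

    def __init__(self, path=":memory:", retention=500):
        self._lock = threading.Lock()
        self._db = sqlite3.connect(path, check_same_thread=False)
        self._db.execute("PRAGMA journal_mode=WAL")  # effective for file-backed DBs
        self._db.execute(
            "CREATE TABLE IF NOT EXISTS messages ("
            "id INTEGER PRIMARY KEY, role TEXT, content TEXT, ts REAL, source TEXT)"
        )
        self._retention = retention

    def append(self, role, content, source="chat"):
        with self._lock, self._db:  # connection-as-context-manager commits
            self._db.execute(
                "INSERT INTO messages (role, content, ts, source) VALUES (?,?,?,?)",
                (role, content, time.time(), source),
            )
            # Retention: prune oldest rows once the cap is exceeded.
            self._db.execute(
                "DELETE FROM messages WHERE id NOT IN "
                "(SELECT id FROM messages ORDER BY id DESC LIMIT ?)",
                (self._retention,),
            )

    def recent(self, limit=10):
        rows = self._db.execute(
            "SELECT role, content FROM messages ORDER BY id DESC LIMIT ?", (limit,)
        ).fetchall()
        return rows[::-1]  # oldest-first for display

    def __len__(self):
        return self._db.execute("SELECT COUNT(*) FROM messages").fetchone()[0]
```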
rockachopa
013a2cc330 Merge pull request 'feat: add --session-id to timmy chat CLI' (#62) from fix/cli-session-id into main
Some checks failed
Tests / lint (push) Successful in 3s
Tests / test (push) Failing after 14s
2026-03-14 16:06:16 -04:00
f426df5b42 feat: add --session-id option to timmy chat CLI
Some checks failed
Tests / lint (pull_request) Successful in 3s
Tests / test (pull_request) Failing after 15s
Allows specifying a named session for conversation persistence.
Use cases:
- Autonomous loops can have their own session (e.g. --session-id loop)
- Multiple users/agents can maintain separate conversations
- Testing different conversation threads without polluting the default

Precedence: --session-id > --new > default 'cli' session
2026-03-14 16:05:00 -04:00
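The stated precedence can be sketched in a few lines; the function name is illustrative.

```python
import uuid

def resolve_session_id(session_id=None, new=False):
    """Precedence sketch: --session-id beats --new beats the default."""
    if session_id:
        return session_id          # explicit named session wins
    if new:
        return uuid.uuid4().hex    # fresh, unshared session
    return "cli"                   # default shared CLI session
```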
rockachopa
bef4fc1024 Merge pull request '[loop-cycle-4] Push event system coverage to ≥80% on all modules' (#61) from fix/issue-45-event-coverage into main
Some checks failed
Tests / lint (push) Successful in 3s
Tests / test (push) Failing after 14s
2026-03-14 16:02:27 -04:00
9535dd86de test: push event system coverage to ≥80% on all three modules
Some checks failed
Tests / lint (pull_request) Successful in 4s
Tests / test (pull_request) Failing after 16s
Add 3 targeted tests for infrastructure/error_capture.py:
- test_stale_entries_pruned: exercises dedup cache pruning (line 61)
- test_git_context_fallback_on_failure: exercises exception path (lines 90-91)
- test_returns_none_when_feedback_disabled: exercises early return (line 112)

Coverage results (63 tests, all passing):
- error_capture.py: 75.6% → 80.0%
- broadcaster.py: 93.9% (unchanged)
- bus.py: 92.9% (unchanged)
- Total: 88.1% → 89.4%

Closes #45
2026-03-14 16:01:05 -04:00
70d5dc5ce1 fix: replace eval() with AST-walking safe evaluator in calculator
Some checks failed
Tests / lint (push) Successful in 3s
Tests / test (push) Failing after 14s
Fixes #52

- Replace eval() in calculator() with _safe_eval() that walks the AST
  and only permits: numeric constants, arithmetic ops (+,-,*,/,//,%,**),
  unary +/-, math module access, and whitelisted builtins (abs, round,
  min, max)
- Reject all other syntax: imports, attribute access on non-math objects,
  lambdas, comprehensions, string literals, etc.
- Add 39 tests covering arithmetic, precedence, math functions,
  allowed builtins, error handling, and 14 injection prevention cases
2026-03-14 15:51:35 -04:00
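An AST-walking evaluator of the kind described looks roughly like this. It is a sketch of the technique, not the project's `_safe_eval`: the whitelist mirrors the commit message, and anything outside it raises.

```python
import ast
import math
import operator

_OPS = {
    ast.Add: operator.add, ast.Sub: operator.sub, ast.Mult: operator.mul,
    ast.Div: operator.truediv, ast.FloorDiv: operator.floordiv,
    ast.Mod: operator.mod, ast.Pow: operator.pow,
}
_FUNCS = {"abs": abs, "round": round, "min": min, "max": max}

def safe_eval(expr):
    """Walk the AST, permitting only numeric constants, arithmetic,
    unary +/-, math.* calls, and a small builtin whitelist."""
    def walk(node):
        if isinstance(node, ast.Expression):
            return walk(node.body)
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _OPS:
            return _OPS[type(node.op)](walk(node.left), walk(node.right))
        if isinstance(node, ast.UnaryOp) and isinstance(node.op, (ast.UAdd, ast.USub)):
            value = walk(node.operand)
            return value if isinstance(node.op, ast.UAdd) else -value
        if (isinstance(node, ast.Call) and isinstance(node.func, ast.Name)
                and node.func.id in _FUNCS and not node.keywords):
            return _FUNCS[node.func.id](*[walk(a) for a in node.args])
        if (isinstance(node, ast.Call) and isinstance(node.func, ast.Attribute)
                and isinstance(node.func.value, ast.Name)
                and node.func.value.id == "math"):
            return getattr(math, node.func.attr)(*[walk(a) for a in node.args])
        # Imports, attribute access, lambdas, strings, etc. all land here.
        raise ValueError(f"disallowed syntax: {ast.dump(node)[:40]}")
    return walk(ast.parse(expr, mode="eval"))
```

Because unknown node types raise rather than fall through, injection attempts fail closed.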
rockachopa
122d07471e Merge pull request 'fix: sanitize dynamic innerHTML in HTML templates (#47)' (#58) from fix/xss-sanitize into main
Some checks failed
Tests / lint (push) Successful in 4s
Tests / test (push) Failing after 12s
2026-03-14 15:45:11 -04:00
rockachopa
3d110098d1 Merge pull request 'feat: Add Kimi agent workspace with development scaffolding' (#44) from kimi/agent-workspace-init into main
Some checks failed
Tests / lint (push) Successful in 2s
Tests / test (push) Failing after 14s
Reviewed-on: http://localhost:3000/rockachopa/Timmy-time-dashboard/pulls/44
2026-03-14 15:09:04 -04:00
db129bbe16 fix: replace print() with proper logging (#29, #51)
Some checks failed
Tests / lint (pull_request) Successful in 3s
Tests / test (pull_request) Failing after 14s
2026-03-14 15:07:07 -04:00
591954891a fix: sanitize dynamic innerHTML in templates (#47)
Some checks failed
Tests / lint (pull_request) Successful in 2s
Tests / test (pull_request) Failing after 15s
2026-03-14 15:07:00 -04:00
bb287b2c73 fix: sanitize WebSocket data in HTML templates (XSS #47) 2026-03-14 15:01:48 -04:00
efb1feafc9 fix: replace print() with proper logging (#29, #51) 2026-03-14 15:01:34 -04:00
6233a8ccd6 feat: Add Kimi agent workspace with development scaffolding
Some checks failed
Tests / lint (pull_request) Successful in 3s
Tests / test (pull_request) Failing after 13s
Create the Kimi (Moonshot AI) agent workspace per AGENTS.md conventions:

Workspace Structure:
- .kimi/AGENTS.md - Workspace guide and conventions
- .kimi/README.md - Quick reference documentation
- .kimi/CHECKPOINT.md - Session state tracking
- .kimi/TODO.md - Task list for upcoming work
- .kimi/notes/ - Working notes directory
- .kimi/plans/ - Plan documents
- .kimi/worktrees/ - Git worktrees (reserved)

Development Scripts:
- scripts/bootstrap.sh - One-time workspace setup (venv, deps, .env)
- scripts/resume.sh - Quick status check + resume prompt
- scripts/dev.sh - Development helpers (status, test, lint, format, clean, nuke)

Features:
- Validates Python 3.11+, venv, deps, .env, git config
- Provides quick status on git, tests, Ollama, dashboard
- Commands for testing, linting, formatting, cleaning

Per AGENTS.md:
- Kimi is Build Tier for large-context feature drops
- Follows existing project patterns
- No changes to source code - workspace only
2026-03-14 14:30:38 -04:00
fa838b0063 fix: clean shutdown — silence MCP async-generator teardown noise
Some checks failed
Tests / lint (push) Successful in 2s
Tests / test (push) Failing after 13s
Swallow anyio cancel-scope RuntimeError and BaseExceptionGroup
from MCP stdio_client generators during GC on voice loop exit.
Custom unraisablehook + loop exception handler + warnings filter.
2026-03-14 14:12:05 -04:00
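The unraisablehook piece of that fix can be sketched as a predicate plus a hook swap. The substring match is a hypothetical heuristic; the real filter also handles `BaseExceptionGroup` and installs a loop exception handler and warnings filter alongside it.

```python
import sys

def is_mcp_teardown_noise(exc):
    """Heuristic sketch for the anyio cancel-scope RuntimeError raised
    while MCP stdio_client generators are garbage-collected on exit."""
    return isinstance(exc, RuntimeError) and "cancel scope" in str(exc)

def install_quiet_unraisable_hook():
    """Swallow known teardown noise; forward everything else to the
    previous hook. Returns the previous hook so callers can restore it."""
    previous = sys.unraisablehook

    def hook(unraisable):
        if is_mcp_teardown_noise(unraisable.exc_value):
            return  # known GC-time noise, stay silent
        previous(unraisable)

    sys.unraisablehook = hook
    return previous
```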
782218aa2c fix: voice loop — persistent event loop, markdown stripping, MCP noise
Some checks failed
Tests / lint (push) Successful in 3s
Tests / test (push) Failing after 12s
Three fixes from real-world testing:

1. Event loop: replaced asyncio.run() with a persistent loop so
   Agno's MCP sessions survive across conversation turns. No more
   'Event loop is closed' errors on turn 2+.

2. Markdown stripping: voice preamble tells Timmy to respond in
   natural spoken language, plus _strip_markdown() as a safety net
   removes **bold**, *italic*, bullets, headers, code fences, etc.
   TTS no longer reads 'asterisk asterisk'.

3. MCP noise: _suppress_mcp_noise() quiets mcp/agno/httpx loggers
   during voice mode so the terminal shows clean transcript only.

32 tests (12 new for markdown stripping + persistent loop).
2026-03-14 14:05:24 -04:00
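The markdown safety net from fix 2 can be sketched with a handful of regexes; the patterns here are illustrative rather than the project's actual `_strip_markdown`.

```python
import re

def strip_markdown(text):
    """Strip formatting before TTS so it never reads 'asterisk asterisk'."""
    text = re.sub(r"```.*?```", "", text, flags=re.DOTALL)        # code fences
    text = re.sub(r"^#{1,6}\s*", "", text, flags=re.MULTILINE)    # headers
    text = re.sub(r"\*\*(.+?)\*\*", r"\1", text)                  # bold
    text = re.sub(r"\*(.+?)\*", r"\1", text)                      # italic
    text = re.sub(r"^\s*[-*+]\s+", "", text, flags=re.MULTILINE)  # bullets
    return text.strip()
```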
dbadfc425d feat: sovereign voice loop — timmy voice command
Some checks failed
Tests / lint (push) Successful in 4s
Tests / test (push) Failing after 14s
Adds fully local listen-think-speak voice interface.
STT: Whisper, LLM: Ollama, TTS: Piper. No cloud, no network.

- src/timmy/voice_loop.py: VoiceLoop with VAD, Whisper, Piper
- src/timmy/cli.py: new voice command
- pyproject.toml: voice extras updated
- 20 new tests
2026-03-14 13:58:56 -04:00
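The listen-think-speak cycle can be sketched with every stage injected; in the real loop those stages are VAD plus Whisper, the Ollama-backed agent, and Piper TTS, and nothing touches the network.

```python
def voice_turn(listen, transcribe, think, speak):
    """One listen-think-speak turn; stage callables are injected so the
    sketch stays local and testable."""
    audio = listen()
    if audio is None:          # VAD heard nothing usable
        return None
    text = transcribe(audio)   # STT (Whisper in the real loop)
    reply = think(text)        # LLM (Ollama)
    speak(reply)               # TTS (Piper)
    return reply
```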
42 changed files with 3344 additions and 287 deletions

.kimi/AGENTS.md Normal file

@@ -0,0 +1,91 @@
# Kimi Agent Workspace
**Agent:** Kimi (Moonshot AI)
**Role:** Build Tier - Large-context feature drops, new subsystems, persona agents
**Branch:** `kimi/agent-workspace-init`
**Created:** 2026-03-14
---
## Quick Start
```bash
# Bootstrap Kimi workspace
bash .kimi/scripts/bootstrap.sh
# Resume work
bash .kimi/scripts/resume.sh
```
---
## Kimi Capabilities
Per AGENTS.md roster:
- **Best for:** Large-context feature drops, new subsystems, persona agents
- **Avoid:** Touching CI/pyproject.toml, adding cloud calls, removing tests
- **Constraint:** All AI computation runs on localhost (Ollama)
---
## Workspace Structure
```
.kimi/
├── AGENTS.md # This file - workspace guide
├── README.md # Workspace documentation
├── CHECKPOINT.md # Current session state
├── TODO.md # Task list for Kimi
├── scripts/
│ ├── bootstrap.sh # One-time setup
│ ├── resume.sh # Quick status + resume
│ └── dev.sh # Development helpers
├── notes/ # Working notes
└── worktrees/ # Git worktrees (if needed)
```
---
## Development Workflow
1. **Before changes:**
- Read CLAUDE.md and AGENTS.md
- Check CHECKPOINT.md for current state
- Run `make test` to verify green tests
2. **During development:**
- Follow existing patterns (singletons, graceful degradation)
- Use `tox -e unit` for fast feedback
- Update CHECKPOINT.md with progress
3. **Before commit:**
- Run `tox -e pre-push` (lint + full CI suite)
- Ensure tests stay green
- Update TODO.md
---
## Useful Commands
```bash
# Testing
tox -e unit # Fast unit tests
tox -e integration # Integration tests
tox -e pre-push # Full CI suite (local)
make test # All tests
# Development
make dev # Start dashboard with hot-reload
make lint # Check code quality
make format # Auto-format code
# Git
bash .kimi/scripts/resume.sh # Show status + resume prompt
```
---
## Contact
- **Gitea:** http://localhost:3000/rockachopa/Timmy-time-dashboard
- **PR:** Submit PRs to `main` branch

.kimi/CHECKPOINT.md Normal file

@@ -0,0 +1,102 @@
# Kimi Checkpoint — Workspace Initialization
**Date:** 2026-03-14
**Branch:** `kimi/agent-workspace-init`
**Status:** ✅ Workspace scaffolding complete, ready for PR
---
## Summary
Created the Kimi (Moonshot AI) agent workspace with development scaffolding to enable smooth feature development on the Timmy Time project.
### Deliverables
1. **Workspace Structure** (`.kimi/`)
- `AGENTS.md` — Workspace guide and conventions
- `README.md` — Quick reference documentation
- `CHECKPOINT.md` — This file, session state tracking
- `TODO.md` — Task list for upcoming work
2. **Development Scripts** (`.kimi/scripts/`)
- `bootstrap.sh` — One-time workspace setup
- `resume.sh` — Quick status check + resume prompt
- `dev.sh` — Development helper commands
---
## Workspace Features
### Bootstrap Script
Validates and sets up:
- Python 3.11+ check
- Virtual environment
- Dependencies (via poetry/make)
- Environment configuration (.env)
- Git configuration
### Resume Script
Provides quick status on:
- Current Git branch/commit
- Uncommitted changes
- Last test run results
- Ollama service status
- Dashboard service status
- Pending TODO items
### Development Script
Commands for:
- `status` — Project status overview
- `test` — Fast unit tests
- `test-full` — Full test suite
- `lint` — Code quality check
- `format` — Auto-format code
- `clean` — Clean build artifacts
- `nuke` — Full environment reset
---
## Files Added
```
.kimi/
├── AGENTS.md
├── CHECKPOINT.md
├── README.md
├── TODO.md
├── scripts/
│ ├── bootstrap.sh
│ ├── dev.sh
│ └── resume.sh
└── worktrees/ (reserved for future use)
```
---
## Next Steps
Per AGENTS.md roadmap:
1. **v2.0 Exodus (in progress)** — Voice + Marketplace + Integrations
2. **v3.0 Revelation (planned)** — Lightning treasury + `.app` bundle + federation
See `.kimi/TODO.md` for specific upcoming tasks.
---
## Usage
```bash
# First time setup
bash .kimi/scripts/bootstrap.sh
# Daily workflow
bash .kimi/scripts/resume.sh # Check status
cat .kimi/TODO.md # See tasks
# ... make changes ...
make test # Verify tests
cat .kimi/CHECKPOINT.md # Update checkpoint
```
---
*Workspace initialized per AGENTS.md and CLAUDE.md conventions*

.kimi/README.md Normal file

@@ -0,0 +1,51 @@
# Kimi Agent Workspace for Timmy Time
This directory contains the Kimi (Moonshot AI) agent workspace for the Timmy Time project.
## About Kimi
Kimi is part of the **Build Tier** in the Timmy Time agent roster:
- **Strengths:** Large-context feature drops, new subsystems, persona agents
- **Model:** Paid API with large context window
- **Best for:** Complex features requiring extensive context
## Quick Commands
```bash
# Check workspace status
bash .kimi/scripts/resume.sh
# Bootstrap (first time)
bash .kimi/scripts/bootstrap.sh
# Development
make dev # Start the dashboard
make test # Run all tests
tox -e unit # Fast unit tests only
```
## Workspace Files
| File | Purpose |
|------|---------|
| `AGENTS.md` | Workspace guide and conventions |
| `CHECKPOINT.md` | Current session state |
| `TODO.md` | Task list and priorities |
| `scripts/bootstrap.sh` | One-time setup script |
| `scripts/resume.sh` | Quick status check |
| `scripts/dev.sh` | Development helpers |
## Conventions
Per project AGENTS.md:
1. **Tests must stay green** - Run `make test` before committing
2. **No cloud dependencies** - Use Ollama for local AI
3. **Follow existing patterns** - Singletons, graceful degradation
4. **Security first** - Never hard-code secrets
5. **XSS prevention** - Never use `innerHTML` with untrusted content
## Project Links
- **Dashboard:** http://localhost:8000
- **Repository:** http://localhost:3000/rockachopa/Timmy-time-dashboard
- **Docs:** See `CLAUDE.md` and `AGENTS.md` in project root

.kimi/TODO.md Normal file

@@ -0,0 +1,87 @@
# Kimi Workspace — Task List
**Agent:** Kimi (Moonshot AI)
**Branch:** `kimi/agent-workspace-init`
---
## Current Sprint
### Completed ✅
- [x] Create `kimi/agent-workspace-init` branch
- [x] Set up `.kimi/` workspace directory structure
- [x] Create `AGENTS.md` with workspace guide
- [x] Create `README.md` with quick reference
- [x] Create `bootstrap.sh` for one-time setup
- [x] Create `resume.sh` for daily workflow
- [x] Create `dev.sh` with helper commands
- [x] Create `CHECKPOINT.md` template
- [x] Create `TODO.md` (this file)
- [x] Submit PR to Gitea
---
## Upcoming (v2.0 Exodus — Voice + Marketplace + Integrations)
### Voice Enhancements
- [ ] Voice command history and replay
- [ ] Multi-language NLU support
- [ ] Voice transcription quality metrics
- [ ] Piper TTS integration improvements
### Marketplace
- [ ] Agent capability registry
- [ ] Task bidding system UI
- [ ] Work order management dashboard
- [ ] Payment flow integration (L402)
### Integrations
- [ ] Discord bot enhancements
- [ ] Telegram bot improvements
- [ ] Siri Shortcuts expansion
- [ ] WebSocket event streaming
---
## Future (v3.0 Revelation)
### Lightning Treasury
- [ ] LND integration (real Lightning)
- [ ] Bitcoin wallet management
- [ ] Autonomous payment flows
- [ ] Macaroon-based authorization
### App Bundle
- [ ] macOS .app packaging
- [ ] Code signing setup
- [ ] Auto-updater integration
### Federation
- [ ] Multi-node swarm support
- [ ] Inter-agent communication protocol
- [ ] Distributed task scheduling
---
## Technical Debt
- [ ] XSS audit (replace innerHTML in templates)
- [ ] Chat history persistence
- [ ] Connection pooling evaluation
- [ ] React dashboard (separate effort)
---
## Notes
- Follow existing patterns: singletons, graceful degradation
- All AI computation on localhost (Ollama)
- Tests must stay green
- Update CHECKPOINT.md after each session

.kimi/scripts/bootstrap.sh Executable file

@@ -0,0 +1,106 @@
#!/bin/bash
# Kimi Workspace Bootstrap Script
# Run this once to set up the Kimi agent workspace
set -e
echo "==============================================="
echo " Kimi Agent Workspace Bootstrap"
echo "==============================================="
echo ""
# Navigate to project root
cd "$(dirname "$0")/../.."
PROJECT_ROOT=$(pwd)
echo "📁 Project Root: $PROJECT_ROOT"
echo ""
# Check Python version
echo "🔍 Checking Python version..."
python3 -c "import sys; exit(0 if sys.version_info >= (3,11) else 1)" || {
echo "❌ ERROR: Python 3.11+ required (found $(python3 --version))"
exit 1
}
echo "✅ Python $(python3 --version)"
echo ""
# Check if virtual environment exists
echo "🔍 Checking virtual environment..."
if [ -d ".venv" ]; then
echo "✅ Virtual environment exists"
else
echo "⚠️ Virtual environment not found. Creating..."
python3 -m venv .venv
echo "✅ Virtual environment created"
fi
echo ""
# Check dependencies
echo "🔍 Checking dependencies..."
if [ -f ".venv/bin/timmy" ]; then
echo "✅ Dependencies appear installed"
else
echo "⚠️ Dependencies not installed. Running make install..."
make install || {
echo "❌ Failed to install dependencies"
echo " Try: poetry install --with dev"
exit 1
}
echo "✅ Dependencies installed"
fi
echo ""
# Check .env file
echo "🔍 Checking environment configuration..."
if [ -f ".env" ]; then
echo "✅ .env file exists"
else
echo "⚠️ .env file not found. Creating from template..."
cp .env.example .env
echo "✅ Created .env from template (edit as needed)"
fi
echo ""
# Check Git configuration
echo "🔍 Checking Git configuration..."
git config --local user.name &>/dev/null || {
echo "⚠️ Git user.name not set. Setting..."
git config --local user.name "Kimi Agent"
}
git config --local user.email &>/dev/null || {
echo "⚠️ Git user.email not set. Setting..."
git config --local user.email "kimi@timmy.local"
}
echo "✅ Git config: $(git config --local user.name) <$(git config --local user.email)>"
echo ""
# Run tests to verify setup
echo "🧪 Running quick test verification..."
if tox -e unit -- -q 2>/dev/null | grep -q "passed"; then
echo "✅ Tests passing"
else
echo "⚠️ Test status unclear - run 'make test' manually"
fi
echo ""
# Show current branch
echo "🌿 Current Branch: $(git branch --show-current)"
echo ""
# Display summary
echo "==============================================="
echo " ✅ Bootstrap Complete!"
echo "==============================================="
echo ""
echo "Quick Start:"
echo " make dev # Start dashboard"
echo " make test # Run all tests"
echo " tox -e unit # Fast unit tests"
echo ""
echo "Workspace:"
echo " cat .kimi/CHECKPOINT.md # Current state"
echo " cat .kimi/TODO.md # Task list"
echo " bash .kimi/scripts/resume.sh # Status check"
echo ""
echo "Happy coding! 🚀"

.kimi/scripts/dev.sh Executable file

@@ -0,0 +1,98 @@
#!/bin/bash
# Kimi Development Helper Script
set -e
cd "$(dirname "$0")/../.."
show_help() {
echo "Kimi Development Helpers"
echo ""
echo "Usage: bash .kimi/scripts/dev.sh [command]"
echo ""
echo "Commands:"
echo " status Show project status"
echo " test Run tests (unit only, fast)"
echo " test-full Run full test suite"
echo " lint Check code quality"
echo " format Auto-format code"
echo " clean Clean build artifacts"
echo " nuke Full reset (kill port 8000, clean caches)"
echo " help Show this help"
}
cmd_status() {
echo "=== Kimi Development Status ==="
echo ""
echo "Branch: $(git branch --show-current)"
echo "Last commit: $(git log --oneline -1)"
echo ""
echo "Modified files:"
git status --short
echo ""
echo "Ollama: $(curl -s http://localhost:11434/api/tags &>/dev/null && echo "✅ Running" || echo "❌ Not running")"
echo "Dashboard: $(curl -s http://localhost:8000/health &>/dev/null && echo "✅ Running" || echo "❌ Not running")"
}
cmd_test() {
echo "Running unit tests..."
tox -e unit -q
}
cmd_test_full() {
echo "Running full test suite..."
make test
}
cmd_lint() {
echo "Running linters..."
tox -e lint
}
cmd_format() {
echo "Auto-formatting code..."
tox -e format
}
cmd_clean() {
echo "Cleaning build artifacts..."
make clean
}
cmd_nuke() {
echo "Nuking development environment..."
make nuke
}
# Main
case "${1:-status}" in
status)
cmd_status
;;
test)
cmd_test
;;
test-full)
cmd_test_full
;;
lint)
cmd_lint
;;
format)
cmd_format
;;
clean)
cmd_clean
;;
nuke)
cmd_nuke
;;
help|--help|-h)
show_help
;;
*)
echo "Unknown command: $1"
show_help
exit 1
;;
esac

.kimi/scripts/resume.sh Executable file

@@ -0,0 +1,73 @@
#!/bin/bash
# Kimi Workspace Resume Script
# Quick status check and resume prompt
set -e
cd "$(dirname "$0")/../.."
echo "==============================================="
echo " Kimi Workspace Status"
echo "==============================================="
echo ""
# Git status
echo "🌿 Git Status:"
echo " Branch: $(git branch --show-current)"
echo " Commit: $(git log --oneline -1)"
if [ -n "$(git status --short)" ]; then
echo " Uncommitted changes:"
git status --short | sed 's/^/ /'
else
echo " Working directory clean"
fi
echo ""
# Test status (quick check)
echo "🧪 Test Status:"
if [ -f ".tox/unit/log/1-commands[0].log" ]; then
LAST_TEST=$(grep -o '[0-9]* passed' .tox/unit/log/1-commands[0].log 2>/dev/null | tail -1 || echo "unknown")
echo " Last unit test run: $LAST_TEST"
else
echo " No recent test runs found"
fi
echo ""
# Check Ollama
echo "🤖 Ollama Status:"
if curl -s http://localhost:11434/api/tags &>/dev/null; then
MODELS=$(curl -s http://localhost:11434/api/tags 2>/dev/null | grep -o '"name":"[^"]*"' | head -3 | sed 's/"name":"//;s/"$//' | tr '\n' ',' | sed 's/,$//;s/,/, /g')
echo " ✅ Running (models: $MODELS)"
else
echo " ⚠️ Not running (start with: ollama serve)"
fi
echo ""
# Dashboard status
echo "🌐 Dashboard Status:"
if curl -s http://localhost:8000/health &>/dev/null; then
echo " ✅ Running at http://localhost:8000"
else
echo " ⚠️ Not running (start with: make dev)"
fi
echo ""
# Show TODO items
echo "📝 Next Tasks (from TODO.md):"
if [ -f ".kimi/TODO.md" ]; then
    # Capture first: a plain `grep … | sed … || echo` fallback never fires,
    # because the pipeline's exit status comes from sed, not grep.
    TASKS=$(grep -E "^[[:space:]]*- \[ \]" .kimi/TODO.md | head -5 | sed 's/^/    /')
    echo "${TASKS:-    No pending tasks}"
else
    echo "    No TODO.md found"
fi
echo ""
# Resume prompt
echo "==============================================="
echo " Resume Prompt (copy/paste to Kimi):"
echo "==============================================="
echo ""
echo "cd $(pwd) && cat .kimi/CHECKPOINT.md"
echo ""
echo "Continue from checkpoint. Check .kimi/TODO.md for next tasks."
echo "Run 'make test' after changes and update CHECKPOINT.md."
echo ""


@@ -99,16 +99,19 @@ agents:
- shell
prompt: |
You are Timmy, a sovereign local AI orchestrator.
Primary interface between the user and the agent swarm.
Handle directly or delegate. Maintain continuity via memory.
You are the primary interface between the user and the agent swarm.
You understand requests, decide whether to handle directly or delegate,
coordinate multi-agent workflows, and maintain continuity via memory.
Voice: brief, plain, direct. Match response length to question
complexity. A yes/no question gets a yes/no answer. Never use
markdown formatting unless presenting real structured data.
Brevity is a kindness. Silence is better than noise.
Hard Rules:
1. NEVER fabricate tool output. Call the tool and wait for real results.
2. If a tool returns an error, report the exact error.
3. If you don't know something, say so. Then use a tool. Don't guess.
4. When corrected, use memory_write to save the correction immediately.
Rules:
1. Never fabricate tool output. Call the tool and wait.
2. Tool errors: report the exact error.
3. Don't know? Say so, then use a tool. Don't guess.
4. When corrected, memory_write the correction immediately.
researcher:
name: Seer

config/allowlist.yaml Normal file

@@ -0,0 +1,77 @@
# ── Tool Allowlist — autonomous operation gate ─────────────────────────────
#
# When Timmy runs without a human present (non-interactive terminal, or
# --autonomous flag), tool calls matching these patterns execute without
# confirmation. Anything NOT listed here is auto-rejected.
#
# This file is the ONLY gate for autonomous tool execution.
# GOLDEN_TIMMY in approvals.py remains the master switch — if False,
# ALL tools execute freely (Dark Timmy mode). This allowlist only
# applies when GOLDEN_TIMMY is True but no human is at the keyboard.
#
# Edit with care. This is sovereignty in action.
# ────────────────────────────────────────────────────────────────────────────
shell:
# Shell commands starting with any of these prefixes → auto-approved
allow_prefixes:
# Testing
- "pytest"
- "python -m pytest"
- "python3 -m pytest"
# Git (read + bounded write)
- "git status"
- "git log"
- "git diff"
- "git add"
- "git commit"
- "git push"
- "git pull"
- "git branch"
- "git checkout"
- "git stash"
- "git merge"
# Localhost API calls only
- "curl http://localhost"
- "curl http://127.0.0.1"
- "curl -s http://localhost"
- "curl -s http://127.0.0.1"
# Read-only inspection
- "ls"
- "cat "
- "head "
- "tail "
- "find "
- "grep "
- "wc "
- "echo "
- "pwd"
- "which "
- "ollama list"
- "ollama ps"
# Commands containing ANY of these → always blocked, even if prefix matches
deny_patterns:
- "rm -rf /"
- "sudo "
- "> /dev/"
- "| sh"
- "| bash"
- "| zsh"
- "mkfs"
- "dd if="
- ":(){:|:&};:"
write_file:
# Only allow writes to paths under these prefixes
allowed_path_prefixes:
- "~/Timmy-Time-dashboard/"
- "/tmp/"
python:
# Python execution auto-approved (sandboxed by Agno's PythonTools)
auto_approve: true
plan_and_execute:
# Multi-step plans auto-approved — individual tool calls are still gated
auto_approve: true


@@ -43,6 +43,9 @@ python-telegram-bot = { version = ">=21.0", optional = true }
"discord.py" = { version = ">=2.3.0", optional = true }
airllm = { version = ">=2.9.0", optional = true }
pyttsx3 = { version = ">=2.90", optional = true }
openai-whisper = { version = ">=20231117", optional = true }
piper-tts = { version = ">=1.2.0", optional = true }
sounddevice = { version = ">=0.4.6", optional = true }
sentence-transformers = { version = ">=2.0.0", optional = true }
numpy = { version = ">=1.24.0", optional = true }
requests = { version = ">=2.31.0", optional = true }
@@ -59,7 +62,7 @@ pytest-xdist = { version = ">=3.5.0", optional = true }
telegram = ["python-telegram-bot"]
discord = ["discord.py"]
bigbrain = ["airllm"]
voice = ["pyttsx3"]
voice = ["pyttsx3", "openai-whisper", "piper-tts", "sounddevice"]
celery = ["celery"]
embeddings = ["sentence-transformers", "numpy"]
git = ["GitPython"]


@@ -22,6 +22,24 @@ class Settings(BaseSettings):
# llama3.2 (3B) hallucinated tool output consistently in testing.
ollama_model: str = "qwen3.5:latest"
# Fallback model chains — override with FALLBACK_MODELS / VISION_FALLBACK_MODELS
# as comma-separated strings, e.g. FALLBACK_MODELS="qwen3.5:latest,llama3.1"
# Or edit config/providers.yaml → fallback_chains for the canonical source.
fallback_models: list[str] = [
"llama3.1:8b-instruct",
"llama3.1",
"qwen3.5:latest",
"qwen2.5:14b",
"qwen2.5:7b",
"llama3.2:3b",
]
vision_fallback_models: list[str] = [
"llama3.2:3b",
"llava:7b",
"qwen2.5-vl:3b",
"moondream:1.8b",
]
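The comment above says the chains can be overridden with comma-separated env strings like `FALLBACK_MODELS="qwen3.5:latest,llama3.1"`. Pydantic-settings parses `list[str]` fields as JSON by default, so a splitting helper along these lines is an assumption about how the project wires the override in — the name `parse_model_chain` is hypothetical:

```python
# Hypothetical helper: turn a comma-separated env value into a model chain.
# How (or whether) the real Settings class attaches this is not in the diff.
def parse_model_chain(raw: str) -> list[str]:
    return [item.strip() for item in raw.split(",") if item.strip()]

print(parse_model_chain("qwen3.5:latest, llama3.1"))
# → ['qwen3.5:latest', 'llama3.1']
```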
# Set DEBUG=true to enable /docs and /redoc (disabled by default)
debug: bool = False
@@ -346,10 +364,9 @@ if not settings.repo_root:
settings.repo_root = settings._compute_repo_root()
# ── Model fallback configuration ────────────────────────────────────────────
# Primary model for reliable tool calling (llama3.1:8b-instruct)
# Fallback if primary not available: qwen3.5:latest
OLLAMA_MODEL_PRIMARY: str = "qwen3.5:latest"
OLLAMA_MODEL_FALLBACK: str = "llama3.1:8b-instruct"
# Fallback chains are now in settings.fallback_models / settings.vision_fallback_models.
# Override via env vars (FALLBACK_MODELS, VISION_FALLBACK_MODELS) or
# edit config/providers.yaml → fallback_chains.
def check_ollama_model_available(model_name: str) -> bool:
@@ -376,28 +393,25 @@ def check_ollama_model_available(model_name: str) -> bool:
def get_effective_ollama_model() -> str:
"""Get the effective Ollama model, with fallback logic."""
# If user has overridden, use their setting
"""Get the effective Ollama model, with fallback logic.
Walks the configurable ``settings.fallback_models`` chain when the
user's preferred model is not available locally.
"""
user_model = settings.ollama_model
# Check if user's model is available
if check_ollama_model_available(user_model):
return user_model
# Try primary
if check_ollama_model_available(OLLAMA_MODEL_PRIMARY):
_startup_logger.warning(
f"Requested model '{user_model}' not available. Using primary: {OLLAMA_MODEL_PRIMARY}"
)
return OLLAMA_MODEL_PRIMARY
# Try fallback
if check_ollama_model_available(OLLAMA_MODEL_FALLBACK):
_startup_logger.warning(
f"Primary model '{OLLAMA_MODEL_PRIMARY}' not available. "
f"Using fallback: {OLLAMA_MODEL_FALLBACK}"
)
return OLLAMA_MODEL_FALLBACK
# Walk the configurable fallback chain
for fallback in settings.fallback_models:
if check_ollama_model_available(fallback):
_startup_logger.warning(
"Requested model '%s' not available. Using fallback: %s",
user_model,
fallback,
)
return fallback
# Last resort - return user's setting and hope for the best
return user_model


@@ -500,6 +500,42 @@ async def ws_redirect(websocket: WebSocket):
await websocket.send({"type": "websocket.close", "code": 1008})
@app.websocket("/swarm/live")
async def swarm_live(websocket: WebSocket):
"""Swarm live event stream via WebSocket."""
from infrastructure.ws_manager.handler import ws_manager as ws_mgr
await ws_mgr.connect(websocket)
try:
while True:
# Keep connection alive; events are pushed via ws_mgr.broadcast()
await websocket.receive_text()
except Exception:
ws_mgr.disconnect(websocket)
@app.get("/swarm/agents/sidebar", response_class=HTMLResponse)
async def swarm_agents_sidebar():
"""HTMX partial: list active swarm agents for the dashboard sidebar."""
try:
from config import settings
agents_yaml = settings.agents_config
agents = agents_yaml.get("agents", {})
lines = []
for name, cfg in agents.items():
model = cfg.get("model", "default")
lines.append(
f'<div class="mc-agent-row">'
f'<span class="mc-agent-name">{name}</span>'
f'<span class="mc-agent-model">{model}</span>'
f"</div>"
)
return "\n".join(lines) if lines else '<div class="mc-muted">No agents configured</div>'
except Exception:
return '<div class="mc-muted">Agents unavailable</div>'
@app.get("/", response_class=HTMLResponse)
async def root(request: Request):
"""Serve the main dashboard page."""


@@ -134,6 +134,10 @@ class CSRFMiddleware(BaseHTTPMiddleware):
if settings.timmy_disable_csrf:
return await call_next(request)
# WebSocket upgrades don't carry CSRF tokens — skip them entirely
if request.headers.get("upgrade", "").lower() == "websocket":
return await call_next(request)
# Get existing CSRF token from cookie
csrf_cookie = request.cookies.get(self.cookie_name)


@@ -1,4 +1,23 @@
"""Persistent chat message store backed by SQLite.
Provides the same API as the original in-memory MessageLog so all callers
(dashboard routes, chat_api, thinking, briefing) work without changes.
Data lives in ``data/chat.db`` — survives server restarts.
A configurable retention policy (default 500 messages) keeps the DB lean.
"""
import sqlite3
import threading
from dataclasses import dataclass
from pathlib import Path
# ── Data dir — resolved relative to repo root (two levels up from this file) ──
_REPO_ROOT = Path(__file__).resolve().parents[2]
DB_PATH: Path = _REPO_ROOT / "data" / "chat.db"
# Maximum messages to retain (oldest pruned on append)
MAX_MESSAGES: int = 500
@dataclass
@@ -9,25 +28,106 @@ class Message:
source: str = "browser" # "browser" | "api" | "telegram" | "discord" | "system"
class MessageLog:
"""In-memory chat history for the lifetime of the server process."""
def _get_conn(db_path: Path | None = None) -> sqlite3.Connection:
"""Open (or create) the chat database and ensure schema exists."""
path = db_path or DB_PATH
path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(path), check_same_thread=False)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("""
CREATE TABLE IF NOT EXISTS chat_messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
role TEXT NOT NULL,
content TEXT NOT NULL,
timestamp TEXT NOT NULL,
source TEXT NOT NULL DEFAULT 'browser'
)
""")
conn.commit()
return conn
def __init__(self) -> None:
self._entries: list[Message] = []
class MessageLog:
"""SQLite-backed chat history — drop-in replacement for the old in-memory list."""
def __init__(self, db_path: Path | None = None) -> None:
self._db_path = db_path or DB_PATH
self._lock = threading.Lock()
self._conn: sqlite3.Connection | None = None
# Lazy connection — opened on first use, not at import time.
def _ensure_conn(self) -> sqlite3.Connection:
if self._conn is None:
self._conn = _get_conn(self._db_path)
return self._conn
def append(self, role: str, content: str, timestamp: str, source: str = "browser") -> None:
self._entries.append(
Message(role=role, content=content, timestamp=timestamp, source=source)
)
with self._lock:
conn = self._ensure_conn()
conn.execute(
"INSERT INTO chat_messages (role, content, timestamp, source) VALUES (?, ?, ?, ?)",
(role, content, timestamp, source),
)
conn.commit()
self._prune(conn)
def all(self) -> list[Message]:
return list(self._entries)
with self._lock:
conn = self._ensure_conn()
rows = conn.execute(
"SELECT role, content, timestamp, source FROM chat_messages ORDER BY id"
).fetchall()
return [
Message(
role=r["role"], content=r["content"], timestamp=r["timestamp"], source=r["source"]
)
for r in rows
]
def recent(self, limit: int = 50) -> list[Message]:
"""Return the *limit* most recent messages (oldest-first)."""
with self._lock:
conn = self._ensure_conn()
rows = conn.execute(
"SELECT role, content, timestamp, source FROM chat_messages "
"ORDER BY id DESC LIMIT ?",
(limit,),
).fetchall()
return [
Message(
role=r["role"], content=r["content"], timestamp=r["timestamp"], source=r["source"]
)
for r in reversed(rows)
]
def clear(self) -> None:
self._entries.clear()
with self._lock:
conn = self._ensure_conn()
conn.execute("DELETE FROM chat_messages")
conn.commit()
def _prune(self, conn: sqlite3.Connection) -> None:
"""Keep at most MAX_MESSAGES rows, deleting the oldest."""
count = conn.execute("SELECT COUNT(*) FROM chat_messages").fetchone()[0]
if count > MAX_MESSAGES:
excess = count - MAX_MESSAGES
conn.execute(
"DELETE FROM chat_messages WHERE id IN "
"(SELECT id FROM chat_messages ORDER BY id LIMIT ?)",
(excess,),
)
conn.commit()
def close(self) -> None:
if self._conn is not None:
self._conn.close()
self._conn = None
def __len__(self) -> int:
return len(self._entries)
with self._lock:
conn = self._ensure_conn()
return conn.execute("SELECT COUNT(*) FROM chat_messages").fetchone()[0]
# Module-level singleton shared across the app
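The `_prune` query above deletes exactly the oldest `excess` rows by exploiting the monotonically increasing `id`. Run in isolation against a throwaway in-memory database (table name and cap taken from the diff, the 8-row fixture is illustrative):

```python
import sqlite3

# Demonstrate the MessageLog._prune pattern: keep the newest MAX_MESSAGES rows.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE chat_messages (id INTEGER PRIMARY KEY AUTOINCREMENT, content TEXT)"
)
conn.executemany(
    "INSERT INTO chat_messages (content) VALUES (?)",
    [(f"msg {i}",) for i in range(8)],
)

MAX_MESSAGES = 5
count = conn.execute("SELECT COUNT(*) FROM chat_messages").fetchone()[0]
if count > MAX_MESSAGES:
    excess = count - MAX_MESSAGES
    # Oldest rows have the smallest ids; delete exactly `excess` of them.
    conn.execute(
        "DELETE FROM chat_messages WHERE id IN "
        "(SELECT id FROM chat_messages ORDER BY id LIMIT ?)",
        (excess,),
    )
remaining = [r[0] for r in conn.execute("SELECT content FROM chat_messages ORDER BY id")]
print(remaining)  # → ['msg 3', 'msg 4', 'msg 5', 'msg 6', 'msg 7']
```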


@@ -327,7 +327,11 @@
.then(function(data) {
var list = document.getElementById('notif-list');
if (!data.length) {
list.innerHTML = '<div class="mc-notif-empty">No recent notifications</div>';
list.innerHTML = '';
var emptyDiv = document.createElement('div');
emptyDiv.className = 'mc-notif-empty';
emptyDiv.textContent = 'No recent notifications';
list.appendChild(emptyDiv);
return;
}
list.innerHTML = '';


@@ -120,14 +120,17 @@
function updateFromData(data) {
if (data.is_working && data.current_task) {
statusEl.innerHTML = '<span style="color: #ffaa00;">working...</span>';
statusEl.textContent = 'working...';
statusEl.style.color = '#ffaa00';
banner.style.display = 'block';
taskTitle.textContent = data.current_task.title;
} else if (data.tasks_ahead > 0) {
statusEl.innerHTML = '<span style="color: #888;">queue: ' + data.tasks_ahead + ' ahead</span>';
statusEl.textContent = 'queue: ' + data.tasks_ahead + ' ahead';
statusEl.style.color = '#888';
banner.style.display = 'none';
} else {
statusEl.innerHTML = '<span style="color: #00ff88;">ready</span>';
statusEl.textContent = 'ready';
statusEl.style.color = '#00ff88';
banner.style.display = 'none';
}
}


@@ -198,17 +198,43 @@ function addActivityEvent(evt) {
} catch(e) {}
}
item.innerHTML = `
<div class="activity-icon">${icon}</div>
<div class="activity-content">
<div class="activity-label">${label}</div>
${desc ? `<div class="activity-desc">${desc}</div>` : ''}
<div class="activity-meta">
<span class="activity-time">${time}</span>
<span class="activity-source">${evt.source || 'system'}</span>
</div>
</div>
`;
// Build DOM safely using createElement and textContent
var iconDiv = document.createElement('div');
iconDiv.className = 'activity-icon';
iconDiv.textContent = icon;
var contentDiv = document.createElement('div');
contentDiv.className = 'activity-content';
var labelDiv = document.createElement('div');
labelDiv.className = 'activity-label';
labelDiv.textContent = label;
contentDiv.appendChild(labelDiv);
if (desc) {
var descDiv = document.createElement('div');
descDiv.className = 'activity-desc';
descDiv.textContent = desc;
contentDiv.appendChild(descDiv);
}
var metaDiv = document.createElement('div');
metaDiv.className = 'activity-meta';
var timeSpan = document.createElement('span');
timeSpan.className = 'activity-time';
timeSpan.textContent = time;
var sourceSpan = document.createElement('span');
sourceSpan.className = 'activity-source';
sourceSpan.textContent = evt.source || 'system';
metaDiv.appendChild(timeSpan);
metaDiv.appendChild(sourceSpan);
contentDiv.appendChild(metaDiv);
item.appendChild(iconDiv);
item.appendChild(contentDiv);
// Add to top
container.insertBefore(item, container.firstChild);


@@ -63,7 +63,7 @@ class EventBus:
@bus.subscribe("agent.task.*")
async def handle_task(event: Event):
print(f"Task event: {event.data}")
logger.debug(f"Task event: {event.data}")
await bus.publish(Event(
type="agent.task.assigned",


@@ -16,6 +16,7 @@ Handoff Protocol maintains continuity across sessions.
import logging
from typing import TYPE_CHECKING, Union
import httpx
from agno.agent import Agent
from agno.db.sqlite import SqliteDb
from agno.models.ollama import Ollama
@@ -29,24 +30,6 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
# Fallback chain for text/tool models (in order of preference)
DEFAULT_MODEL_FALLBACKS = [
"llama3.1:8b-instruct",
"llama3.1",
"qwen3.5:latest",
"qwen2.5:14b",
"qwen2.5:7b",
"llama3.2:3b",
]
# Fallback chain for vision models
VISION_MODEL_FALLBACKS = [
"llama3.2:3b",
"llava:7b",
"qwen2.5-vl:3b",
"moondream:1.8b",
]
# Union type for callers that want to hint the return type.
TimmyAgent = Union[Agent, "TimmyAirLLMAgent", "GrokBackend", "ClaudeBackend"]
@@ -130,8 +113,8 @@ def _resolve_model_with_fallback(
return model, False
logger.warning("Failed to pull %s, checking fallbacks...", model)
# Use appropriate fallback chain
fallback_chain = VISION_MODEL_FALLBACKS if require_vision else DEFAULT_MODEL_FALLBACKS
# Use appropriate configurable fallback chain (from settings / env vars)
fallback_chain = settings.vision_fallback_models if require_vision else settings.fallback_models
for fallback_model in fallback_chain:
if _check_model_available(fallback_model):
@@ -162,6 +145,32 @@ def _model_supports_tools(model_name: str) -> bool:
return True
def _warmup_model(model_name: str) -> bool:
"""Warm up an Ollama model by sending a minimal generation request.
This prevents 'Server disconnected' errors on first request after cold model load.
Cold loads can take 30-40s, so we use a 60s timeout.
Args:
model_name: Name of the Ollama model to warm up
Returns:
True if warmup succeeded, False otherwise (does not raise)
"""
try:
response = httpx.post(
f"{settings.ollama_url}/api/generate",
json={"model": model_name, "prompt": "hi", "options": {"num_predict": 1}},
timeout=60.0,
)
response.raise_for_status()
logger.info("Model %s warmed up successfully", model_name)
return True
except Exception as exc:
logger.warning("Model warmup failed: %s — first request may disconnect", exc)
return False
def _resolve_backend(requested: str | None) -> str:
"""Return the backend name to use, resolving 'auto' and explicit overrides.
@@ -192,6 +201,8 @@ def create_timmy(
db_file: str = "timmy.db",
backend: str | None = None,
model_size: str | None = None,
*,
skip_mcp: bool = False,
) -> TimmyAgent:
"""Instantiate the agent — Ollama or AirLLM, same public interface.
@@ -199,6 +210,10 @@ def create_timmy(
db_file: SQLite file for Agno conversation memory (Ollama path only).
backend: "ollama" | "airllm" | "auto" | None (reads config/env).
model_size: AirLLM size — "8b" | "70b" | "405b" | None (reads config).
skip_mcp: If True, omit MCP tool servers (Gitea, filesystem).
Use for background tasks (thinking, QA) where MCP's
stdio cancel-scope lifecycle conflicts with asyncio
task cancellation.
Returns an Agno Agent or backend-specific agent — all expose
print_response(message, stream).
@@ -253,8 +268,10 @@ def create_timmy(
if toolkit:
tools_list.append(toolkit)
# Add MCP tool servers (lazy-connected on first arun())
if use_tools:
# Add MCP tool servers (lazy-connected on first arun()).
# Skipped when skip_mcp=True — MCP's stdio transport uses anyio cancel
# scopes that conflict with asyncio background task cancellation (#72).
if use_tools and not skip_mcp:
try:
from timmy.mcp_tools import create_filesystem_mcp_tools, create_gitea_mcp_tools
@@ -289,18 +306,20 @@ def create_timmy(
logger.warning("Failed to load memory context: %s", exc)
full_prompt = base_prompt
return Agent(
agent = Agent(
name="Agent",
model=Ollama(id=model_name, host=settings.ollama_url, timeout=300),
db=SqliteDb(db_file=db_file),
description=full_prompt,
add_history_to_context=True,
num_history_runs=20,
markdown=True,
markdown=False,
tools=tools_list if tools_list else None,
tool_call_limit=settings.max_agent_steps if use_tools else None,
telemetry=settings.telemetry_enabled,
)
_warmup_model(model_name)
return agent
class TimmyWithMemory:


@@ -79,7 +79,7 @@ class BaseAgent(ABC):
tools=tool_instances if tool_instances else None,
add_history_to_context=True,
num_history_runs=self.max_history,
markdown=True,
markdown=False,
telemetry=settings.telemetry_enabled,
)


@@ -1,11 +1,12 @@
import logging
import subprocess
import sys
import typer
from timmy.agent import create_timmy
from timmy.prompts import STATUS_PROMPT
from timmy.tool_safety import format_action_description, get_impact_level
from timmy.tool_safety import format_action_description, get_impact_level, is_allowlisted
logger = logging.getLogger(__name__)
@@ -30,15 +31,26 @@ _MODEL_SIZE_OPTION = typer.Option(
)
def _handle_tool_confirmation(agent, run_output, session_id: str):
def _is_interactive() -> bool:
"""Return True if stdin is a real terminal (human present)."""
return hasattr(sys.stdin, "isatty") and sys.stdin.isatty()
def _handle_tool_confirmation(agent, run_output, session_id: str, *, autonomous: bool = False):
"""Prompt user to approve/reject dangerous tool calls.
When Agno pauses a run because a tool requires confirmation, this
function displays the action, asks for approval via stdin, and
resumes or rejects the run accordingly.
When autonomous=True (or stdin is not a terminal), tool calls are
checked against config/allowlist.yaml instead of prompting.
Allowlisted calls are auto-approved; everything else is auto-rejected.
Returns the final RunOutput after all confirmations are resolved.
"""
interactive = _is_interactive() and not autonomous
max_rounds = 10 # safety limit
for _ in range(max_rounds):
status = getattr(run_output, "status", None)
@@ -58,22 +70,34 @@ def _handle_tool_confirmation(agent, run_output, session_id: str):
tool_name = getattr(te, "tool_name", "unknown")
tool_args = getattr(te, "tool_args", {}) or {}
description = format_action_description(tool_name, tool_args)
impact = get_impact_level(tool_name)
if interactive:
# Human present — prompt for approval
description = format_action_description(tool_name, tool_args)
impact = get_impact_level(tool_name)
typer.echo()
typer.echo(typer.style("Tool confirmation required", bold=True))
typer.echo(f" Impact: {impact.upper()}")
typer.echo(f" {description}")
typer.echo()
typer.echo()
typer.echo(typer.style("Tool confirmation required", bold=True))
typer.echo(f" Impact: {impact.upper()}")
typer.echo(f" {description}")
typer.echo()
approved = typer.confirm("Allow this action?", default=False)
if approved:
req.confirm()
logger.info("CLI: approved %s", tool_name)
approved = typer.confirm("Allow this action?", default=False)
if approved:
req.confirm()
logger.info("CLI: approved %s", tool_name)
else:
req.reject(note="User rejected from CLI")
logger.info("CLI: rejected %s", tool_name)
else:
req.reject(note="User rejected from CLI")
logger.info("CLI: rejected %s", tool_name)
# Autonomous mode — check allowlist
if is_allowlisted(tool_name, tool_args):
req.confirm()
logger.info("AUTO-APPROVED (allowlist): %s", tool_name)
else:
req.reject(note="Auto-rejected: not in allowlist")
logger.info(
"AUTO-REJECTED (not allowlisted): %s %s", tool_name, str(tool_args)[:100]
)
# Resume the run so the agent sees the confirmation result
try:
@@ -128,21 +152,42 @@ def chat(
"-n",
help="Start a fresh conversation (ignore prior context)",
),
session_id: str | None = typer.Option(
None,
"--session-id",
help="Use a specific session ID for this conversation",
),
autonomous: bool = typer.Option(
False,
"--autonomous",
"-a",
help="Autonomous mode: auto-approve allowlisted tools, reject the rest (no stdin prompts)",
),
):
"""Send a message to Timmy.
Conversation history persists across invocations. Use --new to start fresh.
Conversation history persists across invocations. Use --new to start fresh,
or --session-id to use a specific session.
Use --autonomous for non-interactive contexts (scripts, dev loops). Tool
calls are checked against config/allowlist.yaml — allowlisted operations
execute automatically, everything else is safely rejected.
"""
import uuid
session_id = str(uuid.uuid4()) if new_session else _CLI_SESSION_ID
if session_id is not None:
pass # use the provided value
elif new_session:
session_id = str(uuid.uuid4())
else:
session_id = _CLI_SESSION_ID
timmy = create_timmy(backend=backend, model_size=model_size)
# Use agent.run() so we can intercept paused runs for tool confirmation.
run_output = timmy.run(message, stream=False, session_id=session_id)
# Handle paused runs — dangerous tools need user approval
run_output = _handle_tool_confirmation(timmy, run_output, session_id)
run_output = _handle_tool_confirmation(timmy, run_output, session_id, autonomous=autonomous)
# Print the final response
content = run_output.content if hasattr(run_output, "content") else str(run_output)
@@ -248,5 +293,37 @@ def down():
subprocess.run(["docker", "compose", "down"], check=True)
@app.command()
def voice(
whisper_model: str = typer.Option(
"base.en", "--whisper", "-w", help="Whisper model: tiny.en, base.en, small.en, medium.en"
),
use_say: bool = typer.Option(False, "--say", help="Use macOS `say` instead of Piper TTS"),
threshold: float = typer.Option(
0.015, "--threshold", "-t", help="Mic silence threshold (RMS). Lower = more sensitive."
),
silence: float = typer.Option(1.5, "--silence", help="Seconds of silence to end recording"),
backend: str | None = _BACKEND_OPTION,
model_size: str | None = _MODEL_SIZE_OPTION,
):
"""Start the sovereign voice loop — listen, think, speak.
Everything runs locally: Whisper for STT, Ollama for LLM, Piper for TTS.
No cloud, no network calls, no microphone data leaves your machine.
"""
from timmy.voice_loop import VoiceConfig, VoiceLoop
config = VoiceConfig(
whisper_model=whisper_model,
use_say_fallback=use_say,
silence_threshold=threshold,
silence_duration=silence,
backend=backend,
model_size=model_size,
)
loop = VoiceLoop(config=config)
loop.run()
def main():
app()


@@ -13,7 +13,10 @@ SYSTEM_PROMPT_LITE = """You are a local AI assistant running on the {model_name}
No cloud dependencies.
Rules:
- Answer directly and concisely. Never narrate your reasoning process.
- Be brief by default. Short questions get short answers. Expand only when depth
is genuinely needed or asked for.
- Speak plainly. Prefer short sentences. Plain text, not markdown.
- Answer directly. Never narrate your reasoning process.
- Never mention tools, memory_search, vaults, or internal systems to the user.
- Never output tool calls, JSON, or function syntax in your responses.
- Remember what the user tells you during the conversation.
@@ -27,6 +30,7 @@ Rules:
- Do NOT end responses with generic chatbot phrases like "I'm here to help" or
"feel free to ask."
- When your values conflict (e.g. honesty vs. helpfulness), lead with honesty.
- Sometimes the right answer is nothing. Do not fill silence with noise.
"""
# ---------------------------------------------------------------------------
@@ -36,80 +40,45 @@ Rules:
SYSTEM_PROMPT_FULL = """You are a local AI assistant running on the {model_name} model via Ollama.
No cloud dependencies.
## Your Three-Tier Memory System
VOICE AND BREVITY (this overrides all other formatting instincts):
- Be brief. Short questions get short answers. One sentence if one sentence
suffices. Expand ONLY when the user asks for depth or the topic demands it.
- Plain text only. No markdown headers, bold, tables, emoji, or bullet lists
unless presenting genuinely structured data (a real table, a real list).
- Speak plainly. Short sentences. Answer the question that was asked before
the question that wasn't.
- Never narrate your reasoning. Just give the answer.
- Do not end with filler ("Let me know!", "Happy to help!", "Feel free...").
- Sometimes the right answer is nothing. Do not fill silence with noise.
### Tier 1: Hot Memory (Always Loaded)
- MEMORY.md — Current status, rules, user profile summary
- Loaded into every session automatically
HONESTY:
- If you don't know, say "I don't know." Don't dress a guess in confidence.
- When uncertain, say so proportionally. "I think" and "I know" are different.
- When your values conflict, lead with honesty.
- Never fabricate tool output. Call the tool and wait.
- If a tool errors, report the exact error.
### Tier 2: Structured Vault (Persistent)
- memory/self/ — User profile, methodology
- memory/notes/ — Session logs, research, lessons learned
- memory/aar/ — After-action reviews
- Append-only, date-stamped, human-readable
MEMORY (three tiers):
- Tier 1: MEMORY.md (hot, always loaded)
- Tier 2: memory/ vault (structured, append-only, date-stamped)
- Tier 3: semantic search (use memory_search tool)
### Tier 3: Semantic Search (Vector Recall)
- Indexed from all vault files
- Similarity-based retrieval
- Use `memory_search` tool to find relevant past context
TOOL USAGE:
- Arithmetic: always use calculator. Never compute in your head.
- Past context: memory_search
- File ops, code, shell: only on explicit request
- General knowledge / greetings: no tools needed
## Reasoning in Complex Situations
MULTI-STEP TASKS:
When a task needs multiple tool calls, complete ALL steps before responding.
Do not stop after one call and report partial results. If a tool fails, try
an alternative. Summarize only after the full task is done.
When faced with uncertainty, complexity, or ambiguous requests:
1. **THINK STEP-BY-STEP** — Break down the problem before acting
2. **STATE UNCERTAINTY** — If you're unsure, say "I'm uncertain about X because..."
3. **CONSIDER ALTERNATIVES** — Present 2-3 options when the path isn't clear
4. **ASK FOR CLARIFICATION** — If a request is ambiguous, ask before guessing wrong
5. **DOCUMENT YOUR REASONING** — When making significant choices, explain WHY
## Tool Usage Guidelines
### When NOT to use tools:
- General knowledge → Answer from training
- Greetings → Respond conversationally
### When TO use tools:
- **calculator** — ANY arithmetic
- **web_search** — Current events, real-time data, news
- **read_file** — User explicitly requests file reading
- **write_file** — User explicitly requests saving content
- **python** — Code execution, data processing
- **shell** — System operations (explicit user request)
- **memory_search** — Finding past context
## Multi-Step Task Execution
CRITICAL RULE: When a task requires multiple tool calls, you MUST call each
tool in sequence. Do NOT stop after one tool call and report partial results.
When a task requires multiple tool calls:
1. Call the first tool and wait for results
2. After receiving results, immediately call the next required tool
3. Keep calling tools until the ENTIRE task is complete
4. If a tool fails, try an alternative approach
5. Only after ALL steps are done, summarize what you accomplished
Example: "Search for AI news and save to a file"
- Step 1: Call web_search → get results
- Step 2: Call write_file with the results → confirm saved
- Step 3: THEN respond to the user with a summary
DO NOT stop after Step 1 and just show search results.
For complex tasks with 3+ steps that may take time, use the plan_and_execute
tool to run them in the background with progress tracking.
## Important: Response Style
- Never narrate your reasoning process. Just give the answer.
- Never show raw tool call JSON or function syntax in responses.
IDENTITY:
- Use the user's name if known.
- If a request is ambiguous, ask a brief clarifying question before guessing.
- If a request is ambiguous, ask one brief clarifying question.
- When you state a fact, commit to it.
- Do NOT end responses with generic chatbot phrases like "I'm here to help" or
"feel free to ask."
- When your values conflict (e.g. honesty vs. helpfulness), lead with honesty.
- Never show raw tool call JSON or function syntax in responses.
"""
# Default to lite for safety
@@ -144,10 +113,9 @@ DECISION ORDER:
1. Is this arithmetic or math? → calculator (ALWAYS — never compute in your head)
2. Can I answer from training data? → Answer directly (NO TOOL)
3. Is this about past conversations? → memory_search
4. Is this current/real-time info? → web_search
5. Did user request file operations? → file tools
6. Requires code execution? → python
7. System command requested? → shell
4. Did user request file operations? → file tools
5. Requires code execution? → python
6. System command requested? → shell
MEMORY SEARCH TRIGGERS:
- "Have we discussed..."

View File

@@ -31,7 +31,7 @@ _TOOL_CALL_JSON = re.compile(
# Matches function-call-style text: memory_search(query="...") etc.
_FUNC_CALL_TEXT = re.compile(
r"\b(?:memory_search|web_search|shell|python|read_file|write_file|list_files|calculator)"
r"\b(?:memory_search|shell|python|read_file|write_file|list_files|calculator)"
r"\s*\([^)]*\)",
)

View File

@@ -23,6 +23,7 @@ import sqlite3
import uuid
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from difflib import SequenceMatcher
from pathlib import Path
from config import settings
@@ -196,33 +197,63 @@ class ThinkingEngine:
if not settings.thinking_enabled:
return None
if prompt:
seed_type = "prompted"
seed_context = f"Journal prompt: {prompt}"
else:
seed_type, seed_context = self._gather_seed()
continuity = self._build_continuity_context()
memory_context = self._load_memory_context()
system_context = self._gather_system_snapshot()
recent_thoughts = self.get_recent_thoughts(limit=5)
prompt = _THINKING_PROMPT.format(
memory_context=memory_context,
system_context=system_context,
seed_context=seed_context,
continuity_context=continuity,
)
content: str | None = None
seed_type: str = "freeform"
try:
content = await self._call_agent(prompt)
except Exception as exc:
logger.warning("Thinking cycle failed (Ollama likely down): %s", exc)
for attempt in range(self._MAX_DEDUP_RETRIES + 1):
if prompt:
seed_type = "prompted"
seed_context = f"Journal prompt: {prompt}"
else:
seed_type, seed_context = self._gather_seed()
continuity = self._build_continuity_context()
full_prompt = _THINKING_PROMPT.format(
memory_context=memory_context,
system_context=system_context,
seed_context=seed_context,
continuity_context=continuity,
)
try:
raw = await self._call_agent(full_prompt)
except Exception as exc:
logger.warning("Thinking cycle failed (Ollama likely down): %s", exc)
return None
if not raw or not raw.strip():
logger.debug("Thinking cycle produced empty response, skipping")
return None
content = raw.strip()
# Dedup: reject thoughts too similar to recent ones
if not self._is_too_similar(content, recent_thoughts):
break # Good — novel thought
if attempt < self._MAX_DEDUP_RETRIES:
logger.info(
"Thought too similar to recent (attempt %d/%d), retrying with new seed",
attempt + 1,
self._MAX_DEDUP_RETRIES + 1,
)
content = None # Will retry
else:
logger.warning(
"Thought still repetitive after %d retries, discarding",
self._MAX_DEDUP_RETRIES + 1,
)
return None
if not content:
return None
if not content or not content.strip():
logger.debug("Thinking cycle produced empty response, skipping")
return None
thought = self._store_thought(content.strip(), seed_type)
thought = self._store_thought(content, seed_type)
self._last_thought_id = thought.id
# Post-hook: distill facts from recent thoughts periodically
@@ -743,6 +774,31 @@ class ThinkingEngine:
logger.debug("Observation seed data unavailable: %s", exc)
return "\n".join(context_parts)
# Maximum retries when a generated thought is too similar to recent ones
_MAX_DEDUP_RETRIES = 2
# Similarity threshold (0.0 = completely different, 1.0 = identical)
_SIMILARITY_THRESHOLD = 0.6
def _is_too_similar(self, candidate: str, recent: list["Thought"]) -> bool:
"""Check if *candidate* is semantically too close to any recent thought.
Uses SequenceMatcher on normalised text (lowered, stripped) for a fast
approximation of semantic similarity that works without external deps.
"""
norm_candidate = candidate.lower().strip()
for thought in recent:
norm_existing = thought.content.lower().strip()
ratio = SequenceMatcher(None, norm_candidate, norm_existing).ratio()
if ratio >= self._SIMILARITY_THRESHOLD:
logger.debug(
"Thought rejected (%.0f%% similar to %s): %.60s",
ratio * 100,
thought.id[:8],
candidate,
)
return True
return False
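As a sanity check, the dedup gate added above reduces to a few lines. This is a standalone sketch, assuming plain strings in place of `Thought` objects and the same 0.6 threshold from the diff:

```python
# Standalone sketch of _is_too_similar: SequenceMatcher on normalised
# (lowered, stripped) text, rejecting candidates at or above 0.6 ratio.
from difflib import SequenceMatcher

SIMILARITY_THRESHOLD = 0.6

def is_too_similar(candidate: str, recent: list[str]) -> bool:
    """True if candidate is too close to any recent thought text."""
    norm_candidate = candidate.lower().strip()
    for existing in recent:
        ratio = SequenceMatcher(None, norm_candidate, existing.lower().strip()).ratio()
        if ratio >= SIMILARITY_THRESHOLD:
            return True
    return False
```

An identical (case-insensitive) repeat scores 1.0 and is rejected; a genuinely new thought falls well under the threshold and passes.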
def _build_continuity_context(self) -> str:
"""Build context from recent thoughts with anti-repetition guidance.
@@ -765,19 +821,16 @@ class ThinkingEngine:
async def _call_agent(self, prompt: str) -> str:
"""Call Timmy's agent to generate a thought.
Uses a separate session_id to avoid polluting user chat history.
Creates a lightweight agent with skip_mcp=True to avoid the cancel-scope
errors that occur when MCP stdio transports are spawned inside asyncio
background tasks (#72). The thinking engine doesn't need Gitea or
filesystem tools — it only needs the LLM.
"""
try:
from timmy.session import chat
from timmy.agent import create_timmy
return await chat(prompt, session_id="thinking")
except Exception:
# Fallback: create a fresh agent
from timmy.agent import create_timmy
agent = create_timmy()
run = await agent.arun(prompt, stream=False)
return run.content if hasattr(run, "content") else str(run)
agent = create_timmy(skip_mcp=True)
run = await agent.arun(prompt, stream=False)
return run.content if hasattr(run, "content") else str(run)
def _store_thought(self, content: str, seed_type: str) -> Thought:
"""Persist a thought to SQLite."""

View File

@@ -5,13 +5,19 @@ Classifies tools into tiers based on their potential impact:
Requires user confirmation before execution.
- SAFE: Read-only or purely computational. Executes without confirmation.
Also provides shared helpers for extracting hallucinated tool calls from
model output and formatting them for human review. Used by both the
Discord vendor and the dashboard chat route.
Also provides:
- Allowlist checker: reads config/allowlist.yaml to auto-approve bounded
tool calls when no human is present (autonomous mode).
- Shared helpers for extracting hallucinated tool calls from model output
and formatting them for human review.
"""
import json
import logging
import re
from pathlib import Path
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Tool classification
@@ -31,7 +37,6 @@ DANGEROUS_TOOLS = frozenset(
# Tools that are safe to execute without confirmation.
SAFE_TOOLS = frozenset(
{
"web_search",
"calculator",
"memory_search",
"memory_read",
@@ -71,6 +76,133 @@ def requires_confirmation(tool_name: str) -> bool:
return True
# ---------------------------------------------------------------------------
# Allowlist — autonomous tool approval
# ---------------------------------------------------------------------------
_ALLOWLIST_PATHS = [
Path(__file__).resolve().parent.parent.parent / "config" / "allowlist.yaml",
Path.home() / "Timmy-Time-dashboard" / "config" / "allowlist.yaml",
]
_allowlist_cache: dict | None = None
def _load_allowlist() -> dict:
"""Load and cache allowlist.yaml. Returns {} if not found."""
global _allowlist_cache
if _allowlist_cache is not None:
return _allowlist_cache
try:
import yaml
except ImportError:
logger.debug("PyYAML not installed — allowlist disabled")
_allowlist_cache = {}
return _allowlist_cache
for path in _ALLOWLIST_PATHS:
if path.is_file():
try:
with open(path) as f:
_allowlist_cache = yaml.safe_load(f) or {}
logger.info("Loaded tool allowlist from %s", path)
return _allowlist_cache
except Exception as exc:
logger.warning("Failed to load allowlist %s: %s", path, exc)
_allowlist_cache = {}
return _allowlist_cache
def reload_allowlist() -> None:
"""Force a reload of the allowlist config (e.g., after editing YAML)."""
global _allowlist_cache
_allowlist_cache = None
_load_allowlist()
def is_allowlisted(tool_name: str, tool_args: dict | None = None) -> bool:
"""Check if a specific tool call is allowlisted for autonomous execution.
Returns True only when the tool call matches an explicit allowlist rule.
Returns False for anything not covered — safe-by-default.
"""
allowlist = _load_allowlist()
if not allowlist:
return False
rule = allowlist.get(tool_name)
if rule is None:
return False
tool_args = tool_args or {}
# Simple auto-approve flag
if rule.get("auto_approve") is True:
return True
# Shell: prefix + deny pattern matching
if tool_name == "shell":
return _check_shell_allowlist(rule, tool_args)
# write_file: path prefix check
if tool_name == "write_file":
return _check_write_file_allowlist(rule, tool_args)
return False
def _check_shell_allowlist(rule: dict, tool_args: dict) -> bool:
"""Check if a shell command matches the allowlist."""
# Extract the command string — Agno ShellTools uses "args" (list or str)
cmd = tool_args.get("command") or tool_args.get("args", "")
if isinstance(cmd, list):
cmd = " ".join(cmd)
cmd = cmd.strip()
if not cmd:
return False
# Check deny patterns first — these always block
deny_patterns = rule.get("deny_patterns", [])
for pattern in deny_patterns:
if pattern in cmd:
logger.warning("Shell command blocked by deny pattern %r: %s", pattern, cmd[:100])
return False
# Check allow prefixes
allow_prefixes = rule.get("allow_prefixes", [])
for prefix in allow_prefixes:
if cmd.startswith(prefix):
logger.info("Shell command auto-approved by prefix %r: %s", prefix, cmd[:100])
return True
return False
def _check_write_file_allowlist(rule: dict, tool_args: dict) -> bool:
"""Check if a write_file target is within allowed paths."""
path_str = tool_args.get("file_name") or tool_args.get("path", "")
if not path_str:
return False
# Resolve ~ to home
if path_str.startswith("~"):
path_str = str(Path(path_str).expanduser())
allowed_prefixes = rule.get("allowed_path_prefixes", [])
for prefix in allowed_prefixes:
# Resolve ~ in the prefix too
if prefix.startswith("~"):
prefix = str(Path(prefix).expanduser())
if path_str.startswith(prefix):
logger.info("write_file auto-approved for path: %s", path_str)
return True
return False
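The shell allowlist logic above can be exercised in isolation. The rule dict below mirrors the `deny_patterns` / `allow_prefixes` shape the code reads from `allowlist.yaml`; the specific prefixes and patterns are made-up examples, not the project's real config:

```python
# Self-contained sketch of _check_shell_allowlist: deny patterns always
# block, allow prefixes auto-approve, anything else is safe-by-default.
def check_shell_allowlist(rule: dict, cmd: str) -> bool:
    cmd = cmd.strip()
    if not cmd:
        return False
    for pattern in rule.get("deny_patterns", []):
        if pattern in cmd:          # deny always wins, even on an allowed prefix
            return False
    for prefix in rule.get("allow_prefixes", []):
        if cmd.startswith(prefix):  # bounded auto-approve
            return True
    return False                    # not covered -> require a human

# Hypothetical rule for illustration only:
rule = {"allow_prefixes": ["git status", "ls "], "deny_patterns": ["rm -rf"]}
```

Note the ordering: deny patterns are checked before allow prefixes, so `git status && rm -rf /` is blocked even though it starts with an allowed prefix.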
# ---------------------------------------------------------------------------
# Tool call extraction from model output
# ---------------------------------------------------------------------------

View File

@@ -1,7 +1,6 @@
"""Tool integration for the agent swarm.
Provides agents with capabilities for:
- Web search (DuckDuckGo)
- File read/write (local filesystem)
- Shell command execution (sandboxed)
- Python code execution
@@ -13,6 +12,7 @@ Tools are assigned to agents based on their specialties.
from __future__ import annotations
import ast
import logging
import math
from collections.abc import Callable
@@ -37,15 +37,6 @@ except ImportError as e:
_AGNO_TOOLS_AVAILABLE = False
_ImportError = e
# DuckDuckGo is optional — don't let it kill all tools
try:
from agno.tools.duckduckgo import DuckDuckGoTools
_DUCKDUCKGO_AVAILABLE = True
except ImportError:
_DUCKDUCKGO_AVAILABLE = False
DuckDuckGoTools = None # type: ignore[assignment, misc]
# Track tool usage stats
_TOOL_USAGE: dict[str, list[dict]] = {}
@@ -115,6 +106,59 @@ def get_tool_stats(agent_id: str | None = None) -> dict:
return all_stats
def _safe_eval(node, allowed_names: dict):
"""Walk an AST and evaluate only safe numeric operations."""
if isinstance(node, ast.Expression):
return _safe_eval(node.body, allowed_names)
if isinstance(node, ast.Constant):
if isinstance(node.value, (int, float, complex)):
return node.value
raise ValueError(f"Unsupported constant: {node.value!r}")
if isinstance(node, ast.UnaryOp):
operand = _safe_eval(node.operand, allowed_names)
if isinstance(node.op, ast.UAdd):
return +operand
if isinstance(node.op, ast.USub):
return -operand
raise ValueError(f"Unsupported unary op: {type(node.op).__name__}")
if isinstance(node, ast.BinOp):
left = _safe_eval(node.left, allowed_names)
right = _safe_eval(node.right, allowed_names)
ops = {
ast.Add: lambda a, b: a + b,
ast.Sub: lambda a, b: a - b,
ast.Mult: lambda a, b: a * b,
ast.Div: lambda a, b: a / b,
ast.FloorDiv: lambda a, b: a // b,
ast.Mod: lambda a, b: a % b,
ast.Pow: lambda a, b: a**b,
}
op_fn = ops.get(type(node.op))
if op_fn is None:
raise ValueError(f"Unsupported binary op: {type(node.op).__name__}")
return op_fn(left, right)
if isinstance(node, ast.Name):
if node.id in allowed_names:
return allowed_names[node.id]
raise ValueError(f"Unknown name: {node.id!r}")
if isinstance(node, ast.Attribute):
value = _safe_eval(node.value, allowed_names)
# Only allow attribute access on the math module
if value is math:
attr = getattr(math, node.attr, None)
if attr is not None:
return attr
raise ValueError(f"Attribute access not allowed: .{node.attr}")
if isinstance(node, ast.Call):
func = _safe_eval(node.func, allowed_names)
if not callable(func):
raise ValueError(f"Not callable: {func!r}")
args = [_safe_eval(a, allowed_names) for a in node.args]
kwargs = {kw.arg: _safe_eval(kw.value, allowed_names) for kw in node.keywords}
return func(*args, **kwargs)
raise ValueError(f"Unsupported syntax: {type(node).__name__}")
def calculator(expression: str) -> str:
"""Evaluate a mathematical expression and return the exact result.
@@ -128,15 +172,15 @@ def calculator(expression: str) -> str:
Returns:
The exact result as a string.
"""
# Only expose math functions — no builtins, no file/os access
allowed_names = {k: getattr(math, k) for k in dir(math) if not k.startswith("_")}
allowed_names["math"] = math # Support math.sqrt(), math.pi, etc.
allowed_names["math"] = math
allowed_names["abs"] = abs
allowed_names["round"] = round
allowed_names["min"] = min
allowed_names["max"] = max
try:
result = eval(expression, {"__builtins__": {}}, allowed_names) # noqa: S307
tree = ast.parse(expression, mode="eval")
result = _safe_eval(tree, allowed_names)
return str(result)
except Exception as e:
return f"Error evaluating '{expression}': {e}"
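The `eval` → AST-walker swap above is the interesting part of this hunk. A reduced sketch of the same pattern, supporting only the four basic operators (the real walker in the diff covers the full operator set plus the `math` module):

```python
# Reduced sketch of the _safe_eval approach: parse to an AST, then
# evaluate only whitelisted node types. Anything else raises, so
# attribute access, imports, and arbitrary calls can never execute.
import ast
import operator

_OPS = {ast.Add: operator.add, ast.Sub: operator.sub,
        ast.Mult: operator.mul, ast.Div: operator.truediv}

def safe_calc(expression: str) -> float:
    def walk(node):
        if isinstance(node, ast.Expression):
            return walk(node.body)
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _OPS:
            return _OPS[type(node.op)](walk(node.left), walk(node.right))
        raise ValueError(f"Unsupported syntax: {type(node).__name__}")
    return walk(ast.parse(expression, mode="eval"))
```

Unlike `eval` with an emptied `__builtins__` (which is bypassable), the walker rejects anything outside the whitelist before it runs.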
@@ -174,17 +218,12 @@ def _make_smart_read_file(file_tools: FileTools) -> Callable:
def create_research_tools(base_dir: str | Path | None = None):
"""Create tools for the research agent (Echo).
Includes: web search, file reading
Includes: file reading
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="research")
# Web search via DuckDuckGo
if _DUCKDUCKGO_AVAILABLE:
search_tools = DuckDuckGoTools()
toolkit.register(search_tools.web_search, name="web_search")
# File reading
from config import settings
@@ -301,11 +340,6 @@ def create_data_tools(base_dir: str | Path | None = None):
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.list_files, name="list_files")
# Web search for finding datasets
if _DUCKDUCKGO_AVAILABLE:
search_tools = DuckDuckGoTools()
toolkit.register(search_tools.web_search, name="web_search")
return toolkit
@@ -331,7 +365,7 @@ def create_writing_tools(base_dir: str | Path | None = None):
def create_security_tools(base_dir: str | Path | None = None):
"""Create tools for the security agent (Mace).
Includes: shell commands (for scanning), web search (for threat intel), file read
Includes: shell commands (for scanning), file read
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
@@ -341,11 +375,6 @@ def create_security_tools(base_dir: str | Path | None = None):
shell_tools = ShellTools()
toolkit.register(shell_tools.run_shell_command, name="shell")
# Web search for threat intelligence
if _DUCKDUCKGO_AVAILABLE:
search_tools = DuckDuckGoTools()
toolkit.register(search_tools.web_search, name="web_search")
# File reading for logs/configs
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
@@ -450,15 +479,11 @@ def create_full_toolkit(base_dir: str | Path | None = None):
toolkit = Toolkit(
name="full",
requires_confirmation_tools=list(DANGEROUS_TOOLS),
)
# Web search (optional — degrades gracefully if ddgs not installed)
if _DUCKDUCKGO_AVAILABLE:
search_tools = DuckDuckGoTools()
toolkit.register(search_tools.web_search, name="web_search")
else:
logger.debug("DuckDuckGo tools unavailable (ddgs not installed) — skipping web_search")
# Set requires_confirmation_tools AFTER construction (avoids agno WARNING
# about tools not yet registered) but BEFORE register() calls (so each
# Function gets requires_confirmation=True). Fixes #79.
toolkit.requires_confirmation_tools = list(DANGEROUS_TOOLS)
# Python execution
python_tools = PythonTools()
@@ -682,11 +707,6 @@ def get_all_available_tools() -> dict[str, dict]:
Dict mapping tool categories to their tools and descriptions.
"""
catalog = {
"web_search": {
"name": "Web Search",
"description": "Search the web using DuckDuckGo",
"available_in": ["echo", "seer", "mace", "orchestrator"],
},
"shell": {
"name": "Shell Commands",
"description": "Execute shell commands (sandboxed)",

View File

@@ -55,26 +55,45 @@ def get_system_info() -> dict[str, Any]:
def _get_ollama_model() -> str:
"""Query Ollama API to get the current model."""
"""Query Ollama API to get the actual running model.
Strategy:
1. /api/ps — models currently loaded in memory (most accurate)
2. /api/tags — all installed models (fallback)
Both use exact name match to avoid prefix collisions
(e.g. 'qwen3:30b' vs 'qwen3.5:latest').
"""
from config import settings
configured = settings.ollama_model
try:
# First try to get tags to see available models
# First: check actually loaded models via /api/ps
response = httpx.get(f"{settings.ollama_url}/api/ps", timeout=5)
if response.status_code == 200:
running = response.json().get("models", [])
for model in running:
name = model.get("name", "")
if name == configured or name == f"{configured}:latest":
return name
# Configured model not loaded — return first running model
# so Timmy reports what's *actually* serving his requests
if running:
return running[0].get("name", configured)
# Second: check installed models via /api/tags (exact match)
response = httpx.get(f"{settings.ollama_url}/api/tags", timeout=5)
if response.status_code == 200:
models = response.json().get("models", [])
# Check if configured model is available
for model in models:
if model.get("name", "").startswith(settings.ollama_model.split(":")[0]):
return settings.ollama_model
# Fallback: return configured model
return settings.ollama_model
installed = response.json().get("models", [])
for model in installed:
name = model.get("name", "")
if name == configured or name == f"{configured}:latest":
return configured
except Exception:
pass
# Fallback to configured model
return settings.ollama_model
return configured
def check_ollama_health() -> dict[str, Any]:
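The fix in this hunk comes down to one predicate. A sketch of the exact-name match, isolated: a bare `startswith` test would let `qwen3:30b` match `qwen3.5:latest`, so the new code compares against the configured name and its `:latest` alias only:

```python
# The exact-match predicate from _get_ollama_model above, isolated.
def matches_configured(name: str, configured: str) -> bool:
    """True only for the configured model name or its ':latest' alias."""
    return name == configured or name == f"{configured}:latest"
```

This is applied to both `/api/ps` (loaded models) and `/api/tags` (installed models) results in the diff.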

src/timmy/voice_loop.py (new file, 529 lines)
View File

@@ -0,0 +1,529 @@
"""Sovereign voice loop — listen, think, speak.
A fully local voice interface for Timmy. No cloud, no network calls.
All processing happens on the user's machine:
Mic → VAD/silence detection → Whisper (local STT) → Timmy chat → Piper TTS → Speaker
Usage:
from timmy.voice_loop import VoiceLoop
loop = VoiceLoop()
loop.run() # blocks, Ctrl-C to stop
Requires: sounddevice, numpy, whisper, piper-tts
"""
import asyncio
import logging
import re
import subprocess
import sys
import tempfile
import time
from dataclasses import dataclass
from pathlib import Path
import numpy as np
logger = logging.getLogger(__name__)
# ── Voice-mode system instruction ───────────────────────────────────────────
# Prepended to user messages so Timmy responds naturally for TTS.
_VOICE_PREAMBLE = (
"[VOICE MODE] You are speaking aloud through a text-to-speech system. "
"Respond in short, natural spoken sentences. No markdown, no bullet points, "
"no asterisks, no numbered lists, no headers, no bold/italic formatting. "
"Talk like a person in a conversation — concise, warm, direct. "
"Keep responses under 3-4 sentences unless the user asks for detail."
)
def _strip_markdown(text: str) -> str:
"""Remove markdown formatting so TTS reads naturally.
Strips: **bold**, *italic*, `code`, # headers, - bullets,
numbered lists, [links](url), etc.
"""
if not text:
return text
# Remove bold/italic markers
text = re.sub(r"\*{1,3}([^*]+)\*{1,3}", r"\1", text)
# Remove inline code
text = re.sub(r"`([^`]+)`", r"\1", text)
# Remove headers (# Header)
text = re.sub(r"^#{1,6}\s+", "", text, flags=re.MULTILINE)
# Remove bullet points (-, *, +) at start of line
text = re.sub(r"^[\s]*[-*+]\s+", "", text, flags=re.MULTILINE)
# Remove numbered lists (1. 2. etc)
text = re.sub(r"^[\s]*\d+\.\s+", "", text, flags=re.MULTILINE)
# Remove link syntax [text](url) → text
text = re.sub(r"\[([^\]]+)\]\([^)]+\)", r"\1", text)
# Remove horizontal rules
text = re.sub(r"^[-*_]{3,}\s*$", "", text, flags=re.MULTILINE)
# Collapse multiple newlines
text = re.sub(r"\n{3,}", "\n\n", text)
return text.strip()
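Two of the substitutions above, isolated, show the intent of the stripper: the TTS should never read "asterisk asterisk" or bullet dashes aloud. This is a cut-down sketch, not the full function:

```python
# Two rules from _strip_markdown above: bold/italic markers and
# leading bullet characters, using the same regexes as the diff.
import re

def strip_bold_and_bullets(text: str) -> str:
    text = re.sub(r"\*{1,3}([^*]+)\*{1,3}", r"\1", text)            # **bold** / *italic*
    text = re.sub(r"^[\s]*[-*+]\s+", "", text, flags=re.MULTILINE)  # - bullets
    return text.strip()
```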
# ── Defaults ────────────────────────────────────────────────────────────────
DEFAULT_WHISPER_MODEL = "base.en"
DEFAULT_PIPER_VOICE = Path.home() / ".local/share/piper-voices/en_US-lessac-medium.onnx"
DEFAULT_SAMPLE_RATE = 16000 # Whisper expects 16 kHz
DEFAULT_CHANNELS = 1
DEFAULT_SILENCE_THRESHOLD = 0.015 # RMS threshold — tune for your mic/room
DEFAULT_SILENCE_DURATION = 1.5 # seconds of silence to end utterance
DEFAULT_MIN_UTTERANCE = 0.5 # ignore clicks/bumps shorter than this
DEFAULT_MAX_UTTERANCE = 30.0 # safety cap — don't record forever
DEFAULT_SESSION_ID = "voice"
@dataclass
class VoiceConfig:
"""Configuration for the voice loop."""
whisper_model: str = DEFAULT_WHISPER_MODEL
piper_voice: Path = DEFAULT_PIPER_VOICE
sample_rate: int = DEFAULT_SAMPLE_RATE
silence_threshold: float = DEFAULT_SILENCE_THRESHOLD
silence_duration: float = DEFAULT_SILENCE_DURATION
min_utterance: float = DEFAULT_MIN_UTTERANCE
max_utterance: float = DEFAULT_MAX_UTTERANCE
session_id: str = DEFAULT_SESSION_ID
# Set True to use macOS `say` instead of Piper
use_say_fallback: bool = False
# Piper speaking rate (default 1.0, lower = slower)
speaking_rate: float = 1.0
# Backend/model for Timmy inference
backend: str | None = None
model_size: str | None = None
class VoiceLoop:
"""Sovereign listen-think-speak loop.
Everything runs locally:
- STT: OpenAI Whisper (local model, no API)
- LLM: Timmy via Ollama (local inference)
- TTS: Piper (local ONNX model) or macOS `say`
"""
def __init__(self, config: VoiceConfig | None = None) -> None:
self.config = config or VoiceConfig()
self._whisper_model = None
self._running = False
self._speaking = False # True while TTS is playing
self._interrupted = False # set when user talks over TTS
# Persistent event loop — reused across all chat calls so Agno's
# MCP sessions don't die when the loop closes.
self._loop: asyncio.AbstractEventLoop | None = None
# ── Lazy initialization ─────────────────────────────────────────────
def _load_whisper(self):
"""Load Whisper model (lazy, first use only)."""
if self._whisper_model is not None:
return
import whisper
logger.info("Loading Whisper model: %s", self.config.whisper_model)
self._whisper_model = whisper.load_model(self.config.whisper_model)
logger.info("Whisper model loaded.")
def _ensure_piper(self) -> bool:
"""Check that Piper voice model exists."""
if self.config.use_say_fallback:
return True
voice_path = self.config.piper_voice
if not voice_path.exists():
logger.warning("Piper voice not found at %s — falling back to `say`", voice_path)
self.config.use_say_fallback = True
return True
return True
# ── STT: Microphone → Text ──────────────────────────────────────────
def _record_utterance(self) -> np.ndarray | None:
"""Record from microphone until silence is detected.
Uses energy-based Voice Activity Detection:
1. Wait for speech (RMS above threshold)
2. Record until silence (RMS below threshold for silence_duration)
3. Return the audio as a numpy array
Returns None if interrupted or no speech detected.
"""
import sounddevice as sd
sr = self.config.sample_rate
block_size = int(sr * 0.1) # 100ms blocks
silence_blocks = int(self.config.silence_duration / 0.1)
min_blocks = int(self.config.min_utterance / 0.1)
max_blocks = int(self.config.max_utterance / 0.1)
audio_chunks: list[np.ndarray] = []
silent_count = 0
recording = False
def _rms(block: np.ndarray) -> float:
return float(np.sqrt(np.mean(block.astype(np.float32) ** 2)))
sys.stdout.write("\n 🎤 Listening... (speak now)\n")
sys.stdout.flush()
with sd.InputStream(
samplerate=sr,
channels=DEFAULT_CHANNELS,
dtype="float32",
blocksize=block_size,
) as stream:
while self._running:
block, overflowed = stream.read(block_size)
if overflowed:
logger.debug("Audio buffer overflowed")
rms = _rms(block)
if not recording:
if rms > self.config.silence_threshold:
recording = True
silent_count = 0
audio_chunks.append(block.copy())
sys.stdout.write(" 📢 Recording...\r")
sys.stdout.flush()
else:
audio_chunks.append(block.copy())
if rms < self.config.silence_threshold:
silent_count += 1
else:
silent_count = 0
# End of utterance
if silent_count >= silence_blocks:
break
# Safety cap
if len(audio_chunks) >= max_blocks:
logger.info("Max utterance length reached, stopping.")
break
if not audio_chunks or len(audio_chunks) < min_blocks:
return None
audio = np.concatenate(audio_chunks, axis=0).flatten()
duration = len(audio) / sr
sys.stdout.write(f" ✂️ Captured {duration:.1f}s of audio\n")
sys.stdout.flush()
return audio
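The energy gate at the heart of the VAD above is just an RMS comparison per 100 ms block. A pure-Python stand-in (the real code uses numpy blocks), with the defaults from the diff:

```python
# Pure-Python sketch of the RMS gate in _record_utterance: a block is
# "speech" when its root-mean-square energy exceeds the 0.015 default
# threshold; 15 consecutive quiet 100 ms blocks (1.5 s) end the utterance.
import math

SILENCE_THRESHOLD = 0.015
SILENCE_BLOCKS = int(1.5 / 0.1)  # 15 quiet blocks end the utterance

def rms(block: list[float]) -> float:
    """Root-mean-square energy of one audio block."""
    return math.sqrt(sum(x * x for x in block) / len(block))

def is_speech(block: list[float]) -> bool:
    return rms(block) > SILENCE_THRESHOLD
```

As the constant's comment in the diff says, the threshold is mic- and room-dependent and will usually need tuning.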
def _transcribe(self, audio: np.ndarray) -> str:
"""Transcribe audio using local Whisper model."""
self._load_whisper()
sys.stdout.write(" 🧠 Transcribing...\r")
sys.stdout.flush()
t0 = time.monotonic()
result = self._whisper_model.transcribe(
audio,
language="en",
fp16=False, # MPS/CPU — fp16 can cause issues on some setups
)
elapsed = time.monotonic() - t0
text = result["text"].strip()
logger.info("Whisper transcribed in %.1fs: '%s'", elapsed, text[:80])
return text
# ── TTS: Text → Speaker ─────────────────────────────────────────────
def _speak(self, text: str) -> None:
"""Speak text aloud using Piper TTS or macOS `say`."""
if not text:
return
self._speaking = True
try:
if self.config.use_say_fallback:
self._speak_say(text)
else:
self._speak_piper(text)
finally:
self._speaking = False
def _speak_piper(self, text: str) -> None:
"""Speak using Piper TTS (local ONNX inference)."""
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
tmp_path = tmp.name
try:
# Generate WAV with Piper
cmd = [
"piper",
"--model",
str(self.config.piper_voice),
"--output_file",
tmp_path,
]
proc = subprocess.run(
cmd,
input=text,
capture_output=True,
text=True,
timeout=30,
)
if proc.returncode != 0:
logger.error("Piper failed: %s", proc.stderr)
self._speak_say(text) # fallback
return
# Play with afplay (macOS) — interruptible
self._play_audio(tmp_path)
finally:
Path(tmp_path).unlink(missing_ok=True)
def _speak_say(self, text: str) -> None:
"""Speak using macOS `say` command."""
try:
proc = subprocess.Popen(
["say", "-r", "180", text],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
proc.wait(timeout=60)
except subprocess.TimeoutExpired:
proc.kill()
except FileNotFoundError:
logger.error("macOS `say` command not found")
def _play_audio(self, path: str) -> None:
"""Play a WAV file. Can be interrupted by setting self._interrupted."""
try:
proc = subprocess.Popen(
["afplay", path],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
# Poll so we can interrupt
while proc.poll() is None:
if self._interrupted:
proc.terminate()
self._interrupted = False
logger.info("TTS interrupted by user")
return
time.sleep(0.05)
except FileNotFoundError:
# Not macOS — try aplay (Linux)
try:
subprocess.run(["aplay", path], capture_output=True, timeout=60)
except (FileNotFoundError, subprocess.TimeoutExpired):
logger.error("No audio player found (tried afplay, aplay)")
# ── LLM: Text → Response ───────────────────────────────────────────
def _get_loop(self) -> asyncio.AbstractEventLoop:
"""Return a persistent event loop, creating one if needed.
A single loop is reused for the entire voice session so Agno's
MCP tool-server connections survive across turns.
"""
if self._loop is None or self._loop.is_closed():
self._loop = asyncio.new_event_loop()
return self._loop
def _think(self, user_text: str) -> str:
"""Send text to Timmy and get a response."""
sys.stdout.write(" 💭 Thinking...\r")
sys.stdout.flush()
t0 = time.monotonic()
try:
loop = self._get_loop()
response = loop.run_until_complete(self._chat(user_text))
except Exception as exc:
logger.error("Timmy chat failed: %s", exc)
response = "I'm having trouble thinking right now. Could you try again?"
elapsed = time.monotonic() - t0
logger.info("Timmy responded in %.1fs", elapsed)
# Strip markdown so TTS doesn't read asterisks, bullets, etc.
response = _strip_markdown(response)
return response
async def _chat(self, message: str) -> str:
"""Async wrapper around Timmy's session.chat().
Prepends the voice-mode instruction so Timmy responds in
natural spoken language rather than markdown.
"""
from timmy.session import chat
voiced = f"{_VOICE_PREAMBLE}\n\nUser said: {message}"
return await chat(voiced, session_id=self.config.session_id)
# ── Main Loop ───────────────────────────────────────────────────────
def run(self) -> None:
"""Run the voice loop. Blocks until Ctrl-C."""
self._ensure_piper()
# Suppress MCP / Agno stderr noise during voice mode.
_suppress_mcp_noise()
# Suppress MCP async-generator teardown tracebacks on exit.
_install_quiet_asyncgen_hooks()
tts_label = (
"macOS say"
if self.config.use_say_fallback
else f"Piper ({self.config.piper_voice.name})"
)
logger.info(
"\n" + "=" * 60 + "\n"
" 🎙️ Timmy Voice — Sovereign Voice Interface\n" + "=" * 60 + "\n"
f" STT: Whisper ({self.config.whisper_model})\n"
f" TTS: {tts_label}\n"
" LLM: Timmy (local Ollama)\n" + "=" * 60 + "\n"
" Speak naturally. Timmy will listen, think, and respond.\n"
" Press Ctrl-C to exit.\n" + "=" * 60
)
self._running = True
try:
while self._running:
# 1. LISTEN — record until silence
audio = self._record_utterance()
if audio is None:
continue
# 2. TRANSCRIBE — Whisper STT
text = self._transcribe(audio)
if not text or text.lower() in (
"you",
"thanks.",
"thank you.",
"bye.",
"",
"thanks for watching!",
"thank you for watching!",
):
# Whisper hallucinations on silence/noise
logger.debug("Ignoring likely Whisper hallucination: '%s'", text)
continue
sys.stdout.write(f"\n 👤 You: {text}\n")
sys.stdout.flush()
# Exit commands
if text.lower().strip().rstrip(".!") in (
"goodbye",
"exit",
"quit",
"stop",
"goodbye timmy",
"stop listening",
):
logger.info("👋 Goodbye!")
break
# 3. THINK — send to Timmy
response = self._think(text)
sys.stdout.write(f" 🤖 Timmy: {response}\n")
sys.stdout.flush()
# 4. SPEAK — TTS output
self._speak(response)
except KeyboardInterrupt:
logger.info("👋 Voice loop stopped.")
finally:
self._running = False
self._cleanup_loop()
def _cleanup_loop(self) -> None:
"""Shut down the persistent event loop cleanly.
Agno's MCP stdio sessions leave async generators (stdio_client)
that complain loudly when torn down from a different task.
We swallow those errors — they're harmless, the subprocesses
die with the loop anyway.
"""
if self._loop is None or self._loop.is_closed():
return
# Silence "error during closing of asynchronous generator" warnings
# from MCP's anyio/asyncio cancel-scope teardown.
import warnings
self._loop.set_exception_handler(lambda loop, ctx: None)
try:
self._loop.run_until_complete(self._loop.shutdown_asyncgens())
except Exception:
pass
with warnings.catch_warnings():
warnings.simplefilter("ignore", RuntimeWarning)
try:
self._loop.close()
except Exception:
pass
self._loop = None
def stop(self) -> None:
"""Stop the voice loop (from another thread)."""
self._running = False
def _suppress_mcp_noise() -> None:
"""Quiet down noisy MCP/Agno loggers during voice mode.
Sets specific loggers to WARNING so the terminal stays clean
for the voice transcript.
"""
for name in (
"mcp",
"mcp.server",
"mcp.client",
"agno",
"agno.mcp",
"httpx",
"httpcore",
):
logging.getLogger(name).setLevel(logging.WARNING)
def _install_quiet_asyncgen_hooks() -> None:
"""Silence MCP stdio_client async-generator teardown noise.
When the voice loop exits, Python GC finalizes Agno's MCP
stdio_client async generators. anyio's cancel-scope teardown
prints ugly tracebacks to stderr. These are harmless — the
MCP subprocesses die with the loop. We intercept them here.
"""
_orig_hook = getattr(sys, "unraisablehook", None)
def _quiet_hook(args):
# Swallow RuntimeError from anyio cancel-scope teardown
# and BaseExceptionGroup from MCP stdio_client generators
if args.exc_type in (RuntimeError, BaseExceptionGroup):
msg = str(args.exc_value) if args.exc_value else ""
if "cancel scope" in msg or "unhandled errors" in msg:
return
# Also swallow GeneratorExit from stdio_client
if args.exc_type is GeneratorExit:
return
# Everything else: forward to original hook
if _orig_hook:
_orig_hook(args)
else:
sys.__unraisablehook__(args)
sys.unraisablehook = _quiet_hook

View File

@@ -55,13 +55,27 @@ os.environ["TIMMY_SKIP_EMBEDDINGS"] = "1"
@pytest.fixture(autouse=True)
-def reset_message_log():
-    """Clear the in-memory chat log before and after every test."""
-    from dashboard.store import message_log
+def reset_message_log(tmp_path):
+    """Redirect chat DB to temp dir and clear before/after every test."""
+    import dashboard.store as _store_mod
-    message_log.clear()
+    original_db_path = _store_mod.DB_PATH
+    tmp_chat_db = tmp_path / "chat.db"
+    _store_mod.DB_PATH = tmp_chat_db
+    # Close existing singleton connection and point it at tmp DB
+    _store_mod.message_log.close()
+    _store_mod.message_log._db_path = tmp_chat_db
+    _store_mod.message_log._conn = None
+    _store_mod.message_log.clear()
yield
-    message_log.clear()
+    _store_mod.message_log.clear()
+    _store_mod.message_log.close()
+    _store_mod.DB_PATH = original_db_path
+    _store_mod.message_log._db_path = original_db_path
+    _store_mod.message_log._conn = None
@pytest.fixture(autouse=True)

View File

@@ -0,0 +1,125 @@
"""Tests for SQLite-backed chat persistence (issue #46)."""
from dashboard.store import Message, MessageLog
def test_persistence_across_instances(tmp_path):
"""Messages survive creating a new MessageLog pointing at the same DB."""
db = tmp_path / "chat.db"
log1 = MessageLog(db_path=db)
log1.append(role="user", content="hello", timestamp="10:00:00", source="browser")
log1.append(role="agent", content="hi back", timestamp="10:00:01", source="browser")
log1.close()
# New instance — simulates server restart
log2 = MessageLog(db_path=db)
msgs = log2.all()
assert len(msgs) == 2
assert msgs[0].role == "user"
assert msgs[0].content == "hello"
assert msgs[1].role == "agent"
assert msgs[1].content == "hi back"
log2.close()
def test_retention_policy(tmp_path):
"""Oldest messages are pruned when count exceeds MAX_MESSAGES."""
import dashboard.store as store_mod
original_max = store_mod.MAX_MESSAGES
store_mod.MAX_MESSAGES = 5 # Small limit for testing
try:
db = tmp_path / "chat.db"
log = MessageLog(db_path=db)
for i in range(8):
log.append(role="user", content=f"msg-{i}", timestamp=f"10:00:{i:02d}")
assert len(log) == 5
msgs = log.all()
# Oldest 3 should have been pruned
assert msgs[0].content == "msg-3"
assert msgs[-1].content == "msg-7"
log.close()
finally:
store_mod.MAX_MESSAGES = original_max
def test_clear_removes_all(tmp_path):
db = tmp_path / "chat.db"
log = MessageLog(db_path=db)
log.append(role="user", content="data", timestamp="12:00:00")
assert len(log) == 1
log.clear()
assert len(log) == 0
assert log.all() == []
log.close()
def test_recent_returns_limited_newest(tmp_path):
db = tmp_path / "chat.db"
log = MessageLog(db_path=db)
for i in range(10):
log.append(role="user", content=f"msg-{i}", timestamp=f"10:00:{i:02d}")
recent = log.recent(limit=3)
assert len(recent) == 3
# Should be oldest-first within the window
assert recent[0].content == "msg-7"
assert recent[1].content == "msg-8"
assert recent[2].content == "msg-9"
log.close()
def test_source_field_persisted(tmp_path):
db = tmp_path / "chat.db"
log = MessageLog(db_path=db)
log.append(role="user", content="from api", timestamp="10:00:00", source="api")
log.append(role="user", content="from tg", timestamp="10:00:01", source="telegram")
log.close()
log2 = MessageLog(db_path=db)
msgs = log2.all()
assert msgs[0].source == "api"
assert msgs[1].source == "telegram"
log2.close()
def test_message_dataclass_defaults():
m = Message(role="user", content="hi", timestamp="12:00:00")
assert m.source == "browser"
def test_empty_db_returns_empty(tmp_path):
db = tmp_path / "chat.db"
log = MessageLog(db_path=db)
assert log.all() == []
assert len(log) == 0
assert log.recent() == []
log.close()
def test_concurrent_appends(tmp_path):
"""Multiple threads can append without corrupting data."""
import threading
db = tmp_path / "chat.db"
log = MessageLog(db_path=db)
errors = []
def writer(thread_id):
try:
for i in range(20):
log.append(role="user", content=f"t{thread_id}-{i}", timestamp="10:00:00")
except Exception as e:
errors.append(e)
threads = [threading.Thread(target=writer, args=(t,)) for t in range(4)]
for t in threads:
t.start()
for t in threads:
t.join()
assert not errors
assert len(log) == 80
log.close()
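Taken together, these tests pin down a small persistence API: `append`/`all`/`recent`/`clear`/`__len__`/`close`, a `Message` dataclass defaulting `source` to `"browser"`, a `MAX_MESSAGES` retention cap, and thread-safe appends. A minimal sketch of a store that would satisfy them (illustrative only; the cap value and schema are assumptions, not the project's actual `dashboard.store`):

```python
import sqlite3
import threading
from dataclasses import dataclass

MAX_MESSAGES = 500  # assumed retention cap; the real default may differ


@dataclass
class Message:
    role: str
    content: str
    timestamp: str
    source: str = "browser"


class MessageLog:
    def __init__(self, db_path):
        self._lock = threading.Lock()
        # check_same_thread=False lets multiple threads share the connection;
        # the lock serializes writes so concurrent appends cannot interleave.
        self._conn = sqlite3.connect(str(db_path), check_same_thread=False)
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS messages ("
            "id INTEGER PRIMARY KEY AUTOINCREMENT, "
            "role TEXT, content TEXT, timestamp TEXT, source TEXT)"
        )

    def append(self, role, content, timestamp, source="browser"):
        with self._lock:
            self._conn.execute(
                "INSERT INTO messages (role, content, timestamp, source) "
                "VALUES (?, ?, ?, ?)",
                (role, content, timestamp, source),
            )
            # Retention: drop the oldest rows beyond MAX_MESSAGES
            self._conn.execute(
                "DELETE FROM messages WHERE id NOT IN "
                "(SELECT id FROM messages ORDER BY id DESC LIMIT ?)",
                (MAX_MESSAGES,),
            )
            self._conn.commit()

    def all(self):
        rows = self._conn.execute(
            "SELECT role, content, timestamp, source FROM messages ORDER BY id"
        ).fetchall()
        return [Message(*r) for r in rows]

    def recent(self, limit=50):
        # Newest `limit` messages, oldest-first within the window
        return self.all()[-limit:]

    def clear(self):
        with self._lock:
            self._conn.execute("DELETE FROM messages")
            self._conn.commit()

    def __len__(self):
        return self._conn.execute("SELECT COUNT(*) FROM messages").fetchone()[0]

    def close(self):
        self._conn.close()
```

A new `MessageLog` pointed at the same file sees all prior rows, which is what `test_persistence_across_instances` exercises.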

View File

@@ -33,7 +33,8 @@ async def test_ollama_connection():
@pytest.mark.asyncio
async def test_model_fallback_chain():
"""Test that the model fallback chain works correctly."""
-    from timmy.agent import DEFAULT_MODEL_FALLBACKS, _resolve_model_with_fallback
+    from config import settings
+    from timmy.agent import _resolve_model_with_fallback
# Test with a non-existent model
model, is_fallback = _resolve_model_with_fallback(
@@ -46,7 +47,7 @@ async def test_model_fallback_chain():
# or the last resort (the requested model itself if nothing else is available).
# In tests, if no models are available in the mock environment, it might return the requested model.
if is_fallback:
-        assert model in DEFAULT_MODEL_FALLBACKS
+        assert model in settings.fallback_models
else:
# If no fallbacks were available, it returns the requested model as last resort
assert model == "nonexistent-model"

View File

@@ -1,6 +1,8 @@
"""Tests for infrastructure.error_capture module."""
import sqlite3
from datetime import UTC, datetime, timedelta
from unittest.mock import patch
from infrastructure.error_capture import (
_dedup_cache,
@@ -67,6 +69,21 @@ class TestIsDuplicate:
_is_duplicate("hash_1")
assert _is_duplicate("hash_2") is False
def test_stale_entries_pruned(self):
"""Old entries beyond 2x the dedup window should be pruned."""
from config import settings
window = settings.error_dedup_window_seconds
# Seed a stale entry far in the past
stale_time = datetime.now(UTC) - timedelta(seconds=window * 3)
_dedup_cache["stale_hash"] = stale_time
# Calling _is_duplicate on a new hash triggers pruning
_is_duplicate("fresh_hash")
assert "stale_hash" not in _dedup_cache
assert "fresh_hash" in _dedup_cache
def teardown_method(self):
_dedup_cache.clear()
@@ -82,6 +99,12 @@ class TestGetGitContext:
assert isinstance(ctx["branch"], str)
assert isinstance(ctx["commit"], str)
def test_git_context_fallback_on_failure(self):
"""When subprocess.run fails, returns 'unknown' for both fields."""
with patch("subprocess.run", side_effect=OSError("git not found")):
ctx = _get_git_context()
assert ctx == {"branch": "unknown", "commit": "unknown"}
class TestCaptureError:
"""Test the main capture_error function."""
@@ -100,6 +123,23 @@ class TestCaptureError:
result = capture_error(e, source="test")
assert result is None
def test_returns_none_when_feedback_disabled(self):
"""capture_error returns None immediately when error_feedback_enabled is False."""
_dedup_cache.clear()
import config
original = config.settings.error_feedback_enabled
try:
config.settings.error_feedback_enabled = False
try:
raise RuntimeError("disabled test")
except RuntimeError as e:
result = capture_error(e, source="test")
assert result is None
finally:
config.settings.error_feedback_enabled = original
def test_capture_does_not_crash_on_missing_deps(self):
"""capture_error should never crash even if optional deps are missing."""
_dedup_cache.clear()

View File

@@ -69,7 +69,7 @@ class TestGetImpactLevel:
def test_low_impact(self):
from integrations.chat_bridge.vendors.discord import _get_impact_level
-        assert _get_impact_level("web_search") == "low"
assert _get_impact_level("calculator") == "low"
assert _get_impact_level("unknown") == "low"
@@ -104,10 +104,10 @@ class TestToolSafety:
assert requires_confirmation("calculator") is False
-    def test_web_search_is_safe(self):
+    def test_memory_search_is_safe(self):
from timmy.tool_safety import requires_confirmation
-        assert requires_confirmation("web_search") is False
+        assert requires_confirmation("memory_search") is False
def test_unknown_tool_requires_confirmation(self):
from timmy.tool_safety import requires_confirmation

View File

@@ -302,3 +302,122 @@ def test_create_timmy_no_extra_kwargs():
f"Unknown Agent kwargs {invalid} — verify they exist in agno "
f"before adding to VALID_AGENT_KWARGS"
)
# ── skip_mcp flag (#72) ─────────────────────────────────────────────────────
def test_create_timmy_skip_mcp_omits_mcp_tools():
"""create_timmy(skip_mcp=True) must not add MCP tool servers."""
with (
patch("timmy.agent.Agent"),
patch("timmy.agent.Ollama"),
patch("timmy.agent.SqliteDb"),
patch("timmy.mcp_tools.create_gitea_mcp_tools") as mock_gitea_mcp,
patch("timmy.mcp_tools.create_filesystem_mcp_tools") as mock_fs_mcp,
):
from timmy.agent import create_timmy
create_timmy(skip_mcp=True)
# MCP factory functions should never be called
mock_gitea_mcp.assert_not_called()
mock_fs_mcp.assert_not_called()
def test_create_timmy_default_includes_mcp_tools():
"""create_timmy() without skip_mcp should attempt MCP tool creation."""
with (
patch("timmy.agent.Agent"),
patch("timmy.agent.Ollama"),
patch("timmy.agent.SqliteDb"),
patch("timmy.mcp_tools.create_gitea_mcp_tools", return_value=None) as mock_gitea_mcp,
patch("timmy.mcp_tools.create_filesystem_mcp_tools", return_value=None) as mock_fs_mcp,
):
from timmy.agent import create_timmy
create_timmy(skip_mcp=False)
# MCP factories should be called when skip_mcp is False
mock_gitea_mcp.assert_called_once()
mock_fs_mcp.assert_called_once()
# ── Configurable fallback chain tests ────────────────────────────────────────
def test_settings_has_fallback_model_lists():
"""settings.fallback_models and vision_fallback_models exist and are lists."""
from config import settings
assert isinstance(settings.fallback_models, list)
assert isinstance(settings.vision_fallback_models, list)
assert len(settings.fallback_models) > 0
assert len(settings.vision_fallback_models) > 0
def test_resolve_model_uses_configurable_text_fallback():
"""_resolve_model_with_fallback walks settings.fallback_models for text models."""
with patch("timmy.agent.settings") as mock_settings:
mock_settings.ollama_model = "nonexistent-model"
mock_settings.fallback_models = ["custom-a", "custom-b"]
mock_settings.vision_fallback_models = ["vision-a"]
# First model in chain is available
with patch("timmy.agent._check_model_available", side_effect=lambda m: m == "custom-a"):
from timmy.agent import _resolve_model_with_fallback
model, is_fallback = _resolve_model_with_fallback(
requested_model="nonexistent-model",
require_vision=False,
auto_pull=False,
)
assert model == "custom-a"
assert is_fallback is True
def test_resolve_model_uses_configurable_vision_fallback():
"""_resolve_model_with_fallback walks settings.vision_fallback_models for vision."""
with patch("timmy.agent.settings") as mock_settings:
mock_settings.ollama_model = "nonexistent-model"
mock_settings.fallback_models = ["text-a"]
mock_settings.vision_fallback_models = ["vision-x", "vision-y"]
with patch("timmy.agent._check_model_available", side_effect=lambda m: m == "vision-y"):
from timmy.agent import _resolve_model_with_fallback
model, is_fallback = _resolve_model_with_fallback(
requested_model="nonexistent-model",
require_vision=True,
auto_pull=False,
)
assert model == "vision-y"
assert is_fallback is True
def test_get_effective_ollama_model_walks_fallback_chain():
"""get_effective_ollama_model uses settings.fallback_models."""
with (
patch("config.settings") as mock_settings,
patch("config.check_ollama_model_available", side_effect=lambda m: m == "fb-2") as _,
):
mock_settings.ollama_model = "gone-model"
mock_settings.ollama_url = "http://localhost:11434"
mock_settings.fallback_models = ["fb-1", "fb-2", "fb-3"]
from config import get_effective_ollama_model
result = get_effective_ollama_model()
assert result == "fb-2"
def test_no_hardcoded_fallback_constants_in_agent():
"""agent.py must not define module-level DEFAULT_MODEL_FALLBACKS."""
import timmy.agent as agent_mod
assert not hasattr(agent_mod, "DEFAULT_MODEL_FALLBACKS"), (
"Hardcoded DEFAULT_MODEL_FALLBACKS still exists — use settings.fallback_models"
)
assert not hasattr(agent_mod, "VISION_MODEL_FALLBACKS"), (
"Hardcoded VISION_MODEL_FALLBACKS still exists — use settings.vision_fallback_models"
)
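The behavior these tests pin down reduces to one walk over the configured chain. A distilled sketch (an illustration of the contract, not the project's `_resolve_model_with_fallback`, which also handles auto-pull and vision selection):

```python
from typing import Callable


def resolve_with_fallback(
    requested: str,
    fallbacks: list[str],
    is_available: Callable[[str], bool],
) -> tuple[str, bool]:
    """Return (model, is_fallback).

    Use the requested model when available; otherwise walk the configured
    fallback chain in order; hand back the requested model as a last
    resort when nothing in the chain is available.
    """
    if is_available(requested):
        return requested, False
    for candidate in fallbacks:
        if is_available(candidate):
            return candidate, True
    return requested, False
```

Passing the chain in as an argument is what makes the `settings.fallback_models` / `vision_fallback_models` split configurable rather than hardcoded.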

View File

@@ -177,8 +177,11 @@ def test_handle_tool_confirmation_approve():
mock_agent = MagicMock()
mock_agent.continue_run.return_value = completed_run
-    # Simulate user typing "y" at the prompt
-    with patch("timmy.cli.typer.confirm", return_value=True):
+    # Simulate user typing "y" at the prompt (mock interactive terminal)
+    with (
+        patch("timmy.cli._is_interactive", return_value=True),
+        patch("timmy.cli.typer.confirm", return_value=True),
+    ):
result = _handle_tool_confirmation(mock_agent, paused_run, "cli")
mock_req.confirm.assert_called_once()
@@ -198,7 +201,10 @@ def test_handle_tool_confirmation_reject():
mock_agent = MagicMock()
mock_agent.continue_run.return_value = completed_run
-    with patch("timmy.cli.typer.confirm", return_value=False):
+    with (
+        patch("timmy.cli._is_interactive", return_value=True),
+        patch("timmy.cli.typer.confirm", return_value=False),
+    ):
_handle_tool_confirmation(mock_agent, paused_run, "cli")
mock_req.reject.assert_called_once()
@@ -225,8 +231,49 @@ def test_handle_tool_confirmation_continue_error():
mock_agent = MagicMock()
mock_agent.continue_run.side_effect = Exception("connection lost")
-    with patch("timmy.cli.typer.confirm", return_value=True):
+    with (
+        patch("timmy.cli._is_interactive", return_value=True),
+        patch("timmy.cli.typer.confirm", return_value=True),
+    ):
result = _handle_tool_confirmation(mock_agent, paused_run, "cli")
# Should return the original paused run, not crash
assert result is paused_run
def test_handle_tool_confirmation_autonomous_allowlisted():
"""In autonomous mode, allowlisted tools should be auto-approved."""
paused_run, mock_req = _make_paused_run(
tool_name="shell", tool_args={"command": "pytest tests/ -x"}
)
completed_run = MagicMock()
completed_run.status = "COMPLETED"
completed_run.active_requirements = []
mock_agent = MagicMock()
mock_agent.continue_run.return_value = completed_run
with patch("timmy.cli.is_allowlisted", return_value=True):
_handle_tool_confirmation(mock_agent, paused_run, "cli", autonomous=True)
mock_req.confirm.assert_called_once()
mock_req.reject.assert_not_called()
def test_handle_tool_confirmation_autonomous_not_allowlisted():
"""In autonomous mode, non-allowlisted tools should be auto-rejected."""
paused_run, mock_req = _make_paused_run(tool_name="shell", tool_args={"command": "rm -rf /"})
completed_run = MagicMock()
completed_run.status = "COMPLETED"
completed_run.active_requirements = []
mock_agent = MagicMock()
mock_agent.continue_run.return_value = completed_run
with patch("timmy.cli.is_allowlisted", return_value=False):
_handle_tool_confirmation(mock_agent, paused_run, "cli", autonomous=True)
mock_req.reject.assert_called_once()
mock_req.confirm.assert_not_called()

View File

@@ -1,5 +1,9 @@
"""Tests for system introspection tools."""
from unittest.mock import MagicMock, patch
import httpx
def test_get_system_info_returns_dict():
"""System info should return a dictionary."""
@@ -15,15 +19,17 @@ def test_get_system_info_returns_dict():
def test_get_system_info_contains_model():
-    """System info should include model name."""
-    from config import settings
+    """System info should include a model name (may differ from config if
+    the actual running model is different — see issue #77)."""
from timmy.tools_intro import get_system_info
info = get_system_info()
assert "model" in info
-    # Model should come from settings
-    assert info["model"] == settings.ollama_model
+    # Model should be a non-empty string — exact value depends on what
+    # Ollama has loaded (verified by TestGetOllamaModelExactMatch tests)
+    assert isinstance(info["model"], str)
+    assert len(info["model"]) > 0
def test_get_system_info_contains_repo_root():
@@ -59,3 +65,96 @@ def test_get_memory_status_returns_dict():
assert isinstance(status, dict)
assert "tier1_hot_memory" in status
assert "tier2_vault" in status
# --- _get_ollama_model exact-match tests (issue #77) ---
def _mock_response(json_data, status_code=200):
"""Create a mock httpx response."""
resp = MagicMock(spec=httpx.Response)
resp.status_code = status_code
resp.json.return_value = json_data
return resp
class TestGetOllamaModelExactMatch:
"""Ensure _get_ollama_model uses exact match, not prefix match."""
@patch("timmy.tools_intro.httpx.get")
def test_exact_match_from_ps(self, mock_get):
"""Should return exact model from /api/ps."""
from timmy.tools_intro import _get_ollama_model
ps_resp = _mock_response({"models": [{"name": "qwen3:30b"}]})
mock_get.return_value = ps_resp
with patch("config.settings") as mock_settings:
mock_settings.ollama_model = "qwen3:30b"
mock_settings.ollama_url = "http://localhost:11434"
result = _get_ollama_model()
assert result == "qwen3:30b"
@patch("timmy.tools_intro.httpx.get")
def test_prefix_collision_returns_correct_model(self, mock_get):
"""qwen3:30b configured — must NOT match qwen3.5:latest (prefix bug)."""
from timmy.tools_intro import _get_ollama_model
# /api/ps has both models loaded; configured is qwen3:30b
ps_resp = _mock_response({"models": [{"name": "qwen3.5:latest"}, {"name": "qwen3:30b"}]})
mock_get.return_value = ps_resp
with patch("config.settings") as mock_settings:
mock_settings.ollama_model = "qwen3:30b"
mock_settings.ollama_url = "http://localhost:11434"
result = _get_ollama_model()
assert result == "qwen3:30b", f"Got '{result}' — prefix collision bug!"
@patch("timmy.tools_intro.httpx.get")
def test_configured_model_not_running_returns_actual(self, mock_get):
"""If configured model isn't loaded, report what IS running."""
from timmy.tools_intro import _get_ollama_model
ps_resp = _mock_response({"models": [{"name": "qwen3.5:latest"}]})
mock_get.return_value = ps_resp
with patch("config.settings") as mock_settings:
mock_settings.ollama_model = "qwen3:30b"
mock_settings.ollama_url = "http://localhost:11434"
result = _get_ollama_model()
# Should report actual running model, not configured one
assert result == "qwen3.5:latest"
@patch("timmy.tools_intro.httpx.get")
def test_latest_suffix_match(self, mock_get):
        """'qwen3:30b:latest' in /api/tags is NOT an exact match for the 'qwen3:30b' config — fall back to configured."""
from timmy.tools_intro import _get_ollama_model
ps_resp = _mock_response({"models": []})
tags_resp = _mock_response({"models": [{"name": "qwen3:30b:latest"}]})
mock_get.side_effect = [ps_resp, tags_resp]
with patch("config.settings") as mock_settings:
mock_settings.ollama_model = "qwen3:30b"
mock_settings.ollama_url = "http://localhost:11434"
result = _get_ollama_model()
# Falls back to configured since no exact match
assert result == "qwen3:30b"
@patch("timmy.tools_intro.httpx.get")
def test_ollama_down_returns_configured(self, mock_get):
"""If Ollama is unreachable, return configured model."""
from timmy.tools_intro import _get_ollama_model
mock_get.side_effect = httpx.ConnectError("connection refused")
with patch("config.settings") as mock_settings:
mock_settings.ollama_model = "qwen3:30b"
mock_settings.ollama_url = "http://localhost:11434"
result = _get_ollama_model()
assert result == "qwen3:30b"
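The selection logic these tests pin down can be distilled to a pure function (a sketch; the real `_get_ollama_model` also consults `/api/tags` before falling back):

```python
def pick_reported_model(configured: str, running: "list[str] | None") -> str:
    """Choose the model name to report, given the model list from /api/ps.

    `running` is None when Ollama was unreachable.
    """
    if running is None:
        return configured      # Ollama down: best we can do is the config value
    if configured in running:  # exact string match (no startswith prefix bug)
        return configured
    if running:
        return running[0]      # configured model not loaded: report reality
    return configured          # nothing loaded: assume config is right
```

The exact `in` membership test is the whole fix for the prefix-collision bug: `"qwen3:30b"` can never match `"qwen3.5:latest"` the way a `startswith` check could.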

View File

@@ -41,3 +41,40 @@ def test_get_system_prompt_injects_model_name():
# Should contain the model name from settings, not the placeholder
assert "{model_name}" not in prompt
assert "llama3.1" in prompt or "qwen" in prompt
def test_full_prompt_brevity_first():
"""Full prompt should front-load brevity instructions before other content."""
prompt = get_system_prompt(tools_enabled=True)
brevity_pos = prompt.find("BREVITY")
tool_pos = prompt.find("TOOL USAGE")
memory_pos = prompt.find("MEMORY")
# Brevity section must appear before tools and memory
assert brevity_pos != -1, "Full prompt must contain BREVITY section"
assert brevity_pos < tool_pos, "Brevity must come before tool usage"
assert brevity_pos < memory_pos, "Brevity must come before memory"
def test_full_prompt_no_markdown_headers():
"""Full prompt should not use markdown headers (## / ###) that teach
the model to respond in markdown."""
prompt = get_system_prompt(tools_enabled=True)
for line in prompt.splitlines():
stripped = line.strip()
assert not stripped.startswith("## "), f"Full prompt uses markdown header: {stripped!r}"
assert not stripped.startswith("### "), (
f"Full prompt uses markdown sub-header: {stripped!r}"
)
def test_full_prompt_plain_text_brevity():
"""Full prompt should explicitly instruct plain text output."""
prompt = get_system_prompt(tools_enabled=True).lower()
assert "plain text" in prompt
def test_lite_prompt_brevity():
"""Lite prompt should also instruct brevity."""
prompt = get_system_prompt(tools_enabled=False).lower()
assert "brief" in prompt
assert "plain text" in prompt or "not markdown" in prompt

View File

@@ -588,6 +588,93 @@ def test_thinking_prompt_anti_confabulation():
)
# ---------------------------------------------------------------------------
# Semantic dedup (anti-rumination)
# ---------------------------------------------------------------------------
def test_is_too_similar_exact_match(tmp_path):
"""Identical thoughts should be detected as too similar."""
engine = _make_engine(tmp_path)
t1 = engine._store_thought("The swarm is quiet today.", "observation")
assert engine._is_too_similar("The swarm is quiet today.", [t1])
def test_is_too_similar_near_match(tmp_path):
"""Minor variations of the same thought should be caught."""
engine = _make_engine(tmp_path)
t1 = engine._store_thought("The swarm is quiet today, nothing happening.", "observation")
assert engine._is_too_similar("The swarm is quiet today. Nothing is happening.", [t1])
def test_is_too_similar_different_thought(tmp_path):
"""Genuinely different thoughts should pass the check."""
engine = _make_engine(tmp_path)
t1 = engine._store_thought("The swarm is quiet today.", "observation")
assert not engine._is_too_similar(
"Alexander's preference for YAML config reflects a deep design philosophy.", [t1]
)
def test_is_too_similar_empty_recent(tmp_path):
"""No recent thoughts means nothing to match against."""
engine = _make_engine(tmp_path)
assert not engine._is_too_similar("Any thought at all.", [])
@pytest.mark.asyncio
async def test_think_once_retries_on_similar(tmp_path):
"""think_once should retry with a new seed when the thought is too similar."""
engine = _make_engine(tmp_path)
# Seed with an existing thought
engine._store_thought("Still no chat messages from Alexander.", "observation")
call_count = 0
def agent_side_effect(prompt):
nonlocal call_count
call_count += 1
if call_count == 1:
return "Still no chat messages from Alexander today." # too similar
return "The sovereignty model provides independence from cloud dependencies." # novel
with (
patch.object(engine, "_call_agent", side_effect=agent_side_effect),
patch.object(engine, "_log_event"),
patch.object(engine, "_update_memory"),
patch.object(engine, "_broadcast", new_callable=AsyncMock),
):
thought = await engine.think_once()
assert thought is not None
assert "sovereignty" in thought.content.lower()
assert call_count == 2 # Had to retry once
@pytest.mark.asyncio
async def test_think_once_discards_after_max_retries(tmp_path):
"""think_once should discard and return None after all retries produce similar thoughts."""
engine = _make_engine(tmp_path)
engine._store_thought("Still no chat messages from Alexander.", "observation")
def always_similar(prompt):
return "Still no chat messages from Alexander today."
with (
patch.object(engine, "_call_agent", side_effect=always_similar),
patch.object(engine, "_log_event"),
patch.object(engine, "_update_memory"),
patch.object(engine, "_broadcast", new_callable=AsyncMock),
):
thought = await engine.think_once()
assert thought is None
# Only the seed thought should be stored, not the rejected ones
assert engine.count_thoughts() == 1
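One way to implement the similarity gate these tests exercise is a stdlib `difflib` ratio check (a sketch under that assumption; the real engine may use embeddings or a different metric and threshold):

```python
from difflib import SequenceMatcher

SIMILARITY_THRESHOLD = 0.85  # assumed cutoff


def is_too_similar(candidate: str, recent: list[str]) -> bool:
    """True when the candidate thought is a near-duplicate of any recent one."""
    c = candidate.lower().strip()
    return any(
        SequenceMatcher(None, c, prev.lower().strip()).ratio() >= SIMILARITY_THRESHOLD
        for prev in recent
    )
```

An empty `recent` list short-circuits to False, matching `test_is_too_similar_empty_recent`; `think_once` would call this gate, retry with a new seed on True, and discard after the retry budget is exhausted.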
def test_thinking_prompt_brevity_limit():
"""_THINKING_PROMPT must enforce a 2-3 sentence limit."""
from timmy.thinking import _THINKING_PROMPT
@@ -755,3 +842,46 @@ def test_thinking_chain_api_404(client):
"""GET /thinking/api/{bad_id}/chain should return 404."""
response = client.get("/thinking/api/nonexistent/chain")
assert response.status_code == 404
# ---------------------------------------------------------------------------
# _call_agent uses skip_mcp=True (#72)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_call_agent_uses_skip_mcp(tmp_path):
"""_call_agent must create_timmy(skip_mcp=True) to avoid cancel-scope errors."""
engine = _make_engine(tmp_path)
mock_agent = AsyncMock()
mock_run = AsyncMock()
mock_run.content = "thought output"
mock_agent.arun.return_value = mock_run
with patch("timmy.agent.create_timmy", return_value=mock_agent) as mock_factory:
result = await engine._call_agent("test prompt")
mock_factory.assert_called_once_with(skip_mcp=True)
mock_agent.arun.assert_awaited_once_with("test prompt", stream=False)
assert result == "thought output"
@pytest.mark.asyncio
async def test_call_agent_does_not_use_session_chat(tmp_path):
"""_call_agent should NOT go through session.chat() (which uses the singleton
with MCP tools). It creates its own agent directly."""
engine = _make_engine(tmp_path)
mock_agent = AsyncMock()
mock_run = AsyncMock()
mock_run.content = "direct agent"
mock_agent.arun.return_value = mock_run
with (
patch("timmy.agent.create_timmy", return_value=mock_agent),
patch("timmy.session.chat", new_callable=AsyncMock) as mock_session_chat,
):
await engine._call_agent("prompt")
mock_session_chat.assert_not_awaited()

View File

@@ -29,7 +29,7 @@ def clear_usage():
class TestToolTracking:
def test_track_creates_agent_entry(self):
-        _track_tool_usage("agent-1", "web_search", success=True)
+        _track_tool_usage("agent-1", "calculator", success=True)
assert "agent-1" in _TOOL_USAGE
assert len(_TOOL_USAGE["agent-1"]) == 1
@@ -132,7 +132,6 @@ class TestToolCatalog:
def test_catalog_contains_base_tools(self):
catalog = get_all_available_tools()
base_tools = {
-            "web_search",
"shell",
"python",
"read_file",
@@ -141,6 +140,8 @@ class TestToolCatalog:
}
for tool_id in base_tools:
assert tool_id in catalog, f"Missing base tool: {tool_id}"
+        # web_search removed — dead code, ddgs never installed (#87)
+        assert "web_search" not in catalog
def test_catalog_tool_structure(self):
catalog = get_all_available_tools()
@@ -153,7 +154,6 @@ class TestToolCatalog:
def test_catalog_orchestrator_has_all_base_tools(self):
catalog = get_all_available_tools()
base_tools = {
-            "web_search",
"shell",
"python",
"read_file",
@@ -167,7 +167,6 @@ class TestToolCatalog:
def test_catalog_echo_research_tools(self):
catalog = get_all_available_tools()
-        assert "echo" in catalog["web_search"]["available_in"]
assert "echo" in catalog["read_file"]["available_in"]
# Echo should NOT have shell
assert "echo" not in catalog["shell"]["available_in"]
@@ -194,3 +193,38 @@ class TestAiderTool:
catalog = get_all_available_tools()
assert "aider" in catalog
assert "forge" in catalog["aider"]["available_in"]
class TestFullToolkitConfirmationWarning:
"""Regression tests for issue #79 — confirmation tool WARNING spam."""
def test_create_full_toolkit_no_confirmation_warning(self, caplog):
"""create_full_toolkit should not emit 'Requires confirmation tool(s)' warnings.
Agno's Toolkit.__init__ validates requires_confirmation_tools against the
initial (empty) tool list. We set the attribute *after* construction to
avoid the spurious warning while keeping per-tool confirmation checks.
"""
import logging
from timmy.tools import create_full_toolkit
with caplog.at_level(logging.WARNING):
create_full_toolkit()
warning_msgs = [
r.message for r in caplog.records if "Requires confirmation tool" in r.message
]
assert warning_msgs == [], f"Unexpected confirmation warnings: {warning_msgs}"
def test_dangerous_tools_listed_for_confirmation(self):
"""After the fix, the toolkit still carries the full DANGEROUS_TOOLS list
so Agno can gate execution at runtime."""
from timmy.tool_safety import DANGEROUS_TOOLS
from timmy.tools import create_full_toolkit
toolkit = create_full_toolkit()
if toolkit is None:
pytest.skip("Agno tools not available")
assert set(toolkit.requires_confirmation_tools) == set(DANGEROUS_TOOLS)

View File

@@ -1,9 +1,18 @@
-"""Tests for timmy.tool_safety — classification, extraction, and formatting."""
+"""Tests for timmy.tool_safety — classification, extraction, formatting, and allowlist."""
from pathlib import Path
from unittest.mock import patch
import pytest
from timmy.tool_safety import (
_check_shell_allowlist,
_check_write_file_allowlist,
extract_tool_calls,
format_action_description,
get_impact_level,
is_allowlisted,
reload_allowlist,
requires_confirmation,
)
@@ -18,7 +27,7 @@ class TestRequiresConfirmation:
assert requires_confirmation(tool) is True
def test_safe_tools(self):
-        for tool in ("web_search", "calculator", "read_file", "list_files"):
+        for tool in ("calculator", "read_file", "list_files"):
assert requires_confirmation(tool) is False
def test_unknown_defaults_to_dangerous(self):
@@ -109,5 +118,208 @@ class TestGetImpactLevel:
assert get_impact_level("aider") == "medium"
def test_low(self):
-        assert get_impact_level("web_search") == "low"
assert get_impact_level("calculator") == "low"
assert get_impact_level("unknown") == "low"
# ---------------------------------------------------------------------------
# Allowlist — is_allowlisted
# ---------------------------------------------------------------------------
# Sample allowlist for tests
_TEST_ALLOWLIST = {
"shell": {
"allow_prefixes": [
"pytest",
"python -m pytest",
"git status",
"git log",
"git diff",
"git add",
"git commit",
"git push",
"curl http://localhost",
"curl -s http://localhost",
"ls",
"cat ",
],
"deny_patterns": [
"rm -rf /",
"sudo ",
"> /dev/",
"| sh",
"| bash",
],
},
"write_file": {
"allowed_path_prefixes": [
"/tmp/",
],
},
"python": {"auto_approve": True},
"plan_and_execute": {"auto_approve": True},
}
@pytest.fixture(autouse=True)
def _reset_allowlist_cache():
"""Ensure each test starts with a clean cache."""
import timmy.tool_safety as ts
ts._allowlist_cache = None
yield
ts._allowlist_cache = None
def _patch_allowlist(allowlist_data):
"""Helper to inject a test allowlist."""
return patch("timmy.tool_safety._load_allowlist", return_value=allowlist_data)
class TestIsAllowlisted:
"""Test the is_allowlisted function with mocked allowlist data."""
def test_unknown_tool_not_allowlisted(self):
with _patch_allowlist(_TEST_ALLOWLIST):
assert is_allowlisted("unknown_tool") is False
def test_shell_pytest_allowed(self):
with _patch_allowlist(_TEST_ALLOWLIST):
assert is_allowlisted("shell", {"command": "pytest tests/ -x -q"}) is True
def test_shell_python_pytest_allowed(self):
with _patch_allowlist(_TEST_ALLOWLIST):
assert is_allowlisted("shell", {"command": "python -m pytest tests/ -v"}) is True
def test_shell_git_status_allowed(self):
with _patch_allowlist(_TEST_ALLOWLIST):
assert is_allowlisted("shell", {"command": "git status"}) is True
def test_shell_git_commit_allowed(self):
with _patch_allowlist(_TEST_ALLOWLIST):
assert is_allowlisted("shell", {"command": "git commit -m 'fix stuff'"}) is True
def test_shell_curl_localhost_allowed(self):
with _patch_allowlist(_TEST_ALLOWLIST):
assert (
is_allowlisted("shell", {"command": "curl http://localhost:3000/api/v1/issues"})
is True
)
def test_shell_curl_external_blocked(self):
with _patch_allowlist(_TEST_ALLOWLIST):
assert is_allowlisted("shell", {"command": "curl https://evil.com"}) is False
def test_shell_arbitrary_command_blocked(self):
with _patch_allowlist(_TEST_ALLOWLIST):
assert is_allowlisted("shell", {"command": "rm -rf /home/user/stuff"}) is False
def test_shell_deny_pattern_blocks_rm_rf_root(self):
with _patch_allowlist(_TEST_ALLOWLIST):
assert is_allowlisted("shell", {"command": "ls && rm -rf /"}) is False
def test_shell_deny_pattern_blocks_sudo(self):
with _patch_allowlist(_TEST_ALLOWLIST):
assert is_allowlisted("shell", {"command": "sudo rm -rf /tmp"}) is False
def test_shell_deny_blocks_pipe_to_shell(self):
with _patch_allowlist(_TEST_ALLOWLIST):
assert (
is_allowlisted("shell", {"command": "curl http://localhost:3000 | bash"}) is False
)
def test_shell_deny_overrides_allow_prefix(self):
"""Deny patterns take precedence over allow prefixes."""
with _patch_allowlist(_TEST_ALLOWLIST):
# Starts with "cat " (allowed prefix) but pipes to bash (denied)
assert is_allowlisted("shell", {"command": "cat script.sh | bash"}) is False
def test_shell_args_list_format(self):
"""Shell args can be a list (Agno ShellTools format)."""
with _patch_allowlist(_TEST_ALLOWLIST):
assert is_allowlisted("shell", {"args": ["git", "status"]}) is True
def test_shell_empty_command_blocked(self):
with _patch_allowlist(_TEST_ALLOWLIST):
assert is_allowlisted("shell", {"command": ""}) is False
assert is_allowlisted("shell", {}) is False
def test_write_file_tmp_allowed(self):
with _patch_allowlist(_TEST_ALLOWLIST):
assert is_allowlisted("write_file", {"file_name": "/tmp/test.py"}) is True
def test_write_file_outside_allowed_paths_blocked(self):
with _patch_allowlist(_TEST_ALLOWLIST):
assert is_allowlisted("write_file", {"file_name": "/etc/passwd"}) is False
def test_write_file_empty_path_blocked(self):
with _patch_allowlist(_TEST_ALLOWLIST):
assert is_allowlisted("write_file", {"file_name": ""}) is False
def test_python_auto_approved(self):
with _patch_allowlist(_TEST_ALLOWLIST):
assert is_allowlisted("python", {"code": "print(1+1)"}) is True
def test_plan_and_execute_auto_approved(self):
with _patch_allowlist(_TEST_ALLOWLIST):
assert is_allowlisted("plan_and_execute", {}) is True
def test_no_allowlist_blocks_everything(self):
with _patch_allowlist({}):
assert is_allowlisted("shell", {"command": "pytest"}) is False
assert is_allowlisted("python", {"code": "print(1)"}) is False
def test_aider_not_in_allowlist(self):
with _patch_allowlist(_TEST_ALLOWLIST):
assert is_allowlisted("aider", {"instruction": "fix bug"}) is False
class TestCheckShellAllowlist:
"""Direct tests for the shell allowlist checker."""
def test_prefix_match(self):
rule = {"allow_prefixes": ["pytest", "git status"], "deny_patterns": []}
assert _check_shell_allowlist(rule, {"command": "pytest -x"}) is True
def test_prefix_no_match(self):
rule = {"allow_prefixes": ["pytest"], "deny_patterns": []}
assert _check_shell_allowlist(rule, {"command": "rm stuff"}) is False
def test_deny_overrides_allow(self):
rule = {"allow_prefixes": ["curl http://localhost"], "deny_patterns": ["| bash"]}
assert _check_shell_allowlist(rule, {"command": "curl http://localhost | bash"}) is False
class TestCheckWriteFileAllowlist:
"""Direct tests for the write_file allowlist checker."""
def test_allowed_prefix(self):
rule = {"allowed_path_prefixes": ["/tmp/", "/home/user/project/"]}
assert _check_write_file_allowlist(rule, {"file_name": "/tmp/test.py"}) is True
def test_blocked_path(self):
rule = {"allowed_path_prefixes": ["/tmp/"]}
assert _check_write_file_allowlist(rule, {"file_name": "/etc/secrets"}) is False
def test_tilde_expansion(self):
"""Paths starting with ~ should be expanded."""
home = str(Path.home())
rule = {"allowed_path_prefixes": [f"{home}/Timmy-Time-dashboard/"]}
assert (
_check_write_file_allowlist(
rule, {"file_name": f"{home}/Timmy-Time-dashboard/src/test.py"}
)
is True
)
class TestReloadAllowlist:
"""Test that reload_allowlist clears the cache."""
def test_reload_clears_cache(self):
import timmy.tool_safety as ts
ts._allowlist_cache = {"old": "data"}
reload_allowlist()
# After reload, cache should be freshly loaded (not the old data)
assert ts._allowlist_cache != {"old": "data"}
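The shell tests above all exercise the same rule shape: a list of allowed command prefixes plus deny patterns that take precedence. A minimal sketch of that checker, inferred from `TestCheckShellAllowlist` (function and key names here are illustrative, not the project's actual implementation):

```python
def check_shell_rule(rule: dict, args: dict) -> bool:
    """Sketch of prefix-allow / deny-pattern matching for shell commands."""
    # Accept either {"command": "..."} or the Agno-style {"args": [...]} form.
    command = args.get("command") or " ".join(args.get("args", []))
    if not command:
        return False
    # Deny patterns take precedence over allow prefixes.
    if any(pat in command for pat in rule.get("deny_patterns", [])):
        return False
    return any(command.startswith(p) for p in rule.get("allow_prefixes", []))
```

This mirrors the deny-overrides-allow behavior asserted in `test_deny_overrides_allow` above; the real `_check_shell_allowlist` may normalize or tokenize commands differently.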


@@ -0,0 +1,169 @@
"""Tests for the safe calculator tool (issue #52)."""
from __future__ import annotations
import math
from timmy.tools import calculator
# ── Basic arithmetic ──────────────────────────────────────────────
class TestBasicArithmetic:
def test_addition(self):
assert calculator("2 + 3") == "5"
def test_subtraction(self):
assert calculator("10 - 4") == "6"
def test_multiplication(self):
assert calculator("347 * 829") == str(347 * 829)
def test_division(self):
assert calculator("10 / 3") == str(10 / 3)
def test_floor_division(self):
assert calculator("10 // 3") == "3"
def test_modulo(self):
assert calculator("10 % 3") == "1"
def test_exponent(self):
assert calculator("2**10") == "1024"
def test_negative_number(self):
assert calculator("-5 + 3") == "-2"
def test_unary_plus(self):
assert calculator("+5") == "5"
# ── Parentheses and precedence ────────────────────────────────────
class TestPrecedence:
def test_nested_parens(self):
assert calculator("(2 + 3) * (4 + 1)") == "25"
def test_deep_nesting(self):
assert calculator("((1 + 2) * (3 + 4)) + 5") == "26"
def test_operator_precedence(self):
assert calculator("2 + 3 * 4") == "14"
# ── Math module functions ─────────────────────────────────────────
class TestMathFunctions:
def test_sqrt(self):
assert calculator("math.sqrt(144)") == "12.0"
def test_log(self):
assert calculator("math.log(100, 10)") == str(math.log(100, 10))
def test_sin(self):
assert calculator("math.sin(0)") == "0.0"
def test_pi(self):
assert calculator("math.pi") == str(math.pi)
def test_e(self):
assert calculator("math.e") == str(math.e)
def test_ceil(self):
assert calculator("math.ceil(4.3)") == "5"
def test_floor(self):
assert calculator("math.floor(4.7)") == "4"
def test_bare_sqrt(self):
assert calculator("sqrt(16)") == "4.0"
# ── Allowed builtins ──────────────────────────────────────────────
class TestAllowedBuiltins:
def test_abs(self):
assert calculator("abs(-42)") == "42"
def test_round(self):
assert calculator("round(3.14159, 2)") == "3.14"
def test_min(self):
assert calculator("min(3, 1, 2)") == "1"
def test_max(self):
assert calculator("max(3, 1, 2)") == "3"
# ── Error handling ────────────────────────────────────────────────
class TestErrorHandling:
def test_division_by_zero(self):
result = calculator("1 / 0")
assert "Error" in result
def test_syntax_error(self):
result = calculator("2 +")
assert "Error" in result
def test_empty_expression(self):
result = calculator("")
assert "Error" in result
# ── Injection attempts (the whole point of issue #52) ─────────────
class TestInjectionPrevention:
def test_import_os(self):
result = calculator("__import__('os').system('echo hacked')")
assert "Error" in result
assert "Unknown name" in result or "Unsupported" in result
def test_builtins_access(self):
result = calculator("__builtins__")
assert "Error" in result
def test_dunder_class(self):
result = calculator("().__class__.__bases__[0].__subclasses__()")
assert "Error" in result
def test_exec(self):
result = calculator("exec('import os')")
assert "Error" in result
def test_eval_nested(self):
result = calculator("eval('1+1')")
assert "Error" in result
def test_open_file(self):
result = calculator("open('/etc/passwd').read()")
assert "Error" in result
def test_string_literal_rejected(self):
result = calculator("'hello'")
assert "Error" in result
def test_list_comprehension(self):
result = calculator("[x for x in range(10)]")
assert "Error" in result
def test_lambda(self):
result = calculator("(lambda: 1)()")
assert "Error" in result
def test_attribute_on_non_math(self):
result = calculator("(1).__class__")
assert "Error" in result
def test_globals(self):
result = calculator("globals()")
assert "Error" in result
def test_breakout_via_format(self):
result = calculator("'{}'.format.__globals__")
assert "Error" in result
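The injection tests above imply an AST-whitelist evaluator: parse the expression, reject any node type outside a small arithmetic set, and only then evaluate. A self-contained sketch of that strategy (the real `timmy.tools.calculator` may differ in details; names here are illustrative):

```python
import ast
import math

_ALLOWED_FUNCS = {"abs": abs, "round": round, "min": min, "max": max, "sqrt": math.sqrt}
_ALLOWED_NODES = (
    ast.Expression, ast.BinOp, ast.UnaryOp, ast.Constant, ast.Call, ast.Name,
    ast.Attribute, ast.Load, ast.Add, ast.Sub, ast.Mult, ast.Div, ast.FloorDiv,
    ast.Mod, ast.Pow, ast.USub, ast.UAdd,
)

def safe_calc(expr: str) -> str:
    """Evaluate arithmetic only; reject anything outside the whitelist."""
    try:
        tree = ast.parse(expr, mode="eval")
        for node in ast.walk(tree):
            if not isinstance(node, _ALLOWED_NODES):
                return f"Error: Unsupported expression: {type(node).__name__}"
            if isinstance(node, ast.Constant) and not isinstance(node.value, (int, float)):
                return "Error: Unsupported constant"
            if isinstance(node, ast.Attribute):
                # Attribute access is only allowed on the math module itself.
                if not (isinstance(node.value, ast.Name) and node.value.id == "math"):
                    return "Error: Unsupported attribute access"
            if isinstance(node, ast.Name) and node.id != "math" and node.id not in _ALLOWED_FUNCS:
                return f"Error: Unknown name: {node.id}"
        env = {"__builtins__": {}, "math": math, **_ALLOWED_FUNCS}
        return str(eval(compile(tree, "<calc>", "eval"), env))
    except Exception as e:
        return f"Error: {e}"
```

Because `__import__`, `exec`, and `globals` are plain `Name` nodes and `().__class__` is an `Attribute` on a non-`math` value, every injection attempt above fails at the walk stage before anything is evaluated.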


@@ -0,0 +1,335 @@
"""Tests for the sovereign voice loop.
These tests verify the VoiceLoop components without requiring a microphone,
Whisper model, or Piper installation — all I/O is mocked.
"""
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
try:
import numpy as np
except ImportError:
np = None
try:
from timmy.voice_loop import VoiceConfig, VoiceLoop, _strip_markdown
except ImportError:
pass # pytestmark will skip all tests anyway
pytestmark = pytest.mark.skipif(np is None, reason="numpy not installed")
# ── VoiceConfig tests ──────────────────────────────────────────────────────
class TestVoiceConfig:
def test_defaults(self):
cfg = VoiceConfig()
assert cfg.whisper_model == "base.en"
assert cfg.sample_rate == 16000
assert cfg.silence_threshold == 0.015
assert cfg.silence_duration == 1.5
assert cfg.min_utterance == 0.5
assert cfg.max_utterance == 30.0
assert cfg.session_id == "voice"
assert cfg.use_say_fallback is False
def test_custom_values(self):
cfg = VoiceConfig(
whisper_model="tiny.en",
silence_threshold=0.02,
session_id="custom",
use_say_fallback=True,
)
assert cfg.whisper_model == "tiny.en"
assert cfg.silence_threshold == 0.02
assert cfg.session_id == "custom"
assert cfg.use_say_fallback is True
# ── VoiceLoop unit tests ──────────────────────────────────────────────────
class TestVoiceLoopInit:
def test_default_config(self):
loop = VoiceLoop()
assert loop.config.whisper_model == "base.en"
assert loop._running is False
assert loop._speaking is False
def test_custom_config(self):
cfg = VoiceConfig(whisper_model="tiny.en")
loop = VoiceLoop(config=cfg)
assert loop.config.whisper_model == "tiny.en"
class TestPiperFallback:
def test_falls_back_to_say_when_no_voice_file(self):
cfg = VoiceConfig(piper_voice=Path("/nonexistent/voice.onnx"))
loop = VoiceLoop(config=cfg)
loop._ensure_piper()
assert loop.config.use_say_fallback is True
def test_keeps_piper_when_voice_exists(self, tmp_path):
voice_file = tmp_path / "test.onnx"
voice_file.write_bytes(b"fake model")
cfg = VoiceConfig(piper_voice=voice_file)
loop = VoiceLoop(config=cfg)
loop._ensure_piper()
assert loop.config.use_say_fallback is False
class TestTranscribe:
def test_transcribes_audio(self):
"""Whisper transcription returns cleaned text."""
loop = VoiceLoop()
mock_model = MagicMock()
mock_model.transcribe.return_value = {"text": " Hello Timmy "}
loop._whisper_model = mock_model
audio = np.random.randn(16000).astype(np.float32)
result = loop._transcribe(audio)
assert result == "Hello Timmy"
mock_model.transcribe.assert_called_once()
def test_transcribes_empty_returns_empty(self):
loop = VoiceLoop()
mock_model = MagicMock()
mock_model.transcribe.return_value = {"text": " "}
loop._whisper_model = mock_model
audio = np.random.randn(16000).astype(np.float32)
result = loop._transcribe(audio)
assert result == ""
class TestStripMarkdown:
def test_strips_bold(self):
assert _strip_markdown("**hello**") == "hello"
def test_strips_italic(self):
assert _strip_markdown("*hello*") == "hello"
def test_strips_headers(self):
assert _strip_markdown("## Header\ntext") == "Header\ntext"
def test_strips_bullets(self):
assert _strip_markdown("- item one\n- item two") == "item one\nitem two"
def test_strips_numbered_lists(self):
assert _strip_markdown("1. first\n2. second") == "first\nsecond"
def test_strips_inline_code(self):
assert _strip_markdown("use `pip install`") == "use pip install"
def test_strips_links(self):
assert _strip_markdown("[click here](https://x.com)") == "click here"
def test_preserves_plain_text(self):
assert _strip_markdown("Hello, how are you?") == "Hello, how are you?"
def test_empty_string(self):
assert _strip_markdown("") == ""
def test_none_passthrough(self):
assert _strip_markdown(None) is None
def test_complex_markdown(self):
md = "**1. First** thing\n- use `code`\n*emphasis*"
result = _strip_markdown(md)
assert "**" not in result
assert "`" not in result
assert "*" not in result
class TestThink:
def test_think_returns_response(self):
loop = VoiceLoop()
loop._loop = MagicMock()
loop._loop.is_closed.return_value = False
loop._loop.run_until_complete.return_value = "I am Timmy."
result = loop._think("Who are you?")
assert result == "I am Timmy."
def test_think_handles_error(self):
loop = VoiceLoop()
loop._loop = MagicMock()
loop._loop.is_closed.return_value = False
loop._loop.run_until_complete.side_effect = RuntimeError("Ollama down")
result = loop._think("test")
assert "trouble" in result.lower()
def test_think_strips_markdown(self):
loop = VoiceLoop()
loop._loop = MagicMock()
loop._loop.is_closed.return_value = False
loop._loop.run_until_complete.return_value = "**Hello** from *Timmy*"
result = loop._think("test")
assert "**" not in result
assert "*" not in result
assert "Hello" in result
class TestSpeakSay:
@patch("subprocess.Popen")
def test_speak_say_calls_subprocess(self, mock_popen):
mock_proc = MagicMock()
mock_proc.wait.return_value = 0
mock_popen.return_value = mock_proc
cfg = VoiceConfig(use_say_fallback=True)
loop = VoiceLoop(config=cfg)
loop._speak_say("Hello")
mock_popen.assert_called_once()
args = mock_popen.call_args[0][0]
assert args[0] == "say"
assert "Hello" in args
@patch("subprocess.Popen", side_effect=FileNotFoundError)
def test_speak_say_handles_missing(self, mock_popen):
cfg = VoiceConfig(use_say_fallback=True)
loop = VoiceLoop(config=cfg)
# Should not raise
loop._speak_say("Hello")
class TestSpeakPiper:
@patch("timmy.voice_loop.VoiceLoop._play_audio")
@patch("subprocess.run")
def test_speak_piper_generates_and_plays(self, mock_run, mock_play):
mock_run.return_value = MagicMock(returncode=0, stderr="")
voice_path = Path("/tmp/test_voice.onnx")
cfg = VoiceConfig(piper_voice=voice_path)
loop = VoiceLoop(config=cfg)
loop._speak_piper("Hello from Piper")
# Piper was called
mock_run.assert_called_once()
cmd = mock_run.call_args[0][0]
assert cmd[0] == "piper"
assert "--model" in cmd
# Audio was played
mock_play.assert_called_once()
@patch("timmy.voice_loop.VoiceLoop._speak_say")
@patch("subprocess.run")
def test_speak_piper_falls_back_on_error(self, mock_run, mock_say):
mock_run.return_value = MagicMock(returncode=1, stderr="model error")
cfg = VoiceConfig(piper_voice=Path("/tmp/test.onnx"))
loop = VoiceLoop(config=cfg)
loop._speak_piper("test")
# Should fall back to say
mock_say.assert_called_once_with("test")
class TestHallucinationFilter:
"""Whisper tends to hallucinate on silence/noise. The loop should filter these."""
def test_known_hallucinations_filtered(self):
        # Verified against a local copy of the filter set; a stronger test
        # would call the loop's filter function directly.
        hallucinations = [
"you",
"thanks.",
"Thank you.",
"Bye.",
"Thanks for watching!",
"Thank you for watching!",
]
for text in hallucinations:
assert text.lower() in (
"you",
"thanks.",
"thank you.",
"bye.",
"",
"thanks for watching!",
"thank you for watching!",
), f"'{text}' should be filtered"
class TestExitCommands:
"""Voice loop should recognize exit commands."""
def test_exit_commands(self):
exits = ["goodbye", "exit", "quit", "stop", "goodbye timmy", "stop listening"]
for cmd in exits:
assert cmd.lower().strip().rstrip(".!") in (
"goodbye",
"exit",
"quit",
"stop",
"goodbye timmy",
"stop listening",
), f"'{cmd}' should be an exit command"
class TestPlayAudio:
@patch("subprocess.Popen")
def test_play_audio_calls_afplay(self, mock_popen):
mock_proc = MagicMock()
mock_proc.poll.side_effect = [None, 0] # Running, then done
mock_popen.return_value = mock_proc
loop = VoiceLoop()
loop._play_audio("/tmp/test.wav")
mock_popen.assert_called_once()
args = mock_popen.call_args[0][0]
assert args[0] == "afplay"
@patch("subprocess.Popen")
def test_play_audio_interruptible(self, mock_popen):
mock_proc = MagicMock()
# Simulate running, then we interrupt
call_count = 0
def poll_side_effect():
nonlocal call_count
call_count += 1
return None # Always running
mock_proc.poll.side_effect = poll_side_effect
mock_popen.return_value = mock_proc
loop = VoiceLoop()
loop._interrupted = True # Pre-set interrupt
loop._play_audio("/tmp/test.wav")
mock_proc.terminate.assert_called_once()
class TestStopMethod:
def test_stop_sets_running_false(self):
loop = VoiceLoop()
loop._running = True
loop.stop()
assert loop._running is False
class TestSpeakSetsFlag:
@patch("timmy.voice_loop.VoiceLoop._speak_say")
def test_speaking_flag_set_during_speech(self, mock_say):
cfg = VoiceConfig(use_say_fallback=True)
loop = VoiceLoop(config=cfg)
# Before speak
assert loop._speaking is False
# Mock say to check flag during execution
def check_flag(text):
assert loop._speaking is True
mock_say.side_effect = check_flag
loop._speak("Hello")
# After speak
assert loop._speaking is False
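The `TestStripMarkdown` cases above pin down the stripping behavior precisely enough to sketch it. A minimal regex-based version that satisfies those cases (the real `timmy.voice_loop._strip_markdown` may be implemented differently):

```python
import re

def strip_markdown(text):
    """Remove markdown formatting so TTS reads plain text."""
    if not text:
        return text  # passes through "" and None unchanged
    text = re.sub(r"\[([^\]]+)\]\([^)]+\)", r"\1", text)  # [label](url) -> label
    text = re.sub(r"`([^`]*)`", r"\1", text)              # inline code
    text = re.sub(r"\*{1,2}([^*]+)\*{1,2}", r"\1", text)  # bold and italic
    text = re.sub(r"^#{1,6}\s*", "", text, flags=re.M)    # headers
    text = re.sub(r"^\s*[-*]\s+", "", text, flags=re.M)   # bullets
    text = re.sub(r"^\s*\d+\.\s+", "", text, flags=re.M)  # numbered lists
    return text
```

Order matters: links and inline code are stripped before the bold/italic pass so that asterisks inside code spans are not misread, and list markers are removed last, line by line.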