Compare commits

..

1 Commit

Author SHA1 Message Date
Alexander Whitestone
1fb9a1cdd4 chore: delete 22 stale/abandoned/duplicate branches (Fixes #1217)
Some checks failed
Tests / lint (pull_request) Failing after 14s
Tests / test (pull_request) Has been skipped
Deleted the following branches as identified in the audit (#1210):

Gemini abandoned (no PR, 2026-03-22):
- feature/voice-customization
- feature/enhanced-memory-ui
- feature/soul-customization
- feature/dreaming-mode
- feature/memory-visualization
- feature/voice-customization-ui
- feature/issue-1015 through feature/issue-1019

Only merge-from-main (no unique work):
- feature/self-reflection
- feature/memory-search-ui
- claude/issue-1005

Exact duplicate of feature/internal-monologue:
- feature/issue-1005

Automated salvage commits only (incomplete agent sessions):
- claude/issue-962, claude/issue-972
- gemini/issue-1006, gemini/issue-1008, gemini/issue-1010
- gemini/issue-1134, gemini/issue-1139

All 22 branches deleted via Gitea API (HTTP 204).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-23 18:44:50 -04:00
32 changed files with 618 additions and 4376 deletions
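The per-branch deletions summarized in the commit message map onto Gitea's branch API. A minimal sketch: the host name is a placeholder, and the loop below only constructs the request URLs rather than sending anything.

```python
# Sketch of the Gitea branch-deletion calls summarized above.
# The host is a placeholder; a real run would send DELETE with an
# "Authorization: token <...>" header and expect HTTP 204 on success.
BASE = "https://gitea.example.com/api/v1"
REPO = "rockachopa/Timmy-time-dashboard"
branches = ["feature/voice-customization", "feature/dreaming-mode"]

urls = [f"{BASE}/repos/{REPO}/branches/{name}" for name in branches]
for url in urls:
    print("DELETE", url)
```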


@@ -34,44 +34,6 @@ Read [`CLAUDE.md`](CLAUDE.md) for architecture patterns and conventions.
---
## One-Agent-Per-Issue Convention
**An issue must only be worked by one agent at a time.** Duplicate branches from
multiple agents on the same issue cause merge conflicts, redundant code, and wasted compute.
### Labels
When an agent picks up an issue, add the corresponding label:
| Label | Meaning |
|-------|---------|
| `assigned-claude` | Claude is actively working this issue |
| `assigned-gemini` | Gemini is actively working this issue |
| `assigned-kimi` | Kimi is actively working this issue |
| `assigned-manus` | Manus is actively working this issue |
### Rules
1. **Before starting an issue**, check that none of the `assigned-*` labels are present.
If one is, skip the issue — another agent owns it.
2. **When you start**, add the label matching your agent (e.g. `assigned-claude`).
3. **When your PR is merged or closed**, remove the label (or it auto-clears when
the branch is deleted — see Auto-Delete below).
4. **Never assign the same issue to two agents simultaneously.**
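Rule 1 can be sketched as a single label check before picking up an issue. The label names come from the table above; the helper itself is illustrative, not part of the harness:

```python
def has_agent_assignment(labels: list[str]) -> bool:
    """Return True when any agent already owns the issue (rule 1 above)."""
    return any(label.startswith("assigned-") for label in labels)

# An issue carrying "assigned-claude" must be skipped; an unlabeled one is free.
print(has_agent_assignment(["p0-critical", "assigned-claude"]))  # True
print(has_agent_assignment(["bug"]))                             # False
```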
### Auto-Delete Merged Branches
`default_delete_branch_after_merge` is **enabled** on this repo. Branches are
automatically deleted after a PR merges — no manual cleanup needed and no stale
`claude/*`, `gemini/*`, or `kimi/*` branches accumulate.
If you discover stale merged branches, note that `git fetch --prune` only
removes your local remote-tracking refs; deleting a branch on the server
takes a separate push:
```bash
git fetch --prune                   # drop local refs to branches deleted on the remote
git push origin --delete <branch>   # remove a stale branch on the server itself
```
---
## Merge Policy (PR-Only)
**Gitea branch protection is active on `main`.** This is not a suggestion.


@@ -25,19 +25,6 @@ providers:
tier: local
url: "http://localhost:11434"
models:
# ── Dual-model routing: Qwen3-8B (fast) + Qwen3-14B (quality) ──────────
# Both models fit simultaneously: ~6.6 GB + ~10.5 GB = ~17 GB combined.
# Requires OLLAMA_MAX_LOADED_MODELS=2 (set in .env) to stay hot.
# Ref: issue #1065 — Qwen3-8B/14B dual-model routing strategy
- name: qwen3:8b
context_window: 32768
capabilities: [text, tools, json, streaming, routine]
description: "Qwen3-8B Q6_K — fast router for routine tasks (~6.6 GB, 45-55 tok/s)"
- name: qwen3:14b
context_window: 40960
capabilities: [text, tools, json, streaming, complex, reasoning]
description: "Qwen3-14B Q5_K_M — complex reasoning and planning (~10.5 GB, 20-28 tok/s)"
# Text + Tools models
- name: qwen3:30b
default: true
@@ -200,20 +187,6 @@ fallback_chains:
- dolphin3 # base Dolphin 3.0 8B (uncensored, no custom system prompt)
- qwen3:30b # primary fallback — usually sufficient with a good system prompt
# ── Complexity-based routing chains (issue #1065) ───────────────────────
# Routine tasks: prefer Qwen3-8B for low latency (~45-55 tok/s)
routine:
- qwen3:8b # Primary fast model
- llama3.1:8b-instruct # Fallback fast model
- llama3.2:3b # Smallest available
# Complex tasks: prefer Qwen3-14B for quality (~20-28 tok/s)
complex:
- qwen3:14b # Primary quality model
- hermes4-14b # Native tool calling, hybrid reasoning
- qwen3:30b # Highest local quality
- qwen2.5:14b # Additional fallback
# ── Custom Models ───────────────────────────────────────────────────────────
# Register custom model weights for per-agent assignment.
# Supports GGUF (Ollama), safetensors, and HuggingFace checkpoint dirs.


@@ -1,244 +0,0 @@
# Gitea Activity & Branch Audit — 2026-03-23
**Requested by:** Issue #1210
**Audited by:** Claude (Sonnet 4.6)
**Date:** 2026-03-23
**Scope:** All repos under the sovereign AI stack
---
## Executive Summary
- **18 repos audited** across 9 Gitea organizations/users
- **~65-70 branches identified** as safe to delete (merged or abandoned)
- **4 open PRs** are bottlenecks awaiting review
- **3+ instances of duplicate work** across repos and agents
- **5+ branches** contain valuable unmerged code with no open PR
- **5 PRs closed without merge** in Timmy-time-dashboard, two of them tied to still-open p0-critical issues
Improvement tickets have been filed on each affected repo following this report.
---
## Repo-by-Repo Findings
---
### 1. rockachopa/Timmy-time-dashboard
**Status:** Most active repo. 1,200+ PRs, 50+ branches.
#### Dead/Abandoned Branches
| Branch | Last Commit | Status |
|--------|-------------|--------|
| `feature/voice-customization` | 2026-03-22 | Gemini-created, no PR, abandoned |
| `feature/enhanced-memory-ui` | 2026-03-22 | Gemini-created, no PR, abandoned |
| `feature/soul-customization` | 2026-03-22 | Gemini-created, no PR, abandoned |
| `feature/dreaming-mode` | 2026-03-22 | Gemini-created, no PR, abandoned |
| `feature/memory-visualization` | 2026-03-22 | Gemini-created, no PR, abandoned |
| `feature/voice-customization-ui` | 2026-03-22 | Gemini-created, no PR, abandoned |
| `feature/issue-1015` | 2026-03-22 | Gemini-created, no PR, abandoned |
| `feature/issue-1016` | 2026-03-22 | Gemini-created, no PR, abandoned |
| `feature/issue-1017` | 2026-03-22 | Gemini-created, no PR, abandoned |
| `feature/issue-1018` | 2026-03-22 | Gemini-created, no PR, abandoned |
| `feature/issue-1019` | 2026-03-22 | Gemini-created, no PR, abandoned |
| `feature/self-reflection` | 2026-03-22 | Only merge-from-main commits, no unique work |
| `feature/memory-search-ui` | 2026-03-22 | Only merge-from-main commits, no unique work |
| `claude/issue-962` | 2026-03-22 | Automated salvage commit only |
| `claude/issue-972` | 2026-03-22 | Automated salvage commit only |
| `gemini/issue-1006` | 2026-03-22 | Incomplete agent session |
| `gemini/issue-1008` | 2026-03-22 | Incomplete agent session |
| `gemini/issue-1010` | 2026-03-22 | Incomplete agent session |
| `gemini/issue-1134` | 2026-03-22 | Incomplete agent session |
| `gemini/issue-1139` | 2026-03-22 | Incomplete agent session |
#### Duplicate Branches (Identical SHA)
| Branch A | Branch B | Action |
|----------|----------|--------|
| `feature/internal-monologue` | `feature/issue-1005` | Exact duplicate — delete one |
| `claude/issue-1005` | (above) | Merge-from-main only — delete |
#### Unmerged Work With No Open PR (HIGH PRIORITY)
| Branch | Content | Issues |
|--------|---------|--------|
| `claude/issue-987` | Content moderation pipeline, Llama Guard integration | No open PR — potentially lost |
| `claude/issue-1011` | Automated skill discovery system | No open PR — potentially lost |
| `gemini/issue-976` | Semantic index for research outputs | No open PR — potentially lost |
#### PRs Closed Without Merge (Issues Still Open)
| PR | Title | Issue Status |
|----|-------|-------------|
| PR#1163 | Three-Strike Detector (#962) | p0-critical, still open |
| PR#1162 | Session Sovereignty Report Generator (#957) | p0-critical, still open |
| PR#1157 | Qwen3 routing | open |
| PR#1156 | Agent Dreaming Mode | open |
| PR#1145 | Qwen3-14B config | open |
#### Workflow Observations
- `loop-cycle` bot auto-creates micro-fix PRs at high frequency (PR numbers climbing past 1209 rapidly)
- Many `gemini/*` branches represent incomplete agent sessions, not full feature work
- Issues get reassigned across agents causing duplicate branch proliferation
---
### 2. rockachopa/hermes-agent
**Status:** Active — AutoLoRA training pipeline in progress.
#### Open PRs Awaiting Review
| PR | Title | Age |
|----|-------|-----|
| PR#33 | AutoLoRA v1 MLX QLoRA training pipeline | ~1 week |
#### Valuable Unmerged Branches (No PR)
| Branch | Content | Age |
|--------|---------|-----|
| `sovereign` | Full fallback chain: Groq/Kimi/Ollama cascade recovery | 9 days |
| `fix/vision-api-key-fallback` | Vision API key fallback fix | 9 days |
#### Stale Merged Branches (~12)
12 merged `claude/*` and `gemini/*` branches are safe to delete.
---
### 3. rockachopa/the-matrix
**Status:** 8 open PRs from `claude/the-matrix` fork all awaiting review, all batch-created on 2026-03-23.
#### Open PRs (ALL Awaiting Review)
| PR | Feature |
|----|---------|
| PR#916 | Touch controls, agent feed, particles, audio, day/night cycle, metrics panel, ASCII logo, click-to-view-PR |
These were created in a single agent session within 5 minutes — needs human review before merge.
---
### 4. replit/timmy-tower
**Status:** Very active — 100+ PRs, complex feature roadmap.
#### Open PRs Awaiting Review
| PR | Title | Age |
|----|-------|-----|
| PR#93 | Task decomposition view | Recent |
| PR#80 | `session_messages` table | 22 hours |
#### Unmerged Work With No Open PR
| Branch | Content |
|--------|---------|
| `gemini/issue-14` | NIP-07 Nostr identity |
| `gemini/issue-42` | Timmy animated eyes |
| `claude/issue-11` | Kimi + Perplexity agent integrations |
| `claude/issue-13` | Nostr event publishing |
| `claude/issue-29` | Mobile Nostr identity |
| `claude/issue-45` | Test kit |
| `claude/issue-47` | SQL migration helpers |
| `claude/issue-67` | Session Mode UI |
#### Cleanup
~30 merged `claude/*` and `gemini/*` branches are safe to delete.
---
### 5. replit/token-gated-economy
**Status:** Active roadmap, no current open PRs.
#### Stale Branches (~23)
- 8 Replit Agent branches from 2026-03-19 (PRs closed/merged)
- 15 merged `claude/issue-*` branches
All are safe to delete.
---
### 6. hermes/timmy-time-app
**Status:** 2-commit repo, created 2026-03-14, no activity since. **Candidate for archival.**
Functionality appears to be superseded by other repos in the stack. Recommend archiving or deleting if not planned for future development.
---
### 7. google/maintenance-tasks & google/wizard-council-automation
**Status:** Single-commit repos from 2026-03-19 created by "Google AI Studio". No follow-up activity.
Unclear ownership and purpose. Recommend clarifying with rockachopa whether these are active or can be archived.
---
### 8. hermes/hermes-config
**Status:** Single branch, updated 2026-03-23 (today). Active — contains Timmy orchestrator config.
No action needed.
---
### 9. Timmy_Foundation/the-nexus
**Status:** Greenfield — created 2026-03-23. 19 issues filed as roadmap. PR#2 (contributor audit) open.
No cleanup needed yet. PR#2 needs review.
---
### 10. rockachopa/alexanderwhitestone.com
**Status:** All recent `claude/*` PRs merged. 7 non-main branches are post-merge and safe to delete.
---
### 11. rockachopa/hermes-config, Timmy_Foundation/.profile
**Status:** Dormant config repos. No action needed.
---
## Cross-Repo Patterns & Inefficiencies
### Duplicate Work
1. **Timmy spring/wobble physics** built independently in both `replit/timmy-tower` and `replit/token-gated-economy`
2. **Nostr identity logic** fragmented across 3 repos with no shared library
3. **`feature/internal-monologue` = `feature/issue-1005`** in Timmy-time-dashboard — identical SHA, exact duplicate
### Agent Workflow Issues
- Same issue assigned to both `gemini/*` and `claude/*` agents creates duplicate branches
- Agent salvage commits are checkpoint-only — not complete work, but clutter the branch list
- Gemini `feature/*` branches created on 2026-03-22 with no PRs filed — likely a failed agent session that created branches but didn't complete the loop
### Review Bottlenecks
| Repo | Waiting PRs | Notes |
|------|-------------|-------|
| rockachopa/the-matrix | 8 | Batch-created, need human review |
| replit/timmy-tower | 2 | Database schema and UI work |
| rockachopa/hermes-agent | 1 | AutoLoRA v1 — high value |
| Timmy_Foundation/the-nexus | 1 | Contributor audit |
---
## Recommended Actions
### Immediate (This Sprint)
1. **Review & merge** PR#33 in `hermes-agent` (AutoLoRA v1)
2. **Review** 8 open PRs in `the-matrix` before merging as a batch
3. **Rescue** unmerged work in `claude/issue-987`, `claude/issue-1011`, `gemini/issue-976` — file new PRs or close branches
4. **Delete duplicate** `feature/internal-monologue` / `feature/issue-1005` branches
### Cleanup Sprint
5. **Delete ~65 stale branches** across all repos (itemized above)
6. **Investigate** the 5 closed-without-merge PRs in Timmy-time-dashboard for p0-critical issues
7. **Archive** `hermes/timmy-time-app` if no longer needed
8. **Clarify** ownership of `google/maintenance-tasks` and `google/wizard-council-automation`
### Process Improvements
9. **Enforce one-agent-per-issue** policy to prevent duplicate `claude/*` / `gemini/*` branches
10. **Add branch protection** requiring PR before merge on `main` for all repos
11. **Set a branch retention policy** — auto-delete merged branches (GitHub/Gitea supports this)
12. **Share common libraries** for Nostr identity and animation physics across repos
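Recommendation 11 maps to a single repo-settings field in Gitea. A hedged sketch of the request body: the host is a placeholder, and the field name matches the `default_delete_branch_after_merge` setting cited in this report.

```python
import json

# Enabling merged-branch auto-delete is a PATCH to the repo settings endpoint:
#   PATCH https://gitea.example.com/api/v1/repos/<owner>/<repo>
body = json.dumps({"default_delete_branch_after_merge": True})
print(body)
```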
---
*Report generated by Claude audit agent. Improvement tickets filed per repo as follow-up to this report.*


@@ -1,160 +0,0 @@
# ADR-024: Canonical Nostr Identity Location
**Status:** Accepted
**Date:** 2026-03-23
**Issue:** #1223
**Refs:** #1210 (duplicate-work audit), ROADMAP.md Phase 2
---
## Context
Nostr identity logic has been independently implemented in at least three
repos (`replit/timmy-tower`, `replit/token-gated-economy`,
`rockachopa/Timmy-time-dashboard`), each building keypair generation, event
publishing, and NIP-07 browser-extension auth in isolation.
This duplication causes:
- Bug fixes applied in one repo but silently missed in others.
- Diverging implementations of the same NIPs (NIP-01, NIP-07, NIP-44).
- Agent time wasted re-implementing logic that already exists.
ROADMAP.md Phase 2 already names `timmy-nostr` as the planned home for Nostr
infrastructure. This ADR makes that decision explicit and prescribes how
other repos consume it.
---
## Decision
**The canonical home for all Nostr identity logic is `rockachopa/timmy-nostr`.**
All other repos (`Timmy-time-dashboard`, `timmy-tower`,
`token-gated-economy`) become consumers, not implementers, of Nostr identity
primitives.
### What lives in `timmy-nostr`
| Module | Responsibility |
|--------|---------------|
| `nostr_id/keypair.py` | Keypair generation, nsec/npub encoding, encrypted storage |
| `nostr_id/identity.py` | Agent identity lifecycle (NIP-01 kind:0 profile events) |
| `nostr_id/auth.py` | NIP-07 browser-extension signer; NIP-42 relay auth |
| `nostr_id/event.py` | Event construction, signing, serialisation (NIP-01) |
| `nostr_id/crypto.py` | NIP-44 encryption (XChaCha20-Poly1305 v2) |
| `nostr_id/nip05.py` | DNS-based identifier verification |
| `nostr_id/relay.py` | WebSocket relay client (publish / subscribe) |
### What does NOT live in `timmy-nostr`
- Business logic that combines Nostr with application-specific concepts
(e.g. "publish a task-completion event" lives in the application layer
that calls `timmy-nostr`).
- Reputation scoring algorithms (depends on application policy).
- Dashboard UI components.
---
## How Other Repos Reference `timmy-nostr`
### Python repos (`Timmy-time-dashboard`, `timmy-tower`)
Add to `pyproject.toml` dependencies:
```toml
[tool.poetry.dependencies]
timmy-nostr = {git = "https://gitea.hermes.local/rockachopa/timmy-nostr.git", tag = "v0.1.0"}
```
Import pattern:
```python
from nostr_id.keypair import generate_keypair, load_keypair
from nostr_id.event import build_event, sign_event
from nostr_id.relay import NostrRelayClient
```
### JavaScript/TypeScript repos (`token-gated-economy` frontend)
Add to `package.json` (once published or via local path):
```json
"dependencies": {
"timmy-nostr": "rockachopa/timmy-nostr#v0.1.0"
}
```
Import pattern:
```typescript
import { generateKeypair, signEvent } from 'timmy-nostr';
```
Until `timmy-nostr` publishes a JS package, use NIP-07 browser extension
directly and delegate all key-management to the browser signer — never
re-implement crypto in JS without the shared library.
---
## Migration Plan
Existing duplicated code should be migrated in this order:
1. **Keypair generation** — highest duplication, clearest interface.
2. **NIP-01 event construction/signing** — used by all three repos.
3. **NIP-07 browser auth** — currently in `timmy-tower` and `token-gated-economy`.
4. **NIP-44 encryption** — lowest priority, least duplicated.
Each step: implement in `timmy-nostr` → cut over one repo → delete the
duplicate → repeat.
---
## Interface Contract
`timmy-nostr` must expose a stable public API:
```python
# Keypair
keypair = generate_keypair()  # -> NostrKeypair(nsec, npub, privkey_bytes, pubkey_bytes)
keypair = load_keypair(encrypted_nsec, secret_key)

# Events
event = build_event(kind=0, content=profile_json, keypair=keypair)
event = sign_event(event, keypair)  # attaches .id and .sig

# Relay
async with NostrRelayClient(url) as relay:
    await relay.publish(event)
    async for msg in relay.subscribe(filters):
        ...
```
Breaking changes to this interface require a semver major bump and a
migration note in `timmy-nostr`'s CHANGELOG.
---
## Consequences
- **Positive:** Bug fixes in cryptographic or protocol code propagate to all
repos via a version bump.
- **Positive:** New NIPs are implemented once and adopted everywhere.
- **Negative:** Adds a cross-repo dependency; version pinning discipline
required.
- **Negative:** `timmy-nostr` must be stood up and tagged before any
migration can begin.
---
## Action Items
- [ ] Create `rockachopa/timmy-nostr` repo with the module structure above.
- [ ] Implement keypair generation + NIP-01 signing as v0.1.0.
- [ ] Replace `Timmy-time-dashboard` inline Nostr code (if any) with
`timmy-nostr` import once v0.1.0 is tagged.
- [ ] Add `src/infrastructure/clients/nostr_client.py` as the thin
application-layer wrapper (see ROADMAP.md §2.6).
- [ ] File issues in `timmy-tower` and `token-gated-economy` to migrate their
duplicate implementations.


@@ -1,33 +0,0 @@
import os
import sys
from pathlib import Path

# Add the src directory to the Python path
sys.path.insert(0, str(Path(__file__).parent / "src"))

from timmy.memory_system import memory_store


def index_research_documents():
    research_dir = Path("docs/research")
    if not research_dir.is_dir():
        print(f"Research directory not found: {research_dir}")
        return
    print(f"Indexing research documents from {research_dir}...")
    indexed_count = 0
    for file_path in research_dir.glob("*.md"):
        try:
            content = file_path.read_text()
            topic = file_path.stem.replace("-", " ").title()  # Derive topic from filename
            print(f"Storing '{topic}' from {file_path.name}...")
            # Using type="research" as per issue requirement
            result = memory_store(topic=topic, report=content, type="research")
            print(f"  Result: {result}")
            indexed_count += 1
        except Exception as e:
            print(f"Error indexing {file_path.name}: {e}")
    print(f"Finished indexing. Total documents indexed: {indexed_count}")


if __name__ == "__main__":
    index_research_documents()


@@ -1,23 +0,0 @@
# Research Direction
This file guides the `timmy learn` autoresearch loop. Edit it to focus
autonomous experiments on a specific goal.
## Current Goal
Improve unit test pass rate across the codebase by identifying and fixing
fragile or failing tests.
## Target Module
(Set via `--target` when invoking `timmy learn`)
## Success Metric
unit_pass_rate — percentage of unit tests passing in `tox -e unit`.
## Notes
- Experiments run one at a time; each is time-boxed by `--budget`.
- Improvements are committed automatically; regressions are reverted.
- Use `--dry-run` to preview hypotheses without making changes.


@@ -51,13 +51,6 @@ class Settings(BaseSettings):
# Set to 0 to use model defaults.
ollama_num_ctx: int = 32768
# Maximum models loaded simultaneously in Ollama — override with OLLAMA_MAX_LOADED_MODELS
# Set to 2 so Qwen3-8B and Qwen3-14B can stay hot concurrently (~17 GB combined).
# Requires Ollama ≥ 0.1.33. Export this to the Ollama process environment:
# OLLAMA_MAX_LOADED_MODELS=2 ollama serve
# or add it to your systemd/launchd unit before starting the harness.
ollama_max_loaded_models: int = 2
# Fallback model chains — override with FALLBACK_MODELS / VISION_FALLBACK_MODELS
# as comma-separated strings, e.g. FALLBACK_MODELS="qwen3:8b,qwen2.5:14b"
# Or edit config/providers.yaml → fallback_chains for the canonical source.
@@ -235,10 +228,6 @@ class Settings(BaseSettings):
# ── Test / Diagnostics ─────────────────────────────────────────────
# Skip loading heavy embedding models (for tests / low-memory envs).
timmy_skip_embeddings: bool = False
# Embedding backend: "ollama" for Ollama, "local" for sentence-transformers.
timmy_embedding_backend: Literal["ollama", "local"] = "local"
# Ollama model to use for embeddings (e.g., "nomic-embed-text").
ollama_embedding_model: str = "nomic-embed-text"
# Disable CSRF middleware entirely (for tests).
timmy_disable_csrf: bool = False
# Mark the process as running in test mode.
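The comma-separated override mentioned in the fallback-chain comment above can be parsed with a one-liner. This is a sketch, assuming only that the variable holds model names separated by commas:

```python
import os

os.environ["FALLBACK_MODELS"] = "qwen3:8b, qwen2.5:14b"  # example override
chain = [m.strip() for m in os.environ["FALLBACK_MODELS"].split(",") if m.strip()]
print(chain)  # prints ['qwen3:8b', 'qwen2.5:14b']
```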


@@ -49,7 +49,6 @@ from dashboard.routes.quests import router as quests_router
from dashboard.routes.scorecards import router as scorecards_router
from dashboard.routes.sovereignty_metrics import router as sovereignty_metrics_router
from dashboard.routes.sovereignty_ws import router as sovereignty_ws_router
from dashboard.routes.three_strike import router as three_strike_router
from dashboard.routes.spark import router as spark_router
from dashboard.routes.system import router as system_router
from dashboard.routes.tasks import router as tasks_router
@@ -677,7 +676,6 @@ app.include_router(quests_router)
app.include_router(scorecards_router)
app.include_router(sovereignty_metrics_router)
app.include_router(sovereignty_ws_router)
app.include_router(three_strike_router)
@app.websocket("/ws")


@@ -1,118 +0,0 @@
"""Three-Strike Detector dashboard routes.

Provides JSON API endpoints for inspecting and managing the three-strike
detector state.

Refs: #962
"""

import logging
from typing import Any

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel

from timmy.sovereignty.three_strike import CATEGORIES, get_detector

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/sovereignty/three-strike", tags=["three-strike"])


class RecordRequest(BaseModel):
    category: str
    key: str
    metadata: dict[str, Any] = {}


class AutomationRequest(BaseModel):
    artifact_path: str


@router.get("")
async def list_strikes() -> dict[str, Any]:
    """Return all strike records."""
    detector = get_detector()
    records = detector.list_all()
    return {
        "records": [
            {
                "category": r.category,
                "key": r.key,
                "count": r.count,
                "blocked": r.blocked,
                "automation": r.automation,
                "first_seen": r.first_seen,
                "last_seen": r.last_seen,
            }
            for r in records
        ],
        "categories": sorted(CATEGORIES),
    }


@router.get("/blocked")
async def list_blocked() -> dict[str, Any]:
    """Return only blocked (category, key) pairs."""
    detector = get_detector()
    records = detector.list_blocked()
    return {
        "blocked": [
            {
                "category": r.category,
                "key": r.key,
                "count": r.count,
                "automation": r.automation,
                "last_seen": r.last_seen,
            }
            for r in records
        ]
    }


@router.post("/record")
async def record_strike(body: RecordRequest) -> dict[str, Any]:
    """Record a manual action. Returns strike state; 409 when blocked."""
    from timmy.sovereignty.three_strike import ThreeStrikeError

    detector = get_detector()
    try:
        record = detector.record(body.category, body.key, body.metadata)
        return {
            "category": record.category,
            "key": record.key,
            "count": record.count,
            "blocked": record.blocked,
            "automation": record.automation,
        }
    except ValueError as exc:
        raise HTTPException(status_code=422, detail=str(exc)) from exc
    except ThreeStrikeError as exc:
        raise HTTPException(
            status_code=409,
            detail={
                "error": "three_strike_block",
                "message": str(exc),
                "category": exc.category,
                "key": exc.key,
                "count": exc.count,
            },
        ) from exc


@router.post("/{category}/{key}/automation")
async def register_automation(
    category: str, key: str, body: AutomationRequest
) -> dict[str, bool]:
    """Register an automation artifact to unblock a (category, key) pair."""
    detector = get_detector()
    detector.register_automation(category, key, body.artifact_path)
    return {"success": True}


@router.get("/{category}/{key}/events")
async def get_strike_events(category: str, key: str, limit: int = 50) -> dict[str, Any]:
    """Return the individual strike events for a (category, key) pair."""
    detector = get_detector()
    events = detector.get_events(category, key, limit=limit)
    return {"category": category, "key": key, "events": events}
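The 409 path above implies an escalation rule. A toy model of the semantics the routes suggest: the real logic lives in `timmy.sovereignty.three_strike`, and the limit of 3 here is an assumption taken from the name, not from the source.

```python
from collections import Counter

STRIKE_LIMIT = 3  # assumed from "three-strike"; the real detector defines this
counts: Counter = Counter()

def record(category: str, key: str) -> bool:
    """Count a manual action; return True once the pair should be blocked."""
    counts[(category, key)] += 1
    return counts[(category, key)] >= STRIKE_LIMIT
```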


@@ -2,7 +2,6 @@
from .api import router
from .cascade import CascadeRouter, Provider, ProviderStatus, get_router
from .classifier import TaskComplexity, classify_task
from .history import HealthHistoryStore, get_history_store
from .metabolic import (
DEFAULT_TIER_MODELS,
@@ -28,7 +27,4 @@ __all__ = [
"classify_complexity",
"build_prompt",
"get_metabolic_router",
# Classifier
"TaskComplexity",
"classify_task",
]


@@ -593,34 +593,6 @@ class CascadeRouter:
"is_fallback_model": is_fallback_model,
}
def _get_model_for_complexity(
self, provider: Provider, complexity: "TaskComplexity"
) -> str | None:
"""Return the best model on *provider* for the given complexity tier.
Checks fallback chains first (routine / complex), then falls back to
any model with the matching capability tag, then the provider default.
"""
from infrastructure.router.classifier import TaskComplexity
chain_key = "routine" if complexity == TaskComplexity.SIMPLE else "complex"
# Walk the capability fallback chain — first model present on this provider wins
for model_name in self.config.fallback_chains.get(chain_key, []):
if any(m["name"] == model_name for m in provider.models):
return model_name
# Direct capability lookup — only return if a model explicitly has the tag
# (do not use get_model_with_capability here as it falls back to the default)
cap_model = next(
(m["name"] for m in provider.models if chain_key in m.get("capabilities", [])),
None,
)
if cap_model:
return cap_model
return None # Caller will use provider default
async def complete(
self,
messages: list[dict],
@@ -628,7 +600,6 @@ class CascadeRouter:
temperature: float = 0.7,
max_tokens: int | None = None,
cascade_tier: str | None = None,
complexity_hint: str | None = None,
) -> dict:
"""Complete a chat conversation with automatic failover.
@@ -637,103 +608,33 @@ class CascadeRouter:
- Falls back to vision-capable models when needed
- Supports image URLs, paths, and base64 encoding
Complexity-based routing (issue #1065):
- ``complexity_hint="simple"`` → routes to Qwen3-8B (low-latency)
- ``complexity_hint="complex"`` → routes to Qwen3-14B (quality)
- ``complexity_hint=None`` (default) → auto-classifies from messages
Args:
messages: List of message dicts with role and content
model: Preferred model (tries this first; complexity routing is
skipped when an explicit model is given)
model: Preferred model (tries this first, then provider defaults)
temperature: Sampling temperature
max_tokens: Maximum tokens to generate
cascade_tier: If specified, filters providers by this tier.
- "frontier_required": Uses only Anthropic provider for top-tier models.
complexity_hint: "simple", "complex", or None (auto-detect).
Returns:
Dict with content, provider_used, model, latency_ms,
is_fallback_model, and complexity fields.
Dict with content, provider_used, and metrics
Raises:
RuntimeError: If all providers fail
"""
from infrastructure.router.classifier import TaskComplexity, classify_task
content_type = self._detect_content_type(messages)
if content_type != ContentType.TEXT:
logger.debug("Detected %s content, selecting appropriate model", content_type.value)
# Resolve task complexity ─────────────────────────────────────────────
# Skip complexity routing when caller explicitly specifies a model.
complexity: TaskComplexity | None = None
if model is None:
if complexity_hint is not None:
try:
complexity = TaskComplexity(complexity_hint.lower())
except ValueError:
logger.warning("Unknown complexity_hint %r, auto-classifying", complexity_hint)
complexity = classify_task(messages)
else:
complexity = classify_task(messages)
logger.debug("Task complexity: %s", complexity.value)
errors: list[str] = []
providers = self._filter_providers(cascade_tier)
for provider in providers:
if not self._is_provider_available(provider):
continue
# Metabolic protocol: skip cloud providers when quota is low
if provider.type in ("anthropic", "openai", "grok"):
if not self._quota_allows_cloud(provider):
logger.info(
"Metabolic protocol: skipping cloud provider %s (quota too low)",
provider.name,
)
continue
# Complexity-based model selection (only when no explicit model) ──
effective_model = model
if effective_model is None and complexity is not None:
effective_model = self._get_model_for_complexity(provider, complexity)
if effective_model:
logger.debug(
"Complexity routing [%s]: %s%s",
complexity.value,
provider.name,
effective_model,
)
selected_model, is_fallback_model = self._select_model(
provider, effective_model, content_type
result = await self._try_single_provider(
provider, messages, model, temperature, max_tokens, content_type, errors
)
try:
result = await self._attempt_with_retry(
provider,
messages,
selected_model,
temperature,
max_tokens,
content_type,
)
except RuntimeError as exc:
errors.append(str(exc))
self._record_failure(provider)
continue
self._record_success(provider, result.get("latency_ms", 0))
return {
"content": result["content"],
"provider": provider.name,
"model": result.get("model", selected_model or provider.get_default_model()),
"latency_ms": result.get("latency_ms", 0),
"is_fallback_model": is_fallback_model,
"complexity": complexity.value if complexity is not None else None,
}
if result is not None:
return result
raise RuntimeError(f"All providers failed: {'; '.join(errors)}")


@@ -1,166 +0,0 @@
"""Task complexity classifier for Qwen3 dual-model routing.
Classifies incoming tasks as SIMPLE (route to Qwen3-8B for low-latency)
or COMPLEX (route to Qwen3-14B for quality-sensitive work).
Classification is fully heuristic — no LLM inference required.
"""
import re
from enum import Enum
class TaskComplexity(Enum):
"""Task complexity tier for model routing."""
SIMPLE = "simple" # Qwen3-8B Q6_K: routine, latency-sensitive
COMPLEX = "complex" # Qwen3-14B Q5_K_M: quality-sensitive, multi-step
# Keywords strongly associated with complex tasks
_COMPLEX_KEYWORDS: frozenset[str] = frozenset(
[
"plan",
"review",
"analyze",
"analyse",
"triage",
"refactor",
"design",
"architecture",
"implement",
"compare",
"debug",
"explain",
"prioritize",
"prioritise",
"strategy",
"optimize",
"optimise",
"evaluate",
"assess",
"brainstorm",
"outline",
"summarize",
"summarise",
"generate code",
"write a",
"write the",
"code review",
"pull request",
"multi-step",
"multi step",
"step by step",
"backlog prioriti",
"issue triage",
"root cause",
"how does",
"why does",
"what are the",
]
)
# Keywords strongly associated with simple/routine tasks
_SIMPLE_KEYWORDS: frozenset[str] = frozenset(
[
"status",
"list ",
"show ",
"what is",
"how many",
"ping",
"run ",
"execute ",
"ls ",
"cat ",
"ps ",
"fetch ",
"count ",
"tail ",
"head ",
"grep ",
"find file",
"read file",
"get ",
"query ",
"check ",
"yes",
"no",
"ok",
"done",
"thanks",
]
)
# Content longer than this is treated as complex regardless of keywords
_COMPLEX_CHAR_THRESHOLD = 500
# Short content defaults to simple
_SIMPLE_CHAR_THRESHOLD = 150
# More than this many messages suggests an ongoing complex conversation
_COMPLEX_CONVERSATION_DEPTH = 6
def classify_task(messages: list[dict]) -> TaskComplexity:
"""Classify task complexity from a list of messages.
Uses heuristic rules — no LLM call required. Errs toward COMPLEX
when uncertain so that quality is preserved.
Args:
messages: List of message dicts with ``role`` and ``content`` keys.
Returns:
TaskComplexity.SIMPLE or TaskComplexity.COMPLEX
"""
if not messages:
return TaskComplexity.SIMPLE
# Concatenate all user-turn content for analysis
user_content = " ".join(
msg.get("content", "")
for msg in messages
if msg.get("role") in ("user", "human")
and isinstance(msg.get("content"), str)
).lower().strip()
if not user_content:
return TaskComplexity.SIMPLE
# Complexity signals override everything -----------------------------------
# Explicit complex keywords
for kw in _COMPLEX_KEYWORDS:
if kw in user_content:
return TaskComplexity.COMPLEX
# Numbered / multi-step instruction list: "1. do this 2. do that"
if re.search(r"\b\d+\.\s+\w", user_content):
return TaskComplexity.COMPLEX
# Code blocks embedded in messages
if "```" in user_content:
return TaskComplexity.COMPLEX
# Long content → complex reasoning likely required
if len(user_content) > _COMPLEX_CHAR_THRESHOLD:
return TaskComplexity.COMPLEX
# Deep conversation → complex ongoing task
if len(messages) > _COMPLEX_CONVERSATION_DEPTH:
return TaskComplexity.COMPLEX
# Simplicity signals -------------------------------------------------------
# Explicit simple keywords
for kw in _SIMPLE_KEYWORDS:
if kw in user_content:
return TaskComplexity.SIMPLE
# Short single-sentence messages default to simple
if len(user_content) <= _SIMPLE_CHAR_THRESHOLD:
return TaskComplexity.SIMPLE
# When uncertain, prefer quality (complex model)
return TaskComplexity.COMPLEX
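The routing rules above can be exercised with a condensed, standalone sketch. Only a small subset of the keyword list is reproduced here for illustration; thresholds match the constants in the file.

```python
import re

# Condensed sketch of the routing rules above (subset of the keyword lists):
# complex keyword / numbered steps / code fence / long content -> COMPLEX,
# otherwise short content -> SIMPLE.
COMPLEX_KW = {"refactor", "analyze", "design"}  # illustrative subset

def classify(text: str) -> str:
    t = text.lower().strip()
    if any(kw in t for kw in COMPLEX_KW):
        return "complex"
    if re.search(r"\b\d+\.\s+\w", t):  # "1. do this 2. do that"
        return "complex"
    if "```" in t or len(t) > 500:
        return "complex"
    return "simple" if len(t) <= 150 else "complex"

print(classify("ping"))                               # simple
print(classify("please refactor the auth module"))    # complex
print(classify("1. fetch logs 2. summarize errors"))  # complex
```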

View File

@@ -212,212 +212,3 @@ def _append_result(workspace: Path, result: dict[str, Any]) -> None:
results_file.parent.mkdir(parents=True, exist_ok=True)
with results_file.open("a") as f:
f.write(json.dumps(result) + "\n")
def _extract_pass_rate(output: str) -> float | None:
"""Extract pytest pass rate as a percentage from tox/pytest output."""
passed_m = re.search(r"(\d+) passed", output)
failed_m = re.search(r"(\d+) failed", output)
if passed_m:
passed = int(passed_m.group(1))
failed = int(failed_m.group(1)) if failed_m else 0
total = passed + failed
return (passed / total * 100.0) if total > 0 else 100.0
return None
def _extract_coverage(output: str) -> float | None:
"""Extract total coverage percentage from coverage output."""
coverage_m = re.search(r"(?:TOTAL\s+\d+\s+\d+\s+|Total coverage:\s*)(\d+)%", output)
if coverage_m:
try:
return float(coverage_m.group(1))
except ValueError:
pass
return None
# ── Higher-is-better metric names ────────────────────────────────────────────
_HIGHER_IS_BETTER = frozenset({"unit_pass_rate", "coverage"})
class SystemExperiment:
"""An autoresearch experiment targeting a specific module with a configurable metric.
Encapsulates the hypothesis → edit → tox → evaluate → commit/revert loop
for a single target file or module.
Args:
target: Path or module name to optimise (e.g. ``src/timmy/agent.py``).
metric: Metric to extract from tox output. Built-in values:
``unit_pass_rate`` (default), ``coverage``, ``val_bpb``.
Any other value is forwarded to :func:`_extract_metric`.
budget_minutes: Wall-clock budget per experiment (default 5 min).
workspace: Working directory for subprocess calls. Defaults to ``cwd``.
"""
def __init__(
self,
target: str,
metric: str = "unit_pass_rate",
budget_minutes: int = 5,
workspace: Path | None = None,
) -> None:
self.target = target
self.metric = metric
self.budget_seconds = budget_minutes * 60
self.workspace = Path(workspace) if workspace else Path.cwd()
# ── Hypothesis generation ─────────────────────────────────────────────────
def generate_hypothesis(self, program_content: str = "") -> str:
"""Return a plain-English hypothesis for the next experiment.
Uses the first non-empty line of *program_content* when available;
falls back to a generic description based on target and metric.
"""
first_line = ""
for line in program_content.splitlines():
stripped = line.strip()
if stripped and not stripped.startswith("#"):
first_line = stripped[:120]
break
if first_line:
return f"[{self.target}] {first_line}"
return f"Improve {self.metric} for {self.target}"
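The first-line heuristic can be checked in isolation. The helper below mirrors the loop in `generate_hypothesis`; the sample `program.md` content is illustrative.

```python
# Mirrors the hypothesis heuristic above: the first non-empty,
# non-comment line of program.md wins, truncated to 120 chars.
def first_direction(program_content: str) -> str:
    for line in program_content.splitlines():
        stripped = line.strip()
        if stripped and not stripped.startswith("#"):
            return stripped[:120]
    return ""

program = """# Research program

Reduce flakiness in the scheduler tests.
"""
print(first_direction(program))  # Reduce flakiness in the scheduler tests.
```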
# ── Edit phase ────────────────────────────────────────────────────────────
def apply_edit(self, hypothesis: str, model: str = "qwen3:30b") -> str:
"""Apply code edits to *target* via Aider.
Returns a status string. Degrades gracefully — never raises.
"""
prompt = f"Edit {self.target}: {hypothesis}"
try:
result = subprocess.run(
["aider", "--no-git", "--model", f"ollama/{model}", "--quiet", prompt],
capture_output=True,
text=True,
timeout=self.budget_seconds,
cwd=str(self.workspace),
)
if result.returncode == 0:
return result.stdout or "Edit applied."
return f"Aider error (exit {result.returncode}): {result.stderr[:500]}"
except FileNotFoundError:
logger.warning("Aider not installed — edit skipped")
return "Aider not available — edit skipped"
except subprocess.TimeoutExpired:
logger.warning("Aider timed out after %ds", self.budget_seconds)
return "Aider timed out"
except (OSError, subprocess.SubprocessError) as exc:
logger.warning("Aider failed: %s", exc)
return f"Edit failed: {exc}"
# ── Evaluation phase ──────────────────────────────────────────────────────
def run_tox(self, tox_env: str = "unit") -> dict[str, Any]:
"""Run *tox_env* and return a result dict.
Returns:
Dict with keys: ``metric`` (float|None), ``log`` (str),
``duration_s`` (int), ``success`` (bool), ``error`` (str|None).
"""
start = time.monotonic()
try:
result = subprocess.run(
["tox", "-e", tox_env],
capture_output=True,
text=True,
timeout=self.budget_seconds,
cwd=str(self.workspace),
)
duration = int(time.monotonic() - start)
output = result.stdout + result.stderr
metric_val = self._extract_tox_metric(output)
return {
"metric": metric_val,
"log": output[-3000:],
"duration_s": duration,
"success": result.returncode == 0,
"error": None if result.returncode == 0 else f"Exit code {result.returncode}",
}
except subprocess.TimeoutExpired:
duration = int(time.monotonic() - start)
return {
"metric": None,
"log": f"Budget exceeded after {self.budget_seconds}s",
"duration_s": duration,
"success": False,
"error": f"Budget exceeded after {self.budget_seconds}s",
}
except OSError as exc:
return {"metric": None, "log": "", "duration_s": 0, "success": False, "error": str(exc)}
def _extract_tox_metric(self, output: str) -> float | None:
"""Dispatch to the correct metric extractor based on *self.metric*."""
if self.metric == "unit_pass_rate":
return _extract_pass_rate(output)
if self.metric == "coverage":
return _extract_coverage(output)
return _extract_metric(output, self.metric)
def evaluate(self, current: float | None, baseline: float | None) -> str:
"""Compare *current* metric against *baseline* and return an assessment."""
if current is None:
return "Indeterminate: metric not extracted from output"
if baseline is None:
unit = "%" if self.metric in _HIGHER_IS_BETTER else ""
return f"Baseline: {self.metric} = {current:.2f}{unit}"
if self.metric in _HIGHER_IS_BETTER:
delta = current - baseline
pct = (delta / baseline * 100) if baseline != 0 else 0.0
if delta > 0:
return f"Improvement: {self.metric} {baseline:.2f}% → {current:.2f}% ({pct:+.2f}%)"
if delta < 0:
return f"Regression: {self.metric} {baseline:.2f}% → {current:.2f}% ({pct:+.2f}%)"
return f"No change: {self.metric} = {current:.2f}%"
# lower-is-better (val_bpb, loss, etc.)
return evaluate_result(current, baseline, self.metric)
def is_improvement(self, current: float, baseline: float) -> bool:
"""Return True if *current* is better than *baseline* for this metric."""
if self.metric in _HIGHER_IS_BETTER:
return current > baseline
return current < baseline # lower-is-better
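The direction-aware comparison can be checked standalone. The sketch below mirrors `is_improvement` with the metric name passed explicitly instead of read from the instance.

```python
# Mirrors the metric-direction rule above: pass rate and coverage are
# higher-is-better; anything else (val_bpb, loss, ...) is lower-is-better.
HIGHER_IS_BETTER = frozenset({"unit_pass_rate", "coverage"})

def is_improvement(metric: str, current: float, baseline: float) -> bool:
    if metric in HIGHER_IS_BETTER:
        return current > baseline
    return current < baseline

print(is_improvement("coverage", 81.0, 78.5))  # True  (went up)
print(is_improvement("val_bpb", 1.02, 0.98))   # False (went up, but lower is better)
```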
# ── Git phase ─────────────────────────────────────────────────────────────
def commit_changes(self, message: str) -> bool:
"""Stage and commit all changes. Returns True on success."""
try:
subprocess.run(
["git", "add", "-A"], cwd=str(self.workspace), check=True, timeout=30
)
subprocess.run(
["git", "commit", "-m", message],
cwd=str(self.workspace),
check=True,
timeout=30,
)
return True
except subprocess.CalledProcessError as exc:
logger.warning("Git commit failed: %s", exc)
return False
def revert_changes(self) -> bool:
"""Revert all uncommitted changes. Returns True on success."""
try:
subprocess.run(
["git", "checkout", "--", "."],
cwd=str(self.workspace),
check=True,
timeout=30,
)
return True
except subprocess.CalledProcessError as exc:
logger.warning("Git revert failed: %s", exc)
return False

View File

@@ -527,159 +527,5 @@ def healthcheck(
raise typer.Exit(result.returncode)
@app.command()
def learn(
target: str | None = typer.Option(
None,
"--target",
"-t",
help="Module or file to optimise (e.g. 'src/timmy/agent.py')",
),
metric: str = typer.Option(
"unit_pass_rate",
"--metric",
"-m",
help="Metric to track: unit_pass_rate | coverage | val_bpb | <custom>",
),
budget: int = typer.Option(
5,
"--budget",
help="Time limit per experiment in minutes",
),
max_experiments: int = typer.Option(
10,
"--max-experiments",
help="Cap on total experiments per run",
),
dry_run: bool = typer.Option(
False,
"--dry-run",
help="Show hypothesis without executing experiments",
),
program_file: str | None = typer.Option(
None,
"--program",
"-p",
help="Path to research direction file (default: program.md in cwd)",
),
tox_env: str = typer.Option(
"unit",
"--tox-env",
help="Tox environment to run for each evaluation",
),
model: str = typer.Option(
"qwen3:30b",
"--model",
help="Ollama model forwarded to Aider for code edits",
),
):
"""Start an autonomous improvement loop (autoresearch).
Reads program.md for research direction, then iterates:
hypothesis → edit → tox → evaluate → commit/revert.
Experiments continue until --max-experiments is reached or the loop is
interrupted with Ctrl+C. Use --dry-run to preview hypotheses without
making any changes.
Example:
timmy learn --target src/timmy/agent.py --metric unit_pass_rate
"""
from pathlib import Path
from timmy.autoresearch import SystemExperiment
repo_root = Path.cwd()
program_path = Path(program_file) if program_file else repo_root / "program.md"
if program_path.exists():
program_content = program_path.read_text()
typer.echo(f"Research direction: {program_path}")
else:
program_content = ""
typer.echo(
f"Note: {program_path} not found — proceeding without research direction.",
err=True,
)
if target is None:
typer.echo(
"Error: --target is required. Specify the module or file to optimise.", err=True
)
raise typer.Exit(1)
experiment = SystemExperiment(
target=target,
metric=metric,
budget_minutes=budget,
)
typer.echo()
typer.echo(typer.style("Autoresearch", bold=True) + f" {target}")
typer.echo(f" metric={metric} budget={budget}min max={max_experiments} tox={tox_env}")
if dry_run:
typer.echo(" (dry-run — no changes will be made)")
typer.echo()
baseline: float | None = None
try:
for i in range(1, max_experiments + 1):
typer.echo(typer.style(f"[{i}/{max_experiments}]", bold=True), nl=False)
hypothesis = experiment.generate_hypothesis(program_content)
typer.echo(f" {hypothesis}")
if dry_run:
continue
# Edit phase
typer.echo(" → editing …", nl=False)
edit_result = experiment.apply_edit(hypothesis, model=model)
if "not available" in edit_result or edit_result.startswith("Aider error"):
typer.echo(f" skipped ({edit_result.split(':')[0]})")
else:
typer.echo(" done")
# Evaluate phase
typer.echo(" → running tox …", nl=False)
tox_result = experiment.run_tox(tox_env=tox_env)
typer.echo(f" {tox_result['duration_s']}s")
assessment = experiment.evaluate(tox_result["metric"], baseline)
typer.echo(f"{assessment}")
if tox_result["metric"] is not None and baseline is None:
baseline = tox_result["metric"]
if tox_result["success"] and tox_result["metric"] is not None and baseline is not None:
if experiment.is_improvement(tox_result["metric"], baseline):
commit_msg = (
f"autoresearch: improve {metric} on {target}: {assessment}"
)
if experiment.commit_changes(commit_msg):
typer.echo(" → committed")
baseline = tox_result["metric"]
else:
experiment.revert_changes()
typer.echo(" → commit failed, reverted")
else:
experiment.revert_changes()
typer.echo(" → reverted (no improvement)")
elif not tox_result["success"]:
experiment.revert_changes()
typer.echo(f" → reverted ({tox_result['error']})")
typer.echo()
except KeyboardInterrupt:
typer.echo("\nInterrupted.")
raise typer.Exit(0) from None
typer.echo(typer.style("Autoresearch complete.", bold=True))
if baseline is not None:
typer.echo(f"Final {metric}: {baseline:.4f}")
def main():
app()

View File

@@ -9,81 +9,35 @@ Also includes vector similarity utilities (cosine similarity, keyword overlap).
import hashlib
import logging
import math
import json
import httpx # Import httpx for Ollama API calls
from config import settings
logger = logging.getLogger(__name__)
# Embedding model - small, fast, local
EMBEDDING_MODEL = None
EMBEDDING_DIM = 384 # MiniLM dimension, will be overridden if Ollama model has different dim
EMBEDDING_DIM = 384 # MiniLM dimension
class OllamaEmbedder:
"""Mimics SentenceTransformer interface for Ollama."""
def __init__(self, model_name: str, ollama_url: str):
self.model_name = model_name
self.ollama_url = ollama_url
self.dimension = 0 # Will be updated after first call
def encode(self, sentences: str | list[str], convert_to_numpy: bool = False, normalize_embeddings: bool = True) -> list[list[float]] | list[float]:
"""Generate embeddings using Ollama."""
single_input = isinstance(sentences, str)
if single_input:
    sentences = [sentences]
all_embeddings = []
for sentence in sentences:
    try:
        response = httpx.post(
            f"{self.ollama_url}/api/embeddings",
            json={"model": self.model_name, "prompt": sentence},
            timeout=settings.mcp_bridge_timeout,
        )
        response.raise_for_status()
        embedding = response.json()["embedding"]
        if not self.dimension:
            self.dimension = len(embedding)  # Set dimension on first successful call
            global EMBEDDING_DIM
            EMBEDDING_DIM = self.dimension  # Update module-level EMBEDDING_DIM
        all_embeddings.append(embedding)
    except httpx.RequestError as exc:
        logger.error("Ollama embeddings request failed: %s", exc)
        # Fall back to a hash embedding for this sentence only, keeping
        # results already gathered for the rest of the batch
        all_embeddings.append(_simple_hash_embedding(sentence))
    except (KeyError, json.JSONDecodeError) as exc:
        logger.error("Failed to decode Ollama embeddings response: %s", exc)
        all_embeddings.append(_simple_hash_embedding(sentence))
if single_input:
    return all_embeddings[0]
return all_embeddings
def _get_embedding_model():
"""Lazy-load embedding model, preferring Ollama if configured."""
"""Lazy-load embedding model."""
global EMBEDDING_MODEL
global EMBEDDING_DIM
if EMBEDDING_MODEL is None:
if settings.timmy_skip_embeddings:
EMBEDDING_MODEL = False
return EMBEDDING_MODEL
try:
from config import settings
if settings.timmy_embedding_backend == "ollama":
logger.info("MemorySystem: Using Ollama for embeddings with model %s", settings.ollama_embedding_model)
EMBEDDING_MODEL = OllamaEmbedder(settings.ollama_embedding_model, settings.normalized_ollama_url)
# We don't know the dimension until after the first call, so keep it default for now.
# It will be updated dynamically in OllamaEmbedder.encode
return EMBEDDING_MODEL
else:
try:
from sentence_transformers import SentenceTransformer
if settings.timmy_skip_embeddings:
EMBEDDING_MODEL = False
return EMBEDDING_MODEL
except ImportError:
pass
EMBEDDING_MODEL = SentenceTransformer("all-MiniLM-L6-v2")
EMBEDDING_DIM = 384 # Reset to MiniLM dimension
logger.info("MemorySystem: Loaded local embedding model (all-MiniLM-L6-v2)")
except ImportError:
logger.warning("MemorySystem: sentence-transformers not installed, using fallback")
EMBEDDING_MODEL = False # Use fallback
try:
from sentence_transformers import SentenceTransformer
EMBEDDING_MODEL = SentenceTransformer("all-MiniLM-L6-v2")
logger.info("MemorySystem: Loaded embedding model")
except ImportError:
logger.warning("MemorySystem: sentence-transformers not installed, using fallback")
EMBEDDING_MODEL = False # Use fallback
return EMBEDDING_MODEL
@@ -106,14 +60,10 @@ def embed_text(text: str) -> list[float]:
model = _get_embedding_model()
if model and model is not False:
embedding = model.encode(text)
# Ensure it's a list of floats, not numpy array
if hasattr(embedding, 'tolist'):
return embedding.tolist()
return embedding
return embedding.tolist()
return _simple_hash_embedding(text)
def cosine_similarity(a: list[float], b: list[float]) -> float:
"""Calculate cosine similarity between two vectors."""
dot = sum(x * y for x, y in zip(a, b, strict=False))

View File

@@ -1206,7 +1206,7 @@ memory_searcher = MemorySearcher()
# ───────────────────────────────────────────────────────────────────────────────
def memory_search(query: str, limit: int = 10) -> str:
def memory_search(query: str, top_k: int = 5) -> str:
"""Search past conversations, notes, and stored facts for relevant context.
Searches across both the vault (indexed markdown files) and the
@@ -1215,19 +1215,19 @@ def memory_search(query: str, limit: int = 10) -> str:
Args:
query: What to search for (e.g. "Bitcoin strategy", "server setup").
limit: Number of results to return (default 10).
top_k: Number of results to return (default 5).
Returns:
Formatted string of relevant memory results.
"""
# Guard: model sometimes passes None for limit
if limit is None:
limit = 10
# Guard: model sometimes passes None for top_k
if top_k is None:
top_k = 5
parts: list[str] = []
# 1. Search semantic vault (indexed markdown files)
vault_results = semantic_memory.search(query, limit)
vault_results = semantic_memory.search(query, top_k)
for content, score in vault_results:
if score < 0.2:
continue
@@ -1235,7 +1235,7 @@ def memory_search(query: str, limit: int = 10) -> str:
# 2. Search runtime vector store (stored facts/conversations)
try:
runtime_results = search_memories(query, limit=limit, min_relevance=0.2)
runtime_results = search_memories(query, limit=top_k, min_relevance=0.2)
for entry in runtime_results:
label = entry.context_type or "memory"
parts.append(f"[{label}] {entry.content[:300]}")
@@ -1289,48 +1289,45 @@ def memory_read(query: str = "", top_k: int = 5) -> str:
return "\n".join(parts)
def memory_store(topic: str, report: str, type: str = "research") -> str:
"""Store a piece of information in persistent memory, particularly for research outputs.
def memory_write(content: str, context_type: str = "fact") -> str:
"""Store a piece of information in persistent memory.
Use this tool to store structured research findings or other important documents.
Stored memories are searchable via memory_search across all channels.
Use this tool when the user explicitly asks you to remember something.
Stored memories are searchable via memory_search across all channels
(web GUI, Discord, Telegram, etc.).
Args:
topic: A concise title or topic for the research output.
report: The detailed content of the research output or document.
type: Type of memory — "research" for research outputs (default),
"fact" for permanent facts, "conversation" for conversation context,
"document" for other document fragments.
content: The information to remember (e.g. a phrase, fact, or note).
context_type: Type of memory — "fact" for permanent facts,
"conversation" for conversation context,
"document" for document fragments.
Returns:
Confirmation that the memory was stored.
"""
if not report or not report.strip():
return "Nothing to store — report is empty."
if not content or not content.strip():
return "Nothing to store — content is empty."
# Combine topic and report for embedding and storage content
full_content = f"Topic: {topic.strip()}\n\nReport: {report.strip()}"
valid_types = ("fact", "conversation", "document", "research")
if type not in valid_types:
type = "research"
valid_types = ("fact", "conversation", "document")
if context_type not in valid_types:
context_type = "fact"
try:
# Dedup check for facts and research — skip if similar exists
if type in ("fact", "research"):
# Dedup check for facts — skip if a similar fact already exists
# Threshold 0.75 catches paraphrases (was 0.9 which only caught near-exact)
if context_type == "fact":
existing = search_memories(
full_content, limit=3, context_type=type, min_relevance=0.75
content.strip(), limit=3, context_type="fact", min_relevance=0.75
)
if existing:
return f"Similar {type} already stored (id={existing[0].id[:8]}). Skipping duplicate."
return f"Similar fact already stored (id={existing[0].id[:8]}). Skipping duplicate."
entry = store_memory(
content=full_content,
content=content.strip(),
source="agent",
context_type=type,
metadata={"topic": topic},
context_type=context_type,
)
return f"Stored in memory (type={type}, id={entry.id[:8]}). This is now searchable across all channels."
return f"Stored in memory (type={context_type}, id={entry.id[:8]}). This is now searchable across all channels."
except Exception as exc:
logger.error("Failed to write memory: %s", exc)
return f"Failed to store memory: {exc}"

View File

@@ -4,8 +4,4 @@ Tracks how much of each AI layer (perception, decision, narration)
runs locally vs. calls out to an LLM. Feeds the sovereignty dashboard.
Refs: #954, #953
Three-strike detector and automation enforcement.
Refs: #962
"""

View File

@@ -1,486 +0,0 @@
"""Three-Strike Detector for Repeated Manual Work.
Tracks recurring manual actions by category and key. When the same action
is performed three or more times, it blocks further attempts and requires
an automation artifact to be registered first.
Strike 1 (count=1): discovery — action proceeds normally
Strike 2 (count=2): warning — action proceeds with a logged warning
Strike 3 (count≥3): blocked — raises ThreeStrikeError; caller must
register an automation artifact first
Governing principle: "If you do the same thing manually three times,
you have failed to crystallise."
Categories tracked:
- vlm_prompt_edit VLM prompt edits for the same UI element
- game_bug_review Manual game-bug reviews for the same bug type
- parameter_tuning Manual parameter tuning for the same parameter
- portal_adapter_creation Manual portal-adapter creation for same pattern
- deployment_step Manual deployment steps
The Falsework Checklist is enforced before cloud API calls via
:func:`falsework_check`.
Refs: #962
"""
from __future__ import annotations
import json
import logging
import sqlite3
from contextlib import closing
from dataclasses import dataclass, field
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from config import settings
logger = logging.getLogger(__name__)
# ── Constants ────────────────────────────────────────────────────────────────
DB_PATH = Path(settings.repo_root) / "data" / "three_strike.db"
CATEGORIES = frozenset(
{
"vlm_prompt_edit",
"game_bug_review",
"parameter_tuning",
"portal_adapter_creation",
"deployment_step",
}
)
STRIKE_WARNING = 2
STRIKE_BLOCK = 3
_SCHEMA = """
CREATE TABLE IF NOT EXISTS strikes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
category TEXT NOT NULL,
key TEXT NOT NULL,
count INTEGER NOT NULL DEFAULT 0,
blocked INTEGER NOT NULL DEFAULT 0,
automation TEXT DEFAULT NULL,
first_seen TEXT NOT NULL,
last_seen TEXT NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS idx_strikes_cat_key ON strikes(category, key);
CREATE INDEX IF NOT EXISTS idx_strikes_blocked ON strikes(blocked);
CREATE TABLE IF NOT EXISTS strike_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
category TEXT NOT NULL,
key TEXT NOT NULL,
strike_num INTEGER NOT NULL,
metadata TEXT DEFAULT '{}',
timestamp TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_se_cat_key ON strike_events(category, key);
CREATE INDEX IF NOT EXISTS idx_se_ts ON strike_events(timestamp);
"""
# ── Exceptions ────────────────────────────────────────────────────────────────
class ThreeStrikeError(RuntimeError):
"""Raised when a manual action has reached the third strike.
Attributes:
category: The action category (e.g. ``"vlm_prompt_edit"``).
key: The specific action key (e.g. a UI element name).
count: Total number of times this action has been recorded.
"""
def __init__(self, category: str, key: str, count: int) -> None:
self.category = category
self.key = key
self.count = count
super().__init__(
f"Three-strike block: '{category}/{key}' has been performed manually "
f"{count} time(s). Register an automation artifact before continuing. "
f"Run the Falsework Checklist (see three_strike.falsework_check)."
)
# ── Data classes ──────────────────────────────────────────────────────────────
@dataclass
class StrikeRecord:
"""State for one (category, key) pair."""
category: str
key: str
count: int
blocked: bool
automation: str | None
first_seen: str
last_seen: str
@dataclass
class FalseworkChecklist:
"""Pre-cloud-API call checklist — must be completed before making
expensive external calls.
Instantiate and call :meth:`validate` to ensure all answers are provided.
"""
durable_artifact: str = ""
artifact_storage_path: str = ""
local_rule_or_cache: str = ""
will_repeat: bool | None = None
elimination_strategy: str = ""
sovereignty_delta: str = ""
# ── internal ──
_errors: list[str] = field(default_factory=list, init=False, repr=False)
def validate(self) -> list[str]:
"""Return a list of unanswered questions. Empty list → checklist passes."""
self._errors = []
if not self.durable_artifact.strip():
self._errors.append("Q1: What durable artifact will this call produce?")
if not self.artifact_storage_path.strip():
self._errors.append("Q2: Where will the artifact be stored locally?")
if not self.local_rule_or_cache.strip():
self._errors.append("Q3: What local rule or cache will this populate?")
if self.will_repeat is None:
self._errors.append("Q4: After this call, will I need to make it again?")
if self.will_repeat and not self.elimination_strategy.strip():
self._errors.append("Q5: If yes, what would eliminate the repeat?")
if not self.sovereignty_delta.strip():
self._errors.append("Q6: What is the sovereignty delta of this call?")
return self._errors
@property
def passed(self) -> bool:
"""True when :meth:`validate` found no unanswered questions."""
return len(self.validate()) == 0
# ── Store ─────────────────────────────────────────────────────────────────────
class ThreeStrikeStore:
"""SQLite-backed three-strike store.
Thread-safe: creates a new connection per operation.
"""
def __init__(self, db_path: Path | None = None) -> None:
self._db_path = db_path or DB_PATH
self._init_db()
# ── setup ─────────────────────────────────────────────────────────────
def _init_db(self) -> None:
try:
self._db_path.parent.mkdir(parents=True, exist_ok=True)
with closing(sqlite3.connect(str(self._db_path))) as conn:
conn.execute("PRAGMA journal_mode=WAL")
conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}")
conn.executescript(_SCHEMA)
conn.commit()
except Exception as exc:
logger.warning("Failed to initialise three-strike DB: %s", exc)
def _connect(self) -> sqlite3.Connection:
conn = sqlite3.connect(str(self._db_path))
conn.row_factory = sqlite3.Row
conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}")
return conn
# ── record ────────────────────────────────────────────────────────────
def record(
self,
category: str,
key: str,
metadata: dict[str, Any] | None = None,
) -> StrikeRecord:
"""Record a manual action and return the updated :class:`StrikeRecord`.
Raises :exc:`ThreeStrikeError` when the action is already blocked
(count ≥ STRIKE_BLOCK) and no automation has been registered.
Args:
category: Action category; must be in :data:`CATEGORIES`.
key: Specific identifier within the category.
metadata: Optional context stored alongside the event.
Returns:
The updated :class:`StrikeRecord`.
Raises:
ValueError: If *category* is not in :data:`CATEGORIES`.
ThreeStrikeError: On the third (or later) strike with no automation.
"""
if category not in CATEGORIES:
raise ValueError(
f"Unknown category '{category}'. Valid: {sorted(CATEGORIES)}"
)
now = datetime.now(UTC).isoformat()
meta_json = json.dumps(metadata or {})
try:
with closing(self._connect()) as conn:
# Upsert the aggregate row
conn.execute(
"""
INSERT INTO strikes (category, key, count, blocked, first_seen, last_seen)
VALUES (?, ?, 1, 0, ?, ?)
ON CONFLICT(category, key) DO UPDATE SET
count = count + 1,
last_seen = excluded.last_seen
""",
(category, key, now, now),
)
row = conn.execute(
"SELECT * FROM strikes WHERE category=? AND key=?",
(category, key),
).fetchone()
count = row["count"]
blocked = bool(row["blocked"])
automation = row["automation"]
# Record the individual event
conn.execute(
"INSERT INTO strike_events (category, key, strike_num, metadata, timestamp) "
"VALUES (?, ?, ?, ?, ?)",
(category, key, count, meta_json, now),
)
# Mark as blocked once threshold reached
if count >= STRIKE_BLOCK and not blocked:
conn.execute(
"UPDATE strikes SET blocked=1 WHERE category=? AND key=?",
(category, key),
)
blocked = True
conn.commit()
except ThreeStrikeError:
raise
except Exception as exc:
logger.warning("Three-strike DB error during record: %s", exc)
# Re-raise DB errors so callers are aware
raise
record = StrikeRecord(
category=category,
key=key,
count=count,
blocked=blocked,
automation=automation,
first_seen=row["first_seen"],
last_seen=now,
)
self._emit_log(record)
if blocked and not automation:
raise ThreeStrikeError(category=category, key=key, count=count)
return record
def _emit_log(self, record: StrikeRecord) -> None:
"""Log a warning or info message based on strike number."""
if record.count == STRIKE_WARNING:
logger.warning(
"Three-strike WARNING: '%s/%s' has been performed manually %d times. "
"Consider writing an automation.",
record.category,
record.key,
record.count,
)
elif record.count >= STRIKE_BLOCK:
logger.warning(
"Three-strike BLOCK: '%s/%s' reached %d strikes — automation required.",
record.category,
record.key,
record.count,
)
else:
logger.info(
"Three-strike discovery: '%s/%s' — strike %d.",
record.category,
record.key,
record.count,
)
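The upsert in `record` can be exercised against an in-memory SQLite table. The schema below is trimmed to the columns the counter needs; the `ON CONFLICT` clause matches the one in the file.

```python
import sqlite3

# Trimmed strikes table plus the ON CONFLICT upsert used by record()
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE strikes (category TEXT, key TEXT, "
    "count INTEGER DEFAULT 0, UNIQUE(category, key))"
)

def record(category: str, key: str) -> int:
    conn.execute(
        "INSERT INTO strikes (category, key, count) VALUES (?, ?, 1) "
        "ON CONFLICT(category, key) DO UPDATE SET count = count + 1",
        (category, key),
    )
    (count,) = conn.execute(
        "SELECT count FROM strikes WHERE category=? AND key=?", (category, key)
    ).fetchone()
    return count

STRIKE_BLOCK = 3
for _ in range(3):
    n = record("parameter_tuning", "learning_rate")
print(n)                  # 3
print(n >= STRIKE_BLOCK)  # True: the third strike would raise ThreeStrikeError
```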
# ── automation registration ───────────────────────────────────────────
def register_automation(
self,
category: str,
key: str,
artifact_path: str,
) -> None:
"""Unblock a (category, key) pair by registering an automation artifact.
Once registered, future calls to :meth:`record` will proceed normally
and the strike counter resets to zero.
Args:
category: Action category.
key: Specific identifier within the category.
artifact_path: Path or identifier of the automation artifact.
"""
try:
with closing(self._connect()) as conn:
conn.execute(
"UPDATE strikes SET automation=?, blocked=0, count=0 "
"WHERE category=? AND key=?",
(artifact_path, category, key),
)
conn.commit()
logger.info(
"Three-strike: automation registered for '%s/%s': %s",
category,
key,
artifact_path,
)
except Exception as exc:
logger.warning("Failed to register automation: %s", exc)
# ── queries ───────────────────────────────────────────────────────────
def get(self, category: str, key: str) -> StrikeRecord | None:
"""Return the :class:`StrikeRecord` for (category, key), or None."""
try:
with closing(self._connect()) as conn:
row = conn.execute(
"SELECT * FROM strikes WHERE category=? AND key=?",
(category, key),
).fetchone()
if row is None:
return None
return StrikeRecord(
category=row["category"],
key=row["key"],
count=row["count"],
blocked=bool(row["blocked"]),
automation=row["automation"],
first_seen=row["first_seen"],
last_seen=row["last_seen"],
)
except Exception as exc:
logger.warning("Failed to query strike record: %s", exc)
return None
def list_blocked(self) -> list[StrikeRecord]:
"""Return all currently-blocked (category, key) pairs."""
try:
with closing(self._connect()) as conn:
rows = conn.execute(
"SELECT * FROM strikes WHERE blocked=1 ORDER BY last_seen DESC"
).fetchall()
return [
StrikeRecord(
category=r["category"],
key=r["key"],
count=r["count"],
blocked=True,
automation=r["automation"],
first_seen=r["first_seen"],
last_seen=r["last_seen"],
)
for r in rows
]
except Exception as exc:
logger.warning("Failed to query blocked strikes: %s", exc)
return []
def list_all(self) -> list[StrikeRecord]:
"""Return all strike records ordered by last seen (most recent first)."""
try:
with closing(self._connect()) as conn:
rows = conn.execute(
"SELECT * FROM strikes ORDER BY last_seen DESC"
).fetchall()
return [
StrikeRecord(
category=r["category"],
key=r["key"],
count=r["count"],
blocked=bool(r["blocked"]),
automation=r["automation"],
first_seen=r["first_seen"],
last_seen=r["last_seen"],
)
for r in rows
]
except Exception as exc:
logger.warning("Failed to list strike records: %s", exc)
return []
def get_events(self, category: str, key: str, limit: int = 50) -> list[dict]:
"""Return the individual strike events for (category, key)."""
try:
with closing(self._connect()) as conn:
rows = conn.execute(
"SELECT * FROM strike_events WHERE category=? AND key=? "
"ORDER BY timestamp DESC LIMIT ?",
(category, key, limit),
).fetchall()
return [
{
"strike_num": r["strike_num"],
"timestamp": r["timestamp"],
"metadata": json.loads(r["metadata"]) if r["metadata"] else {},
}
for r in rows
]
except Exception as exc:
logger.warning("Failed to query strike events: %s", exc)
return []
# ── Falsework checklist helper ────────────────────────────────────────────────
def falsework_check(checklist: FalseworkChecklist) -> None:
"""Enforce the Falsework Checklist before a cloud API call.
Raises :exc:`ValueError` listing all unanswered questions if the checklist
does not pass.
Usage::
checklist = FalseworkChecklist(
durable_artifact="embedding vectors for UI element foo",
artifact_storage_path="data/vlm/foo_embeddings.json",
local_rule_or_cache="vlm_cache",
will_repeat=False,
sovereignty_delta="eliminates repeated VLM call",
)
falsework_check(checklist) # raises ValueError if incomplete
"""
errors = checklist.validate()
if errors:
raise ValueError(
"Falsework Checklist incomplete — answer all questions before "
"making a cloud API call:\n" + "\n".join(f"{e}" for e in errors)
)
# ── Module-level singleton ────────────────────────────────────────────────────
_detector: ThreeStrikeStore | None = None
def get_detector() -> ThreeStrikeStore:
"""Return the module-level :class:`ThreeStrikeStore`, creating it once."""
global _detector
if _detector is None:
_detector = ThreeStrikeStore()
return _detector
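A standalone sketch of the three-strike flow that `record` / `register_automation` implement above. This is illustrative only: the real `ThreeStrikeStore` persists to SQLite and records per-event metadata, while `MiniStrikeStore` and its field names here are hypothetical, in-memory stand-ins.

```python
# Minimal in-memory analogue of the three-strike store (illustrative sketch;
# the real ThreeStrikeStore above persists to SQLite).
class MiniStrikeStore:
    def __init__(self, limit: int = 3):
        self.limit = limit
        self.records: dict[tuple[str, str], dict] = {}

    def record(self, category: str, key: str) -> bool:
        """Count a strike; return True while the (category, key) pair is allowed."""
        rec = self.records.setdefault(
            (category, key), {"count": 0, "blocked": False, "automation": None}
        )
        if rec["blocked"]:
            return False
        rec["count"] += 1
        if rec["count"] >= self.limit:
            rec["blocked"] = True
        return not rec["blocked"]

    def register_automation(self, category: str, key: str, artifact: str) -> None:
        """Unblock the pair and reset its counter, mirroring register_automation()."""
        rec = self.records.setdefault(
            (category, key), {"count": 0, "blocked": False, "automation": None}
        )
        rec.update(count=0, blocked=False, automation=artifact)


store = MiniStrikeStore()
print([store.record("vlm", "foo") for _ in range(4)])  # [True, True, False, False]
store.register_automation("vlm", "foo", "data/vlm/foo_embeddings.json")
print(store.record("vlm", "foo"))  # True — counter reset, pair unblocked
```

The third strike both blocks the pair and reports the block; registering an automation artifact is the only way to resume, which matches the reset-to-zero behavior documented in `register_automation`.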

View File

@@ -1,48 +1,532 @@
"""Tool registry, full toolkit construction, and tool catalog.
"""Tool integration for the agent swarm.
Provides:
- Internal _register_* helpers for wiring tools into toolkits
- create_full_toolkit (orchestrator toolkit)
- create_experiment_tools (Lab agent toolkit)
- AGENT_TOOLKITS / get_tools_for_agent registry
- get_all_available_tools catalog
Provides agents with capabilities for:
- File read/write (local filesystem)
- Shell command execution (sandboxed)
- Python code execution
- Git operations
- Image / Music / Video generation (creative pipeline)
Tools are assigned to agents based on their specialties.
"""
from __future__ import annotations
import ast
import logging
import math
from collections.abc import Callable
from dataclasses import dataclass, field
from datetime import UTC, datetime
from pathlib import Path
from timmy.tools._base import (
_AGNO_TOOLS_AVAILABLE,
_ImportError,
FileTools,
PythonTools,
ShellTools,
Toolkit,
)
from timmy.tools.file_tools import (
_make_smart_read_file,
create_data_tools,
create_research_tools,
create_writing_tools,
)
from timmy.tools.system_tools import (
calculator,
consult_grok,
create_code_tools,
create_devops_tools,
create_security_tools,
web_fetch,
)
from config import settings
logger = logging.getLogger(__name__)
# Max characters of user query included in Lightning invoice memo
_INVOICE_MEMO_MAX_LEN = 50
# ---------------------------------------------------------------------------
# Internal _register_* helpers
# ---------------------------------------------------------------------------
# Lazy imports to handle test mocking
_ImportError = None
try:
from agno.tools import Toolkit
from agno.tools.file import FileTools
from agno.tools.python import PythonTools
from agno.tools.shell import ShellTools
_AGNO_TOOLS_AVAILABLE = True
except ImportError as e:
_AGNO_TOOLS_AVAILABLE = False
_ImportError = e
# Track tool usage stats
_TOOL_USAGE: dict[str, list[dict]] = {}
@dataclass
class ToolStats:
"""Statistics for a single tool."""
tool_name: str
call_count: int = 0
last_used: str | None = None
errors: int = 0
@dataclass
class AgentTools:
"""Tools assigned to an agent."""
agent_id: str
agent_name: str
toolkit: Toolkit
available_tools: list[str] = field(default_factory=list)
# Backward-compat alias
PersonaTools = AgentTools
def _track_tool_usage(agent_id: str, tool_name: str, success: bool = True) -> None:
"""Track tool usage for analytics."""
if agent_id not in _TOOL_USAGE:
_TOOL_USAGE[agent_id] = []
_TOOL_USAGE[agent_id].append(
{
"tool": tool_name,
"timestamp": datetime.now(UTC).isoformat(),
"success": success,
}
)
def get_tool_stats(agent_id: str | None = None) -> dict:
"""Get tool usage statistics.
Args:
agent_id: Optional agent ID to filter by. If None, returns stats for all agents.
Returns:
Dict with tool usage statistics.
"""
if agent_id:
usage = _TOOL_USAGE.get(agent_id, [])
return {
"agent_id": agent_id,
"total_calls": len(usage),
"tools_used": list(set(u["tool"] for u in usage)),
"recent_calls": usage[-10:] if usage else [],
}
# Return stats for all agents
all_stats = {}
for aid, usage in _TOOL_USAGE.items():
all_stats[aid] = {
"total_calls": len(usage),
"tools_used": list(set(u["tool"] for u in usage)),
}
return all_stats
def _safe_eval(node, allowed_names: dict):
"""Walk an AST and evaluate only safe numeric operations."""
if isinstance(node, ast.Expression):
return _safe_eval(node.body, allowed_names)
if isinstance(node, ast.Constant):
if isinstance(node.value, (int, float, complex)):
return node.value
raise ValueError(f"Unsupported constant: {node.value!r}")
if isinstance(node, ast.UnaryOp):
operand = _safe_eval(node.operand, allowed_names)
if isinstance(node.op, ast.UAdd):
return +operand
if isinstance(node.op, ast.USub):
return -operand
raise ValueError(f"Unsupported unary op: {type(node.op).__name__}")
if isinstance(node, ast.BinOp):
left = _safe_eval(node.left, allowed_names)
right = _safe_eval(node.right, allowed_names)
ops = {
ast.Add: lambda a, b: a + b,
ast.Sub: lambda a, b: a - b,
ast.Mult: lambda a, b: a * b,
ast.Div: lambda a, b: a / b,
ast.FloorDiv: lambda a, b: a // b,
ast.Mod: lambda a, b: a % b,
ast.Pow: lambda a, b: a**b,
}
op_fn = ops.get(type(node.op))
if op_fn is None:
raise ValueError(f"Unsupported binary op: {type(node.op).__name__}")
return op_fn(left, right)
if isinstance(node, ast.Name):
if node.id in allowed_names:
return allowed_names[node.id]
raise ValueError(f"Unknown name: {node.id!r}")
if isinstance(node, ast.Attribute):
value = _safe_eval(node.value, allowed_names)
# Only allow attribute access on the math module
if value is math:
attr = getattr(math, node.attr, None)
if attr is not None:
return attr
raise ValueError(f"Attribute access not allowed: .{node.attr}")
if isinstance(node, ast.Call):
func = _safe_eval(node.func, allowed_names)
if not callable(func):
raise ValueError(f"Not callable: {func!r}")
args = [_safe_eval(a, allowed_names) for a in node.args]
kwargs = {kw.arg: _safe_eval(kw.value, allowed_names) for kw in node.keywords}
return func(*args, **kwargs)
raise ValueError(f"Unsupported syntax: {type(node).__name__}")
def calculator(expression: str) -> str:
"""Evaluate a mathematical expression and return the exact result.
Use this tool for ANY arithmetic: multiplication, division, square roots,
exponents, percentages, logarithms, trigonometry, etc.
Args:
expression: A valid Python math expression, e.g. '347 * 829',
'math.sqrt(17161)', '2**10', 'math.log(100, 10)'.
Returns:
The exact result as a string.
"""
allowed_names = {k: getattr(math, k) for k in dir(math) if not k.startswith("_")}
allowed_names["math"] = math
allowed_names["abs"] = abs
allowed_names["round"] = round
allowed_names["min"] = min
allowed_names["max"] = max
try:
tree = ast.parse(expression, mode="eval")
result = _safe_eval(tree, allowed_names)
return str(result)
except Exception as e: # broad catch intentional: arbitrary code execution
return f"Error evaluating '{expression}': {e}"
def _make_smart_read_file(file_tools: FileTools) -> Callable:
"""Wrap FileTools.read_file so directories auto-list their contents.
When the user (or the LLM) passes a directory path to read_file,
the raw Agno implementation throws an IsADirectoryError. This
wrapper detects that case, lists the directory entries, and returns
a helpful message so the model can pick the right file on its own.
"""
original_read = file_tools.read_file
def smart_read_file(file_name: str = "", encoding: str = "utf-8", **kwargs) -> str:
"""Reads the contents of the file `file_name` and returns the contents if successful."""
# LLMs often call read_file(path=...) instead of read_file(file_name=...)
if not file_name:
file_name = kwargs.get("path", "")
if not file_name:
return "Error: no file_name or path provided."
# Resolve the path the same way FileTools does
_safe, resolved = file_tools.check_escape(file_name)
if _safe and resolved.is_dir():
entries = sorted(p.name for p in resolved.iterdir() if not p.name.startswith("."))
listing = "\n".join(f" - {e}" for e in entries) if entries else " (empty directory)"
return (
f"'{file_name}' is a directory, not a file. "
f"Files inside:\n{listing}\n\n"
"Please call read_file with one of the files listed above."
)
return original_read(file_name, encoding=encoding)
# Preserve the original docstring for Agno tool schema generation
smart_read_file.__doc__ = original_read.__doc__
return smart_read_file
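The directory-handling behavior the wrapper adds can be sketched without Agno's `FileTools`; the `smart_read` helper here is a hypothetical standalone analogue of `smart_read_file` (no sandbox escape check).

```python
import tempfile
from pathlib import Path

def smart_read(path: str) -> str:
    # Standalone analogue of smart_read_file above: a directory path returns
    # a listing instead of raising IsADirectoryError.
    p = Path(path)
    if p.is_dir():
        entries = sorted(e.name for e in p.iterdir() if not e.name.startswith("."))
        listing = "\n".join(f"  - {e}" for e in entries) if entries else "  (empty directory)"
        return f"'{path}' is a directory, not a file. Files inside:\n{listing}"
    return p.read_text()

with tempfile.TemporaryDirectory() as d:
    (Path(d) / "notes.txt").write_text("hello")
    print(smart_read(d))                           # directory -> helpful listing
    print(smart_read(str(Path(d) / "notes.txt")))  # file -> contents
```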
def create_research_tools(base_dir: str | Path | None = None):
"""Create tools for the research agent (Echo).
Includes: file reading
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="research")
# File reading
from config import settings
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.list_files, name="list_files")
return toolkit
def create_code_tools(base_dir: str | Path | None = None):
"""Create tools for the code agent (Forge).
Includes: shell commands, python execution, file read/write, Aider AI assist
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="code")
# Shell commands (sandboxed)
shell_tools = ShellTools()
toolkit.register(shell_tools.run_shell_command, name="shell")
# Python execution
python_tools = PythonTools()
toolkit.register(python_tools.run_python_code, name="python")
# File operations
from config import settings
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.save_file, name="write_file")
toolkit.register(file_tools.list_files, name="list_files")
# Aider AI coding assistant (local with Ollama)
aider_tool = create_aider_tool(base_path)
toolkit.register(aider_tool.run_aider, name="aider")
return toolkit
def create_aider_tool(base_path: Path):
"""Create an Aider tool for AI-assisted coding."""
import subprocess
class AiderTool:
"""Tool that calls Aider (local AI coding assistant) for code generation."""
def __init__(self, base_dir: Path):
self.base_dir = base_dir
def run_aider(self, prompt: str, model: str = "qwen3:30b") -> str:
"""Run Aider to generate code changes.
Args:
prompt: What you want Aider to do (e.g., "add a fibonacci function")
model: Ollama model to use (default: qwen3:30b)
Returns:
Aider's response with the code changes made
"""
try:
# Run aider with the prompt
result = subprocess.run(
[
"aider",
"--no-git",
"--model",
f"ollama/{model}",
"--quiet",
prompt,
],
capture_output=True,
text=True,
timeout=120,
cwd=str(self.base_dir),
)
if result.returncode == 0:
return result.stdout if result.stdout else "Code changes applied successfully"
else:
return f"Aider error: {result.stderr}"
except FileNotFoundError:
return "Error: Aider not installed. Run: pip install aider"
except subprocess.TimeoutExpired:
return "Error: Aider timed out after 120 seconds"
except (OSError, subprocess.SubprocessError) as e:
return f"Error running Aider: {str(e)}"
return AiderTool(base_path)
def create_data_tools(base_dir: str | Path | None = None):
"""Create tools for the data agent (Seer).
Includes: python execution, file reading, web search for data sources
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="data")
# Python execution for analysis
python_tools = PythonTools()
toolkit.register(python_tools.run_python_code, name="python")
# File reading
from config import settings
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.list_files, name="list_files")
return toolkit
def create_writing_tools(base_dir: str | Path | None = None):
"""Create tools for the writing agent (Quill).
Includes: file read/write
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="writing")
# File operations
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.save_file, name="write_file")
toolkit.register(file_tools.list_files, name="list_files")
return toolkit
def create_security_tools(base_dir: str | Path | None = None):
"""Create tools for the security agent (Mace).
Includes: shell commands (for scanning), file read
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="security")
# Shell for running security scans
shell_tools = ShellTools()
toolkit.register(shell_tools.run_shell_command, name="shell")
# File reading for logs/configs
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.list_files, name="list_files")
return toolkit
def create_devops_tools(base_dir: str | Path | None = None):
"""Create tools for the DevOps agent (Helm).
Includes: shell commands, file read/write
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="devops")
# Shell for deployment commands
shell_tools = ShellTools()
toolkit.register(shell_tools.run_shell_command, name="shell")
# File operations for config management
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.save_file, name="write_file")
toolkit.register(file_tools.list_files, name="list_files")
return toolkit
def consult_grok(query: str) -> str:
"""Consult Grok (xAI) for frontier reasoning on complex questions.
Use this tool when a question requires advanced reasoning, real-time
knowledge, or capabilities beyond the local model. Grok is a premium
cloud backend; use sparingly and only for high-complexity queries.
Args:
query: The question or reasoning task to send to Grok.
Returns:
Grok's response text, or an error/status message.
"""
from config import settings
from timmy.backends import get_grok_backend, grok_available
if not grok_available():
return (
"Grok is not available. Enable with GROK_ENABLED=true "
"and set XAI_API_KEY in your .env file."
)
backend = get_grok_backend()
# Log to Spark if available
try:
from spark.engine import spark_engine
spark_engine.on_tool_executed(
agent_id="default",
tool_name="consult_grok",
success=True,
)
except (ImportError, AttributeError) as exc:
logger.warning("Tool execution failed (consult_grok logging): %s", exc)
# Generate Lightning invoice for monetization (unless free mode)
invoice_info = ""
if not settings.grok_free:
try:
from lightning.factory import get_backend as get_ln_backend
ln = get_ln_backend()
sats = min(settings.grok_max_sats_per_query, settings.grok_sats_hard_cap)
inv = ln.create_invoice(sats, f"Grok query: {query[:_INVOICE_MEMO_MAX_LEN]}")
invoice_info = f"\n[Lightning invoice: {sats} sats — {inv.payment_request[:40]}...]"
except (ImportError, OSError, ValueError) as exc:
logger.error("Lightning invoice creation failed: %s", exc)
return "Error: Failed to create Lightning invoice. Please check logs."
result = backend.run(query)
response = result.content
if invoice_info:
response += invoice_info
return response
def web_fetch(url: str, max_tokens: int = 4000) -> str:
"""Fetch a web page and return its main text content.
Downloads the URL, extracts readable text using trafilatura, and
truncates to a token budget. Use this to read full articles, docs,
or blog posts that web_search only returns snippets for.
Args:
url: The URL to fetch (must start with http:// or https://).
max_tokens: Maximum approximate token budget (default 4000).
Text is truncated to max_tokens * 4 characters.
Returns:
Extracted text content, or an error message on failure.
"""
if not url or not url.startswith(("http://", "https://")):
return f"Error: invalid URL — must start with http:// or https://: {url!r}"
try:
import requests as _requests
except ImportError:
return "Error: 'requests' package is not installed. Install with: pip install requests"
try:
import trafilatura
except ImportError:
return (
"Error: 'trafilatura' package is not installed. Install with: pip install trafilatura"
)
try:
resp = _requests.get(
url,
timeout=15,
headers={"User-Agent": "TimmyResearchBot/1.0"},
)
resp.raise_for_status()
except _requests.exceptions.Timeout:
return f"Error: request timed out after 15 seconds for {url}"
except _requests.exceptions.HTTPError as exc:
return f"Error: HTTP {exc.response.status_code} for {url}"
except _requests.exceptions.RequestException as exc:
return f"Error: failed to fetch {url}{exc}"
text = trafilatura.extract(resp.text, include_tables=True, include_links=True)
if not text:
return f"Error: could not extract readable content from {url}"
char_budget = max_tokens * 4
if len(text) > char_budget:
text = text[:char_budget] + f"\n\n[…truncated to ~{max_tokens} tokens]"
return text
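The token-budget truncation at the end of `web_fetch` can be isolated as a pure helper; `truncate_to_tokens` is a hypothetical name for the same ~4-characters-per-token heuristic.

```python
def truncate_to_tokens(text: str, max_tokens: int = 4000) -> str:
    # Same heuristic as web_fetch above: budget ~4 characters per token,
    # then append a marker so the model knows content was cut.
    char_budget = max_tokens * 4
    if len(text) > char_budget:
        return text[:char_budget] + f"\n\n[…truncated to ~{max_tokens} tokens]"
    return text

print(truncate_to_tokens("short text", max_tokens=10))   # returned unchanged
print(truncate_to_tokens("x" * 100, max_tokens=10)[:10]) # first 40 chars kept
```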
def _register_web_fetch_tool(toolkit: Toolkit) -> None:
@@ -233,11 +717,6 @@ def _register_thinking_tools(toolkit: Toolkit) -> None:
raise
# ---------------------------------------------------------------------------
# Full toolkit factories
# ---------------------------------------------------------------------------
def create_full_toolkit(base_dir: str | Path | None = None):
"""Create a full toolkit with all available tools (for the orchestrator).
@@ -248,7 +727,6 @@ def create_full_toolkit(base_dir: str | Path | None = None):
# Return None when tools aren't available (tests)
return None
from config import settings
from timmy.tool_safety import DANGEROUS_TOOLS
toolkit = Toolkit(name="full")
@@ -330,24 +808,6 @@ def create_experiment_tools(base_dir: str | Path | None = None):
return toolkit
# ---------------------------------------------------------------------------
# Agent toolkit registry
# ---------------------------------------------------------------------------
def _create_stub_toolkit(name: str):
"""Create a minimal Agno toolkit for creative agents.
Creative agents use their own dedicated tool modules rather than
Agno-wrapped functions. This stub ensures AGENT_TOOLKITS has an
entry so ToolExecutor doesn't fall back to the full toolkit.
"""
if not _AGNO_TOOLS_AVAILABLE:
return None
toolkit = Toolkit(name=name)
return toolkit
# Mapping of agent IDs to their toolkits
AGENT_TOOLKITS: dict[str, Callable[[], Toolkit]] = {
"echo": create_research_tools,
@@ -363,7 +823,20 @@ AGENT_TOOLKITS: dict[str, Callable[[], Toolkit]] = {
}
def get_tools_for_agent(agent_id: str, base_dir: str | Path | None = None) -> "Toolkit | None":
def _create_stub_toolkit(name: str):
"""Create a minimal Agno toolkit for creative agents.
Creative agents use their own dedicated tool modules rather than
Agno-wrapped functions. This stub ensures AGENT_TOOLKITS has an
entry so ToolExecutor doesn't fall back to the full toolkit.
"""
if not _AGNO_TOOLS_AVAILABLE:
return None
toolkit = Toolkit(name=name)
return toolkit
def get_tools_for_agent(agent_id: str, base_dir: str | Path | None = None) -> Toolkit | None:
"""Get the appropriate toolkit for an agent.
Args:
@@ -379,16 +852,11 @@ def get_tools_for_agent(agent_id: str, base_dir: str | Path | None = None) -> "T
return None
# Backward-compat aliases
# Backward-compat alias
get_tools_for_persona = get_tools_for_agent
PERSONA_TOOLKITS = AGENT_TOOLKITS
# ---------------------------------------------------------------------------
# Tool catalog
# ---------------------------------------------------------------------------
def _core_tool_catalog() -> dict:
"""Return core file and execution tools catalog entries."""
return {

View File

@@ -1,94 +0,0 @@
"""Tool integration for the agent swarm.
Provides agents with capabilities for:
- File read/write (local filesystem)
- Shell command execution (sandboxed)
- Python code execution
- Git operations
- Image / Music / Video generation (creative pipeline)
Tools are assigned to agents based on their specialties.
Sub-modules:
- _base: shared types, tracking state
- file_tools: file-operation toolkit factories (Echo, Quill, Seer)
- system_tools: calculator, AI tools, code/devops toolkit factories
- _registry: full toolkit construction, agent registry, tool catalog
"""
# Re-export everything for backward compatibility — callers that do
# ``from timmy.tools import <symbol>`` continue to work unchanged.
from timmy.tools._base import (
AgentTools,
PersonaTools,
ToolStats,
_AGNO_TOOLS_AVAILABLE,
_ImportError,
_TOOL_USAGE,
_track_tool_usage,
get_tool_stats,
)
from timmy.tools._registry import (
AGENT_TOOLKITS,
PERSONA_TOOLKITS,
_create_stub_toolkit,
_merge_catalog,
create_experiment_tools,
create_full_toolkit,
get_all_available_tools,
get_tools_for_agent,
get_tools_for_persona,
)
from timmy.tools.file_tools import (
_make_smart_read_file,
create_data_tools,
create_research_tools,
create_writing_tools,
)
from timmy.tools.system_tools import (
_safe_eval,
calculator,
consult_grok,
create_aider_tool,
create_code_tools,
create_devops_tools,
create_security_tools,
web_fetch,
)
__all__ = [
# _base
"AgentTools",
"PersonaTools",
"ToolStats",
"_AGNO_TOOLS_AVAILABLE",
"_ImportError",
"_TOOL_USAGE",
"_track_tool_usage",
"get_tool_stats",
# file_tools
"_make_smart_read_file",
"create_data_tools",
"create_research_tools",
"create_writing_tools",
# system_tools
"_safe_eval",
"calculator",
"consult_grok",
"create_aider_tool",
"create_code_tools",
"create_devops_tools",
"create_security_tools",
"web_fetch",
# _registry
"AGENT_TOOLKITS",
"PERSONA_TOOLKITS",
"_create_stub_toolkit",
"_merge_catalog",
"create_experiment_tools",
"create_full_toolkit",
"get_all_available_tools",
"get_tools_for_agent",
"get_tools_for_persona",
]

View File

@@ -1,90 +0,0 @@
"""Base types, shared state, and tracking for the Timmy tool system."""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from datetime import UTC, datetime
logger = logging.getLogger(__name__)
# Lazy imports to handle test mocking
_ImportError = None
try:
from agno.tools import Toolkit
from agno.tools.file import FileTools
from agno.tools.python import PythonTools
from agno.tools.shell import ShellTools
_AGNO_TOOLS_AVAILABLE = True
except ImportError as e:
_AGNO_TOOLS_AVAILABLE = False
_ImportError = e
# Track tool usage stats
_TOOL_USAGE: dict[str, list[dict]] = {}
@dataclass
class ToolStats:
"""Statistics for a single tool."""
tool_name: str
call_count: int = 0
last_used: str | None = None
errors: int = 0
@dataclass
class AgentTools:
"""Tools assigned to an agent."""
agent_id: str
agent_name: str
toolkit: "Toolkit"
available_tools: list[str] = field(default_factory=list)
# Backward-compat alias
PersonaTools = AgentTools
def _track_tool_usage(agent_id: str, tool_name: str, success: bool = True) -> None:
"""Track tool usage for analytics."""
if agent_id not in _TOOL_USAGE:
_TOOL_USAGE[agent_id] = []
_TOOL_USAGE[agent_id].append(
{
"tool": tool_name,
"timestamp": datetime.now(UTC).isoformat(),
"success": success,
}
)
def get_tool_stats(agent_id: str | None = None) -> dict:
"""Get tool usage statistics.
Args:
agent_id: Optional agent ID to filter by. If None, returns stats for all agents.
Returns:
Dict with tool usage statistics.
"""
if agent_id:
usage = _TOOL_USAGE.get(agent_id, [])
return {
"agent_id": agent_id,
"total_calls": len(usage),
"tools_used": list(set(u["tool"] for u in usage)),
"recent_calls": usage[-10:] if usage else [],
}
# Return stats for all agents
all_stats = {}
for aid, usage in _TOOL_USAGE.items():
all_stats[aid] = {
"total_calls": len(usage),
"tools_used": list(set(u["tool"] for u in usage)),
}
return all_stats

View File

@@ -1,121 +0,0 @@
"""File operation tools and agent toolkit factories for file-heavy agents.
Provides:
- Smart read_file wrapper (auto-lists directories)
- Toolkit factories for Echo (research), Quill (writing), Seer (data)
"""
from __future__ import annotations
import logging
from collections.abc import Callable
from pathlib import Path
from timmy.tools._base import (
_AGNO_TOOLS_AVAILABLE,
_ImportError,
FileTools,
PythonTools,
Toolkit,
)
logger = logging.getLogger(__name__)
def _make_smart_read_file(file_tools: "FileTools") -> Callable:
"""Wrap FileTools.read_file so directories auto-list their contents.
When the user (or the LLM) passes a directory path to read_file,
the raw Agno implementation throws an IsADirectoryError. This
wrapper detects that case, lists the directory entries, and returns
a helpful message so the model can pick the right file on its own.
"""
original_read = file_tools.read_file
def smart_read_file(file_name: str = "", encoding: str = "utf-8", **kwargs) -> str:
"""Reads the contents of the file `file_name` and returns the contents if successful."""
# LLMs often call read_file(path=...) instead of read_file(file_name=...)
if not file_name:
file_name = kwargs.get("path", "")
if not file_name:
return "Error: no file_name or path provided."
# Resolve the path the same way FileTools does
_safe, resolved = file_tools.check_escape(file_name)
if _safe and resolved.is_dir():
entries = sorted(p.name for p in resolved.iterdir() if not p.name.startswith("."))
listing = "\n".join(f" - {e}" for e in entries) if entries else " (empty directory)"
return (
f"'{file_name}' is a directory, not a file. "
f"Files inside:\n{listing}\n\n"
"Please call read_file with one of the files listed above."
)
return original_read(file_name, encoding=encoding)
# Preserve the original docstring for Agno tool schema generation
smart_read_file.__doc__ = original_read.__doc__
return smart_read_file
def create_research_tools(base_dir: str | Path | None = None):
"""Create tools for the research agent (Echo).
Includes: file reading
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="research")
# File reading
from config import settings
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.list_files, name="list_files")
return toolkit
def create_writing_tools(base_dir: str | Path | None = None):
"""Create tools for the writing agent (Quill).
Includes: file read/write
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="writing")
# File operations
from config import settings
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.save_file, name="write_file")
toolkit.register(file_tools.list_files, name="list_files")
return toolkit
def create_data_tools(base_dir: str | Path | None = None):
"""Create tools for the data agent (Seer).
Includes: python execution, file reading, web search for data sources
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="data")
# Python execution for analysis
python_tools = PythonTools()
toolkit.register(python_tools.run_python_code, name="python")
# File reading
from config import settings
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.list_files, name="list_files")
return toolkit

View File

@@ -1,357 +0,0 @@
"""System, calculation, and AI consultation tools for Timmy agents.
Provides:
- Safe AST-based calculator
- consult_grok (xAI frontier reasoning)
- web_fetch (content extraction)
- Toolkit factories for Forge (code), Mace (security), Helm (devops)
"""
from __future__ import annotations
import ast
import logging
import math
import subprocess
from pathlib import Path
from timmy.tools._base import (
_AGNO_TOOLS_AVAILABLE,
_ImportError,
FileTools,
PythonTools,
ShellTools,
Toolkit,
)
from timmy.tools.file_tools import _make_smart_read_file
logger = logging.getLogger(__name__)
# Max characters of user query included in Lightning invoice memo
_INVOICE_MEMO_MAX_LEN = 50
def _safe_eval(node, allowed_names: dict):
"""Walk an AST and evaluate only safe numeric operations."""
if isinstance(node, ast.Expression):
return _safe_eval(node.body, allowed_names)
if isinstance(node, ast.Constant):
if isinstance(node.value, (int, float, complex)):
return node.value
raise ValueError(f"Unsupported constant: {node.value!r}")
if isinstance(node, ast.UnaryOp):
operand = _safe_eval(node.operand, allowed_names)
if isinstance(node.op, ast.UAdd):
return +operand
if isinstance(node.op, ast.USub):
return -operand
raise ValueError(f"Unsupported unary op: {type(node.op).__name__}")
if isinstance(node, ast.BinOp):
left = _safe_eval(node.left, allowed_names)
right = _safe_eval(node.right, allowed_names)
ops = {
ast.Add: lambda a, b: a + b,
ast.Sub: lambda a, b: a - b,
ast.Mult: lambda a, b: a * b,
ast.Div: lambda a, b: a / b,
ast.FloorDiv: lambda a, b: a // b,
ast.Mod: lambda a, b: a % b,
ast.Pow: lambda a, b: a**b,
}
op_fn = ops.get(type(node.op))
if op_fn is None:
raise ValueError(f"Unsupported binary op: {type(node.op).__name__}")
return op_fn(left, right)
if isinstance(node, ast.Name):
if node.id in allowed_names:
return allowed_names[node.id]
raise ValueError(f"Unknown name: {node.id!r}")
if isinstance(node, ast.Attribute):
value = _safe_eval(node.value, allowed_names)
# Only allow attribute access on the math module
if value is math:
attr = getattr(math, node.attr, None)
if attr is not None:
return attr
raise ValueError(f"Attribute access not allowed: .{node.attr}")
if isinstance(node, ast.Call):
func = _safe_eval(node.func, allowed_names)
if not callable(func):
raise ValueError(f"Not callable: {func!r}")
args = [_safe_eval(a, allowed_names) for a in node.args]
kwargs = {kw.arg: _safe_eval(kw.value, allowed_names) for kw in node.keywords}
return func(*args, **kwargs)
raise ValueError(f"Unsupported syntax: {type(node).__name__}")
def calculator(expression: str) -> str:
"""Evaluate a mathematical expression and return the exact result.
Use this tool for ANY arithmetic: multiplication, division, square roots,
exponents, percentages, logarithms, trigonometry, etc.
Args:
expression: A valid Python math expression, e.g. '347 * 829',
'math.sqrt(17161)', '2**10', 'math.log(100, 10)'.
Returns:
The exact result as a string.
"""
allowed_names = {k: getattr(math, k) for k in dir(math) if not k.startswith("_")}
allowed_names["math"] = math
allowed_names["abs"] = abs
allowed_names["round"] = round
allowed_names["min"] = min
allowed_names["max"] = max
try:
tree = ast.parse(expression, mode="eval")
result = _safe_eval(tree, allowed_names)
return str(result)
except Exception as e:  # broad catch intentional: user expressions can raise arbitrary error types
return f"Error evaluating '{expression}': {e}"
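A quick standalone illustration of the name whitelist `calculator` builds: every public `math` attribute is exposed, and the `startswith("_")` filter keeps anything underscore-prefixed out of the evaluation namespace.

```python
import math

# Rebuild the whitelist the same way calculator() does
allowed = {k: getattr(math, k) for k in dir(math) if not k.startswith("_")}

# Public math functions and constants are available...
root = allowed["sqrt"](16)

# ...but nothing underscore-prefixed leaks into the namespace
has_private = any(k.startswith("_") for k in allowed)
```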
def consult_grok(query: str) -> str:
"""Consult Grok (xAI) for frontier reasoning on complex questions.
Use this tool when a question requires advanced reasoning, real-time
knowledge, or capabilities beyond the local model. Grok is a premium
cloud backend — use sparingly and only for high-complexity queries.
Args:
query: The question or reasoning task to send to Grok.
Returns:
Grok's response text, or an error/status message.
"""
from config import settings
from timmy.backends import get_grok_backend, grok_available
if not grok_available():
return (
"Grok is not available. Enable with GROK_ENABLED=true "
"and set XAI_API_KEY in your .env file."
)
backend = get_grok_backend()
# Log to Spark if available
try:
from spark.engine import spark_engine
spark_engine.on_tool_executed(
agent_id="default",
tool_name="consult_grok",
success=True,
)
except (ImportError, AttributeError) as exc:
logger.warning("Spark logging failed for consult_grok: %s", exc)
# Generate Lightning invoice for monetization (unless free mode)
invoice_info = ""
if not settings.grok_free:
try:
from lightning.factory import get_backend as get_ln_backend
ln = get_ln_backend()
sats = min(settings.grok_max_sats_per_query, settings.grok_sats_hard_cap)
inv = ln.create_invoice(sats, f"Grok query: {query[:_INVOICE_MEMO_MAX_LEN]}")
invoice_info = f"\n[Lightning invoice: {sats} sats — {inv.payment_request[:40]}...]"
except (ImportError, OSError, ValueError) as exc:
logger.error("Lightning invoice creation failed: %s", exc)
return "Error: Failed to create Lightning invoice. Please check logs."
result = backend.run(query)
response = result.content
if invoice_info:
response += invoice_info
return response
def web_fetch(url: str, max_tokens: int = 4000) -> str:
"""Fetch a web page and return its main text content.
Downloads the URL, extracts readable text using trafilatura, and
truncates to a token budget. Use this to read full articles, docs,
or blog posts that web_search only returns snippets for.
Args:
url: The URL to fetch (must start with http:// or https://).
max_tokens: Maximum approximate token budget (default 4000).
Text is truncated to max_tokens * 4 characters.
Returns:
Extracted text content, or an error message on failure.
"""
if not url or not url.startswith(("http://", "https://")):
return f"Error: invalid URL — must start with http:// or https://: {url!r}"
try:
import requests as _requests
except ImportError:
return "Error: 'requests' package is not installed. Install with: pip install requests"
try:
import trafilatura
except ImportError:
return (
"Error: 'trafilatura' package is not installed. Install with: pip install trafilatura"
)
try:
resp = _requests.get(
url,
timeout=15,
headers={"User-Agent": "TimmyResearchBot/1.0"},
)
resp.raise_for_status()
except _requests.exceptions.Timeout:
return f"Error: request timed out after 15 seconds for {url}"
except _requests.exceptions.HTTPError as exc:
return f"Error: HTTP {exc.response.status_code} for {url}"
except _requests.exceptions.RequestException as exc:
return f"Error: failed to fetch {url}{exc}"
text = trafilatura.extract(resp.text, include_tables=True, include_links=True)
if not text:
return f"Error: could not extract readable content from {url}"
char_budget = max_tokens * 4
if len(text) > char_budget:
text = text[:char_budget] + f"\n\n[…truncated to ~{max_tokens} tokens]"
return text
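The truncation in `web_fetch` relies on the common ~4-characters-per-token heuristic; a standalone sketch of just that budget logic:

```python
# Same budget arithmetic as web_fetch: max_tokens * 4 characters,
# with a visible truncation marker appended when the text is cut.
max_tokens = 4000
char_budget = max_tokens * 4  # 16000 characters
text = "x" * 20000
if len(text) > char_budget:
    text = text[:char_budget] + f"\n\n[…truncated to ~{max_tokens} tokens]"
```

The 4:1 ratio is a rough average for English text; actual token counts vary by tokenizer, so the budget is approximate by design.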
def create_aider_tool(base_path: Path):
"""Create an Aider tool for AI-assisted coding."""
class AiderTool:
"""Tool that calls Aider (local AI coding assistant) for code generation."""
def __init__(self, base_dir: Path):
self.base_dir = base_dir
def run_aider(self, prompt: str, model: str = "qwen3:30b") -> str:
"""Run Aider to generate code changes.
Args:
prompt: What you want Aider to do (e.g., "add a fibonacci function")
model: Ollama model to use (default: qwen3:30b)
Returns:
Aider's response with the code changes made
"""
try:
# Run aider with the prompt
result = subprocess.run(
[
"aider",
"--no-git",
"--model",
f"ollama/{model}",
"--quiet",
prompt,
],
capture_output=True,
text=True,
timeout=120,
cwd=str(self.base_dir),
)
if result.returncode == 0:
return result.stdout if result.stdout else "Code changes applied successfully"
else:
return f"Aider error: {result.stderr}"
except FileNotFoundError:
return "Error: Aider not installed. Run: pip install aider-chat"
except subprocess.TimeoutExpired:
return "Error: Aider timed out after 120 seconds"
except (OSError, subprocess.SubprocessError) as e:
return f"Error running Aider: {str(e)}"
return AiderTool(base_path)
def create_code_tools(base_dir: str | Path | None = None):
"""Create tools for the code agent (Forge).
Includes: shell commands, python execution, file read/write, Aider AI assist
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="code")
# Shell commands (sandboxed)
shell_tools = ShellTools()
toolkit.register(shell_tools.run_shell_command, name="shell")
# Python execution
python_tools = PythonTools()
toolkit.register(python_tools.run_python_code, name="python")
# File operations
from config import settings
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.save_file, name="write_file")
toolkit.register(file_tools.list_files, name="list_files")
# Aider AI coding assistant (local with Ollama)
aider_tool = create_aider_tool(base_path)
toolkit.register(aider_tool.run_aider, name="aider")
return toolkit
def create_security_tools(base_dir: str | Path | None = None):
"""Create tools for the security agent (Mace).
Includes: shell commands (for scanning), file read
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="security")
# Shell for running security scans
shell_tools = ShellTools()
toolkit.register(shell_tools.run_shell_command, name="shell")
# File reading for logs/configs
from config import settings
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.list_files, name="list_files")
return toolkit
def create_devops_tools(base_dir: str | Path | None = None):
"""Create tools for the DevOps agent (Helm).
Includes: shell commands, file read/write
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="devops")
# Shell for deployment commands
shell_tools = ShellTools()
toolkit.register(shell_tools.run_shell_command, name="shell")
# File operations for config management
from config import settings
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.save_file, name="write_file")
toolkit.register(file_tools.list_files, name="list_files")
return toolkit

View File

@@ -1512,195 +1512,3 @@ class TestTrySingleProvider:
assert len(errors) == 1
assert "boom" in errors[0]
assert provider.metrics.failed_requests == 1
class TestComplexityRouting:
"""Tests for Qwen3-8B / Qwen3-14B dual-model routing (issue #1065)."""
def _make_dual_model_provider(self) -> Provider:
"""Build an Ollama provider with both Qwen3 models registered."""
return Provider(
name="ollama-local",
type="ollama",
enabled=True,
priority=1,
url="http://localhost:11434",
models=[
{
"name": "qwen3:8b",
"capabilities": ["text", "tools", "json", "streaming", "routine"],
},
{
"name": "qwen3:14b",
"default": True,
"capabilities": ["text", "tools", "json", "streaming", "complex", "reasoning"],
},
],
)
def test_get_model_for_complexity_simple_returns_8b(self):
"""Simple tasks should select the model with 'routine' capability."""
from infrastructure.router.classifier import TaskComplexity
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.fallback_chains = {
"routine": ["qwen3:8b"],
"complex": ["qwen3:14b"],
}
provider = self._make_dual_model_provider()
model = router._get_model_for_complexity(provider, TaskComplexity.SIMPLE)
assert model == "qwen3:8b"
def test_get_model_for_complexity_complex_returns_14b(self):
"""Complex tasks should select the model with 'complex' capability."""
from infrastructure.router.classifier import TaskComplexity
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.fallback_chains = {
"routine": ["qwen3:8b"],
"complex": ["qwen3:14b"],
}
provider = self._make_dual_model_provider()
model = router._get_model_for_complexity(provider, TaskComplexity.COMPLEX)
assert model == "qwen3:14b"
def test_get_model_for_complexity_returns_none_when_no_match(self):
"""Returns None when provider has no matching model in chain."""
from infrastructure.router.classifier import TaskComplexity
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.fallback_chains = {} # empty chains
provider = Provider(
name="test",
type="ollama",
enabled=True,
priority=1,
models=[{"name": "llama3.2:3b", "default": True, "capabilities": ["text"]}],
)
# No 'routine' or 'complex' model available
model = router._get_model_for_complexity(provider, TaskComplexity.SIMPLE)
assert model is None
@pytest.mark.asyncio
async def test_complete_with_simple_hint_routes_to_8b(self):
"""complexity_hint='simple' should use qwen3:8b."""
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.fallback_chains = {
"routine": ["qwen3:8b"],
"complex": ["qwen3:14b"],
}
router.providers = [self._make_dual_model_provider()]
with patch.object(router, "_call_ollama") as mock_call:
mock_call.return_value = {"content": "fast answer", "model": "qwen3:8b"}
result = await router.complete(
messages=[{"role": "user", "content": "list tasks"}],
complexity_hint="simple",
)
assert result["model"] == "qwen3:8b"
assert result["complexity"] == "simple"
@pytest.mark.asyncio
async def test_complete_with_complex_hint_routes_to_14b(self):
"""complexity_hint='complex' should use qwen3:14b."""
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.fallback_chains = {
"routine": ["qwen3:8b"],
"complex": ["qwen3:14b"],
}
router.providers = [self._make_dual_model_provider()]
with patch.object(router, "_call_ollama") as mock_call:
mock_call.return_value = {"content": "detailed answer", "model": "qwen3:14b"}
result = await router.complete(
messages=[{"role": "user", "content": "review this PR"}],
complexity_hint="complex",
)
assert result["model"] == "qwen3:14b"
assert result["complexity"] == "complex"
@pytest.mark.asyncio
async def test_explicit_model_bypasses_complexity_routing(self):
"""When model is explicitly provided, complexity routing is skipped."""
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.fallback_chains = {
"routine": ["qwen3:8b"],
"complex": ["qwen3:14b"],
}
router.providers = [self._make_dual_model_provider()]
with patch.object(router, "_call_ollama") as mock_call:
mock_call.return_value = {"content": "response", "model": "qwen3:14b"}
result = await router.complete(
messages=[{"role": "user", "content": "list tasks"}],
model="qwen3:14b", # explicit override
)
# Explicit model wins — complexity field is None
assert result["model"] == "qwen3:14b"
assert result["complexity"] is None
@pytest.mark.asyncio
async def test_auto_classification_routes_simple_message(self):
"""Short, simple messages should auto-classify as SIMPLE → 8B."""
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.fallback_chains = {
"routine": ["qwen3:8b"],
"complex": ["qwen3:14b"],
}
router.providers = [self._make_dual_model_provider()]
with patch.object(router, "_call_ollama") as mock_call:
mock_call.return_value = {"content": "ok", "model": "qwen3:8b"}
result = await router.complete(
messages=[{"role": "user", "content": "status"}],
# no complexity_hint — auto-classify
)
assert result["complexity"] == "simple"
assert result["model"] == "qwen3:8b"
@pytest.mark.asyncio
async def test_auto_classification_routes_complex_message(self):
"""Complex messages should auto-classify → 14B."""
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.fallback_chains = {
"routine": ["qwen3:8b"],
"complex": ["qwen3:14b"],
}
router.providers = [self._make_dual_model_provider()]
with patch.object(router, "_call_ollama") as mock_call:
mock_call.return_value = {"content": "deep analysis", "model": "qwen3:14b"}
result = await router.complete(
messages=[{"role": "user", "content": "analyze and prioritize the backlog"}],
)
assert result["complexity"] == "complex"
assert result["model"] == "qwen3:14b"
@pytest.mark.asyncio
async def test_invalid_complexity_hint_falls_back_to_auto(self):
"""Invalid complexity_hint should log a warning and auto-classify."""
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.fallback_chains = {
"routine": ["qwen3:8b"],
"complex": ["qwen3:14b"],
}
router.providers = [self._make_dual_model_provider()]
with patch.object(router, "_call_ollama") as mock_call:
mock_call.return_value = {"content": "ok", "model": "qwen3:8b"}
# Should not raise
result = await router.complete(
messages=[{"role": "user", "content": "status"}],
complexity_hint="INVALID_HINT",
)
assert result["complexity"] in ("simple", "complex") # auto-classified

View File

@@ -1,134 +0,0 @@
"""Tests for Qwen3 dual-model task complexity classifier."""
import pytest
from infrastructure.router.classifier import TaskComplexity, classify_task
class TestClassifyTask:
"""Tests for classify_task heuristics."""
# ── Simple / routine tasks ──────────────────────────────────────────────
def test_empty_messages_is_simple(self):
assert classify_task([]) == TaskComplexity.SIMPLE
def test_no_user_content_is_simple(self):
messages = [{"role": "system", "content": "You are Timmy."}]
assert classify_task(messages) == TaskComplexity.SIMPLE
def test_short_status_query_is_simple(self):
messages = [{"role": "user", "content": "status"}]
assert classify_task(messages) == TaskComplexity.SIMPLE
def test_list_command_is_simple(self):
messages = [{"role": "user", "content": "list all tasks"}]
assert classify_task(messages) == TaskComplexity.SIMPLE
def test_get_command_is_simple(self):
messages = [{"role": "user", "content": "get the latest log entry"}]
assert classify_task(messages) == TaskComplexity.SIMPLE
def test_short_message_under_threshold_is_simple(self):
messages = [{"role": "user", "content": "run the build"}]
assert classify_task(messages) == TaskComplexity.SIMPLE
def test_affirmation_is_simple(self):
messages = [{"role": "user", "content": "yes"}]
assert classify_task(messages) == TaskComplexity.SIMPLE
# ── Complex / quality-sensitive tasks ──────────────────────────────────
def test_plan_keyword_is_complex(self):
messages = [{"role": "user", "content": "plan the sprint"}]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_review_keyword_is_complex(self):
messages = [{"role": "user", "content": "review this code"}]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_analyze_keyword_is_complex(self):
messages = [{"role": "user", "content": "analyze performance"}]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_triage_keyword_is_complex(self):
messages = [{"role": "user", "content": "triage the open issues"}]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_refactor_keyword_is_complex(self):
messages = [{"role": "user", "content": "refactor the auth module"}]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_explain_keyword_is_complex(self):
messages = [{"role": "user", "content": "explain how the router works"}]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_prioritize_keyword_is_complex(self):
messages = [{"role": "user", "content": "prioritize the backlog"}]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_long_message_is_complex(self):
long_msg = "do something " * 50 # > 500 chars
messages = [{"role": "user", "content": long_msg}]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_numbered_list_is_complex(self):
messages = [
{
"role": "user",
"content": "1. Read the file 2. Analyze it 3. Write a report",
}
]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_code_block_is_complex(self):
messages = [
{"role": "user", "content": "Here is the code:\n```python\nprint('hello')\n```"}
]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_deep_conversation_is_complex(self):
messages = [
{"role": "user", "content": "hi"},
{"role": "assistant", "content": "hello"},
{"role": "user", "content": "ok"},
{"role": "assistant", "content": "yes"},
{"role": "user", "content": "ok"},
{"role": "assistant", "content": "yes"},
{"role": "user", "content": "now do the thing"},
]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_analyse_british_spelling_is_complex(self):
messages = [{"role": "user", "content": "analyse this dataset"}]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_non_string_content_is_ignored(self):
"""Non-string content should not crash the classifier."""
messages = [{"role": "user", "content": ["part1", "part2"]}]
# Should not raise; result doesn't matter — just must not blow up
result = classify_task(messages)
assert isinstance(result, TaskComplexity)
def test_system_message_not_counted_as_user(self):
"""System message alone should not trigger complex keywords."""
messages = [
{"role": "system", "content": "analyze everything carefully"},
{"role": "user", "content": "yes"},
]
# "analyze" is in system message (not user) — user says "yes" → simple
assert classify_task(messages) == TaskComplexity.SIMPLE
class TestTaskComplexityEnum:
"""Tests for TaskComplexity enum values."""
def test_simple_value(self):
assert TaskComplexity.SIMPLE.value == "simple"
def test_complex_value(self):
assert TaskComplexity.COMPLEX.value == "complex"
def test_lookup_by_value(self):
assert TaskComplexity("simple") == TaskComplexity.SIMPLE
assert TaskComplexity("complex") == TaskComplexity.COMPLEX

View File

@@ -176,176 +176,3 @@ class TestExtractMetric:
output = "loss: 0.45\nloss: 0.32"
assert _extract_metric(output, "loss") == pytest.approx(0.32)
class TestExtractPassRate:
"""Tests for _extract_pass_rate()."""
def test_all_passing(self):
from timmy.autoresearch import _extract_pass_rate
output = "5 passed in 1.23s"
assert _extract_pass_rate(output) == pytest.approx(100.0)
def test_mixed_results(self):
from timmy.autoresearch import _extract_pass_rate
output = "8 passed, 2 failed in 2.00s"
assert _extract_pass_rate(output) == pytest.approx(80.0)
def test_no_pytest_output(self):
from timmy.autoresearch import _extract_pass_rate
assert _extract_pass_rate("no test results here") is None
class TestExtractCoverage:
"""Tests for _extract_coverage()."""
def test_total_line(self):
from timmy.autoresearch import _extract_coverage
output = "TOTAL 1234 100 92%"
assert _extract_coverage(output) == pytest.approx(92.0)
def test_no_coverage(self):
from timmy.autoresearch import _extract_coverage
assert _extract_coverage("no coverage data") is None
class TestSystemExperiment:
"""Tests for SystemExperiment class."""
def test_generate_hypothesis_with_program(self):
from timmy.autoresearch import SystemExperiment
exp = SystemExperiment(target="src/timmy/agent.py")
hyp = exp.generate_hypothesis("Fix memory leak in session handling")
assert "src/timmy/agent.py" in hyp
assert "Fix memory leak" in hyp
def test_generate_hypothesis_fallback(self):
from timmy.autoresearch import SystemExperiment
exp = SystemExperiment(target="src/timmy/agent.py", metric="coverage")
hyp = exp.generate_hypothesis("")
assert "src/timmy/agent.py" in hyp
assert "coverage" in hyp
def test_generate_hypothesis_skips_comment_lines(self):
from timmy.autoresearch import SystemExperiment
exp = SystemExperiment(target="mymodule.py")
hyp = exp.generate_hypothesis("# comment\nActual direction here")
assert "Actual direction" in hyp
def test_evaluate_baseline(self):
from timmy.autoresearch import SystemExperiment
exp = SystemExperiment(target="x.py", metric="unit_pass_rate")
result = exp.evaluate(85.0, None)
assert "Baseline" in result
assert "85" in result
def test_evaluate_improvement_higher_is_better(self):
from timmy.autoresearch import SystemExperiment
exp = SystemExperiment(target="x.py", metric="unit_pass_rate")
result = exp.evaluate(90.0, 85.0)
assert "Improvement" in result
def test_evaluate_regression_higher_is_better(self):
from timmy.autoresearch import SystemExperiment
exp = SystemExperiment(target="x.py", metric="coverage")
result = exp.evaluate(80.0, 85.0)
assert "Regression" in result
def test_evaluate_none_metric(self):
from timmy.autoresearch import SystemExperiment
exp = SystemExperiment(target="x.py")
result = exp.evaluate(None, 80.0)
assert "Indeterminate" in result
def test_evaluate_lower_is_better(self):
from timmy.autoresearch import SystemExperiment
exp = SystemExperiment(target="x.py", metric="val_bpb")
result = exp.evaluate(1.1, 1.2)
assert "Improvement" in result
def test_is_improvement_higher_is_better(self):
from timmy.autoresearch import SystemExperiment
exp = SystemExperiment(target="x.py", metric="unit_pass_rate")
assert exp.is_improvement(90.0, 85.0) is True
assert exp.is_improvement(80.0, 85.0) is False
def test_is_improvement_lower_is_better(self):
from timmy.autoresearch import SystemExperiment
exp = SystemExperiment(target="x.py", metric="val_bpb")
assert exp.is_improvement(1.1, 1.2) is True
assert exp.is_improvement(1.3, 1.2) is False
def test_run_tox_success(self, tmp_path):
from timmy.autoresearch import SystemExperiment
exp = SystemExperiment(target="x.py", workspace=tmp_path)
with patch("timmy.autoresearch.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(
returncode=0,
stdout="8 passed in 1.23s",
stderr="",
)
result = exp.run_tox(tox_env="unit")
assert result["success"] is True
assert result["metric"] == pytest.approx(100.0)
def test_run_tox_timeout(self, tmp_path):
import subprocess
from timmy.autoresearch import SystemExperiment
exp = SystemExperiment(target="x.py", budget_minutes=1, workspace=tmp_path)
with patch("timmy.autoresearch.subprocess.run") as mock_run:
mock_run.side_effect = subprocess.TimeoutExpired(cmd="tox", timeout=60)
result = exp.run_tox()
assert result["success"] is False
assert "Budget exceeded" in result["error"]
def test_apply_edit_aider_not_installed(self, tmp_path):
from timmy.autoresearch import SystemExperiment
exp = SystemExperiment(target="x.py", workspace=tmp_path)
with patch("timmy.autoresearch.subprocess.run") as mock_run:
mock_run.side_effect = FileNotFoundError("aider not found")
result = exp.apply_edit("some hypothesis")
assert "not available" in result
def test_commit_changes_success(self, tmp_path):
from timmy.autoresearch import SystemExperiment
exp = SystemExperiment(target="x.py", workspace=tmp_path)
with patch("timmy.autoresearch.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(returncode=0)
success = exp.commit_changes("test commit")
assert success is True
def test_revert_changes_failure(self, tmp_path):
import subprocess
from timmy.autoresearch import SystemExperiment
exp = SystemExperiment(target="x.py", workspace=tmp_path)
with patch("timmy.autoresearch.subprocess.run") as mock_run:
mock_run.side_effect = subprocess.CalledProcessError(1, "git")
success = exp.revert_changes()
assert success is False

View File

@@ -1,94 +0,0 @@
"""Tests for the `timmy learn` CLI command (autoresearch entry point)."""
from unittest.mock import MagicMock, patch
from typer.testing import CliRunner
from timmy.cli import app
runner = CliRunner()
class TestLearnCommand:
"""Tests for `timmy learn`."""
def test_requires_target(self):
result = runner.invoke(app, ["learn"])
assert result.exit_code != 0
assert "target" in result.output.lower() or "target" in (result.stderr or "").lower()
def test_dry_run_shows_hypothesis_no_tox(self, tmp_path):
program_file = tmp_path / "program.md"
program_file.write_text("Improve logging coverage in agent module")
with patch("timmy.autoresearch.subprocess.run") as mock_run:
result = runner.invoke(
app,
[
"learn",
"--target",
"src/timmy/agent.py",
"--program",
str(program_file),
"--max-experiments",
"2",
"--dry-run",
],
)
assert result.exit_code == 0
# tox should never be called in dry-run
mock_run.assert_not_called()
assert "agent.py" in result.output
def test_missing_program_md_warns_but_continues(self, tmp_path):
with patch("timmy.autoresearch.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(returncode=0, stdout="3 passed", stderr="")
result = runner.invoke(
app,
[
"learn",
"--target",
"src/timmy/agent.py",
"--program",
str(tmp_path / "nonexistent.md"),
"--max-experiments",
"1",
"--dry-run",
],
)
assert result.exit_code == 0
def test_dry_run_prints_max_experiments_hypotheses(self, tmp_path):
program_file = tmp_path / "program.md"
program_file.write_text("Fix edge case in parser")
result = runner.invoke(
app,
[
"learn",
"--target",
"src/timmy/parser.py",
"--program",
str(program_file),
"--max-experiments",
"3",
"--dry-run",
],
)
assert result.exit_code == 0
# Should show 3 experiment headers
assert result.output.count("[1/3]") == 1
assert result.output.count("[2/3]") == 1
assert result.output.count("[3/3]") == 1
def test_help_text_present(self):
result = runner.invoke(app, ["learn", "--help"])
assert result.exit_code == 0
assert "--target" in result.output
assert "--metric" in result.output
assert "--budget" in result.output
assert "--max-experiments" in result.output
assert "--dry-run" in result.output

View File

@@ -16,7 +16,7 @@ from timmy.memory_system import (
memory_forget,
memory_read,
memory_search,
memory_store,
memory_write,
)
@@ -490,7 +490,7 @@ class TestMemorySearch:
assert isinstance(result, str)
def test_none_top_k_handled(self):
result = memory_search("test", limit=None)
result = memory_search("test", top_k=None)
assert isinstance(result, str)
def test_basic_search_returns_string(self):
@@ -521,12 +521,12 @@ class TestMemoryRead:
assert isinstance(result, str)
class TestMemoryStore:
"""Test module-level memory_store function."""
class TestMemoryWrite:
"""Test module-level memory_write function."""
@pytest.fixture(autouse=True)
def mock_vector_store(self):
"""Mock vector_store functions for memory_store tests."""
"""Mock vector_store functions for memory_write tests."""
# Patch where it's imported from, not where it's used
with (
patch("timmy.memory_system.search_memories") as mock_search,
@@ -542,83 +542,75 @@ class TestMemoryStore:
yield {"search": mock_search, "store": mock_store}
def test_memory_store_empty_report(self):
"""Test that empty report returns error message."""
result = memory_store(topic="test", report="")
def test_memory_write_empty_content(self):
"""Test that empty content returns error message."""
result = memory_write("")
assert "empty" in result.lower()
def test_memory_store_whitespace_only(self):
"""Test that whitespace-only report returns error."""
result = memory_store(topic="test", report=" \n\t ")
def test_memory_write_whitespace_only(self):
"""Test that whitespace-only content returns error."""
result = memory_write(" \n\t ")
assert "empty" in result.lower()
def test_memory_store_valid_content(self, mock_vector_store):
def test_memory_write_valid_content(self, mock_vector_store):
"""Test writing valid content."""
result = memory_store(topic="fact about Timmy", report="Remember this important fact.")
result = memory_write("Remember this important fact.")
assert "stored" in result.lower() or "memory" in result.lower()
mock_vector_store["store"].assert_called_once()
def test_memory_store_dedup_for_facts_or_research(self, mock_vector_store):
"""Test that duplicate facts or research are skipped."""
def test_memory_write_dedup_for_facts(self, mock_vector_store):
"""Test that duplicate facts are skipped."""
# Simulate existing similar fact
mock_entry = MagicMock()
mock_entry.id = "existing-id"
mock_vector_store["search"].return_value = [mock_entry]
# Test with 'fact'
result = memory_store(topic="Similar fact", report="Similar fact text", type="fact")
result = memory_write("Similar fact text", context_type="fact")
assert "similar" in result.lower() or "duplicate" in result.lower()
mock_vector_store["store"].assert_not_called()
mock_vector_store["store"].reset_mock()
# Test with 'research'
result = memory_store(topic="Similar research", report="Similar research content", type="research")
assert "similar" in result.lower() or "duplicate" in result.lower()
mock_vector_store["store"].assert_not_called()
def test_memory_store_no_dedup_for_conversation(self, mock_vector_store):
def test_memory_write_no_dedup_for_conversation(self, mock_vector_store):
"""Test that conversation entries are not deduplicated."""
# Even with existing entries, conversations should be stored
mock_entry = MagicMock()
mock_entry.id = "existing-id"
mock_vector_store["search"].return_value = [mock_entry]
memory_store(topic="Conversation", report="Conversation text", type="conversation")
memory_write("Conversation text", context_type="conversation")
# Should still store (no duplicate check for non-fact)
mock_vector_store["store"].assert_called_once()
-    def test_memory_store_invalid_type_defaults_to_research(self, mock_vector_store):
-        """Test that invalid type defaults to 'research'."""
-        memory_store(topic="Invalid type test", report="Some content", type="invalid_type")
-        # Should still succeed, using "research" as default
+    def test_memory_write_invalid_context_type(self, mock_vector_store):
+        """Test that invalid context_type defaults to 'fact'."""
+        memory_write("Some content", context_type="invalid_type")
+        # Should still succeed, using "fact" as default
         mock_vector_store["store"].assert_called_once()
         call_kwargs = mock_vector_store["store"].call_args.kwargs
-        assert call_kwargs.get("context_type") == "research"
+        assert call_kwargs.get("context_type") == "fact"

-    def test_memory_store_valid_types(self, mock_vector_store):
+    def test_memory_write_valid_context_types(self, mock_vector_store):
         """Test all valid context types."""
-        valid_types = ["fact", "conversation", "document", "research"]
+        valid_types = ["fact", "conversation", "document"]
         for ctx_type in valid_types:
             mock_vector_store["store"].reset_mock()
-            memory_store(topic=f"Topic for {ctx_type}", report=f"Content for {ctx_type}", type=ctx_type)
+            memory_write(f"Content for {ctx_type}", context_type=ctx_type)
             mock_vector_store["store"].assert_called_once()

-    def test_memory_store_strips_report_and_adds_topic(self, mock_vector_store):
-        """Test that report is stripped of leading/trailing whitespace and combined with topic."""
-        memory_store(topic=" My Topic ", report=" padded content ")
+    def test_memory_write_strips_content(self, mock_vector_store):
+        """Test that content is stripped of leading/trailing whitespace."""
+        memory_write(" padded content ")
         call_kwargs = mock_vector_store["store"].call_args.kwargs
-        assert call_kwargs.get("content") == "Topic: My Topic\n\nReport: padded content"
-        assert call_kwargs.get("metadata") == {"topic": " My Topic "}
+        assert call_kwargs.get("content") == "padded content"

-    def test_memory_store_unicode_report(self, mock_vector_store):
+    def test_memory_write_unicode_content(self, mock_vector_store):
         """Test writing unicode content."""
-        result = memory_store(topic="Unicode", report="Unicode content: 你好世界 🎉")
+        result = memory_write("Unicode content: 你好世界 🎉")
         assert "stored" in result.lower() or "memory" in result.lower()

-    def test_memory_store_handles_exception(self, mock_vector_store):
+    def test_memory_write_handles_exception(self, mock_vector_store):
         """Test handling of store_memory exceptions."""
         mock_vector_store["store"].side_effect = Exception("DB error")
-        result = memory_store(topic="Failing", report="This will fail")
+        result = memory_write("This will fail")
         assert "failed" in result.lower() or "error" in result.lower()
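The contract these tests pin down can be sketched as a minimal stand-in. This is a hypothetical sketch, not the project's real `memory_write`; the `store` parameter is an assumption added here so the sketch is testable without the vector store:

```python
# Hypothetical minimal sketch of the memory_write contract exercised above.
# Names, defaults, and the injectable `store` callable are assumptions
# inferred from the tests, not the real implementation.
VALID_CONTEXT_TYPES = {"fact", "conversation", "document"}

def memory_write(content: str, context_type: str = "fact", store=None) -> str:
    if context_type not in VALID_CONTEXT_TYPES:
        context_type = "fact"  # invalid values fall back to the default
    try:
        # Content is stripped of leading/trailing whitespace before storage.
        store(content=content.strip(), context_type=context_type)
        return "Memory stored."
    except Exception as exc:
        return f"Memory write failed: {exc}"
```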

View File

@@ -1,332 +0,0 @@
"""Tests for the three-strike detector.
Refs: #962
"""
import pytest
from timmy.sovereignty.three_strike import (
CATEGORIES,
STRIKE_BLOCK,
STRIKE_WARNING,
FalseworkChecklist,
StrikeRecord,
ThreeStrikeError,
ThreeStrikeStore,
falsework_check,
)
@pytest.fixture
def store(tmp_path):
"""Isolated store backed by a temp DB."""
return ThreeStrikeStore(db_path=tmp_path / "test_strikes.db")
# ── Category constants ────────────────────────────────────────────────────────
class TestCategories:
@pytest.mark.unit
def test_all_categories_present(self):
expected = {
"vlm_prompt_edit",
"game_bug_review",
"parameter_tuning",
"portal_adapter_creation",
"deployment_step",
}
assert expected == CATEGORIES
@pytest.mark.unit
def test_strike_thresholds(self):
assert STRIKE_WARNING == 2
assert STRIKE_BLOCK == 3
# ── ThreeStrikeStore ──────────────────────────────────────────────────────────
class TestThreeStrikeStore:
@pytest.mark.unit
def test_first_strike_returns_record(self, store):
record = store.record("vlm_prompt_edit", "login_button")
assert isinstance(record, StrikeRecord)
assert record.count == 1
assert record.blocked is False
assert record.category == "vlm_prompt_edit"
assert record.key == "login_button"
@pytest.mark.unit
def test_second_strike_count(self, store):
store.record("vlm_prompt_edit", "login_button")
record = store.record("vlm_prompt_edit", "login_button")
assert record.count == 2
assert record.blocked is False
@pytest.mark.unit
def test_third_strike_raises(self, store):
store.record("vlm_prompt_edit", "login_button")
store.record("vlm_prompt_edit", "login_button")
with pytest.raises(ThreeStrikeError) as exc_info:
store.record("vlm_prompt_edit", "login_button")
err = exc_info.value
assert err.category == "vlm_prompt_edit"
assert err.key == "login_button"
assert err.count == 3
@pytest.mark.unit
def test_fourth_strike_still_raises(self, store):
for _ in range(3):
try:
store.record("deployment_step", "build_docker")
except ThreeStrikeError:
pass
with pytest.raises(ThreeStrikeError):
store.record("deployment_step", "build_docker")
@pytest.mark.unit
def test_different_keys_are_independent(self, store):
store.record("vlm_prompt_edit", "login_button")
store.record("vlm_prompt_edit", "login_button")
# Different key — should not be blocked
record = store.record("vlm_prompt_edit", "logout_button")
assert record.count == 1
@pytest.mark.unit
def test_different_categories_are_independent(self, store):
store.record("vlm_prompt_edit", "foo")
store.record("vlm_prompt_edit", "foo")
# Different category, same key — should not be blocked
record = store.record("game_bug_review", "foo")
assert record.count == 1
@pytest.mark.unit
def test_invalid_category_raises_value_error(self, store):
with pytest.raises(ValueError, match="Unknown category"):
store.record("nonexistent_category", "some_key")
@pytest.mark.unit
def test_metadata_stored_in_events(self, store):
store.record("parameter_tuning", "learning_rate", metadata={"value": 0.01})
events = store.get_events("parameter_tuning", "learning_rate")
assert len(events) == 1
assert events[0]["metadata"]["value"] == 0.01
@pytest.mark.unit
def test_get_returns_none_for_missing(self, store):
assert store.get("vlm_prompt_edit", "not_there") is None
@pytest.mark.unit
def test_get_returns_record(self, store):
store.record("vlm_prompt_edit", "submit_btn")
record = store.get("vlm_prompt_edit", "submit_btn")
assert record is not None
assert record.count == 1
@pytest.mark.unit
def test_list_all_empty(self, store):
assert store.list_all() == []
@pytest.mark.unit
def test_list_all_returns_records(self, store):
store.record("vlm_prompt_edit", "a")
store.record("vlm_prompt_edit", "b")
records = store.list_all()
assert len(records) == 2
@pytest.mark.unit
def test_list_blocked_empty_when_no_strikes(self, store):
assert store.list_blocked() == []
@pytest.mark.unit
def test_list_blocked_contains_blocked(self, store):
for _ in range(3):
try:
store.record("deployment_step", "push_image")
except ThreeStrikeError:
pass
blocked = store.list_blocked()
assert len(blocked) == 1
assert blocked[0].key == "push_image"
@pytest.mark.unit
def test_register_automation_unblocks(self, store):
for _ in range(3):
try:
store.record("deployment_step", "push_image")
except ThreeStrikeError:
pass
store.register_automation("deployment_step", "push_image", "scripts/push.sh")
# Should no longer raise
record = store.record("deployment_step", "push_image")
assert record.blocked is False
assert record.automation == "scripts/push.sh"
@pytest.mark.unit
def test_register_automation_resets_count(self, store):
for _ in range(3):
try:
store.record("deployment_step", "push_image")
except ThreeStrikeError:
pass
store.register_automation("deployment_step", "push_image", "scripts/push.sh")
# register_automation resets count to 0; one new record brings it to 1
new_record = store.record("deployment_step", "push_image")
assert new_record.count == 1
@pytest.mark.unit
def test_get_events_returns_most_recent_first(self, store):
store.record("vlm_prompt_edit", "nav", metadata={"n": 1})
store.record("vlm_prompt_edit", "nav", metadata={"n": 2})
events = store.get_events("vlm_prompt_edit", "nav")
assert len(events) == 2
# Most recent first
assert events[0]["metadata"]["n"] == 2
@pytest.mark.unit
def test_get_events_respects_limit(self, store):
for i in range(5):
try:
store.record("vlm_prompt_edit", "el")
except ThreeStrikeError:
pass
events = store.get_events("vlm_prompt_edit", "el", limit=2)
assert len(events) == 2
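The strike semantics asserted above can be summarized with a small in-memory sketch. This is hypothetical: the real `ThreeStrikeStore` is SQLite-backed (note the `db_path` argument in the fixture) and returns `StrikeRecord` objects rather than bare counts:

```python
# Hypothetical in-memory sketch of the three-strike semantics tested above.
STRIKE_BLOCK = 3

class ThreeStrikeError(Exception):
    def __init__(self, category, key, count):
        super().__init__(f"{category}/{key} blocked after {count} strikes")
        self.category, self.key, self.count = category, key, count

class InMemoryStrikeStore:
    def __init__(self):
        self._counts = {}      # (category, key) -> strike count
        self._automation = {}  # (category, key) -> automation artifact path

    def record(self, category, key):
        k = (category, key)
        self._counts[k] = self._counts.get(k, 0) + 1
        # Once an automation is registered, the pair is never blocked again.
        if k not in self._automation and self._counts[k] >= STRIKE_BLOCK:
            raise ThreeStrikeError(category, key, self._counts[k])
        return self._counts[k]

    def register_automation(self, category, key, artifact):
        self._automation[(category, key)] = artifact
        self._counts[(category, key)] = 0  # reset count, per the tests
```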
# ── FalseworkChecklist ────────────────────────────────────────────────────────
class TestFalseworkChecklist:
@pytest.mark.unit
def test_valid_checklist_passes(self):
cl = FalseworkChecklist(
durable_artifact="embedding vectors",
artifact_storage_path="data/embeddings.json",
local_rule_or_cache="vlm_cache",
will_repeat=False,
sovereignty_delta="eliminates repeated call",
)
assert cl.passed is True
assert cl.validate() == []
@pytest.mark.unit
def test_missing_artifact_fails(self):
cl = FalseworkChecklist(
artifact_storage_path="data/x.json",
local_rule_or_cache="cache",
will_repeat=False,
sovereignty_delta="delta",
)
errors = cl.validate()
assert any("Q1" in e for e in errors)
@pytest.mark.unit
def test_missing_storage_path_fails(self):
cl = FalseworkChecklist(
durable_artifact="artifact",
local_rule_or_cache="cache",
will_repeat=False,
sovereignty_delta="delta",
)
errors = cl.validate()
assert any("Q2" in e for e in errors)
@pytest.mark.unit
def test_will_repeat_none_fails(self):
cl = FalseworkChecklist(
durable_artifact="artifact",
artifact_storage_path="path",
local_rule_or_cache="cache",
sovereignty_delta="delta",
)
errors = cl.validate()
assert any("Q4" in e for e in errors)
@pytest.mark.unit
def test_will_repeat_true_requires_elimination_strategy(self):
cl = FalseworkChecklist(
durable_artifact="artifact",
artifact_storage_path="path",
local_rule_or_cache="cache",
will_repeat=True,
sovereignty_delta="delta",
)
errors = cl.validate()
assert any("Q5" in e for e in errors)
@pytest.mark.unit
def test_will_repeat_false_no_elimination_needed(self):
cl = FalseworkChecklist(
durable_artifact="artifact",
artifact_storage_path="path",
local_rule_or_cache="cache",
will_repeat=False,
sovereignty_delta="delta",
)
errors = cl.validate()
assert not any("Q5" in e for e in errors)
@pytest.mark.unit
def test_missing_sovereignty_delta_fails(self):
cl = FalseworkChecklist(
durable_artifact="artifact",
artifact_storage_path="path",
local_rule_or_cache="cache",
will_repeat=False,
)
errors = cl.validate()
assert any("Q6" in e for e in errors)
@pytest.mark.unit
def test_multiple_missing_fields(self):
cl = FalseworkChecklist()
errors = cl.validate()
# At minimum Q1, Q2, Q3, Q4, Q6 should be flagged
assert len(errors) >= 5
# ── falsework_check() helper ──────────────────────────────────────────────────
class TestFalseworkCheck:
@pytest.mark.unit
def test_raises_on_incomplete_checklist(self):
with pytest.raises(ValueError, match="Falsework Checklist incomplete"):
falsework_check(FalseworkChecklist())
@pytest.mark.unit
def test_passes_on_complete_checklist(self):
cl = FalseworkChecklist(
durable_artifact="artifact",
artifact_storage_path="path",
local_rule_or_cache="cache",
will_repeat=False,
sovereignty_delta="delta",
)
falsework_check(cl) # should not raise
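The Q1..Q6 validation scheme these tests imply can be sketched as follows. A hypothetical reconstruction only — field names come from the tests, but the exact error strings and the `elimination_strategy` field are assumptions:

```python
# Hypothetical sketch of the checklist validation implied by the tests above.
# The `elimination_strategy` field backing Q5 is an assumed name.
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class Checklist:
    durable_artifact: str = ""
    artifact_storage_path: str = ""
    local_rule_or_cache: str = ""
    will_repeat: Optional[bool] = None
    elimination_strategy: str = ""
    sovereignty_delta: str = ""

    def validate(self) -> List[str]:
        errors = []
        if not self.durable_artifact:
            errors.append("Q1: durable_artifact is required")
        if not self.artifact_storage_path:
            errors.append("Q2: artifact_storage_path is required")
        if not self.local_rule_or_cache:
            errors.append("Q3: local_rule_or_cache is required")
        if self.will_repeat is None:
            errors.append("Q4: will_repeat must be answered")
        elif self.will_repeat and not self.elimination_strategy:
            errors.append("Q5: elimination strategy required when work repeats")
        if not self.sovereignty_delta:
            errors.append("Q6: sovereignty_delta is required")
        return errors

    @property
    def passed(self) -> bool:
        return not self.validate()
```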
# ── ThreeStrikeError ──────────────────────────────────────────────────────────
class TestThreeStrikeError:
@pytest.mark.unit
def test_attributes(self):
err = ThreeStrikeError("vlm_prompt_edit", "foo", 3)
assert err.category == "vlm_prompt_edit"
assert err.key == "foo"
assert err.count == 3
@pytest.mark.unit
def test_message_contains_details(self):
err = ThreeStrikeError("deployment_step", "build", 4)
msg = str(err)
assert "deployment_step" in msg
assert "build" in msg
assert "4" in msg

View File

@@ -1,82 +0,0 @@
"""Integration tests for the three-strike dashboard routes.
Refs: #962
"""
import pytest
class TestThreeStrikeRoutes:
@pytest.mark.unit
def test_list_strikes_returns_200(self, client):
response = client.get("/sovereignty/three-strike")
assert response.status_code == 200
data = response.json()
assert "records" in data
assert "categories" in data
@pytest.mark.unit
def test_list_blocked_returns_200(self, client):
response = client.get("/sovereignty/three-strike/blocked")
assert response.status_code == 200
data = response.json()
assert "blocked" in data
@pytest.mark.unit
def test_record_strike_first(self, client):
response = client.post(
"/sovereignty/three-strike/record",
json={"category": "vlm_prompt_edit", "key": "test_btn"},
)
assert response.status_code == 200
data = response.json()
assert data["count"] == 1
assert data["blocked"] is False
@pytest.mark.unit
def test_record_invalid_category_returns_422(self, client):
response = client.post(
"/sovereignty/three-strike/record",
json={"category": "not_a_real_category", "key": "x"},
)
assert response.status_code == 422
@pytest.mark.unit
def test_third_strike_returns_409(self, client):
for _ in range(2):
client.post(
"/sovereignty/three-strike/record",
json={"category": "deployment_step", "key": "push_route_test"},
)
response = client.post(
"/sovereignty/three-strike/record",
json={"category": "deployment_step", "key": "push_route_test"},
)
assert response.status_code == 409
data = response.json()
assert data["detail"]["error"] == "three_strike_block"
assert data["detail"]["count"] == 3
@pytest.mark.unit
def test_register_automation_returns_success(self, client):
response = client.post(
"/sovereignty/three-strike/deployment_step/some_key/automation",
json={"artifact_path": "scripts/auto.sh"},
)
assert response.status_code == 200
assert response.json()["success"] is True
@pytest.mark.unit
def test_get_events_returns_200(self, client):
client.post(
"/sovereignty/three-strike/record",
json={"category": "vlm_prompt_edit", "key": "events_test_key"},
)
response = client.get(
"/sovereignty/three-strike/vlm_prompt_edit/events_test_key/events"
)
assert response.status_code == 200
data = response.json()
assert data["category"] == "vlm_prompt_edit"
assert data["key"] == "events_test_key"
assert len(data["events"]) >= 1

View File

@@ -1,569 +0,0 @@
"""Unit tests for src/timmy/paperclip.py.
Refs #1236
"""
from __future__ import annotations
import asyncio
import sys
from types import ModuleType
from unittest.mock import AsyncMock, MagicMock, patch
import httpx
import pytest
# ── Stub serpapi before any import of paperclip (it imports research_tools) ───
_serpapi_stub = ModuleType("serpapi")
_google_search_mock = MagicMock()
_serpapi_stub.GoogleSearch = _google_search_mock
sys.modules.setdefault("serpapi", _serpapi_stub)
pytestmark = pytest.mark.unit
# ── PaperclipTask ─────────────────────────────────────────────────────────────
class TestPaperclipTask:
"""PaperclipTask dataclass holds task data."""
def test_task_creation(self):
from timmy.paperclip import PaperclipTask
task = PaperclipTask(id="task-123", kind="research", context={"key": "value"})
assert task.id == "task-123"
assert task.kind == "research"
assert task.context == {"key": "value"}
def test_task_creation_empty_context(self):
from timmy.paperclip import PaperclipTask
task = PaperclipTask(id="task-456", kind="other", context={})
assert task.id == "task-456"
assert task.kind == "other"
assert task.context == {}
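The dataclass these two tests exercise is essentially just a record type; a minimal sketch consistent with the assertions above (the default factory for `context` is an assumption):

```python
# Hypothetical sketch of the PaperclipTask dataclass implied by the tests.
from dataclasses import dataclass, field
from typing import Any, Dict

@dataclass
class PaperclipTask:
    id: str
    kind: str
    context: Dict[str, Any] = field(default_factory=dict)
```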
# ── PaperclipClient ───────────────────────────────────────────────────────────
class TestPaperclipClient:
"""PaperclipClient interacts with the Paperclip API."""
def test_init_uses_settings(self):
from timmy.paperclip import PaperclipClient
with patch("timmy.paperclip.settings") as mock_settings:
mock_settings.paperclip_url = "http://test.example:3100"
mock_settings.paperclip_api_key = "test-api-key"
mock_settings.paperclip_agent_id = "agent-123"
mock_settings.paperclip_company_id = "company-456"
mock_settings.paperclip_timeout = 45
client = PaperclipClient()
assert client.base_url == "http://test.example:3100"
assert client.api_key == "test-api-key"
assert client.agent_id == "agent-123"
assert client.company_id == "company-456"
assert client.timeout == 45
@pytest.mark.asyncio
async def test_get_tasks_makes_correct_request(self):
from timmy.paperclip import PaperclipClient
with patch("timmy.paperclip.settings") as mock_settings:
mock_settings.paperclip_url = "http://test.example:3100"
mock_settings.paperclip_api_key = "test-api-key"
mock_settings.paperclip_agent_id = "agent-123"
mock_settings.paperclip_company_id = "company-456"
mock_settings.paperclip_timeout = 30
client = PaperclipClient()
mock_response = MagicMock()
mock_response.json.return_value = [
{"id": "task-1", "kind": "research", "context": {"issue_number": 42}},
{"id": "task-2", "kind": "other", "context": {}},
]
mock_client = AsyncMock()
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
mock_client.get = AsyncMock(return_value=mock_response)
with patch("httpx.AsyncClient", return_value=mock_client):
tasks = await client.get_tasks()
mock_client.get.assert_called_once_with(
"http://test.example:3100/api/tasks",
headers={"Authorization": "Bearer test-api-key"},
params={
"agent_id": "agent-123",
"company_id": "company-456",
"status": "queued",
},
)
mock_response.raise_for_status.assert_called_once()
assert len(tasks) == 2
assert tasks[0].id == "task-1"
assert tasks[0].kind == "research"
assert tasks[1].id == "task-2"
@pytest.mark.asyncio
async def test_get_tasks_empty_response(self):
from timmy.paperclip import PaperclipClient
with patch("timmy.paperclip.settings") as mock_settings:
mock_settings.paperclip_url = "http://test.example:3100"
mock_settings.paperclip_api_key = "test-api-key"
mock_settings.paperclip_agent_id = "agent-123"
mock_settings.paperclip_company_id = "company-456"
mock_settings.paperclip_timeout = 30
client = PaperclipClient()
mock_response = MagicMock()
mock_response.json.return_value = []
mock_client = AsyncMock()
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
mock_client.get = AsyncMock(return_value=mock_response)
with patch("httpx.AsyncClient", return_value=mock_client):
tasks = await client.get_tasks()
assert tasks == []
@pytest.mark.asyncio
async def test_get_tasks_raises_on_http_error(self):
from timmy.paperclip import PaperclipClient
with patch("timmy.paperclip.settings") as mock_settings:
mock_settings.paperclip_url = "http://test.example:3100"
mock_settings.paperclip_api_key = "test-api-key"
mock_settings.paperclip_agent_id = "agent-123"
mock_settings.paperclip_company_id = "company-456"
mock_settings.paperclip_timeout = 30
client = PaperclipClient()
mock_client = AsyncMock()
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
mock_client.get = AsyncMock(side_effect=httpx.HTTPError("Connection failed"))
with patch("httpx.AsyncClient", return_value=mock_client):
with pytest.raises(httpx.HTTPError):
await client.get_tasks()
@pytest.mark.asyncio
async def test_update_task_status_makes_correct_request(self):
from timmy.paperclip import PaperclipClient
with patch("timmy.paperclip.settings") as mock_settings:
mock_settings.paperclip_url = "http://test.example:3100"
mock_settings.paperclip_api_key = "test-api-key"
mock_settings.paperclip_timeout = 30
client = PaperclipClient()
mock_client = AsyncMock()
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
mock_client.patch = AsyncMock(return_value=MagicMock())
with patch("httpx.AsyncClient", return_value=mock_client):
await client.update_task_status("task-123", "completed", "Task result here")
mock_client.patch.assert_called_once_with(
"http://test.example:3100/api/tasks/task-123",
headers={"Authorization": "Bearer test-api-key"},
json={"status": "completed", "result": "Task result here"},
)
@pytest.mark.asyncio
async def test_update_task_status_without_result(self):
from timmy.paperclip import PaperclipClient
with patch("timmy.paperclip.settings") as mock_settings:
mock_settings.paperclip_url = "http://test.example:3100"
mock_settings.paperclip_api_key = "test-api-key"
mock_settings.paperclip_timeout = 30
client = PaperclipClient()
mock_client = AsyncMock()
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
mock_client.patch = AsyncMock(return_value=MagicMock())
with patch("httpx.AsyncClient", return_value=mock_client):
await client.update_task_status("task-123", "running")
mock_client.patch.assert_called_once_with(
"http://test.example:3100/api/tasks/task-123",
headers={"Authorization": "Bearer test-api-key"},
json={"status": "running", "result": None},
)
# ── ResearchOrchestrator ───────────────────────────────────────────────────────
class TestResearchOrchestrator:
"""ResearchOrchestrator coordinates research tasks."""
def test_init_creates_instances(self):
from timmy.paperclip import ResearchOrchestrator
orchestrator = ResearchOrchestrator()
assert orchestrator is not None
@pytest.mark.asyncio
async def test_get_gitea_issue_makes_correct_request(self):
from timmy.paperclip import ResearchOrchestrator
with patch("timmy.paperclip.settings") as mock_settings:
mock_settings.gitea_repo = "owner/repo"
mock_settings.gitea_url = "http://gitea.example:3000"
mock_settings.gitea_token = "gitea-token"
orchestrator = ResearchOrchestrator()
mock_response = MagicMock()
mock_response.json.return_value = {"number": 42, "title": "Test Issue"}
mock_client = AsyncMock()
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
mock_client.get = AsyncMock(return_value=mock_response)
with patch("httpx.AsyncClient", return_value=mock_client):
issue = await orchestrator.get_gitea_issue(42)
mock_client.get.assert_called_once_with(
"http://gitea.example:3000/api/v1/repos/owner/repo/issues/42",
headers={"Authorization": "token gitea-token"},
)
mock_response.raise_for_status.assert_called_once()
assert issue["number"] == 42
assert issue["title"] == "Test Issue"
@pytest.mark.asyncio
async def test_get_gitea_issue_raises_on_http_error(self):
from timmy.paperclip import ResearchOrchestrator
with patch("timmy.paperclip.settings") as mock_settings:
mock_settings.gitea_repo = "owner/repo"
mock_settings.gitea_url = "http://gitea.example:3000"
mock_settings.gitea_token = "gitea-token"
orchestrator = ResearchOrchestrator()
mock_client = AsyncMock()
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
mock_client.get = AsyncMock(side_effect=httpx.HTTPError("Not found"))
with patch("httpx.AsyncClient", return_value=mock_client):
with pytest.raises(httpx.HTTPError):
await orchestrator.get_gitea_issue(999)
@pytest.mark.asyncio
async def test_post_gitea_comment_makes_correct_request(self):
from timmy.paperclip import ResearchOrchestrator
with patch("timmy.paperclip.settings") as mock_settings:
mock_settings.gitea_repo = "owner/repo"
mock_settings.gitea_url = "http://gitea.example:3000"
mock_settings.gitea_token = "gitea-token"
orchestrator = ResearchOrchestrator()
mock_client = AsyncMock()
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
mock_client.post = AsyncMock(return_value=MagicMock())
with patch("httpx.AsyncClient", return_value=mock_client):
await orchestrator.post_gitea_comment(42, "Test comment body")
mock_client.post.assert_called_once_with(
"http://gitea.example:3000/api/v1/repos/owner/repo/issues/42/comments",
headers={"Authorization": "token gitea-token"},
json={"body": "Test comment body"},
)
@pytest.mark.asyncio
async def test_run_research_pipeline_returns_report(self):
from timmy.paperclip import ResearchOrchestrator
orchestrator = ResearchOrchestrator()
mock_search_results = "Search result 1\nSearch result 2"
mock_llm_response = MagicMock()
mock_llm_response.text = "Research report summary"
mock_llm_client = MagicMock()
mock_llm_client.completion = AsyncMock(return_value=mock_llm_response)
with patch("timmy.paperclip.google_web_search", new=AsyncMock(return_value=mock_search_results)):
with patch("timmy.paperclip.get_llm_client", return_value=mock_llm_client):
report = await orchestrator.run_research_pipeline("test query")
assert report == "Research report summary"
mock_llm_client.completion.assert_called_once()
call_args = mock_llm_client.completion.call_args
# The prompt is passed as first positional arg, check it contains expected content
prompt = call_args[0][0] if call_args[0] else call_args[1].get("messages", [""])[0]
assert "Summarize" in prompt
assert "Search result 1" in prompt
@pytest.mark.asyncio
async def test_run_returns_error_when_missing_issue_number(self):
from timmy.paperclip import ResearchOrchestrator
orchestrator = ResearchOrchestrator()
result = await orchestrator.run({})
assert result == "Missing issue_number in task context"
@pytest.mark.asyncio
async def test_run_executes_full_pipeline_with_triage_results(self):
from timmy.paperclip import ResearchOrchestrator
with patch("timmy.paperclip.settings") as mock_settings:
mock_settings.gitea_repo = "owner/repo"
mock_settings.gitea_url = "http://gitea.example:3000"
mock_settings.gitea_token = "gitea-token"
orchestrator = ResearchOrchestrator()
mock_issue = {"number": 42, "title": "Test Research Topic"}
mock_report = "Research report content"
mock_triage_results = [
{
"action_item": MagicMock(title="Action 1"),
"gitea_issue": {"number": 101},
},
{
"action_item": MagicMock(title="Action 2"),
"gitea_issue": {"number": 102},
},
]
orchestrator.get_gitea_issue = AsyncMock(return_value=mock_issue)
orchestrator.run_research_pipeline = AsyncMock(return_value=mock_report)
orchestrator.post_gitea_comment = AsyncMock()
with patch("timmy.paperclip.triage_research_report", new=AsyncMock(return_value=mock_triage_results)):
result = await orchestrator.run({"issue_number": 42})
assert "Research complete for issue #42" in result
orchestrator.get_gitea_issue.assert_called_once_with(42)
orchestrator.run_research_pipeline.assert_called_once_with("Test Research Topic")
orchestrator.post_gitea_comment.assert_called_once()
comment_body = orchestrator.post_gitea_comment.call_args[0][1]
assert "Research complete for issue #42" in comment_body
assert "#101" in comment_body
assert "#102" in comment_body
@pytest.mark.asyncio
async def test_run_executes_full_pipeline_without_triage_results(self):
from timmy.paperclip import ResearchOrchestrator
with patch("timmy.paperclip.settings") as mock_settings:
mock_settings.gitea_repo = "owner/repo"
mock_settings.gitea_url = "http://gitea.example:3000"
mock_settings.gitea_token = "gitea-token"
orchestrator = ResearchOrchestrator()
mock_issue = {"number": 42, "title": "Test Research Topic"}
mock_report = "Research report content"
orchestrator.get_gitea_issue = AsyncMock(return_value=mock_issue)
orchestrator.run_research_pipeline = AsyncMock(return_value=mock_report)
orchestrator.post_gitea_comment = AsyncMock()
with patch("timmy.paperclip.triage_research_report", new=AsyncMock(return_value=[])):
result = await orchestrator.run({"issue_number": 42})
assert "Research complete for issue #42" in result
comment_body = orchestrator.post_gitea_comment.call_args[0][1]
assert "No new issues were created" in comment_body
# ── PaperclipPoller ────────────────────────────────────────────────────────────
class TestPaperclipPoller:
"""PaperclipPoller polls for and executes tasks."""
def test_init_creates_client_and_orchestrator(self):
from timmy.paperclip import PaperclipPoller
with patch("timmy.paperclip.settings") as mock_settings:
mock_settings.paperclip_poll_interval = 60
poller = PaperclipPoller()
assert poller.client is not None
assert poller.orchestrator is not None
assert poller.poll_interval == 60
@pytest.mark.asyncio
async def test_poll_returns_early_when_disabled(self):
from timmy.paperclip import PaperclipPoller
with patch("timmy.paperclip.settings") as mock_settings:
mock_settings.paperclip_poll_interval = 0
poller = PaperclipPoller()
poller.client.get_tasks = AsyncMock()
await poller.poll()
poller.client.get_tasks.assert_not_called()
@pytest.mark.asyncio
async def test_poll_processes_research_tasks(self):
from timmy.paperclip import PaperclipPoller, PaperclipTask
with patch("timmy.paperclip.settings") as mock_settings:
mock_settings.paperclip_poll_interval = 1
poller = PaperclipPoller()
mock_task = PaperclipTask(id="task-1", kind="research", context={"issue_number": 42})
poller.client.get_tasks = AsyncMock(return_value=[mock_task])
poller.run_research_task = AsyncMock()
# Stop after first iteration
call_count = 0
async def mock_sleep(duration):
nonlocal call_count
call_count += 1
if call_count >= 1:
raise asyncio.CancelledError("Stop the loop")
with patch("asyncio.sleep", mock_sleep):
with pytest.raises(asyncio.CancelledError):
await poller.poll()
poller.client.get_tasks.assert_called_once()
poller.run_research_task.assert_called_once_with(mock_task)
@pytest.mark.asyncio
async def test_poll_logs_http_error_and_continues(self, caplog):
import logging
from timmy.paperclip import PaperclipPoller
with patch("timmy.paperclip.settings") as mock_settings:
mock_settings.paperclip_poll_interval = 1
poller = PaperclipPoller()
poller.client.get_tasks = AsyncMock(side_effect=httpx.HTTPError("Connection failed"))
call_count = 0
async def mock_sleep(duration):
nonlocal call_count
call_count += 1
if call_count >= 1:
raise asyncio.CancelledError("Stop the loop")
with patch("asyncio.sleep", mock_sleep):
with caplog.at_level(logging.WARNING, logger="timmy.paperclip"):
with pytest.raises(asyncio.CancelledError):
await poller.poll()
assert any("Error polling Paperclip" in rec.message for rec in caplog.records)
@pytest.mark.asyncio
async def test_run_research_task_success(self):
from timmy.paperclip import PaperclipPoller, PaperclipTask
poller = PaperclipPoller()
mock_task = PaperclipTask(id="task-1", kind="research", context={"issue_number": 42})
poller.client.update_task_status = AsyncMock()
poller.orchestrator.run = AsyncMock(return_value="Research completed successfully")
await poller.run_research_task(mock_task)
assert poller.client.update_task_status.call_count == 2
poller.client.update_task_status.assert_any_call("task-1", "running")
poller.client.update_task_status.assert_any_call("task-1", "completed", "Research completed successfully")
poller.orchestrator.run.assert_called_once_with({"issue_number": 42})
@pytest.mark.asyncio
async def test_run_research_task_failure(self, caplog):
import logging
from timmy.paperclip import PaperclipPoller, PaperclipTask
poller = PaperclipPoller()
mock_task = PaperclipTask(id="task-1", kind="research", context={"issue_number": 42})
poller.client.update_task_status = AsyncMock()
poller.orchestrator.run = AsyncMock(side_effect=Exception("Something went wrong"))
with caplog.at_level(logging.ERROR, logger="timmy.paperclip"):
await poller.run_research_task(mock_task)
assert poller.client.update_task_status.call_count == 2
poller.client.update_task_status.assert_any_call("task-1", "running")
poller.client.update_task_status.assert_any_call("task-1", "failed", "Something went wrong")
assert any("Error running research task" in rec.message for rec in caplog.records)
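The status lifecycle these two tests assert (`running` → `completed` on success, `running` → `failed` on error) can be sketched independently of the poller. A hypothetical sketch with injected callables standing in for the orchestrator and the Paperclip client:

```python
# Hypothetical sketch of the task status lifecycle asserted above.
# `run` and `update_status` are injected stand-ins for the orchestrator
# and client methods; they are assumptions for the sketch, not real APIs.
async def run_research_task(task_id, run, update_status):
    await update_status(task_id, "running")
    try:
        result = await run()
    except Exception as exc:
        # Failures are reported, not re-raised, so the poll loop survives.
        await update_status(task_id, "failed", str(exc))
    else:
        await update_status(task_id, "completed", result)
```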
# ── start_paperclip_poller ─────────────────────────────────────────────────────
class TestStartPaperclipPoller:
"""start_paperclip_poller creates and starts the poller."""
@pytest.mark.asyncio
async def test_starts_poller_when_enabled(self):
from timmy.paperclip import start_paperclip_poller
with patch("timmy.paperclip.settings") as mock_settings:
mock_settings.paperclip_enabled = True
mock_poller = MagicMock()
mock_poller.poll = AsyncMock()
created_tasks = []
original_create_task = asyncio.create_task
def capture_create_task(coro):
created_tasks.append(coro)
return original_create_task(coro)
with patch("timmy.paperclip.PaperclipPoller", return_value=mock_poller):
with patch("asyncio.create_task", side_effect=capture_create_task):
await start_paperclip_poller()
assert len(created_tasks) == 1
@pytest.mark.asyncio
async def test_does_nothing_when_disabled(self):
from timmy.paperclip import start_paperclip_poller
with patch("timmy.paperclip.settings") as mock_settings:
mock_settings.paperclip_enabled = False
with patch("timmy.paperclip.PaperclipPoller") as mock_poller_class:
with patch("asyncio.create_task") as mock_create_task:
await start_paperclip_poller()
mock_poller_class.assert_not_called()
mock_create_task.assert_not_called()

View File

@@ -1,149 +0,0 @@
"""Unit tests for src/timmy/research_tools.py.
Refs #1237
"""
from __future__ import annotations
import sys
from types import ModuleType
from unittest.mock import MagicMock, patch
import pytest
pytestmark = pytest.mark.unit
# ── Stub serpapi before any import of research_tools ─────────────────────────
_serpapi_stub = ModuleType("serpapi")
_google_search_mock = MagicMock()
_serpapi_stub.GoogleSearch = _google_search_mock
sys.modules.setdefault("serpapi", _serpapi_stub)
# ── google_web_search ─────────────────────────────────────────────────────────
class TestGoogleWebSearch:
    """google_web_search returns results or degrades gracefully."""

    @pytest.mark.asyncio
    async def test_returns_empty_string_when_no_api_key(self, monkeypatch):
        monkeypatch.delenv("SERPAPI_API_KEY", raising=False)
        from timmy.research_tools import google_web_search

        result = await google_web_search("test query")
        assert result == ""

    @pytest.mark.asyncio
    async def test_logs_warning_when_no_api_key(self, monkeypatch, caplog):
        import logging

        monkeypatch.delenv("SERPAPI_API_KEY", raising=False)
        from timmy.research_tools import google_web_search

        with caplog.at_level(logging.WARNING, logger="timmy.research_tools"):
            await google_web_search("test query")
        assert any("SERPAPI_API_KEY" in rec.message for rec in caplog.records)

    @pytest.mark.asyncio
    async def test_calls_google_search_with_api_key(self, monkeypatch):
        monkeypatch.setenv("SERPAPI_API_KEY", "fake-key-123")
        mock_instance = MagicMock()
        mock_instance.get_dict.return_value = {"organic_results": [{"title": "Result"}]}
        with patch("timmy.research_tools.GoogleSearch", return_value=mock_instance) as mock_cls:
            from timmy.research_tools import google_web_search

            result = await google_web_search("hello world")
            mock_cls.assert_called_once()
            call_params = mock_cls.call_args[0][0]
            assert call_params["q"] == "hello world"
            assert call_params["api_key"] == "fake-key-123"
            mock_instance.get_dict.assert_called_once()
            assert "organic_results" in result

    @pytest.mark.asyncio
    async def test_returns_string_result(self, monkeypatch):
        monkeypatch.setenv("SERPAPI_API_KEY", "key")
        mock_instance = MagicMock()
        mock_instance.get_dict.return_value = {"answer": 42}
        with patch("timmy.research_tools.GoogleSearch", return_value=mock_instance):
            from timmy.research_tools import google_web_search

            result = await google_web_search("query")
            assert isinstance(result, str)

    @pytest.mark.asyncio
    async def test_passes_query_to_params(self, monkeypatch):
        monkeypatch.setenv("SERPAPI_API_KEY", "k")
        mock_instance = MagicMock()
        mock_instance.get_dict.return_value = {}
        with patch("timmy.research_tools.GoogleSearch", return_value=mock_instance) as mock_cls:
            from timmy.research_tools import google_web_search

            await google_web_search("specific search term")
            params = mock_cls.call_args[0][0]
            assert params["q"] == "specific search term"
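The tests above pin down the contract of `google_web_search` without showing the deleted implementation. A minimal sketch consistent with them might look like the following — the dummy `GoogleSearch` class and the exact return formatting are assumptions standing in for the real `serpapi` import, not the module's actual code:

```python
import asyncio
import logging
import os

logger = logging.getLogger("timmy.research_tools")

# In the real module this would be `from serpapi import GoogleSearch`;
# a dummy stands in here so the sketch runs without serpapi installed.
class GoogleSearch:
    def __init__(self, params):
        self.params = params

    def get_dict(self):
        return {"organic_results": [], "echo": self.params["q"]}

async def google_web_search(query: str) -> str:
    """Run a Google search via SerpAPI; degrade to "" when no key is set."""
    api_key = os.environ.get("SERPAPI_API_KEY")
    if not api_key:
        logger.warning("SERPAPI_API_KEY not set; skipping web search")
        return ""
    search = GoogleSearch({"q": query, "api_key": api_key})
    return str(search.get_dict())

# Exercise both paths the tests cover:
os.environ.pop("SERPAPI_API_KEY", None)
print(repr(asyncio.run(google_web_search("hello"))))  # -> '' (no key)
os.environ["SERPAPI_API_KEY"] = "fake-key"
print(asyncio.run(google_web_search("hello")))
```

Because the tests patch `timmy.research_tools.GoogleSearch` directly, the key design point is that the class must be imported at module level rather than inside the function.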
# ── get_llm_client ────────────────────────────────────────────────────────────
class TestGetLLMClient:
    """get_llm_client returns a client with a completion method."""

    def test_returns_non_none_client(self):
        from timmy.research_tools import get_llm_client

        client = get_llm_client()
        assert client is not None

    def test_client_has_completion_method(self):
        from timmy.research_tools import get_llm_client

        client = get_llm_client()
        assert hasattr(client, "completion")
        assert callable(client.completion)

    @pytest.mark.asyncio
    async def test_completion_returns_object_with_text(self):
        from timmy.research_tools import get_llm_client

        client = get_llm_client()
        result = await client.completion("test prompt", max_tokens=100)
        assert hasattr(result, "text")

    @pytest.mark.asyncio
    async def test_completion_text_is_string(self):
        from timmy.research_tools import get_llm_client

        client = get_llm_client()
        result = await client.completion("any prompt", max_tokens=50)
        assert isinstance(result.text, str)

    @pytest.mark.asyncio
    async def test_completion_text_contains_prompt(self):
        from timmy.research_tools import get_llm_client

        client = get_llm_client()
        result = await client.completion("my prompt", max_tokens=50)
        assert "my prompt" in result.text

    def test_each_call_returns_new_client(self):
        from timmy.research_tools import get_llm_client

        client_a = get_llm_client()
        client_b = get_llm_client()
        # Both should be functional clients (not necessarily the same instance)
        assert hasattr(client_a, "completion")
        assert hasattr(client_b, "completion")
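From these tests, `get_llm_client` must return an object whose async `completion(prompt, max_tokens=...)` yields a result with a `.text` string that includes the prompt. A hypothetical echo client satisfying that contract — the class name `EchoLLMClient`, the `[echo]` prefix, and the character-based truncation are all illustrative assumptions, not the deleted module's API:

```python
import asyncio
from dataclasses import dataclass

@dataclass
class CompletionResult:
    text: str

class EchoLLMClient:
    """Stub client; the real module presumably wrapped an actual LLM API."""

    async def completion(self, prompt: str, max_tokens: int = 256) -> CompletionResult:
        # Echo the prompt back, crudely capping length at max_tokens characters.
        return CompletionResult(text=f"[echo] {prompt}"[:max_tokens])

def get_llm_client() -> EchoLLMClient:
    # A fresh instance per call; the tests only require each result to be functional.
    return EchoLLMClient()

result = asyncio.run(get_llm_client().completion("my prompt", max_tokens=50))
print(result.text)  # -> "[echo] my prompt"
```

An echo stub like this also explains why `test_completion_text_contains_prompt` can assert `"my prompt" in result.text` deterministically, with no network calls.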