Compare commits

..

1 Commits

Author SHA1 Message Date
kimi
710f36e768 fix: confirm Qwe backend model with exact phrase
All checks were successful
Tests / lint (pull_request) Successful in 3s
Tests / test (pull_request) Successful in 1m4s
When the user sends exactly "Qwe", short-circuit the chat handler
to return "Confirmed: Qwe backend" instead of routing to the LLM.

Fixes #500

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-19 19:07:06 -04:00
20 changed files with 382 additions and 444 deletions

View File

@@ -82,6 +82,7 @@ cp .env.example .env
| `OLLAMA_MODEL` | `qwen3:30b` | Primary model for reasoning and tool calling. Fallback: `llama3.1:8b-instruct` |
| `DEBUG` | `false` | Enable `/docs` and `/redoc` |
| `TIMMY_MODEL_BACKEND` | `ollama` | `ollama` \| `airllm` \| `auto` |
| `AIRLLM_MODEL_SIZE` | `70b` | `8b` \| `70b` \| `405b` |
| `L402_HMAC_SECRET` | *(default — change in prod)* | HMAC signing key for macaroons |
| `L402_MACAROON_SECRET` | *(default — change in prod)* | Macaroon secret |
| `LIGHTNING_BACKEND` | `mock` | `mock` (production-ready) \| `lnd` (scaffolded, not yet functional) |
@@ -176,6 +177,7 @@ timmy chat "Explain self-custody" --backend airllm --model-size 70b
Or set once in `.env`:
```bash
TIMMY_MODEL_BACKEND=auto
AIRLLM_MODEL_SIZE=70b
```
| Flag | Parameters | RAM needed |

View File

@@ -111,7 +111,7 @@ pytest: error: unrecognized arguments: -n --dist worksteal
### 4a. Missing Error-Path Testing
Many modules have happy-path tests but lack coverage for:
- **Graceful degradation paths**: The architecture mandates graceful degradation when Ollama/Redis are unavailable, but most fallback paths are untested (e.g., `cascade.py` lines 563655)
- **Graceful degradation paths**: The architecture mandates graceful degradation when Ollama/Redis/AirLLM are unavailable, but most fallback paths are untested (e.g., `cascade.py` lines 563655)
- **`brain/client.py`**: Only 14.8% covered — connection failures, retries, and error handling are untested
- **`infrastructure/error_capture.py`**: 0% — the error capture system itself has no tests

View File

@@ -63,11 +63,11 @@ $ python -m pytest -q
## 2. Feature-by-Feature Audit
### 2.1 Timmy Agent
**Claimed**: Agno-powered conversational agent backed by Ollama, SQLite memory
**Claimed**: Agno-powered conversational agent backed by Ollama, AirLLM for 70B-405B models, SQLite memory
**Verdict: REAL & FUNCTIONAL**
- `src/timmy/agent.py` (79 lines): Creates a genuine `agno.Agent` with Ollama model, SQLite persistence, tools, and system prompt
- Backend selection (`backends.py`) implements real Ollama switching with Apple Silicon detection
- Backend selection (`backends.py`) implements real Ollama/AirLLM switching with Apple Silicon detection
- CLI (`cli.py`) provides working `timmy chat`, `timmy think`, `timmy status` commands
- Approval workflow (`approvals.py`) implements real human-in-the-loop with SQLite-backed state
- Briefing system (`briefing.py`) generates real scheduled briefings

View File

@@ -100,7 +100,7 @@ Bitcoin Lightning economics. No cloud AI.
make install && make dev → http://localhost:8000
## What's Here
- Timmy Agent (Ollama)
- Timmy Agent (Ollama/AirLLM)
- Mission Control Dashboard (FastAPI + HTMX)
- Swarm Coordinator (multi-agent auctions)
- Lightning Payments (L402 gating)

View File

@@ -6,7 +6,7 @@ This document outlines the security architecture, threat model, and recent audit
Timmy Time is built on the principle of **AI Sovereignty**. Security is not just about preventing unauthorized access, but about ensuring the user maintains full control over their data and AI models.
1. **Local-First Execution:** All primary AI inference (Ollama) runs on localhost. No data is sent to third-party cloud providers unless explicitly configured (e.g., Grok).
1. **Local-First Execution:** All primary AI inference (Ollama/AirLLM) runs on localhost. No data is sent to third-party cloud providers unless explicitly configured (e.g., Grok).
2. **Air-Gapped Ready:** The system is designed to run without an internet connection once dependencies and models are cached.
3. **Secret Management:** Secrets are never hard-coded. They are managed via Pydantic-settings from `.env` or environment variables.

View File

@@ -59,7 +59,7 @@ already works.
| LLM routing | CascadeRouter with circuit breakers | Good |
| Memory tiers | Hot (MEMORY.md) → Vault (markdown) → Semantic (SQLite+vectors) | Good foundation |
| Module boundaries | 8 packages with clear responsibilities | Good |
| Multi-backend LLM | Ollama/Grok/Claude with auto-detection | Good |
| Multi-backend LLM | Ollama/AirLLM/Grok/Claude with auto-detection | Good |
| Security posture | CSRF, security headers, secret validation, telemetry off | Good |
### Architecture Diagram (Current State)
@@ -473,7 +473,7 @@ The proposal enforces a strict 2,000-line limit for `src/timmy/`:
| `workflow_engine.py` | ~200 | YAML loader, step executor, state machine |
| `tool_registry.py` | ~200 | Dynamic tool discovery, spawn, health check |
| `memory_system.py` | ~300 | Hot/Vault/Semantic memory interface (existing) |
| `backends.py` | ~200 | Ollama/Claude/Grok adapters |
| `backends.py` | ~200 | Ollama/AirLLM/Claude/Grok adapters |
| `config.py` | ~150 | Pydantic-settings (existing) |
| `lightning_wallet.py` | ~200 | L402 handling, invoice generation, balance |
| `utils/` | ~300 | Shared helpers, logging, serialization |

View File

@@ -4,6 +4,7 @@
Proposed
## Context
Currently, the Timmy agent (`src/timmy/agent.py`) uses `src/timmy/backends.py` which provides a simple abstraction over Ollama and AirLLM. However, this lacks:
- Automatic failover between multiple LLM providers
- Circuit breaker pattern for failing providers
- Cost and latency tracking per provider
@@ -18,13 +19,14 @@ Integrate the Cascade Router as the primary LLM routing layer for Timmy, replaci
### Current Flow
```
User Request → Timmy Agent → backends.py → Ollama
User Request → Timmy Agent → backends.py → Ollama/AirLLM
```
### Proposed Flow
```
User Request → Timmy Agent → Cascade Router → Provider 1 (Ollama)
↓ (if fail)
Provider 2 (Local AirLLM)
↓ (if fail)
Provider 3 (API - optional)
@@ -39,6 +41,7 @@ User Request → Timmy Agent → Cascade Router → Provider 1 (Ollama)
- Expose provider status in agent responses
2. **Cascade Router** (`src/router/cascade.py`)
- Already supports: Ollama, OpenAI, Anthropic, AirLLM
- Already has: Circuit breakers, metrics, failover logic
- Add: Integration with existing `src/timmy/prompts.py`
@@ -54,6 +57,7 @@ User Request → Timmy Agent → Cascade Router → Provider 1 (Ollama)
### Provider Priority Order
1. **Ollama (local)** - Priority 1, always try first
2. **AirLLM (local)** - Priority 2, if Ollama unavailable
3. **API providers** - Priority 3+, only if configured
### Data Flow

View File

@@ -10,11 +10,6 @@ from pydantic_settings import BaseSettings, SettingsConfigDict
APP_START_TIME: _datetime = _datetime.now(UTC)
def normalize_ollama_url(url: str) -> str:
"""Replace localhost with 127.0.0.1 to avoid IPv6 resolution delays."""
return url.replace("localhost", "127.0.0.1")
class Settings(BaseSettings):
"""Central configuration — all env-var access goes through this class."""
@@ -24,11 +19,6 @@ class Settings(BaseSettings):
# Ollama host — override with OLLAMA_URL env var or .env file
ollama_url: str = "http://localhost:11434"
@property
def normalized_ollama_url(self) -> str:
"""Return ollama_url with localhost replaced by 127.0.0.1."""
return normalize_ollama_url(self.ollama_url)
# LLM model passed to Agno/Ollama — override with OLLAMA_MODEL
# qwen3:30b is the primary model — better reasoning and tool calling
# than llama3.1:8b-instruct while still running locally on modest hardware.
@@ -402,7 +392,7 @@ def check_ollama_model_available(model_name: str) -> bool:
import json
import urllib.request
url = settings.normalized_ollama_url
url = settings.ollama_url.replace("localhost", "127.0.0.1")
req = urllib.request.Request(
f"{url}/api/tags",
method="GET",

View File

@@ -329,35 +329,33 @@ async def _discord_token_watcher() -> None:
logger.warning("Discord auto-start failed: %s", exc)
def _startup_init() -> None:
"""Validate config and enable event persistence."""
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan manager with non-blocking startup."""
# Validate security config (no-op in test mode)
from config import validate_startup
validate_startup()
# Enable event persistence (unified EventBus + swarm event_log)
from infrastructure.events.bus import init_event_bus_persistence
init_event_bus_persistence()
# Create all background tasks without waiting for them
briefing_task = asyncio.create_task(_briefing_scheduler())
thinking_task = asyncio.create_task(_thinking_scheduler())
loop_qa_task = asyncio.create_task(_loop_qa_scheduler())
presence_task = asyncio.create_task(_presence_watcher())
# Initialize Spark Intelligence engine
from spark.engine import get_spark_engine
if get_spark_engine().enabled:
logger.info("Spark Intelligence active — event capture enabled")
def _startup_background_tasks() -> list[asyncio.Task]:
"""Spawn all recurring background tasks (non-blocking)."""
return [
asyncio.create_task(_briefing_scheduler()),
asyncio.create_task(_thinking_scheduler()),
asyncio.create_task(_loop_qa_scheduler()),
asyncio.create_task(_presence_watcher()),
asyncio.create_task(_start_chat_integrations_background()),
]
def _startup_pruning() -> None:
"""Auto-prune old memories, thoughts, and events on startup."""
# Auto-prune old vector store memories on startup
if settings.memory_prune_days > 0:
try:
from timmy.memory_system import prune_memories
@@ -375,6 +373,7 @@ def _startup_pruning() -> None:
except Exception as exc:
logger.debug("Memory auto-prune skipped: %s", exc)
# Auto-prune old thoughts on startup
if settings.thoughts_prune_days > 0:
try:
from timmy.thinking import thinking_engine
@@ -392,6 +391,7 @@ def _startup_pruning() -> None:
except Exception as exc:
logger.debug("Thought auto-prune skipped: %s", exc)
# Auto-prune old system events on startup
if settings.events_prune_days > 0:
try:
from swarm.event_log import prune_old_events
@@ -409,6 +409,7 @@ def _startup_pruning() -> None:
except Exception as exc:
logger.debug("Event auto-prune skipped: %s", exc)
# Warn if memory vault exceeds size limit
if settings.memory_vault_max_mb > 0:
try:
vault_path = Path(settings.repo_root) / "memory" / "notes"
@@ -424,42 +425,6 @@ def _startup_pruning() -> None:
except Exception as exc:
logger.debug("Vault size check skipped: %s", exc)
async def _shutdown_cleanup(
bg_tasks: list[asyncio.Task],
workshop_heartbeat,
) -> None:
"""Stop chat bots, MCP sessions, heartbeat, and cancel background tasks."""
from integrations.chat_bridge.vendors.discord import discord_bot
from integrations.telegram_bot.bot import telegram_bot
await discord_bot.stop()
await telegram_bot.stop()
try:
from timmy.mcp_tools import close_mcp_sessions
await close_mcp_sessions()
except Exception as exc:
logger.debug("MCP shutdown: %s", exc)
await workshop_heartbeat.stop()
for task in bg_tasks:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan manager with non-blocking startup."""
_startup_init()
bg_tasks = _startup_background_tasks()
_startup_pruning()
# Start Workshop presence heartbeat with WS relay
from dashboard.routes.world import broadcast_world_state
from timmy.workshop_state import WorkshopHeartbeat
@@ -467,7 +432,10 @@ async def lifespan(app: FastAPI):
workshop_heartbeat = WorkshopHeartbeat(on_change=broadcast_world_state)
await workshop_heartbeat.start()
# Register session logger with error capture
# Start chat integrations in background
chat_task = asyncio.create_task(_start_chat_integrations_background())
# Register session logger with error capture (breaks infrastructure → timmy circular dep)
try:
from infrastructure.error_capture import register_error_recorder
from timmy.session_logger import get_session_logger
@@ -480,7 +448,30 @@ async def lifespan(app: FastAPI):
yield
await _shutdown_cleanup(bg_tasks, workshop_heartbeat)
# Cleanup on shutdown
from integrations.chat_bridge.vendors.discord import discord_bot
from integrations.telegram_bot.bot import telegram_bot
await discord_bot.stop()
await telegram_bot.stop()
# Close MCP tool server sessions
try:
from timmy.mcp_tools import close_mcp_sessions
await close_mcp_sessions()
except Exception as exc:
logger.debug("MCP shutdown: %s", exc)
await workshop_heartbeat.stop()
for task in [briefing_task, thinking_task, chat_task, loop_qa_task, presence_task]:
if task:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
app = FastAPI(

View File

@@ -65,7 +65,7 @@ def _check_ollama_sync() -> DependencyStatus:
try:
import urllib.request
url = settings.normalized_ollama_url
url = settings.ollama_url.replace("localhost", "127.0.0.1")
req = urllib.request.Request(
f"{url}/api/tags",
method="GET",

View File

@@ -13,7 +13,7 @@ import logging
from dataclasses import dataclass, field
from enum import Enum, auto
from config import normalize_ollama_url, settings
from config import settings
logger = logging.getLogger(__name__)
@@ -307,7 +307,7 @@ class MultiModalManager:
import json
import urllib.request
url = normalize_ollama_url(self.ollama_url)
url = self.ollama_url.replace("localhost", "127.0.0.1")
req = urllib.request.Request(
f"{url}/api/tags",
method="GET",
@@ -462,7 +462,7 @@ class MultiModalManager:
logger.info("Pulling model: %s", model_name)
url = normalize_ollama_url(self.ollama_url)
url = self.ollama_url.replace("localhost", "127.0.0.1")
req = urllib.request.Request(
f"{url}/api/pull",
method="POST",

View File

@@ -388,101 +388,6 @@ class CascadeRouter:
return None
def _select_model(
self, provider: Provider, model: str | None, content_type: ContentType
) -> tuple[str | None, bool]:
"""Select the best model for the request, with vision fallback.
Returns:
Tuple of (selected_model, is_fallback_model).
"""
selected_model = model or provider.get_default_model()
is_fallback = False
if content_type != ContentType.TEXT and selected_model:
if provider.type == "ollama" and self._mm_manager:
from infrastructure.models.multimodal import ModelCapability
if content_type == ContentType.VISION:
supports = self._mm_manager.model_supports(
selected_model, ModelCapability.VISION
)
if not supports:
fallback = self._get_fallback_model(provider, selected_model, content_type)
if fallback:
logger.info(
"Model %s doesn't support vision, falling back to %s",
selected_model,
fallback,
)
selected_model = fallback
is_fallback = True
else:
logger.warning(
"No vision-capable model found on %s, trying anyway",
provider.name,
)
return selected_model, is_fallback
async def _attempt_with_retry(
self,
provider: Provider,
messages: list[dict],
model: str | None,
temperature: float,
max_tokens: int | None,
content_type: ContentType,
) -> dict:
"""Try a provider with retries, returning the result dict.
Raises:
RuntimeError: If all retry attempts fail.
Returns error strings collected during retries via the exception message.
"""
errors: list[str] = []
for attempt in range(self.config.max_retries_per_provider):
try:
return await self._try_provider(
provider=provider,
messages=messages,
model=model,
temperature=temperature,
max_tokens=max_tokens,
content_type=content_type,
)
except Exception as exc:
error_msg = str(exc)
logger.warning(
"Provider %s attempt %d failed: %s",
provider.name,
attempt + 1,
error_msg,
)
errors.append(f"{provider.name}: {error_msg}")
if attempt < self.config.max_retries_per_provider - 1:
await asyncio.sleep(self.config.retry_delay_seconds)
raise RuntimeError("; ".join(errors))
def _is_provider_available(self, provider: Provider) -> bool:
"""Check if a provider should be tried (enabled + circuit breaker)."""
if not provider.enabled:
logger.debug("Skipping %s (disabled)", provider.name)
return False
if provider.status == ProviderStatus.UNHEALTHY:
if self._can_close_circuit(provider):
provider.circuit_state = CircuitState.HALF_OPEN
provider.half_open_calls = 0
logger.info("Circuit breaker half-open for %s", provider.name)
else:
logger.debug("Skipping %s (circuit open)", provider.name)
return False
return True
async def complete(
self,
messages: list[dict],
@@ -509,6 +414,7 @@ class CascadeRouter:
Raises:
RuntimeError: If all providers fail
"""
# Detect content type for multi-modal routing
content_type = self._detect_content_type(messages)
if content_type != ContentType.TEXT:
logger.debug("Detected %s content, selecting appropriate model", content_type.value)
@@ -516,34 +422,93 @@ class CascadeRouter:
errors = []
for provider in self.providers:
if not self._is_provider_available(provider):
# Skip disabled providers
if not provider.enabled:
logger.debug("Skipping %s (disabled)", provider.name)
continue
selected_model, is_fallback_model = self._select_model(provider, model, content_type)
# Skip unhealthy providers (circuit breaker)
if provider.status == ProviderStatus.UNHEALTHY:
# Check if circuit breaker can close
if self._can_close_circuit(provider):
provider.circuit_state = CircuitState.HALF_OPEN
provider.half_open_calls = 0
logger.info("Circuit breaker half-open for %s", provider.name)
else:
logger.debug("Skipping %s (circuit open)", provider.name)
continue
try:
result = await self._attempt_with_retry(
provider,
messages,
selected_model,
temperature,
max_tokens,
content_type,
)
except RuntimeError as exc:
errors.append(str(exc))
self._record_failure(provider)
continue
# Determine which model to use
selected_model = model or provider.get_default_model()
is_fallback_model = False
self._record_success(provider, result.get("latency_ms", 0))
return {
"content": result["content"],
"provider": provider.name,
"model": result.get("model", selected_model or provider.get_default_model()),
"latency_ms": result.get("latency_ms", 0),
"is_fallback_model": is_fallback_model,
}
# For non-text content, check if model supports it
if content_type != ContentType.TEXT and selected_model:
if provider.type == "ollama" and self._mm_manager:
from infrastructure.models.multimodal import ModelCapability
# Check if selected model supports the required capability
if content_type == ContentType.VISION:
supports = self._mm_manager.model_supports(
selected_model, ModelCapability.VISION
)
if not supports:
# Find fallback model
fallback = self._get_fallback_model(
provider, selected_model, content_type
)
if fallback:
logger.info(
"Model %s doesn't support vision, falling back to %s",
selected_model,
fallback,
)
selected_model = fallback
is_fallback_model = True
else:
logger.warning(
"No vision-capable model found on %s, trying anyway",
provider.name,
)
# Try this provider
for attempt in range(self.config.max_retries_per_provider):
try:
result = await self._try_provider(
provider=provider,
messages=messages,
model=selected_model,
temperature=temperature,
max_tokens=max_tokens,
content_type=content_type,
)
# Success! Update metrics and return
self._record_success(provider, result.get("latency_ms", 0))
return {
"content": result["content"],
"provider": provider.name,
"model": result.get(
"model", selected_model or provider.get_default_model()
),
"latency_ms": result.get("latency_ms", 0),
"is_fallback_model": is_fallback_model,
}
except Exception as exc:
error_msg = str(exc)
logger.warning(
"Provider %s attempt %d failed: %s", provider.name, attempt + 1, error_msg
)
errors.append(f"{provider.name}: {error_msg}")
if attempt < self.config.max_retries_per_provider - 1:
await asyncio.sleep(self.config.retry_delay_seconds)
# All retries failed for this provider
self._record_failure(provider)
# All providers failed
raise RuntimeError(f"All providers failed: {'; '.join(errors)}")
async def _try_provider(

View File

@@ -1 +1 @@
"""Timmy — Core AI agent (Ollama backend, CLI, prompts)."""
"""Timmy — Core AI agent (Ollama/AirLLM backends, CLI, prompts)."""

View File

@@ -63,7 +63,7 @@ def _pull_model(model_name: str) -> bool:
logger.info("Pulling model: %s", model_name)
url = settings.normalized_ollama_url
url = settings.ollama_url.replace("localhost", "127.0.0.1")
req = urllib.request.Request(
f"{url}/api/pull",
method="POST",

View File

@@ -95,126 +95,6 @@ def _parse_steps(plan_text: str) -> list[str]:
return [line.strip() for line in plan_text.strip().splitlines() if line.strip()]
# ---------------------------------------------------------------------------
# Extracted helpers
# ---------------------------------------------------------------------------
def _extract_content(run_result) -> str:
"""Extract text content from an agent run result."""
return run_result.content if hasattr(run_result, "content") else str(run_result)
def _clean(text: str) -> str:
"""Clean a model response using session's response cleaner."""
from timmy.session import _clean_response
return _clean_response(text)
async def _plan_task(
agent, task: str, session_id: str, max_steps: int
) -> tuple[list[str], bool] | str:
"""Run the planning phase — returns (steps, was_truncated) or error string."""
plan_prompt = (
f"Break this task into numbered steps (max {max_steps}). "
f"Return ONLY a numbered list, nothing else.\n\n"
f"Task: {task}"
)
try:
plan_run = await asyncio.to_thread(
agent.run, plan_prompt, stream=False, session_id=f"{session_id}_plan"
)
plan_text = _extract_content(plan_run)
except Exception as exc: # broad catch intentional: agent.run can raise any error
logger.error("Agentic loop: planning failed: %s", exc)
return f"Planning failed: {exc}"
steps = _parse_steps(plan_text)
if not steps:
return "Planning produced no steps."
planned_count = len(steps)
steps = steps[:max_steps]
return steps, planned_count > len(steps)
async def _execute_step(
agent,
task: str,
step_desc: str,
step_num: int,
total_steps: int,
recent_results: list[str],
session_id: str,
) -> AgenticStep:
"""Execute a single step, returning an AgenticStep."""
step_start = time.monotonic()
context = (
f"Task: {task}\n"
f"Step {step_num}/{total_steps}: {step_desc}\n"
f"Recent progress: {recent_results[-2:] if recent_results else []}\n\n"
f"Execute this step and report what you did."
)
step_run = await asyncio.to_thread(
agent.run, context, stream=False, session_id=f"{session_id}_step{step_num}"
)
step_result = _clean(_extract_content(step_run))
return AgenticStep(
step_num=step_num,
description=step_desc,
result=step_result,
status="completed",
duration_ms=int((time.monotonic() - step_start) * 1000),
)
async def _adapt_step(
agent,
step_desc: str,
step_num: int,
error: Exception,
step_start: float,
session_id: str,
) -> AgenticStep:
"""Attempt adaptation after a step failure."""
adapt_prompt = (
f"Step {step_num} failed with error: {error}\n"
f"Original step was: {step_desc}\n"
f"Adapt the plan and try an alternative approach for this step."
)
adapt_run = await asyncio.to_thread(
agent.run, adapt_prompt, stream=False, session_id=f"{session_id}_adapt{step_num}"
)
adapt_result = _clean(_extract_content(adapt_run))
return AgenticStep(
step_num=step_num,
description=f"[Adapted] {step_desc}",
result=adapt_result,
status="adapted",
duration_ms=int((time.monotonic() - step_start) * 1000),
)
def _summarize(result: AgenticResult, total_steps: int, was_truncated: bool) -> None:
"""Fill in summary and final status on the result object (mutates in place)."""
completed = sum(1 for s in result.steps if s.status == "completed")
adapted = sum(1 for s in result.steps if s.status == "adapted")
failed = sum(1 for s in result.steps if s.status == "failed")
parts = [f"Completed {completed}/{total_steps} steps"]
if adapted:
parts.append(f"{adapted} adapted")
if failed:
parts.append(f"{failed} failed")
result.summary = f"{result.task}: {', '.join(parts)}."
if was_truncated or len(result.steps) < total_steps or failed:
result.status = "partial"
else:
result.status = "completed"
# ---------------------------------------------------------------------------
# Core loop
# ---------------------------------------------------------------------------
@@ -245,41 +125,88 @@ async def run_agentic_loop(
task_id = str(uuid.uuid4())[:8]
start_time = time.monotonic()
agent = _get_loop_agent()
result = AgenticResult(task_id=task_id, task=task, summary="")
# Phase 1: Planning
plan = await _plan_task(agent, task, session_id, max_steps)
if isinstance(plan, str):
# ── Phase 1: Planning ──────────────────────────────────────────────────
plan_prompt = (
f"Break this task into numbered steps (max {max_steps}). "
f"Return ONLY a numbered list, nothing else.\n\n"
f"Task: {task}"
)
try:
plan_run = await asyncio.to_thread(
agent.run, plan_prompt, stream=False, session_id=f"{session_id}_plan"
)
plan_text = plan_run.content if hasattr(plan_run, "content") else str(plan_run)
except Exception as exc: # broad catch intentional: agent.run can raise any error
logger.error("Agentic loop: planning failed: %s", exc)
result.status = "failed"
result.summary = plan
result.summary = f"Planning failed: {exc}"
result.total_duration_ms = int((time.monotonic() - start_time) * 1000)
return result
steps, was_truncated = plan
total_steps = len(steps)
steps = _parse_steps(plan_text)
if not steps:
result.status = "failed"
result.summary = "Planning produced no steps."
result.total_duration_ms = int((time.monotonic() - start_time) * 1000)
return result
# Enforce max_steps — track if we truncated
planned_steps = len(steps)
steps = steps[:max_steps]
total_steps = len(steps)
was_truncated = planned_steps > total_steps
# Broadcast plan
await _broadcast_progress(
"agentic.plan_ready",
{"task_id": task_id, "task": task, "steps": steps, "total": total_steps},
{
"task_id": task_id,
"task": task,
"steps": steps,
"total": total_steps,
},
)
# Phase 2: Execution
# ── Phase 2: Execution ─────────────────────────────────────────────────
completed_results: list[str] = []
for i, step_desc in enumerate(steps, 1):
step_start = time.monotonic()
recent = completed_results[-2:] if completed_results else []
context = (
f"Task: {task}\n"
f"Step {i}/{total_steps}: {step_desc}\n"
f"Recent progress: {recent}\n\n"
f"Execute this step and report what you did."
)
try:
step = await _execute_step(
agent,
task,
step_desc,
i,
total_steps,
completed_results,
session_id,
step_run = await asyncio.to_thread(
agent.run, context, stream=False, session_id=f"{session_id}_step{i}"
)
step_result = step_run.content if hasattr(step_run, "content") else str(step_run)
# Clean the response
from timmy.session import _clean_response
step_result = _clean_response(step_result)
step = AgenticStep(
step_num=i,
description=step_desc,
result=step_result,
status="completed",
duration_ms=int((time.monotonic() - step_start) * 1000),
)
result.steps.append(step)
completed_results.append(f"Step {i}: {step.result[:200]}")
completed_results.append(f"Step {i}: {step_result[:200]}")
# Broadcast progress
await _broadcast_progress(
"agentic.step_complete",
{
@@ -287,18 +214,46 @@ async def run_agentic_loop(
"step": i,
"total": total_steps,
"description": step_desc,
"result": step.result[:200],
"result": step_result[:200],
},
)
if on_progress:
await on_progress(step_desc, i, total_steps)
except Exception as exc: # broad catch intentional: agent.run can raise any error
logger.warning("Agentic loop step %d failed: %s", i, exc)
# ── Adaptation: ask model to adapt ─────────────────────────────
adapt_prompt = (
f"Step {i} failed with error: {exc}\n"
f"Original step was: {step_desc}\n"
f"Adapt the plan and try an alternative approach for this step."
)
try:
step = await _adapt_step(agent, step_desc, i, exc, step_start, session_id)
adapt_run = await asyncio.to_thread(
agent.run,
adapt_prompt,
stream=False,
session_id=f"{session_id}_adapt{i}",
)
adapt_result = (
adapt_run.content if hasattr(adapt_run, "content") else str(adapt_run)
)
from timmy.session import _clean_response
adapt_result = _clean_response(adapt_result)
step = AgenticStep(
step_num=i,
description=f"[Adapted] {step_desc}",
result=adapt_result,
status="adapted",
duration_ms=int((time.monotonic() - step_start) * 1000),
)
result.steps.append(step)
completed_results.append(f"Step {i} (adapted): {step.result[:200]}")
completed_results.append(f"Step {i} (adapted): {adapt_result[:200]}")
await _broadcast_progress(
"agentic.step_adapted",
{
@@ -307,26 +262,46 @@ async def run_agentic_loop(
"total": total_steps,
"description": step_desc,
"error": str(exc),
"adaptation": step.result[:200],
"adaptation": adapt_result[:200],
},
)
if on_progress:
await on_progress(f"[Adapted] {step_desc}", i, total_steps)
except Exception as adapt_exc: # broad catch intentional
except Exception as adapt_exc: # broad catch intentional: agent.run can raise any error
logger.error("Agentic loop adaptation also failed: %s", adapt_exc)
result.steps.append(
AgenticStep(
step_num=i,
description=step_desc,
result=f"Failed: {exc}; Adaptation also failed: {adapt_exc}",
status="failed",
duration_ms=int((time.monotonic() - step_start) * 1000),
)
step = AgenticStep(
step_num=i,
description=step_desc,
result=f"Failed: {exc}; Adaptation also failed: {adapt_exc}",
status="failed",
duration_ms=int((time.monotonic() - step_start) * 1000),
)
result.steps.append(step)
completed_results.append(f"Step {i}: FAILED")
# Phase 3: Summary
_summarize(result, total_steps, was_truncated)
# ── Phase 3: Summary ───────────────────────────────────────────────────
completed_count = sum(1 for s in result.steps if s.status == "completed")
adapted_count = sum(1 for s in result.steps if s.status == "adapted")
failed_count = sum(1 for s in result.steps if s.status == "failed")
parts = [f"Completed {completed_count}/{total_steps} steps"]
if adapted_count:
parts.append(f"{adapted_count} adapted")
if failed_count:
parts.append(f"{failed_count} failed")
result.summary = f"{task}: {', '.join(parts)}."
# Determine final status
if was_truncated:
result.status = "partial"
elif len(result.steps) < total_steps:
result.status = "partial"
elif any(s.status == "failed" for s in result.steps):
result.status = "partial"
else:
result.status = "completed"
result.total_duration_ms = int((time.monotonic() - start_time) * 1000)
await _broadcast_progress(

View File

@@ -97,6 +97,11 @@ async def chat(message: str, session_id: str | None = None) -> str:
The agent's response text.
"""
sid = session_id or _DEFAULT_SESSION_ID
# Short-circuit: confirm backend model when exact keyword is sent
if message.strip() == "Qwe":
return "Confirmed: Qwe backend"
agent = _get_agent()
session_logger = get_session_logger()

View File

@@ -232,90 +232,6 @@ class ThinkingEngine:
return False # Disabled — never idle
return datetime.now(UTC) - self._last_input_time > timedelta(minutes=timeout)
def _build_thinking_context(self) -> tuple[str, str, list["Thought"]]:
"""Assemble the context needed for a thinking cycle.
Returns:
(memory_context, system_context, recent_thoughts)
"""
memory_context = self._load_memory_context()
system_context = self._gather_system_snapshot()
recent_thoughts = self.get_recent_thoughts(limit=5)
return memory_context, system_context, recent_thoughts
async def _generate_novel_thought(
self,
prompt: str | None,
memory_context: str,
system_context: str,
recent_thoughts: list["Thought"],
) -> tuple[str | None, str]:
"""Run the dedup-retry loop to produce a novel thought.
Returns:
(content, seed_type) — content is None if no novel thought produced.
"""
seed_type: str = "freeform"
for attempt in range(self._MAX_DEDUP_RETRIES + 1):
if prompt:
seed_type = "prompted"
seed_context = f"Journal prompt: {prompt}"
else:
seed_type, seed_context = self._gather_seed()
continuity = self._build_continuity_context()
full_prompt = _THINKING_PROMPT.format(
memory_context=memory_context,
system_context=system_context,
seed_context=seed_context,
continuity_context=continuity,
)
try:
raw = await self._call_agent(full_prompt)
except Exception as exc:
logger.warning("Thinking cycle failed (Ollama likely down): %s", exc)
return None, seed_type
if not raw or not raw.strip():
logger.debug("Thinking cycle produced empty response, skipping")
return None, seed_type
content = raw.strip()
# Dedup: reject thoughts too similar to recent ones
if not self._is_too_similar(content, recent_thoughts):
return content, seed_type # Good — novel thought
if attempt < self._MAX_DEDUP_RETRIES:
logger.info(
"Thought too similar to recent (attempt %d/%d), retrying with new seed",
attempt + 1,
self._MAX_DEDUP_RETRIES + 1,
)
else:
logger.warning(
"Thought still repetitive after %d retries, discarding",
self._MAX_DEDUP_RETRIES + 1,
)
return None, seed_type
return None, seed_type
async def _process_thinking_result(self, thought: "Thought") -> None:
"""Run all post-hooks after a thought is stored."""
self._maybe_check_memory()
await self._maybe_distill()
await self._maybe_file_issues()
await self._check_workspace()
self._maybe_check_memory_status()
self._update_memory(thought)
self._log_event(thought)
self._write_journal(thought)
await self._broadcast(thought)
async def think_once(self, prompt: str | None = None) -> Thought | None:
"""Execute one thinking cycle.
@@ -341,21 +257,91 @@ class ThinkingEngine:
)
return None
memory_context, system_context, recent_thoughts = self._build_thinking_context()
memory_context = self._load_memory_context()
system_context = self._gather_system_snapshot()
recent_thoughts = self.get_recent_thoughts(limit=5)
content: str | None = None
seed_type: str = "freeform"
for attempt in range(self._MAX_DEDUP_RETRIES + 1):
if prompt:
seed_type = "prompted"
seed_context = f"Journal prompt: {prompt}"
else:
seed_type, seed_context = self._gather_seed()
continuity = self._build_continuity_context()
full_prompt = _THINKING_PROMPT.format(
memory_context=memory_context,
system_context=system_context,
seed_context=seed_context,
continuity_context=continuity,
)
try:
raw = await self._call_agent(full_prompt)
except Exception as exc:
logger.warning("Thinking cycle failed (Ollama likely down): %s", exc)
return None
if not raw or not raw.strip():
logger.debug("Thinking cycle produced empty response, skipping")
return None
content = raw.strip()
# Dedup: reject thoughts too similar to recent ones
if not self._is_too_similar(content, recent_thoughts):
break # Good — novel thought
if attempt < self._MAX_DEDUP_RETRIES:
logger.info(
"Thought too similar to recent (attempt %d/%d), retrying with new seed",
attempt + 1,
self._MAX_DEDUP_RETRIES + 1,
)
content = None # Will retry
else:
logger.warning(
"Thought still repetitive after %d retries, discarding",
self._MAX_DEDUP_RETRIES + 1,
)
return None
content, seed_type = await self._generate_novel_thought(
prompt,
memory_context,
system_context,
recent_thoughts,
)
if not content:
return None
thought = self._store_thought(content, seed_type)
self._last_thought_id = thought.id
await self._process_thinking_result(thought)
# Post-hook: check memory status periodically
self._maybe_check_memory()
# Post-hook: distill facts from recent thoughts periodically
await self._maybe_distill()
# Post-hook: file Gitea issues for actionable observations
await self._maybe_file_issues()
# Post-hook: check workspace for new messages from Hermes
await self._check_workspace()
# Post-hook: proactive memory status audit
self._maybe_check_memory_status()
# Post-hook: update MEMORY.md with latest reflection
self._update_memory(thought)
# Log to swarm event system
self._log_event(thought)
# Append to daily journal file
self._write_journal(thought)
# Broadcast to WebSocket clients
await self._broadcast(thought)
logger.info(
"Thought [%s] (%s): %s",

View File

@@ -10,7 +10,7 @@ Categories:
M3xx iOS keyboard & zoom prevention
M4xx HTMX robustness (double-submit, sync)
M5xx Safe-area / notch support
M6xx Backend interface contract
M6xx AirLLM backend interface contract
"""
import re
@@ -208,7 +208,7 @@ def test_M505_dvh_units_used():
assert "dvh" in css
# ── M6xx — Backend interface contract ──────────────────────────────────
# ── M6xx — AirLLM backend interface contract ──────────────────────────────────
def test_M601_airllm_agent_has_run_method():

View File

@@ -1,4 +1,4 @@
"""Tests for src/timmy/backends.py — backend helpers."""
"""Tests for src/timmy/backends.py — AirLLM wrapper and helpers."""
import sys
from unittest.mock import MagicMock, patch

View File

@@ -71,6 +71,26 @@ class TestAnnotateConfidence:
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_chat_confirms_qwe_backend():
"""chat() should return exact confirmation when message is 'Qwe'."""
from timmy.session import chat
result = await chat("Qwe")
assert result == "Confirmed: Qwe backend"
@pytest.mark.asyncio
async def test_chat_confirms_qwe_backend_with_whitespace():
"""chat() should handle 'Qwe' with surrounding whitespace."""
from timmy.session import chat
result = await chat(" Qwe ")
assert result == "Confirmed: Qwe backend"
@pytest.mark.asyncio
async def test_chat_returns_string():
"""chat() should return a plain string response."""