Compare commits

..

1 Commits

Author SHA1 Message Date
kimi
96acac5c5f refactor: break up shell.py::run() into helpers
All checks were successful
Tests / lint (pull_request) Successful in 3s
Tests / test (pull_request) Successful in 1m29s
Extract _build_run_env() and _exec_subprocess() from the 89-line run()
method, reducing it to ~30 lines that validate and delegate.

Fixes #539

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-19 21:03:33 -04:00
48 changed files with 1130 additions and 3004 deletions

View File

@@ -54,7 +54,6 @@ REPO_ROOT = Path(__file__).resolve().parent.parent
RETRO_FILE = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl"
SUMMARY_FILE = REPO_ROOT / ".loop" / "retro" / "summary.json"
EPOCH_COUNTER_FILE = REPO_ROOT / ".loop" / "retro" / ".epoch_counter"
CYCLE_RESULT_FILE = REPO_ROOT / ".loop" / "cycle_result.json"
# How many recent entries to include in rolling summary
SUMMARY_WINDOW = 50
@@ -247,37 +246,9 @@ def update_summary() -> None:
SUMMARY_FILE.write_text(json.dumps(summary, indent=2) + "\n")
def _load_cycle_result() -> dict:
"""Read .loop/cycle_result.json if it exists; return empty dict on failure."""
if not CYCLE_RESULT_FILE.exists():
return {}
try:
raw = CYCLE_RESULT_FILE.read_text().strip()
# Strip hermes fence markers (```json ... ```) if present
if raw.startswith("```"):
lines = raw.splitlines()
lines = [l for l in lines if not l.startswith("```")]
raw = "\n".join(lines)
return json.loads(raw)
except (json.JSONDecodeError, OSError):
return {}
def main() -> None:
args = parse_args()
# Backfill from cycle_result.json when CLI args have defaults
cr = _load_cycle_result()
if cr:
if args.issue is None and cr.get("issue"):
args.issue = int(cr["issue"])
if args.type == "unknown" and cr.get("type"):
args.type = cr["type"]
if args.tests_passed == 0 and cr.get("tests_passed"):
args.tests_passed = int(cr["tests_passed"])
if not args.notes and cr.get("notes"):
args.notes = cr["notes"]
# Auto-detect issue from branch when not explicitly provided
if args.issue is None:
args.issue = detect_issue_from_branch()

View File

@@ -84,7 +84,6 @@ class Settings(BaseSettings):
# Only used when explicitly enabled and query complexity warrants it.
grok_enabled: bool = False
xai_api_key: str = ""
xai_base_url: str = "https://api.x.ai/v1"
grok_default_model: str = "grok-3-fast"
grok_max_sats_per_query: int = 200
grok_free: bool = False # Skip Lightning invoice when user has own API key

View File

@@ -46,7 +46,6 @@ from dashboard.routes.tasks import router as tasks_router
from dashboard.routes.telegram import router as telegram_router
from dashboard.routes.thinking import router as thinking_router
from dashboard.routes.tools import router as tools_router
from dashboard.routes.tower import router as tower_router
from dashboard.routes.voice import router as voice_router
from dashboard.routes.work_orders import router as work_orders_router
from dashboard.routes.world import router as world_router
@@ -377,78 +376,73 @@ def _startup_background_tasks() -> list[asyncio.Task]:
]
def _try_prune(label: str, prune_fn, days: int) -> None:
"""Run a prune function, log results, swallow errors."""
try:
pruned = prune_fn()
if pruned:
logger.info(
"%s auto-prune: removed %d entries older than %d days",
label,
pruned,
days,
)
except Exception as exc:
logger.debug("%s auto-prune skipped: %s", label, exc)
def _check_vault_size() -> None:
"""Warn if the memory vault exceeds the configured size limit."""
try:
vault_path = Path(settings.repo_root) / "memory" / "notes"
if vault_path.exists():
total_bytes = sum(f.stat().st_size for f in vault_path.rglob("*") if f.is_file())
total_mb = total_bytes / (1024 * 1024)
if total_mb > settings.memory_vault_max_mb:
logger.warning(
"Memory vault (%.1f MB) exceeds limit (%d MB) — consider archiving old notes",
total_mb,
settings.memory_vault_max_mb,
)
except Exception as exc:
logger.debug("Vault size check skipped: %s", exc)
def _startup_pruning() -> None:
"""Auto-prune old memories, thoughts, and events on startup."""
if settings.memory_prune_days > 0:
from timmy.memory_system import prune_memories
try:
from timmy.memory_system import prune_memories
_try_prune(
"Memory",
lambda: prune_memories(
pruned = prune_memories(
older_than_days=settings.memory_prune_days,
keep_facts=settings.memory_prune_keep_facts,
),
settings.memory_prune_days,
)
)
if pruned:
logger.info(
"Memory auto-prune: removed %d entries older than %d days",
pruned,
settings.memory_prune_days,
)
except Exception as exc:
logger.debug("Memory auto-prune skipped: %s", exc)
if settings.thoughts_prune_days > 0:
from timmy.thinking import thinking_engine
try:
from timmy.thinking import thinking_engine
_try_prune(
"Thought",
lambda: thinking_engine.prune_old_thoughts(
pruned = thinking_engine.prune_old_thoughts(
keep_days=settings.thoughts_prune_days,
keep_min=settings.thoughts_prune_keep_min,
),
settings.thoughts_prune_days,
)
)
if pruned:
logger.info(
"Thought auto-prune: removed %d entries older than %d days",
pruned,
settings.thoughts_prune_days,
)
except Exception as exc:
logger.debug("Thought auto-prune skipped: %s", exc)
if settings.events_prune_days > 0:
from swarm.event_log import prune_old_events
try:
from swarm.event_log import prune_old_events
_try_prune(
"Event",
lambda: prune_old_events(
pruned = prune_old_events(
keep_days=settings.events_prune_days,
keep_min=settings.events_prune_keep_min,
),
settings.events_prune_days,
)
)
if pruned:
logger.info(
"Event auto-prune: removed %d entries older than %d days",
pruned,
settings.events_prune_days,
)
except Exception as exc:
logger.debug("Event auto-prune skipped: %s", exc)
if settings.memory_vault_max_mb > 0:
_check_vault_size()
try:
vault_path = Path(settings.repo_root) / "memory" / "notes"
if vault_path.exists():
total_bytes = sum(f.stat().st_size for f in vault_path.rglob("*") if f.is_file())
total_mb = total_bytes / (1024 * 1024)
if total_mb > settings.memory_vault_max_mb:
logger.warning(
"Memory vault (%.1f MB) exceeds limit (%d MB) — consider archiving old notes",
total_mb,
settings.memory_vault_max_mb,
)
except Exception as exc:
logger.debug("Vault size check skipped: %s", exc)
async def _shutdown_cleanup(
@@ -589,7 +583,6 @@ app.include_router(system_router)
app.include_router(experiments_router)
app.include_router(db_explorer_router)
app.include_router(world_router)
app.include_router(tower_router)
@app.websocket("/ws")

View File

@@ -175,12 +175,18 @@ class CSRFMiddleware(BaseHTTPMiddleware):
return await call_next(request)
# Token validation failed and path is not exempt
# Resolve the endpoint from routes BEFORE executing to avoid side effects
endpoint = self._resolve_endpoint(request)
if endpoint and is_csrf_exempt(endpoint):
return await call_next(request)
# We still need to call the app to check if the endpoint is decorated
# with @csrf_exempt, so we'll let it through and check after routing
response = await call_next(request)
# Endpoint is not exempt and token validation failed — reject without executing
# After routing, check if the endpoint is marked as exempt
endpoint = request.scope.get("endpoint")
if endpoint and is_csrf_exempt(endpoint):
# Endpoint is marked as exempt, allow the response
return response
# Endpoint is not exempt and token validation failed
# Return 403 error
return JSONResponse(
status_code=403,
content={
@@ -190,42 +196,6 @@ class CSRFMiddleware(BaseHTTPMiddleware):
},
)
def _resolve_endpoint(self, request: Request) -> Callable | None:
"""Resolve the endpoint for a request without executing it.
Walks the app chain to find routes, then matches against the request
scope. This allows checking @csrf_exempt before the handler runs
(avoiding side effects on CSRF rejection).
Returns:
The endpoint callable if found, None otherwise.
"""
try:
from starlette.routing import Match
# Walk the middleware/app chain to find something with routes
routes = None
current = self.app
for _ in range(10): # Safety limit
routes = getattr(current, "routes", None)
if routes:
break
current = getattr(current, "app", None)
if current is None:
break
if not routes:
return None
scope = dict(request.scope)
for route in routes:
match, child_scope = route.matches(scope)
if match == Match.FULL:
return child_scope.get("endpoint")
except Exception:
logger.debug("Failed to resolve endpoint for CSRF check")
return None
def _is_likely_exempt(self, path: str) -> bool:
"""Check if a path is likely to be CSRF exempt.

View File

@@ -1,4 +1,4 @@
from datetime import UTC, date, datetime
from datetime import date, datetime
from enum import StrEnum
from sqlalchemy import JSON, Boolean, Column, Date, DateTime, Index, Integer, String
@@ -40,13 +40,8 @@ class Task(Base):
deferred_at = Column(DateTime, nullable=True)
# Timestamps
created_at = Column(DateTime, default=lambda: datetime.now(UTC), nullable=False)
updated_at = Column(
DateTime,
default=lambda: datetime.now(UTC),
onupdate=lambda: datetime.now(UTC),
nullable=False,
)
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)
__table_args__ = (Index("ix_task_state_order", "state", "sort_order"),)
@@ -64,4 +59,4 @@ class JournalEntry(Base):
gratitude = Column(String(500), nullable=True)
energy_level = Column(Integer, nullable=True) # User-reported, 1-10
created_at = Column(DateTime, default=lambda: datetime.now(UTC), nullable=False)
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)

View File

@@ -1,5 +1,5 @@
import logging
from datetime import UTC, date, datetime
from datetime import date, datetime
from fastapi import APIRouter, Depends, Form, HTTPException, Request
from fastapi.responses import HTMLResponse
@@ -19,17 +19,14 @@ router = APIRouter(tags=["calm"])
# Helper functions for state machine logic
def get_now_task(db: Session) -> Task | None:
"""Return the single active NOW task, or None."""
return db.query(Task).filter(Task.state == TaskState.NOW).first()
def get_next_task(db: Session) -> Task | None:
"""Return the single queued NEXT task, or None."""
return db.query(Task).filter(Task.state == TaskState.NEXT).first()
def get_later_tasks(db: Session) -> list[Task]:
"""Return all LATER tasks ordered by MIT flag then sort_order."""
return (
db.query(Task)
.filter(Task.state == TaskState.LATER)
@@ -38,63 +35,7 @@ def get_later_tasks(db: Session) -> list[Task]:
)
def _create_mit_tasks(db: Session, titles: list[str | None]) -> list[int]:
"""Create MIT tasks from a list of titles, return their IDs."""
task_ids: list[int] = []
for title in titles:
if title:
task = Task(
title=title,
is_mit=True,
state=TaskState.LATER,
certainty=TaskCertainty.SOFT,
)
db.add(task)
db.commit()
db.refresh(task)
task_ids.append(task.id)
return task_ids
def _create_other_tasks(db: Session, other_tasks: str):
"""Create non-MIT tasks from newline-separated text."""
for line in other_tasks.split("\n"):
line = line.strip()
if line:
task = Task(
title=line,
state=TaskState.LATER,
certainty=TaskCertainty.FUZZY,
)
db.add(task)
def _seed_now_next(db: Session):
"""Set initial NOW/NEXT states when both slots are empty."""
if get_now_task(db) or get_next_task(db):
return
later_tasks = (
db.query(Task)
.filter(Task.state == TaskState.LATER)
.order_by(Task.is_mit.desc(), Task.sort_order)
.all()
)
if later_tasks:
later_tasks[0].state = TaskState.NOW
db.add(later_tasks[0])
db.flush()
if len(later_tasks) > 1:
later_tasks[1].state = TaskState.NEXT
db.add(later_tasks[1])
def promote_tasks(db: Session):
"""Enforce the NOW/NEXT/LATER state machine invariants.
- At most one NOW task (extras demoted to NEXT).
- If no NOW, promote NEXT -> NOW.
- If no NEXT, promote highest-priority LATER -> NEXT.
"""
# Ensure only one NOW task exists. If multiple, demote extras to NEXT.
now_tasks = db.query(Task).filter(Task.state == TaskState.NOW).all()
if len(now_tasks) > 1:
@@ -133,7 +74,6 @@ def promote_tasks(db: Session):
# Endpoints
@router.get("/calm", response_class=HTMLResponse)
async def get_calm_view(request: Request, db: Session = Depends(get_db)):
"""Render the main CALM dashboard with NOW/NEXT/LATER counts."""
now_task = get_now_task(db)
next_task = get_next_task(db)
later_tasks_count = len(get_later_tasks(db))
@@ -150,7 +90,6 @@ async def get_calm_view(request: Request, db: Session = Depends(get_db)):
@router.get("/calm/ritual/morning", response_class=HTMLResponse)
async def get_morning_ritual_form(request: Request):
"""Render the morning ritual intake form."""
return templates.TemplateResponse(request, "calm/morning_ritual_form.html", {})
@@ -163,20 +102,63 @@ async def post_morning_ritual(
mit3_title: str = Form(None),
other_tasks: str = Form(""),
):
"""Process morning ritual: create MITs, other tasks, and set initial states."""
# Create Journal Entry
mit_task_ids = []
journal_entry = JournalEntry(entry_date=date.today())
db.add(journal_entry)
db.commit()
db.refresh(journal_entry)
journal_entry.mit_task_ids = _create_mit_tasks(db, [mit1_title, mit2_title, mit3_title])
# Create MIT tasks
for mit_title in [mit1_title, mit2_title, mit3_title]:
if mit_title:
task = Task(
title=mit_title,
is_mit=True,
state=TaskState.LATER, # Initially LATER, will be promoted
certainty=TaskCertainty.SOFT,
)
db.add(task)
db.commit()
db.refresh(task)
mit_task_ids.append(task.id)
journal_entry.mit_task_ids = mit_task_ids
db.add(journal_entry)
_create_other_tasks(db, other_tasks)
# Create other tasks
for task_title in other_tasks.split("\n"):
task_title = task_title.strip()
if task_title:
task = Task(
title=task_title,
state=TaskState.LATER,
certainty=TaskCertainty.FUZZY,
)
db.add(task)
db.commit()
_seed_now_next(db)
db.commit()
# Set initial NOW/NEXT states
# Set initial NOW/NEXT states after all tasks are created
if not get_now_task(db) and not get_next_task(db):
later_tasks = (
db.query(Task)
.filter(Task.state == TaskState.LATER)
.order_by(Task.is_mit.desc(), Task.sort_order)
.all()
)
if later_tasks:
# Set the highest priority LATER task to NOW
later_tasks[0].state = TaskState.NOW
db.add(later_tasks[0])
db.flush() # Flush to make the change visible for the next query
# Set the next highest priority LATER task to NEXT
if len(later_tasks) > 1:
later_tasks[1].state = TaskState.NEXT
db.add(later_tasks[1])
db.commit() # Commit changes after initial NOW/NEXT setup
return templates.TemplateResponse(
request,
@@ -191,7 +173,6 @@ async def post_morning_ritual(
@router.get("/calm/ritual/evening", response_class=HTMLResponse)
async def get_evening_ritual_form(request: Request, db: Session = Depends(get_db)):
"""Render the evening ritual form for today's journal entry."""
journal_entry = db.query(JournalEntry).filter(JournalEntry.entry_date == date.today()).first()
if not journal_entry:
raise HTTPException(status_code=404, detail="No journal entry for today")
@@ -208,7 +189,6 @@ async def post_evening_ritual(
gratitude: str = Form(None),
energy_level: int = Form(None),
):
"""Process evening ritual: save reflection/gratitude, archive active tasks."""
journal_entry = db.query(JournalEntry).filter(JournalEntry.entry_date == date.today()).first()
if not journal_entry:
raise HTTPException(status_code=404, detail="No journal entry for today")
@@ -226,7 +206,7 @@ async def post_evening_ritual(
)
for task in active_tasks:
task.state = TaskState.DEFERRED # Or DONE, depending on desired archiving logic
task.deferred_at = datetime.now(UTC)
task.deferred_at = datetime.utcnow()
db.add(task)
db.commit()
@@ -243,7 +223,6 @@ async def create_new_task(
is_mit: bool = Form(False),
certainty: TaskCertainty = Form(TaskCertainty.SOFT),
):
"""Create a new task in LATER state and return updated count."""
task = Task(
title=title,
description=description,
@@ -268,7 +247,6 @@ async def start_task(
task_id: int,
db: Session = Depends(get_db),
):
"""Move a task to NOW state, demoting the current NOW to NEXT."""
current_now_task = get_now_task(db)
if current_now_task and current_now_task.id != task_id:
current_now_task.state = TaskState.NEXT # Demote current NOW to NEXT
@@ -279,7 +257,7 @@ async def start_task(
raise HTTPException(status_code=404, detail="Task not found")
task.state = TaskState.NOW
task.started_at = datetime.now(UTC)
task.started_at = datetime.utcnow()
db.add(task)
db.commit()
@@ -303,13 +281,12 @@ async def complete_task(
task_id: int,
db: Session = Depends(get_db),
):
"""Mark a task as DONE and trigger state promotion."""
task = db.query(Task).filter(Task.id == task_id).first()
if not task:
raise HTTPException(status_code=404, detail="Task not found")
task.state = TaskState.DONE
task.completed_at = datetime.now(UTC)
task.completed_at = datetime.utcnow()
db.add(task)
db.commit()
@@ -332,13 +309,12 @@ async def defer_task(
task_id: int,
db: Session = Depends(get_db),
):
"""Defer a task and trigger state promotion."""
task = db.query(Task).filter(Task.id == task_id).first()
if not task:
raise HTTPException(status_code=404, detail="Task not found")
task.state = TaskState.DEFERRED
task.deferred_at = datetime.now(UTC)
task.deferred_at = datetime.utcnow()
db.add(task)
db.commit()
@@ -357,7 +333,6 @@ async def defer_task(
@router.get("/calm/partials/later_tasks_list", response_class=HTMLResponse)
async def get_later_tasks_list(request: Request, db: Session = Depends(get_db)):
"""Render the expandable list of LATER tasks."""
later_tasks = get_later_tasks(db)
return templates.TemplateResponse(
"calm/partials/later_tasks_list.html",
@@ -373,7 +348,6 @@ async def reorder_tasks(
later_task_ids: str = Form(""),
next_task_id: int | None = Form(None),
):
"""Reorder LATER tasks and optionally promote one to NEXT."""
# Reorder LATER tasks
if later_task_ids:
ids_in_order = [int(x.strip()) for x in later_task_ids.split(",") if x.strip()]

View File

@@ -16,11 +16,52 @@ router = APIRouter(tags=["system"])
@router.get("/lightning/ledger", response_class=HTMLResponse)
async def lightning_ledger(request: Request):
"""Ledger and balance page backed by the in-memory Lightning ledger."""
from lightning.ledger import get_balance, get_transactions
"""Ledger and balance page."""
# Mock data for now, as this seems to be a UI-first feature
balance = {
"available_sats": 1337,
"incoming_total_sats": 2000,
"outgoing_total_sats": 663,
"fees_paid_sats": 5,
"net_sats": 1337,
"pending_incoming_sats": 0,
"pending_outgoing_sats": 0,
}
balance = get_balance()
transactions = get_transactions()
# Mock transactions
from collections import namedtuple
from enum import Enum
class TxType(Enum):
incoming = "incoming"
outgoing = "outgoing"
class TxStatus(Enum):
completed = "completed"
pending = "pending"
Tx = namedtuple(
"Tx", ["tx_type", "status", "amount_sats", "payment_hash", "memo", "created_at"]
)
transactions = [
Tx(
TxType.outgoing,
TxStatus.completed,
50,
"hash1",
"Model inference",
"2026-03-04 10:00:00",
),
Tx(
TxType.incoming,
TxStatus.completed,
1000,
"hash2",
"Manual deposit",
"2026-03-03 15:00:00",
),
]
return templates.TemplateResponse(
request,
@@ -29,7 +70,7 @@ async def lightning_ledger(request: Request):
"balance": balance,
"transactions": transactions,
"tx_types": ["incoming", "outgoing"],
"tx_statuses": ["pending", "settled", "failed", "expired"],
"tx_statuses": ["completed", "pending"],
"filter_type": None,
"filter_status": None,
"stats": {},

View File

@@ -5,7 +5,7 @@ import sqlite3
import uuid
from collections.abc import Generator
from contextlib import closing, contextmanager
from datetime import UTC, datetime
from datetime import datetime
from pathlib import Path
from fastapi import APIRouter, Form, HTTPException, Request
@@ -219,7 +219,7 @@ async def create_task_form(
raise HTTPException(status_code=400, detail="Task title cannot be empty")
task_id = str(uuid.uuid4())
now = datetime.now(UTC).isoformat()
now = datetime.utcnow().isoformat()
priority = priority if priority in VALID_PRIORITIES else "normal"
with _get_db() as db:
@@ -287,7 +287,7 @@ async def modify_task(
async def _set_status(request: Request, task_id: str, new_status: str):
"""Helper to update status and return refreshed task card."""
completed_at = (
datetime.now(UTC).isoformat() if new_status in ("completed", "vetoed", "failed") else None
datetime.utcnow().isoformat() if new_status in ("completed", "vetoed", "failed") else None
)
with _get_db() as db:
db.execute(
@@ -316,7 +316,7 @@ async def api_create_task(request: Request):
raise HTTPException(422, "title is required")
task_id = str(uuid.uuid4())
now = datetime.now(UTC).isoformat()
now = datetime.utcnow().isoformat()
priority = body.get("priority", "normal")
if priority not in VALID_PRIORITIES:
priority = "normal"
@@ -358,7 +358,7 @@ async def api_update_status(task_id: str, request: Request):
raise HTTPException(422, f"Invalid status. Must be one of: {VALID_STATUSES}")
completed_at = (
datetime.now(UTC).isoformat() if new_status in ("completed", "vetoed", "failed") else None
datetime.utcnow().isoformat() if new_status in ("completed", "vetoed", "failed") else None
)
with _get_db() as db:
db.execute(

View File

@@ -1,108 +0,0 @@
"""Tower dashboard — real-time Spark visualization via WebSocket.
GET /tower — HTML Tower dashboard (Thinking / Predicting / Advising)
WS /tower/ws — WebSocket stream of Spark engine state updates
"""
import asyncio
import json
import logging
from fastapi import APIRouter, Request, WebSocket
from fastapi.responses import HTMLResponse
from dashboard.templating import templates
from spark.engine import spark_engine
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/tower", tags=["tower"])
_PUSH_INTERVAL = 5 # seconds between state broadcasts
def _spark_snapshot() -> dict:
"""Build a JSON-serialisable snapshot of Spark state."""
status = spark_engine.status()
timeline = spark_engine.get_timeline(limit=10)
events = []
for ev in timeline:
entry = {
"event_type": ev.event_type,
"description": ev.description,
"importance": ev.importance,
"created_at": ev.created_at,
}
if ev.agent_id:
entry["agent_id"] = ev.agent_id[:8]
if ev.task_id:
entry["task_id"] = ev.task_id[:8]
try:
entry["data"] = json.loads(ev.data)
except (json.JSONDecodeError, TypeError):
entry["data"] = {}
events.append(entry)
predictions = spark_engine.get_predictions(limit=5)
preds = []
for p in predictions:
pred = {
"task_id": p.task_id[:8] if p.task_id else "?",
"accuracy": p.accuracy,
"evaluated": p.evaluated_at is not None,
"created_at": p.created_at,
}
try:
pred["predicted"] = json.loads(p.predicted_value)
except (json.JSONDecodeError, TypeError):
pred["predicted"] = {}
preds.append(pred)
advisories = spark_engine.get_advisories()
advs = [
{
"category": a.category,
"priority": a.priority,
"title": a.title,
"detail": a.detail,
"suggested_action": a.suggested_action,
}
for a in advisories
]
return {
"type": "spark_state",
"status": status,
"events": events,
"predictions": preds,
"advisories": advs,
}
@router.get("", response_class=HTMLResponse)
async def tower_ui(request: Request):
"""Render the Tower dashboard page."""
snapshot = _spark_snapshot()
return templates.TemplateResponse(
request,
"tower.html",
{"snapshot": snapshot},
)
@router.websocket("/ws")
async def tower_ws(websocket: WebSocket) -> None:
"""Stream Spark state snapshots to the Tower dashboard."""
await websocket.accept()
logger.info("Tower WS connected")
try:
# Send initial snapshot
await websocket.send_text(json.dumps(_spark_snapshot()))
while True:
await asyncio.sleep(_PUSH_INTERVAL)
await websocket.send_text(json.dumps(_spark_snapshot()))
except Exception:
logger.debug("Tower WS disconnected")

View File

@@ -5,7 +5,7 @@ import sqlite3
import uuid
from collections.abc import Generator
from contextlib import closing, contextmanager
from datetime import UTC, datetime
from datetime import datetime
from pathlib import Path
from fastapi import APIRouter, Form, HTTPException, Request
@@ -144,7 +144,7 @@ async def submit_work_order(
related_files: str = Form(""),
):
wo_id = str(uuid.uuid4())
now = datetime.now(UTC).isoformat()
now = datetime.utcnow().isoformat()
priority = priority if priority in PRIORITIES else "medium"
category = category if category in CATEGORIES else "suggestion"
@@ -211,7 +211,7 @@ async def active_partial(request: Request):
async def _update_status(request: Request, wo_id: str, new_status: str, **extra):
completed_at = (
datetime.now(UTC).isoformat() if new_status in ("completed", "rejected") else None
datetime.utcnow().isoformat() if new_status in ("completed", "rejected") else None
)
with _get_db() as db:
sets = ["status=?", "completed_at=COALESCE(?, completed_at)"]

View File

@@ -138,47 +138,6 @@
</div>
</div>
<!-- Spark Intelligence -->
{% from "macros.html" import panel %}
<div class="mc-card-spaced">
<div class="card">
<div class="card-header">
<h2 class="card-title">Spark Intelligence</h2>
<div>
<span class="badge" id="spark-status-badge">Loading...</span>
</div>
</div>
<div class="grid grid-3">
<div class="stat">
<div class="stat-value" id="spark-events">-</div>
<div class="stat-label">Events</div>
</div>
<div class="stat">
<div class="stat-value" id="spark-memories">-</div>
<div class="stat-label">Memories</div>
</div>
<div class="stat">
<div class="stat-value" id="spark-predictions">-</div>
<div class="stat-label">Predictions</div>
</div>
</div>
</div>
<div class="grid grid-2 mc-section-gap">
{% call panel("SPARK TIMELINE", id="spark-timeline-panel",
hx_get="/spark/timeline",
hx_trigger="load, every 10s") %}
<div class="spark-timeline-scroll">
<p class="chat-history-placeholder">Loading timeline...</p>
</div>
{% endcall %}
{% call panel("SPARK INSIGHTS", id="spark-insights-panel",
hx_get="/spark/insights",
hx_trigger="load, every 30s") %}
<p class="chat-history-placeholder">Loading insights...</p>
{% endcall %}
</div>
</div>
<!-- Chat History -->
<div class="card mc-card-spaced">
<div class="card-header">
@@ -469,34 +428,7 @@ async function loadGrokStats() {
}
}
// Load Spark status
async function loadSparkStatus() {
try {
var response = await fetch('/spark');
var data = await response.json();
var st = data.status || {};
document.getElementById('spark-events').textContent = st.total_events || 0;
document.getElementById('spark-memories').textContent = st.total_memories || 0;
document.getElementById('spark-predictions').textContent = st.total_predictions || 0;
var badge = document.getElementById('spark-status-badge');
if (st.total_events > 0) {
badge.textContent = 'Active';
badge.className = 'badge badge-success';
} else {
badge.textContent = 'Idle';
badge.className = 'badge badge-warning';
}
} catch (error) {
var badge = document.getElementById('spark-status-badge');
badge.textContent = 'Offline';
badge.className = 'badge badge-danger';
}
}
// Initial load
loadSparkStatus();
loadSovereignty();
loadHealth();
loadSwarmStats();
@@ -510,6 +442,5 @@ setInterval(loadHealth, 10000);
setInterval(loadSwarmStats, 5000);
setInterval(updateHeartbeat, 5000);
setInterval(loadGrokStats, 10000);
setInterval(loadSparkStatus, 15000);
</script>
{% endblock %}

View File

@@ -1,180 +0,0 @@
{% extends "base.html" %}
{% block title %}Timmy Time — Tower{% endblock %}
{% block extra_styles %}{% endblock %}
{% block content %}
<div class="container-fluid tower-container py-3">
<div class="tower-header">
<div class="tower-title">TOWER</div>
<div class="tower-subtitle">
Real-time Spark visualization &mdash;
<span id="tower-conn" class="tower-conn-badge tower-conn-connecting">CONNECTING</span>
</div>
</div>
<div class="row g-3">
<!-- Left: THINKING (events) -->
<div class="col-12 col-lg-4 d-flex flex-column gap-3">
<div class="card mc-panel tower-phase-card">
<div class="card-header mc-panel-header tower-phase-thinking">// THINKING</div>
<div class="card-body p-3 tower-scroll" id="tower-events">
<div class="tower-empty">Waiting for Spark data&hellip;</div>
</div>
</div>
</div>
<!-- Middle: PREDICTING (EIDOS) -->
<div class="col-12 col-lg-4 d-flex flex-column gap-3">
<div class="card mc-panel tower-phase-card">
<div class="card-header mc-panel-header tower-phase-predicting">// PREDICTING</div>
<div class="card-body p-3" id="tower-predictions">
<div class="tower-empty">Waiting for Spark data&hellip;</div>
</div>
</div>
<div class="card mc-panel">
<div class="card-header mc-panel-header">// EIDOS STATS</div>
<div class="card-body p-3">
<div class="tower-stat-grid" id="tower-stats">
<div class="tower-stat"><span class="tower-stat-label">EVENTS</span><span class="tower-stat-value" id="ts-events">0</span></div>
<div class="tower-stat"><span class="tower-stat-label">MEMORIES</span><span class="tower-stat-value" id="ts-memories">0</span></div>
<div class="tower-stat"><span class="tower-stat-label">PREDICTIONS</span><span class="tower-stat-value" id="ts-preds">0</span></div>
<div class="tower-stat"><span class="tower-stat-label">ACCURACY</span><span class="tower-stat-value" id="ts-accuracy"></span></div>
</div>
</div>
</div>
</div>
<!-- Right: ADVISING -->
<div class="col-12 col-lg-4 d-flex flex-column gap-3">
<div class="card mc-panel tower-phase-card">
<div class="card-header mc-panel-header tower-phase-advising">// ADVISING</div>
<div class="card-body p-3 tower-scroll" id="tower-advisories">
<div class="tower-empty">Waiting for Spark data&hellip;</div>
</div>
</div>
</div>
</div>
</div>
<script>
(function() {
var ws = null;
var badge = document.getElementById('tower-conn');
function setConn(state) {
badge.textContent = state.toUpperCase();
badge.className = 'tower-conn-badge tower-conn-' + state;
}
function esc(s) { var d = document.createElement('div'); d.textContent = s; return d.innerHTML; }
function renderEvents(events) {
var el = document.getElementById('tower-events');
if (!events || !events.length) { el.innerHTML = '<div class="tower-empty">No events captured yet.</div>'; return; }
var html = '';
for (var i = 0; i < events.length; i++) {
var ev = events[i];
var dots = ev.importance >= 0.8 ? '\u25cf\u25cf\u25cf' : ev.importance >= 0.5 ? '\u25cf\u25cf' : '\u25cf';
html += '<div class="tower-event tower-etype-' + esc(ev.event_type) + '">'
+ '<div class="tower-ev-head">'
+ '<span class="tower-ev-badge">' + esc(ev.event_type.replace(/_/g, ' ').toUpperCase()) + '</span>'
+ '<span class="tower-ev-dots">' + dots + '</span>'
+ '</div>'
+ '<div class="tower-ev-desc">' + esc(ev.description) + '</div>'
+ '<div class="tower-ev-time">' + esc((ev.created_at || '').slice(0, 19)) + '</div>'
+ '</div>';
}
el.innerHTML = html;
}
function renderPredictions(preds) {
var el = document.getElementById('tower-predictions');
if (!preds || !preds.length) { el.innerHTML = '<div class="tower-empty">No predictions yet.</div>'; return; }
var html = '';
for (var i = 0; i < preds.length; i++) {
var p = preds[i];
var cls = p.evaluated ? 'tower-pred-done' : 'tower-pred-pending';
var accTxt = p.accuracy != null ? Math.round(p.accuracy * 100) + '%' : 'PENDING';
var accCls = p.accuracy != null ? (p.accuracy >= 0.7 ? 'text-success' : p.accuracy < 0.4 ? 'text-danger' : 'text-warning') : '';
html += '<div class="tower-pred ' + cls + '">'
+ '<div class="tower-pred-head">'
+ '<span class="tower-pred-task">' + esc(p.task_id) + '</span>'
+ '<span class="tower-pred-acc ' + accCls + '">' + accTxt + '</span>'
+ '</div>';
if (p.predicted) {
var pr = p.predicted;
html += '<div class="tower-pred-detail">';
if (pr.likely_winner) html += '<span>Winner: ' + esc(pr.likely_winner.slice(0, 8)) + '</span> ';
if (pr.success_probability != null) html += '<span>Success: ' + Math.round(pr.success_probability * 100) + '%</span> ';
html += '</div>';
}
html += '<div class="tower-ev-time">' + esc((p.created_at || '').slice(0, 19)) + '</div>'
+ '</div>';
}
el.innerHTML = html;
}
function renderAdvisories(advs) {
var el = document.getElementById('tower-advisories');
if (!advs || !advs.length) { el.innerHTML = '<div class="tower-empty">No advisories yet.</div>'; return; }
var html = '';
for (var i = 0; i < advs.length; i++) {
var a = advs[i];
var prio = a.priority >= 0.7 ? 'high' : a.priority >= 0.4 ? 'medium' : 'low';
html += '<div class="tower-advisory tower-adv-' + prio + '">'
+ '<div class="tower-adv-head">'
+ '<span class="tower-adv-cat">' + esc(a.category.replace(/_/g, ' ').toUpperCase()) + '</span>'
+ '<span class="tower-adv-prio">' + Math.round(a.priority * 100) + '%</span>'
+ '</div>'
+ '<div class="tower-adv-title">' + esc(a.title) + '</div>'
+ '<div class="tower-adv-detail">' + esc(a.detail) + '</div>'
+ '<div class="tower-adv-action">' + esc(a.suggested_action) + '</div>'
+ '</div>';
}
el.innerHTML = html;
}
function renderStats(status) {
if (!status) return;
document.getElementById('ts-events').textContent = status.events_captured || 0;
document.getElementById('ts-memories').textContent = status.memories_stored || 0;
var p = status.predictions || {};
document.getElementById('ts-preds').textContent = p.total_predictions || 0;
var acc = p.avg_accuracy;
var accEl = document.getElementById('ts-accuracy');
if (acc != null) {
accEl.textContent = Math.round(acc * 100) + '%';
accEl.className = 'tower-stat-value ' + (acc >= 0.7 ? 'text-success' : acc < 0.4 ? 'text-danger' : 'text-warning');
} else {
accEl.textContent = '\u2014';
}
}
function handleMsg(data) {
if (data.type !== 'spark_state') return;
renderEvents(data.events);
renderPredictions(data.predictions);
renderAdvisories(data.advisories);
renderStats(data.status);
}
function connect() {
var proto = location.protocol === 'https:' ? 'wss:' : 'ws:';
ws = new WebSocket(proto + '//' + location.host + '/tower/ws');
ws.onopen = function() { setConn('live'); };
ws.onclose = function() { setConn('offline'); setTimeout(connect, 3000); };
ws.onerror = function() { setConn('offline'); };
ws.onmessage = function(e) {
try { handleMsg(JSON.parse(e.data)); } catch(err) { console.error('Tower WS parse error', err); }
};
}
connect();
})();
</script>
{% endblock %}

View File

@@ -149,52 +149,6 @@ def _log_error_event(
logger.debug("Failed to log error event: %s", log_exc)
def _build_report_description(
exc: Exception,
source: str,
context: dict | None,
error_hash: str,
tb_str: str,
affected_file: str,
affected_line: int,
git_ctx: dict,
) -> str:
"""Build the markdown description for a bug report task."""
parts = [
f"**Error:** {type(exc).__name__}: {str(exc)}",
f"**Source:** {source}",
f"**File:** {affected_file}:{affected_line}",
f"**Git:** {git_ctx.get('branch', '?')} @ {git_ctx.get('commit', '?')}",
f"**Time:** {datetime.now(UTC).isoformat()}",
f"**Hash:** {error_hash}",
]
if context:
ctx_str = ", ".join(f"{k}={v}" for k, v in context.items())
parts.append(f"**Context:** {ctx_str}")
parts.append(f"\n**Stack Trace:**\n```\n{tb_str[:2000]}\n```")
return "\n".join(parts)
def _log_bug_report_created(source: str, task_id: str, error_hash: str, title: str) -> None:
"""Log a BUG_REPORT_CREATED event (best-effort)."""
try:
from swarm.event_log import EventType, log_event
log_event(
EventType.BUG_REPORT_CREATED,
source=source,
task_id=task_id,
data={
"error_hash": error_hash,
"title": title[:100],
},
)
except Exception as exc:
logger.warning("Bug report event log error: %s", exc)
def _create_bug_report(
exc: Exception,
source: str,
@@ -210,20 +164,25 @@ def _create_bug_report(
from swarm.task_queue.models import create_task
title = f"[BUG] {type(exc).__name__}: {str(exc)[:80]}"
description = _build_report_description(
exc,
source,
context,
error_hash,
tb_str,
affected_file,
affected_line,
git_ctx,
)
description_parts = [
f"**Error:** {type(exc).__name__}: {str(exc)}",
f"**Source:** {source}",
f"**File:** {affected_file}:{affected_line}",
f"**Git:** {git_ctx.get('branch', '?')} @ {git_ctx.get('commit', '?')}",
f"**Time:** {datetime.now(UTC).isoformat()}",
f"**Hash:** {error_hash}",
]
if context:
ctx_str = ", ".join(f"{k}={v}" for k, v in context.items())
description_parts.append(f"**Context:** {ctx_str}")
description_parts.append(f"\n**Stack Trace:**\n```\n{tb_str[:2000]}\n```")
task = create_task(
title=title,
description=description,
description="\n".join(description_parts),
assigned_to="default",
created_by="system",
priority="normal",
@@ -231,9 +190,24 @@ def _create_bug_report(
auto_approve=True,
task_type="bug_report",
)
task_id = task.id
_log_bug_report_created(source, task.id, error_hash, title)
return task.id
try:
from swarm.event_log import EventType, log_event
log_event(
EventType.BUG_REPORT_CREATED,
source=source,
task_id=task_id,
data={
"error_hash": error_hash,
"title": title[:100],
},
)
except Exception as exc:
logger.warning("Bug report screenshot error: %s", exc)
return task_id
except Exception as task_exc:
logger.debug("Failed to create bug report task: %s", task_exc)

View File

@@ -64,7 +64,7 @@ class EventBus:
@bus.subscribe("agent.task.*")
async def handle_task(event: Event):
logger.debug("Task event: %s", event.data)
logger.debug(f"Task event: {event.data}")
await bus.publish(Event(
type="agent.task.assigned",

View File

@@ -146,7 +146,7 @@ class ShellHand:
@staticmethod
def _build_run_env(env: dict | None) -> dict:
"""Merge *env* overrides into a copy of the current environment."""
"""Merge *env* overrides into the current process environment."""
import os
run_env = os.environ.copy()
@@ -154,7 +154,7 @@ class ShellHand:
run_env.update(env)
return run_env
async def _execute_subprocess(
async def _exec_subprocess(
self,
command: str,
effective_timeout: int,
@@ -162,7 +162,7 @@ class ShellHand:
run_env: dict,
start: float,
) -> ShellResult:
"""Run *command* as a subprocess with timeout enforcement."""
"""Launch *command*, enforce timeout, and return the result."""
proc = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
@@ -178,29 +178,24 @@ class ShellHand:
except TimeoutError:
proc.kill()
await proc.wait()
latency = (time.time() - start) * 1000
logger.warning("Shell command timed out after %ds: %s", effective_timeout, command)
return ShellResult(
command=command,
success=False,
exit_code=-1,
error=f"Command timed out after {effective_timeout}s",
latency_ms=latency,
latency_ms=(time.time() - start) * 1000,
timed_out=True,
)
latency = (time.time() - start) * 1000
exit_code = proc.returncode if proc.returncode is not None else -1
stdout = stdout_bytes.decode("utf-8", errors="replace").strip()
stderr = stderr_bytes.decode("utf-8", errors="replace").strip()
return ShellResult(
command=command,
success=exit_code == 0,
exit_code=exit_code,
stdout=stdout,
stderr=stderr,
latency_ms=latency,
stdout=stdout_bytes.decode("utf-8", errors="replace").strip(),
stderr=stderr_bytes.decode("utf-8", errors="replace").strip(),
latency_ms=(time.time() - start) * 1000,
)
async def run(
@@ -232,20 +227,21 @@ class ShellHand:
latency_ms=(time.time() - start) * 1000,
)
effective_timeout = timeout or self._default_timeout
cwd = working_dir or self._working_dir
try:
run_env = self._build_run_env(env)
return await self._execute_subprocess(command, effective_timeout, cwd, run_env, start)
return await self._exec_subprocess(
command,
effective_timeout=timeout or self._default_timeout,
cwd=working_dir or self._working_dir,
run_env=self._build_run_env(env),
start=start,
)
except Exception as exc:
latency = (time.time() - start) * 1000
logger.warning("Shell command failed: %s%s", command, exc)
return ShellResult(
command=command,
success=False,
error=str(exc),
latency_ms=latency,
latency_ms=(time.time() - start) * 1000,
)
def status(self) -> dict:

View File

@@ -2,7 +2,6 @@
from .api import router
from .cascade import CascadeRouter, Provider, ProviderStatus, get_router
from .history import HealthHistoryStore, get_history_store
__all__ = [
"CascadeRouter",
@@ -10,6 +9,4 @@ __all__ = [
"ProviderStatus",
"get_router",
"router",
"HealthHistoryStore",
"get_history_store",
]

View File

@@ -8,7 +8,6 @@ from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from .cascade import CascadeRouter, get_router
from .history import HealthHistoryStore, get_history_store
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v1/router", tags=["router"])
@@ -200,17 +199,6 @@ async def reload_config(
raise HTTPException(status_code=500, detail=f"Reload failed: {exc}") from exc
@router.get("/history")
async def get_history(
hours: int = 24,
store: Annotated[HealthHistoryStore, Depends(get_history_store)] = None,
) -> list[dict[str, Any]]:
"""Get provider health history for the last N hours."""
if store is None:
store = get_history_store()
return store.get_history(hours=hours)
@router.get("/config")
async def get_config(
cascade: Annotated[CascadeRouter, Depends(get_cascade_router)],

View File

@@ -221,56 +221,65 @@ class CascadeRouter:
raise RuntimeError("PyYAML not installed")
content = self.config_path.read_text()
# Expand environment variables
content = self._expand_env_vars(content)
data = yaml.safe_load(content)
self.config = self._parse_router_config(data)
self._load_providers(data)
# Load cascade settings
cascade = data.get("cascade", {})
# Load fallback chains
fallback_chains = data.get("fallback_chains", {})
# Load multi-modal settings
multimodal = data.get("multimodal", {})
self.config = RouterConfig(
timeout_seconds=cascade.get("timeout_seconds", 30),
max_retries_per_provider=cascade.get("max_retries_per_provider", 2),
retry_delay_seconds=cascade.get("retry_delay_seconds", 1),
circuit_breaker_failure_threshold=cascade.get("circuit_breaker", {}).get(
"failure_threshold", 5
),
circuit_breaker_recovery_timeout=cascade.get("circuit_breaker", {}).get(
"recovery_timeout", 60
),
circuit_breaker_half_open_max_calls=cascade.get("circuit_breaker", {}).get(
"half_open_max_calls", 2
),
auto_pull_models=multimodal.get("auto_pull", True),
fallback_chains=fallback_chains,
)
# Load providers
for p_data in data.get("providers", []):
# Skip disabled providers
if not p_data.get("enabled", False):
continue
provider = Provider(
name=p_data["name"],
type=p_data["type"],
enabled=p_data.get("enabled", True),
priority=p_data.get("priority", 99),
url=p_data.get("url"),
api_key=p_data.get("api_key"),
base_url=p_data.get("base_url"),
models=p_data.get("models", []),
)
# Check if provider is actually available
if self._check_provider_available(provider):
self.providers.append(provider)
else:
logger.warning("Provider %s not available, skipping", provider.name)
# Sort by priority
self.providers.sort(key=lambda p: p.priority)
except Exception as exc:
logger.error("Failed to load config: %s", exc)
def _parse_router_config(self, data: dict) -> RouterConfig:
"""Build a RouterConfig from parsed YAML data."""
cascade = data.get("cascade", {})
cb = cascade.get("circuit_breaker", {})
multimodal = data.get("multimodal", {})
return RouterConfig(
timeout_seconds=cascade.get("timeout_seconds", 30),
max_retries_per_provider=cascade.get("max_retries_per_provider", 2),
retry_delay_seconds=cascade.get("retry_delay_seconds", 1),
circuit_breaker_failure_threshold=cb.get("failure_threshold", 5),
circuit_breaker_recovery_timeout=cb.get("recovery_timeout", 60),
circuit_breaker_half_open_max_calls=cb.get("half_open_max_calls", 2),
auto_pull_models=multimodal.get("auto_pull", True),
fallback_chains=data.get("fallback_chains", {}),
)
def _load_providers(self, data: dict) -> None:
"""Load, filter, and sort providers from parsed YAML data."""
for p_data in data.get("providers", []):
if not p_data.get("enabled", False):
continue
provider = Provider(
name=p_data["name"],
type=p_data["type"],
enabled=p_data.get("enabled", True),
priority=p_data.get("priority", 99),
url=p_data.get("url"),
api_key=p_data.get("api_key"),
base_url=p_data.get("base_url"),
models=p_data.get("models", []),
)
if self._check_provider_available(provider):
self.providers.append(provider)
else:
logger.warning("Provider %s not available, skipping", provider.name)
self.providers.sort(key=lambda p: p.priority)
def _expand_env_vars(self, content: str) -> str:
"""Expand ${VAR} syntax in YAML content.
@@ -555,7 +564,6 @@ class CascadeRouter:
messages=messages,
model=model or provider.get_default_model(),
temperature=temperature,
max_tokens=max_tokens,
content_type=content_type,
)
elif provider.type == "openai":
@@ -596,7 +604,6 @@ class CascadeRouter:
messages: list[dict],
model: str,
temperature: float,
max_tokens: int | None = None,
content_type: ContentType = ContentType.TEXT,
) -> dict:
"""Call Ollama API with multi-modal support."""
@@ -607,15 +614,13 @@ class CascadeRouter:
# Transform messages for Ollama format (including images)
transformed_messages = self._transform_messages_for_ollama(messages)
options = {"temperature": temperature}
if max_tokens:
options["num_predict"] = max_tokens
payload = {
"model": model,
"messages": transformed_messages,
"stream": False,
"options": options,
"options": {
"temperature": temperature,
},
}
timeout = aiohttp.ClientTimeout(total=self.config.timeout_seconds)
@@ -759,7 +764,7 @@ class CascadeRouter:
client = openai.AsyncOpenAI(
api_key=provider.api_key,
base_url=provider.base_url or settings.xai_base_url,
base_url=provider.base_url or "https://api.x.ai/v1",
timeout=httpx.Timeout(300.0),
)

View File

@@ -1,152 +0,0 @@
"""Provider health history — time-series snapshots for dashboard visualization."""
import asyncio
import logging
import sqlite3
from datetime import UTC, datetime, timedelta
from pathlib import Path
logger = logging.getLogger(__name__)
_store: "HealthHistoryStore | None" = None
class HealthHistoryStore:
"""Stores timestamped provider health snapshots in SQLite."""
def __init__(self, db_path: str = "data/router_history.db") -> None:
self.db_path = db_path
if db_path != ":memory:":
Path(db_path).parent.mkdir(parents=True, exist_ok=True)
self._conn = sqlite3.connect(db_path, check_same_thread=False)
self._conn.row_factory = sqlite3.Row
self._init_schema()
self._bg_task: asyncio.Task | None = None
def _init_schema(self) -> None:
self._conn.execute("""
CREATE TABLE IF NOT EXISTS snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
provider_name TEXT NOT NULL,
status TEXT NOT NULL,
error_rate REAL NOT NULL,
avg_latency_ms REAL NOT NULL,
circuit_state TEXT NOT NULL,
total_requests INTEGER NOT NULL
)
""")
self._conn.execute("""
CREATE INDEX IF NOT EXISTS idx_snapshots_ts
ON snapshots(timestamp)
""")
self._conn.commit()
def record_snapshot(self, providers: list[dict]) -> None:
"""Record a health snapshot for all providers."""
ts = datetime.now(UTC).isoformat()
rows = [
(
ts,
p["name"],
p["status"],
p["error_rate"],
p["avg_latency_ms"],
p["circuit_state"],
p["total_requests"],
)
for p in providers
]
self._conn.executemany(
"""INSERT INTO snapshots
(timestamp, provider_name, status, error_rate,
avg_latency_ms, circuit_state, total_requests)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
rows,
)
self._conn.commit()
def get_history(self, hours: int = 24) -> list[dict]:
"""Return snapshots from the last N hours, grouped by timestamp."""
cutoff = (datetime.now(UTC) - timedelta(hours=hours)).isoformat()
rows = self._conn.execute(
"""SELECT timestamp, provider_name, status, error_rate,
avg_latency_ms, circuit_state, total_requests
FROM snapshots WHERE timestamp >= ? ORDER BY timestamp""",
(cutoff,),
).fetchall()
# Group by timestamp
snapshots: dict[str, list[dict]] = {}
for row in rows:
ts = row["timestamp"]
if ts not in snapshots:
snapshots[ts] = []
snapshots[ts].append(
{
"name": row["provider_name"],
"status": row["status"],
"error_rate": row["error_rate"],
"avg_latency_ms": row["avg_latency_ms"],
"circuit_state": row["circuit_state"],
"total_requests": row["total_requests"],
}
)
return [{"timestamp": ts, "providers": providers} for ts, providers in snapshots.items()]
def prune(self, keep_hours: int = 168) -> int:
"""Remove snapshots older than keep_hours. Returns rows deleted."""
cutoff = (datetime.now(UTC) - timedelta(hours=keep_hours)).isoformat()
cursor = self._conn.execute("DELETE FROM snapshots WHERE timestamp < ?", (cutoff,))
self._conn.commit()
return cursor.rowcount
def close(self) -> None:
"""Close the database connection."""
if self._bg_task and not self._bg_task.done():
self._bg_task.cancel()
self._conn.close()
def _capture_snapshot(self, cascade_router) -> None: # noqa: ANN001
"""Capture current provider state as a snapshot."""
providers = []
for p in cascade_router.providers:
providers.append(
{
"name": p.name,
"status": p.status.value,
"error_rate": round(p.metrics.error_rate, 4),
"avg_latency_ms": round(p.metrics.avg_latency_ms, 2),
"circuit_state": p.circuit_state.value,
"total_requests": p.metrics.total_requests,
}
)
self.record_snapshot(providers)
async def start_background_task(
self,
cascade_router,
interval_seconds: int = 60, # noqa: ANN001
) -> None:
"""Start periodic snapshot capture."""
async def _loop() -> None:
while True:
try:
self._capture_snapshot(cascade_router)
logger.debug("Recorded health snapshot")
except Exception:
logger.exception("Failed to record health snapshot")
await asyncio.sleep(interval_seconds)
self._bg_task = asyncio.create_task(_loop())
logger.info("Health history background task started (interval=%ds)", interval_seconds)
def get_history_store() -> HealthHistoryStore:
"""Get or create the singleton history store."""
global _store # noqa: PLW0603
if _store is None:
_store = HealthHistoryStore()
return _store

View File

@@ -515,36 +515,25 @@ class DiscordVendor(ChatPlatform):
async def _handle_message(self, message) -> None:
"""Process an incoming message and respond via a thread."""
content = self._extract_content(message)
if not content:
return
thread = await self._get_or_create_thread(message)
target = thread or message.channel
session_id = f"discord_{thread.id}" if thread else f"discord_{message.channel.id}"
run_output, response = await self._invoke_agent(content, session_id, target)
if run_output is not None:
await self._handle_paused_run(run_output, target, session_id)
raw_content = run_output.content if hasattr(run_output, "content") else ""
response = _clean_response(raw_content or "")
await self._send_response(response, target)
def _extract_content(self, message) -> str:
"""Strip the bot mention and return clean message text."""
# Strip the bot mention from the message content
content = message.content
if self._client.user:
content = content.replace(f"<@{self._client.user.id}>", "").strip()
return content
async def _invoke_agent(self, content: str, session_id: str, target):
"""Run chat_with_tools with a typing indicator and timeout.
if not content:
return
Returns a (run_output, error_response) tuple. On success the
error_response is ``None``; on failure run_output is ``None``.
"""
# Create or reuse a thread for this conversation
thread = await self._get_or_create_thread(message)
target = thread or message.channel
# Derive session_id for per-conversation history via Agno's SQLite
if thread:
session_id = f"discord_{thread.id}"
else:
session_id = f"discord_{message.channel.id}"
# Run Timmy agent with typing indicator and timeout
run_output = None
response = None
try:
@@ -559,57 +548,51 @@ class DiscordVendor(ChatPlatform):
except Exception as exc:
logger.error("Discord: chat_with_tools() failed: %s", exc)
response = "I'm having trouble reaching my inference backend right now. Please try again shortly."
return run_output, response
async def _handle_paused_run(self, run_output, target, session_id: str) -> None:
"""If Agno paused the run for tool confirmation, enqueue approvals."""
status = getattr(run_output, "status", None)
is_paused = status == "PAUSED" or str(status) == "RunStatus.paused"
# Check if Agno paused the run for tool confirmation
if run_output is not None:
status = getattr(run_output, "status", None)
is_paused = status == "PAUSED" or str(status) == "RunStatus.paused"
if not (is_paused and getattr(run_output, "active_requirements", None)):
return
if is_paused and getattr(run_output, "active_requirements", None):
from config import settings
from config import settings
if settings.discord_confirm_actions:
for req in run_output.active_requirements:
if getattr(req, "needs_confirmation", False):
te = req.tool_execution
tool_name = getattr(te, "tool_name", "unknown")
tool_args = getattr(te, "tool_args", {}) or {}
if not settings.discord_confirm_actions:
return
from timmy.approvals import create_item
for req in run_output.active_requirements:
if not getattr(req, "needs_confirmation", False):
continue
te = req.tool_execution
tool_name = getattr(te, "tool_name", "unknown")
tool_args = getattr(te, "tool_args", {}) or {}
item = create_item(
title=f"Discord: {tool_name}",
description=_format_action_description(tool_name, tool_args),
proposed_action=json.dumps({"tool": tool_name, "args": tool_args}),
impact=_get_impact_level(tool_name),
)
self._pending_actions[item.id] = {
"run_output": run_output,
"requirement": req,
"tool_name": tool_name,
"tool_args": tool_args,
"target": target,
"session_id": session_id,
}
await self._send_confirmation(target, tool_name, tool_args, item.id)
from timmy.approvals import create_item
raw_content = run_output.content if hasattr(run_output, "content") else ""
response = _clean_response(raw_content or "")
item = create_item(
title=f"Discord: {tool_name}",
description=_format_action_description(tool_name, tool_args),
proposed_action=json.dumps({"tool": tool_name, "args": tool_args}),
impact=_get_impact_level(tool_name),
)
self._pending_actions[item.id] = {
"run_output": run_output,
"requirement": req,
"tool_name": tool_name,
"tool_args": tool_args,
"target": target,
"session_id": session_id,
}
await self._send_confirmation(target, tool_name, tool_args, item.id)
@staticmethod
async def _send_response(response: str | None, target) -> None:
"""Send a response to Discord, chunked to the 2000-char limit."""
if not response or not response.strip():
return
for chunk in _chunk_message(response, 2000):
try:
await target.send(chunk)
except Exception as exc:
logger.error("Discord: failed to send message chunk: %s", exc)
break
# Discord has a 2000 character limit — send with error handling
if response and response.strip():
for chunk in _chunk_message(response, 2000):
try:
await target.send(chunk)
except Exception as exc:
logger.error("Discord: failed to send message chunk: %s", exc)
break
async def _get_or_create_thread(self, message):
"""Get the active thread for a channel, or create one.

View File

@@ -1 +0,0 @@
"""Lightning Network integration for tool-usage micro-payments."""

View File

@@ -1,69 +0,0 @@
"""Lightning backend factory.
Returns a mock or real LND backend based on ``settings.lightning_backend``.
"""
from __future__ import annotations
import hashlib
import logging
import secrets
from dataclasses import dataclass
from config import settings
logger = logging.getLogger(__name__)
@dataclass
class Invoice:
"""Minimal Lightning invoice representation."""
payment_hash: str
payment_request: str
amount_sats: int
memo: str
class MockBackend:
"""In-memory mock Lightning backend for development and testing."""
def create_invoice(self, amount_sats: int, memo: str = "") -> Invoice:
"""Create a fake invoice with a random payment hash."""
raw = secrets.token_bytes(32)
payment_hash = hashlib.sha256(raw).hexdigest()
payment_request = f"lnbc{amount_sats}mock{payment_hash[:20]}"
logger.debug("Mock invoice: %s sats — %s", amount_sats, payment_hash[:12])
return Invoice(
payment_hash=payment_hash,
payment_request=payment_request,
amount_sats=amount_sats,
memo=memo,
)
# Singleton — lazily created
_backend: MockBackend | None = None
def get_backend() -> MockBackend:
"""Return the configured Lightning backend (currently mock-only).
Raises ``ValueError`` if an unsupported backend is requested.
"""
global _backend # noqa: PLW0603
if _backend is not None:
return _backend
kind = settings.lightning_backend
if kind == "mock":
_backend = MockBackend()
elif kind == "lnd":
# LND gRPC integration is on the roadmap — for now fall back to mock.
logger.warning("LND backend not yet implemented — using mock")
_backend = MockBackend()
else:
raise ValueError(f"Unknown lightning_backend: {kind!r}")
logger.info("Lightning backend: %s", kind)
return _backend

View File

@@ -1,146 +0,0 @@
"""In-memory Lightning transaction ledger.
Tracks invoices, settlements, and balances per the schema in
``docs/adr/018-lightning-ledger.md``. Uses a simple in-memory list so the
dashboard can display real (ephemeral) data without requiring SQLite yet.
"""
from __future__ import annotations
import logging
import uuid
from dataclasses import dataclass
from datetime import UTC, datetime
from enum import StrEnum
logger = logging.getLogger(__name__)
class TxType(StrEnum):
incoming = "incoming"
outgoing = "outgoing"
class TxStatus(StrEnum):
pending = "pending"
settled = "settled"
failed = "failed"
expired = "expired"
@dataclass
class LedgerEntry:
"""Single ledger row matching the ADR-018 schema."""
id: str
tx_type: TxType
status: TxStatus
payment_hash: str
amount_sats: int
memo: str
source: str
created_at: str
invoice: str = ""
preimage: str = ""
task_id: str = ""
agent_id: str = ""
settled_at: str = ""
fee_sats: int = 0
# ── In-memory store ──────────────────────────────────────────────────
_entries: list[LedgerEntry] = []
def create_invoice_entry(
payment_hash: str,
amount_sats: int,
memo: str = "",
source: str = "tool_usage",
task_id: str = "",
agent_id: str = "",
invoice: str = "",
) -> LedgerEntry:
"""Record a new incoming invoice in the ledger."""
entry = LedgerEntry(
id=uuid.uuid4().hex[:16],
tx_type=TxType.incoming,
status=TxStatus.pending,
payment_hash=payment_hash,
amount_sats=amount_sats,
memo=memo,
source=source,
task_id=task_id,
agent_id=agent_id,
invoice=invoice,
created_at=datetime.now(UTC).isoformat(),
)
_entries.append(entry)
logger.debug("Ledger entry created: %s (%s sats)", entry.id, amount_sats)
return entry
def mark_settled(payment_hash: str, preimage: str = "") -> LedgerEntry | None:
"""Mark a pending entry as settled by payment hash."""
for entry in _entries:
if entry.payment_hash == payment_hash and entry.status == TxStatus.pending:
entry.status = TxStatus.settled
entry.preimage = preimage
entry.settled_at = datetime.now(UTC).isoformat()
logger.debug("Ledger settled: %s", payment_hash[:12])
return entry
return None
def get_balance() -> dict:
"""Compute the current balance from settled and pending entries."""
incoming_total = sum(
e.amount_sats
for e in _entries
if e.tx_type == TxType.incoming and e.status == TxStatus.settled
)
outgoing_total = sum(
e.amount_sats
for e in _entries
if e.tx_type == TxType.outgoing and e.status == TxStatus.settled
)
fees = sum(e.fee_sats for e in _entries if e.status == TxStatus.settled)
pending_in = sum(
e.amount_sats
for e in _entries
if e.tx_type == TxType.incoming and e.status == TxStatus.pending
)
pending_out = sum(
e.amount_sats
for e in _entries
if e.tx_type == TxType.outgoing and e.status == TxStatus.pending
)
net = incoming_total - outgoing_total - fees
return {
"incoming_total_sats": incoming_total,
"outgoing_total_sats": outgoing_total,
"fees_paid_sats": fees,
"net_sats": net,
"pending_incoming_sats": pending_in,
"pending_outgoing_sats": pending_out,
"available_sats": net - pending_out,
}
def get_transactions(
tx_type: str | None = None,
status: str | None = None,
limit: int = 50,
) -> list[LedgerEntry]:
"""Return ledger entries, optionally filtered."""
result = _entries
if tx_type:
result = [e for e in result if e.tx_type.value == tx_type]
if status:
result = [e for e in result if e.status.value == status]
return list(reversed(result))[:limit]
def clear() -> None:
"""Reset the ledger (for testing)."""
_entries.clear()

View File

@@ -119,84 +119,75 @@ class BaseAgent(ABC):
"""
pass
# Transient errors that indicate Ollama contention or temporary
# unavailability — these deserve a retry with backoff.
_TRANSIENT = (
httpx.ConnectError,
httpx.ReadError,
httpx.ReadTimeout,
httpx.ConnectTimeout,
ConnectionError,
TimeoutError,
)
async def run(self, message: str) -> str:
"""Run the agent with a message.
async def run(self, message: str, *, max_retries: int = 3) -> str:
"""Run the agent with a message, retrying on transient failures.
Retries on transient failures (connection errors, timeouts) with
exponential backoff. GPU contention from concurrent Ollama
requests causes ReadError / ReadTimeout — these are transient
and should be retried, not raised immediately (#70).
GPU contention from concurrent Ollama requests causes ReadError /
ReadTimeout — these are transient and retried with exponential
backoff (#70).
Returns:
Agent response
"""
response = await self._run_with_retries(message, max_retries)
await self._emit_response_event(message, response)
return response
max_retries = 3
last_exception = None
# Transient errors that indicate Ollama contention or temporary
# unavailability — these deserve a retry with backoff.
_transient = (
httpx.ConnectError,
httpx.ReadError,
httpx.ReadTimeout,
httpx.ConnectTimeout,
ConnectionError,
TimeoutError,
)
async def _run_with_retries(self, message: str, max_retries: int) -> str:
"""Execute agent.run() with retry logic for transient errors."""
for attempt in range(1, max_retries + 1):
try:
result = self.agent.run(message, stream=False)
return result.content if hasattr(result, "content") else str(result)
except self._TRANSIENT as exc:
self._handle_retry_or_raise(
exc,
attempt,
max_retries,
transient=True,
)
await asyncio.sleep(min(2**attempt, 16))
response = result.content if hasattr(result, "content") else str(result)
break # Success, exit the retry loop
except _transient as exc:
last_exception = exc
if attempt < max_retries:
# Contention backoff — longer waits because the GPU
# needs time to finish the other request.
wait = min(2**attempt, 16)
logger.warning(
"Ollama contention on attempt %d/%d: %s. Waiting %ds before retry...",
attempt,
max_retries,
type(exc).__name__,
wait,
)
await asyncio.sleep(wait)
else:
logger.error(
"Ollama unreachable after %d attempts: %s",
max_retries,
exc,
)
raise last_exception from exc
except Exception as exc:
self._handle_retry_or_raise(
exc,
attempt,
max_retries,
transient=False,
)
await asyncio.sleep(min(2 ** (attempt - 1), 8))
# Unreachable — _handle_retry_or_raise raises on last attempt.
raise RuntimeError("retry loop exited unexpectedly") # pragma: no cover
last_exception = exc
if attempt < max_retries:
logger.warning(
"Agent run failed on attempt %d/%d: %s. Retrying...",
attempt,
max_retries,
exc,
)
await asyncio.sleep(min(2 ** (attempt - 1), 8))
else:
logger.error(
"Agent run failed after %d attempts: %s",
max_retries,
exc,
)
raise last_exception from exc
@staticmethod
def _handle_retry_or_raise(
exc: Exception,
attempt: int,
max_retries: int,
*,
transient: bool,
) -> None:
"""Log a retry warning or raise after exhausting attempts."""
if attempt < max_retries:
if transient:
logger.warning(
"Ollama contention on attempt %d/%d: %s. Waiting before retry...",
attempt,
max_retries,
type(exc).__name__,
)
else:
logger.warning(
"Agent run failed on attempt %d/%d: %s. Retrying...",
attempt,
max_retries,
exc,
)
else:
label = "Ollama unreachable" if transient else "Agent run failed"
logger.error("%s after %d attempts: %s", label, max_retries, exc)
raise exc
async def _emit_response_event(self, message: str, response: str) -> None:
"""Publish a completion event to the event bus if connected."""
# Emit completion event
if self.event_bus:
await self.event_bus.publish(
Event(
@@ -206,6 +197,8 @@ class BaseAgent(ABC):
)
)
return response
def get_capabilities(self) -> list[str]:
"""Get list of capabilities this agent provides."""
return self.tools

View File

@@ -99,27 +99,23 @@ class GrokBackend:
def _get_client(self):
"""Create OpenAI client configured for xAI endpoint."""
from config import settings
import httpx
from openai import OpenAI
return OpenAI(
api_key=self._api_key,
base_url=settings.xai_base_url,
base_url="https://api.x.ai/v1",
timeout=httpx.Timeout(300.0),
)
async def _get_async_client(self):
"""Create async OpenAI client configured for xAI endpoint."""
from config import settings
import httpx
from openai import AsyncOpenAI
return AsyncOpenAI(
api_key=self._api_key,
base_url=settings.xai_base_url,
base_url="https://api.x.ai/v1",
timeout=httpx.Timeout(300.0),
)

View File

@@ -37,35 +37,6 @@ def _is_interactive() -> bool:
return hasattr(sys.stdin, "isatty") and sys.stdin.isatty()
def _prompt_interactive(req, tool_name: str, tool_args: dict) -> None:
"""Display tool details and prompt the human for approval."""
description = format_action_description(tool_name, tool_args)
impact = get_impact_level(tool_name)
typer.echo()
typer.echo(typer.style("Tool confirmation required", bold=True))
typer.echo(f" Impact: {impact.upper()}")
typer.echo(f" {description}")
typer.echo()
if typer.confirm("Allow this action?", default=False):
req.confirm()
logger.info("CLI: approved %s", tool_name)
else:
req.reject(note="User rejected from CLI")
logger.info("CLI: rejected %s", tool_name)
def _decide_autonomous(req, tool_name: str, tool_args: dict) -> None:
"""Auto-approve allowlisted tools; reject everything else."""
if is_allowlisted(tool_name, tool_args):
req.confirm()
logger.info("AUTO-APPROVED (allowlist): %s", tool_name)
else:
req.reject(note="Auto-rejected: not in allowlist")
logger.info("AUTO-REJECTED (not allowlisted): %s %s", tool_name, str(tool_args)[:100])
def _handle_tool_confirmation(agent, run_output, session_id: str, *, autonomous: bool = False):
"""Prompt user to approve/reject dangerous tool calls.
@@ -80,7 +51,6 @@ def _handle_tool_confirmation(agent, run_output, session_id: str, *, autonomous:
Returns the final RunOutput after all confirmations are resolved.
"""
interactive = _is_interactive() and not autonomous
decide = _prompt_interactive if interactive else _decide_autonomous
max_rounds = 10 # safety limit
for _ in range(max_rounds):
@@ -96,10 +66,39 @@ def _handle_tool_confirmation(agent, run_output, session_id: str, *, autonomous:
for req in reqs:
if not getattr(req, "needs_confirmation", False):
continue
te = req.tool_execution
tool_name = getattr(te, "tool_name", "unknown")
tool_args = getattr(te, "tool_args", {}) or {}
decide(req, tool_name, tool_args)
if interactive:
# Human present — prompt for approval
description = format_action_description(tool_name, tool_args)
impact = get_impact_level(tool_name)
typer.echo()
typer.echo(typer.style("Tool confirmation required", bold=True))
typer.echo(f" Impact: {impact.upper()}")
typer.echo(f" {description}")
typer.echo()
approved = typer.confirm("Allow this action?", default=False)
if approved:
req.confirm()
logger.info("CLI: approved %s", tool_name)
else:
req.reject(note="User rejected from CLI")
logger.info("CLI: rejected %s", tool_name)
else:
# Autonomous mode — check allowlist
if is_allowlisted(tool_name, tool_args):
req.confirm()
logger.info("AUTO-APPROVED (allowlist): %s", tool_name)
else:
req.reject(note="Auto-rejected: not in allowlist")
logger.info(
"AUTO-REJECTED (not allowlisted): %s %s", tool_name, str(tool_args)[:100]
)
# Resume the run so the agent sees the confirmation result
try:
@@ -139,7 +138,7 @@ def think(
model_size: str | None = _MODEL_SIZE_OPTION,
):
"""Ask Timmy to think carefully about a topic."""
timmy = create_timmy(backend=backend, session_id=_CLI_SESSION_ID)
timmy = create_timmy(backend=backend, model_size=model_size, session_id=_CLI_SESSION_ID)
timmy.print_response(f"Think carefully about: {topic}", stream=True, session_id=_CLI_SESSION_ID)
@@ -202,7 +201,7 @@ def chat(
session_id = str(uuid.uuid4())
else:
session_id = _CLI_SESSION_ID
timmy = create_timmy(backend=backend, session_id=session_id)
timmy = create_timmy(backend=backend, model_size=model_size, session_id=session_id)
# Use agent.run() so we can intercept paused runs for tool confirmation.
run_output = timmy.run(message_str, stream=False, session_id=session_id)
@@ -279,7 +278,7 @@ def status(
model_size: str | None = _MODEL_SIZE_OPTION,
):
"""Print Timmy's operational status."""
timmy = create_timmy(backend=backend, session_id=_CLI_SESSION_ID)
timmy = create_timmy(backend=backend, model_size=model_size, session_id=_CLI_SESSION_ID)
timmy.print_response(STATUS_PROMPT, stream=False, session_id=_CLI_SESSION_ID)

View File

@@ -174,8 +174,15 @@ class ConversationManager:
return None
_TOOL_KEYWORDS = frozenset(
{
def should_use_tools(self, message: str, context: ConversationContext) -> bool:
"""Determine if this message likely requires tools.
Returns True if tools are likely needed, False for simple chat.
"""
message_lower = message.lower().strip()
# Tool keywords that suggest tool usage is needed
tool_keywords = [
"search",
"look up",
"find",
@@ -196,11 +203,10 @@ class ConversationManager:
"shell",
"command",
"install",
}
)
]
_CHAT_ONLY_KEYWORDS = frozenset(
{
# Chat-only keywords that definitely don't need tools
chat_only = [
"hello",
"hi ",
"hey",
@@ -215,47 +221,30 @@ class ConversationManager:
"goodbye",
"tell me about yourself",
"what can you do",
}
)
]
_SIMPLE_QUESTION_PREFIXES = ("what is", "who is", "how does", "why is", "when did", "where is")
_TIME_WORDS = ("today", "now", "current", "latest", "this week", "this month")
# Check for chat-only patterns first
for pattern in chat_only:
if pattern in message_lower:
return False
def _is_chat_only(self, message_lower: str) -> bool:
"""Return True if the message matches a chat-only pattern."""
return any(kw in message_lower for kw in self._CHAT_ONLY_KEYWORDS)
# Check for tool keywords
for keyword in tool_keywords:
if keyword in message_lower:
return True
def _has_tool_keyword(self, message_lower: str) -> bool:
"""Return True if the message contains a tool-related keyword."""
return any(kw in message_lower for kw in self._TOOL_KEYWORDS)
def _is_simple_question(self, message_lower: str) -> bool | None:
"""Check if message is a simple question.
Returns True if it needs tools (real-time info), False if it
doesn't, or None if the message isn't a simple question.
"""
for prefix in self._SIMPLE_QUESTION_PREFIXES:
if message_lower.startswith(prefix):
return any(t in message_lower for t in self._TIME_WORDS)
return None
def should_use_tools(self, message: str, context: ConversationContext) -> bool:
"""Determine if this message likely requires tools.
Returns True if tools are likely needed, False for simple chat.
"""
message_lower = message.lower().strip()
if self._is_chat_only(message_lower):
return False
if self._has_tool_keyword(message_lower):
return True
simple = self._is_simple_question(message_lower)
if simple is not None:
return simple
# Simple questions (starting with what, who, how, why, when, where)
# usually don't need tools unless about current/real-time info
simple_question_words = ["what is", "who is", "how does", "why is", "when did", "where is"]
for word in simple_question_words:
if message_lower.startswith(word):
# Check if it's asking about current/real-time info
time_words = ["today", "now", "current", "latest", "this week", "this month"]
if any(t in message_lower for t in time_words):
return True
return False
# Default: don't use tools for unclear cases
return False

View File

@@ -21,16 +21,12 @@ Usage::
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from PIL import ImageDraw
import os
import shutil
import sqlite3
import uuid
from contextlib import closing
from datetime import UTC, datetime
from datetime import datetime
from pathlib import Path
import httpx
@@ -196,7 +192,7 @@ def _bridge_to_work_order(title: str, body: str, category: str) -> None:
body,
category,
"timmy-thinking",
datetime.now(UTC).isoformat(),
datetime.utcnow().isoformat(),
),
)
conn.commit()
@@ -204,61 +200,15 @@ def _bridge_to_work_order(title: str, body: str, category: str) -> None:
logger.debug("Work order bridge failed: %s", exc)
async def _ensure_issue_session():
"""Get or create the cached MCP session, connecting if needed.
Returns the connected ``MCPTools`` instance.
"""
from agno.tools.mcp import MCPTools
global _issue_session
if _issue_session is None:
_issue_session = MCPTools(
server_params=_gitea_server_params(),
timeout_seconds=settings.mcp_timeout,
)
if not getattr(_issue_session, "_connected", False):
await _issue_session.connect()
_issue_session._connected = True
return _issue_session
def _build_issue_body(body: str) -> str:
"""Append the auto-filing signature to the issue body."""
full_body = body
if full_body:
full_body += "\n\n"
full_body += "---\n*Auto-filed by Timmy's thinking engine*"
return full_body
def _build_issue_args(title: str, full_body: str) -> dict:
"""Build MCP tool arguments for ``issue_write`` with method=create."""
owner, repo = settings.gitea_repo.split("/", 1)
return {
"method": "create",
"owner": owner,
"repo": repo,
"title": title,
"body": full_body,
}
def _category_from_labels(labels: str) -> str:
"""Derive a work-order category from comma-separated label names."""
label_list = [tag.strip() for tag in labels.split(",") if tag.strip()] if labels else []
return "bug" if "bug" in label_list else "suggestion"
async def create_gitea_issue_via_mcp(title: str, body: str = "", labels: str = "") -> str:
"""File a Gitea issue via the MCP server (standalone, no LLM loop).
Used by the thinking engine's ``_maybe_file_issues()`` post-hook.
Manages its own MCPTools session with lazy connect + graceful failure.
Uses ``tools.session.call_tool()`` for direct MCP invocation — the
``MCPTools`` wrapper itself does not expose ``call_tool()``.
Args:
title: Issue title.
body: Issue body (markdown).
@@ -271,13 +221,46 @@ async def create_gitea_issue_via_mcp(title: str, body: str = "", labels: str = "
return "Gitea integration is not configured."
try:
session = await _ensure_issue_session()
full_body = _build_issue_body(body)
args = _build_issue_args(title, full_body)
from agno.tools.mcp import MCPTools
result = await session.session.call_tool("issue_write", arguments=args)
global _issue_session
_bridge_to_work_order(title, body, _category_from_labels(labels))
if _issue_session is None:
_issue_session = MCPTools(
server_params=_gitea_server_params(),
timeout_seconds=settings.mcp_timeout,
)
# Ensure connected
if not getattr(_issue_session, "_connected", False):
await _issue_session.connect()
_issue_session._connected = True
# Append auto-filing signature
full_body = body
if full_body:
full_body += "\n\n"
full_body += "---\n*Auto-filed by Timmy's thinking engine*"
# Parse owner/repo from settings
owner, repo = settings.gitea_repo.split("/", 1)
# Build tool arguments — gitea-mcp uses issue_write with method="create"
args = {
"method": "create",
"owner": owner,
"repo": repo,
"title": title,
"body": full_body,
}
# Call via the underlying MCP session (MCPTools doesn't expose call_tool)
result = await _issue_session.session.call_tool("issue_write", arguments=args)
# Bridge to local work order
label_list = [tag.strip() for tag in labels.split(",") if tag.strip()] if labels else []
category = "bug" if "bug" in label_list else "suggestion"
_bridge_to_work_order(title, body, category)
logger.info("Created Gitea issue via MCP: %s", title[:60])
return f"Created issue: {title}\n{result}"
@@ -287,8 +270,20 @@ async def create_gitea_issue_via_mcp(title: str, body: str = "", labels: str = "
return f"Failed to create issue via MCP: {exc}"
def _draw_background(draw: ImageDraw.ImageDraw, size: int) -> None:
"""Draw radial gradient background with concentric circles."""
def _generate_avatar_image() -> bytes:
"""Generate a Timmy-themed avatar image using Pillow.
Creates a 512x512 wizard-themed avatar with emerald/purple/gold palette.
Returns raw PNG bytes. Falls back to a minimal solid-color image if
Pillow drawing primitives fail.
"""
from PIL import Image, ImageDraw
size = 512
img = Image.new("RGB", (size, size), (15, 25, 20))
draw = ImageDraw.Draw(img)
# Background gradient effect — concentric circles
for i in range(size // 2, 0, -4):
g = int(25 + (i / (size // 2)) * 30)
draw.ellipse(
@@ -296,45 +291,33 @@ def _draw_background(draw: ImageDraw.ImageDraw, size: int) -> None:
fill=(10, g, 20),
)
def _draw_wizard(draw: ImageDraw.ImageDraw) -> None:
"""Draw wizard hat, face, eyes, smile, monogram, and robe."""
# Wizard hat (triangle)
hat_color = (100, 50, 160) # purple
hat_outline = (180, 130, 255)
gold = (220, 190, 50)
pupil = (30, 30, 60)
# Hat + brim
draw.polygon([(256, 40), (160, 220), (352, 220)], fill=hat_color, outline=hat_outline)
draw.ellipse([140, 200, 372, 250], fill=hat_color, outline=hat_outline)
# Face
draw.ellipse([190, 220, 322, 370], fill=(60, 180, 100), outline=(80, 220, 120))
# Eyes (whites + pupils)
draw.ellipse([220, 275, 248, 310], fill=(255, 255, 255))
draw.ellipse([264, 275, 292, 310], fill=(255, 255, 255))
draw.ellipse([228, 285, 242, 300], fill=pupil)
draw.ellipse([272, 285, 286, 300], fill=pupil)
# Smile
draw.arc([225, 300, 287, 355], start=10, end=170, fill=pupil, width=3)
# "T" monogram on hat
draw.text((243, 100), "T", fill=gold)
# Robe
draw.polygon(
[(180, 370), (140, 500), (372, 500), (332, 370)],
fill=(40, 100, 70),
outline=(60, 160, 100),
[(256, 40), (160, 220), (352, 220)],
fill=hat_color,
outline=(180, 130, 255),
)
# Hat brim
draw.ellipse([140, 200, 372, 250], fill=hat_color, outline=(180, 130, 255))
def _draw_stars(draw: ImageDraw.ImageDraw) -> None:
"""Draw decorative gold stars around the wizard hat."""
# Face circle
draw.ellipse([190, 220, 322, 370], fill=(60, 180, 100), outline=(80, 220, 120))
# Eyes
draw.ellipse([220, 275, 248, 310], fill=(255, 255, 255))
draw.ellipse([264, 275, 292, 310], fill=(255, 255, 255))
draw.ellipse([228, 285, 242, 300], fill=(30, 30, 60))
draw.ellipse([272, 285, 286, 300], fill=(30, 30, 60))
# Smile
draw.arc([225, 300, 287, 355], start=10, end=170, fill=(30, 30, 60), width=3)
# Stars around the hat
gold = (220, 190, 50)
for sx, sy in [(120, 100), (380, 120), (100, 300), (400, 280), (256, 10)]:
star_positions = [(120, 100), (380, 120), (100, 300), (400, 280), (256, 10)]
for sx, sy in star_positions:
r = 8
draw.polygon(
[
@@ -350,26 +333,18 @@ def _draw_stars(draw: ImageDraw.ImageDraw) -> None:
fill=gold,
)
# "T" monogram on the hat
draw.text((243, 100), "T", fill=gold)
def _generate_avatar_image() -> bytes:
"""Generate a Timmy-themed avatar image using Pillow.
# Robe / body
draw.polygon(
[(180, 370), (140, 500), (372, 500), (332, 370)],
fill=(40, 100, 70),
outline=(60, 160, 100),
)
Creates a 512x512 wizard-themed avatar with emerald/purple/gold palette.
Returns raw PNG bytes. Falls back to a minimal solid-color image if
Pillow drawing primitives fail.
"""
import io
from PIL import Image, ImageDraw
size = 512
img = Image.new("RGB", (size, size), (15, 25, 20))
draw = ImageDraw.Draw(img)
_draw_background(draw, size)
_draw_wizard(draw)
_draw_stars(draw)
buf = io.BytesIO()
img.save(buf, format="PNG")
return buf.getvalue()

View File

@@ -78,88 +78,83 @@ def _migrate_schema(conn: sqlite3.Connection) -> None:
cursor = conn.execute("SELECT name FROM sqlite_master WHERE type='table'")
tables = {row[0] for row in cursor.fetchall()}
if "memories" not in tables:
has_memories = "memories" in tables
has_episodes = "episodes" in tables
has_chunks = "chunks" in tables
has_facts = "facts" in tables
# Check if we need to migrate (old schema exists but new one doesn't fully)
if not has_memories:
logger.info("Migration: Creating unified memories table")
# Schema will be created by _ensure_schema above
conn.commit()
return
# Schema will be created above
# Migrate episodes -> memories
if has_episodes and has_memories:
logger.info("Migration: Converting episodes table to memories")
try:
cols = _get_table_columns(conn, "episodes")
context_type_col = "context_type" if "context_type" in cols else "'conversation'"
conn.execute(f"""
INSERT INTO memories (
id, content, memory_type, source, embedding,
metadata, agent_id, task_id, session_id,
created_at, access_count, last_accessed
)
SELECT
id, content,
COALESCE({context_type_col}, 'conversation'),
COALESCE(source, 'agent'),
embedding,
metadata, agent_id, task_id, session_id,
COALESCE(timestamp, datetime('now')), 0, NULL
FROM episodes
""")
conn.execute("DROP TABLE episodes")
logger.info("Migration: Migrated episodes to memories")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to migrate episodes: %s", exc)
# Migrate chunks -> memories as vault_chunk
if has_chunks and has_memories:
logger.info("Migration: Converting chunks table to memories")
try:
cols = _get_table_columns(conn, "chunks")
id_col = "id" if "id" in cols else "CAST(rowid AS TEXT)"
content_col = "content" if "content" in cols else "text"
source_col = (
"filepath" if "filepath" in cols else ("source" if "source" in cols else "'vault'")
)
embedding_col = "embedding" if "embedding" in cols else "NULL"
created_col = "created_at" if "created_at" in cols else "datetime('now')"
conn.execute(f"""
INSERT INTO memories (
id, content, memory_type, source, embedding,
created_at, access_count
)
SELECT
{id_col}, {content_col}, 'vault_chunk', {source_col},
{embedding_col}, {created_col}, 0
FROM chunks
""")
conn.execute("DROP TABLE chunks")
logger.info("Migration: Migrated chunks to memories")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to migrate chunks: %s", exc)
# Drop old facts table
if has_facts:
try:
conn.execute("DROP TABLE facts")
logger.info("Migration: Dropped old facts table")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to drop facts: %s", exc)
_migrate_episodes(conn, tables)
_migrate_chunks(conn, tables)
_drop_legacy_tables(conn, tables)
conn.commit()
def _migrate_episodes(conn: sqlite3.Connection, tables: set[str]) -> None:
"""Migrate episodes table rows into the unified memories table."""
if "episodes" not in tables:
return
logger.info("Migration: Converting episodes table to memories")
try:
cols = _get_table_columns(conn, "episodes")
context_type_col = "context_type" if "context_type" in cols else "'conversation'"
conn.execute(f"""
INSERT INTO memories (
id, content, memory_type, source, embedding,
metadata, agent_id, task_id, session_id,
created_at, access_count, last_accessed
)
SELECT
id, content,
COALESCE({context_type_col}, 'conversation'),
COALESCE(source, 'agent'),
embedding,
metadata, agent_id, task_id, session_id,
COALESCE(timestamp, datetime('now')), 0, NULL
FROM episodes
""")
conn.execute("DROP TABLE episodes")
logger.info("Migration: Migrated episodes to memories")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to migrate episodes: %s", exc)
def _migrate_chunks(conn: sqlite3.Connection, tables: set[str]) -> None:
"""Migrate chunks table rows into the unified memories table as vault_chunk."""
if "chunks" not in tables:
return
logger.info("Migration: Converting chunks table to memories")
try:
cols = _get_table_columns(conn, "chunks")
id_col = "id" if "id" in cols else "CAST(rowid AS TEXT)"
content_col = "content" if "content" in cols else "text"
source_col = (
"filepath" if "filepath" in cols else ("source" if "source" in cols else "'vault'")
)
embedding_col = "embedding" if "embedding" in cols else "NULL"
created_col = "created_at" if "created_at" in cols else "datetime('now')"
conn.execute(f"""
INSERT INTO memories (
id, content, memory_type, source, embedding,
created_at, access_count
)
SELECT
{id_col}, {content_col}, 'vault_chunk', {source_col},
{embedding_col}, {created_col}, 0
FROM chunks
""")
conn.execute("DROP TABLE chunks")
logger.info("Migration: Migrated chunks to memories")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to migrate chunks: %s", exc)
def _drop_legacy_tables(conn: sqlite3.Connection, tables: set[str]) -> None:
"""Drop old facts table if it exists."""
if "facts" not in tables:
return
try:
conn.execute("DROP TABLE facts")
logger.info("Migration: Dropped old facts table")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to drop facts: %s", exc)
def _get_table_columns(conn: sqlite3.Connection, table_name: str) -> set[str]:
"""Get the column names for a table."""
cursor = conn.execute(f"PRAGMA table_info({table_name})")

View File

@@ -46,64 +46,6 @@ DB_PATH = PROJECT_ROOT / "data" / "memory.db"
# ───────────────────────────────────────────────────────────────────────────────
_DEFAULT_HOT_MEMORY_TEMPLATE = """\
# Timmy Hot Memory
> Working RAM — always loaded, ~300 lines max, pruned monthly
> Last updated: {date}
---
## Current Status
**Agent State:** Operational
**Mode:** Development
**Active Tasks:** 0
**Pending Decisions:** None
---
## Standing Rules
1. **Sovereignty First** — No cloud dependencies
2. **Local-Only Inference** — Ollama on localhost
3. **Privacy by Design** — Telemetry disabled
4. **Tool Minimalism** — Use tools only when necessary
5. **Memory Discipline** — Write handoffs at session end
---
## Agent Roster
| Agent | Role | Status |
|-------|------|--------|
| Timmy | Core | Active |
---
## User Profile
**Name:** (not set)
**Interests:** (to be learned)
---
## Key Decisions
(none yet)
---
## Pending Actions
- [ ] Learn user's name
---
*Prune date: {prune_date}*
"""
@contextmanager
def get_connection() -> Generator[sqlite3.Connection, None, None]:
"""Get database connection to unified memory database."""
@@ -156,73 +98,6 @@ def _get_table_columns(conn: sqlite3.Connection, table_name: str) -> set[str]:
return {row[1] for row in cursor.fetchall()}
def _migrate_episodes(conn: sqlite3.Connection) -> None:
"""Migrate episodes table rows into the unified memories table."""
logger.info("Migration: Converting episodes table to memories")
try:
cols = _get_table_columns(conn, "episodes")
context_type_col = "context_type" if "context_type" in cols else "'conversation'"
conn.execute(f"""
INSERT INTO memories (
id, content, memory_type, source, embedding,
metadata, agent_id, task_id, session_id,
created_at, access_count, last_accessed
)
SELECT
id, content,
COALESCE({context_type_col}, 'conversation'),
COALESCE(source, 'agent'),
embedding,
metadata, agent_id, task_id, session_id,
COALESCE(timestamp, datetime('now')), 0, NULL
FROM episodes
""")
conn.execute("DROP TABLE episodes")
logger.info("Migration: Migrated episodes to memories")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to migrate episodes: %s", exc)
def _migrate_chunks(conn: sqlite3.Connection) -> None:
"""Migrate chunks table rows into the unified memories table."""
logger.info("Migration: Converting chunks table to memories")
try:
cols = _get_table_columns(conn, "chunks")
id_col = "id" if "id" in cols else "CAST(rowid AS TEXT)"
content_col = "content" if "content" in cols else "text"
source_col = (
"filepath" if "filepath" in cols else ("source" if "source" in cols else "'vault'")
)
embedding_col = "embedding" if "embedding" in cols else "NULL"
created_col = "created_at" if "created_at" in cols else "datetime('now')"
conn.execute(f"""
INSERT INTO memories (
id, content, memory_type, source, embedding,
created_at, access_count
)
SELECT
{id_col}, {content_col}, 'vault_chunk', {source_col},
{embedding_col}, {created_col}, 0
FROM chunks
""")
conn.execute("DROP TABLE chunks")
logger.info("Migration: Migrated chunks to memories")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to migrate chunks: %s", exc)
def _drop_legacy_table(conn: sqlite3.Connection, table: str) -> None:
"""Drop a legacy table if it exists."""
try:
conn.execute(f"DROP TABLE {table}") # noqa: S608
logger.info("Migration: Dropped old %s table", table)
except sqlite3.Error as exc:
logger.warning("Migration: Failed to drop %s: %s", table, exc)
def _migrate_schema(conn: sqlite3.Connection) -> None:
"""Migrate from old three-table schema to unified memories table.
@@ -235,16 +110,78 @@ def _migrate_schema(conn: sqlite3.Connection) -> None:
tables = {row[0] for row in cursor.fetchall()}
has_memories = "memories" in tables
has_episodes = "episodes" in tables
has_chunks = "chunks" in tables
has_facts = "facts" in tables
if not has_memories and (tables & {"episodes", "chunks", "facts"}):
# Check if we need to migrate (old schema exists)
if not has_memories and (has_episodes or has_chunks or has_facts):
logger.info("Migration: Creating unified memories table")
# Schema will be created by _ensure_schema above
if "episodes" in tables and has_memories:
_migrate_episodes(conn)
if "chunks" in tables and has_memories:
_migrate_chunks(conn)
if "facts" in tables:
_drop_legacy_table(conn, "facts")
# Migrate episodes -> memories
if has_episodes and has_memories:
logger.info("Migration: Converting episodes table to memories")
try:
cols = _get_table_columns(conn, "episodes")
context_type_col = "context_type" if "context_type" in cols else "'conversation'"
conn.execute(f"""
INSERT INTO memories (
id, content, memory_type, source, embedding,
metadata, agent_id, task_id, session_id,
created_at, access_count, last_accessed
)
SELECT
id, content,
COALESCE({context_type_col}, 'conversation'),
COALESCE(source, 'agent'),
embedding,
metadata, agent_id, task_id, session_id,
COALESCE(timestamp, datetime('now')), 0, NULL
FROM episodes
""")
conn.execute("DROP TABLE episodes")
logger.info("Migration: Migrated episodes to memories")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to migrate episodes: %s", exc)
# Migrate chunks -> memories as vault_chunk
if has_chunks and has_memories:
logger.info("Migration: Converting chunks table to memories")
try:
cols = _get_table_columns(conn, "chunks")
id_col = "id" if "id" in cols else "CAST(rowid AS TEXT)"
content_col = "content" if "content" in cols else "text"
source_col = (
"filepath" if "filepath" in cols else ("source" if "source" in cols else "'vault'")
)
embedding_col = "embedding" if "embedding" in cols else "NULL"
created_col = "created_at" if "created_at" in cols else "datetime('now')"
conn.execute(f"""
INSERT INTO memories (
id, content, memory_type, source, embedding,
created_at, access_count
)
SELECT
{id_col}, {content_col}, 'vault_chunk', {source_col},
{embedding_col}, {created_col}, 0
FROM chunks
""")
conn.execute("DROP TABLE chunks")
logger.info("Migration: Migrated chunks to memories")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to migrate chunks: %s", exc)
# Drop old tables
if has_facts:
try:
conn.execute("DROP TABLE facts")
logger.info("Migration: Dropped old facts table")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to drop facts: %s", exc)
conn.commit()
@@ -361,85 +298,6 @@ def store_memory(
return entry
def _build_search_filters(
context_type: str | None,
agent_id: str | None,
session_id: str | None,
) -> tuple[str, list]:
"""Build SQL WHERE clause and params from search filters."""
conditions: list[str] = []
params: list = []
if context_type:
conditions.append("memory_type = ?")
params.append(context_type)
if agent_id:
conditions.append("agent_id = ?")
params.append(agent_id)
if session_id:
conditions.append("session_id = ?")
params.append(session_id)
where_clause = "WHERE " + " AND ".join(conditions) if conditions else ""
return where_clause, params
def _fetch_memory_candidates(
where_clause: str, params: list, candidate_limit: int
) -> list[sqlite3.Row]:
"""Fetch candidate memory rows from the database."""
query_sql = f"""
SELECT * FROM memories
{where_clause}
ORDER BY created_at DESC
LIMIT ?
"""
params.append(candidate_limit)
with get_connection() as conn:
return conn.execute(query_sql, params).fetchall()
def _row_to_entry(row: sqlite3.Row) -> MemoryEntry:
"""Convert a database row to a MemoryEntry."""
return MemoryEntry(
id=row["id"],
content=row["content"],
source=row["source"],
context_type=row["memory_type"], # DB column -> API field
agent_id=row["agent_id"],
task_id=row["task_id"],
session_id=row["session_id"],
metadata=json.loads(row["metadata"]) if row["metadata"] else None,
embedding=json.loads(row["embedding"]) if row["embedding"] else None,
timestamp=row["created_at"],
)
def _score_and_filter(
rows: list[sqlite3.Row],
query: str,
query_embedding: list[float],
min_relevance: float,
) -> list[MemoryEntry]:
"""Score candidate rows by similarity and filter by min_relevance."""
results = []
for row in rows:
entry = _row_to_entry(row)
if entry.embedding:
score = cosine_similarity(query_embedding, entry.embedding)
else:
score = _keyword_overlap(query, entry.content)
entry.relevance_score = score
if score >= min_relevance:
results.append(entry)
results.sort(key=lambda x: x.relevance_score or 0, reverse=True)
return results
def search_memories(
query: str,
limit: int = 10,
@@ -462,9 +320,65 @@ def search_memories(
List of MemoryEntry objects sorted by relevance
"""
query_embedding = embed_text(query)
where_clause, params = _build_search_filters(context_type, agent_id, session_id)
rows = _fetch_memory_candidates(where_clause, params, limit * 3)
results = _score_and_filter(rows, query, query_embedding, min_relevance)
# Build query with filters
conditions = []
params = []
if context_type:
conditions.append("memory_type = ?")
params.append(context_type)
if agent_id:
conditions.append("agent_id = ?")
params.append(agent_id)
if session_id:
conditions.append("session_id = ?")
params.append(session_id)
where_clause = "WHERE " + " AND ".join(conditions) if conditions else ""
# Fetch candidates (we'll do in-memory similarity for now)
query_sql = f"""
SELECT * FROM memories
{where_clause}
ORDER BY created_at DESC
LIMIT ?
"""
params.append(limit * 3) # Get more candidates for ranking
with get_connection() as conn:
rows = conn.execute(query_sql, params).fetchall()
# Compute similarity scores
results = []
for row in rows:
entry = MemoryEntry(
id=row["id"],
content=row["content"],
source=row["source"],
context_type=row["memory_type"], # DB column -> API field
agent_id=row["agent_id"],
task_id=row["task_id"],
session_id=row["session_id"],
metadata=json.loads(row["metadata"]) if row["metadata"] else None,
embedding=json.loads(row["embedding"]) if row["embedding"] else None,
timestamp=row["created_at"],
)
if entry.embedding:
score = cosine_similarity(query_embedding, entry.embedding)
entry.relevance_score = score
if score >= min_relevance:
results.append(entry)
else:
# Fallback: check for keyword overlap
score = _keyword_overlap(query, entry.content)
entry.relevance_score = score
if score >= min_relevance:
results.append(entry)
# Sort by relevance and return top results
results.sort(key=lambda x: x.relevance_score or 0, reverse=True)
return results[:limit]
@@ -790,12 +704,66 @@ class HotMemory:
logger.debug(
"HotMemory._create_default() - creating default MEMORY.md for backward compatibility"
)
now = datetime.now(UTC)
content = _DEFAULT_HOT_MEMORY_TEMPLATE.format(
date=now.strftime("%Y-%m-%d"),
prune_date=now.replace(day=25).strftime("%Y-%m-%d"),
default_content = """# Timmy Hot Memory
> Working RAM — always loaded, ~300 lines max, pruned monthly
> Last updated: {date}
---
## Current Status
**Agent State:** Operational
**Mode:** Development
**Active Tasks:** 0
**Pending Decisions:** None
---
## Standing Rules
1. **Sovereignty First** — No cloud dependencies
2. **Local-Only Inference** — Ollama on localhost
3. **Privacy by Design** — Telemetry disabled
4. **Tool Minimalism** — Use tools only when necessary
5. **Memory Discipline** — Write handoffs at session end
---
## Agent Roster
| Agent | Role | Status |
|-------|------|--------|
| Timmy | Core | Active |
---
## User Profile
**Name:** (not set)
**Interests:** (to be learned)
---
## Key Decisions
(none yet)
---
## Pending Actions
- [ ] Learn user's name
---
*Prune date: {prune_date}*
""".format(
date=datetime.now(UTC).strftime("%Y-%m-%d"),
prune_date=(datetime.now(UTC).replace(day=25)).strftime("%Y-%m-%d"),
)
self.path.write_text(content)
self.path.write_text(default_content)
logger.info("HotMemory: Created default MEMORY.md")

View File

@@ -392,26 +392,31 @@ def _build_insights(
return insights or ["Conversations look healthy. Keep up the good work."]
def _format_recurring_topics(repeated: list[tuple[str, int]]) -> list[str]:
"""Format the recurring-topics section of a reflection report."""
if repeated:
lines = ["### Recurring Topics"]
for word, count in repeated:
lines.append(f'- "{word}" ({count} mentions)')
lines.append("")
return lines
return ["### Recurring Topics\nNo strong patterns detected.\n"]
def self_reflect(limit: int = 30) -> str:
"""Review recent conversations and reflect on Timmy's own behavior.
Scans past session entries for patterns: low-confidence responses,
errors, repeated topics, and conversation quality signals. Returns
a structured reflection that Timmy can use to improve.
def _assemble_report(
entries: list[dict],
errors: list[dict],
timmy_msgs: list[dict],
user_msgs: list[dict],
low_conf: list[dict],
repeated: list[tuple[str, int]],
) -> str:
"""Assemble the full self-reflection report from analyzed data."""
Args:
limit: How many recent entries to review (default 30).
Returns:
A formatted self-reflection report.
"""
sl = get_session_logger()
sl.flush()
entries = sl.get_recent_entries(limit=limit)
if not entries:
return "No conversation history to reflect on yet."
_messages, errors, timmy_msgs, user_msgs = _categorize_entries(entries)
low_conf = _find_low_confidence(timmy_msgs)
repeated = _find_repeated_topics(user_msgs)
# Build reflection report
sections: list[str] = ["## Self-Reflection Report\n"]
sections.append(
f"Reviewed {len(entries)} recent entries: "
@@ -441,37 +446,16 @@ def _assemble_report(
)
)
sections.extend(_format_recurring_topics(repeated))
if repeated:
sections.append("### Recurring Topics")
for word, count in repeated:
sections.append(f'- "{word}" ({count} mentions)')
sections.append("")
else:
sections.append("### Recurring Topics\nNo strong patterns detected.\n")
sections.append("### Insights")
for insight in _build_insights(low_conf, errors, repeated):
sections.append(f"- {insight}")
return "\n".join(sections)
def self_reflect(limit: int = 30) -> str:
"""Review recent conversations and reflect on Timmy's own behavior.
Scans past session entries for patterns: low-confidence responses,
errors, repeated topics, and conversation quality signals. Returns
a structured reflection that Timmy can use to improve.
Args:
limit: How many recent entries to review (default 30).
Returns:
A formatted self-reflection report.
"""
sl = get_session_logger()
sl.flush()
entries = sl.get_recent_entries(limit=limit)
if not entries:
return "No conversation history to reflect on yet."
_messages, errors, timmy_msgs, user_msgs = _categorize_entries(entries)
low_conf = _find_low_confidence(timmy_msgs)
repeated = _find_repeated_topics(user_msgs)
return _assemble_report(entries, errors, timmy_msgs, user_msgs, low_conf, repeated)

View File

@@ -341,11 +341,6 @@ class ThinkingEngine:
)
return None
# Capture arrival time *before* the LLM call so the thought
# timestamp reflects when the cycle started, not when the
# (potentially slow) generation finished. Fixes #582.
arrived_at = datetime.now(UTC).isoformat()
memory_context, system_context, recent_thoughts = self._build_thinking_context()
content, seed_type = await self._generate_novel_thought(
@@ -357,7 +352,7 @@ class ThinkingEngine:
if not content:
return None
thought = self._store_thought(content, seed_type, arrived_at=arrived_at)
thought = self._store_thought(content, seed_type)
self._last_thought_id = thought.id
await self._process_thinking_result(thought)
@@ -1178,25 +1173,14 @@ class ThinkingEngine:
raw = run.content if hasattr(run, "content") else str(run)
return _THINK_TAG_RE.sub("", raw) if raw else raw
def _store_thought(
self,
content: str,
seed_type: str,
*,
arrived_at: str | None = None,
) -> Thought:
"""Persist a thought to SQLite.
Args:
arrived_at: ISO-8601 timestamp captured when the thinking cycle
started. Falls back to now() for callers that don't supply it.
"""
def _store_thought(self, content: str, seed_type: str) -> Thought:
"""Persist a thought to SQLite."""
thought = Thought(
id=str(uuid.uuid4()),
content=content,
seed_type=seed_type,
parent_id=self._last_thought_id,
created_at=arrived_at or datetime.now(UTC).isoformat(),
created_at=datetime.now(UTC).isoformat(),
)
with _get_conn(self._db_path) as conn:
@@ -1277,53 +1261,6 @@ class ThinkingEngine:
logger.debug("Failed to broadcast thought: %s", exc)
def _query_thoughts(
db_path: Path, query: str, seed_type: str | None, limit: int
) -> list[sqlite3.Row]:
"""Run the thought-search SQL and return matching rows."""
pattern = f"%{query}%"
with _get_conn(db_path) as conn:
if seed_type:
return conn.execute(
"""
SELECT id, content, seed_type, created_at
FROM thoughts
WHERE content LIKE ? AND seed_type = ?
ORDER BY created_at DESC
LIMIT ?
""",
(pattern, seed_type, limit),
).fetchall()
return conn.execute(
"""
SELECT id, content, seed_type, created_at
FROM thoughts
WHERE content LIKE ?
ORDER BY created_at DESC
LIMIT ?
""",
(pattern, limit),
).fetchall()
def _format_thought_rows(rows: list[sqlite3.Row], query: str, seed_type: str | None) -> str:
"""Format thought rows into a human-readable string."""
lines = [f'Found {len(rows)} thought(s) matching "{query}":']
if seed_type:
lines[0] += f' [seed_type="{seed_type}"]'
lines.append("")
for row in rows:
ts = datetime.fromisoformat(row["created_at"])
local_ts = ts.astimezone()
time_str = local_ts.strftime("%Y-%m-%d %I:%M %p").lstrip("0")
seed = row["seed_type"]
content = row["content"].replace("\n", " ") # Flatten newlines for display
lines.append(f"[{time_str}] ({seed}) {content[:150]}")
return "\n".join(lines)
def search_thoughts(query: str, seed_type: str | None = None, limit: int = 10) -> str:
"""Search Timmy's thought history for reflections matching a query.
@@ -1341,17 +1278,58 @@ def search_thoughts(query: str, seed_type: str | None = None, limit: int = 10) -
Formatted string with matching thoughts, newest first, including
timestamps and seed types. Returns a helpful message if no matches found.
"""
# Clamp limit to reasonable bounds
limit = max(1, min(limit, 50))
try:
rows = _query_thoughts(thinking_engine._db_path, query, seed_type, limit)
engine = thinking_engine
db_path = engine._db_path
# Build query with optional seed_type filter
with _get_conn(db_path) as conn:
if seed_type:
rows = conn.execute(
"""
SELECT id, content, seed_type, created_at
FROM thoughts
WHERE content LIKE ? AND seed_type = ?
ORDER BY created_at DESC
LIMIT ?
""",
(f"%{query}%", seed_type, limit),
).fetchall()
else:
rows = conn.execute(
"""
SELECT id, content, seed_type, created_at
FROM thoughts
WHERE content LIKE ?
ORDER BY created_at DESC
LIMIT ?
""",
(f"%{query}%", limit),
).fetchall()
if not rows:
if seed_type:
return f'No thoughts found matching "{query}" with seed_type="{seed_type}".'
return f'No thoughts found matching "{query}".'
return _format_thought_rows(rows, query, seed_type)
# Format results
lines = [f'Found {len(rows)} thought(s) matching "{query}":']
if seed_type:
lines[0] += f' [seed_type="{seed_type}"]'
lines.append("")
for row in rows:
ts = datetime.fromisoformat(row["created_at"])
local_ts = ts.astimezone()
time_str = local_ts.strftime("%Y-%m-%d %I:%M %p").lstrip("0")
seed = row["seed_type"]
content = row["content"].replace("\n", " ") # Flatten newlines for display
lines.append(f"[{time_str}] ({seed}) {content[:150]}")
return "\n".join(lines)
except Exception as exc:
logger.warning("Thought search failed: %s", exc)

View File

@@ -909,35 +909,82 @@ def _experiment_tool_catalog() -> dict:
}
_CREATIVE_CATALOG_SOURCES: list[tuple[str, str, list[str]]] = [
("creative.tools.git_tools", "GIT_TOOL_CATALOG", ["forge", "helm", "orchestrator"]),
("creative.tools.image_tools", "IMAGE_TOOL_CATALOG", ["pixel", "orchestrator"]),
("creative.tools.music_tools", "MUSIC_TOOL_CATALOG", ["lyra", "orchestrator"]),
("creative.tools.video_tools", "VIDEO_TOOL_CATALOG", ["reel", "orchestrator"]),
("creative.director", "DIRECTOR_TOOL_CATALOG", ["orchestrator"]),
("creative.assembler", "ASSEMBLER_TOOL_CATALOG", ["reel", "orchestrator"]),
]
def _import_creative_catalogs(catalog: dict) -> None:
"""Import and merge creative tool catalogs from creative module."""
for module_path, attr_name, available_in in _CREATIVE_CATALOG_SOURCES:
_merge_catalog(catalog, module_path, attr_name, available_in)
def _merge_catalog(
catalog: dict, module_path: str, attr_name: str, available_in: list[str]
) -> None:
"""Import a single creative catalog and merge its entries."""
# ── Git tools ─────────────────────────────────────────────────────────────
try:
from importlib import import_module
from creative.tools.git_tools import GIT_TOOL_CATALOG
source_catalog = getattr(import_module(module_path), attr_name)
for tool_id, info in source_catalog.items():
for tool_id, info in GIT_TOOL_CATALOG.items():
catalog[tool_id] = {
"name": info["name"],
"description": info["description"],
"available_in": available_in,
"available_in": ["forge", "helm", "orchestrator"],
}
except ImportError:
pass
# ── Image tools ────────────────────────────────────────────────────────────
try:
from creative.tools.image_tools import IMAGE_TOOL_CATALOG
for tool_id, info in IMAGE_TOOL_CATALOG.items():
catalog[tool_id] = {
"name": info["name"],
"description": info["description"],
"available_in": ["pixel", "orchestrator"],
}
except ImportError:
pass
# ── Music tools ────────────────────────────────────────────────────────────
try:
from creative.tools.music_tools import MUSIC_TOOL_CATALOG
for tool_id, info in MUSIC_TOOL_CATALOG.items():
catalog[tool_id] = {
"name": info["name"],
"description": info["description"],
"available_in": ["lyra", "orchestrator"],
}
except ImportError:
pass
# ── Video tools ────────────────────────────────────────────────────────────
try:
from creative.tools.video_tools import VIDEO_TOOL_CATALOG
for tool_id, info in VIDEO_TOOL_CATALOG.items():
catalog[tool_id] = {
"name": info["name"],
"description": info["description"],
"available_in": ["reel", "orchestrator"],
}
except ImportError:
pass
# ── Creative pipeline ──────────────────────────────────────────────────────
try:
from creative.director import DIRECTOR_TOOL_CATALOG
for tool_id, info in DIRECTOR_TOOL_CATALOG.items():
catalog[tool_id] = {
"name": info["name"],
"description": info["description"],
"available_in": ["orchestrator"],
}
except ImportError:
pass
# ── Assembler tools ───────────────────────────────────────────────────────
try:
from creative.assembler import ASSEMBLER_TOOL_CATALOG
for tool_id, info in ASSEMBLER_TOOL_CATALOG.items():
catalog[tool_id] = {
"name": info["name"],
"description": info["description"],
"available_in": ["reel", "orchestrator"],
}
except ImportError:
pass

View File

@@ -89,31 +89,45 @@ def list_swarm_agents() -> dict[str, Any]:
}
def _find_kimi_cli() -> str | None:
"""Return the path to the kimi CLI binary, or None if not installed."""
def delegate_to_kimi(task: str, working_directory: str = "") -> dict[str, Any]:
"""Delegate a coding task to Kimi, the external coding agent.
Kimi has 262K context and is optimized for code tasks: writing,
debugging, refactoring, test writing. Timmy thinks and plans,
Kimi executes bulk code changes.
Args:
task: Clear, specific coding task description. Include file paths
and expected behavior. Good: "Fix the bug in src/timmy/session.py
where sessions don't persist." Bad: "Fix all bugs."
working_directory: Directory for Kimi to work in. Defaults to repo root.
Returns:
Dict with success status and Kimi's output or error.
"""
import shutil
return shutil.which("kimi")
def _resolve_workdir(working_directory: str) -> str | dict[str, Any]:
"""Return a validated working directory path, or an error dict."""
import subprocess
from pathlib import Path
from config import settings
kimi_path = shutil.which("kimi")
if not kimi_path:
return {
"success": False,
"error": "kimi CLI not found on PATH. Install with: pip install kimi-cli",
}
workdir = working_directory or settings.repo_root
if not Path(workdir).is_dir():
return {
"success": False,
"error": f"Working directory does not exist: {workdir}",
}
return workdir
cmd = [kimi_path, "--print", "-p", task]
def _run_kimi(cmd: list[str], workdir: str) -> dict[str, Any]:
"""Execute the kimi subprocess and return a result dict."""
import subprocess
logger.info("Delegating to Kimi: %s (cwd=%s)", task[:80], workdir)
try:
result = subprocess.run(
@@ -143,34 +157,3 @@ def _run_kimi(cmd: list[str], workdir: str) -> dict[str, Any]:
"success": False,
"error": f"Failed to run Kimi: {exc}",
}
def delegate_to_kimi(task: str, working_directory: str = "") -> dict[str, Any]:
"""Delegate a coding task to Kimi, the external coding agent.
Kimi has 262K context and is optimized for code tasks: writing,
debugging, refactoring, test writing. Timmy thinks and plans,
Kimi executes bulk code changes.
Args:
task: Clear, specific coding task description. Include file paths
and expected behavior. Good: "Fix the bug in src/timmy/session.py
where sessions don't persist." Bad: "Fix all bugs."
working_directory: Directory for Kimi to work in. Defaults to repo root.
Returns:
Dict with success status and Kimi's output or error.
"""
kimi_path = _find_kimi_cli()
if not kimi_path:
return {
"success": False,
"error": "kimi CLI not found on PATH. Install with: pip install kimi-cli",
}
workdir = _resolve_workdir(working_directory)
if isinstance(workdir, dict):
return workdir
logger.info("Delegating to Kimi: %s (cwd=%s)", task[:80], workdir)
return _run_kimi([kimi_path, "--print", "-p", task], workdir)

View File

@@ -326,46 +326,6 @@ def get_live_system_status() -> dict[str, Any]:
return result
def _build_pytest_cmd(venv_python: Path, scope: str) -> list[str]:
"""Build the pytest command list for the given scope."""
cmd = [str(venv_python), "-m", "pytest", "-x", "-q", "--tb=short", "--timeout=30"]
if scope == "fast":
cmd.extend(
[
"--ignore=tests/functional",
"--ignore=tests/e2e",
"--ignore=tests/integrations",
"tests/",
]
)
elif scope == "full":
cmd.append("tests/")
else:
cmd.append(scope)
return cmd
def _parse_pytest_output(output: str) -> dict[str, int]:
"""Extract passed/failed/error counts from pytest output."""
import re
passed = failed = errors = 0
for line in output.splitlines():
if "passed" in line or "failed" in line or "error" in line:
nums = re.findall(r"(\d+) (passed|failed|error)", line)
for count, kind in nums:
if kind == "passed":
passed = int(count)
elif kind == "failed":
failed = int(count)
elif kind == "error":
errors = int(count)
return {"passed": passed, "failed": failed, "errors": errors}
def run_self_tests(scope: str = "fast", _repo_root: str | None = None) -> dict[str, Any]:
"""Run Timmy's own test suite and report results.
@@ -389,17 +349,49 @@ def run_self_tests(scope: str = "fast", _repo_root: str | None = None) -> dict[s
if not venv_python.exists():
return {"success": False, "error": f"No venv found at {venv_python}"}
cmd = _build_pytest_cmd(venv_python, scope)
cmd = [str(venv_python), "-m", "pytest", "-x", "-q", "--tb=short", "--timeout=30"]
if scope == "fast":
# Unit tests only — skip functional/e2e/integration
cmd.extend(
[
"--ignore=tests/functional",
"--ignore=tests/e2e",
"--ignore=tests/integrations",
"tests/",
]
)
elif scope == "full":
cmd.append("tests/")
else:
# Specific path
cmd.append(scope)
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=120, cwd=repo)
output = result.stdout + result.stderr
counts = _parse_pytest_output(output)
# Parse pytest output for counts
passed = failed = errors = 0
for line in output.splitlines():
if "passed" in line or "failed" in line or "error" in line:
import re
nums = re.findall(r"(\d+) (passed|failed|error)", line)
for count, kind in nums:
if kind == "passed":
passed = int(count)
elif kind == "failed":
failed = int(count)
elif kind == "error":
errors = int(count)
return {
"success": result.returncode == 0,
**counts,
"total": counts["passed"] + counts["failed"] + counts["errors"],
"passed": passed,
"failed": failed,
"errors": errors,
"total": passed + failed + errors,
"return_code": result.returncode,
"summary": output[-2000:] if len(output) > 2000 else output,
}

View File

@@ -78,11 +78,6 @@ DEFAULT_MAX_UTTERANCE = 30.0 # safety cap — don't record forever
DEFAULT_SESSION_ID = "voice"
def _rms(block: np.ndarray) -> float:
"""Compute root-mean-square energy of an audio block."""
return float(np.sqrt(np.mean(block.astype(np.float32) ** 2)))
@dataclass
class VoiceConfig:
"""Configuration for the voice loop."""
@@ -166,6 +161,13 @@ class VoiceLoop:
min_blocks = int(self.config.min_utterance / 0.1)
max_blocks = int(self.config.max_utterance / 0.1)
audio_chunks: list[np.ndarray] = []
silent_count = 0
recording = False
def _rms(block: np.ndarray) -> float:
return float(np.sqrt(np.mean(block.astype(np.float32) ** 2)))
sys.stdout.write("\n 🎤 Listening... (speak now)\n")
sys.stdout.flush()
@@ -175,69 +177,42 @@ class VoiceLoop:
dtype="float32",
blocksize=block_size,
) as stream:
chunks = self._capture_audio_blocks(stream, block_size, silence_blocks, max_blocks)
while self._running:
block, overflowed = stream.read(block_size)
if overflowed:
logger.debug("Audio buffer overflowed")
return self._finalize_utterance(chunks, min_blocks, sr)
rms = _rms(block)
def _capture_audio_blocks(
self,
stream,
block_size: int,
silence_blocks: int,
max_blocks: int,
) -> list[np.ndarray]:
"""Read audio blocks from *stream* until silence or max length.
Returns the list of captured audio chunks (may be empty).
"""
chunks: list[np.ndarray] = []
silent_count = 0
recording = False
while self._running:
block, overflowed = stream.read(block_size)
if overflowed:
logger.debug("Audio buffer overflowed")
rms = _rms(block)
if not recording:
if rms > self.config.silence_threshold:
recording = True
silent_count = 0
chunks.append(block.copy())
sys.stdout.write(" 📢 Recording...\r")
sys.stdout.flush()
else:
chunks.append(block.copy())
if rms < self.config.silence_threshold:
silent_count += 1
if not recording:
if rms > self.config.silence_threshold:
recording = True
silent_count = 0
audio_chunks.append(block.copy())
sys.stdout.write(" 📢 Recording...\r")
sys.stdout.flush()
else:
silent_count = 0
audio_chunks.append(block.copy())
if silent_count >= silence_blocks:
break
if rms < self.config.silence_threshold:
silent_count += 1
else:
silent_count = 0
if len(chunks) >= max_blocks:
logger.info("Max utterance length reached, stopping.")
break
# End of utterance
if silent_count >= silence_blocks:
break
return chunks
# Safety cap
if len(audio_chunks) >= max_blocks:
logger.info("Max utterance length reached, stopping.")
break
@staticmethod
def _finalize_utterance(
chunks: list[np.ndarray], min_blocks: int, sample_rate: int
) -> np.ndarray | None:
"""Concatenate recorded chunks and report duration.
Returns ``None`` if the utterance is too short to be meaningful.
"""
if not chunks or len(chunks) < min_blocks:
if not audio_chunks or len(audio_chunks) < min_blocks:
return None
audio = np.concatenate(chunks, axis=0).flatten()
duration = len(audio) / sample_rate
audio = np.concatenate(audio_chunks, axis=0).flatten()
duration = len(audio) / sr
sys.stdout.write(f" ✂️ Captured {duration:.1f}s of audio\n")
sys.stdout.flush()
return audio
@@ -394,33 +369,15 @@ class VoiceLoop:
# ── Main Loop ───────────────────────────────────────────────────────
# Whisper hallucinates these on silence/noise — skip them.
_WHISPER_HALLUCINATIONS = frozenset(
{
"you",
"thanks.",
"thank you.",
"bye.",
"",
"thanks for watching!",
"thank you for watching!",
}
)
def run(self) -> None:
"""Run the voice loop. Blocks until Ctrl-C."""
self._ensure_piper()
# Spoken phrases that end the voice session.
_EXIT_COMMANDS = frozenset(
{
"goodbye",
"exit",
"quit",
"stop",
"goodbye timmy",
"stop listening",
}
)
# Suppress MCP / Agno stderr noise during voice mode.
_suppress_mcp_noise()
# Suppress MCP async-generator teardown tracebacks on exit.
_install_quiet_asyncgen_hooks()
def _log_banner(self) -> None:
"""Log the startup banner with STT/TTS/LLM configuration."""
tts_label = (
"macOS say"
if self.config.use_say_fallback
@@ -436,50 +393,52 @@ class VoiceLoop:
" Press Ctrl-C to exit.\n" + "=" * 60
)
def _is_hallucination(self, text: str) -> bool:
"""Return True if *text* is a known Whisper hallucination."""
return not text or text.lower() in self._WHISPER_HALLUCINATIONS
def _is_exit_command(self, text: str) -> bool:
"""Return True if the user asked to stop the voice session."""
return text.lower().strip().rstrip(".!") in self._EXIT_COMMANDS
def _process_turn(self, text: str) -> None:
"""Handle a single listen-think-speak turn after transcription."""
sys.stdout.write(f"\n 👤 You: {text}\n")
sys.stdout.flush()
response = self._think(text)
sys.stdout.write(f" 🤖 Timmy: {response}\n")
sys.stdout.flush()
self._speak(response)
def run(self) -> None:
"""Run the voice loop. Blocks until Ctrl-C."""
self._ensure_piper()
_suppress_mcp_noise()
_install_quiet_asyncgen_hooks()
self._log_banner()
self._running = True
try:
while self._running:
# 1. LISTEN — record until silence
audio = self._record_utterance()
if audio is None:
continue
# 2. TRANSCRIBE — Whisper STT
text = self._transcribe(audio)
if self._is_hallucination(text):
if not text or text.lower() in (
"you",
"thanks.",
"thank you.",
"bye.",
"",
"thanks for watching!",
"thank you for watching!",
):
# Whisper hallucinations on silence/noise
logger.debug("Ignoring likely Whisper hallucination: '%s'", text)
continue
if self._is_exit_command(text):
sys.stdout.write(f"\n 👤 You: {text}\n")
sys.stdout.flush()
# Exit commands
if text.lower().strip().rstrip(".!") in (
"goodbye",
"exit",
"quit",
"stop",
"goodbye timmy",
"stop listening",
):
logger.info("👋 Goodbye!")
break
self._process_turn(text)
# 3. THINK — send to Timmy
response = self._think(text)
sys.stdout.write(f" 🤖 Timmy: {response}\n")
sys.stdout.flush()
# 4. SPEAK — TTS output
self._speak(response)
except KeyboardInterrupt:
logger.info("👋 Voice loop stopped.")

View File

@@ -86,40 +86,6 @@ def _pip_snapshot(mood: str, confidence: float) -> dict:
return pip_familiar.snapshot().to_dict()
def _resolve_mood(state) -> str:
"""Map cognitive mood/engagement to a presence mood string."""
if state.engagement == "idle" and state.mood == "settled":
return "calm"
return _MOOD_MAP.get(state.mood, "calm")
def _resolve_confidence(state) -> float:
"""Compute normalised confidence from cognitive tracker state."""
if state._confidence_count > 0:
raw = state._confidence_sum / state._confidence_count
else:
raw = 0.7
return round(max(0.0, min(1.0, raw)), 2)
def _build_active_threads(state) -> list[dict]:
"""Convert active commitments into presence thread dicts."""
return [
{"type": "thinking", "ref": c[:80], "status": "active"}
for c in state.active_commitments[:10]
]
def _build_environment() -> dict:
"""Return the environment section using local wall-clock time."""
local_now = datetime.now()
return {
"time_of_day": _time_of_day(local_now.hour),
"local_time": local_now.strftime("%-I:%M %p"),
"day_of_week": local_now.strftime("%A"),
}
def get_state_dict() -> dict:
"""Build presence state dict from current cognitive state.
@@ -132,19 +98,37 @@ def get_state_dict() -> dict:
state = cognitive_tracker.get_state()
now = datetime.now(UTC)
mood = _resolve_mood(state)
confidence = _resolve_confidence(state)
# Map cognitive mood to presence mood
mood = _MOOD_MAP.get(state.mood, "calm")
if state.engagement == "idle" and state.mood == "settled":
mood = "calm"
# Confidence from cognitive tracker
if state._confidence_count > 0:
confidence = state._confidence_sum / state._confidence_count
else:
confidence = 0.7
# Build active threads from commitments
threads = []
for commitment in state.active_commitments[:10]:
threads.append({"type": "thinking", "ref": commitment[:80], "status": "active"})
# Activity
activity = _ACTIVITY_MAP.get(state.engagement, "idle")
# Environment
local_now = datetime.now()
return {
"version": 1,
"liveness": now.strftime("%Y-%m-%dT%H:%M:%SZ"),
"current_focus": state.focus_topic or "",
"active_threads": _build_active_threads(state),
"active_threads": threads,
"recent_events": [],
"concerns": [],
"mood": mood,
"confidence": confidence,
"confidence": round(max(0.0, min(1.0, confidence)), 2),
"energy": round(_current_energy(), 2),
"identity": {
"name": "Timmy",
@@ -159,7 +143,11 @@ def get_state_dict() -> dict:
"visitor_present": False,
"conversation_turns": state.conversation_depth,
},
"environment": _build_environment(),
"environment": {
"time_of_day": _time_of_day(local_now.hour),
"local_time": local_now.strftime("%-I:%M %p"),
"day_of_week": local_now.strftime("%A"),
},
"familiar": _pip_snapshot(mood, confidence),
"meta": {
"schema_version": 1,

View File

@@ -2493,57 +2493,3 @@
.db-cell { max-width: 300px; overflow: hidden; text-overflow: ellipsis; white-space: nowrap; }
.db-cell:hover { white-space: normal; word-break: break-all; }
.db-truncated { font-size: 0.7rem; color: var(--amber); padding: 0.3rem 0; }
/* ── Tower ────────────────────────────────────────────────────────────── */
.tower-container { max-width: 1400px; margin: 0 auto; }
.tower-header { margin-bottom: 1rem; }
.tower-title { font-size: 1.6rem; font-weight: 700; color: var(--green); letter-spacing: 0.15em; }
.tower-subtitle { font-size: 0.85rem; color: var(--text-dim); }
.tower-conn-badge { font-size: 0.7rem; font-weight: 600; padding: 2px 8px; border-radius: 3px; letter-spacing: 0.08em; }
.tower-conn-live { color: var(--green); border: 1px solid var(--green); }
.tower-conn-offline { color: var(--red); border: 1px solid var(--red); }
.tower-conn-connecting { color: var(--amber); border: 1px solid var(--amber); }
.tower-phase-card { min-height: 300px; }
.tower-phase-thinking { border-left: 3px solid var(--purple); }
.tower-phase-predicting { border-left: 3px solid var(--orange); }
.tower-phase-advising { border-left: 3px solid var(--green); }
.tower-scroll { max-height: 50vh; overflow-y: auto; }
.tower-empty { text-align: center; color: var(--text-dim); padding: 16px; font-size: 0.85rem; }
.tower-stat-grid { display: grid; grid-template-columns: repeat(4, 1fr); gap: 0.5rem; text-align: center; }
.tower-stat-label { display: block; font-size: 0.65rem; color: var(--text-dim); letter-spacing: 0.1em; }
.tower-stat-value { display: block; font-size: 1.1rem; font-weight: 700; color: var(--text-bright); }
.tower-event { padding: 8px; margin-bottom: 6px; border-left: 3px solid var(--border); border-radius: 3px; background: var(--bg-card); }
.tower-etype-task_posted { border-left-color: var(--purple); }
.tower-etype-bid_submitted { border-left-color: var(--orange); }
.tower-etype-task_completed { border-left-color: var(--green); }
.tower-etype-task_failed { border-left-color: var(--red); }
.tower-etype-agent_joined { border-left-color: var(--purple); }
.tower-etype-tool_executed { border-left-color: var(--amber); }
.tower-ev-head { display: flex; justify-content: space-between; align-items: center; margin-bottom: 4px; }
.tower-ev-badge { font-size: 0.65rem; font-weight: 600; color: var(--text-bright); letter-spacing: 0.08em; }
.tower-ev-dots { font-size: 0.6rem; color: var(--amber); }
.tower-ev-desc { font-size: 0.8rem; color: var(--text); }
.tower-ev-time { font-size: 0.65rem; color: var(--text-dim); margin-top: 2px; }
.tower-pred { padding: 8px; margin-bottom: 6px; border-radius: 3px; background: var(--bg-card); border-left: 3px solid var(--orange); }
.tower-pred-done { border-left-color: var(--green); }
.tower-pred-pending { border-left-color: var(--amber); }
.tower-pred-head { display: flex; justify-content: space-between; align-items: center; }
.tower-pred-task { font-size: 0.75rem; font-weight: 600; color: var(--text-bright); font-family: monospace; }
.tower-pred-acc { font-size: 0.75rem; font-weight: 700; }
.tower-pred-detail { font-size: 0.75rem; color: var(--text-dim); margin-top: 4px; }
.tower-advisory { padding: 8px; margin-bottom: 6px; border-radius: 3px; background: var(--bg-card); border-left: 3px solid var(--border); }
.tower-adv-high { border-left-color: var(--red); }
.tower-adv-medium { border-left-color: var(--orange); }
.tower-adv-low { border-left-color: var(--green); }
.tower-adv-head { display: flex; justify-content: space-between; font-size: 0.65rem; margin-bottom: 4px; }
.tower-adv-cat { font-weight: 600; color: var(--text-dim); letter-spacing: 0.08em; }
.tower-adv-prio { font-weight: 700; color: var(--amber); }
.tower-adv-title { font-size: 0.85rem; font-weight: 600; color: var(--text-bright); }
.tower-adv-detail { font-size: 0.8rem; color: var(--text); margin-top: 2px; }
.tower-adv-action { font-size: 0.75rem; color: var(--green); margin-top: 4px; font-style: italic; }

View File

@@ -1,100 +0,0 @@
"""Tests that CSRF rejection does NOT execute the endpoint handler.
Regression test for #626: the middleware was calling call_next() before
checking @csrf_exempt, causing side effects even on CSRF-rejected requests.
"""
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from dashboard.middleware.csrf import CSRFMiddleware, csrf_exempt
class TestCSRFNoSideEffects:
"""Verify endpoints are NOT executed when CSRF validation fails."""
@pytest.fixture(autouse=True)
def enable_csrf(self):
"""Re-enable CSRF for these tests."""
from config import settings
original = settings.timmy_disable_csrf
settings.timmy_disable_csrf = False
yield
settings.timmy_disable_csrf = original
def test_protected_endpoint_not_executed_on_csrf_failure(self):
"""A protected endpoint must NOT run when CSRF token is missing.
Before the fix, the middleware called call_next() to resolve the
endpoint, executing its side effects before returning 403.
"""
app = FastAPI()
app.add_middleware(CSRFMiddleware)
side_effect_log = []
@app.post("/transfer")
def transfer_money():
side_effect_log.append("money_transferred")
return {"message": "transferred"}
client = TestClient(app)
response = client.post("/transfer")
assert response.status_code == 403
assert side_effect_log == [], (
"Endpoint was executed despite CSRF failure — side effects occurred!"
)
def test_csrf_exempt_endpoint_still_executes(self):
"""A @csrf_exempt endpoint should still execute without a CSRF token."""
app = FastAPI()
app.add_middleware(CSRFMiddleware)
side_effect_log = []
@app.post("/webhook-handler")
@csrf_exempt
def webhook_handler():
side_effect_log.append("webhook_processed")
return {"message": "processed"}
client = TestClient(app)
response = client.post("/webhook-handler")
assert response.status_code == 200
assert side_effect_log == ["webhook_processed"]
def test_exempt_and_protected_no_cross_contamination(self):
"""Mixed exempt/protected: only exempt endpoints execute without tokens."""
app = FastAPI()
app.add_middleware(CSRFMiddleware)
execution_log = []
@app.post("/safe-webhook")
@csrf_exempt
def safe_webhook():
execution_log.append("safe")
return {"message": "safe"}
@app.post("/dangerous-action")
def dangerous_action():
execution_log.append("dangerous")
return {"message": "danger"}
client = TestClient(app)
# Exempt endpoint runs
resp1 = client.post("/safe-webhook")
assert resp1.status_code == 200
# Protected endpoint blocked WITHOUT executing
resp2 = client.post("/dangerous-action")
assert resp2.status_code == 403
assert execution_log == ["safe"], (
f"Expected only 'safe' execution, got: {execution_log}"
)

View File

@@ -1,187 +0,0 @@
"""Tests for Tower dashboard route (/tower)."""
from unittest.mock import MagicMock, patch
def _mock_spark_engine():
"""Return a mock spark_engine with realistic return values."""
engine = MagicMock()
engine.status.return_value = {
"enabled": True,
"events_captured": 5,
"memories_stored": 3,
"predictions": {"total": 2, "avg_accuracy": 0.85},
"event_types": {
"task_posted": 2,
"bid_submitted": 1,
"task_assigned": 1,
"task_completed": 1,
"task_failed": 0,
"agent_joined": 0,
"tool_executed": 0,
"creative_step": 0,
},
}
event = MagicMock()
event.event_type = "task_completed"
event.description = "Task finished"
event.importance = 0.8
event.created_at = "2026-01-01T00:00:00"
event.agent_id = "agent-1234-abcd"
event.task_id = "task-5678-efgh"
event.data = '{"result": "ok"}'
engine.get_timeline.return_value = [event]
pred = MagicMock()
pred.task_id = "task-5678-efgh"
pred.accuracy = 0.9
pred.evaluated_at = "2026-01-01T01:00:00"
pred.created_at = "2026-01-01T00:30:00"
pred.predicted_value = '{"outcome": "success"}'
engine.get_predictions.return_value = [pred]
advisory = MagicMock()
advisory.category = "performance"
advisory.priority = "high"
advisory.title = "Slow tasks"
advisory.detail = "Tasks taking longer than expected"
advisory.suggested_action = "Scale up workers"
engine.get_advisories.return_value = [advisory]
return engine
class TestTowerUI:
"""Tests for GET /tower endpoint."""
@patch("dashboard.routes.tower.spark_engine", new_callable=_mock_spark_engine)
def test_tower_returns_200(self, mock_engine, client):
response = client.get("/tower")
assert response.status_code == 200
@patch("dashboard.routes.tower.spark_engine", new_callable=_mock_spark_engine)
def test_tower_returns_html(self, mock_engine, client):
response = client.get("/tower")
assert "text/html" in response.headers["content-type"]
@patch("dashboard.routes.tower.spark_engine", new_callable=_mock_spark_engine)
def test_tower_contains_dashboard_content(self, mock_engine, client):
response = client.get("/tower")
body = response.text
assert "tower" in body.lower() or "spark" in body.lower()
class TestSparkSnapshot:
"""Tests for _spark_snapshot helper."""
@patch("dashboard.routes.tower.spark_engine", new_callable=_mock_spark_engine)
def test_snapshot_structure(self, mock_engine):
from dashboard.routes.tower import _spark_snapshot
snap = _spark_snapshot()
assert snap["type"] == "spark_state"
assert "status" in snap
assert "events" in snap
assert "predictions" in snap
assert "advisories" in snap
@patch("dashboard.routes.tower.spark_engine", new_callable=_mock_spark_engine)
def test_snapshot_events_parsed(self, mock_engine):
from dashboard.routes.tower import _spark_snapshot
snap = _spark_snapshot()
ev = snap["events"][0]
assert ev["event_type"] == "task_completed"
assert ev["importance"] == 0.8
assert ev["agent_id"] == "agent-12"
assert ev["task_id"] == "task-567"
assert ev["data"] == {"result": "ok"}
@patch("dashboard.routes.tower.spark_engine", new_callable=_mock_spark_engine)
def test_snapshot_predictions_parsed(self, mock_engine):
from dashboard.routes.tower import _spark_snapshot
snap = _spark_snapshot()
pred = snap["predictions"][0]
assert pred["task_id"] == "task-567"
assert pred["accuracy"] == 0.9
assert pred["evaluated"] is True
assert pred["predicted"] == {"outcome": "success"}
@patch("dashboard.routes.tower.spark_engine", new_callable=_mock_spark_engine)
def test_snapshot_advisories_parsed(self, mock_engine):
from dashboard.routes.tower import _spark_snapshot
snap = _spark_snapshot()
adv = snap["advisories"][0]
assert adv["category"] == "performance"
assert adv["priority"] == "high"
assert adv["title"] == "Slow tasks"
assert adv["suggested_action"] == "Scale up workers"
@patch("dashboard.routes.tower.spark_engine")
def test_snapshot_handles_empty_state(self, mock_engine):
mock_engine.status.return_value = {"enabled": False}
mock_engine.get_timeline.return_value = []
mock_engine.get_predictions.return_value = []
mock_engine.get_advisories.return_value = []
from dashboard.routes.tower import _spark_snapshot
snap = _spark_snapshot()
assert snap["events"] == []
assert snap["predictions"] == []
assert snap["advisories"] == []
@patch("dashboard.routes.tower.spark_engine")
def test_snapshot_handles_invalid_json_data(self, mock_engine):
mock_engine.status.return_value = {"enabled": True}
event = MagicMock()
event.event_type = "test"
event.description = "bad data"
event.importance = 0.5
event.created_at = "2026-01-01T00:00:00"
event.agent_id = None
event.task_id = None
event.data = "not-json{"
mock_engine.get_timeline.return_value = [event]
pred = MagicMock()
pred.task_id = None
pred.accuracy = None
pred.evaluated_at = None
pred.created_at = "2026-01-01T00:00:00"
pred.predicted_value = None
mock_engine.get_predictions.return_value = [pred]
mock_engine.get_advisories.return_value = []
from dashboard.routes.tower import _spark_snapshot
snap = _spark_snapshot()
ev = snap["events"][0]
assert ev["data"] == {}
assert "agent_id" not in ev
assert "task_id" not in ev
pred = snap["predictions"][0]
assert pred["task_id"] == "?"
assert pred["predicted"] == {}
class TestTowerWebSocket:
"""Tests for WS /tower/ws endpoint."""
@patch("dashboard.routes.tower.spark_engine", new_callable=_mock_spark_engine)
@patch("dashboard.routes.tower._PUSH_INTERVAL", 0)
def test_ws_sends_initial_snapshot(self, mock_engine, client):
import json
with client.websocket_connect("/tower/ws") as ws:
data = json.loads(ws.receive_text())
assert data["type"] == "spark_state"
assert "status" in data
assert "events" in data

View File

@@ -5,13 +5,11 @@ from datetime import UTC, datetime, timedelta
from unittest.mock import patch
from infrastructure.error_capture import (
_build_report_description,
_create_bug_report,
_dedup_cache,
_extract_traceback_info,
_get_git_context,
_is_duplicate,
_log_bug_report_created,
_log_error_event,
_notify_bug_report,
_record_to_session,
@@ -233,68 +231,6 @@ class TestLogErrorEvent:
_log_error_event(e, "test", "abc123", "file.py", 42, {"branch": "main"})
class TestBuildReportDescription:
"""Test _build_report_description helper."""
def test_includes_error_info(self):
try:
raise RuntimeError("desc test")
except RuntimeError as e:
desc = _build_report_description(
e,
"test_src",
None,
"hash1",
"tb...",
"file.py",
10,
{"branch": "main"},
)
assert "RuntimeError" in desc
assert "test_src" in desc
assert "file.py:10" in desc
assert "hash1" in desc
def test_includes_context_when_provided(self):
try:
raise RuntimeError("ctx desc")
except RuntimeError as e:
desc = _build_report_description(
e,
"src",
{"path": "/api"},
"h",
"tb",
"f.py",
1,
{},
)
assert "path=/api" in desc
def test_omits_context_when_none(self):
try:
raise RuntimeError("no ctx")
except RuntimeError as e:
desc = _build_report_description(
e,
"src",
None,
"h",
"tb",
"f.py",
1,
{},
)
assert "**Context:**" not in desc
class TestLogBugReportCreated:
"""Test _log_bug_report_created helper."""
def test_does_not_crash_on_missing_deps(self):
_log_bug_report_created("test", "task-1", "hash1", "title")
class TestCreateBugReport:
"""Test _create_bug_report helper."""

View File

@@ -1,149 +0,0 @@
"""Tests for provider health history store and API endpoint."""
import time
from datetime import UTC, datetime, timedelta
from unittest.mock import MagicMock
import pytest
from src.infrastructure.router.history import HealthHistoryStore
@pytest.fixture
def store():
"""In-memory history store for testing."""
s = HealthHistoryStore(db_path=":memory:")
yield s
s.close()
@pytest.fixture
def sample_providers():
return [
{
"name": "anthropic",
"status": "healthy",
"error_rate": 0.01,
"avg_latency_ms": 250.5,
"circuit_state": "closed",
"total_requests": 100,
},
{
"name": "local",
"status": "degraded",
"error_rate": 0.15,
"avg_latency_ms": 80.0,
"circuit_state": "closed",
"total_requests": 50,
},
]
def test_record_and_retrieve(store, sample_providers):
store.record_snapshot(sample_providers)
history = store.get_history(hours=1)
assert len(history) == 1
assert len(history[0]["providers"]) == 2
assert history[0]["providers"][0]["name"] == "anthropic"
assert history[0]["providers"][1]["name"] == "local"
assert "timestamp" in history[0]
def test_multiple_snapshots(store, sample_providers):
store.record_snapshot(sample_providers)
time.sleep(0.01)
store.record_snapshot(sample_providers)
history = store.get_history(hours=1)
assert len(history) == 2
def test_hours_filtering(store, sample_providers):
old_ts = (datetime.now(UTC) - timedelta(hours=48)).isoformat()
store._conn.execute(
"""INSERT INTO snapshots
(timestamp, provider_name, status, error_rate,
avg_latency_ms, circuit_state, total_requests)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(old_ts, "anthropic", "healthy", 0.0, 100.0, "closed", 10),
)
store._conn.commit()
store.record_snapshot(sample_providers)
history = store.get_history(hours=24)
assert len(history) == 1
history = store.get_history(hours=72)
assert len(history) == 2
def test_prune(store, sample_providers):
old_ts = (datetime.now(UTC) - timedelta(hours=200)).isoformat()
store._conn.execute(
"""INSERT INTO snapshots
(timestamp, provider_name, status, error_rate,
avg_latency_ms, circuit_state, total_requests)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(old_ts, "anthropic", "healthy", 0.0, 100.0, "closed", 10),
)
store._conn.commit()
store.record_snapshot(sample_providers)
deleted = store.prune(keep_hours=168)
assert deleted == 1
history = store.get_history(hours=999)
assert len(history) == 1
def test_empty_history(store):
assert store.get_history(hours=24) == []
def test_capture_snapshot_from_router(store):
mock_metrics = MagicMock()
mock_metrics.error_rate = 0.05
mock_metrics.avg_latency_ms = 200.0
mock_metrics.total_requests = 42
mock_provider = MagicMock()
mock_provider.name = "test-provider"
mock_provider.status.value = "healthy"
mock_provider.metrics = mock_metrics
mock_provider.circuit_state.value = "closed"
mock_router = MagicMock()
mock_router.providers = [mock_provider]
store._capture_snapshot(mock_router)
history = store.get_history(hours=1)
assert len(history) == 1
p = history[0]["providers"][0]
assert p["name"] == "test-provider"
assert p["status"] == "healthy"
assert p["error_rate"] == 0.05
assert p["total_requests"] == 42
def test_history_api_endpoint(store, sample_providers):
"""GET /api/v1/router/history returns snapshot data."""
store.record_snapshot(sample_providers)
from fastapi import FastAPI
from fastapi.testclient import TestClient
from src.infrastructure.router.api import get_cascade_router
from src.infrastructure.router.api import router as api_router
from src.infrastructure.router.history import get_history_store
app = FastAPI()
app.include_router(api_router)
app.dependency_overrides[get_history_store] = lambda: store
app.dependency_overrides[get_cascade_router] = lambda: MagicMock()
client = TestClient(app)
resp = client.get("/api/v1/router/history?hours=1")
assert resp.status_code == 200
data = resp.json()
assert len(data) == 1
assert len(data[0]["providers"]) == 2
assert data[0]["providers"][0]["name"] == "anthropic"
app.dependency_overrides.clear()

View File

@@ -174,103 +174,6 @@ class TestDiscordVendor:
assert result is False
class TestExtractContent:
def test_strips_bot_mention(self):
from integrations.chat_bridge.vendors.discord import DiscordVendor
vendor = DiscordVendor()
vendor._client = MagicMock()
vendor._client.user.id = 12345
msg = MagicMock()
msg.content = "<@12345> hello there"
assert vendor._extract_content(msg) == "hello there"
def test_no_client_user(self):
from integrations.chat_bridge.vendors.discord import DiscordVendor
vendor = DiscordVendor()
vendor._client = MagicMock()
vendor._client.user = None
msg = MagicMock()
msg.content = "hello"
assert vendor._extract_content(msg) == "hello"
def test_empty_after_strip(self):
from integrations.chat_bridge.vendors.discord import DiscordVendor
vendor = DiscordVendor()
vendor._client = MagicMock()
vendor._client.user.id = 99
msg = MagicMock()
msg.content = "<@99>"
assert vendor._extract_content(msg) == ""
class TestInvokeAgent:
@staticmethod
def _make_typing_target():
"""Build a mock target whose .typing() is an async context manager."""
from contextlib import asynccontextmanager
target = AsyncMock()
@asynccontextmanager
async def _typing():
yield
target.typing = _typing
return target
@pytest.mark.asyncio
async def test_timeout_returns_error(self):
from integrations.chat_bridge.vendors.discord import DiscordVendor
vendor = DiscordVendor()
target = self._make_typing_target()
with patch(
"integrations.chat_bridge.vendors.discord.chat_with_tools", side_effect=TimeoutError
):
run_output, response = await vendor._invoke_agent("hi", "sess", target)
assert run_output is None
assert "too long" in response
@pytest.mark.asyncio
async def test_exception_returns_error(self):
from integrations.chat_bridge.vendors.discord import DiscordVendor
vendor = DiscordVendor()
target = self._make_typing_target()
with patch(
"integrations.chat_bridge.vendors.discord.chat_with_tools",
side_effect=RuntimeError("boom"),
):
run_output, response = await vendor._invoke_agent("hi", "sess", target)
assert run_output is None
assert "trouble" in response
class TestSendResponse:
@pytest.mark.asyncio
async def test_skips_empty(self):
from integrations.chat_bridge.vendors.discord import DiscordVendor
target = AsyncMock()
await DiscordVendor._send_response(None, target)
target.send.assert_not_called()
await DiscordVendor._send_response("", target)
target.send.assert_not_called()
@pytest.mark.asyncio
async def test_sends_short_message(self):
from integrations.chat_bridge.vendors.discord import DiscordVendor
target = AsyncMock()
await DiscordVendor._send_response("hello", target)
target.send.assert_called_once_with("hello")
class TestChunkMessage:
def test_short_message(self):
from integrations.chat_bridge.vendors.discord import _chunk_message

View File

@@ -361,53 +361,6 @@ class TestRun:
assert response == "ok"
# ── _handle_retry_or_raise ────────────────────────────────────────────────
class TestHandleRetryOrRaise:
def test_raises_on_last_attempt(self):
BaseAgent = _make_base_class()
with pytest.raises(ValueError, match="boom"):
BaseAgent._handle_retry_or_raise(
ValueError("boom"),
attempt=3,
max_retries=3,
transient=False,
)
def test_raises_on_last_attempt_transient(self):
BaseAgent = _make_base_class()
exc = httpx.ConnectError("down")
with pytest.raises(httpx.ConnectError):
BaseAgent._handle_retry_or_raise(
exc,
attempt=3,
max_retries=3,
transient=True,
)
def test_no_raise_on_early_attempt(self):
BaseAgent = _make_base_class()
# Should return None (no raise) on non-final attempt
result = BaseAgent._handle_retry_or_raise(
ValueError("retry me"),
attempt=1,
max_retries=3,
transient=False,
)
assert result is None
def test_no_raise_on_early_transient(self):
BaseAgent = _make_base_class()
result = BaseAgent._handle_retry_or_raise(
httpx.ReadTimeout("busy"),
attempt=2,
max_retries=3,
transient=True,
)
assert result is None
# ── get_capabilities / get_status ────────────────────────────────────────────

View File

@@ -55,14 +55,14 @@ def test_think_sends_topic_to_agent():
)
def test_think_ignores_model_size_option():
"""think --model-size 70b must not forward model_size to create_timmy."""
def test_think_passes_model_size_option():
"""think --model-size 70b must forward the model size to create_timmy."""
mock_timmy = MagicMock()
with patch("timmy.cli.create_timmy", return_value=mock_timmy) as mock_create:
runner.invoke(app, ["think", "topic", "--model-size", "70b"])
mock_create.assert_called_once_with(backend=None, session_id="cli")
mock_create.assert_called_once_with(backend=None, model_size="70b", session_id="cli")
# ---------------------------------------------------------------------------

View File

@@ -6,9 +6,6 @@ import pytest
from timmy.mcp_tools import (
_bridge_to_work_order,
_build_issue_args,
_build_issue_body,
_category_from_labels,
_generate_avatar_image,
_parse_command,
close_mcp_sessions,
@@ -135,49 +132,6 @@ def test_filesystem_mcp_returns_tools():
assert "/home/user/project" in params_kwargs["args"]
# ---------------------------------------------------------------------------
# _build_issue_body / _build_issue_args / _category_from_labels
# ---------------------------------------------------------------------------
def test_build_issue_body_appends_signature():
"""_build_issue_body appends the auto-filing signature."""
result = _build_issue_body("Some description")
assert result.startswith("Some description\n\n")
assert "Auto-filed by Timmy" in result
def test_build_issue_body_empty():
"""_build_issue_body handles empty body."""
result = _build_issue_body("")
assert result.startswith("---\n")
def test_build_issue_args():
"""_build_issue_args returns correct MCP arguments."""
with patch("timmy.mcp_tools.settings") as mock_settings:
mock_settings.gitea_repo = "owner/repo"
result = _build_issue_args("Title", "Body")
assert result == {
"method": "create",
"owner": "owner",
"repo": "repo",
"title": "Title",
"body": "Body",
}
def test_category_from_labels_bug():
"""_category_from_labels returns 'bug' when labels contain bug."""
assert _category_from_labels("bug, enhancement") == "bug"
def test_category_from_labels_default():
"""_category_from_labels returns 'suggestion' by default."""
assert _category_from_labels("enhancement") == "suggestion"
assert _category_from_labels("") == "suggestion"
# ---------------------------------------------------------------------------
# create_gitea_issue_via_mcp
# ---------------------------------------------------------------------------

View File

@@ -15,7 +15,7 @@ except ImportError:
np = None
try:
from timmy.voice_loop import VoiceConfig, VoiceLoop, _rms, _strip_markdown
from timmy.voice_loop import VoiceConfig, VoiceLoop, _strip_markdown
except ImportError:
pass # pytestmark will skip all tests anyway
@@ -147,31 +147,6 @@ class TestStripMarkdown:
assert "*" not in result
class TestRms:
def test_silent_block(self):
block = np.zeros(1600, dtype=np.float32)
assert _rms(block) == pytest.approx(0.0, abs=1e-7)
def test_loud_block(self):
block = np.ones(1600, dtype=np.float32)
assert _rms(block) == pytest.approx(1.0, abs=1e-5)
class TestFinalizeUtterance:
def test_returns_none_for_empty(self):
assert VoiceLoop._finalize_utterance([], min_blocks=5, sample_rate=16000) is None
def test_returns_none_for_too_short(self):
chunks = [np.zeros(1600, dtype=np.float32) for _ in range(3)]
assert VoiceLoop._finalize_utterance(chunks, min_blocks=5, sample_rate=16000) is None
def test_returns_audio_for_sufficient_chunks(self):
chunks = [np.ones(1600, dtype=np.float32) for _ in range(6)]
result = VoiceLoop._finalize_utterance(chunks, min_blocks=5, sample_rate=16000)
assert result is not None
assert len(result) == 6 * 1600
class TestThink:
def test_think_returns_response(self):
loop = VoiceLoop()
@@ -261,7 +236,6 @@ class TestHallucinationFilter:
"""Whisper tends to hallucinate on silence/noise. The loop should filter these."""
def test_known_hallucinations_filtered(self):
loop = VoiceLoop()
hallucinations = [
"you",
"thanks.",
@@ -269,35 +243,33 @@ class TestHallucinationFilter:
"Bye.",
"Thanks for watching!",
"Thank you for watching!",
"",
]
for text in hallucinations:
assert loop._is_hallucination(text), f"'{text}' should be filtered"
def test_real_speech_not_filtered(self):
loop = VoiceLoop()
assert not loop._is_hallucination("Hello Timmy")
assert not loop._is_hallucination("What time is it?")
assert text.lower() in (
"you",
"thanks.",
"thank you.",
"bye.",
"",
"thanks for watching!",
"thank you for watching!",
), f"'{text}' should be filtered"
class TestExitCommands:
"""Voice loop should recognize exit commands."""
def test_exit_commands(self):
loop = VoiceLoop()
exits = ["goodbye", "exit", "quit", "stop", "goodbye timmy", "stop listening"]
for cmd in exits:
assert loop._is_exit_command(cmd), f"'{cmd}' should be an exit command"
def test_exit_with_punctuation(self):
loop = VoiceLoop()
assert loop._is_exit_command("goodbye!")
assert loop._is_exit_command("stop.")
def test_non_exit_commands(self):
loop = VoiceLoop()
assert not loop._is_exit_command("hello")
assert not loop._is_exit_command("what time is it")
assert cmd.lower().strip().rstrip(".!") in (
"goodbye",
"exit",
"quit",
"stop",
"goodbye timmy",
"stop listening",
), f"'{cmd}' should be an exit command"
class TestPlayAudio:

View File

@@ -1,109 +0,0 @@
"""Unit tests for the lightning package (factory + ledger)."""
from __future__ import annotations
import pytest
from lightning.factory import Invoice, MockBackend, get_backend
from lightning.ledger import (
TxStatus,
TxType,
clear,
create_invoice_entry,
get_balance,
get_transactions,
mark_settled,
)
@pytest.fixture(autouse=True)
def _clean_ledger():
"""Reset the in-memory ledger between tests."""
clear()
yield
clear()
# ── Factory tests ────────────────────────────────────────────────────
class TestMockBackend:
def test_create_invoice_returns_invoice(self):
backend = MockBackend()
inv = backend.create_invoice(100, "test memo")
assert isinstance(inv, Invoice)
assert inv.amount_sats == 100
assert inv.memo == "test memo"
assert len(inv.payment_hash) == 64 # SHA-256 hex
assert inv.payment_request.startswith("lnbc")
def test_invoices_have_unique_hashes(self):
backend = MockBackend()
a = backend.create_invoice(10)
b = backend.create_invoice(10)
assert a.payment_hash != b.payment_hash
class TestGetBackend:
def test_returns_mock_backend(self):
backend = get_backend()
assert isinstance(backend, MockBackend)
# ── Ledger tests ─────────────────────────────────────────────────────
class TestLedger:
def test_create_invoice_entry(self):
entry = create_invoice_entry(
payment_hash="abc123",
amount_sats=500,
memo="test",
source="unit_test",
)
assert entry.tx_type == TxType.incoming
assert entry.status == TxStatus.pending
assert entry.amount_sats == 500
def test_mark_settled(self):
create_invoice_entry(payment_hash="hash1", amount_sats=100)
result = mark_settled("hash1", preimage="secret")
assert result is not None
assert result.status == TxStatus.settled
assert result.preimage == "secret"
assert result.settled_at != ""
def test_mark_settled_unknown_hash(self):
assert mark_settled("nonexistent") is None
def test_get_balance_empty(self):
bal = get_balance()
assert bal["net_sats"] == 0
assert bal["available_sats"] == 0
def test_get_balance_with_settled(self):
create_invoice_entry(payment_hash="h1", amount_sats=1000)
mark_settled("h1")
bal = get_balance()
assert bal["incoming_total_sats"] == 1000
assert bal["net_sats"] == 1000
assert bal["available_sats"] == 1000
def test_get_balance_pending_not_counted(self):
create_invoice_entry(payment_hash="h2", amount_sats=500)
bal = get_balance()
assert bal["incoming_total_sats"] == 0
assert bal["pending_incoming_sats"] == 500
def test_get_transactions_returns_entries(self):
create_invoice_entry(payment_hash="t1", amount_sats=10)
create_invoice_entry(payment_hash="t2", amount_sats=20)
txs = get_transactions()
assert len(txs) == 2
def test_get_transactions_filter_by_status(self):
create_invoice_entry(payment_hash="f1", amount_sats=10)
create_invoice_entry(payment_hash="f2", amount_sats=20)
mark_settled("f1")
assert len(get_transactions(status="settled")) == 1
assert len(get_transactions(status="pending")) == 1