Compare commits

...

15 Commits

Author SHA1 Message Date
9732c80892 feat: Real-time Spark Visualization in Tower Dashboard (#612)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-20 16:10:42 -04:00
f3b3d1e648 [loop-cycle-1658] feat: provider health history endpoint (#457) (#611) 2026-03-20 16:09:20 -04:00
4ba8d25749 feat: Lightning Network integration for tool usage (#610)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-20 13:07:02 -04:00
2622f0a0fb [loop-cycle-1242] fix: cycle_retro reads cycle_result.json (#603) (#609) 2026-03-20 12:55:01 -04:00
e3d60b89a9 fix: remove model_size kwarg from create_timmy() CLI calls (#606)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-20 12:48:49 -04:00
6214ad3225 refactor: extract helpers from run_self_tests() (#601)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-20 12:40:44 -04:00
5f5da2163f [loop-cycle] refactor: extract helpers from _handle_tool_confirmation (#592) (#600) 2026-03-20 12:32:24 -04:00
0029c34bb1 refactor: break up search_thoughts() into focused helpers (#597)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-20 12:26:51 -04:00
2577b71207 fix: capture thought timestamp at cycle start, not after LLM call (#590)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-20 12:13:48 -04:00
1a8b8ecaed [loop-cycle-1235] refactor: break up _migrate_schema() into focused helpers (#591) (#595) 2026-03-20 12:07:15 -04:00
d821e76589 [loop-cycle-1234] refactor: break up _generate_avatar_image (#563) (#589) 2026-03-20 11:57:53 -04:00
bc010ecfba [loop-cycle-1233] refactor: add docstrings to calm.py route handlers (#569) (#585) 2026-03-20 11:44:06 -04:00
faf6c1a5f1 [loop-cycle-1233] refactor: break up BaseAgent.run() (#561) (#584) 2026-03-20 11:24:36 -04:00
48103bb076 [loop-cycle-956] refactor: break up _handle_message() into focused helpers (#553) (#574) 2026-03-19 21:42:01 -04:00
9f244ffc70 refactor: break up _record_utterance() into focused helpers (#572)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 21:37:32 -04:00
27 changed files with 1681 additions and 422 deletions

View File

@@ -54,6 +54,7 @@ REPO_ROOT = Path(__file__).resolve().parent.parent
RETRO_FILE = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl"
SUMMARY_FILE = REPO_ROOT / ".loop" / "retro" / "summary.json"
EPOCH_COUNTER_FILE = REPO_ROOT / ".loop" / "retro" / ".epoch_counter"
CYCLE_RESULT_FILE = REPO_ROOT / ".loop" / "cycle_result.json"
# How many recent entries to include in rolling summary
SUMMARY_WINDOW = 50
@@ -246,9 +247,37 @@ def update_summary() -> None:
SUMMARY_FILE.write_text(json.dumps(summary, indent=2) + "\n")
def _load_cycle_result() -> dict:
"""Read .loop/cycle_result.json if it exists; return empty dict on failure."""
if not CYCLE_RESULT_FILE.exists():
return {}
try:
raw = CYCLE_RESULT_FILE.read_text().strip()
# Strip hermes fence markers (```json ... ```) if present
if raw.startswith("```"):
lines = raw.splitlines()
lines = [l for l in lines if not l.startswith("```")]
raw = "\n".join(lines)
return json.loads(raw)
except (json.JSONDecodeError, OSError):
return {}
def main() -> None:
args = parse_args()
# Backfill from cycle_result.json when CLI args have defaults
cr = _load_cycle_result()
if cr:
if args.issue is None and cr.get("issue"):
args.issue = int(cr["issue"])
if args.type == "unknown" and cr.get("type"):
args.type = cr["type"]
if args.tests_passed == 0 and cr.get("tests_passed"):
args.tests_passed = int(cr["tests_passed"])
if not args.notes and cr.get("notes"):
args.notes = cr["notes"]
# Auto-detect issue from branch when not explicitly provided
if args.issue is None:
args.issue = detect_issue_from_branch()

View File

@@ -46,6 +46,7 @@ from dashboard.routes.tasks import router as tasks_router
from dashboard.routes.telegram import router as telegram_router
from dashboard.routes.thinking import router as thinking_router
from dashboard.routes.tools import router as tools_router
from dashboard.routes.tower import router as tower_router
from dashboard.routes.voice import router as voice_router
from dashboard.routes.work_orders import router as work_orders_router
from dashboard.routes.world import router as world_router
@@ -583,6 +584,7 @@ app.include_router(system_router)
app.include_router(experiments_router)
app.include_router(db_explorer_router)
app.include_router(world_router)
app.include_router(tower_router)
@app.websocket("/ws")

View File

@@ -19,14 +19,17 @@ router = APIRouter(tags=["calm"])
# Helper functions for state machine logic
def get_now_task(db: Session) -> Task | None:
"""Return the single active NOW task, or None."""
return db.query(Task).filter(Task.state == TaskState.NOW).first()
def get_next_task(db: Session) -> Task | None:
"""Return the single queued NEXT task, or None."""
return db.query(Task).filter(Task.state == TaskState.NEXT).first()
def get_later_tasks(db: Session) -> list[Task]:
"""Return all LATER tasks ordered by MIT flag then sort_order."""
return (
db.query(Task)
.filter(Task.state == TaskState.LATER)
@@ -36,6 +39,12 @@ def get_later_tasks(db: Session) -> list[Task]:
def promote_tasks(db: Session):
"""Enforce the NOW/NEXT/LATER state machine invariants.
- At most one NOW task (extras demoted to NEXT).
- If no NOW, promote NEXT -> NOW.
- If no NEXT, promote highest-priority LATER -> NEXT.
"""
# Ensure only one NOW task exists. If multiple, demote extras to NEXT.
now_tasks = db.query(Task).filter(Task.state == TaskState.NOW).all()
if len(now_tasks) > 1:
@@ -74,6 +83,7 @@ def promote_tasks(db: Session):
# Endpoints
@router.get("/calm", response_class=HTMLResponse)
async def get_calm_view(request: Request, db: Session = Depends(get_db)):
"""Render the main CALM dashboard with NOW/NEXT/LATER counts."""
now_task = get_now_task(db)
next_task = get_next_task(db)
later_tasks_count = len(get_later_tasks(db))
@@ -90,6 +100,7 @@ async def get_calm_view(request: Request, db: Session = Depends(get_db)):
@router.get("/calm/ritual/morning", response_class=HTMLResponse)
async def get_morning_ritual_form(request: Request):
"""Render the morning ritual intake form."""
return templates.TemplateResponse(request, "calm/morning_ritual_form.html", {})
@@ -102,6 +113,7 @@ async def post_morning_ritual(
mit3_title: str = Form(None),
other_tasks: str = Form(""),
):
"""Process morning ritual: create MITs, other tasks, and set initial states."""
# Create Journal Entry
mit_task_ids = []
journal_entry = JournalEntry(entry_date=date.today())
@@ -173,6 +185,7 @@ async def post_morning_ritual(
@router.get("/calm/ritual/evening", response_class=HTMLResponse)
async def get_evening_ritual_form(request: Request, db: Session = Depends(get_db)):
"""Render the evening ritual form for today's journal entry."""
journal_entry = db.query(JournalEntry).filter(JournalEntry.entry_date == date.today()).first()
if not journal_entry:
raise HTTPException(status_code=404, detail="No journal entry for today")
@@ -189,6 +202,7 @@ async def post_evening_ritual(
gratitude: str = Form(None),
energy_level: int = Form(None),
):
"""Process evening ritual: save reflection/gratitude, archive active tasks."""
journal_entry = db.query(JournalEntry).filter(JournalEntry.entry_date == date.today()).first()
if not journal_entry:
raise HTTPException(status_code=404, detail="No journal entry for today")
@@ -223,6 +237,7 @@ async def create_new_task(
is_mit: bool = Form(False),
certainty: TaskCertainty = Form(TaskCertainty.SOFT),
):
"""Create a new task in LATER state and return updated count."""
task = Task(
title=title,
description=description,
@@ -247,6 +262,7 @@ async def start_task(
task_id: int,
db: Session = Depends(get_db),
):
"""Move a task to NOW state, demoting the current NOW to NEXT."""
current_now_task = get_now_task(db)
if current_now_task and current_now_task.id != task_id:
current_now_task.state = TaskState.NEXT # Demote current NOW to NEXT
@@ -281,6 +297,7 @@ async def complete_task(
task_id: int,
db: Session = Depends(get_db),
):
"""Mark a task as DONE and trigger state promotion."""
task = db.query(Task).filter(Task.id == task_id).first()
if not task:
raise HTTPException(status_code=404, detail="Task not found")
@@ -309,6 +326,7 @@ async def defer_task(
task_id: int,
db: Session = Depends(get_db),
):
"""Defer a task and trigger state promotion."""
task = db.query(Task).filter(Task.id == task_id).first()
if not task:
raise HTTPException(status_code=404, detail="Task not found")
@@ -333,6 +351,7 @@ async def defer_task(
@router.get("/calm/partials/later_tasks_list", response_class=HTMLResponse)
async def get_later_tasks_list(request: Request, db: Session = Depends(get_db)):
"""Render the expandable list of LATER tasks."""
later_tasks = get_later_tasks(db)
return templates.TemplateResponse(
"calm/partials/later_tasks_list.html",
@@ -348,6 +367,7 @@ async def reorder_tasks(
later_task_ids: str = Form(""),
next_task_id: int | None = Form(None),
):
"""Reorder LATER tasks and optionally promote one to NEXT."""
# Reorder LATER tasks
if later_task_ids:
ids_in_order = [int(x.strip()) for x in later_task_ids.split(",") if x.strip()]

View File

@@ -16,52 +16,11 @@ router = APIRouter(tags=["system"])
@router.get("/lightning/ledger", response_class=HTMLResponse)
async def lightning_ledger(request: Request):
"""Ledger and balance page."""
# Mock data for now, as this seems to be a UI-first feature
balance = {
"available_sats": 1337,
"incoming_total_sats": 2000,
"outgoing_total_sats": 663,
"fees_paid_sats": 5,
"net_sats": 1337,
"pending_incoming_sats": 0,
"pending_outgoing_sats": 0,
}
"""Ledger and balance page backed by the in-memory Lightning ledger."""
from lightning.ledger import get_balance, get_transactions
# Mock transactions
from collections import namedtuple
from enum import Enum
class TxType(Enum):
incoming = "incoming"
outgoing = "outgoing"
class TxStatus(Enum):
completed = "completed"
pending = "pending"
Tx = namedtuple(
"Tx", ["tx_type", "status", "amount_sats", "payment_hash", "memo", "created_at"]
)
transactions = [
Tx(
TxType.outgoing,
TxStatus.completed,
50,
"hash1",
"Model inference",
"2026-03-04 10:00:00",
),
Tx(
TxType.incoming,
TxStatus.completed,
1000,
"hash2",
"Manual deposit",
"2026-03-03 15:00:00",
),
]
balance = get_balance()
transactions = get_transactions()
return templates.TemplateResponse(
request,
@@ -70,7 +29,7 @@ async def lightning_ledger(request: Request):
"balance": balance,
"transactions": transactions,
"tx_types": ["incoming", "outgoing"],
"tx_statuses": ["completed", "pending"],
"tx_statuses": ["pending", "settled", "failed", "expired"],
"filter_type": None,
"filter_status": None,
"stats": {},

View File

@@ -0,0 +1,108 @@
"""Tower dashboard — real-time Spark visualization via WebSocket.
GET /tower — HTML Tower dashboard (Thinking / Predicting / Advising)
WS /tower/ws — WebSocket stream of Spark engine state updates
"""
import asyncio
import json
import logging
from fastapi import APIRouter, Request, WebSocket
from fastapi.responses import HTMLResponse
from dashboard.templating import templates
from spark.engine import spark_engine
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/tower", tags=["tower"])
_PUSH_INTERVAL = 5 # seconds between state broadcasts
def _spark_snapshot() -> dict:
"""Build a JSON-serialisable snapshot of Spark state."""
status = spark_engine.status()
timeline = spark_engine.get_timeline(limit=10)
events = []
for ev in timeline:
entry = {
"event_type": ev.event_type,
"description": ev.description,
"importance": ev.importance,
"created_at": ev.created_at,
}
if ev.agent_id:
entry["agent_id"] = ev.agent_id[:8]
if ev.task_id:
entry["task_id"] = ev.task_id[:8]
try:
entry["data"] = json.loads(ev.data)
except (json.JSONDecodeError, TypeError):
entry["data"] = {}
events.append(entry)
predictions = spark_engine.get_predictions(limit=5)
preds = []
for p in predictions:
pred = {
"task_id": p.task_id[:8] if p.task_id else "?",
"accuracy": p.accuracy,
"evaluated": p.evaluated_at is not None,
"created_at": p.created_at,
}
try:
pred["predicted"] = json.loads(p.predicted_value)
except (json.JSONDecodeError, TypeError):
pred["predicted"] = {}
preds.append(pred)
advisories = spark_engine.get_advisories()
advs = [
{
"category": a.category,
"priority": a.priority,
"title": a.title,
"detail": a.detail,
"suggested_action": a.suggested_action,
}
for a in advisories
]
return {
"type": "spark_state",
"status": status,
"events": events,
"predictions": preds,
"advisories": advs,
}
@router.get("", response_class=HTMLResponse)
async def tower_ui(request: Request):
"""Render the Tower dashboard page."""
snapshot = _spark_snapshot()
return templates.TemplateResponse(
request,
"tower.html",
{"snapshot": snapshot},
)
@router.websocket("/ws")
async def tower_ws(websocket: WebSocket) -> None:
"""Stream Spark state snapshots to the Tower dashboard."""
await websocket.accept()
logger.info("Tower WS connected")
try:
# Send initial snapshot
await websocket.send_text(json.dumps(_spark_snapshot()))
while True:
await asyncio.sleep(_PUSH_INTERVAL)
await websocket.send_text(json.dumps(_spark_snapshot()))
except Exception:
logger.debug("Tower WS disconnected")

View File

@@ -0,0 +1,180 @@
{% extends "base.html" %}
{% block title %}Timmy Time — Tower{% endblock %}
{% block extra_styles %}{% endblock %}
{% block content %}
<div class="container-fluid tower-container py-3">
<div class="tower-header">
<div class="tower-title">TOWER</div>
<div class="tower-subtitle">
Real-time Spark visualization &mdash;
<span id="tower-conn" class="tower-conn-badge tower-conn-connecting">CONNECTING</span>
</div>
</div>
<div class="row g-3">
<!-- Left: THINKING (events) -->
<div class="col-12 col-lg-4 d-flex flex-column gap-3">
<div class="card mc-panel tower-phase-card">
<div class="card-header mc-panel-header tower-phase-thinking">// THINKING</div>
<div class="card-body p-3 tower-scroll" id="tower-events">
<div class="tower-empty">Waiting for Spark data&hellip;</div>
</div>
</div>
</div>
<!-- Middle: PREDICTING (EIDOS) -->
<div class="col-12 col-lg-4 d-flex flex-column gap-3">
<div class="card mc-panel tower-phase-card">
<div class="card-header mc-panel-header tower-phase-predicting">// PREDICTING</div>
<div class="card-body p-3" id="tower-predictions">
<div class="tower-empty">Waiting for Spark data&hellip;</div>
</div>
</div>
<div class="card mc-panel">
<div class="card-header mc-panel-header">// EIDOS STATS</div>
<div class="card-body p-3">
<div class="tower-stat-grid" id="tower-stats">
<div class="tower-stat"><span class="tower-stat-label">EVENTS</span><span class="tower-stat-value" id="ts-events">0</span></div>
<div class="tower-stat"><span class="tower-stat-label">MEMORIES</span><span class="tower-stat-value" id="ts-memories">0</span></div>
<div class="tower-stat"><span class="tower-stat-label">PREDICTIONS</span><span class="tower-stat-value" id="ts-preds">0</span></div>
<div class="tower-stat"><span class="tower-stat-label">ACCURACY</span><span class="tower-stat-value" id="ts-accuracy"></span></div>
</div>
</div>
</div>
</div>
<!-- Right: ADVISING -->
<div class="col-12 col-lg-4 d-flex flex-column gap-3">
<div class="card mc-panel tower-phase-card">
<div class="card-header mc-panel-header tower-phase-advising">// ADVISING</div>
<div class="card-body p-3 tower-scroll" id="tower-advisories">
<div class="tower-empty">Waiting for Spark data&hellip;</div>
</div>
</div>
</div>
</div>
</div>
<script>
(function() {
var ws = null;
var badge = document.getElementById('tower-conn');
function setConn(state) {
badge.textContent = state.toUpperCase();
badge.className = 'tower-conn-badge tower-conn-' + state;
}
function esc(s) { var d = document.createElement('div'); d.textContent = s; return d.innerHTML; }
function renderEvents(events) {
var el = document.getElementById('tower-events');
if (!events || !events.length) { el.innerHTML = '<div class="tower-empty">No events captured yet.</div>'; return; }
var html = '';
for (var i = 0; i < events.length; i++) {
var ev = events[i];
var dots = ev.importance >= 0.8 ? '\u25cf\u25cf\u25cf' : ev.importance >= 0.5 ? '\u25cf\u25cf' : '\u25cf';
html += '<div class="tower-event tower-etype-' + esc(ev.event_type) + '">'
+ '<div class="tower-ev-head">'
+ '<span class="tower-ev-badge">' + esc(ev.event_type.replace(/_/g, ' ').toUpperCase()) + '</span>'
+ '<span class="tower-ev-dots">' + dots + '</span>'
+ '</div>'
+ '<div class="tower-ev-desc">' + esc(ev.description) + '</div>'
+ '<div class="tower-ev-time">' + esc((ev.created_at || '').slice(0, 19)) + '</div>'
+ '</div>';
}
el.innerHTML = html;
}
function renderPredictions(preds) {
var el = document.getElementById('tower-predictions');
if (!preds || !preds.length) { el.innerHTML = '<div class="tower-empty">No predictions yet.</div>'; return; }
var html = '';
for (var i = 0; i < preds.length; i++) {
var p = preds[i];
var cls = p.evaluated ? 'tower-pred-done' : 'tower-pred-pending';
var accTxt = p.accuracy != null ? Math.round(p.accuracy * 100) + '%' : 'PENDING';
var accCls = p.accuracy != null ? (p.accuracy >= 0.7 ? 'text-success' : p.accuracy < 0.4 ? 'text-danger' : 'text-warning') : '';
html += '<div class="tower-pred ' + cls + '">'
+ '<div class="tower-pred-head">'
+ '<span class="tower-pred-task">' + esc(p.task_id) + '</span>'
+ '<span class="tower-pred-acc ' + accCls + '">' + accTxt + '</span>'
+ '</div>';
if (p.predicted) {
var pr = p.predicted;
html += '<div class="tower-pred-detail">';
if (pr.likely_winner) html += '<span>Winner: ' + esc(pr.likely_winner.slice(0, 8)) + '</span> ';
if (pr.success_probability != null) html += '<span>Success: ' + Math.round(pr.success_probability * 100) + '%</span> ';
html += '</div>';
}
html += '<div class="tower-ev-time">' + esc((p.created_at || '').slice(0, 19)) + '</div>'
+ '</div>';
}
el.innerHTML = html;
}
function renderAdvisories(advs) {
var el = document.getElementById('tower-advisories');
if (!advs || !advs.length) { el.innerHTML = '<div class="tower-empty">No advisories yet.</div>'; return; }
var html = '';
for (var i = 0; i < advs.length; i++) {
var a = advs[i];
var prio = a.priority >= 0.7 ? 'high' : a.priority >= 0.4 ? 'medium' : 'low';
html += '<div class="tower-advisory tower-adv-' + prio + '">'
+ '<div class="tower-adv-head">'
+ '<span class="tower-adv-cat">' + esc(a.category.replace(/_/g, ' ').toUpperCase()) + '</span>'
+ '<span class="tower-adv-prio">' + Math.round(a.priority * 100) + '%</span>'
+ '</div>'
+ '<div class="tower-adv-title">' + esc(a.title) + '</div>'
+ '<div class="tower-adv-detail">' + esc(a.detail) + '</div>'
+ '<div class="tower-adv-action">' + esc(a.suggested_action) + '</div>'
+ '</div>';
}
el.innerHTML = html;
}
function renderStats(status) {
if (!status) return;
document.getElementById('ts-events').textContent = status.events_captured || 0;
document.getElementById('ts-memories').textContent = status.memories_stored || 0;
var p = status.predictions || {};
document.getElementById('ts-preds').textContent = p.total_predictions || 0;
var acc = p.avg_accuracy;
var accEl = document.getElementById('ts-accuracy');
if (acc != null) {
accEl.textContent = Math.round(acc * 100) + '%';
accEl.className = 'tower-stat-value ' + (acc >= 0.7 ? 'text-success' : acc < 0.4 ? 'text-danger' : 'text-warning');
} else {
accEl.textContent = '\u2014';
}
}
function handleMsg(data) {
if (data.type !== 'spark_state') return;
renderEvents(data.events);
renderPredictions(data.predictions);
renderAdvisories(data.advisories);
renderStats(data.status);
}
function connect() {
var proto = location.protocol === 'https:' ? 'wss:' : 'ws:';
ws = new WebSocket(proto + '//' + location.host + '/tower/ws');
ws.onopen = function() { setConn('live'); };
ws.onclose = function() { setConn('offline'); setTimeout(connect, 3000); };
ws.onerror = function() { setConn('offline'); };
ws.onmessage = function(e) {
try { handleMsg(JSON.parse(e.data)); } catch(err) { console.error('Tower WS parse error', err); }
};
}
connect();
})();
</script>
{% endblock %}

View File

@@ -2,6 +2,7 @@
from .api import router
from .cascade import CascadeRouter, Provider, ProviderStatus, get_router
from .history import HealthHistoryStore, get_history_store
__all__ = [
"CascadeRouter",
@@ -9,4 +10,6 @@ __all__ = [
"ProviderStatus",
"get_router",
"router",
"HealthHistoryStore",
"get_history_store",
]

View File

@@ -8,6 +8,7 @@ from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from .cascade import CascadeRouter, get_router
from .history import HealthHistoryStore, get_history_store
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v1/router", tags=["router"])
@@ -199,6 +200,17 @@ async def reload_config(
raise HTTPException(status_code=500, detail=f"Reload failed: {exc}") from exc
@router.get("/history")
async def get_history(
hours: int = 24,
store: Annotated[HealthHistoryStore, Depends(get_history_store)] = None,
) -> list[dict[str, Any]]:
"""Get provider health history for the last N hours."""
if store is None:
store = get_history_store()
return store.get_history(hours=hours)
@router.get("/config")
async def get_config(
cascade: Annotated[CascadeRouter, Depends(get_cascade_router)],

View File

@@ -0,0 +1,152 @@
"""Provider health history — time-series snapshots for dashboard visualization."""
import asyncio
import logging
import sqlite3
from datetime import UTC, datetime, timedelta
from pathlib import Path
logger = logging.getLogger(__name__)
_store: "HealthHistoryStore | None" = None
class HealthHistoryStore:
"""Stores timestamped provider health snapshots in SQLite."""
def __init__(self, db_path: str = "data/router_history.db") -> None:
self.db_path = db_path
if db_path != ":memory:":
Path(db_path).parent.mkdir(parents=True, exist_ok=True)
self._conn = sqlite3.connect(db_path, check_same_thread=False)
self._conn.row_factory = sqlite3.Row
self._init_schema()
self._bg_task: asyncio.Task | None = None
def _init_schema(self) -> None:
self._conn.execute("""
CREATE TABLE IF NOT EXISTS snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
provider_name TEXT NOT NULL,
status TEXT NOT NULL,
error_rate REAL NOT NULL,
avg_latency_ms REAL NOT NULL,
circuit_state TEXT NOT NULL,
total_requests INTEGER NOT NULL
)
""")
self._conn.execute("""
CREATE INDEX IF NOT EXISTS idx_snapshots_ts
ON snapshots(timestamp)
""")
self._conn.commit()
def record_snapshot(self, providers: list[dict]) -> None:
"""Record a health snapshot for all providers."""
ts = datetime.now(UTC).isoformat()
rows = [
(
ts,
p["name"],
p["status"],
p["error_rate"],
p["avg_latency_ms"],
p["circuit_state"],
p["total_requests"],
)
for p in providers
]
self._conn.executemany(
"""INSERT INTO snapshots
(timestamp, provider_name, status, error_rate,
avg_latency_ms, circuit_state, total_requests)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
rows,
)
self._conn.commit()
def get_history(self, hours: int = 24) -> list[dict]:
"""Return snapshots from the last N hours, grouped by timestamp."""
cutoff = (datetime.now(UTC) - timedelta(hours=hours)).isoformat()
rows = self._conn.execute(
"""SELECT timestamp, provider_name, status, error_rate,
avg_latency_ms, circuit_state, total_requests
FROM snapshots WHERE timestamp >= ? ORDER BY timestamp""",
(cutoff,),
).fetchall()
# Group by timestamp
snapshots: dict[str, list[dict]] = {}
for row in rows:
ts = row["timestamp"]
if ts not in snapshots:
snapshots[ts] = []
snapshots[ts].append(
{
"name": row["provider_name"],
"status": row["status"],
"error_rate": row["error_rate"],
"avg_latency_ms": row["avg_latency_ms"],
"circuit_state": row["circuit_state"],
"total_requests": row["total_requests"],
}
)
return [{"timestamp": ts, "providers": providers} for ts, providers in snapshots.items()]
def prune(self, keep_hours: int = 168) -> int:
"""Remove snapshots older than keep_hours. Returns rows deleted."""
cutoff = (datetime.now(UTC) - timedelta(hours=keep_hours)).isoformat()
cursor = self._conn.execute("DELETE FROM snapshots WHERE timestamp < ?", (cutoff,))
self._conn.commit()
return cursor.rowcount
def close(self) -> None:
"""Close the database connection."""
if self._bg_task and not self._bg_task.done():
self._bg_task.cancel()
self._conn.close()
def _capture_snapshot(self, cascade_router) -> None: # noqa: ANN001
"""Capture current provider state as a snapshot."""
providers = []
for p in cascade_router.providers:
providers.append(
{
"name": p.name,
"status": p.status.value,
"error_rate": round(p.metrics.error_rate, 4),
"avg_latency_ms": round(p.metrics.avg_latency_ms, 2),
"circuit_state": p.circuit_state.value,
"total_requests": p.metrics.total_requests,
}
)
self.record_snapshot(providers)
async def start_background_task(
self,
cascade_router,
interval_seconds: int = 60, # noqa: ANN001
) -> None:
"""Start periodic snapshot capture."""
async def _loop() -> None:
while True:
try:
self._capture_snapshot(cascade_router)
logger.debug("Recorded health snapshot")
except Exception:
logger.exception("Failed to record health snapshot")
await asyncio.sleep(interval_seconds)
self._bg_task = asyncio.create_task(_loop())
logger.info("Health history background task started (interval=%ds)", interval_seconds)
def get_history_store() -> HealthHistoryStore:
"""Get or create the singleton history store."""
global _store # noqa: PLW0603
if _store is None:
_store = HealthHistoryStore()
return _store

View File

@@ -515,25 +515,36 @@ class DiscordVendor(ChatPlatform):
async def _handle_message(self, message) -> None:
"""Process an incoming message and respond via a thread."""
# Strip the bot mention from the message content
content = message.content
if self._client.user:
content = content.replace(f"<@{self._client.user.id}>", "").strip()
content = self._extract_content(message)
if not content:
return
# Create or reuse a thread for this conversation
thread = await self._get_or_create_thread(message)
target = thread or message.channel
session_id = f"discord_{thread.id}" if thread else f"discord_{message.channel.id}"
# Derive session_id for per-conversation history via Agno's SQLite
if thread:
session_id = f"discord_{thread.id}"
else:
session_id = f"discord_{message.channel.id}"
run_output, response = await self._invoke_agent(content, session_id, target)
# Run Timmy agent with typing indicator and timeout
if run_output is not None:
await self._handle_paused_run(run_output, target, session_id)
raw_content = run_output.content if hasattr(run_output, "content") else ""
response = _clean_response(raw_content or "")
await self._send_response(response, target)
def _extract_content(self, message) -> str:
"""Strip the bot mention and return clean message text."""
content = message.content
if self._client.user:
content = content.replace(f"<@{self._client.user.id}>", "").strip()
return content
async def _invoke_agent(self, content: str, session_id: str, target):
"""Run chat_with_tools with a typing indicator and timeout.
Returns a (run_output, error_response) tuple. On success the
error_response is ``None``; on failure run_output is ``None``.
"""
run_output = None
response = None
try:
@@ -548,51 +559,57 @@ class DiscordVendor(ChatPlatform):
except Exception as exc:
logger.error("Discord: chat_with_tools() failed: %s", exc)
response = "I'm having trouble reaching my inference backend right now. Please try again shortly."
return run_output, response
# Check if Agno paused the run for tool confirmation
if run_output is not None:
status = getattr(run_output, "status", None)
is_paused = status == "PAUSED" or str(status) == "RunStatus.paused"
async def _handle_paused_run(self, run_output, target, session_id: str) -> None:
"""If Agno paused the run for tool confirmation, enqueue approvals."""
status = getattr(run_output, "status", None)
is_paused = status == "PAUSED" or str(status) == "RunStatus.paused"
if is_paused and getattr(run_output, "active_requirements", None):
from config import settings
if not (is_paused and getattr(run_output, "active_requirements", None)):
return
if settings.discord_confirm_actions:
for req in run_output.active_requirements:
if getattr(req, "needs_confirmation", False):
te = req.tool_execution
tool_name = getattr(te, "tool_name", "unknown")
tool_args = getattr(te, "tool_args", {}) or {}
from config import settings
from timmy.approvals import create_item
if not settings.discord_confirm_actions:
return
item = create_item(
title=f"Discord: {tool_name}",
description=_format_action_description(tool_name, tool_args),
proposed_action=json.dumps({"tool": tool_name, "args": tool_args}),
impact=_get_impact_level(tool_name),
)
self._pending_actions[item.id] = {
"run_output": run_output,
"requirement": req,
"tool_name": tool_name,
"tool_args": tool_args,
"target": target,
"session_id": session_id,
}
await self._send_confirmation(target, tool_name, tool_args, item.id)
for req in run_output.active_requirements:
if not getattr(req, "needs_confirmation", False):
continue
te = req.tool_execution
tool_name = getattr(te, "tool_name", "unknown")
tool_args = getattr(te, "tool_args", {}) or {}
raw_content = run_output.content if hasattr(run_output, "content") else ""
response = _clean_response(raw_content or "")
from timmy.approvals import create_item
# Discord has a 2000 character limit — send with error handling
if response and response.strip():
for chunk in _chunk_message(response, 2000):
try:
await target.send(chunk)
except Exception as exc:
logger.error("Discord: failed to send message chunk: %s", exc)
break
item = create_item(
title=f"Discord: {tool_name}",
description=_format_action_description(tool_name, tool_args),
proposed_action=json.dumps({"tool": tool_name, "args": tool_args}),
impact=_get_impact_level(tool_name),
)
self._pending_actions[item.id] = {
"run_output": run_output,
"requirement": req,
"tool_name": tool_name,
"tool_args": tool_args,
"target": target,
"session_id": session_id,
}
await self._send_confirmation(target, tool_name, tool_args, item.id)
@staticmethod
async def _send_response(response: str | None, target) -> None:
"""Send a response to Discord, chunked to the 2000-char limit."""
if not response or not response.strip():
return
for chunk in _chunk_message(response, 2000):
try:
await target.send(chunk)
except Exception as exc:
logger.error("Discord: failed to send message chunk: %s", exc)
break
async def _get_or_create_thread(self, message):
"""Get the active thread for a channel, or create one.

View File

@@ -0,0 +1 @@
"""Lightning Network integration for tool-usage micro-payments."""

69
src/lightning/factory.py Normal file
View File

@@ -0,0 +1,69 @@
"""Lightning backend factory.
Returns a mock or real LND backend based on ``settings.lightning_backend``.
"""
from __future__ import annotations
import hashlib
import logging
import secrets
from dataclasses import dataclass
from config import settings
logger = logging.getLogger(__name__)
@dataclass
class Invoice:
"""Minimal Lightning invoice representation."""
payment_hash: str
payment_request: str
amount_sats: int
memo: str
class MockBackend:
"""In-memory mock Lightning backend for development and testing."""
def create_invoice(self, amount_sats: int, memo: str = "") -> Invoice:
"""Create a fake invoice with a random payment hash."""
raw = secrets.token_bytes(32)
payment_hash = hashlib.sha256(raw).hexdigest()
payment_request = f"lnbc{amount_sats}mock{payment_hash[:20]}"
logger.debug("Mock invoice: %s sats — %s", amount_sats, payment_hash[:12])
return Invoice(
payment_hash=payment_hash,
payment_request=payment_request,
amount_sats=amount_sats,
memo=memo,
)
# Singleton — lazily created
_backend: MockBackend | None = None
def get_backend() -> MockBackend:
"""Return the configured Lightning backend (currently mock-only).
Raises ``ValueError`` if an unsupported backend is requested.
"""
global _backend # noqa: PLW0603
if _backend is not None:
return _backend
kind = settings.lightning_backend
if kind == "mock":
_backend = MockBackend()
elif kind == "lnd":
# LND gRPC integration is on the roadmap — for now fall back to mock.
logger.warning("LND backend not yet implemented — using mock")
_backend = MockBackend()
else:
raise ValueError(f"Unknown lightning_backend: {kind!r}")
logger.info("Lightning backend: %s", kind)
return _backend

146
src/lightning/ledger.py Normal file
View File

@@ -0,0 +1,146 @@
"""In-memory Lightning transaction ledger.
Tracks invoices, settlements, and balances per the schema in
``docs/adr/018-lightning-ledger.md``. Uses a simple in-memory list so the
dashboard can display real (ephemeral) data without requiring SQLite yet.
"""
from __future__ import annotations
import logging
import uuid
from dataclasses import dataclass
from datetime import UTC, datetime
from enum import StrEnum
logger = logging.getLogger(__name__)
class TxType(StrEnum):
incoming = "incoming"
outgoing = "outgoing"
class TxStatus(StrEnum):
pending = "pending"
settled = "settled"
failed = "failed"
expired = "expired"
@dataclass
class LedgerEntry:
"""Single ledger row matching the ADR-018 schema."""
id: str
tx_type: TxType
status: TxStatus
payment_hash: str
amount_sats: int
memo: str
source: str
created_at: str
invoice: str = ""
preimage: str = ""
task_id: str = ""
agent_id: str = ""
settled_at: str = ""
fee_sats: int = 0
# ── In-memory store ──────────────────────────────────────────────────
_entries: list[LedgerEntry] = []
def create_invoice_entry(
payment_hash: str,
amount_sats: int,
memo: str = "",
source: str = "tool_usage",
task_id: str = "",
agent_id: str = "",
invoice: str = "",
) -> LedgerEntry:
"""Record a new incoming invoice in the ledger."""
entry = LedgerEntry(
id=uuid.uuid4().hex[:16],
tx_type=TxType.incoming,
status=TxStatus.pending,
payment_hash=payment_hash,
amount_sats=amount_sats,
memo=memo,
source=source,
task_id=task_id,
agent_id=agent_id,
invoice=invoice,
created_at=datetime.now(UTC).isoformat(),
)
_entries.append(entry)
logger.debug("Ledger entry created: %s (%s sats)", entry.id, amount_sats)
return entry
def mark_settled(payment_hash: str, preimage: str = "") -> LedgerEntry | None:
"""Mark a pending entry as settled by payment hash."""
for entry in _entries:
if entry.payment_hash == payment_hash and entry.status == TxStatus.pending:
entry.status = TxStatus.settled
entry.preimage = preimage
entry.settled_at = datetime.now(UTC).isoformat()
logger.debug("Ledger settled: %s", payment_hash[:12])
return entry
return None
def get_balance() -> dict:
"""Compute the current balance from settled and pending entries."""
incoming_total = sum(
e.amount_sats
for e in _entries
if e.tx_type == TxType.incoming and e.status == TxStatus.settled
)
outgoing_total = sum(
e.amount_sats
for e in _entries
if e.tx_type == TxType.outgoing and e.status == TxStatus.settled
)
fees = sum(e.fee_sats for e in _entries if e.status == TxStatus.settled)
pending_in = sum(
e.amount_sats
for e in _entries
if e.tx_type == TxType.incoming and e.status == TxStatus.pending
)
pending_out = sum(
e.amount_sats
for e in _entries
if e.tx_type == TxType.outgoing and e.status == TxStatus.pending
)
net = incoming_total - outgoing_total - fees
return {
"incoming_total_sats": incoming_total,
"outgoing_total_sats": outgoing_total,
"fees_paid_sats": fees,
"net_sats": net,
"pending_incoming_sats": pending_in,
"pending_outgoing_sats": pending_out,
"available_sats": net - pending_out,
}
def get_transactions(
tx_type: str | None = None,
status: str | None = None,
limit: int = 50,
) -> list[LedgerEntry]:
"""Return ledger entries, optionally filtered."""
result = _entries
if tx_type:
result = [e for e in result if e.tx_type.value == tx_type]
if status:
result = [e for e in result if e.status.value == status]
return list(reversed(result))[:limit]
def clear() -> None:
"""Reset the ledger (for testing)."""
_entries.clear()

View File

@@ -119,75 +119,84 @@ class BaseAgent(ABC):
"""
pass
async def run(self, message: str) -> str:
"""Run the agent with a message.
# Transient errors that indicate Ollama contention or temporary
# unavailability — these deserve a retry with backoff.
_TRANSIENT = (
httpx.ConnectError,
httpx.ReadError,
httpx.ReadTimeout,
httpx.ConnectTimeout,
ConnectionError,
TimeoutError,
)
Retries on transient failures (connection errors, timeouts) with
exponential backoff. GPU contention from concurrent Ollama
requests causes ReadError / ReadTimeout — these are transient
and should be retried, not raised immediately (#70).
async def run(self, message: str, *, max_retries: int = 3) -> str:
"""Run the agent with a message, retrying on transient failures.
Returns:
Agent response
GPU contention from concurrent Ollama requests causes ReadError /
ReadTimeout — these are transient and retried with exponential
backoff (#70).
"""
max_retries = 3
last_exception = None
# Transient errors that indicate Ollama contention or temporary
# unavailability — these deserve a retry with backoff.
_transient = (
httpx.ConnectError,
httpx.ReadError,
httpx.ReadTimeout,
httpx.ConnectTimeout,
ConnectionError,
TimeoutError,
)
response = await self._run_with_retries(message, max_retries)
await self._emit_response_event(message, response)
return response
async def _run_with_retries(self, message: str, max_retries: int) -> str:
"""Execute agent.run() with retry logic for transient errors."""
for attempt in range(1, max_retries + 1):
try:
result = self.agent.run(message, stream=False)
response = result.content if hasattr(result, "content") else str(result)
break # Success, exit the retry loop
except _transient as exc:
last_exception = exc
if attempt < max_retries:
# Contention backoff — longer waits because the GPU
# needs time to finish the other request.
wait = min(2**attempt, 16)
logger.warning(
"Ollama contention on attempt %d/%d: %s. Waiting %ds before retry...",
attempt,
max_retries,
type(exc).__name__,
wait,
)
await asyncio.sleep(wait)
else:
logger.error(
"Ollama unreachable after %d attempts: %s",
max_retries,
exc,
)
raise last_exception from exc
return result.content if hasattr(result, "content") else str(result)
except self._TRANSIENT as exc:
self._handle_retry_or_raise(
exc,
attempt,
max_retries,
transient=True,
)
await asyncio.sleep(min(2**attempt, 16))
except Exception as exc:
last_exception = exc
if attempt < max_retries:
logger.warning(
"Agent run failed on attempt %d/%d: %s. Retrying...",
attempt,
max_retries,
exc,
)
await asyncio.sleep(min(2 ** (attempt - 1), 8))
else:
logger.error(
"Agent run failed after %d attempts: %s",
max_retries,
exc,
)
raise last_exception from exc
self._handle_retry_or_raise(
exc,
attempt,
max_retries,
transient=False,
)
await asyncio.sleep(min(2 ** (attempt - 1), 8))
# Unreachable — _handle_retry_or_raise raises on last attempt.
raise RuntimeError("retry loop exited unexpectedly") # pragma: no cover
# Emit completion event
@staticmethod
def _handle_retry_or_raise(
exc: Exception,
attempt: int,
max_retries: int,
*,
transient: bool,
) -> None:
"""Log a retry warning or raise after exhausting attempts."""
if attempt < max_retries:
if transient:
logger.warning(
"Ollama contention on attempt %d/%d: %s. Waiting before retry...",
attempt,
max_retries,
type(exc).__name__,
)
else:
logger.warning(
"Agent run failed on attempt %d/%d: %s. Retrying...",
attempt,
max_retries,
exc,
)
else:
label = "Ollama unreachable" if transient else "Agent run failed"
logger.error("%s after %d attempts: %s", label, max_retries, exc)
raise exc
async def _emit_response_event(self, message: str, response: str) -> None:
"""Publish a completion event to the event bus if connected."""
if self.event_bus:
await self.event_bus.publish(
Event(
@@ -197,8 +206,6 @@ class BaseAgent(ABC):
)
)
return response
def get_capabilities(self) -> list[str]:
"""Get list of capabilities this agent provides."""
return self.tools

View File

@@ -37,6 +37,35 @@ def _is_interactive() -> bool:
return hasattr(sys.stdin, "isatty") and sys.stdin.isatty()
def _prompt_interactive(req, tool_name: str, tool_args: dict) -> None:
"""Display tool details and prompt the human for approval."""
description = format_action_description(tool_name, tool_args)
impact = get_impact_level(tool_name)
typer.echo()
typer.echo(typer.style("Tool confirmation required", bold=True))
typer.echo(f" Impact: {impact.upper()}")
typer.echo(f" {description}")
typer.echo()
if typer.confirm("Allow this action?", default=False):
req.confirm()
logger.info("CLI: approved %s", tool_name)
else:
req.reject(note="User rejected from CLI")
logger.info("CLI: rejected %s", tool_name)
def _decide_autonomous(req, tool_name: str, tool_args: dict) -> None:
"""Auto-approve allowlisted tools; reject everything else."""
if is_allowlisted(tool_name, tool_args):
req.confirm()
logger.info("AUTO-APPROVED (allowlist): %s", tool_name)
else:
req.reject(note="Auto-rejected: not in allowlist")
logger.info("AUTO-REJECTED (not allowlisted): %s %s", tool_name, str(tool_args)[:100])
def _handle_tool_confirmation(agent, run_output, session_id: str, *, autonomous: bool = False):
"""Prompt user to approve/reject dangerous tool calls.
@@ -51,6 +80,7 @@ def _handle_tool_confirmation(agent, run_output, session_id: str, *, autonomous:
Returns the final RunOutput after all confirmations are resolved.
"""
interactive = _is_interactive() and not autonomous
decide = _prompt_interactive if interactive else _decide_autonomous
max_rounds = 10 # safety limit
for _ in range(max_rounds):
@@ -66,39 +96,10 @@ def _handle_tool_confirmation(agent, run_output, session_id: str, *, autonomous:
for req in reqs:
if not getattr(req, "needs_confirmation", False):
continue
te = req.tool_execution
tool_name = getattr(te, "tool_name", "unknown")
tool_args = getattr(te, "tool_args", {}) or {}
if interactive:
# Human present — prompt for approval
description = format_action_description(tool_name, tool_args)
impact = get_impact_level(tool_name)
typer.echo()
typer.echo(typer.style("Tool confirmation required", bold=True))
typer.echo(f" Impact: {impact.upper()}")
typer.echo(f" {description}")
typer.echo()
approved = typer.confirm("Allow this action?", default=False)
if approved:
req.confirm()
logger.info("CLI: approved %s", tool_name)
else:
req.reject(note="User rejected from CLI")
logger.info("CLI: rejected %s", tool_name)
else:
# Autonomous mode — check allowlist
if is_allowlisted(tool_name, tool_args):
req.confirm()
logger.info("AUTO-APPROVED (allowlist): %s", tool_name)
else:
req.reject(note="Auto-rejected: not in allowlist")
logger.info(
"AUTO-REJECTED (not allowlisted): %s %s", tool_name, str(tool_args)[:100]
)
decide(req, tool_name, tool_args)
# Resume the run so the agent sees the confirmation result
try:
@@ -138,7 +139,7 @@ def think(
model_size: str | None = _MODEL_SIZE_OPTION,
):
"""Ask Timmy to think carefully about a topic."""
timmy = create_timmy(backend=backend, model_size=model_size, session_id=_CLI_SESSION_ID)
timmy = create_timmy(backend=backend, session_id=_CLI_SESSION_ID)
timmy.print_response(f"Think carefully about: {topic}", stream=True, session_id=_CLI_SESSION_ID)
@@ -201,7 +202,7 @@ def chat(
session_id = str(uuid.uuid4())
else:
session_id = _CLI_SESSION_ID
timmy = create_timmy(backend=backend, model_size=model_size, session_id=session_id)
timmy = create_timmy(backend=backend, session_id=session_id)
# Use agent.run() so we can intercept paused runs for tool confirmation.
run_output = timmy.run(message_str, stream=False, session_id=session_id)
@@ -278,7 +279,7 @@ def status(
model_size: str | None = _MODEL_SIZE_OPTION,
):
"""Print Timmy's operational status."""
timmy = create_timmy(backend=backend, model_size=model_size, session_id=_CLI_SESSION_ID)
timmy = create_timmy(backend=backend, session_id=_CLI_SESSION_ID)
timmy.print_response(STATUS_PROMPT, stream=False, session_id=_CLI_SESSION_ID)

View File

@@ -21,6 +21,10 @@ Usage::
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from PIL import ImageDraw
import os
import shutil
import sqlite3
@@ -270,20 +274,8 @@ async def create_gitea_issue_via_mcp(title: str, body: str = "", labels: str = "
return f"Failed to create issue via MCP: {exc}"
def _generate_avatar_image() -> bytes:
"""Generate a Timmy-themed avatar image using Pillow.
Creates a 512x512 wizard-themed avatar with emerald/purple/gold palette.
Returns raw PNG bytes. Falls back to a minimal solid-color image if
Pillow drawing primitives fail.
"""
from PIL import Image, ImageDraw
size = 512
img = Image.new("RGB", (size, size), (15, 25, 20))
draw = ImageDraw.Draw(img)
# Background gradient effect — concentric circles
def _draw_background(draw: ImageDraw.ImageDraw, size: int) -> None:
"""Draw radial gradient background with concentric circles."""
for i in range(size // 2, 0, -4):
g = int(25 + (i / (size // 2)) * 30)
draw.ellipse(
@@ -291,33 +283,45 @@ def _generate_avatar_image() -> bytes:
fill=(10, g, 20),
)
# Wizard hat (triangle)
def _draw_wizard(draw: ImageDraw.ImageDraw) -> None:
"""Draw wizard hat, face, eyes, smile, monogram, and robe."""
hat_color = (100, 50, 160) # purple
draw.polygon(
[(256, 40), (160, 220), (352, 220)],
fill=hat_color,
outline=(180, 130, 255),
)
hat_outline = (180, 130, 255)
gold = (220, 190, 50)
pupil = (30, 30, 60)
# Hat brim
draw.ellipse([140, 200, 372, 250], fill=hat_color, outline=(180, 130, 255))
# Hat + brim
draw.polygon([(256, 40), (160, 220), (352, 220)], fill=hat_color, outline=hat_outline)
draw.ellipse([140, 200, 372, 250], fill=hat_color, outline=hat_outline)
# Face circle
# Face
draw.ellipse([190, 220, 322, 370], fill=(60, 180, 100), outline=(80, 220, 120))
# Eyes
# Eyes (whites + pupils)
draw.ellipse([220, 275, 248, 310], fill=(255, 255, 255))
draw.ellipse([264, 275, 292, 310], fill=(255, 255, 255))
draw.ellipse([228, 285, 242, 300], fill=(30, 30, 60))
draw.ellipse([272, 285, 286, 300], fill=(30, 30, 60))
draw.ellipse([228, 285, 242, 300], fill=pupil)
draw.ellipse([272, 285, 286, 300], fill=pupil)
# Smile
draw.arc([225, 300, 287, 355], start=10, end=170, fill=(30, 30, 60), width=3)
draw.arc([225, 300, 287, 355], start=10, end=170, fill=pupil, width=3)
# Stars around the hat
# "T" monogram on hat
draw.text((243, 100), "T", fill=gold)
# Robe
draw.polygon(
[(180, 370), (140, 500), (372, 500), (332, 370)],
fill=(40, 100, 70),
outline=(60, 160, 100),
)
def _draw_stars(draw: ImageDraw.ImageDraw) -> None:
"""Draw decorative gold stars around the wizard hat."""
gold = (220, 190, 50)
star_positions = [(120, 100), (380, 120), (100, 300), (400, 280), (256, 10)]
for sx, sy in star_positions:
for sx, sy in [(120, 100), (380, 120), (100, 300), (400, 280), (256, 10)]:
r = 8
draw.polygon(
[
@@ -333,18 +337,26 @@ def _generate_avatar_image() -> bytes:
fill=gold,
)
# "T" monogram on the hat
draw.text((243, 100), "T", fill=gold)
# Robe / body
draw.polygon(
[(180, 370), (140, 500), (372, 500), (332, 370)],
fill=(40, 100, 70),
outline=(60, 160, 100),
)
def _generate_avatar_image() -> bytes:
"""Generate a Timmy-themed avatar image using Pillow.
Creates a 512x512 wizard-themed avatar with emerald/purple/gold palette.
Returns raw PNG bytes. Falls back to a minimal solid-color image if
Pillow drawing primitives fail.
"""
import io
from PIL import Image, ImageDraw
size = 512
img = Image.new("RGB", (size, size), (15, 25, 20))
draw = ImageDraw.Draw(img)
_draw_background(draw, size)
_draw_wizard(draw)
_draw_stars(draw)
buf = io.BytesIO()
img.save(buf, format="PNG")
return buf.getvalue()

View File

@@ -78,83 +78,88 @@ def _migrate_schema(conn: sqlite3.Connection) -> None:
cursor = conn.execute("SELECT name FROM sqlite_master WHERE type='table'")
tables = {row[0] for row in cursor.fetchall()}
has_memories = "memories" in tables
has_episodes = "episodes" in tables
has_chunks = "chunks" in tables
has_facts = "facts" in tables
# Check if we need to migrate (old schema exists but new one doesn't fully)
if not has_memories:
if "memories" not in tables:
logger.info("Migration: Creating unified memories table")
# Schema will be created above
# Migrate episodes -> memories
if has_episodes and has_memories:
logger.info("Migration: Converting episodes table to memories")
try:
cols = _get_table_columns(conn, "episodes")
context_type_col = "context_type" if "context_type" in cols else "'conversation'"
conn.execute(f"""
INSERT INTO memories (
id, content, memory_type, source, embedding,
metadata, agent_id, task_id, session_id,
created_at, access_count, last_accessed
)
SELECT
id, content,
COALESCE({context_type_col}, 'conversation'),
COALESCE(source, 'agent'),
embedding,
metadata, agent_id, task_id, session_id,
COALESCE(timestamp, datetime('now')), 0, NULL
FROM episodes
""")
conn.execute("DROP TABLE episodes")
logger.info("Migration: Migrated episodes to memories")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to migrate episodes: %s", exc)
# Migrate chunks -> memories as vault_chunk
if has_chunks and has_memories:
logger.info("Migration: Converting chunks table to memories")
try:
cols = _get_table_columns(conn, "chunks")
id_col = "id" if "id" in cols else "CAST(rowid AS TEXT)"
content_col = "content" if "content" in cols else "text"
source_col = (
"filepath" if "filepath" in cols else ("source" if "source" in cols else "'vault'")
)
embedding_col = "embedding" if "embedding" in cols else "NULL"
created_col = "created_at" if "created_at" in cols else "datetime('now')"
conn.execute(f"""
INSERT INTO memories (
id, content, memory_type, source, embedding,
created_at, access_count
)
SELECT
{id_col}, {content_col}, 'vault_chunk', {source_col},
{embedding_col}, {created_col}, 0
FROM chunks
""")
conn.execute("DROP TABLE chunks")
logger.info("Migration: Migrated chunks to memories")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to migrate chunks: %s", exc)
# Drop old facts table
if has_facts:
try:
conn.execute("DROP TABLE facts")
logger.info("Migration: Dropped old facts table")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to drop facts: %s", exc)
# Schema will be created by _ensure_schema above
conn.commit()
return
_migrate_episodes(conn, tables)
_migrate_chunks(conn, tables)
_drop_legacy_tables(conn, tables)
conn.commit()
def _migrate_episodes(conn: sqlite3.Connection, tables: set[str]) -> None:
"""Migrate episodes table rows into the unified memories table."""
if "episodes" not in tables:
return
logger.info("Migration: Converting episodes table to memories")
try:
cols = _get_table_columns(conn, "episodes")
context_type_col = "context_type" if "context_type" in cols else "'conversation'"
conn.execute(f"""
INSERT INTO memories (
id, content, memory_type, source, embedding,
metadata, agent_id, task_id, session_id,
created_at, access_count, last_accessed
)
SELECT
id, content,
COALESCE({context_type_col}, 'conversation'),
COALESCE(source, 'agent'),
embedding,
metadata, agent_id, task_id, session_id,
COALESCE(timestamp, datetime('now')), 0, NULL
FROM episodes
""")
conn.execute("DROP TABLE episodes")
logger.info("Migration: Migrated episodes to memories")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to migrate episodes: %s", exc)
def _migrate_chunks(conn: sqlite3.Connection, tables: set[str]) -> None:
"""Migrate chunks table rows into the unified memories table as vault_chunk."""
if "chunks" not in tables:
return
logger.info("Migration: Converting chunks table to memories")
try:
cols = _get_table_columns(conn, "chunks")
id_col = "id" if "id" in cols else "CAST(rowid AS TEXT)"
content_col = "content" if "content" in cols else "text"
source_col = (
"filepath" if "filepath" in cols else ("source" if "source" in cols else "'vault'")
)
embedding_col = "embedding" if "embedding" in cols else "NULL"
created_col = "created_at" if "created_at" in cols else "datetime('now')"
conn.execute(f"""
INSERT INTO memories (
id, content, memory_type, source, embedding,
created_at, access_count
)
SELECT
{id_col}, {content_col}, 'vault_chunk', {source_col},
{embedding_col}, {created_col}, 0
FROM chunks
""")
conn.execute("DROP TABLE chunks")
logger.info("Migration: Migrated chunks to memories")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to migrate chunks: %s", exc)
def _drop_legacy_tables(conn: sqlite3.Connection, tables: set[str]) -> None:
"""Drop old facts table if it exists."""
if "facts" not in tables:
return
try:
conn.execute("DROP TABLE facts")
logger.info("Migration: Dropped old facts table")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to drop facts: %s", exc)
def _get_table_columns(conn: sqlite3.Connection, table_name: str) -> set[str]:
"""Get the column names for a table."""
cursor = conn.execute(f"PRAGMA table_info({table_name})")

View File

@@ -341,6 +341,11 @@ class ThinkingEngine:
)
return None
# Capture arrival time *before* the LLM call so the thought
# timestamp reflects when the cycle started, not when the
# (potentially slow) generation finished. Fixes #582.
arrived_at = datetime.now(UTC).isoformat()
memory_context, system_context, recent_thoughts = self._build_thinking_context()
content, seed_type = await self._generate_novel_thought(
@@ -352,7 +357,7 @@ class ThinkingEngine:
if not content:
return None
thought = self._store_thought(content, seed_type)
thought = self._store_thought(content, seed_type, arrived_at=arrived_at)
self._last_thought_id = thought.id
await self._process_thinking_result(thought)
@@ -1173,14 +1178,25 @@ class ThinkingEngine:
raw = run.content if hasattr(run, "content") else str(run)
return _THINK_TAG_RE.sub("", raw) if raw else raw
def _store_thought(self, content: str, seed_type: str) -> Thought:
"""Persist a thought to SQLite."""
def _store_thought(
self,
content: str,
seed_type: str,
*,
arrived_at: str | None = None,
) -> Thought:
"""Persist a thought to SQLite.
Args:
arrived_at: ISO-8601 timestamp captured when the thinking cycle
started. Falls back to now() for callers that don't supply it.
"""
thought = Thought(
id=str(uuid.uuid4()),
content=content,
seed_type=seed_type,
parent_id=self._last_thought_id,
created_at=datetime.now(UTC).isoformat(),
created_at=arrived_at or datetime.now(UTC).isoformat(),
)
with _get_conn(self._db_path) as conn:
@@ -1261,6 +1277,53 @@ class ThinkingEngine:
logger.debug("Failed to broadcast thought: %s", exc)
def _query_thoughts(
db_path: Path, query: str, seed_type: str | None, limit: int
) -> list[sqlite3.Row]:
"""Run the thought-search SQL and return matching rows."""
pattern = f"%{query}%"
with _get_conn(db_path) as conn:
if seed_type:
return conn.execute(
"""
SELECT id, content, seed_type, created_at
FROM thoughts
WHERE content LIKE ? AND seed_type = ?
ORDER BY created_at DESC
LIMIT ?
""",
(pattern, seed_type, limit),
).fetchall()
return conn.execute(
"""
SELECT id, content, seed_type, created_at
FROM thoughts
WHERE content LIKE ?
ORDER BY created_at DESC
LIMIT ?
""",
(pattern, limit),
).fetchall()
def _format_thought_rows(rows: list[sqlite3.Row], query: str, seed_type: str | None) -> str:
"""Format thought rows into a human-readable string."""
lines = [f'Found {len(rows)} thought(s) matching "{query}":']
if seed_type:
lines[0] += f' [seed_type="{seed_type}"]'
lines.append("")
for row in rows:
ts = datetime.fromisoformat(row["created_at"])
local_ts = ts.astimezone()
time_str = local_ts.strftime("%Y-%m-%d %I:%M %p").lstrip("0")
seed = row["seed_type"]
content = row["content"].replace("\n", " ") # Flatten newlines for display
lines.append(f"[{time_str}] ({seed}) {content[:150]}")
return "\n".join(lines)
def search_thoughts(query: str, seed_type: str | None = None, limit: int = 10) -> str:
"""Search Timmy's thought history for reflections matching a query.
@@ -1278,58 +1341,17 @@ def search_thoughts(query: str, seed_type: str | None = None, limit: int = 10) -
Formatted string with matching thoughts, newest first, including
timestamps and seed types. Returns a helpful message if no matches found.
"""
# Clamp limit to reasonable bounds
limit = max(1, min(limit, 50))
try:
engine = thinking_engine
db_path = engine._db_path
# Build query with optional seed_type filter
with _get_conn(db_path) as conn:
if seed_type:
rows = conn.execute(
"""
SELECT id, content, seed_type, created_at
FROM thoughts
WHERE content LIKE ? AND seed_type = ?
ORDER BY created_at DESC
LIMIT ?
""",
(f"%{query}%", seed_type, limit),
).fetchall()
else:
rows = conn.execute(
"""
SELECT id, content, seed_type, created_at
FROM thoughts
WHERE content LIKE ?
ORDER BY created_at DESC
LIMIT ?
""",
(f"%{query}%", limit),
).fetchall()
rows = _query_thoughts(thinking_engine._db_path, query, seed_type, limit)
if not rows:
if seed_type:
return f'No thoughts found matching "{query}" with seed_type="{seed_type}".'
return f'No thoughts found matching "{query}".'
# Format results
lines = [f'Found {len(rows)} thought(s) matching "{query}":']
if seed_type:
lines[0] += f' [seed_type="{seed_type}"]'
lines.append("")
for row in rows:
ts = datetime.fromisoformat(row["created_at"])
local_ts = ts.astimezone()
time_str = local_ts.strftime("%Y-%m-%d %I:%M %p").lstrip("0")
seed = row["seed_type"]
content = row["content"].replace("\n", " ") # Flatten newlines for display
lines.append(f"[{time_str}] ({seed}) {content[:150]}")
return "\n".join(lines)
return _format_thought_rows(rows, query, seed_type)
except Exception as exc:
logger.warning("Thought search failed: %s", exc)

View File

@@ -326,6 +326,46 @@ def get_live_system_status() -> dict[str, Any]:
return result
def _build_pytest_cmd(venv_python: Path, scope: str) -> list[str]:
"""Build the pytest command list for the given scope."""
cmd = [str(venv_python), "-m", "pytest", "-x", "-q", "--tb=short", "--timeout=30"]
if scope == "fast":
cmd.extend(
[
"--ignore=tests/functional",
"--ignore=tests/e2e",
"--ignore=tests/integrations",
"tests/",
]
)
elif scope == "full":
cmd.append("tests/")
else:
cmd.append(scope)
return cmd
def _parse_pytest_output(output: str) -> dict[str, int]:
"""Extract passed/failed/error counts from pytest output."""
import re
passed = failed = errors = 0
for line in output.splitlines():
if "passed" in line or "failed" in line or "error" in line:
nums = re.findall(r"(\d+) (passed|failed|error)", line)
for count, kind in nums:
if kind == "passed":
passed = int(count)
elif kind == "failed":
failed = int(count)
elif kind == "error":
errors = int(count)
return {"passed": passed, "failed": failed, "errors": errors}
def run_self_tests(scope: str = "fast", _repo_root: str | None = None) -> dict[str, Any]:
"""Run Timmy's own test suite and report results.
@@ -349,49 +389,17 @@ def run_self_tests(scope: str = "fast", _repo_root: str | None = None) -> dict[s
if not venv_python.exists():
return {"success": False, "error": f"No venv found at {venv_python}"}
cmd = [str(venv_python), "-m", "pytest", "-x", "-q", "--tb=short", "--timeout=30"]
if scope == "fast":
# Unit tests only — skip functional/e2e/integration
cmd.extend(
[
"--ignore=tests/functional",
"--ignore=tests/e2e",
"--ignore=tests/integrations",
"tests/",
]
)
elif scope == "full":
cmd.append("tests/")
else:
# Specific path
cmd.append(scope)
cmd = _build_pytest_cmd(venv_python, scope)
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=120, cwd=repo)
output = result.stdout + result.stderr
# Parse pytest output for counts
passed = failed = errors = 0
for line in output.splitlines():
if "passed" in line or "failed" in line or "error" in line:
import re
nums = re.findall(r"(\d+) (passed|failed|error)", line)
for count, kind in nums:
if kind == "passed":
passed = int(count)
elif kind == "failed":
failed = int(count)
elif kind == "error":
errors = int(count)
counts = _parse_pytest_output(output)
return {
"success": result.returncode == 0,
"passed": passed,
"failed": failed,
"errors": errors,
"total": passed + failed + errors,
**counts,
"total": counts["passed"] + counts["failed"] + counts["errors"],
"return_code": result.returncode,
"summary": output[-2000:] if len(output) > 2000 else output,
}

View File

@@ -78,6 +78,11 @@ DEFAULT_MAX_UTTERANCE = 30.0 # safety cap — don't record forever
DEFAULT_SESSION_ID = "voice"
def _rms(block: np.ndarray) -> float:
"""Compute root-mean-square energy of an audio block."""
return float(np.sqrt(np.mean(block.astype(np.float32) ** 2)))
@dataclass
class VoiceConfig:
"""Configuration for the voice loop."""
@@ -161,13 +166,6 @@ class VoiceLoop:
min_blocks = int(self.config.min_utterance / 0.1)
max_blocks = int(self.config.max_utterance / 0.1)
audio_chunks: list[np.ndarray] = []
silent_count = 0
recording = False
def _rms(block: np.ndarray) -> float:
return float(np.sqrt(np.mean(block.astype(np.float32) ** 2)))
sys.stdout.write("\n 🎤 Listening... (speak now)\n")
sys.stdout.flush()
@@ -177,42 +175,69 @@ class VoiceLoop:
dtype="float32",
blocksize=block_size,
) as stream:
while self._running:
block, overflowed = stream.read(block_size)
if overflowed:
logger.debug("Audio buffer overflowed")
chunks = self._capture_audio_blocks(stream, block_size, silence_blocks, max_blocks)
rms = _rms(block)
return self._finalize_utterance(chunks, min_blocks, sr)
if not recording:
if rms > self.config.silence_threshold:
recording = True
silent_count = 0
audio_chunks.append(block.copy())
sys.stdout.write(" 📢 Recording...\r")
sys.stdout.flush()
def _capture_audio_blocks(
self,
stream,
block_size: int,
silence_blocks: int,
max_blocks: int,
) -> list[np.ndarray]:
"""Read audio blocks from *stream* until silence or max length.
Returns the list of captured audio chunks (may be empty).
"""
chunks: list[np.ndarray] = []
silent_count = 0
recording = False
while self._running:
block, overflowed = stream.read(block_size)
if overflowed:
logger.debug("Audio buffer overflowed")
rms = _rms(block)
if not recording:
if rms > self.config.silence_threshold:
recording = True
silent_count = 0
chunks.append(block.copy())
sys.stdout.write(" 📢 Recording...\r")
sys.stdout.flush()
else:
chunks.append(block.copy())
if rms < self.config.silence_threshold:
silent_count += 1
else:
audio_chunks.append(block.copy())
silent_count = 0
if rms < self.config.silence_threshold:
silent_count += 1
else:
silent_count = 0
if silent_count >= silence_blocks:
break
# End of utterance
if silent_count >= silence_blocks:
break
if len(chunks) >= max_blocks:
logger.info("Max utterance length reached, stopping.")
break
# Safety cap
if len(audio_chunks) >= max_blocks:
logger.info("Max utterance length reached, stopping.")
break
return chunks
if not audio_chunks or len(audio_chunks) < min_blocks:
@staticmethod
def _finalize_utterance(
chunks: list[np.ndarray], min_blocks: int, sample_rate: int
) -> np.ndarray | None:
"""Concatenate recorded chunks and report duration.
Returns ``None`` if the utterance is too short to be meaningful.
"""
if not chunks or len(chunks) < min_blocks:
return None
audio = np.concatenate(audio_chunks, axis=0).flatten()
duration = len(audio) / sr
audio = np.concatenate(chunks, axis=0).flatten()
duration = len(audio) / sample_rate
sys.stdout.write(f" ✂️ Captured {duration:.1f}s of audio\n")
sys.stdout.flush()
return audio

View File

@@ -2493,3 +2493,57 @@
.db-cell { max-width: 300px; overflow: hidden; text-overflow: ellipsis; white-space: nowrap; }
.db-cell:hover { white-space: normal; word-break: break-all; }
.db-truncated { font-size: 0.7rem; color: var(--amber); padding: 0.3rem 0; }
/* ── Tower ────────────────────────────────────────────────────────────── */
.tower-container { max-width: 1400px; margin: 0 auto; }
.tower-header { margin-bottom: 1rem; }
.tower-title { font-size: 1.6rem; font-weight: 700; color: var(--green); letter-spacing: 0.15em; }
.tower-subtitle { font-size: 0.85rem; color: var(--text-dim); }
.tower-conn-badge { font-size: 0.7rem; font-weight: 600; padding: 2px 8px; border-radius: 3px; letter-spacing: 0.08em; }
.tower-conn-live { color: var(--green); border: 1px solid var(--green); }
.tower-conn-offline { color: var(--red); border: 1px solid var(--red); }
.tower-conn-connecting { color: var(--amber); border: 1px solid var(--amber); }
.tower-phase-card { min-height: 300px; }
.tower-phase-thinking { border-left: 3px solid var(--purple); }
.tower-phase-predicting { border-left: 3px solid var(--orange); }
.tower-phase-advising { border-left: 3px solid var(--green); }
.tower-scroll { max-height: 50vh; overflow-y: auto; }
.tower-empty { text-align: center; color: var(--text-dim); padding: 16px; font-size: 0.85rem; }
.tower-stat-grid { display: grid; grid-template-columns: repeat(4, 1fr); gap: 0.5rem; text-align: center; }
.tower-stat-label { display: block; font-size: 0.65rem; color: var(--text-dim); letter-spacing: 0.1em; }
.tower-stat-value { display: block; font-size: 1.1rem; font-weight: 700; color: var(--text-bright); }
.tower-event { padding: 8px; margin-bottom: 6px; border-left: 3px solid var(--border); border-radius: 3px; background: var(--bg-card); }
.tower-etype-task_posted { border-left-color: var(--purple); }
.tower-etype-bid_submitted { border-left-color: var(--orange); }
.tower-etype-task_completed { border-left-color: var(--green); }
.tower-etype-task_failed { border-left-color: var(--red); }
.tower-etype-agent_joined { border-left-color: var(--purple); }
.tower-etype-tool_executed { border-left-color: var(--amber); }
.tower-ev-head { display: flex; justify-content: space-between; align-items: center; margin-bottom: 4px; }
.tower-ev-badge { font-size: 0.65rem; font-weight: 600; color: var(--text-bright); letter-spacing: 0.08em; }
.tower-ev-dots { font-size: 0.6rem; color: var(--amber); }
.tower-ev-desc { font-size: 0.8rem; color: var(--text); }
.tower-ev-time { font-size: 0.65rem; color: var(--text-dim); margin-top: 2px; }
.tower-pred { padding: 8px; margin-bottom: 6px; border-radius: 3px; background: var(--bg-card); border-left: 3px solid var(--orange); }
.tower-pred-done { border-left-color: var(--green); }
.tower-pred-pending { border-left-color: var(--amber); }
.tower-pred-head { display: flex; justify-content: space-between; align-items: center; }
.tower-pred-task { font-size: 0.75rem; font-weight: 600; color: var(--text-bright); font-family: monospace; }
.tower-pred-acc { font-size: 0.75rem; font-weight: 700; }
.tower-pred-detail { font-size: 0.75rem; color: var(--text-dim); margin-top: 4px; }
.tower-advisory { padding: 8px; margin-bottom: 6px; border-radius: 3px; background: var(--bg-card); border-left: 3px solid var(--border); }
.tower-adv-high { border-left-color: var(--red); }
.tower-adv-medium { border-left-color: var(--orange); }
.tower-adv-low { border-left-color: var(--green); }
.tower-adv-head { display: flex; justify-content: space-between; font-size: 0.65rem; margin-bottom: 4px; }
.tower-adv-cat { font-weight: 600; color: var(--text-dim); letter-spacing: 0.08em; }
.tower-adv-prio { font-weight: 700; color: var(--amber); }
.tower-adv-title { font-size: 0.85rem; font-weight: 600; color: var(--text-bright); }
.tower-adv-detail { font-size: 0.8rem; color: var(--text); margin-top: 2px; }
.tower-adv-action { font-size: 0.75rem; color: var(--green); margin-top: 4px; font-style: italic; }

View File

@@ -0,0 +1,149 @@
"""Tests for provider health history store and API endpoint."""
import time
from datetime import UTC, datetime, timedelta
from unittest.mock import MagicMock
import pytest
from src.infrastructure.router.history import HealthHistoryStore
@pytest.fixture
def store():
"""In-memory history store for testing."""
s = HealthHistoryStore(db_path=":memory:")
yield s
s.close()
@pytest.fixture
def sample_providers():
return [
{
"name": "anthropic",
"status": "healthy",
"error_rate": 0.01,
"avg_latency_ms": 250.5,
"circuit_state": "closed",
"total_requests": 100,
},
{
"name": "local",
"status": "degraded",
"error_rate": 0.15,
"avg_latency_ms": 80.0,
"circuit_state": "closed",
"total_requests": 50,
},
]
def test_record_and_retrieve(store, sample_providers):
store.record_snapshot(sample_providers)
history = store.get_history(hours=1)
assert len(history) == 1
assert len(history[0]["providers"]) == 2
assert history[0]["providers"][0]["name"] == "anthropic"
assert history[0]["providers"][1]["name"] == "local"
assert "timestamp" in history[0]
def test_multiple_snapshots(store, sample_providers):
store.record_snapshot(sample_providers)
time.sleep(0.01)
store.record_snapshot(sample_providers)
history = store.get_history(hours=1)
assert len(history) == 2
def test_hours_filtering(store, sample_providers):
old_ts = (datetime.now(UTC) - timedelta(hours=48)).isoformat()
store._conn.execute(
"""INSERT INTO snapshots
(timestamp, provider_name, status, error_rate,
avg_latency_ms, circuit_state, total_requests)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(old_ts, "anthropic", "healthy", 0.0, 100.0, "closed", 10),
)
store._conn.commit()
store.record_snapshot(sample_providers)
history = store.get_history(hours=24)
assert len(history) == 1
history = store.get_history(hours=72)
assert len(history) == 2
def test_prune(store, sample_providers):
old_ts = (datetime.now(UTC) - timedelta(hours=200)).isoformat()
store._conn.execute(
"""INSERT INTO snapshots
(timestamp, provider_name, status, error_rate,
avg_latency_ms, circuit_state, total_requests)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(old_ts, "anthropic", "healthy", 0.0, 100.0, "closed", 10),
)
store._conn.commit()
store.record_snapshot(sample_providers)
deleted = store.prune(keep_hours=168)
assert deleted == 1
history = store.get_history(hours=999)
assert len(history) == 1
def test_empty_history(store):
assert store.get_history(hours=24) == []
def test_capture_snapshot_from_router(store):
mock_metrics = MagicMock()
mock_metrics.error_rate = 0.05
mock_metrics.avg_latency_ms = 200.0
mock_metrics.total_requests = 42
mock_provider = MagicMock()
mock_provider.name = "test-provider"
mock_provider.status.value = "healthy"
mock_provider.metrics = mock_metrics
mock_provider.circuit_state.value = "closed"
mock_router = MagicMock()
mock_router.providers = [mock_provider]
store._capture_snapshot(mock_router)
history = store.get_history(hours=1)
assert len(history) == 1
p = history[0]["providers"][0]
assert p["name"] == "test-provider"
assert p["status"] == "healthy"
assert p["error_rate"] == 0.05
assert p["total_requests"] == 42
def test_history_api_endpoint(store, sample_providers):
"""GET /api/v1/router/history returns snapshot data."""
store.record_snapshot(sample_providers)
from fastapi import FastAPI
from fastapi.testclient import TestClient
from src.infrastructure.router.api import get_cascade_router
from src.infrastructure.router.api import router as api_router
from src.infrastructure.router.history import get_history_store
app = FastAPI()
app.include_router(api_router)
app.dependency_overrides[get_history_store] = lambda: store
app.dependency_overrides[get_cascade_router] = lambda: MagicMock()
client = TestClient(app)
resp = client.get("/api/v1/router/history?hours=1")
assert resp.status_code == 200
data = resp.json()
assert len(data) == 1
assert len(data[0]["providers"]) == 2
assert data[0]["providers"][0]["name"] == "anthropic"
app.dependency_overrides.clear()

View File

@@ -174,6 +174,103 @@ class TestDiscordVendor:
assert result is False
class TestExtractContent:
def test_strips_bot_mention(self):
from integrations.chat_bridge.vendors.discord import DiscordVendor
vendor = DiscordVendor()
vendor._client = MagicMock()
vendor._client.user.id = 12345
msg = MagicMock()
msg.content = "<@12345> hello there"
assert vendor._extract_content(msg) == "hello there"
def test_no_client_user(self):
from integrations.chat_bridge.vendors.discord import DiscordVendor
vendor = DiscordVendor()
vendor._client = MagicMock()
vendor._client.user = None
msg = MagicMock()
msg.content = "hello"
assert vendor._extract_content(msg) == "hello"
def test_empty_after_strip(self):
from integrations.chat_bridge.vendors.discord import DiscordVendor
vendor = DiscordVendor()
vendor._client = MagicMock()
vendor._client.user.id = 99
msg = MagicMock()
msg.content = "<@99>"
assert vendor._extract_content(msg) == ""
class TestInvokeAgent:
@staticmethod
def _make_typing_target():
"""Build a mock target whose .typing() is an async context manager."""
from contextlib import asynccontextmanager
target = AsyncMock()
@asynccontextmanager
async def _typing():
yield
target.typing = _typing
return target
@pytest.mark.asyncio
async def test_timeout_returns_error(self):
from integrations.chat_bridge.vendors.discord import DiscordVendor
vendor = DiscordVendor()
target = self._make_typing_target()
with patch(
"integrations.chat_bridge.vendors.discord.chat_with_tools", side_effect=TimeoutError
):
run_output, response = await vendor._invoke_agent("hi", "sess", target)
assert run_output is None
assert "too long" in response
@pytest.mark.asyncio
async def test_exception_returns_error(self):
from integrations.chat_bridge.vendors.discord import DiscordVendor
vendor = DiscordVendor()
target = self._make_typing_target()
with patch(
"integrations.chat_bridge.vendors.discord.chat_with_tools",
side_effect=RuntimeError("boom"),
):
run_output, response = await vendor._invoke_agent("hi", "sess", target)
assert run_output is None
assert "trouble" in response
class TestSendResponse:
@pytest.mark.asyncio
async def test_skips_empty(self):
from integrations.chat_bridge.vendors.discord import DiscordVendor
target = AsyncMock()
await DiscordVendor._send_response(None, target)
target.send.assert_not_called()
await DiscordVendor._send_response("", target)
target.send.assert_not_called()
@pytest.mark.asyncio
async def test_sends_short_message(self):
from integrations.chat_bridge.vendors.discord import DiscordVendor
target = AsyncMock()
await DiscordVendor._send_response("hello", target)
target.send.assert_called_once_with("hello")
class TestChunkMessage:
def test_short_message(self):
from integrations.chat_bridge.vendors.discord import _chunk_message

View File

@@ -361,6 +361,53 @@ class TestRun:
assert response == "ok"
# ── _handle_retry_or_raise ────────────────────────────────────────────────
class TestHandleRetryOrRaise:
def test_raises_on_last_attempt(self):
BaseAgent = _make_base_class()
with pytest.raises(ValueError, match="boom"):
BaseAgent._handle_retry_or_raise(
ValueError("boom"),
attempt=3,
max_retries=3,
transient=False,
)
def test_raises_on_last_attempt_transient(self):
BaseAgent = _make_base_class()
exc = httpx.ConnectError("down")
with pytest.raises(httpx.ConnectError):
BaseAgent._handle_retry_or_raise(
exc,
attempt=3,
max_retries=3,
transient=True,
)
def test_no_raise_on_early_attempt(self):
BaseAgent = _make_base_class()
# Should return None (no raise) on non-final attempt
result = BaseAgent._handle_retry_or_raise(
ValueError("retry me"),
attempt=1,
max_retries=3,
transient=False,
)
assert result is None
def test_no_raise_on_early_transient(self):
BaseAgent = _make_base_class()
result = BaseAgent._handle_retry_or_raise(
httpx.ReadTimeout("busy"),
attempt=2,
max_retries=3,
transient=True,
)
assert result is None
# ── get_capabilities / get_status ────────────────────────────────────────────

View File

@@ -55,14 +55,14 @@ def test_think_sends_topic_to_agent():
)
def test_think_passes_model_size_option():
"""think --model-size 70b must forward the model size to create_timmy."""
def test_think_ignores_model_size_option():
"""think --model-size 70b must not forward model_size to create_timmy."""
mock_timmy = MagicMock()
with patch("timmy.cli.create_timmy", return_value=mock_timmy) as mock_create:
runner.invoke(app, ["think", "topic", "--model-size", "70b"])
mock_create.assert_called_once_with(backend=None, model_size="70b", session_id="cli")
mock_create.assert_called_once_with(backend=None, session_id="cli")
# ---------------------------------------------------------------------------

View File

@@ -15,7 +15,7 @@ except ImportError:
np = None
try:
from timmy.voice_loop import VoiceConfig, VoiceLoop, _strip_markdown
from timmy.voice_loop import VoiceConfig, VoiceLoop, _rms, _strip_markdown
except ImportError:
pass # pytestmark will skip all tests anyway
@@ -147,6 +147,31 @@ class TestStripMarkdown:
assert "*" not in result
class TestRms:
def test_silent_block(self):
block = np.zeros(1600, dtype=np.float32)
assert _rms(block) == pytest.approx(0.0, abs=1e-7)
def test_loud_block(self):
block = np.ones(1600, dtype=np.float32)
assert _rms(block) == pytest.approx(1.0, abs=1e-5)
class TestFinalizeUtterance:
def test_returns_none_for_empty(self):
assert VoiceLoop._finalize_utterance([], min_blocks=5, sample_rate=16000) is None
def test_returns_none_for_too_short(self):
chunks = [np.zeros(1600, dtype=np.float32) for _ in range(3)]
assert VoiceLoop._finalize_utterance(chunks, min_blocks=5, sample_rate=16000) is None
def test_returns_audio_for_sufficient_chunks(self):
chunks = [np.ones(1600, dtype=np.float32) for _ in range(6)]
result = VoiceLoop._finalize_utterance(chunks, min_blocks=5, sample_rate=16000)
assert result is not None
assert len(result) == 6 * 1600
class TestThink:
def test_think_returns_response(self):
loop = VoiceLoop()

View File

@@ -0,0 +1,109 @@
"""Unit tests for the lightning package (factory + ledger)."""
from __future__ import annotations
import pytest
from lightning.factory import Invoice, MockBackend, get_backend
from lightning.ledger import (
TxStatus,
TxType,
clear,
create_invoice_entry,
get_balance,
get_transactions,
mark_settled,
)
@pytest.fixture(autouse=True)
def _clean_ledger():
"""Reset the in-memory ledger between tests."""
clear()
yield
clear()
# ── Factory tests ────────────────────────────────────────────────────
class TestMockBackend:
def test_create_invoice_returns_invoice(self):
backend = MockBackend()
inv = backend.create_invoice(100, "test memo")
assert isinstance(inv, Invoice)
assert inv.amount_sats == 100
assert inv.memo == "test memo"
assert len(inv.payment_hash) == 64 # SHA-256 hex
assert inv.payment_request.startswith("lnbc")
def test_invoices_have_unique_hashes(self):
backend = MockBackend()
a = backend.create_invoice(10)
b = backend.create_invoice(10)
assert a.payment_hash != b.payment_hash
class TestGetBackend:
def test_returns_mock_backend(self):
backend = get_backend()
assert isinstance(backend, MockBackend)
# ── Ledger tests ─────────────────────────────────────────────────────
class TestLedger:
def test_create_invoice_entry(self):
entry = create_invoice_entry(
payment_hash="abc123",
amount_sats=500,
memo="test",
source="unit_test",
)
assert entry.tx_type == TxType.incoming
assert entry.status == TxStatus.pending
assert entry.amount_sats == 500
def test_mark_settled(self):
create_invoice_entry(payment_hash="hash1", amount_sats=100)
result = mark_settled("hash1", preimage="secret")
assert result is not None
assert result.status == TxStatus.settled
assert result.preimage == "secret"
assert result.settled_at != ""
def test_mark_settled_unknown_hash(self):
assert mark_settled("nonexistent") is None
def test_get_balance_empty(self):
bal = get_balance()
assert bal["net_sats"] == 0
assert bal["available_sats"] == 0
def test_get_balance_with_settled(self):
create_invoice_entry(payment_hash="h1", amount_sats=1000)
mark_settled("h1")
bal = get_balance()
assert bal["incoming_total_sats"] == 1000
assert bal["net_sats"] == 1000
assert bal["available_sats"] == 1000
def test_get_balance_pending_not_counted(self):
create_invoice_entry(payment_hash="h2", amount_sats=500)
bal = get_balance()
assert bal["incoming_total_sats"] == 0
assert bal["pending_incoming_sats"] == 500
def test_get_transactions_returns_entries(self):
create_invoice_entry(payment_hash="t1", amount_sats=10)
create_invoice_entry(payment_hash="t2", amount_sats=20)
txs = get_transactions()
assert len(txs) == 2
def test_get_transactions_filter_by_status(self):
create_invoice_entry(payment_hash="f1", amount_sats=10)
create_invoice_entry(payment_hash="f2", amount_sats=20)
mark_settled("f1")
assert len(get_transactions(status="settled")) == 1
assert len(get_transactions(status="pending")) == 1