Merge pull request #11 from Alexspayne/claude/briefing-approval-queue-T0GNi

This commit is contained in:
Alexander Whitestone
2026-02-22 09:10:04 -05:00
committed by GitHub
11 changed files with 1326 additions and 0 deletions

View File

@@ -1,4 +1,6 @@
import asyncio
import logging
from contextlib import asynccontextmanager
from pathlib import Path
from fastapi import FastAPI, Request
@@ -16,6 +18,7 @@ from dashboard.routes.voice import router as voice_router
from dashboard.routes.voice_enhanced import router as voice_enhanced_router
from dashboard.routes.mobile import router as mobile_router
from dashboard.routes.swarm_ws import router as swarm_ws_router
from dashboard.routes.briefing import router as briefing_router
logging.basicConfig(
level=logging.INFO,
@@ -27,9 +30,50 @@ logger = logging.getLogger(__name__)
BASE_DIR = Path(__file__).parent
PROJECT_ROOT = BASE_DIR.parent.parent
_BRIEFING_INTERVAL_HOURS = 6
async def _briefing_scheduler() -> None:
"""Background task: regenerate Timmy's briefing every 6 hours.
Runs once at startup (after a short delay to let the server settle),
then on a 6-hour cadence. Skips generation if a fresh briefing already
exists (< 30 min old).
"""
from timmy.briefing import engine as briefing_engine
from notifications.push import notify_briefing_ready
await asyncio.sleep(2) # Let server finish starting before first run
while True:
try:
if briefing_engine.needs_refresh():
logger.info("Generating morning briefing…")
briefing = briefing_engine.generate()
await notify_briefing_ready(briefing)
else:
logger.info("Briefing is fresh; skipping generation.")
except Exception as exc:
logger.error("Briefing scheduler error: %s", exc)
await asyncio.sleep(_BRIEFING_INTERVAL_HOURS * 3600)
@asynccontextmanager
async def lifespan(app: FastAPI):
task = asyncio.create_task(_briefing_scheduler())
yield
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
app = FastAPI(
title="Timmy Time — Mission Control",
version="1.0.0",
lifespan=lifespan,
# Docs disabled unless DEBUG=true in env / .env
docs_url="/docs" if settings.debug else None,
redoc_url="/redoc" if settings.debug else None,
@@ -47,6 +91,7 @@ app.include_router(voice_router)
app.include_router(voice_enhanced_router)
app.include_router(mobile_router)
app.include_router(swarm_ws_router)
app.include_router(briefing_router)
@app.get("/", response_class=HTMLResponse)

View File

@@ -0,0 +1,70 @@
"""Briefing routes — Morning briefing and approval queue.
GET /briefing — render the briefing page
GET /briefing/approvals — HTMX partial: pending approval cards
POST /briefing/approvals/{id}/approve — approve an item (HTMX)
POST /briefing/approvals/{id}/reject — reject an item (HTMX)
"""
import logging
from pathlib import Path
from fastapi import APIRouter, Request
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from timmy.briefing import engine as briefing_engine
from timmy import approvals as approval_store
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/briefing", tags=["briefing"])
templates = Jinja2Templates(directory=str(Path(__file__).parent.parent / "templates"))
@router.get("", response_class=HTMLResponse)
async def get_briefing(request: Request):
"""Return today's briefing page (generated or cached)."""
briefing = briefing_engine.get_or_generate()
return templates.TemplateResponse(
request,
"briefing.html",
{"briefing": briefing},
)
@router.get("/approvals", response_class=HTMLResponse)
async def get_approvals(request: Request):
"""Return HTMX partial with all pending approval items."""
items = approval_store.list_pending()
return templates.TemplateResponse(
request,
"partials/approval_cards.html",
{"items": items},
)
@router.post("/approvals/{item_id}/approve", response_class=HTMLResponse)
async def approve_item(request: Request, item_id: str):
"""Approve an approval item; return the updated card via HTMX."""
item = approval_store.approve(item_id)
if item is None:
return HTMLResponse("<p class='text-danger'>Item not found.</p>", status_code=404)
return templates.TemplateResponse(
request,
"partials/approval_card_single.html",
{"item": item},
)
@router.post("/approvals/{item_id}/reject", response_class=HTMLResponse)
async def reject_item(request: Request, item_id: str):
"""Reject an approval item; return the updated card via HTMX."""
item = approval_store.reject(item_id)
if item is None:
return HTMLResponse("<p class='text-danger'>Item not found.</p>", status_code=404)
return templates.TemplateResponse(
request,
"partials/approval_card_single.html",
{"item": item},
)

View File

@@ -21,6 +21,7 @@
<span class="mc-subtitle">MISSION CONTROL</span>
</div>
<div class="mc-header-right">
<a href="/briefing" class="mc-test-link">BRIEFING</a>
<a href="/swarm/live" class="mc-test-link">SWARM</a>
<a href="/marketplace/ui" class="mc-test-link">MARKET</a>
<a href="/mobile" class="mc-test-link">MOBILE</a>

View File

@@ -0,0 +1,208 @@
{% extends "base.html" %}
{% block title %}Timmy Time — Morning Briefing{% endblock %}
{% block content %}
<div class="container briefing-container py-4">
<!-- Header -->
<div class="briefing-header mb-4">
<div class="briefing-greeting">Good morning.</div>
<div class="briefing-timestamp">
Briefing generated
<span class="briefing-ts-val">{{ briefing.generated_at.strftime('%Y-%m-%d %H:%M UTC') }}</span>
&mdash; covering
<span class="briefing-ts-val">{{ briefing.period_start.strftime('%H:%M') }}</span>
to
<span class="briefing-ts-val">{{ briefing.period_end.strftime('%H:%M UTC') }}</span>
</div>
</div>
<!-- Summary -->
<div class="card mc-panel briefing-summary mb-5">
<div class="card-header mc-panel-header">// TIMMY&rsquo;S REPORT</div>
<div class="card-body p-4">
<div class="briefing-prose">{{ briefing.summary }}</div>
</div>
</div>
<!-- Approval Queue -->
<div class="card mc-panel">
<div class="card-header mc-panel-header d-flex justify-content-between align-items-center">
<span>// APPROVAL QUEUE</span>
<span class="badge bg-warning text-dark"
id="approval-count">{{ briefing.approval_items | length }} pending</span>
</div>
<div class="card-body p-3"
id="approval-queue"
hx-get="/briefing/approvals"
hx-trigger="load"
hx-swap="innerHTML">
<!-- HTMX fills this on load with live data -->
<div class="text-center text-muted py-3">Loading approval items&hellip;</div>
</div>
</div>
</div>
<style>
/* ------------------------------------------------------------------ */
/* Briefing-specific styles — mobile-first */
/* ------------------------------------------------------------------ */
.briefing-container {
max-width: 680px;
}
.briefing-header {
border-left: 3px solid var(--mc-amber, #ffc107);
padding-left: 1rem;
}
.briefing-greeting {
font-size: 1.6rem;
font-weight: 700;
color: var(--mc-amber, #ffc107);
letter-spacing: 0.04em;
font-family: 'JetBrains Mono', monospace;
}
.briefing-timestamp {
font-size: 0.75rem;
color: #6c757d;
margin-top: 0.25rem;
}
.briefing-ts-val {
color: #adb5bd;
}
.briefing-prose {
font-size: 1rem;
line-height: 1.75;
color: #dee2e6;
white-space: pre-wrap;
word-break: break-word;
}
/* Approval cards */
.approval-card {
border: 1px solid #2a3a4a;
border-radius: 6px;
padding: 1rem;
margin-bottom: 0.75rem;
background: #0d1b2a;
transition: border-color 0.2s;
}
.approval-card.approved {
border-color: #198754;
opacity: 0.7;
}
.approval-card.rejected {
border-color: #dc3545;
opacity: 0.7;
}
.approval-card-title {
font-weight: 600;
font-size: 0.95rem;
color: #f8f9fa;
margin-bottom: 0.25rem;
}
.approval-card-desc {
font-size: 0.85rem;
color: #adb5bd;
margin-bottom: 0.5rem;
}
.approval-card-action {
font-size: 0.8rem;
color: #6c757d;
font-family: 'JetBrains Mono', monospace;
margin-bottom: 0.75rem;
border-left: 2px solid #495057;
padding-left: 0.5rem;
}
.impact-badge {
font-size: 0.7rem;
padding: 0.2em 0.5em;
border-radius: 4px;
font-weight: 600;
text-transform: uppercase;
letter-spacing: 0.05em;
}
.impact-low { background: #198754; color: #fff; }
.impact-medium { background: #fd7e14; color: #fff; }
.impact-high { background: #dc3545; color: #fff; }
.approval-actions {
display: flex;
gap: 0.5rem;
flex-wrap: wrap;
}
.btn-approve {
background: #198754;
color: #fff;
border: none;
border-radius: 4px;
padding: 0.4rem 0.9rem;
font-size: 0.82rem;
font-family: 'JetBrains Mono', monospace;
cursor: pointer;
min-height: 44px;
}
.btn-approve:hover { background: #157347; }
.btn-reject {
background: transparent;
color: #dc3545;
border: 1px solid #dc3545;
border-radius: 4px;
padding: 0.4rem 0.9rem;
font-size: 0.82rem;
font-family: 'JetBrains Mono', monospace;
cursor: pointer;
min-height: 44px;
}
.btn-reject:hover { background: #dc354520; }
.no-approvals {
text-align: center;
color: #6c757d;
padding: 2rem 0;
font-size: 0.9rem;
}
/* Refresh button */
.btn-refresh {
background: transparent;
color: #adb5bd;
border: 1px solid #495057;
border-radius: 4px;
padding: 0.3rem 0.7rem;
font-size: 0.75rem;
font-family: 'JetBrains Mono', monospace;
cursor: pointer;
text-decoration: none;
display: inline-block;
}
.btn-refresh:hover {
color: #f8f9fa;
border-color: #6c757d;
}
@media (max-width: 576px) {
.briefing-greeting { font-size: 1.3rem; }
.briefing-prose { font-size: 0.95rem; }
}
</style>
{% endblock %}

View File

@@ -0,0 +1,33 @@
<div class="approval-card {{ item.status }}" id="approval-{{ item.id }}">
<div class="d-flex justify-content-between align-items-start mb-1">
<div class="approval-card-title">{{ item.title }}</div>
<span class="impact-badge impact-{{ item.impact }}">{{ item.impact }}</span>
</div>
<div class="approval-card-desc">{{ item.description }}</div>
<div class="approval-card-action">&#x25B6; {{ item.proposed_action }}</div>
{% if item.status == "pending" %}
<div class="approval-actions">
<button class="btn-approve"
hx-post="/briefing/approvals/{{ item.id }}/approve"
hx-target="#approval-{{ item.id }}"
hx-swap="outerHTML">
APPROVE
</button>
<button class="btn-reject"
hx-post="/briefing/approvals/{{ item.id }}/reject"
hx-target="#approval-{{ item.id }}"
hx-swap="outerHTML">
REJECT
</button>
</div>
{% elif item.status == "approved" %}
<div class="text-success" style="font-size:0.82rem; font-family:'JetBrains Mono',monospace;">
&#x2713; Approved
</div>
{% elif item.status == "rejected" %}
<div class="text-danger" style="font-size:0.82rem; font-family:'JetBrains Mono',monospace;">
&#x2717; Rejected
</div>
{% endif %}
</div>

View File

@@ -0,0 +1,9 @@
{% if items %}
{% for item in items %}
{% include "partials/approval_card_single.html" %}
{% endfor %}
{% else %}
<div class="no-approvals">
No pending approvals. Timmy is standing by.
</div>
{% endif %}

View File

@@ -121,3 +121,25 @@ class PushNotifier:
# Module-level singleton
notifier = PushNotifier()
async def notify_briefing_ready(briefing) -> None:
"""Placeholder: notify the owner that a new morning briefing is ready.
Logs to console now. Wire to real push (APNs/Pushover) later.
Args:
briefing: A timmy.briefing.Briefing instance.
"""
n_approvals = len(briefing.approval_items) if briefing.approval_items else 0
message = (
f"Your morning briefing is ready. "
f"{n_approvals} item(s) await your approval."
)
notifier.notify(
title="Morning Briefing Ready",
message=message,
category="briefing",
native=True,
)
logger.info("Briefing push notification dispatched (%d approval(s))", n_approvals)

185
src/timmy/approvals.py Normal file
View File

@@ -0,0 +1,185 @@
"""Approval item management — the governance layer for autonomous Timmy actions.
The GOLDEN_TIMMY constant is the single source of truth for whether Timmy
may act autonomously. All features that want to take action must:
1. Create an ApprovalItem
2. Check GOLDEN_TIMMY
3. If True → wait for owner approval before executing
4. If False → log the action and proceed (Dark Timmy mode)
Default is always True. The owner changes this intentionally.
"""
import sqlite3
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional
# ---------------------------------------------------------------------------
# GOLDEN TIMMY RULE
# ---------------------------------------------------------------------------
GOLDEN_TIMMY = True
# When True: no autonomous action executes without an approved ApprovalItem.
# When False: Dark Timmy mode — Timmy may act on his own judgment.
# Default is always True. Owner changes this intentionally.
# ---------------------------------------------------------------------------
# Persistence
# ---------------------------------------------------------------------------
_DEFAULT_DB = Path.home() / ".timmy" / "approvals.db"
_EXPIRY_DAYS = 7
@dataclass
class ApprovalItem:
id: str
title: str
description: str
proposed_action: str # what Timmy wants to do
impact: str # "low" | "medium" | "high"
created_at: datetime
status: str # "pending" | "approved" | "rejected"
def _get_conn(db_path: Path = _DEFAULT_DB) -> sqlite3.Connection:
db_path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
conn.execute(
"""
CREATE TABLE IF NOT EXISTS approval_items (
id TEXT PRIMARY KEY,
title TEXT NOT NULL,
description TEXT NOT NULL,
proposed_action TEXT NOT NULL,
impact TEXT NOT NULL DEFAULT 'low',
created_at TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending'
)
"""
)
conn.commit()
return conn
def _row_to_item(row: sqlite3.Row) -> ApprovalItem:
return ApprovalItem(
id=row["id"],
title=row["title"],
description=row["description"],
proposed_action=row["proposed_action"],
impact=row["impact"],
created_at=datetime.fromisoformat(row["created_at"]),
status=row["status"],
)
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def create_item(
title: str,
description: str,
proposed_action: str,
impact: str = "low",
db_path: Path = _DEFAULT_DB,
) -> ApprovalItem:
"""Create and persist a new approval item."""
item = ApprovalItem(
id=str(uuid.uuid4()),
title=title,
description=description,
proposed_action=proposed_action,
impact=impact,
created_at=datetime.now(timezone.utc),
status="pending",
)
conn = _get_conn(db_path)
conn.execute(
"""
INSERT INTO approval_items
(id, title, description, proposed_action, impact, created_at, status)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(
item.id,
item.title,
item.description,
item.proposed_action,
item.impact,
item.created_at.isoformat(),
item.status,
),
)
conn.commit()
conn.close()
return item
def list_pending(db_path: Path = _DEFAULT_DB) -> list[ApprovalItem]:
"""Return all pending approval items, newest first."""
conn = _get_conn(db_path)
rows = conn.execute(
"SELECT * FROM approval_items WHERE status = 'pending' ORDER BY created_at DESC"
).fetchall()
conn.close()
return [_row_to_item(r) for r in rows]
def list_all(db_path: Path = _DEFAULT_DB) -> list[ApprovalItem]:
"""Return all approval items regardless of status, newest first."""
conn = _get_conn(db_path)
rows = conn.execute(
"SELECT * FROM approval_items ORDER BY created_at DESC"
).fetchall()
conn.close()
return [_row_to_item(r) for r in rows]
def get_item(item_id: str, db_path: Path = _DEFAULT_DB) -> Optional[ApprovalItem]:
conn = _get_conn(db_path)
row = conn.execute(
"SELECT * FROM approval_items WHERE id = ?", (item_id,)
).fetchone()
conn.close()
return _row_to_item(row) if row else None
def approve(item_id: str, db_path: Path = _DEFAULT_DB) -> Optional[ApprovalItem]:
"""Mark an approval item as approved."""
conn = _get_conn(db_path)
conn.execute(
"UPDATE approval_items SET status = 'approved' WHERE id = ?", (item_id,)
)
conn.commit()
conn.close()
return get_item(item_id, db_path)
def reject(item_id: str, db_path: Path = _DEFAULT_DB) -> Optional[ApprovalItem]:
"""Mark an approval item as rejected."""
conn = _get_conn(db_path)
conn.execute(
"UPDATE approval_items SET status = 'rejected' WHERE id = ?", (item_id,)
)
conn.commit()
conn.close()
return get_item(item_id, db_path)
def expire_old(db_path: Path = _DEFAULT_DB) -> int:
"""Auto-expire pending items older than EXPIRY_DAYS. Returns count removed."""
cutoff = (datetime.now(timezone.utc) - timedelta(days=_EXPIRY_DAYS)).isoformat()
conn = _get_conn(db_path)
cursor = conn.execute(
"DELETE FROM approval_items WHERE status = 'pending' AND created_at < ?",
(cutoff,),
)
conn.commit()
count = cursor.rowcount
conn.close()
return count

306
src/timmy/briefing.py Normal file
View File

@@ -0,0 +1,306 @@
"""Morning Briefing Engine — Timmy shows up before you ask.
BriefingEngine queries recent swarm activity and chat history, asks Timmy's
Agno agent to summarise the period, and returns a Briefing with an embedded
list of ApprovalItems the owner needs to action.
Briefings are cached in SQLite so page loads are instant. A background task
regenerates the briefing every 6 hours.
"""
import logging
import sqlite3
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
_DEFAULT_DB = Path.home() / ".timmy" / "briefings.db"
_CACHE_MINUTES = 30
# ---------------------------------------------------------------------------
# Data structures
# ---------------------------------------------------------------------------
@dataclass
class ApprovalItem:
"""Lightweight representation used inside a Briefing.
The canonical mutable version (with persistence) lives in timmy.approvals.
This one travels with the Briefing dataclass as a read-only snapshot.
"""
id: str
title: str
description: str
proposed_action: str
impact: str
created_at: datetime
status: str
@dataclass
class Briefing:
generated_at: datetime
summary: str # 150-300 words
approval_items: list[ApprovalItem] = field(default_factory=list)
period_start: datetime = field(
default_factory=lambda: datetime.now(timezone.utc) - timedelta(hours=6)
)
period_end: datetime = field(
default_factory=lambda: datetime.now(timezone.utc)
)
# ---------------------------------------------------------------------------
# SQLite cache
# ---------------------------------------------------------------------------
def _get_cache_conn(db_path: Path = _DEFAULT_DB) -> sqlite3.Connection:
db_path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
conn.execute(
"""
CREATE TABLE IF NOT EXISTS briefings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
generated_at TEXT NOT NULL,
period_start TEXT NOT NULL,
period_end TEXT NOT NULL,
summary TEXT NOT NULL
)
"""
)
conn.commit()
return conn
def _save_briefing(briefing: Briefing, db_path: Path = _DEFAULT_DB) -> None:
conn = _get_cache_conn(db_path)
conn.execute(
"""
INSERT INTO briefings (generated_at, period_start, period_end, summary)
VALUES (?, ?, ?, ?)
""",
(
briefing.generated_at.isoformat(),
briefing.period_start.isoformat(),
briefing.period_end.isoformat(),
briefing.summary,
),
)
conn.commit()
conn.close()
def _load_latest(db_path: Path = _DEFAULT_DB) -> Optional[Briefing]:
"""Load the most-recently cached briefing, or None if there is none."""
conn = _get_cache_conn(db_path)
row = conn.execute(
"SELECT * FROM briefings ORDER BY generated_at DESC LIMIT 1"
).fetchone()
conn.close()
if row is None:
return None
return Briefing(
generated_at=datetime.fromisoformat(row["generated_at"]),
period_start=datetime.fromisoformat(row["period_start"]),
period_end=datetime.fromisoformat(row["period_end"]),
summary=row["summary"],
)
def is_fresh(briefing: Briefing, max_age_minutes: int = _CACHE_MINUTES) -> bool:
"""Return True if the briefing was generated within max_age_minutes."""
now = datetime.now(timezone.utc)
age = now - briefing.generated_at.replace(tzinfo=timezone.utc) if briefing.generated_at.tzinfo is None else now - briefing.generated_at
return age.total_seconds() < max_age_minutes * 60
# ---------------------------------------------------------------------------
# Activity gathering helpers
# ---------------------------------------------------------------------------
def _gather_swarm_summary(since: datetime) -> str:
"""Pull recent task/agent stats from swarm.db. Graceful if DB missing."""
swarm_db = Path("data/swarm.db")
if not swarm_db.exists():
return "No swarm activity recorded yet."
try:
conn = sqlite3.connect(str(swarm_db))
conn.row_factory = sqlite3.Row
since_iso = since.isoformat()
completed = conn.execute(
"SELECT COUNT(*) as c FROM tasks WHERE status = 'completed' AND created_at > ?",
(since_iso,),
).fetchone()["c"]
failed = conn.execute(
"SELECT COUNT(*) as c FROM tasks WHERE status = 'failed' AND created_at > ?",
(since_iso,),
).fetchone()["c"]
agents = conn.execute(
"SELECT COUNT(*) as c FROM agents WHERE registered_at > ?",
(since_iso,),
).fetchone()["c"]
conn.close()
parts = []
if completed:
parts.append(f"{completed} task(s) completed")
if failed:
parts.append(f"{failed} task(s) failed")
if agents:
parts.append(f"{agents} new agent(s) joined the swarm")
return "; ".join(parts) if parts else "No swarm activity in this period."
except Exception as exc:
logger.debug("Swarm summary error: %s", exc)
return "Swarm data unavailable."
def _gather_chat_summary(since: datetime) -> str:
"""Pull recent chat messages from the in-memory log."""
try:
from dashboard.store import message_log
messages = message_log.all()
# Filter to messages in the briefing window (best-effort: no timestamps)
recent = messages[-10:] if len(messages) > 10 else messages
if not recent:
return "No recent conversations."
lines = []
for msg in recent:
role = "Owner" if msg.role == "user" else "Timmy"
lines.append(f"{role}: {msg.content[:120]}")
return "\n".join(lines)
except Exception as exc:
logger.debug("Chat summary error: %s", exc)
return "No recent conversations."
# ---------------------------------------------------------------------------
# BriefingEngine
# ---------------------------------------------------------------------------
class BriefingEngine:
"""Generates morning briefings by querying activity and asking Timmy."""
def __init__(self, db_path: Path = _DEFAULT_DB) -> None:
self._db_path = db_path
def get_cached(self) -> Optional[Briefing]:
"""Return the cached briefing if it exists, without regenerating."""
return _load_latest(self._db_path)
def needs_refresh(self) -> bool:
"""True if there is no fresh briefing cached."""
cached = _load_latest(self._db_path)
if cached is None:
return True
return not is_fresh(cached)
def generate(self) -> Briefing:
"""Generate a fresh briefing. May take a few seconds (LLM call)."""
now = datetime.now(timezone.utc)
period_start = now - timedelta(hours=6)
swarm_info = _gather_swarm_summary(period_start)
chat_info = _gather_chat_summary(period_start)
prompt = (
"You are Timmy, a sovereign local AI companion.\n"
"Here is what happened since the last briefing:\n\n"
f"SWARM ACTIVITY:\n{swarm_info}\n\n"
f"RECENT CONVERSATIONS:\n{chat_info}\n\n"
"Summarize the last period of activity into a 5-minute morning briefing. "
"Be concise, warm, and direct. "
"Use plain prose — no bullet points. "
"Maximum 300 words. "
"End with a short paragraph listing any items that need the owner's approval, "
"or say 'No approvals needed today.' if there are none."
)
try:
summary = self._call_agent(prompt)
except Exception as exc:
logger.warning("generate(): agent call raised unexpectedly: %s", exc)
summary = (
"Good morning. Timmy is offline right now, so this briefing "
"could not be generated from live data. Check that Ollama is "
"running and try again."
)
# Attach any outstanding pending approval items
approval_items = self._load_pending_items()
briefing = Briefing(
generated_at=now,
summary=summary,
approval_items=approval_items,
period_start=period_start,
period_end=now,
)
_save_briefing(briefing, self._db_path)
logger.info("Briefing generated at %s", now.isoformat())
return briefing
def get_or_generate(self) -> Briefing:
"""Return a fresh cached briefing or generate a new one."""
cached = _load_latest(self._db_path)
if cached is not None and is_fresh(cached):
# Reattach live pending items (they change between page loads)
cached.approval_items = self._load_pending_items()
return cached
return self.generate()
# ------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------
def _call_agent(self, prompt: str) -> str:
"""Call Timmy's Agno agent and return the response text."""
try:
from timmy.agent import create_timmy
agent = create_timmy()
run = agent.run(prompt, stream=False)
return run.content if hasattr(run, "content") else str(run)
except Exception as exc:
logger.warning("Agent call failed during briefing generation: %s", exc)
return (
"Good morning. Timmy is offline right now, so this briefing "
"could not be generated from live data. Check that Ollama is "
"running and try again."
)
def _load_pending_items(self) -> list[ApprovalItem]:
"""Return pending ApprovalItems from the approvals DB."""
try:
from timmy import approvals as _approvals
raw_items = _approvals.list_pending()
return [
ApprovalItem(
id=item.id,
title=item.title,
description=item.description,
proposed_action=item.proposed_action,
impact=item.impact,
created_at=item.created_at,
status=item.status,
)
for item in raw_items
]
except Exception as exc:
logger.debug("Could not load approval items: %s", exc)
return []
# Module-level singleton
engine = BriefingEngine()

201
tests/test_approvals.py Normal file
View File

@@ -0,0 +1,201 @@
"""Tests for timmy/approvals.py — governance layer."""
import tempfile
from datetime import datetime, timezone
from pathlib import Path
import pytest
from timmy.approvals import (
GOLDEN_TIMMY,
ApprovalItem,
approve,
create_item,
expire_old,
get_item,
list_all,
list_pending,
reject,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture()
def tmp_db(tmp_path):
"""A fresh per-test SQLite DB so tests are isolated."""
return tmp_path / "test_approvals.db"
# ---------------------------------------------------------------------------
# GOLDEN_TIMMY constant
# ---------------------------------------------------------------------------
def test_golden_timmy_is_true():
"""GOLDEN_TIMMY must default to True — the governance foundation."""
assert GOLDEN_TIMMY is True
# ---------------------------------------------------------------------------
# ApprovalItem creation
# ---------------------------------------------------------------------------
def test_create_item_returns_pending(tmp_db):
item = create_item("Deploy new model", "Update Ollama model", "pull llama3.3", impact="medium", db_path=tmp_db)
assert item.status == "pending"
assert item.title == "Deploy new model"
assert item.impact == "medium"
assert isinstance(item.id, str) and len(item.id) > 0
assert isinstance(item.created_at, datetime)
def test_create_item_default_impact_is_low(tmp_db):
item = create_item("Minor task", "desc", "do thing", db_path=tmp_db)
assert item.impact == "low"
def test_create_item_persists_across_calls(tmp_db):
item = create_item("Persistent task", "persists", "action", db_path=tmp_db)
fetched = get_item(item.id, db_path=tmp_db)
assert fetched is not None
assert fetched.id == item.id
assert fetched.title == "Persistent task"
# ---------------------------------------------------------------------------
# Listing
# ---------------------------------------------------------------------------
def test_list_pending_returns_only_pending(tmp_db):
item1 = create_item("Task A", "desc", "action A", db_path=tmp_db)
item2 = create_item("Task B", "desc", "action B", db_path=tmp_db)
approve(item1.id, db_path=tmp_db)
pending = list_pending(db_path=tmp_db)
ids = [i.id for i in pending]
assert item2.id in ids
assert item1.id not in ids
def test_list_all_includes_all_statuses(tmp_db):
item1 = create_item("Task A", "d", "a", db_path=tmp_db)
item2 = create_item("Task B", "d", "b", db_path=tmp_db)
approve(item1.id, db_path=tmp_db)
reject(item2.id, db_path=tmp_db)
all_items = list_all(db_path=tmp_db)
statuses = {i.status for i in all_items}
assert "approved" in statuses
assert "rejected" in statuses
def test_list_pending_empty_on_fresh_db(tmp_db):
assert list_pending(db_path=tmp_db) == []
# ---------------------------------------------------------------------------
# Approve / Reject
# ---------------------------------------------------------------------------
def test_approve_changes_status(tmp_db):
item = create_item("Approve me", "desc", "act", db_path=tmp_db)
updated = approve(item.id, db_path=tmp_db)
assert updated is not None
assert updated.status == "approved"
def test_reject_changes_status(tmp_db):
item = create_item("Reject me", "desc", "act", db_path=tmp_db)
updated = reject(item.id, db_path=tmp_db)
assert updated is not None
assert updated.status == "rejected"
def test_approve_nonexistent_returns_none(tmp_db):
result = approve("not-a-real-id", db_path=tmp_db)
assert result is None
def test_reject_nonexistent_returns_none(tmp_db):
result = reject("not-a-real-id", db_path=tmp_db)
assert result is None
# ---------------------------------------------------------------------------
# Get item
# ---------------------------------------------------------------------------
def test_get_item_returns_correct_item(tmp_db):
item = create_item("Get me", "d", "a", db_path=tmp_db)
fetched = get_item(item.id, db_path=tmp_db)
assert fetched is not None
assert fetched.id == item.id
assert fetched.title == "Get me"
def test_get_item_nonexistent_returns_none(tmp_db):
assert get_item("ghost-id", db_path=tmp_db) is None
# ---------------------------------------------------------------------------
# Expiry
# ---------------------------------------------------------------------------
def test_expire_old_removes_stale_pending(tmp_db):
"""Items created long before the cutoff should be expired."""
import sqlite3
from timmy.approvals import _get_conn
item = create_item("Old item", "d", "a", db_path=tmp_db)
# Backdate the created_at to 8 days ago
old_ts = (datetime.now(timezone.utc).replace(year=2020)).isoformat()
conn = _get_conn(tmp_db)
conn.execute("UPDATE approval_items SET created_at = ? WHERE id = ?", (old_ts, item.id))
conn.commit()
conn.close()
removed = expire_old(db_path=tmp_db)
assert removed == 1
assert get_item(item.id, db_path=tmp_db) is None
def test_expire_old_keeps_actioned_items(tmp_db):
"""Approved/rejected items should NOT be expired."""
import sqlite3
from timmy.approvals import _get_conn
item = create_item("Actioned item", "d", "a", db_path=tmp_db)
approve(item.id, db_path=tmp_db)
# Backdate
old_ts = (datetime.now(timezone.utc).replace(year=2020)).isoformat()
conn = _get_conn(tmp_db)
conn.execute("UPDATE approval_items SET created_at = ? WHERE id = ?", (old_ts, item.id))
conn.commit()
conn.close()
removed = expire_old(db_path=tmp_db)
assert removed == 0
assert get_item(item.id, db_path=tmp_db) is not None
def test_expire_old_returns_zero_when_nothing_to_expire(tmp_db):
create_item("Fresh item", "d", "a", db_path=tmp_db)
removed = expire_old(db_path=tmp_db)
assert removed == 0
# ---------------------------------------------------------------------------
# Multiple items ordering
# ---------------------------------------------------------------------------
def test_list_pending_newest_first(tmp_db):
item1 = create_item("First", "d", "a", db_path=tmp_db)
item2 = create_item("Second", "d", "b", db_path=tmp_db)
pending = list_pending(db_path=tmp_db)
# Most recently created should appear first
assert pending[0].id == item2.id
assert pending[1].id == item1.id

246
tests/test_briefing.py Normal file
View File

@@ -0,0 +1,246 @@
"""Tests for timmy/briefing.py — morning briefing engine."""
from datetime import datetime, timedelta, timezone
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from timmy.briefing import (
Briefing,
BriefingEngine,
_load_latest,
_save_briefing,
is_fresh,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture()
def tmp_db(tmp_path):
return tmp_path / "test_briefings.db"
@pytest.fixture()
def engine(tmp_db):
return BriefingEngine(db_path=tmp_db)
def _make_briefing(offset_minutes: int = 0) -> Briefing:
"""Create a Briefing with generated_at offset by offset_minutes from now."""
now = datetime.now(timezone.utc) - timedelta(minutes=offset_minutes)
return Briefing(
generated_at=now,
summary="Good morning. All quiet on the swarm front.",
approval_items=[],
period_start=now - timedelta(hours=6),
period_end=now,
)
# ---------------------------------------------------------------------------
# Briefing dataclass
# ---------------------------------------------------------------------------
def test_briefing_fields():
b = _make_briefing()
assert isinstance(b.generated_at, datetime)
assert isinstance(b.summary, str)
assert isinstance(b.approval_items, list)
assert isinstance(b.period_start, datetime)
assert isinstance(b.period_end, datetime)
def test_briefing_default_period_is_6_hours():
b = Briefing(generated_at=datetime.now(timezone.utc), summary="test")
delta = b.period_end - b.period_start
assert abs(delta.total_seconds() - 6 * 3600) < 5 # within 5 seconds
# ---------------------------------------------------------------------------
# is_fresh
# ---------------------------------------------------------------------------
def test_is_fresh_recent_briefing():
b = _make_briefing(offset_minutes=5)
assert is_fresh(b) is True
def test_is_fresh_stale_briefing():
b = _make_briefing(offset_minutes=45)
assert is_fresh(b) is False
def test_is_fresh_custom_max_age():
b = _make_briefing(offset_minutes=10)
assert is_fresh(b, max_age_minutes=5) is False
assert is_fresh(b, max_age_minutes=15) is True
# ---------------------------------------------------------------------------
# SQLite cache (save/load round-trip)
# ---------------------------------------------------------------------------
def test_save_and_load_briefing(tmp_db):
b = _make_briefing()
_save_briefing(b, db_path=tmp_db)
loaded = _load_latest(db_path=tmp_db)
assert loaded is not None
assert loaded.summary == b.summary
def test_load_latest_returns_none_when_empty(tmp_db):
assert _load_latest(db_path=tmp_db) is None
def test_load_latest_returns_most_recent(tmp_db):
old = _make_briefing(offset_minutes=60)
new = _make_briefing(offset_minutes=5)
_save_briefing(old, db_path=tmp_db)
_save_briefing(new, db_path=tmp_db)
loaded = _load_latest(db_path=tmp_db)
assert loaded is not None
# Should return the newer one (generated_at closest to now)
assert abs((loaded.generated_at.replace(tzinfo=timezone.utc) - new.generated_at).total_seconds()) < 5
# ---------------------------------------------------------------------------
# BriefingEngine.needs_refresh
# ---------------------------------------------------------------------------
def test_needs_refresh_when_no_cache(engine, tmp_db):
assert engine.needs_refresh() is True
def test_needs_refresh_false_when_fresh(engine, tmp_db):
fresh = _make_briefing(offset_minutes=5)
_save_briefing(fresh, db_path=tmp_db)
assert engine.needs_refresh() is False
def test_needs_refresh_true_when_stale(engine, tmp_db):
stale = _make_briefing(offset_minutes=45)
_save_briefing(stale, db_path=tmp_db)
assert engine.needs_refresh() is True
# ---------------------------------------------------------------------------
# BriefingEngine.get_cached
# ---------------------------------------------------------------------------
def test_get_cached_returns_none_when_empty(engine):
assert engine.get_cached() is None
def test_get_cached_returns_briefing(engine, tmp_db):
b = _make_briefing()
_save_briefing(b, db_path=tmp_db)
cached = engine.get_cached()
assert cached is not None
assert cached.summary == b.summary
# ---------------------------------------------------------------------------
# BriefingEngine.generate (agent mocked)
# ---------------------------------------------------------------------------
def test_generate_returns_briefing(engine):
with patch.object(engine, "_call_agent", return_value="All is well."):
with patch.object(engine, "_load_pending_items", return_value=[]):
b = engine.generate()
assert isinstance(b, Briefing)
assert b.summary == "All is well."
assert b.approval_items == []
def test_generate_persists_to_cache(engine, tmp_db):
with patch.object(engine, "_call_agent", return_value="Morning report."):
with patch.object(engine, "_load_pending_items", return_value=[]):
engine.generate()
cached = _load_latest(db_path=tmp_db)
assert cached is not None
assert cached.summary == "Morning report."
def test_generate_handles_agent_failure(engine):
with patch.object(engine, "_call_agent", side_effect=Exception("Ollama down")):
with patch.object(engine, "_load_pending_items", return_value=[]):
b = engine.generate()
assert isinstance(b, Briefing)
assert "offline" in b.summary.lower() or "Morning" in b.summary
# ---------------------------------------------------------------------------
# BriefingEngine.get_or_generate
# ---------------------------------------------------------------------------
def test_get_or_generate_uses_cache_when_fresh(engine, tmp_db):
fresh = _make_briefing(offset_minutes=5)
_save_briefing(fresh, db_path=tmp_db)
with patch.object(engine, "generate") as mock_gen:
with patch.object(engine, "_load_pending_items", return_value=[]):
result = engine.get_or_generate()
mock_gen.assert_not_called()
assert result.summary == fresh.summary
def test_get_or_generate_generates_when_stale(engine, tmp_db):
stale = _make_briefing(offset_minutes=45)
_save_briefing(stale, db_path=tmp_db)
with patch.object(engine, "_call_agent", return_value="New report."):
with patch.object(engine, "_load_pending_items", return_value=[]):
result = engine.get_or_generate()
assert result.summary == "New report."
def test_get_or_generate_generates_when_no_cache(engine):
with patch.object(engine, "_call_agent", return_value="Fresh report."):
with patch.object(engine, "_load_pending_items", return_value=[]):
result = engine.get_or_generate()
assert result.summary == "Fresh report."
# ---------------------------------------------------------------------------
# BriefingEngine._call_agent (unit — mocked agent)
# ---------------------------------------------------------------------------
def test_call_agent_returns_content(engine):
mock_run = MagicMock()
mock_run.content = "Agent said hello."
mock_agent = MagicMock()
mock_agent.run.return_value = mock_run
with patch("timmy.briefing.BriefingEngine._call_agent", wraps=engine._call_agent):
with patch("timmy.agent.create_timmy", return_value=mock_agent):
result = engine._call_agent("Say hello.")
# _call_agent calls create_timmy internally; result from content attr
assert isinstance(result, str)
def test_call_agent_falls_back_on_exception(engine):
with patch("timmy.agent.create_timmy", side_effect=Exception("no ollama")):
result = engine._call_agent("prompt")
assert "offline" in result.lower()
# ---------------------------------------------------------------------------
# Push notification hook
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_notify_briefing_ready_logs(caplog):
"""notify_briefing_ready should log and call notifier.notify."""
from notifications.push import notify_briefing_ready, PushNotifier
b = _make_briefing()
with patch("notifications.push.notifier") as mock_notifier:
await notify_briefing_ready(b)
mock_notifier.notify.assert_called_once()
call_kwargs = mock_notifier.notify.call_args
assert "Briefing" in call_kwargs[1]["title"] or "Briefing" in call_kwargs[0][0]