diff --git a/src/dashboard/app.py b/src/dashboard/app.py
index 7e1ccba9..3450b8c9 100644
--- a/src/dashboard/app.py
+++ b/src/dashboard/app.py
@@ -45,6 +45,7 @@ from dashboard.routes.models import api_router as models_api_router
from dashboard.routes.models import router as models_router
from dashboard.routes.quests import router as quests_router
from dashboard.routes.scorecards import router as scorecards_router
+from dashboard.routes.skills import router as skills_router
from dashboard.routes.spark import router as spark_router
from dashboard.routes.system import router as system_router
from dashboard.routes.tasks import router as tasks_router
@@ -218,6 +219,32 @@ async def _loop_qa_scheduler() -> None:
await asyncio.sleep(interval)
+_SKILL_DISCOVERY_INTERVAL = 600 # 10 minutes
+
+
+async def _skill_discovery_scheduler() -> None:
+ """Background task: scan session logs for reusable skill patterns."""
+ await asyncio.sleep(20) # Stagger after other schedulers
+
+ while True:
+ try:
+ from timmy.skill_discovery import get_skill_discovery_engine
+
+ engine = get_skill_discovery_engine()
+ discovered = await engine.scan()
+ if discovered:
+ logger.info(
+ "Skill discovery: found %d new skill(s)",
+ len(discovered),
+ )
+ except asyncio.CancelledError:
+ raise
+ except Exception as exc:
+ logger.error("Skill discovery scheduler error: %s", exc)
+
+ await asyncio.sleep(_SKILL_DISCOVERY_INTERVAL)
+
+
_PRESENCE_POLL_SECONDS = 30
_PRESENCE_INITIAL_DELAY = 3
@@ -380,6 +407,7 @@ def _startup_background_tasks() -> list[asyncio.Task]:
asyncio.create_task(_loop_qa_scheduler()),
asyncio.create_task(_presence_watcher()),
asyncio.create_task(_start_chat_integrations_background()),
+ asyncio.create_task(_skill_discovery_scheduler()),
]
@@ -631,6 +659,7 @@ app.include_router(tower_router)
app.include_router(daily_run_router)
app.include_router(quests_router)
app.include_router(scorecards_router)
+app.include_router(skills_router)
@app.websocket("/ws")
diff --git a/src/dashboard/routes/skills.py b/src/dashboard/routes/skills.py
new file mode 100644
index 00000000..2dae470e
--- /dev/null
+++ b/src/dashboard/routes/skills.py
@@ -0,0 +1,82 @@
+"""Skill Discovery routes — view and manage auto-discovered skills."""
+
+import logging
+
+from fastapi import APIRouter, Form, HTTPException, Request
+from fastapi.responses import HTMLResponse
+
+from dashboard.templating import templates
+
+logger = logging.getLogger(__name__)
+
+router = APIRouter(prefix="/skills", tags=["skills"])
+
+
+@router.get("", response_class=HTMLResponse)
+async def skills_page(request: Request):
+ """Main skill discovery page."""
+ from timmy.skill_discovery import get_skill_discovery_engine
+
+ engine = get_skill_discovery_engine()
+ skills = engine.list_skills(limit=50)
+ counts = engine.skill_count()
+ return templates.TemplateResponse(
+ request,
+ "skills.html",
+ {"skills": skills, "counts": counts},
+ )
+
+
+@router.get("/list", response_class=HTMLResponse)
+async def skills_list_partial(request: Request, status: str = ""):
+ """HTMX partial: return skill list for polling."""
+ from timmy.skill_discovery import get_skill_discovery_engine
+
+ engine = get_skill_discovery_engine()
+ skills = engine.list_skills(status=status or None, limit=50)
+ counts = engine.skill_count()
+ return templates.TemplateResponse(
+ request,
+ "partials/skills_list.html",
+ {"skills": skills, "counts": counts},
+ )
+
+
+@router.post("/{skill_id}/status", response_class=HTMLResponse)
+async def update_skill_status(request: Request, skill_id: str, status: str = Form(...)):
+ """Update a skill's status (confirm / reject / archive)."""
+ from timmy.skill_discovery import get_skill_discovery_engine
+
+ engine = get_skill_discovery_engine()
+ if not engine.update_status(skill_id, status):
+ raise HTTPException(status_code=400, detail=f"Invalid status: {status}")
+
+ skills = engine.list_skills(limit=50)
+ counts = engine.skill_count()
+ return templates.TemplateResponse(
+ request,
+ "partials/skills_list.html",
+ {"skills": skills, "counts": counts},
+ )
+
+
+@router.post("/scan", response_class=HTMLResponse)
+async def trigger_scan(request: Request):
+ """Manually trigger a skill discovery scan."""
+ from timmy.skill_discovery import get_skill_discovery_engine
+
+ engine = get_skill_discovery_engine()
+ try:
+ discovered = await engine.scan()
+ msg = f"Scan complete: {len(discovered)} new skill(s) found."
+ except Exception as exc:
+ logger.warning("Manual skill scan failed: %s", exc)
+ msg = f"Scan failed: {exc}"
+
+ skills = engine.list_skills(limit=50)
+ counts = engine.skill_count()
+ return templates.TemplateResponse(
+ request,
+ "partials/skills_list.html",
+ {"skills": skills, "counts": counts, "scan_message": msg},
+ )
diff --git a/src/dashboard/templates/partials/skills_list.html b/src/dashboard/templates/partials/skills_list.html
new file mode 100644
index 00000000..adffe6d4
--- /dev/null
+++ b/src/dashboard/templates/partials/skills_list.html
@@ -0,0 +1,74 @@
+{% if scan_message is defined and scan_message %}
+
+ {{ scan_message }}
+
+{% endif %}
+
+{% if skills %}
+
+
+
+
+ | Name |
+ Category |
+ Confidence |
+ Status |
+ Discovered |
+ Actions |
+
+
+
+ {% for skill in skills %}
+
+
+ {{ skill.name }}
+ {% if skill.description %}
+ {{ skill.description[:100] }}
+ {% endif %}
+ |
+ {{ skill.category }} |
+
+ {% set conf = skill.confidence * 100 %}
+
+ {{ "%.0f"|format(conf) }}%
+
+ |
+
+ {% if skill.status == 'confirmed' %}
+ confirmed
+ {% elif skill.status == 'rejected' %}
+ rejected
+ {% elif skill.status == 'archived' %}
+ archived
+ {% else %}
+ discovered
+ {% endif %}
+ |
+ {{ skill.created_at[:10] if skill.created_at else '' }} |
+
+ {% if skill.status == 'discovered' %}
+
+
+ {% elif skill.status == 'confirmed' %}
+
+ {% endif %}
+ |
+
+ {% endfor %}
+
+
+
+{% else %}
+
+ No skills discovered yet. Click "Scan Now" to analyze recent activity.
+
+{% endif %}
diff --git a/src/dashboard/templates/skills.html b/src/dashboard/templates/skills.html
new file mode 100644
index 00000000..e3563234
--- /dev/null
+++ b/src/dashboard/templates/skills.html
@@ -0,0 +1,38 @@
+{% extends "base.html" %}
+
+{% block title %}Skill Discovery - Timmy Time{% endblock %}
+
+{% block extra_styles %}{% endblock %}
+
+{% block content %}
+
+
+ {% from "macros.html" import panel %}
+
+ {% call panel("SKILL DISCOVERY", id="skills-panel") %}
+
+
+
+ Discovered: {{ counts.get('discovered', 0) }} |
+ Confirmed: {{ counts.get('confirmed', 0) }} |
+ Archived: {{ counts.get('archived', 0) }}
+
+
+
+
+
+
+ {% include "partials/skills_list.html" %}
+
+ {% endcall %}
+
+
+{% endblock %}
diff --git a/src/timmy/skill_discovery.py b/src/timmy/skill_discovery.py
new file mode 100644
index 00000000..76d0f320
--- /dev/null
+++ b/src/timmy/skill_discovery.py
@@ -0,0 +1,495 @@
+"""Automated Skill Discovery Pipeline.
+
+Monitors the agent's session logs for high-confidence successful outcomes,
+uses the LLM router to deconstruct successful action sequences into
+reusable skill templates, and stores discovered skills with metadata.
+
+Notifies the dashboard when new skills are crystallized.
+"""
+
+import json
+import logging
+import sqlite3
+import uuid
+from collections.abc import Generator
+from contextlib import closing, contextmanager
+from dataclasses import dataclass, field
+from datetime import UTC, datetime
+from pathlib import Path
+from typing import Any
+
+from config import settings
+
+logger = logging.getLogger(__name__)
+
+# ---------------------------------------------------------------------------
+# Database
+# ---------------------------------------------------------------------------
+
+DB_PATH = Path(settings.repo_root) / "data" / "skills.db"
+
+_SCHEMA = """
+CREATE TABLE IF NOT EXISTS discovered_skills (
+ id TEXT PRIMARY KEY,
+ name TEXT NOT NULL,
+ description TEXT DEFAULT '',
+ category TEXT DEFAULT 'general',
+ source_entries TEXT DEFAULT '[]',
+ template TEXT DEFAULT '',
+ confidence REAL DEFAULT 0.0,
+ status TEXT DEFAULT 'discovered',
+ created_at TEXT DEFAULT (datetime('now')),
+ updated_at TEXT DEFAULT (datetime('now'))
+);
+CREATE INDEX IF NOT EXISTS idx_skills_status ON discovered_skills(status);
+CREATE INDEX IF NOT EXISTS idx_skills_category ON discovered_skills(category);
+CREATE INDEX IF NOT EXISTS idx_skills_created ON discovered_skills(created_at);
+"""
+
+VALID_STATUSES = {"discovered", "confirmed", "rejected", "archived"}
+
+
+@contextmanager
+def _get_db() -> Generator[sqlite3.Connection, None, None]:
+ DB_PATH.parent.mkdir(parents=True, exist_ok=True)
+ with closing(sqlite3.connect(str(DB_PATH))) as conn:
+ conn.row_factory = sqlite3.Row
+ conn.execute(f"PRAGMA busy_timeout = {settings.db_busy_timeout_ms}")
+ conn.executescript(_SCHEMA)
+ yield conn
+
+
+# ---------------------------------------------------------------------------
+# Data model
+# ---------------------------------------------------------------------------
+
+
+@dataclass
+class DiscoveredSkill:
+ """A skill extracted from successful agent actions."""
+
+ id: str = field(default_factory=lambda: f"skill_{uuid.uuid4().hex[:12]}")
+ name: str = ""
+ description: str = ""
+ category: str = "general"
+ source_entries: list[dict] = field(default_factory=list)
+ template: str = ""
+ confidence: float = 0.0
+ status: str = "discovered"
+ created_at: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
+ updated_at: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
+
+ def to_dict(self) -> dict[str, Any]:
+ return {
+ "id": self.id,
+ "name": self.name,
+ "description": self.description,
+ "category": self.category,
+ "source_entries": self.source_entries,
+ "template": self.template,
+ "confidence": self.confidence,
+ "status": self.status,
+ "created_at": self.created_at,
+ "updated_at": self.updated_at,
+ }
+
+
+# ---------------------------------------------------------------------------
+# Prompt template for LLM analysis
+# ---------------------------------------------------------------------------
+
+_ANALYSIS_PROMPT = """\
+You are a skill extraction engine. Analyze the following sequence of \
+successful agent actions and extract a reusable skill template.
+
+Actions:
+{actions}
+
+Respond with a JSON object containing:
+- "name": short skill name (2-5 words)
+- "description": one-sentence description of what this skill does
+- "category": one of "research", "coding", "devops", "communication", "analysis", "general"
+- "template": a step-by-step template that generalizes this action sequence
+- "confidence": your confidence that this is a genuinely reusable skill (0.0-1.0)
+
+Respond ONLY with valid JSON, no markdown fences or extra text."""
+
+
+# ---------------------------------------------------------------------------
+# Core engine
+# ---------------------------------------------------------------------------
+
+
+class SkillDiscoveryEngine:
+ """Scans session logs for successful action patterns and extracts skills."""
+
+ def __init__(
+ self,
+ confidence_threshold: float = 0.7,
+ min_actions: int = 2,
+ ):
+ self.confidence_threshold = confidence_threshold
+ self.min_actions = min_actions
+
+ # -- Public API ---------------------------------------------------------
+
+ async def scan(self) -> list[DiscoveredSkill]:
+ """Scan recent session logs and discover new skills.
+
+ Returns a list of newly discovered skills.
+ """
+ entries = self._load_recent_successful_actions()
+ if len(entries) < self.min_actions:
+ logger.debug(
+ "Skill discovery: only %d actions found (need %d), skipping",
+ len(entries),
+ self.min_actions,
+ )
+ return []
+
+ # Group entries into action sequences (tool calls clustered together)
+ sequences = self._cluster_action_sequences(entries)
+ discovered: list[DiscoveredSkill] = []
+
+ for seq in sequences:
+ if len(seq) < self.min_actions:
+ continue
+
+ skill = await self._analyze_sequence(seq)
+ if skill and skill.confidence >= self.confidence_threshold:
+ # Check for duplicates
+ if not self._is_duplicate(skill):
+ self._save_skill(skill)
+ await self._notify(skill)
+ discovered.append(skill)
+ logger.info(
+ "Discovered skill: %s (confidence=%.2f)",
+ skill.name,
+ skill.confidence,
+ )
+
+ return discovered
+
+ def list_skills(
+ self,
+ status: str | None = None,
+ limit: int = 50,
+ ) -> list[dict[str, Any]]:
+ """Return discovered skills from the database."""
+ with _get_db() as conn:
+ if status and status in VALID_STATUSES:
+ rows = conn.execute(
+ "SELECT * FROM discovered_skills WHERE status = ? "
+ "ORDER BY created_at DESC LIMIT ?",
+ (status, limit),
+ ).fetchall()
+ else:
+ rows = conn.execute(
+ "SELECT * FROM discovered_skills ORDER BY created_at DESC LIMIT ?",
+ (limit,),
+ ).fetchall()
+ return [dict(r) for r in rows]
+
+ def get_skill(self, skill_id: str) -> dict[str, Any] | None:
+ """Get a single skill by ID."""
+ with _get_db() as conn:
+ row = conn.execute(
+ "SELECT * FROM discovered_skills WHERE id = ?",
+ (skill_id,),
+ ).fetchone()
+ return dict(row) if row else None
+
+ def update_status(self, skill_id: str, new_status: str) -> bool:
+ """Update a skill's status (confirm, reject, archive)."""
+ if new_status not in VALID_STATUSES:
+ return False
+ with _get_db() as conn:
+ conn.execute(
+ "UPDATE discovered_skills SET status = ?, updated_at = ? WHERE id = ?",
+ (new_status, datetime.now(UTC).isoformat(), skill_id),
+ )
+ conn.commit()
+ return True
+
+ def skill_count(self) -> dict[str, int]:
+ """Return counts of skills by status."""
+ with _get_db() as conn:
+ rows = conn.execute(
+ "SELECT status, COUNT(*) as cnt FROM discovered_skills GROUP BY status"
+ ).fetchall()
+ return {r["status"]: r["cnt"] for r in rows}
+
+ # -- Internal -----------------------------------------------------------
+
+ def _load_recent_successful_actions(self, limit: int = 100) -> list[dict]:
+ """Load recent successful tool calls from session logs."""
+ try:
+ from timmy.session_logger import get_session_logger
+
+ sl = get_session_logger()
+ entries = sl.get_recent_entries(limit=limit)
+ # Filter for successful tool calls and high-confidence messages
+ return [
+ e
+ for e in entries
+ if (e.get("type") == "tool_call")
+ or (
+ e.get("type") == "message"
+ and e.get("role") == "timmy"
+ and (e.get("confidence") or 0) >= 0.7
+ )
+ ]
+ except Exception as exc:
+ logger.warning("Failed to load session entries: %s", exc)
+ return []
+
+ def _cluster_action_sequences(
+ self,
+ entries: list[dict],
+ max_gap_seconds: int = 300,
+ ) -> list[list[dict]]:
+ """Group entries into sequences based on temporal proximity."""
+ if not entries:
+ return []
+
+ from datetime import datetime as dt
+
+ sequences: list[list[dict]] = []
+ current_seq: list[dict] = [entries[0]]
+
+ for entry in entries[1:]:
+ try:
+ prev_ts = dt.fromisoformat(current_seq[-1].get("timestamp", ""))
+ curr_ts = dt.fromisoformat(entry.get("timestamp", ""))
+ gap = abs((curr_ts - prev_ts).total_seconds())
+ except (ValueError, TypeError):
+ gap = max_gap_seconds + 1
+
+ if gap <= max_gap_seconds:
+ current_seq.append(entry)
+ else:
+ if current_seq:
+ sequences.append(current_seq)
+ current_seq = [entry]
+
+ if current_seq:
+ sequences.append(current_seq)
+
+ return sequences
+
+ async def _analyze_sequence(self, sequence: list[dict]) -> DiscoveredSkill | None:
+ """Use the LLM router to analyze an action sequence."""
+ actions_text = self._format_actions(sequence)
+ prompt = _ANALYSIS_PROMPT.format(actions=actions_text)
+
+ try:
+ from infrastructure.router.cascade import get_router
+
+ router = get_router()
+ response = await router.complete(
+ messages=[
+ {
+ "role": "system",
+ "content": "You extract reusable skills from agent actions.",
+ },
+ {"role": "user", "content": prompt},
+ ],
+ )
+ content = response.get("content", "")
+ return self._parse_llm_response(content, sequence)
+ except Exception as exc:
+ logger.warning("LLM analysis failed, using heuristic: %s", exc)
+ return self._heuristic_extraction(sequence)
+
+ def _format_actions(self, sequence: list[dict]) -> str:
+ """Format action sequence for the LLM prompt."""
+ lines = []
+ for i, entry in enumerate(sequence, 1):
+ etype = entry.get("type", "unknown")
+ if etype == "tool_call":
+ tool = entry.get("tool", "unknown")
+ result = (entry.get("result") or "")[:200]
+ lines.append(f"{i}. Tool: {tool} → {result}")
+ elif etype == "message":
+ content = (entry.get("content") or "")[:200]
+ lines.append(f"{i}. Response: {content}")
+ elif etype == "decision":
+ decision = (entry.get("decision") or "")[:200]
+ lines.append(f"{i}. Decision: {decision}")
+ return "\n".join(lines)
+
+ def _parse_llm_response(
+ self,
+ content: str,
+ source_entries: list[dict],
+ ) -> DiscoveredSkill | None:
+ """Parse LLM JSON response into a DiscoveredSkill."""
+ try:
+ # Strip markdown fences if present
+ cleaned = content.strip()
+ if cleaned.startswith("```"):
+ cleaned = cleaned.split("\n", 1)[1] if "\n" in cleaned else cleaned[3:]
+ if cleaned.endswith("```"):
+ cleaned = cleaned[:-3]
+ cleaned = cleaned.strip()
+
+ data = json.loads(cleaned)
+ return DiscoveredSkill(
+ name=data.get("name", "Unnamed Skill"),
+ description=data.get("description", ""),
+ category=data.get("category", "general"),
+ template=data.get("template", ""),
+ confidence=float(data.get("confidence", 0.0)),
+ source_entries=source_entries[:5], # Keep first 5 for reference
+ )
+ except (json.JSONDecodeError, ValueError, TypeError) as exc:
+ logger.debug("Failed to parse LLM response: %s", exc)
+ return None
+
+ def _heuristic_extraction(self, sequence: list[dict]) -> DiscoveredSkill | None:
+ """Fallback: extract skill from action patterns without LLM."""
+ tool_calls = [e for e in sequence if e.get("type") == "tool_call"]
+ if not tool_calls:
+ return None
+
+ # Name from the dominant tool
+ tool_names = [e.get("tool", "unknown") for e in tool_calls]
+ dominant_tool = max(set(tool_names), key=tool_names.count)
+
+ # Simple template from the tool sequence
+ steps = []
+ for i, tc in enumerate(tool_calls[:10], 1):
+ steps.append(f"Step {i}: Use {tc.get('tool', 'unknown')}")
+
+ return DiscoveredSkill(
+ name=f"{dominant_tool.replace('_', ' ').title()} Pattern",
+ description=f"Automated pattern using {dominant_tool} ({len(tool_calls)} steps)",
+ category="general",
+ template="\n".join(steps),
+ confidence=0.5, # Lower confidence for heuristic
+ source_entries=sequence[:5],
+ )
+
+ def _is_duplicate(self, skill: DiscoveredSkill) -> bool:
+ """Check if a similar skill already exists."""
+ with _get_db() as conn:
+ rows = conn.execute(
+ "SELECT name FROM discovered_skills WHERE name = ? AND status != 'rejected'",
+ (skill.name,),
+ ).fetchall()
+ return len(rows) > 0
+
+ def _save_skill(self, skill: DiscoveredSkill) -> None:
+ """Persist a discovered skill to the database."""
+ with _get_db() as conn:
+ conn.execute(
+ """INSERT INTO discovered_skills
+ (id, name, description, category, source_entries,
+ template, confidence, status, created_at, updated_at)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
+ (
+ skill.id,
+ skill.name,
+ skill.description,
+ skill.category,
+ json.dumps(skill.source_entries),
+ skill.template,
+ skill.confidence,
+ skill.status,
+ skill.created_at,
+ skill.updated_at,
+ ),
+ )
+ conn.commit()
+
+ def _write_skill_file(self, skill: DiscoveredSkill) -> Path:
+ """Write a skill template to the skills/ directory."""
+ skills_dir = Path(settings.repo_root) / "skills" / "discovered"
+ skills_dir.mkdir(parents=True, exist_ok=True)
+
+ filename = skill.name.lower().replace(" ", "_") + ".md"
+ filepath = skills_dir / filename
+
+ content = f"""# {skill.name}
+
+**Category:** {skill.category}
+**Confidence:** {skill.confidence:.0%}
+**Discovered:** {skill.created_at[:10]}
+**Status:** {skill.status}
+
+## Description
+
+{skill.description}
+
+## Template
+
+{skill.template}
+"""
+ filepath.write_text(content)
+ logger.info("Wrote skill file: %s", filepath)
+ return filepath
+
+ async def _notify(self, skill: DiscoveredSkill) -> None:
+ """Notify the dashboard about a newly discovered skill."""
+ # Push notification
+ try:
+ from infrastructure.notifications.push import notifier
+
+ notifier.notify(
+ title="Skill Discovered",
+ message=f"{skill.name} (confidence: {skill.confidence:.0%})",
+ category="system",
+ )
+ except Exception as exc:
+ logger.debug("Push notification failed: %s", exc)
+
+ # WebSocket broadcast
+ try:
+ from infrastructure.ws_manager.handler import ws_manager
+
+ await ws_manager.broadcast(
+ "skill_discovered",
+ {
+ "id": skill.id,
+ "name": skill.name,
+ "confidence": skill.confidence,
+ "category": skill.category,
+ },
+ )
+ except Exception as exc:
+ logger.debug("WebSocket broadcast failed: %s", exc)
+
+ # Event bus
+ try:
+ from infrastructure.events.bus import Event, get_event_bus
+
+ await get_event_bus().publish(
+ Event(
+ type="skill.discovered",
+ source="skill_discovery",
+ data=skill.to_dict(),
+ )
+ )
+ except Exception as exc:
+ logger.debug("Event bus publish failed: %s", exc)
+
+ # Write skill file to skills/ directory
+ try:
+ self._write_skill_file(skill)
+ except Exception as exc:
+ logger.debug("Skill file write failed: %s", exc)
+
+
+# ---------------------------------------------------------------------------
+# Singleton
+# ---------------------------------------------------------------------------
+
+_engine: SkillDiscoveryEngine | None = None
+
+
+def get_skill_discovery_engine() -> SkillDiscoveryEngine:
+ """Get or create the global skill discovery engine."""
+ global _engine
+ if _engine is None:
+ _engine = SkillDiscoveryEngine()
+ return _engine
diff --git a/tests/unit/test_skill_discovery.py b/tests/unit/test_skill_discovery.py
new file mode 100644
index 00000000..aa4b0141
--- /dev/null
+++ b/tests/unit/test_skill_discovery.py
@@ -0,0 +1,410 @@
+"""Unit tests for the skill discovery pipeline.
+
+Tests the discovery engine's core logic: action clustering, skill extraction,
+database persistence, deduplication, and status management.
+"""
+
+from __future__ import annotations
+
+import json
+from datetime import datetime, timedelta
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+
+from timmy.skill_discovery import (
+ DiscoveredSkill,
+ SkillDiscoveryEngine,
+)
+
+
+@pytest.fixture
+def engine():
+ """Create a fresh SkillDiscoveryEngine for each test."""
+ return SkillDiscoveryEngine(confidence_threshold=0.7, min_actions=2)
+
+
+@pytest.fixture(autouse=True)
+def temp_db(tmp_path, monkeypatch):
+ """Use a temporary database for each test."""
+ db_path = tmp_path / "skills.db"
+ monkeypatch.setattr("timmy.skill_discovery.DB_PATH", db_path)
+ return db_path
+
+
+# ---------------------------------------------------------------------------
+# DiscoveredSkill dataclass
+# ---------------------------------------------------------------------------
+
+
+class TestDiscoveredSkill:
+ def test_defaults(self):
+ skill = DiscoveredSkill()
+ assert skill.name == ""
+ assert skill.status == "discovered"
+ assert skill.confidence == 0.0
+ assert skill.id.startswith("skill_")
+
+ def test_to_dict(self):
+ skill = DiscoveredSkill(name="Test Skill", confidence=0.85)
+ d = skill.to_dict()
+ assert d["name"] == "Test Skill"
+ assert d["confidence"] == 0.85
+ assert "id" in d
+ assert "created_at" in d
+
+ def test_custom_fields(self):
+ skill = DiscoveredSkill(
+ name="Code Review",
+ category="coding",
+ confidence=0.92,
+ template="Step 1: Read code\nStep 2: Analyze",
+ )
+ assert skill.category == "coding"
+ assert "Step 1" in skill.template
+
+
+# ---------------------------------------------------------------------------
+# Database operations
+# ---------------------------------------------------------------------------
+
+
+class TestDatabase:
+ def test_save_and_list(self, engine):
+ skill = DiscoveredSkill(
+ name="Git Workflow",
+ description="Automates git operations",
+ category="devops",
+ confidence=0.88,
+ )
+ engine._save_skill(skill)
+ skills = engine.list_skills()
+ assert len(skills) == 1
+ assert skills[0]["name"] == "Git Workflow"
+ assert skills[0]["category"] == "devops"
+
+ def test_list_by_status(self, engine):
+ s1 = DiscoveredSkill(name="Skill A", status="discovered")
+ s2 = DiscoveredSkill(name="Skill B", status="confirmed")
+ engine._save_skill(s1)
+ engine._save_skill(s2)
+
+ discovered = engine.list_skills(status="discovered")
+ assert len(discovered) == 1
+ assert discovered[0]["name"] == "Skill A"
+
+ confirmed = engine.list_skills(status="confirmed")
+ assert len(confirmed) == 1
+ assert confirmed[0]["name"] == "Skill B"
+
+ def test_get_skill(self, engine):
+ skill = DiscoveredSkill(name="Find Me")
+ engine._save_skill(skill)
+ found = engine.get_skill(skill.id)
+ assert found is not None
+ assert found["name"] == "Find Me"
+
+ def test_get_skill_not_found(self, engine):
+ assert engine.get_skill("nonexistent") is None
+
+ def test_update_status(self, engine):
+ skill = DiscoveredSkill(name="Status Test")
+ engine._save_skill(skill)
+ assert engine.update_status(skill.id, "confirmed")
+ found = engine.get_skill(skill.id)
+ assert found["status"] == "confirmed"
+
+ def test_update_invalid_status(self, engine):
+ skill = DiscoveredSkill(name="Invalid Status")
+ engine._save_skill(skill)
+ assert not engine.update_status(skill.id, "bogus")
+
+ def test_skill_count(self, engine):
+ engine._save_skill(DiscoveredSkill(name="A", status="discovered"))
+ engine._save_skill(DiscoveredSkill(name="B", status="discovered"))
+ engine._save_skill(DiscoveredSkill(name="C", status="confirmed"))
+ counts = engine.skill_count()
+ assert counts["discovered"] == 2
+ assert counts["confirmed"] == 1
+
+ def test_list_limit(self, engine):
+ for i in range(5):
+ engine._save_skill(DiscoveredSkill(name=f"Skill {i}"))
+ assert len(engine.list_skills(limit=3)) == 3
+
+
+# ---------------------------------------------------------------------------
+# Action clustering
+# ---------------------------------------------------------------------------
+
+
+class TestActionClustering:
+ def test_empty_entries(self, engine):
+ assert engine._cluster_action_sequences([]) == []
+
+ def test_single_sequence(self, engine):
+ now = datetime.now()
+ entries = [
+ {"type": "tool_call", "tool": "read", "timestamp": now.isoformat()},
+ {
+ "type": "tool_call",
+ "tool": "write",
+ "timestamp": (now + timedelta(seconds=30)).isoformat(),
+ },
+ ]
+ sequences = engine._cluster_action_sequences(entries)
+ assert len(sequences) == 1
+ assert len(sequences[0]) == 2
+
+ def test_split_by_gap(self, engine):
+ now = datetime.now()
+ entries = [
+ {"type": "tool_call", "tool": "read", "timestamp": now.isoformat()},
+ {
+ "type": "tool_call",
+ "tool": "write",
+ "timestamp": (now + timedelta(seconds=600)).isoformat(),
+ },
+ ]
+ sequences = engine._cluster_action_sequences(entries, max_gap_seconds=300)
+ assert len(sequences) == 2
+
+ def test_bad_timestamps(self, engine):
+ entries = [
+ {"type": "tool_call", "tool": "read", "timestamp": "not-a-date"},
+ {"type": "tool_call", "tool": "write", "timestamp": "also-bad"},
+ ]
+ sequences = engine._cluster_action_sequences(entries)
+ # Should still produce sequences (split on bad parse)
+ assert len(sequences) >= 1
+
+
+# ---------------------------------------------------------------------------
+# LLM response parsing
+# ---------------------------------------------------------------------------
+
+
+class TestLLMParsing:
+ def test_parse_valid_json(self, engine):
+ response = json.dumps(
+ {
+ "name": "API Search",
+ "description": "Searches APIs efficiently",
+ "category": "research",
+ "template": "1. Identify API\n2. Call endpoint",
+ "confidence": 0.85,
+ }
+ )
+ skill = engine._parse_llm_response(response, [])
+ assert skill is not None
+ assert skill.name == "API Search"
+ assert skill.confidence == 0.85
+ assert skill.category == "research"
+
+ def test_parse_with_markdown_fences(self, engine):
+ response = '```json\n{"name": "Fenced", "confidence": 0.9}\n```'
+ skill = engine._parse_llm_response(response, [])
+ assert skill is not None
+ assert skill.name == "Fenced"
+
+ def test_parse_invalid_json(self, engine):
+ assert engine._parse_llm_response("not json", []) is None
+
+ def test_parse_empty(self, engine):
+ assert engine._parse_llm_response("", []) is None
+
+
+# ---------------------------------------------------------------------------
+# Heuristic extraction
+# ---------------------------------------------------------------------------
+
+
+class TestHeuristicExtraction:
+ def test_extract_from_tool_calls(self, engine):
+ seq = [
+ {"type": "tool_call", "tool": "git_commit", "result": "ok"},
+ {"type": "tool_call", "tool": "git_push", "result": "ok"},
+ {"type": "tool_call", "tool": "git_commit", "result": "ok"},
+ ]
+ skill = engine._heuristic_extraction(seq)
+ assert skill is not None
+ assert "Git Commit" in skill.name
+ assert skill.confidence == 0.5
+
+ def test_extract_no_tool_calls(self, engine):
+ seq = [{"type": "message", "role": "user", "content": "hello"}]
+ assert engine._heuristic_extraction(seq) is None
+
+
+# ---------------------------------------------------------------------------
+# Deduplication
+# ---------------------------------------------------------------------------
+
+
+class TestDeduplication:
+ def test_not_duplicate(self, engine):
+ skill = DiscoveredSkill(name="Unique Skill")
+ assert not engine._is_duplicate(skill)
+
+ def test_is_duplicate(self, engine):
+ skill = DiscoveredSkill(name="Duplicate Check")
+ engine._save_skill(skill)
+ new_skill = DiscoveredSkill(name="Duplicate Check")
+ assert engine._is_duplicate(new_skill)
+
+ def test_rejected_not_duplicate(self, engine):
+ skill = DiscoveredSkill(name="Rejected Skill", status="rejected")
+ engine._save_skill(skill)
+ new_skill = DiscoveredSkill(name="Rejected Skill")
+ assert not engine._is_duplicate(new_skill)
+
+
+# ---------------------------------------------------------------------------
+# Format actions
+# ---------------------------------------------------------------------------
+
+
+class TestFormatActions:
+ def test_format_tool_call(self, engine):
+ seq = [{"type": "tool_call", "tool": "shell", "result": "output text"}]
+ text = engine._format_actions(seq)
+ assert "shell" in text
+ assert "output text" in text
+
+ def test_format_message(self, engine):
+ seq = [{"type": "message", "role": "timmy", "content": "I analyzed the code"}]
+ text = engine._format_actions(seq)
+ assert "I analyzed the code" in text
+
+ def test_format_decision(self, engine):
+ seq = [{"type": "decision", "decision": "Use async"}]
+ text = engine._format_actions(seq)
+ assert "Use async" in text
+
+
+# ---------------------------------------------------------------------------
+# Scan integration (mocked)
+# ---------------------------------------------------------------------------
+
+
+class TestScan:
+ @pytest.mark.asyncio
+ async def test_scan_too_few_actions(self, engine):
+ with patch.object(engine, "_load_recent_successful_actions", return_value=[]):
+ result = await engine.scan()
+ assert result == []
+
+ @pytest.mark.asyncio
+ async def test_scan_discovers_skill(self, engine):
+ now = datetime.now()
+ entries = [
+ {
+ "type": "tool_call",
+ "tool": "search",
+ "result": "found results",
+ "timestamp": now.isoformat(),
+ },
+ {
+ "type": "tool_call",
+ "tool": "analyze",
+ "result": "analysis complete",
+ "timestamp": (now + timedelta(seconds=10)).isoformat(),
+ },
+ {
+ "type": "tool_call",
+ "tool": "report",
+ "result": "report generated",
+ "timestamp": (now + timedelta(seconds=20)).isoformat(),
+ },
+ ]
+
+ llm_response = json.dumps(
+ {
+ "name": "Research Pipeline",
+ "description": "Search, analyze, and report",
+ "category": "research",
+ "template": "1. Search\n2. Analyze\n3. Report",
+ "confidence": 0.9,
+ }
+ )
+
+ with (
+ patch.object(engine, "_load_recent_successful_actions", return_value=entries),
+ patch(
+ "infrastructure.router.cascade.get_router",
+ return_value=MagicMock(complete=AsyncMock(return_value={"content": llm_response})),
+ ),
+ patch.object(engine, "_notify", new_callable=AsyncMock),
+ patch.object(engine, "_write_skill_file"),
+ ):
+ result = await engine.scan()
+ assert len(result) == 1
+ assert result[0].name == "Research Pipeline"
+ assert result[0].confidence == 0.9
+
+ @pytest.mark.asyncio
+ async def test_scan_skips_low_confidence(self, engine):
+ now = datetime.now()
+ entries = [
+ {
+ "type": "tool_call",
+ "tool": "a",
+ "result": "ok",
+ "timestamp": now.isoformat(),
+ },
+ {
+ "type": "tool_call",
+ "tool": "b",
+ "result": "ok",
+ "timestamp": (now + timedelta(seconds=10)).isoformat(),
+ },
+ ]
+
+ llm_response = json.dumps(
+ {"name": "Low Conf", "confidence": 0.3, "category": "general", "template": "..."}
+ )
+
+ with (
+ patch.object(engine, "_load_recent_successful_actions", return_value=entries),
+ patch(
+ "infrastructure.router.cascade.get_router",
+ return_value=MagicMock(complete=AsyncMock(return_value={"content": llm_response})),
+ ),
+ ):
+ result = await engine.scan()
+ assert result == []
+
+ @pytest.mark.asyncio
+ async def test_scan_falls_back_to_heuristic(self, engine):
+ engine.confidence_threshold = 0.4 # Lower for heuristic
+ now = datetime.now()
+ entries = [
+ {
+ "type": "tool_call",
+ "tool": "deploy",
+ "result": "ok",
+ "timestamp": now.isoformat(),
+ },
+ {
+ "type": "tool_call",
+ "tool": "deploy",
+ "result": "ok",
+ "timestamp": (now + timedelta(seconds=10)).isoformat(),
+ },
+ ]
+
+ with (
+ patch.object(engine, "_load_recent_successful_actions", return_value=entries),
+ patch(
+ "infrastructure.router.cascade.get_router",
+ return_value=MagicMock(
+ complete=AsyncMock(side_effect=Exception("LLM unavailable"))
+ ),
+ ),
+ patch.object(engine, "_notify", new_callable=AsyncMock),
+ patch.object(engine, "_write_skill_file"),
+ ):
+ result = await engine.scan()
+ assert len(result) == 1
+ assert "Deploy" in result[0].name
+ assert result[0].confidence == 0.5