From 92c677f029a709e5224dd3ed5776592b9e69e889 Mon Sep 17 00:00:00 2001 From: Alexander Whitestone Date: Sun, 22 Mar 2026 19:56:35 -0400 Subject: [PATCH] feat: add automated skill discovery pipeline Implements a background process that monitors session logs for successful agent action sequences, uses the LLM router to extract reusable skill templates, and stores them in a SQLite database. Discovered skills are surfaced via dashboard notifications (push + WebSocket + event bus) and a new /skills page with HTMX polling. Users can confirm, reject, or archive discovered skills. - src/timmy/skill_discovery.py: Core engine with LLM analysis + heuristic fallback - src/dashboard/routes/skills.py: CRUD routes for skill management - src/dashboard/templates/skills.html: Main skills page - src/dashboard/templates/partials/skills_list.html: HTMX partial - Background scheduler in app.py runs every 10 minutes - 31 unit tests covering DB ops, clustering, parsing, dedup, and scan Fixes #1011 Co-Authored-By: Claude Opus 4.6 --- src/dashboard/app.py | 29 + src/dashboard/routes/skills.py | 82 +++ .../templates/partials/skills_list.html | 74 +++ src/dashboard/templates/skills.html | 38 ++ src/timmy/skill_discovery.py | 495 ++++++++++++++++++ tests/unit/test_skill_discovery.py | 410 +++++++++++++++ 6 files changed, 1128 insertions(+) create mode 100644 src/dashboard/routes/skills.py create mode 100644 src/dashboard/templates/partials/skills_list.html create mode 100644 src/dashboard/templates/skills.html create mode 100644 src/timmy/skill_discovery.py create mode 100644 tests/unit/test_skill_discovery.py diff --git a/src/dashboard/app.py b/src/dashboard/app.py index 7e1ccba9..3450b8c9 100644 --- a/src/dashboard/app.py +++ b/src/dashboard/app.py @@ -45,6 +45,7 @@ from dashboard.routes.models import api_router as models_api_router from dashboard.routes.models import router as models_router from dashboard.routes.quests import router as quests_router from dashboard.routes.scorecards import 
_SKILL_DISCOVERY_INTERVAL = 600  # 10 minutes


async def _skill_discovery_scheduler() -> None:
    """Background task: periodically run the skill discovery engine.

    Waits 20 seconds before the first scan, then runs one scan every
    ``_SKILL_DISCOVERY_INTERVAL`` seconds for as long as the task lives.
    A failing scan is logged and does not stop the loop; cancellation is
    always propagated to the caller.
    """
    await asyncio.sleep(20)  # Stagger after other schedulers

    while True:
        try:
            # Imported inside the guarded section so that an import failure is
            # logged like any other scan error instead of killing the task.
            from timmy.skill_discovery import get_skill_discovery_engine

            new_skills = await get_skill_discovery_engine().scan()
            if new_skills:
                logger.info(
                    "Skill discovery: found %d new skill(s)",
                    len(new_skills),
                )
        except asyncio.CancelledError:
            raise
        except Exception as exc:
            logger.error("Skill discovery scheduler error: %s", exc)

        await asyncio.sleep(_SKILL_DISCOVERY_INTERVAL)
logger = logging.getLogger(__name__)

router = APIRouter(prefix="/skills", tags=["skills"])

# Maximum number of skills rendered on the page and in the HTMX partial.
_SKILL_LIST_LIMIT = 50


def _render_skills(
    request: Request,
    engine,
    template_name: str = "partials/skills_list.html",
    *,
    status: str | None = None,
    scan_message: str | None = None,
):
    """Render *template_name* with the current skill rows and status counts.

    Args:
        request: The incoming request, passed through to the template engine.
        engine: The skill discovery engine to query.
        template_name: Template to render; defaults to the HTMX list partial.
        status: Optional status filter forwarded to ``engine.list_skills``.
        scan_message: Optional banner text. It is only added to the context
            when provided, so the partial's ``scan_message is defined`` check
            behaves as before.
    """
    context = {
        "skills": engine.list_skills(status=status, limit=_SKILL_LIST_LIMIT),
        "counts": engine.skill_count(),
    }
    if scan_message is not None:
        context["scan_message"] = scan_message
    return templates.TemplateResponse(request, template_name, context)


@router.get("", response_class=HTMLResponse)
async def skills_page(request: Request):
    """Main skill discovery page."""
    from timmy.skill_discovery import get_skill_discovery_engine

    return _render_skills(request, get_skill_discovery_engine(), "skills.html")


@router.get("/list", response_class=HTMLResponse)
async def skills_list_partial(request: Request, status: str = ""):
    """HTMX partial: return skill list for polling.

    An empty ``status`` query parameter means "no filter".
    """
    from timmy.skill_discovery import get_skill_discovery_engine

    return _render_skills(
        request,
        get_skill_discovery_engine(),
        status=status or None,
    )


@router.post("/{skill_id}/status", response_class=HTMLResponse)
async def update_skill_status(request: Request, skill_id: str, status: str = Form(...)):
    """Update a skill's status (confirm / reject / archive).

    Raises:
        HTTPException: 400 when ``status`` is not a known status value,
            404 when no skill with ``skill_id`` exists.
    """
    from timmy.skill_discovery import VALID_STATUSES, get_skill_discovery_engine

    # Validate the form value first so a bad status is reported as such,
    # independently of whether the skill exists.
    if status not in VALID_STATUSES:
        raise HTTPException(status_code=400, detail=f"Invalid status: {status}")

    engine = get_skill_discovery_engine()
    # An UPDATE that matches no row is not an error for SQLite, so existence is
    # checked explicitly; otherwise a stale or mistyped id would look like a
    # successful update.
    if engine.get_skill(skill_id) is None or not engine.update_status(skill_id, status):
        raise HTTPException(status_code=404, detail=f"Skill not found: {skill_id}")

    return _render_skills(request, engine)


@router.post("/scan", response_class=HTMLResponse)
async def trigger_scan(request: Request):
    """Manually trigger a skill discovery scan.

    A failing scan does not produce an error response: the failure is logged
    and shown in the rendered partial as the scan message.
    """
    from timmy.skill_discovery import get_skill_discovery_engine

    engine = get_skill_discovery_engine()
    try:
        discovered = await engine.scan()
    except Exception as exc:
        logger.warning("Manual skill scan failed: %s", exc)
        msg = f"Scan failed: {exc}"
    else:
        msg = f"Scan complete: {len(discovered)} new skill(s) found."

    return _render_skills(request, engine, scan_message=msg)
+ {{ scan_message }} +
+{% endif %} + +{% if skills %} +
+ + + + + + + + + + + + + {% for skill in skills %} + + + + + + + + + {% endfor %} + +
NameCategoryConfidenceStatusDiscoveredActions
+ {{ skill.name }} + {% if skill.description %} +
{{ skill.description[:100] }} + {% endif %} +
{{ skill.category }} + {% set conf = skill.confidence * 100 %} + + {{ "%.0f"|format(conf) }}% + + + {% if skill.status == 'confirmed' %} + confirmed + {% elif skill.status == 'rejected' %} + rejected + {% elif skill.status == 'archived' %} + archived + {% else %} + discovered + {% endif %} + {{ skill.created_at[:10] if skill.created_at else '' }} + {% if skill.status == 'discovered' %} +
+ + +
+
+ + +
+ {% elif skill.status == 'confirmed' %} +
+ + +
+ {% endif %} +
+
+{% else %} +
+ No skills discovered yet. Click "Scan Now" to analyze recent activity. +
+{% endif %} diff --git a/src/dashboard/templates/skills.html b/src/dashboard/templates/skills.html new file mode 100644 index 00000000..e3563234 --- /dev/null +++ b/src/dashboard/templates/skills.html @@ -0,0 +1,38 @@ +{% extends "base.html" %} + +{% block title %}Skill Discovery - Timmy Time{% endblock %} + +{% block extra_styles %}{% endblock %} + +{% block content %} +
+ + {% from "macros.html" import panel %} + + {% call panel("SKILL DISCOVERY", id="skills-panel") %} +
+
+ + Discovered: {{ counts.get('discovered', 0) }} | + Confirmed: {{ counts.get('confirmed', 0) }} | + Archived: {{ counts.get('archived', 0) }} + +
+ +
+ +
+ {% include "partials/skills_list.html" %} +
+ {% endcall %} + +
+{% endblock %} diff --git a/src/timmy/skill_discovery.py b/src/timmy/skill_discovery.py new file mode 100644 index 00000000..76d0f320 --- /dev/null +++ b/src/timmy/skill_discovery.py @@ -0,0 +1,495 @@ +"""Automated Skill Discovery Pipeline. + +Monitors the agent's session logs for high-confidence successful outcomes, +uses the LLM router to deconstruct successful action sequences into +reusable skill templates, and stores discovered skills with metadata. + +Notifies the dashboard when new skills are crystallized. +""" + +import json +import logging +import sqlite3 +import uuid +from collections.abc import Generator +from contextlib import closing, contextmanager +from dataclasses import dataclass, field +from datetime import UTC, datetime +from pathlib import Path +from typing import Any + +from config import settings + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Database +# --------------------------------------------------------------------------- + +DB_PATH = Path(settings.repo_root) / "data" / "skills.db" + +_SCHEMA = """ +CREATE TABLE IF NOT EXISTS discovered_skills ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL, + description TEXT DEFAULT '', + category TEXT DEFAULT 'general', + source_entries TEXT DEFAULT '[]', + template TEXT DEFAULT '', + confidence REAL DEFAULT 0.0, + status TEXT DEFAULT 'discovered', + created_at TEXT DEFAULT (datetime('now')), + updated_at TEXT DEFAULT (datetime('now')) +); +CREATE INDEX IF NOT EXISTS idx_skills_status ON discovered_skills(status); +CREATE INDEX IF NOT EXISTS idx_skills_category ON discovered_skills(category); +CREATE INDEX IF NOT EXISTS idx_skills_created ON discovered_skills(created_at); +""" + +VALID_STATUSES = {"discovered", "confirmed", "rejected", "archived"} + + +@contextmanager +def _get_db() -> Generator[sqlite3.Connection, None, None]: + DB_PATH.parent.mkdir(parents=True, exist_ok=True) + with closing(sqlite3.connect(str(DB_PATH))) 
as conn: + conn.row_factory = sqlite3.Row + conn.execute(f"PRAGMA busy_timeout = {settings.db_busy_timeout_ms}") + conn.executescript(_SCHEMA) + yield conn + + +# --------------------------------------------------------------------------- +# Data model +# --------------------------------------------------------------------------- + + +@dataclass +class DiscoveredSkill: + """A skill extracted from successful agent actions.""" + + id: str = field(default_factory=lambda: f"skill_{uuid.uuid4().hex[:12]}") + name: str = "" + description: str = "" + category: str = "general" + source_entries: list[dict] = field(default_factory=list) + template: str = "" + confidence: float = 0.0 + status: str = "discovered" + created_at: str = field(default_factory=lambda: datetime.now(UTC).isoformat()) + updated_at: str = field(default_factory=lambda: datetime.now(UTC).isoformat()) + + def to_dict(self) -> dict[str, Any]: + return { + "id": self.id, + "name": self.name, + "description": self.description, + "category": self.category, + "source_entries": self.source_entries, + "template": self.template, + "confidence": self.confidence, + "status": self.status, + "created_at": self.created_at, + "updated_at": self.updated_at, + } + + +# --------------------------------------------------------------------------- +# Prompt template for LLM analysis +# --------------------------------------------------------------------------- + +_ANALYSIS_PROMPT = """\ +You are a skill extraction engine. Analyze the following sequence of \ +successful agent actions and extract a reusable skill template. 
+ +Actions: +{actions} + +Respond with a JSON object containing: +- "name": short skill name (2-5 words) +- "description": one-sentence description of what this skill does +- "category": one of "research", "coding", "devops", "communication", "analysis", "general" +- "template": a step-by-step template that generalizes this action sequence +- "confidence": your confidence that this is a genuinely reusable skill (0.0-1.0) + +Respond ONLY with valid JSON, no markdown fences or extra text.""" + + +# --------------------------------------------------------------------------- +# Core engine +# --------------------------------------------------------------------------- + + +class SkillDiscoveryEngine: + """Scans session logs for successful action patterns and extracts skills.""" + + def __init__( + self, + confidence_threshold: float = 0.7, + min_actions: int = 2, + ): + self.confidence_threshold = confidence_threshold + self.min_actions = min_actions + + # -- Public API --------------------------------------------------------- + + async def scan(self) -> list[DiscoveredSkill]: + """Scan recent session logs and discover new skills. + + Returns a list of newly discovered skills. 
+ """ + entries = self._load_recent_successful_actions() + if len(entries) < self.min_actions: + logger.debug( + "Skill discovery: only %d actions found (need %d), skipping", + len(entries), + self.min_actions, + ) + return [] + + # Group entries into action sequences (tool calls clustered together) + sequences = self._cluster_action_sequences(entries) + discovered: list[DiscoveredSkill] = [] + + for seq in sequences: + if len(seq) < self.min_actions: + continue + + skill = await self._analyze_sequence(seq) + if skill and skill.confidence >= self.confidence_threshold: + # Check for duplicates + if not self._is_duplicate(skill): + self._save_skill(skill) + await self._notify(skill) + discovered.append(skill) + logger.info( + "Discovered skill: %s (confidence=%.2f)", + skill.name, + skill.confidence, + ) + + return discovered + + def list_skills( + self, + status: str | None = None, + limit: int = 50, + ) -> list[dict[str, Any]]: + """Return discovered skills from the database.""" + with _get_db() as conn: + if status and status in VALID_STATUSES: + rows = conn.execute( + "SELECT * FROM discovered_skills WHERE status = ? " + "ORDER BY created_at DESC LIMIT ?", + (status, limit), + ).fetchall() + else: + rows = conn.execute( + "SELECT * FROM discovered_skills ORDER BY created_at DESC LIMIT ?", + (limit,), + ).fetchall() + return [dict(r) for r in rows] + + def get_skill(self, skill_id: str) -> dict[str, Any] | None: + """Get a single skill by ID.""" + with _get_db() as conn: + row = conn.execute( + "SELECT * FROM discovered_skills WHERE id = ?", + (skill_id,), + ).fetchone() + return dict(row) if row else None + + def update_status(self, skill_id: str, new_status: str) -> bool: + """Update a skill's status (confirm, reject, archive).""" + if new_status not in VALID_STATUSES: + return False + with _get_db() as conn: + conn.execute( + "UPDATE discovered_skills SET status = ?, updated_at = ? 
WHERE id = ?", + (new_status, datetime.now(UTC).isoformat(), skill_id), + ) + conn.commit() + return True + + def skill_count(self) -> dict[str, int]: + """Return counts of skills by status.""" + with _get_db() as conn: + rows = conn.execute( + "SELECT status, COUNT(*) as cnt FROM discovered_skills GROUP BY status" + ).fetchall() + return {r["status"]: r["cnt"] for r in rows} + + # -- Internal ----------------------------------------------------------- + + def _load_recent_successful_actions(self, limit: int = 100) -> list[dict]: + """Load recent successful tool calls from session logs.""" + try: + from timmy.session_logger import get_session_logger + + sl = get_session_logger() + entries = sl.get_recent_entries(limit=limit) + # Filter for successful tool calls and high-confidence messages + return [ + e + for e in entries + if (e.get("type") == "tool_call") + or ( + e.get("type") == "message" + and e.get("role") == "timmy" + and (e.get("confidence") or 0) >= 0.7 + ) + ] + except Exception as exc: + logger.warning("Failed to load session entries: %s", exc) + return [] + + def _cluster_action_sequences( + self, + entries: list[dict], + max_gap_seconds: int = 300, + ) -> list[list[dict]]: + """Group entries into sequences based on temporal proximity.""" + if not entries: + return [] + + from datetime import datetime as dt + + sequences: list[list[dict]] = [] + current_seq: list[dict] = [entries[0]] + + for entry in entries[1:]: + try: + prev_ts = dt.fromisoformat(current_seq[-1].get("timestamp", "")) + curr_ts = dt.fromisoformat(entry.get("timestamp", "")) + gap = abs((curr_ts - prev_ts).total_seconds()) + except (ValueError, TypeError): + gap = max_gap_seconds + 1 + + if gap <= max_gap_seconds: + current_seq.append(entry) + else: + if current_seq: + sequences.append(current_seq) + current_seq = [entry] + + if current_seq: + sequences.append(current_seq) + + return sequences + + async def _analyze_sequence(self, sequence: list[dict]) -> DiscoveredSkill | None: + 
"""Use the LLM router to analyze an action sequence.""" + actions_text = self._format_actions(sequence) + prompt = _ANALYSIS_PROMPT.format(actions=actions_text) + + try: + from infrastructure.router.cascade import get_router + + router = get_router() + response = await router.complete( + messages=[ + { + "role": "system", + "content": "You extract reusable skills from agent actions.", + }, + {"role": "user", "content": prompt}, + ], + ) + content = response.get("content", "") + return self._parse_llm_response(content, sequence) + except Exception as exc: + logger.warning("LLM analysis failed, using heuristic: %s", exc) + return self._heuristic_extraction(sequence) + + def _format_actions(self, sequence: list[dict]) -> str: + """Format action sequence for the LLM prompt.""" + lines = [] + for i, entry in enumerate(sequence, 1): + etype = entry.get("type", "unknown") + if etype == "tool_call": + tool = entry.get("tool", "unknown") + result = (entry.get("result") or "")[:200] + lines.append(f"{i}. Tool: {tool} → {result}") + elif etype == "message": + content = (entry.get("content") or "")[:200] + lines.append(f"{i}. Response: {content}") + elif etype == "decision": + decision = (entry.get("decision") or "")[:200] + lines.append(f"{i}. 
Decision: {decision}") + return "\n".join(lines) + + def _parse_llm_response( + self, + content: str, + source_entries: list[dict], + ) -> DiscoveredSkill | None: + """Parse LLM JSON response into a DiscoveredSkill.""" + try: + # Strip markdown fences if present + cleaned = content.strip() + if cleaned.startswith("```"): + cleaned = cleaned.split("\n", 1)[1] if "\n" in cleaned else cleaned[3:] + if cleaned.endswith("```"): + cleaned = cleaned[:-3] + cleaned = cleaned.strip() + + data = json.loads(cleaned) + return DiscoveredSkill( + name=data.get("name", "Unnamed Skill"), + description=data.get("description", ""), + category=data.get("category", "general"), + template=data.get("template", ""), + confidence=float(data.get("confidence", 0.0)), + source_entries=source_entries[:5], # Keep first 5 for reference + ) + except (json.JSONDecodeError, ValueError, TypeError) as exc: + logger.debug("Failed to parse LLM response: %s", exc) + return None + + def _heuristic_extraction(self, sequence: list[dict]) -> DiscoveredSkill | None: + """Fallback: extract skill from action patterns without LLM.""" + tool_calls = [e for e in sequence if e.get("type") == "tool_call"] + if not tool_calls: + return None + + # Name from the dominant tool + tool_names = [e.get("tool", "unknown") for e in tool_calls] + dominant_tool = max(set(tool_names), key=tool_names.count) + + # Simple template from the tool sequence + steps = [] + for i, tc in enumerate(tool_calls[:10], 1): + steps.append(f"Step {i}: Use {tc.get('tool', 'unknown')}") + + return DiscoveredSkill( + name=f"{dominant_tool.replace('_', ' ').title()} Pattern", + description=f"Automated pattern using {dominant_tool} ({len(tool_calls)} steps)", + category="general", + template="\n".join(steps), + confidence=0.5, # Lower confidence for heuristic + source_entries=sequence[:5], + ) + + def _is_duplicate(self, skill: DiscoveredSkill) -> bool: + """Check if a similar skill already exists.""" + with _get_db() as conn: + rows = 
conn.execute( + "SELECT name FROM discovered_skills WHERE name = ? AND status != 'rejected'", + (skill.name,), + ).fetchall() + return len(rows) > 0 + + def _save_skill(self, skill: DiscoveredSkill) -> None: + """Persist a discovered skill to the database.""" + with _get_db() as conn: + conn.execute( + """INSERT INTO discovered_skills + (id, name, description, category, source_entries, + template, confidence, status, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", + ( + skill.id, + skill.name, + skill.description, + skill.category, + json.dumps(skill.source_entries), + skill.template, + skill.confidence, + skill.status, + skill.created_at, + skill.updated_at, + ), + ) + conn.commit() + + def _write_skill_file(self, skill: DiscoveredSkill) -> Path: + """Write a skill template to the skills/ directory.""" + skills_dir = Path(settings.repo_root) / "skills" / "discovered" + skills_dir.mkdir(parents=True, exist_ok=True) + + filename = skill.name.lower().replace(" ", "_") + ".md" + filepath = skills_dir / filename + + content = f"""# {skill.name} + +**Category:** {skill.category} +**Confidence:** {skill.confidence:.0%} +**Discovered:** {skill.created_at[:10]} +**Status:** {skill.status} + +## Description + +{skill.description} + +## Template + +{skill.template} +""" + filepath.write_text(content) + logger.info("Wrote skill file: %s", filepath) + return filepath + + async def _notify(self, skill: DiscoveredSkill) -> None: + """Notify the dashboard about a newly discovered skill.""" + # Push notification + try: + from infrastructure.notifications.push import notifier + + notifier.notify( + title="Skill Discovered", + message=f"{skill.name} (confidence: {skill.confidence:.0%})", + category="system", + ) + except Exception as exc: + logger.debug("Push notification failed: %s", exc) + + # WebSocket broadcast + try: + from infrastructure.ws_manager.handler import ws_manager + + await ws_manager.broadcast( + "skill_discovered", + { + "id": skill.id, + 
"name": skill.name, + "confidence": skill.confidence, + "category": skill.category, + }, + ) + except Exception as exc: + logger.debug("WebSocket broadcast failed: %s", exc) + + # Event bus + try: + from infrastructure.events.bus import Event, get_event_bus + + await get_event_bus().publish( + Event( + type="skill.discovered", + source="skill_discovery", + data=skill.to_dict(), + ) + ) + except Exception as exc: + logger.debug("Event bus publish failed: %s", exc) + + # Write skill file to skills/ directory + try: + self._write_skill_file(skill) + except Exception as exc: + logger.debug("Skill file write failed: %s", exc) + + +# --------------------------------------------------------------------------- +# Singleton +# --------------------------------------------------------------------------- + +_engine: SkillDiscoveryEngine | None = None + + +def get_skill_discovery_engine() -> SkillDiscoveryEngine: + """Get or create the global skill discovery engine.""" + global _engine + if _engine is None: + _engine = SkillDiscoveryEngine() + return _engine diff --git a/tests/unit/test_skill_discovery.py b/tests/unit/test_skill_discovery.py new file mode 100644 index 00000000..aa4b0141 --- /dev/null +++ b/tests/unit/test_skill_discovery.py @@ -0,0 +1,410 @@ +"""Unit tests for the skill discovery pipeline. + +Tests the discovery engine's core logic: action clustering, skill extraction, +database persistence, deduplication, and status management. 
def _tool_call(tool: str, at: datetime | None = None, **extra) -> dict:
    """Build a ``tool_call`` log entry; the timestamp is added only if given."""
    entry = {"type": "tool_call", "tool": tool, **extra}
    if at is not None:
        entry["timestamp"] = at.isoformat()
    return entry


def _llm_router(**complete_kwargs) -> MagicMock:
    """Return a fake router whose async ``complete`` is configured by kwargs."""
    return MagicMock(complete=AsyncMock(**complete_kwargs))


@pytest.fixture
def engine():
    """Provide a new SkillDiscoveryEngine with the default thresholds."""
    return SkillDiscoveryEngine(confidence_threshold=0.7, min_actions=2)


@pytest.fixture(autouse=True)
def temp_db(tmp_path, monkeypatch):
    """Point the module's DB_PATH at a per-test temporary file."""
    db_path = tmp_path / "skills.db"
    monkeypatch.setattr("timmy.skill_discovery.DB_PATH", db_path)
    return db_path


# ---------------------------------------------------------------------------
# DiscoveredSkill dataclass
# ---------------------------------------------------------------------------


class TestDiscoveredSkill:
    def test_defaults(self):
        blank = DiscoveredSkill()
        assert blank.name == ""
        assert blank.status == "discovered"
        assert blank.confidence == 0.0
        assert blank.id.startswith("skill_")

    def test_to_dict(self):
        as_dict = DiscoveredSkill(name="Test Skill", confidence=0.85).to_dict()
        assert as_dict["name"] == "Test Skill"
        assert as_dict["confidence"] == 0.85
        assert "id" in as_dict
        assert "created_at" in as_dict

    def test_custom_fields(self):
        review = DiscoveredSkill(
            name="Code Review",
            category="coding",
            confidence=0.92,
            template="Step 1: Read code\nStep 2: Analyze",
        )
        assert review.category == "coding"
        assert "Step 1" in review.template


# ---------------------------------------------------------------------------
# Database operations
# ---------------------------------------------------------------------------


class TestDatabase:
    def test_save_and_list(self, engine):
        engine._save_skill(
            DiscoveredSkill(
                name="Git Workflow",
                description="Automates git operations",
                category="devops",
                confidence=0.88,
            )
        )
        rows = engine.list_skills()
        assert len(rows) == 1
        assert rows[0]["name"] == "Git Workflow"
        assert rows[0]["category"] == "devops"

    def test_list_by_status(self, engine):
        expected = (("discovered", "Skill A"), ("confirmed", "Skill B"))
        for status, name in expected:
            engine._save_skill(DiscoveredSkill(name=name, status=status))

        for status, name in expected:
            rows = engine.list_skills(status=status)
            assert len(rows) == 1
            assert rows[0]["name"] == name

    def test_get_skill(self, engine):
        saved = DiscoveredSkill(name="Find Me")
        engine._save_skill(saved)
        row = engine.get_skill(saved.id)
        assert row is not None
        assert row["name"] == "Find Me"

    def test_get_skill_not_found(self, engine):
        assert engine.get_skill("nonexistent") is None

    def test_update_status(self, engine):
        saved = DiscoveredSkill(name="Status Test")
        engine._save_skill(saved)
        assert engine.update_status(saved.id, "confirmed")
        row = engine.get_skill(saved.id)
        assert row["status"] == "confirmed"

    def test_update_invalid_status(self, engine):
        saved = DiscoveredSkill(name="Invalid Status")
        engine._save_skill(saved)
        assert not engine.update_status(saved.id, "bogus")

    def test_skill_count(self, engine):
        for name, status in (("A", "discovered"), ("B", "discovered"), ("C", "confirmed")):
            engine._save_skill(DiscoveredSkill(name=name, status=status))
        counts = engine.skill_count()
        assert counts["discovered"] == 2
        assert counts["confirmed"] == 1

    def test_list_limit(self, engine):
        for i in range(5):
            engine._save_skill(DiscoveredSkill(name=f"Skill {i}"))
        assert len(engine.list_skills(limit=3)) == 3


# ---------------------------------------------------------------------------
# Action clustering
# ---------------------------------------------------------------------------


class TestActionClustering:
    def test_empty_entries(self, engine):
        assert engine._cluster_action_sequences([]) == []

    def test_single_sequence(self, engine):
        start = datetime.now()
        entries = [
            _tool_call("read", start),
            _tool_call("write", start + timedelta(seconds=30)),
        ]
        clusters = engine._cluster_action_sequences(entries)
        assert len(clusters) == 1
        assert len(clusters[0]) == 2

    def test_split_by_gap(self, engine):
        start = datetime.now()
        entries = [
            _tool_call("read", start),
            _tool_call("write", start + timedelta(seconds=600)),
        ]
        clusters = engine._cluster_action_sequences(entries, max_gap_seconds=300)
        assert len(clusters) == 2

    def test_bad_timestamps(self, engine):
        entries = [
            _tool_call("read", timestamp="not-a-date"),
            _tool_call("write", timestamp="also-bad"),
        ]
        clusters = engine._cluster_action_sequences(entries)
        # Should still produce sequences (split on bad parse)
        assert len(clusters) >= 1


# ---------------------------------------------------------------------------
# LLM response parsing
# ---------------------------------------------------------------------------


class TestLLMParsing:
    def test_parse_valid_json(self, engine):
        payload = {
            "name": "API Search",
            "description": "Searches APIs efficiently",
            "category": "research",
            "template": "1. Identify API\n2. Call endpoint",
            "confidence": 0.85,
        }
        parsed = engine._parse_llm_response(json.dumps(payload), [])
        assert parsed is not None
        assert parsed.name == "API Search"
        assert parsed.confidence == 0.85
        assert parsed.category == "research"

    def test_parse_with_markdown_fences(self, engine):
        fenced = '```json\n{"name": "Fenced", "confidence": 0.9}\n```'
        parsed = engine._parse_llm_response(fenced, [])
        assert parsed is not None
        assert parsed.name == "Fenced"

    def test_parse_invalid_json(self, engine):
        assert engine._parse_llm_response("not json", []) is None

    def test_parse_empty(self, engine):
        assert engine._parse_llm_response("", []) is None


# ---------------------------------------------------------------------------
# Heuristic extraction
# ---------------------------------------------------------------------------


class TestHeuristicExtraction:
    def test_extract_from_tool_calls(self, engine):
        calls = [
            _tool_call(tool, result="ok")
            for tool in ("git_commit", "git_push", "git_commit")
        ]
        extracted = engine._heuristic_extraction(calls)
        assert extracted is not None
        assert "Git Commit" in extracted.name
        assert extracted.confidence == 0.5

    def test_extract_no_tool_calls(self, engine):
        only_message = [{"type": "message", "role": "user", "content": "hello"}]
        assert engine._heuristic_extraction(only_message) is None


# ---------------------------------------------------------------------------
# Deduplication
# ---------------------------------------------------------------------------


class TestDeduplication:
    def test_not_duplicate(self, engine):
        assert not engine._is_duplicate(DiscoveredSkill(name="Unique Skill"))

    def test_is_duplicate(self, engine):
        engine._save_skill(DiscoveredSkill(name="Duplicate Check"))
        assert engine._is_duplicate(DiscoveredSkill(name="Duplicate Check"))

    def test_rejected_not_duplicate(self, engine):
        engine._save_skill(DiscoveredSkill(name="Rejected Skill", status="rejected"))
        assert not engine._is_duplicate(DiscoveredSkill(name="Rejected Skill"))


# ---------------------------------------------------------------------------
# Format actions
# ---------------------------------------------------------------------------


class TestFormatActions:
    def test_format_tool_call(self, engine):
        rendered = engine._format_actions([_tool_call("shell", result="output text")])
        assert "shell" in rendered
        assert "output text" in rendered

    def test_format_message(self, engine):
        message = {"type": "message", "role": "timmy", "content": "I analyzed the code"}
        assert "I analyzed the code" in engine._format_actions([message])

    def test_format_decision(self, engine):
        decision = {"type": "decision", "decision": "Use async"}
        assert "Use async" in engine._format_actions([decision])


# ---------------------------------------------------------------------------
# Scan integration (mocked)
# ---------------------------------------------------------------------------


class TestScan:
    @pytest.mark.asyncio
    async def test_scan_too_few_actions(self, engine):
        with patch.object(engine, "_load_recent_successful_actions", return_value=[]):
            found = await engine.scan()
        assert found == []

    @pytest.mark.asyncio
    async def test_scan_discovers_skill(self, engine):
        start = datetime.now()
        entries = [
            _tool_call("search", start, result="found results"),
            _tool_call("analyze", start + timedelta(seconds=10), result="analysis complete"),
            _tool_call("report", start + timedelta(seconds=20), result="report generated"),
        ]
        llm_response = json.dumps(
            {
                "name": "Research Pipeline",
                "description": "Search, analyze, and report",
                "category": "research",
                "template": "1. Search\n2. Analyze\n3. Report",
                "confidence": 0.9,
            }
        )

        with (
            patch.object(engine, "_load_recent_successful_actions", return_value=entries),
            patch(
                "infrastructure.router.cascade.get_router",
                return_value=_llm_router(return_value={"content": llm_response}),
            ),
            patch.object(engine, "_notify", new_callable=AsyncMock),
            patch.object(engine, "_write_skill_file"),
        ):
            found = await engine.scan()

        assert len(found) == 1
        assert found[0].name == "Research Pipeline"
        assert found[0].confidence == 0.9

    @pytest.mark.asyncio
    async def test_scan_skips_low_confidence(self, engine):
        start = datetime.now()
        entries = [
            _tool_call("a", start, result="ok"),
            _tool_call("b", start + timedelta(seconds=10), result="ok"),
        ]
        llm_response = json.dumps(
            {"name": "Low Conf", "confidence": 0.3, "category": "general", "template": "..."}
        )

        with (
            patch.object(engine, "_load_recent_successful_actions", return_value=entries),
            patch(
                "infrastructure.router.cascade.get_router",
                return_value=_llm_router(return_value={"content": llm_response}),
            ),
        ):
            found = await engine.scan()

        assert found == []

    @pytest.mark.asyncio
    async def test_scan_falls_back_to_heuristic(self, engine):
        engine.confidence_threshold = 0.4  # Lower for heuristic
        start = datetime.now()
        entries = [
            _tool_call("deploy", start, result="ok"),
            _tool_call("deploy", start + timedelta(seconds=10), result="ok"),
        ]

        with (
            patch.object(engine, "_load_recent_successful_actions", return_value=entries),
            patch(
                "infrastructure.router.cascade.get_router",
                return_value=_llm_router(side_effect=Exception("LLM unavailable")),
            ),
            patch.object(engine, "_notify", new_callable=AsyncMock),
            patch.object(engine, "_write_skill_file"),
        ):
            found = await engine.scan()

        assert len(found) == 1
        assert "Deploy" in found[0].name
        assert found[0].confidence == 0.5