"""Unit tests for the skill discovery pipeline.

Exercises the discovery engine's core logic: action clustering, skill
extraction, database persistence, deduplication, and status management.
"""

from __future__ import annotations

import json
from datetime import datetime, timedelta
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from timmy.skill_discovery import (
    DiscoveredSkill,
    SkillDiscoveryEngine,
)


def _tool_call(tool, when, result=None):
    """Build a ``tool_call`` log entry stamped with ``when`` in ISO format.

    The ``result`` key is only present when a result is supplied, and it is
    inserted before ``timestamp`` so the key order matches hand-written
    entries.
    """
    entry = {"type": "tool_call", "tool": tool}
    if result is not None:
        entry["result"] = result
    entry["timestamp"] = when.isoformat()
    return entry


def _patch_router(complete):
    """Patch the cascade router factory so it returns a mock using ``complete``."""
    return patch(
        "infrastructure.router.cascade.get_router",
        return_value=MagicMock(complete=complete),
    )


@pytest.fixture
def engine():
    """Provide a new SkillDiscoveryEngine to every test."""
    return SkillDiscoveryEngine(confidence_threshold=0.7, min_actions=2)


@pytest.fixture(autouse=True)
def temp_db(tmp_path, monkeypatch):
    """Point the module-level DB_PATH at a per-test temporary file."""
    database_file = tmp_path / "skills.db"
    monkeypatch.setattr("timmy.skill_discovery.DB_PATH", database_file)
    return database_file


# ---------------------------------------------------------------------------
# DiscoveredSkill dataclass
# ---------------------------------------------------------------------------


class TestDiscoveredSkill:
    """Construction and serialisation of DiscoveredSkill."""

    def test_defaults(self):
        """A skill built without arguments carries the expected defaults."""
        blank = DiscoveredSkill()
        assert blank.name == ""
        assert blank.status == "discovered"
        assert blank.confidence == 0.0
        assert blank.id.startswith("skill_")

    def test_to_dict(self):
        """to_dict() exposes the given fields plus id and created_at."""
        as_dict = DiscoveredSkill(name="Test Skill", confidence=0.85).to_dict()
        assert as_dict["name"] == "Test Skill"
        assert as_dict["confidence"] == 0.85
        assert "id" in as_dict
        assert "created_at" in as_dict

    def test_custom_fields(self):
        """Explicitly supplied category and template are kept."""
        review = DiscoveredSkill(
            name="Code Review",
            category="coding",
            confidence=0.92,
            template="Step 1: Read code\nStep 2: Analyze",
        )
        assert review.category == "coding"
        assert "Step 1" in review.template


# ---------------------------------------------------------------------------
# Database operations
# ---------------------------------------------------------------------------


class TestDatabase:
    """Persistence round-trips through the engine's storage methods."""

    def test_save_and_list(self, engine):
        """A saved skill is returned by list_skills()."""
        git_skill = DiscoveredSkill(
            name="Git Workflow",
            description="Automates git operations",
            category="devops",
            confidence=0.88,
        )
        engine._save_skill(git_skill)

        listed = engine.list_skills()
        assert len(listed) == 1
        first = listed[0]
        assert first["name"] == "Git Workflow"
        assert first["category"] == "devops"

    def test_list_by_status(self, engine):
        """list_skills(status=...) returns only skills in that status."""
        skill_a = DiscoveredSkill(name="Skill A", status="discovered")
        skill_b = DiscoveredSkill(name="Skill B", status="confirmed")
        engine._save_skill(skill_a)
        engine._save_skill(skill_b)

        only_discovered = engine.list_skills(status="discovered")
        assert len(only_discovered) == 1
        assert only_discovered[0]["name"] == "Skill A"

        only_confirmed = engine.list_skills(status="confirmed")
        assert len(only_confirmed) == 1
        assert only_confirmed[0]["name"] == "Skill B"

    def test_get_skill(self, engine):
        """get_skill() finds a stored skill by its id."""
        stored = DiscoveredSkill(name="Find Me")
        engine._save_skill(stored)

        fetched = engine.get_skill(stored.id)
        assert fetched is not None
        assert fetched["name"] == "Find Me"

    def test_get_skill_not_found(self, engine):
        """get_skill() returns None for an unknown id."""
        assert engine.get_skill("nonexistent") is None

    def test_update_status(self, engine):
        """update_status() reports success and the new status is readable."""
        stored = DiscoveredSkill(name="Status Test")
        engine._save_skill(stored)

        assert engine.update_status(stored.id, "confirmed")
        fetched = engine.get_skill(stored.id)
        assert fetched["status"] == "confirmed"

    def test_update_invalid_status(self, engine):
        """update_status() reports failure for an unrecognised status."""
        stored = DiscoveredSkill(name="Invalid Status")
        engine._save_skill(stored)

        assert not engine.update_status(stored.id, "bogus")

    def test_skill_count(self, engine):
        """skill_count() tallies stored skills per status."""
        fixtures = (
            ("A", "discovered"),
            ("B", "discovered"),
            ("C", "confirmed"),
        )
        for skill_name, skill_status in fixtures:
            engine._save_skill(DiscoveredSkill(name=skill_name, status=skill_status))

        tally = engine.skill_count()
        assert tally["discovered"] == 2
        assert tally["confirmed"] == 1

    def test_list_limit(self, engine):
        """list_skills(limit=3) returns three of the five stored skills."""
        for index in range(5):
            engine._save_skill(DiscoveredSkill(name=f"Skill {index}"))

        limited = engine.list_skills(limit=3)
        assert len(limited) == 3


# ---------------------------------------------------------------------------
# Action clustering
# ---------------------------------------------------------------------------


class TestActionClustering:
    """Grouping of log entries into action sequences."""

    def test_empty_entries(self, engine):
        """No entries yield no sequences."""
        assert engine._cluster_action_sequences([]) == []

    def test_single_sequence(self, engine):
        """Two tool calls 30 seconds apart end up in one sequence."""
        start = datetime.now()
        entries = [
            _tool_call("read", start),
            _tool_call("write", start + timedelta(seconds=30)),
        ]

        clustered = engine._cluster_action_sequences(entries)
        assert len(clustered) == 1
        assert len(clustered[0]) == 2

    def test_split_by_gap(self, engine):
        """A 600 second gap with max_gap_seconds=300 yields two sequences."""
        start = datetime.now()
        entries = [
            _tool_call("read", start),
            _tool_call("write", start + timedelta(seconds=600)),
        ]

        clustered = engine._cluster_action_sequences(entries, max_gap_seconds=300)
        assert len(clustered) == 2

    def test_bad_timestamps(self, engine):
        """Unparseable timestamps still produce at least one sequence."""
        entries = [
            {"type": "tool_call", "tool": "read", "timestamp": "not-a-date"},
            {"type": "tool_call", "tool": "write", "timestamp": "also-bad"},
        ]

        clustered = engine._cluster_action_sequences(entries)
        # Should still produce sequences (split on bad parse)
        assert len(clustered) >= 1


# ---------------------------------------------------------------------------
# LLM response parsing
# ---------------------------------------------------------------------------


class TestLLMParsing:
    """Turning raw LLM output into DiscoveredSkill objects."""

    def test_parse_valid_json(self, engine):
        """A well-formed JSON payload is mapped onto the skill's fields."""
        payload = {
            "name": "API Search",
            "description": "Searches APIs efficiently",
            "category": "research",
            "template": "1. Identify API\n2. Call endpoint",
            "confidence": 0.85,
        }

        parsed = engine._parse_llm_response(json.dumps(payload), [])
        assert parsed is not None
        assert parsed.name == "API Search"
        assert parsed.confidence == 0.85
        assert parsed.category == "research"

    def test_parse_with_markdown_fences(self, engine):
        """JSON wrapped in a markdown code fence is still parsed."""
        fenced = '```json\n{"name": "Fenced", "confidence": 0.9}\n```'

        parsed = engine._parse_llm_response(fenced, [])
        assert parsed is not None
        assert parsed.name == "Fenced"

    def test_parse_invalid_json(self, engine):
        """Non-JSON text yields None."""
        assert engine._parse_llm_response("not json", []) is None

    def test_parse_empty(self, engine):
        """An empty response yields None."""
        assert engine._parse_llm_response("", []) is None


# ---------------------------------------------------------------------------
# Heuristic extraction
# ---------------------------------------------------------------------------


class TestHeuristicExtraction:
    """Skill extraction without the LLM."""

    def test_extract_from_tool_calls(self, engine):
        """Repeated git tool calls give a 'Git Commit' skill at 0.5 confidence."""
        actions = [
            {"type": "tool_call", "tool": "git_commit", "result": "ok"},
            {"type": "tool_call", "tool": "git_push", "result": "ok"},
            {"type": "tool_call", "tool": "git_commit", "result": "ok"},
        ]

        extracted = engine._heuristic_extraction(actions)
        assert extracted is not None
        assert "Git Commit" in extracted.name
        assert extracted.confidence == 0.5

    def test_extract_no_tool_calls(self, engine):
        """A sequence holding only a message yields None."""
        actions = [{"type": "message", "role": "user", "content": "hello"}]
        assert engine._heuristic_extraction(actions) is None


# ---------------------------------------------------------------------------
# Deduplication
# ---------------------------------------------------------------------------


class TestDeduplication:
    """Duplicate detection against already stored skills."""

    def test_not_duplicate(self, engine):
        """With nothing stored, a skill is not a duplicate."""
        candidate = DiscoveredSkill(name="Unique Skill")
        assert not engine._is_duplicate(candidate)

    def test_is_duplicate(self, engine):
        """A second skill with a stored skill's name is a duplicate."""
        engine._save_skill(DiscoveredSkill(name="Duplicate Check"))

        candidate = DiscoveredSkill(name="Duplicate Check")
        assert engine._is_duplicate(candidate)

    def test_rejected_not_duplicate(self, engine):
        """A stored skill in 'rejected' status does not block the same name."""
        engine._save_skill(DiscoveredSkill(name="Rejected Skill", status="rejected"))

        candidate = DiscoveredSkill(name="Rejected Skill")
        assert not engine._is_duplicate(candidate)


# ---------------------------------------------------------------------------
# Format actions
# ---------------------------------------------------------------------------


class TestFormatActions:
    """Text rendering of action sequences."""

    def test_format_tool_call(self, engine):
        """Tool name and result both appear in the rendered text."""
        actions = [{"type": "tool_call", "tool": "shell", "result": "output text"}]

        rendered = engine._format_actions(actions)
        assert "shell" in rendered
        assert "output text" in rendered

    def test_format_message(self, engine):
        """Message content appears in the rendered text."""
        actions = [{"type": "message", "role": "timmy", "content": "I analyzed the code"}]

        rendered = engine._format_actions(actions)
        assert "I analyzed the code" in rendered

    def test_format_decision(self, engine):
        """Decision text appears in the rendered text."""
        actions = [{"type": "decision", "decision": "Use async"}]

        rendered = engine._format_actions(actions)
        assert "Use async" in rendered


# ---------------------------------------------------------------------------
# Scan integration (mocked)
# ---------------------------------------------------------------------------


class TestScan:
    """End-to-end scan() runs with action loading and the router mocked."""

    @pytest.mark.asyncio
    async def test_scan_too_few_actions(self, engine):
        """scan() returns an empty list when no actions are loaded."""
        with patch.object(engine, "_load_recent_successful_actions", return_value=[]):
            found = await engine.scan()

        assert found == []

    @pytest.mark.asyncio
    async def test_scan_discovers_skill(self, engine):
        """A confident LLM answer produces exactly one discovered skill."""
        start = datetime.now()
        entries = [
            _tool_call("search", start, result="found results"),
            _tool_call("analyze", start + timedelta(seconds=10), result="analysis complete"),
            _tool_call("report", start + timedelta(seconds=20), result="report generated"),
        ]
        llm_response = json.dumps(
            {
                "name": "Research Pipeline",
                "description": "Search, analyze, and report",
                "category": "research",
                "template": "1. Search\n2. Analyze\n3. Report",
                "confidence": 0.9,
            }
        )

        with (
            patch.object(engine, "_load_recent_successful_actions", return_value=entries),
            _patch_router(AsyncMock(return_value={"content": llm_response})),
            patch.object(engine, "_notify", new_callable=AsyncMock),
            patch.object(engine, "_write_skill_file"),
        ):
            found = await engine.scan()

        assert len(found) == 1
        discovered = found[0]
        assert discovered.name == "Research Pipeline"
        assert discovered.confidence == 0.9

    @pytest.mark.asyncio
    async def test_scan_skips_low_confidence(self, engine):
        """An LLM answer at 0.3 confidence (threshold 0.7) yields nothing."""
        start = datetime.now()
        entries = [
            _tool_call("a", start, result="ok"),
            _tool_call("b", start + timedelta(seconds=10), result="ok"),
        ]
        llm_response = json.dumps(
            {"name": "Low Conf", "confidence": 0.3, "category": "general", "template": "..."}
        )

        with (
            patch.object(engine, "_load_recent_successful_actions", return_value=entries),
            _patch_router(AsyncMock(return_value={"content": llm_response})),
        ):
            found = await engine.scan()

        assert found == []

    @pytest.mark.asyncio
    async def test_scan_falls_back_to_heuristic(self, engine):
        """When the router raises, scan() still returns a 0.5-confidence skill."""
        engine.confidence_threshold = 0.4  # Lower for heuristic
        start = datetime.now()
        entries = [
            _tool_call("deploy", start, result="ok"),
            _tool_call("deploy", start + timedelta(seconds=10), result="ok"),
        ]

        with (
            patch.object(engine, "_load_recent_successful_actions", return_value=entries),
            _patch_router(AsyncMock(side_effect=Exception("LLM unavailable"))),
            patch.object(engine, "_notify", new_callable=AsyncMock),
            patch.object(engine, "_write_skill_file"),
        ):
            found = await engine.scan()

        assert len(found) == 1
        fallback = found[0]
        assert "Deploy" in fallback.name
        assert fallback.confidence == 0.5