Compare commits


4 Commits

Author SHA1 Message Date
Alexander Whitestone
92c677f029 feat: add automated skill discovery pipeline
Some checks failed
Tests / lint (pull_request) Successful in 13s
Tests / test (pull_request) Failing after 6m25s
Implements a background process that monitors session logs for successful
agent action sequences, uses the LLM router to extract reusable skill
templates, and stores them in a SQLite database. Discovered skills are
surfaced via dashboard notifications (push + WebSocket + event bus) and
a new /skills page with HTMX polling. Users can confirm, reject, or
archive discovered skills.

- src/timmy/skill_discovery.py: Core engine with LLM analysis + heuristic fallback
- src/dashboard/routes/skills.py: CRUD routes for skill management
- src/dashboard/templates/skills.html: Main skills page
- src/dashboard/templates/partials/skills_list.html: HTMX partial
- Background scheduler in app.py runs every 10 minutes
- 31 unit tests covering DB ops, clustering, parsing, dedup, and scan

Fixes #1011

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-22 19:56:35 -04:00
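The commit above describes discovered skills being stored in a SQLite database with confirm/reject/archive states. A minimal sketch of what such a store might look like (table and column names here are hypothetical; the real schema lives in src/timmy/skill_discovery.py):

```python
import sqlite3
import uuid

# Hypothetical schema mirroring the fields shown on the /skills page.
SCHEMA = """
CREATE TABLE IF NOT EXISTS skills (
    id TEXT PRIMARY KEY,
    name TEXT NOT NULL,
    category TEXT,
    confidence REAL,
    status TEXT DEFAULT 'discovered',
    created_at TEXT DEFAULT (datetime('now'))
)
"""

def store_skill(conn: sqlite3.Connection, name: str, category: str, confidence: float) -> str:
    """Insert a newly discovered skill and return its generated id."""
    skill_id = uuid.uuid4().hex
    conn.execute(
        "INSERT INTO skills (id, name, category, confidence) VALUES (?, ?, ?, ?)",
        (skill_id, name, category, confidence),
    )
    return skill_id

conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)
sid = store_skill(conn, "retry-with-backoff", "resilience", 0.9)
row = conn.execute("SELECT status FROM skills WHERE id = ?", (sid,)).fetchone()
assert row[0] == "discovered"  # new skills start in the 'discovered' state
```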
c0f6ca9fc2 [claude] Add web_fetch tool (trafilatura) for full-page content extraction (#973) (#1004)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
Tests / lint (pull_request) Successful in 19s
Tests / test (pull_request) Failing after 25m3s
2026-03-22 23:03:38 +00:00
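The web_fetch tool added here uses trafilatura for full-page content extraction. A minimal sketch of the extract-with-fallback pattern, assuming trafilatura is an optional dependency (the real tool's interface may differ):

```python
import re

def extract_page_text(html: str) -> str:
    """Extract readable text from HTML, preferring trafilatura when installed."""
    try:
        import trafilatura  # optional, shipped under the 'research' extra
        text = trafilatura.extract(html)
        if text:
            return text
    except ImportError:
        pass
    # Naive fallback: strip tags and collapse whitespace
    stripped = re.sub(r"<[^>]+>", " ", html)
    return re.sub(r"\s+", " ", stripped).strip()

html = "<html><body><p>Lightning Network routing overview.</p></body></html>"
assert "Lightning Network" in extract_page_text(html)
```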
9656a5e0d0 [claude] Add connection leak and pragma unit tests for db_pool.py (#944) (#1001)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
2026-03-22 22:56:58 +00:00
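Pragma unit tests of the kind this commit adds typically assert that every pooled connection carries the expected settings. An illustrative check using sqlite3 (the actual pragmas configured by db_pool.py are not shown in this diff, so the setup here is hypothetical):

```python
import sqlite3

def configure(conn: sqlite3.Connection) -> None:
    # Hypothetical pool setup; the real pragmas in db_pool.py may differ.
    conn.execute("PRAGMA foreign_keys=ON")

def test_connection_pragmas() -> None:
    """Every connection handed out by the pool should carry its pragmas."""
    conn = sqlite3.connect(":memory:")
    configure(conn)
    assert conn.execute("PRAGMA foreign_keys").fetchone()[0] == 1
    conn.close()

test_connection_pragmas()
```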
Alexander Whitestone
e35a23cefa [claude] Add research prompt template library (#974) (#999)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
Co-authored-by: Alexander Whitestone <alexpaynex@gmail.com>
Co-committed-by: Alexander Whitestone <alexpaynex@gmail.com>
2026-03-22 22:44:02 +00:00
18 changed files with 1903 additions and 1052 deletions

View File

@@ -50,6 +50,7 @@ sounddevice = { version = ">=0.4.6", optional = true }
sentence-transformers = { version = ">=2.0.0", optional = true }
numpy = { version = ">=1.24.0", optional = true }
requests = { version = ">=2.31.0", optional = true }
trafilatura = { version = ">=1.6.0", optional = true }
GitPython = { version = ">=3.1.40", optional = true }
pytest = { version = ">=8.0.0", optional = true }
pytest-asyncio = { version = ">=0.24.0", optional = true }
@@ -67,6 +68,7 @@ voice = ["pyttsx3", "openai-whisper", "piper-tts", "sounddevice"]
celery = ["celery"]
embeddings = ["sentence-transformers", "numpy"]
git = ["GitPython"]
research = ["requests", "trafilatura"]
dev = ["pytest", "pytest-asyncio", "pytest-cov", "pytest-timeout", "pytest-randomly", "pytest-xdist", "selenium"]
[tool.poetry.group.dev.dependencies]
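Optional extras like the new `research` group are typically gated at import time. One common pattern (a sketch, not the project's actual code) detects availability without importing the modules:

```python
from importlib.util import find_spec

def extras_available(*modules: str) -> bool:
    """True if every module in an optional dependency group is importable."""
    return all(find_spec(m) is not None for m in modules)

# The 'research' extra maps to requests + trafilatura
research_ready = extras_available("requests", "trafilatura")
print(research_ready)
```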

View File

@@ -0,0 +1,67 @@
---
name: Architecture Spike
type: research
typical_query_count: 2-4
expected_output_length: 600-1200 words
cascade_tier: groq_preferred
description: >
Investigate how to connect two systems or components. Produces an integration
architecture with sequence diagram, key decisions, and a proof-of-concept outline.
---
# Architecture Spike: Connect {system_a} to {system_b}
## Context
We need to integrate **{system_a}** with **{system_b}** in the context of
**{project_context}**. This spike answers: what is the best way to wire them
together, and what are the trade-offs?
## Constraints
- Prefer approaches that avoid adding new infrastructure dependencies.
- The integration should be **{sync_or_async}** (synchronous / asynchronous).
- Must work within: {environment_constraints}.
## Research Steps
1. Identify the APIs / protocols exposed by both systems.
2. List all known integration patterns (direct API, message queue, webhook, SDK, etc.).
3. Evaluate each pattern for complexity, reliability, and latency.
4. Select the recommended approach and outline a proof-of-concept.
## Output Format
### Integration Options
| Pattern | Complexity | Reliability | Latency | Notes |
|---------|-----------|-------------|---------|-------|
| ... | ... | ... | ... | ... |
### Recommended Approach
**Pattern:** {pattern_name}
**Why:** One paragraph explaining the choice.
### Sequence Diagram
```
{system_a} -> {middleware} -> {system_b}
```
Describe the data flow step by step:
1. {system_a} does X...
2. {middleware} transforms / routes...
3. {system_b} receives Y...
### Proof-of-Concept Outline
- Files to create or modify
- Key libraries / dependencies needed
- Estimated effort: {effort_estimate}
### Open Questions
Bullet list of decisions that need human input before proceeding.
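The `{placeholder}` fields in these templates suggest straightforward string formatting. One way to render them while leaving unfilled slots intact (a sketch; the library's actual renderer is not shown in this diff):

```python
class PartialMap(dict):
    """format_map helper that leaves unknown {placeholders} untouched."""
    def __missing__(self, key: str) -> str:
        return "{" + key + "}"

template = "# Architecture Spike: Connect {system_a} to {system_b}"
rendered = template.format_map(PartialMap(system_a="dashboard", system_b="Gitea"))
assert rendered == "# Architecture Spike: Connect dashboard to Gitea"
```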

View File

@@ -0,0 +1,74 @@
---
name: Competitive Scan
type: research
typical_query_count: 3-5
expected_output_length: 800-1500 words
cascade_tier: groq_preferred
description: >
Compare a project against its alternatives. Produces a feature matrix,
strengths/weaknesses analysis, and positioning summary.
---
# Competitive Scan: {project} vs Alternatives
## Context
Compare **{project}** against **{alternatives}** (comma-separated list of
competitors). The goal is to understand where {project} stands and identify
differentiation opportunities.
## Constraints
- Comparison date: {date}.
- Focus areas: {focus_areas} (e.g., features, pricing, community, performance).
- Perspective: {perspective} (user, developer, business).
## Research Steps
1. Gather key facts about {project} (features, pricing, community size, release cadence).
2. Gather the same data for each alternative in {alternatives}.
3. Build a feature comparison matrix.
4. Identify strengths and weaknesses for each entry.
5. Summarize positioning and recommend next steps.
## Output Format
### Overview
One paragraph: what space does {project} compete in, and who are the main players?
### Feature Matrix
| Feature / Attribute | {project} | {alt_1} | {alt_2} | {alt_3} |
|--------------------|-----------|---------|---------|---------|
| {feature_1} | ... | ... | ... | ... |
| {feature_2} | ... | ... | ... | ... |
| Pricing | ... | ... | ... | ... |
| License | ... | ... | ... | ... |
| Community Size | ... | ... | ... | ... |
| Last Major Release | ... | ... | ... | ... |
### Strengths & Weaknesses
#### {project}
- **Strengths:** ...
- **Weaknesses:** ...
#### {alt_1}
- **Strengths:** ...
- **Weaknesses:** ...
_(Repeat for each alternative)_
### Positioning Map
Describe where each project sits along the key dimensions (e.g., simplicity
vs power, free vs paid, niche vs general).
### Recommendations
Bullet list of actions based on the competitive landscape:
- **Differentiate on:** {differentiator}
- **Watch out for:** {threat}
- **Consider adopting from {alt}:** {feature_or_approach}
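The feature matrix this template asks for can be assembled mechanically once the facts are gathered. A small sketch (project and feature names are placeholders) that renders rows as a markdown table:

```python
def matrix_row(cells: list[str]) -> str:
    return "| " + " | ".join(cells) + " |"

def feature_matrix(features: list[str], data: dict[str, dict[str, str]]) -> str:
    """Render {project: {feature: value}} as a markdown comparison table."""
    projects = list(data)
    lines = [matrix_row(["Feature"] + projects)]
    lines.append(matrix_row(["---"] * (len(projects) + 1)))
    for feat in features:
        lines.append(matrix_row([feat] + [data[p].get(feat, "?") for p in projects]))
    return "\n".join(lines)

table = feature_matrix(
    ["License"],
    {"projA": {"License": "MIT"}, "projB": {"License": "GPL"}},
)
assert table.splitlines()[2] == "| License | MIT | GPL |"
```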

View File

@@ -0,0 +1,68 @@
---
name: Game Analysis
type: research
typical_query_count: 2-3
expected_output_length: 600-1000 words
cascade_tier: local_ok
description: >
Evaluate a game for AI agent playability. Assesses API availability,
observation/action spaces, and existing bot ecosystems.
---
# Game Analysis: {game}
## Context
Evaluate **{game}** to determine whether an AI agent can play it effectively.
Focus on programmatic access, observation space, action space, and existing
bot/AI ecosystems.
## Constraints
- Platform: {platform} (PC, console, mobile, browser).
- Agent type: {agent_type} (reinforcement learning, rule-based, LLM-driven, hybrid).
- Budget for API/licenses: {budget}.
## Research Steps
1. Identify official APIs, modding support, or programmatic access methods for {game}.
2. Characterize the observation space (screen pixels, game state JSON, memory reading, etc.).
3. Characterize the action space (keyboard/mouse, API calls, controller inputs).
4. Survey existing bots, AI projects, or research papers for {game}.
5. Assess feasibility and difficulty for the target agent type.
## Output Format
### Game Profile
| Property | Value |
|-------------------|------------------------|
| Game | {game} |
| Genre | {genre} |
| Platform | {platform} |
| API Available | Yes / No / Partial |
| Mod Support | Yes / No / Limited |
| Existing AI Work | Extensive / Some / None|
### Observation Space
Describe what data the agent can access and how (API, screen capture, memory hooks, etc.).
### Action Space
Describe how the agent can interact with the game (input methods, timing constraints, etc.).
### Existing Ecosystem
List known bots, frameworks, research papers, or communities working on AI for {game}.
### Feasibility Assessment
- **Difficulty:** Easy / Medium / Hard / Impractical
- **Best approach:** {recommended_agent_type}
- **Key challenges:** Bullet list
- **Estimated time to MVP:** {time_estimate}
### Recommendation
One paragraph: should we proceed, and if so, what is the first step?

View File

@@ -0,0 +1,79 @@
---
name: Integration Guide
type: research
typical_query_count: 3-5
expected_output_length: 1000-2000 words
cascade_tier: groq_preferred
description: >
Step-by-step guide to wire a specific tool into an existing stack,
complete with code samples, configuration, and testing steps.
---
# Integration Guide: Wire {tool} into {stack}
## Context
Integrate **{tool}** into our **{stack}** stack. The goal is to
**{integration_goal}** (e.g., "add vector search to the dashboard",
"send notifications via Telegram").
## Constraints
- Must follow existing project conventions (see CLAUDE.md).
- No new cloud AI dependencies unless explicitly approved.
- Environment config via `pydantic-settings` / `config.py`.
## Research Steps
1. Review {tool}'s official documentation for installation and setup.
2. Identify the minimal dependency set required.
3. Map {tool}'s API to our existing patterns (singletons, graceful degradation).
4. Write integration code with proper error handling.
5. Define configuration variables and their defaults.
## Output Format
### Prerequisites
- Dependencies to install (with versions)
- External services or accounts required
- Environment variables to configure
### Configuration
```python
# In config.py — add these fields to Settings:
{config_fields}
```
### Implementation
```python
# {file_path}
{implementation_code}
```
### Graceful Degradation
Describe how the integration behaves when {tool} is unavailable:
| Scenario | Behavior | Log Level |
|-----------------------|--------------------|-----------|
| {tool} not installed | {fallback} | WARNING |
| {tool} unreachable | {fallback} | WARNING |
| Invalid credentials | {fallback} | ERROR |
### Testing
```python
# tests/unit/test_{tool_snake}.py
{test_code}
```
### Verification Checklist
- [ ] Dependency added to pyproject.toml
- [ ] Config fields added with sensible defaults
- [ ] Graceful degradation tested (service down)
- [ ] Unit tests pass (`tox -e unit`)
- [ ] No new linting errors (`tox -e lint`)
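The guide's configuration step maps naturally onto environment-driven settings. An illustrative fragment (the field names are hypothetical; the project's real Settings class uses pydantic-settings in config.py):

```python
import os
from dataclasses import dataclass, field

@dataclass
class ToolSettings:
    """Hypothetical settings fragment for a newly integrated tool."""
    enabled: bool = field(
        default_factory=lambda: os.getenv("TOOL_ENABLED", "false").lower() == "true"
    )
    api_url: str = field(
        default_factory=lambda: os.getenv("TOOL_API_URL", "http://localhost:8080")
    )

tool_settings = ToolSettings()
assert tool_settings.api_url.startswith("http")
```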

View File

@@ -0,0 +1,67 @@
---
name: State of the Art
type: research
typical_query_count: 4-6
expected_output_length: 1000-2000 words
cascade_tier: groq_preferred
description: >
Comprehensive survey of what currently exists in a given field or domain.
Produces a structured landscape overview with key players, trends, and gaps.
---
# State of the Art: {field} (as of {date})
## Context
Survey the current landscape of **{field}**. Identify key players, recent
developments, dominant approaches, and notable gaps. This is a point-in-time
snapshot intended to inform decision-making.
## Constraints
- Focus on developments from the last {timeframe} (e.g., 12 months, 2 years).
- Prioritize {priority} (open-source, commercial, academic, or all).
- Target audience: {audience} (technical team, leadership, general).
## Research Steps
1. Identify the major categories or sub-domains within {field}.
2. For each category, list the leading projects, companies, or research groups.
3. Note recent milestones, releases, or breakthroughs.
4. Identify emerging trends and directions.
5. Highlight gaps — things that don't exist yet but should.
## Output Format
### Executive Summary
Two to three sentences: what is the state of {field} right now?
### Landscape Map
| Category | Key Players | Maturity | Trend |
|---------------|--------------------------|-------------|-------------|
| {category_1} | {player_a}, {player_b} | Early / GA | Growing / Stable / Declining |
| {category_2} | {player_c}, {player_d} | Early / GA | Growing / Stable / Declining |
### Recent Milestones
Chronological list of notable events in the last {timeframe}:
- **{date_1}:** {event_description}
- **{date_2}:** {event_description}
### Trends
Numbered list of the top 3-5 trends shaping {field}:
1. **{trend_name}** — {one-line description}
2. **{trend_name}** — {one-line description}
### Gaps & Opportunities
Bullet list of things that are missing, underdeveloped, or ripe for innovation.
### Implications for Us
One paragraph: what does this mean for our project? What should we do next?

View File

@@ -0,0 +1,52 @@
---
name: Tool Evaluation
type: research
typical_query_count: 3-5
expected_output_length: 800-1500 words
cascade_tier: groq_preferred
description: >
Discover and evaluate all shipping tools/libraries/services in a given domain.
Produces a ranked comparison table with pros, cons, and recommendation.
---
# Tool Evaluation: {domain}
## Context
You are researching tools, libraries, and services for **{domain}**.
The goal is to find everything that is currently shipping (not vaporware)
and produce a structured comparison.
## Constraints
- Only include tools that have public releases or hosted services available today.
- If a tool is in beta/preview, note that clearly.
- Focus on {focus_criteria} when evaluating (e.g., cost, ease of integration, community size).
## Research Steps
1. Identify all actively-maintained tools in the **{domain}** space.
2. For each tool, gather: name, URL, license/pricing, last release date, language/platform.
3. Evaluate each tool against the focus criteria.
4. Rank by overall fit for the use case: **{use_case}**.
## Output Format
### Summary
One paragraph: what the landscape looks like and the top recommendation.
### Comparison Table
| Tool | License / Price | Last Release | Language | {focus_criteria} Score | Notes |
|------|----------------|--------------|----------|----------------------|-------|
| ... | ... | ... | ... | ... | ... |
### Top Pick
- **Recommended:** {tool_name} — {one-line reason}
- **Runner-up:** {tool_name} — {one-line reason}
### Risks & Gaps
Bullet list of things to watch out for (missing features, vendor lock-in, etc.).
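Ranking by overall fit, as step 4 asks, can be as simple as a weighted sort over the focus criteria. A toy sketch with made-up tools and scores:

```python
tools = [
    {"name": "tool-alpha", "cost": 2, "integration": 5, "community": 3},
    {"name": "tool-beta", "cost": 4, "integration": 4, "community": 5},
]
weights = {"cost": 0.2, "integration": 0.5, "community": 0.3}

def fit_score(tool: dict) -> float:
    """Weighted sum of criterion scores."""
    return sum(tool[criterion] * w for criterion, w in weights.items())

ranked = sorted(tools, key=fit_score, reverse=True)
assert ranked[0]["name"] == "tool-beta"
```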

View File

@@ -45,6 +45,7 @@ from dashboard.routes.models import api_router as models_api_router
from dashboard.routes.models import router as models_router
from dashboard.routes.quests import router as quests_router
from dashboard.routes.scorecards import router as scorecards_router
from dashboard.routes.skills import router as skills_router
from dashboard.routes.spark import router as spark_router
from dashboard.routes.system import router as system_router
from dashboard.routes.tasks import router as tasks_router
@@ -218,6 +219,32 @@ async def _loop_qa_scheduler() -> None:
        await asyncio.sleep(interval)


_SKILL_DISCOVERY_INTERVAL = 600  # 10 minutes


async def _skill_discovery_scheduler() -> None:
    """Background task: scan session logs for reusable skill patterns."""
    await asyncio.sleep(20)  # Stagger after other schedulers
    while True:
        try:
            from timmy.skill_discovery import get_skill_discovery_engine

            engine = get_skill_discovery_engine()
            discovered = await engine.scan()
            if discovered:
                logger.info(
                    "Skill discovery: found %d new skill(s)",
                    len(discovered),
                )
        except asyncio.CancelledError:
            raise
        except Exception as exc:
            logger.error("Skill discovery scheduler error: %s", exc)
        await asyncio.sleep(_SKILL_DISCOVERY_INTERVAL)
_PRESENCE_POLL_SECONDS = 30
_PRESENCE_INITIAL_DELAY = 3
@@ -380,6 +407,7 @@ def _startup_background_tasks() -> list[asyncio.Task]:
        asyncio.create_task(_loop_qa_scheduler()),
        asyncio.create_task(_presence_watcher()),
        asyncio.create_task(_start_chat_integrations_background()),
        asyncio.create_task(_skill_discovery_scheduler()),
    ]
@@ -631,6 +659,7 @@ app.include_router(tower_router)
app.include_router(daily_run_router)
app.include_router(quests_router)
app.include_router(scorecards_router)
app.include_router(skills_router)
@app.websocket("/ws")

View File

@@ -0,0 +1,82 @@
"""Skill Discovery routes — view and manage auto-discovered skills."""

import logging

from fastapi import APIRouter, Form, HTTPException, Request
from fastapi.responses import HTMLResponse

from dashboard.templating import templates

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/skills", tags=["skills"])


@router.get("", response_class=HTMLResponse)
async def skills_page(request: Request):
    """Main skill discovery page."""
    from timmy.skill_discovery import get_skill_discovery_engine

    engine = get_skill_discovery_engine()
    skills = engine.list_skills(limit=50)
    counts = engine.skill_count()
    return templates.TemplateResponse(
        request,
        "skills.html",
        {"skills": skills, "counts": counts},
    )


@router.get("/list", response_class=HTMLResponse)
async def skills_list_partial(request: Request, status: str = ""):
    """HTMX partial: return skill list for polling."""
    from timmy.skill_discovery import get_skill_discovery_engine

    engine = get_skill_discovery_engine()
    skills = engine.list_skills(status=status or None, limit=50)
    counts = engine.skill_count()
    return templates.TemplateResponse(
        request,
        "partials/skills_list.html",
        {"skills": skills, "counts": counts},
    )


@router.post("/{skill_id}/status", response_class=HTMLResponse)
async def update_skill_status(request: Request, skill_id: str, status: str = Form(...)):
    """Update a skill's status (confirm / reject / archive)."""
    from timmy.skill_discovery import get_skill_discovery_engine

    engine = get_skill_discovery_engine()
    if not engine.update_status(skill_id, status):
        raise HTTPException(status_code=400, detail=f"Invalid status: {status}")
    skills = engine.list_skills(limit=50)
    counts = engine.skill_count()
    return templates.TemplateResponse(
        request,
        "partials/skills_list.html",
        {"skills": skills, "counts": counts},
    )


@router.post("/scan", response_class=HTMLResponse)
async def trigger_scan(request: Request):
    """Manually trigger a skill discovery scan."""
    from timmy.skill_discovery import get_skill_discovery_engine

    engine = get_skill_discovery_engine()
    try:
        discovered = await engine.scan()
        msg = f"Scan complete: {len(discovered)} new skill(s) found."
    except Exception as exc:
        logger.warning("Manual skill scan failed: %s", exc)
        msg = f"Scan failed: {exc}"
    skills = engine.list_skills(limit=50)
    counts = engine.skill_count()
    return templates.TemplateResponse(
        request,
        "partials/skills_list.html",
        {"skills": skills, "counts": counts, "scan_message": msg},
    )

View File

@@ -0,0 +1,74 @@
{% if scan_message is defined and scan_message %}
<div class="alert alert-info mb-3" style="border-color: var(--green); background: var(--bg-card); color: var(--text);">
{{ scan_message }}
</div>
{% endif %}
{% if skills %}
<div class="table-responsive">
<table class="table table-sm" style="color: var(--text);">
<thead>
<tr style="color: var(--text-dim); border-bottom: 1px solid var(--border);">
<th>Name</th>
<th>Category</th>
<th>Confidence</th>
<th>Status</th>
<th>Discovered</th>
<th>Actions</th>
</tr>
</thead>
<tbody>
{% for skill in skills %}
<tr style="border-bottom: 1px solid var(--border);">
<td>
<strong>{{ skill.name }}</strong>
{% if skill.description %}
<br><small class="mc-muted">{{ skill.description[:100] }}</small>
{% endif %}
</td>
<td><span class="badge" style="background: var(--bg-panel); color: var(--text-dim);">{{ skill.category }}</span></td>
<td>
{% set conf = skill.confidence * 100 %}
<span style="color: {% if conf >= 80 %}var(--green){% elif conf >= 60 %}var(--amber){% else %}var(--red){% endif %};">
{{ "%.0f"|format(conf) }}%
</span>
</td>
<td>
{% if skill.status == 'confirmed' %}
<span style="color: var(--green);">confirmed</span>
{% elif skill.status == 'rejected' %}
<span style="color: var(--red);">rejected</span>
{% elif skill.status == 'archived' %}
<span class="mc-muted">archived</span>
{% else %}
<span style="color: var(--amber);">discovered</span>
{% endif %}
</td>
<td class="mc-muted">{{ skill.created_at[:10] if skill.created_at else '' }}</td>
<td>
{% if skill.status == 'discovered' %}
<form style="display:inline;" hx-post="/skills/{{ skill.id }}/status" hx-target="#skills-list" hx-swap="innerHTML">
<input type="hidden" name="status" value="confirmed">
<button type="submit" class="btn btn-sm btn-outline-success" title="Confirm">&#10003;</button>
</form>
<form style="display:inline;" hx-post="/skills/{{ skill.id }}/status" hx-target="#skills-list" hx-swap="innerHTML">
<input type="hidden" name="status" value="rejected">
<button type="submit" class="btn btn-sm btn-outline-danger" title="Reject">&#10007;</button>
</form>
{% elif skill.status == 'confirmed' %}
<form style="display:inline;" hx-post="/skills/{{ skill.id }}/status" hx-target="#skills-list" hx-swap="innerHTML">
<input type="hidden" name="status" value="archived">
<button type="submit" class="btn btn-sm btn-outline-secondary" title="Archive">&#9744;</button>
</form>
{% endif %}
</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
{% else %}
<div class="mc-muted text-center py-4">
No skills discovered yet. Click "Scan Now" to analyze recent activity.
</div>
{% endif %}

View File

@@ -0,0 +1,38 @@
{% extends "base.html" %}
{% block title %}Skill Discovery - Timmy Time{% endblock %}
{% block extra_styles %}{% endblock %}
{% block content %}
<div class="py-3">
{% from "macros.html" import panel %}
{% call panel("SKILL DISCOVERY", id="skills-panel") %}
<div class="d-flex justify-content-between align-items-center mb-3">
<div>
<span class="mc-muted">
Discovered: {{ counts.get('discovered', 0) }} |
Confirmed: {{ counts.get('confirmed', 0) }} |
Archived: {{ counts.get('archived', 0) }}
</span>
</div>
<button class="btn btn-sm btn-outline-light"
hx-post="/skills/scan"
hx-target="#skills-list"
hx-swap="innerHTML">
Scan Now
</button>
</div>
<div id="skills-list"
hx-get="/skills/list"
hx-trigger="every 30s"
hx-swap="innerHTML">
{% include "partials/skills_list.html" %}
</div>
{% endcall %}
</div>
{% endblock %}

View File

@@ -1,555 +0,0 @@
"""ResearchOrchestrator — autonomous research pipeline.

Chains: Check Local → Generate Queries → Search → Fetch → Synthesize →
Crystallize → Write Artifact into an end-to-end research workflow.

Usage:
    from timmy.research import ResearchOrchestrator, run_research

    orchestrator = ResearchOrchestrator(cascade=router, memory=memory_fns)
    result = await orchestrator.run("Bitcoin Lightning Network scaling")
"""
from __future__ import annotations

import asyncio
import json
import logging
import re
import time
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import Any

from config import settings

logger = logging.getLogger(__name__)
# ── Data structures ──────────────────────────────────────────────────────────
CONFIDENCE_THRESHOLD = 0.85
DEFAULT_QUERIES_PER_TOPIC = 8
DEFAULT_RESULTS_PER_QUERY = 5
DEFAULT_PAGES_TO_FETCH = 10
DEFAULT_FETCH_TOKEN_LIMIT = 3000
DEFAULT_SYNTHESIS_MAX_TOKENS = 4000
@dataclass
class ResearchResult:
    """Output of a completed research pipeline run."""

    topic: str
    report: str
    queries_generated: list[str] = field(default_factory=list)
    sources: list[dict[str, str]] = field(default_factory=list)
    action_items: list[str] = field(default_factory=list)
    cache_hit: bool = False
    duration_ms: float = 0.0
    metrics: dict[str, Any] = field(default_factory=dict)
    timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat())


@dataclass
class SearchSnippet:
    """A single search result snippet."""

    title: str
    url: str
    snippet: str
    relevance: float = 0.0


@dataclass
class FetchedPage:
    """A fetched and truncated web page."""

    url: str
    title: str
    content: str
    token_estimate: int = 0
# ── Memory interface ─────────────────────────────────────────────────────────
@dataclass
class MemoryInterface:
    """Abstraction over the memory system for research.

    Accepts callables so the orchestrator doesn't depend on a specific
    memory implementation. Defaults wire to timmy.memory_system.
    """

    search_fn: Any = None  # (query, limit=..., context_type=...) -> list[MemoryEntry]
    store_fn: Any = None  # (content, source, context_type=..., metadata=...) -> MemoryEntry

    def __post_init__(self):
        if self.search_fn is None or self.store_fn is None:
            self._load_defaults()

    def _load_defaults(self):
        try:
            from timmy.memory_system import search_memories, store_memory

            if self.search_fn is None:
                self.search_fn = search_memories
            if self.store_fn is None:
                self.store_fn = store_memory
        except ImportError:
            logger.warning("Memory system not available — research will skip caching")
            if self.search_fn is None:
                self.search_fn = lambda query, **kw: []
            if self.store_fn is None:
                self.store_fn = lambda content, source, **kw: None
# ── Tool interface ───────────────────────────────────────────────────────────
@dataclass
class ResearchTools:
    """Web search and fetch callables.

    Synchronous callables, executed by the orchestrator via asyncio.to_thread:
        web_search(query: str, limit: int) -> list[dict]
        web_fetch(url: str, max_tokens: int) -> str
    """

    web_search: Any = None
    web_fetch: Any = None
# ── Orchestrator ─────────────────────────────────────────────────────────────
class ResearchOrchestrator:
    """Pipeline that chains research steps into an autonomous workflow.

    Steps:
        0. CHECK LOCAL KNOWLEDGE — search memory, return cached if confident
        1. GENERATE QUERIES — ask LLM to produce search queries
        2. SEARCH — execute queries via web_search tool
        3. FETCH — rank snippets, fetch top pages
        4. SYNTHESIZE — produce structured report via LLM
        5. EXTRACT ACTION ITEMS — pull 'ACTION:' lines from the report
        6. CRYSTALLIZE — store result in semantic memory
        7. WRITE ARTIFACT — create Gitea issues from action items
    """

    def __init__(
        self,
        cascade: Any,
        memory: MemoryInterface | None = None,
        tools: ResearchTools | None = None,
    ) -> None:
        self.cascade = cascade
        self.memory = memory or MemoryInterface()
        self.tools = tools or ResearchTools()
        self._metrics: dict[str, int] = {
            "research_cache_hit": 0,
            "research_api_call": 0,
        }
    async def run(
        self,
        topic: str,
        template: str | None = None,
        context: dict[str, Any] | None = None,
    ) -> ResearchResult:
        """Execute the full research pipeline.

        Args:
            topic: The research topic or question.
            template: Optional prompt template for synthesis.
            context: Additional context dict (cascade_tier hint, etc.).

        Returns:
            ResearchResult with report, sources, and action items.
        """
        start = time.monotonic()
        context = context or {}
        cascade_tier = context.get("cascade_tier")

        # Step 0: Check local knowledge
        cached = await self._check_local_knowledge(topic)
        if cached is not None:
            self._metrics["research_cache_hit"] += 1
            cached.duration_ms = (time.monotonic() - start) * 1000
            return cached
        self._metrics["research_api_call"] += 1

        # Step 1: Generate queries
        queries = await self._generate_queries(topic, template, cascade_tier)
        # Step 2: Search
        snippets = await self._search(queries)
        # Step 3: Fetch top pages
        pages = await self._fetch(snippets)
        # Step 4: Synthesize
        report = await self._synthesize(topic, template, pages, cascade_tier)
        # Step 5: Extract action items
        action_items = _extract_action_items(report)

        # Build result
        sources = [{"url": p.url, "title": p.title} for p in pages]
        result = ResearchResult(
            topic=topic,
            report=report,
            queries_generated=queries,
            sources=sources,
            action_items=action_items,
            cache_hit=False,
            duration_ms=(time.monotonic() - start) * 1000,
            metrics=dict(self._metrics),
        )

        # Step 6: Crystallize — store in memory
        await self._crystallize(topic, result)
        # Step 7: Write artifact — create Gitea issues
        await self._write_artifact(result)
        return result
    # ── Pipeline steps ───────────────────────────────────────────────────

    async def _check_local_knowledge(self, topic: str) -> ResearchResult | None:
        """Search semantic memory for existing research on this topic."""
        try:
            results = self.memory.search_fn(
                query=topic, limit=10, context_type="research"
            )
            if not results:
                return None
            # Check if top result has high confidence
            top = results[0]
            score = getattr(top, "relevance_score", 0.0) or 0.0
            if score >= CONFIDENCE_THRESHOLD:
                content = getattr(top, "content", str(top))
                logger.info(
                    "Research cache hit for '%s' (score=%.2f)", topic, score
                )
                return ResearchResult(
                    topic=topic,
                    report=content,
                    cache_hit=True,
                    metrics={"research_cache_hit": 1},
                )
        except Exception as exc:
            logger.warning("Local knowledge check failed: %s", exc)
        return None
    async def _generate_queries(
        self,
        topic: str,
        template: str | None,
        cascade_tier: str | None,
    ) -> list[str]:
        """Ask the LLM to generate search queries for the topic."""
        prompt = (
            f"Generate {DEFAULT_QUERIES_PER_TOPIC} diverse web search queries "
            f"to thoroughly research the following topic. Return ONLY the "
            f"queries, one per line, no numbering or bullets.\n\n"
            f"Topic: {topic}"
        )
        if template:
            prompt += f"\n\nResearch template context:\n{template}"
        messages = [
            {"role": "system", "content": "You are a research query generator."},
            {"role": "user", "content": prompt},
        ]
        kwargs: dict[str, Any] = {"messages": messages, "temperature": 0.7}
        if cascade_tier:
            kwargs["model"] = cascade_tier
        try:
            response = await self.cascade.complete(**kwargs)
            raw = response.get("content", "")
            queries = [
                line.strip()
                for line in raw.strip().splitlines()
                if line.strip() and not line.strip().startswith("#")
            ]
            # Clean numbering prefixes
            cleaned = []
            for q in queries:
                q = re.sub(r"^\d+[\.\)]\s*", "", q)
                q = re.sub(r"^[-*]\s*", "", q)
                if q:
                    cleaned.append(q)
            return cleaned[:DEFAULT_QUERIES_PER_TOPIC + 4]  # slight over-generate
        except Exception as exc:
            logger.warning("Query generation failed: %s", exc)
            # Fallback: use topic itself as a single query
            return [topic]
    async def _search(self, queries: list[str]) -> list[SearchSnippet]:
        """Execute search queries and collect snippets."""
        if not self.tools.web_search:
            logger.warning("No web_search tool configured — skipping search step")
            return []
        all_snippets: list[SearchSnippet] = []

        async def _run_query(query: str) -> list[SearchSnippet]:
            try:
                results = await asyncio.to_thread(
                    self.tools.web_search, query, DEFAULT_RESULTS_PER_QUERY
                )
                snippets = []
                for r in (results or []):
                    snippets.append(
                        SearchSnippet(
                            title=r.get("title", ""),
                            url=r.get("url", ""),
                            snippet=r.get("snippet", ""),
                        )
                    )
                return snippets
            except Exception as exc:
                logger.warning("Search failed for query '%s': %s", query, exc)
                return []

        # Run searches concurrently
        tasks = [_run_query(q) for q in queries]
        results = await asyncio.gather(*tasks)
        for snippets in results:
            all_snippets.extend(snippets)
        # Deduplicate by URL
        seen_urls: set[str] = set()
        unique: list[SearchSnippet] = []
        for s in all_snippets:
            if s.url and s.url not in seen_urls:
                seen_urls.add(s.url)
                unique.append(s)
        return unique
    async def _fetch(self, snippets: list[SearchSnippet]) -> list[FetchedPage]:
        """Fetch top pages from search snippets."""
        if not self.tools.web_fetch:
            logger.warning("No web_fetch tool configured — skipping fetch step")
            return []
        # Take top N snippets
        to_fetch = snippets[:DEFAULT_PAGES_TO_FETCH]
        pages: list[FetchedPage] = []

        async def _fetch_one(snippet: SearchSnippet) -> FetchedPage | None:
            try:
                content = await asyncio.to_thread(
                    self.tools.web_fetch, snippet.url, DEFAULT_FETCH_TOKEN_LIMIT
                )
                if content:
                    return FetchedPage(
                        url=snippet.url,
                        title=snippet.title,
                        # Truncate assuming roughly 4 characters per token
                        content=content[:DEFAULT_FETCH_TOKEN_LIMIT * 4],
                        token_estimate=len(content.split()),
                    )
            except Exception as exc:
                logger.warning("Fetch failed for %s: %s", snippet.url, exc)
            return None

        tasks = [_fetch_one(s) for s in to_fetch]
        results = await asyncio.gather(*tasks)
        for page in results:
            if page is not None:
                pages.append(page)
        return pages
    async def _synthesize(
        self,
        topic: str,
        template: str | None,
        pages: list[FetchedPage],
        cascade_tier: str | None,
    ) -> str:
        """Synthesize fetched pages into a structured research report."""
        # Build context from fetched pages
        context_parts = []
        for i, page in enumerate(pages, 1):
            context_parts.append(
                f"--- Source {i}: {page.title} ({page.url}) ---\n"
                f"{page.content[:DEFAULT_FETCH_TOKEN_LIMIT * 4]}\n"
            )
        sources_text = "\n".join(context_parts) if context_parts else "(no sources fetched)"
        if template:
            prompt = (
                f"{template}\n\n"
                f"Topic: {topic}\n\n"
                f"Research sources:\n{sources_text}\n\n"
                f"Synthesize a comprehensive report based on the sources above."
            )
        else:
            prompt = (
                f"Write a comprehensive research report on: {topic}\n\n"
                f"Research sources:\n{sources_text}\n\n"
                f"Structure your report with:\n"
                f"- Executive summary\n"
                f"- Key findings\n"
                f"- Analysis\n"
                f"- Action items (prefix each with 'ACTION:')\n"
                f"- Sources cited"
            )
        messages = [
            {"role": "system", "content": "You are a research analyst producing structured reports."},
            {"role": "user", "content": prompt},
        ]
        kwargs: dict[str, Any] = {
            "messages": messages,
            "temperature": 0.3,
            "max_tokens": DEFAULT_SYNTHESIS_MAX_TOKENS,
        }
        if cascade_tier:
            kwargs["model"] = cascade_tier
        try:
            response = await self.cascade.complete(**kwargs)
            return response.get("content", "")
        except Exception as exc:
            logger.error("Synthesis failed: %s", exc)
            # Fallback: return raw source summaries
            return (
                f"# Research: {topic}\n\n"
                f"Synthesis failed ({exc}). Raw sources:\n\n{sources_text}"
            )
async def _crystallize(self, topic: str, result: ResearchResult) -> None:
"""Store the research result in semantic memory."""
try:
self.memory.store_fn(
content=result.report,
source="research_orchestrator",
context_type="research",
metadata={
"topic": topic,
"sources": result.sources,
"action_items": result.action_items,
"cache_hit": result.cache_hit,
"duration_ms": result.duration_ms,
},
)
logger.info("Crystallized research on '%s' into memory", topic)
except Exception as exc:
logger.warning("Failed to crystallize research: %s", exc)
async def _write_artifact(self, result: ResearchResult) -> None:
"""Create Gitea issues from action items."""
if not result.action_items:
return
try:
await asyncio.to_thread(_create_gitea_issues, result)
except Exception as exc:
logger.warning("Failed to create Gitea issues: %s", exc)
def get_metrics(self) -> dict[str, int]:
"""Return current research pipeline metrics."""
return dict(self._metrics)
# ── Helpers ──────────────────────────────────────────────────────────────────
def _extract_action_items(report: str) -> list[str]:
"""Extract action items from a research report.
Looks for lines prefixed with ACTION:, TODO:, or - [ ].
"""
items: list[str] = []
for line in report.splitlines():
stripped = line.strip()
# ACTION: prefix
match = re.match(r"^(?:ACTION|TODO)\s*:\s*(.+)", stripped, re.IGNORECASE)
if match:
items.append(match.group(1).strip())
continue
# Markdown checkbox
match = re.match(r"^-\s*\[\s*\]\s*(.+)", stripped)
if match:
items.append(match.group(1).strip())
return items
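The two extraction rules (an `ACTION:`/`TODO:` prefix and an unchecked Markdown checkbox) can be exercised in isolation; this sketch mirrors the same regexes on a sample report:

```python
import re

def extract_action_items(report: str) -> list[str]:
    """Mirror of _extract_action_items: ACTION:/TODO: prefixes and '- [ ]' boxes."""
    items: list[str] = []
    for line in report.splitlines():
        stripped = line.strip()
        m = re.match(r"^(?:ACTION|TODO)\s*:\s*(.+)", stripped, re.IGNORECASE)
        if m:
            items.append(m.group(1).strip())
            continue
        m = re.match(r"^-\s*\[\s*\]\s*(.+)", stripped)
        if m:
            items.append(m.group(1).strip())
    return items

report = "Summary\nACTION: ship the fix\n- [ ] write docs\n- [x] already done\n"
print(extract_action_items(report))  # checked boxes are ignored
```

Note that `- [x]` items fail the checkbox regex (which requires an empty box), so completed items never become new issues.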
def _create_gitea_issues(result: ResearchResult) -> None:
"""Create Gitea issues for action items (runs in thread)."""
if not settings.gitea_token or not settings.gitea_url:
logger.debug("Gitea not configured — skipping issue creation")
return
try:
import requests
except ImportError:
logger.debug("requests not available — skipping Gitea issue creation")
return
base_url = settings.gitea_url.rstrip("/")
repo = settings.gitea_repo
headers = {
"Authorization": f"token {settings.gitea_token}",
"Content-Type": "application/json",
}
for item in result.action_items:
try:
payload = {
"title": f"[research] {item[:100]}",
"body": (
f"Auto-generated from research on: **{result.topic}**\n\n"
f"Action item: {item}\n\n"
f"---\n"
f"_Created by ResearchOrchestrator_"
),
}
resp = requests.post(
f"{base_url}/api/v1/repos/{repo}/issues",
headers=headers,
json=payload,
timeout=10,
)
if resp.status_code in (200, 201):
logger.info("Created Gitea issue: %s", item[:60])
else:
logger.warning(
"Gitea issue creation failed (%d): %s",
resp.status_code,
resp.text[:200],
)
except Exception as exc:
logger.warning("Failed to create issue '%s': %s", item[:60], exc)
# ── Convenience function ─────────────────────────────────────────────────────
async def run_research(
topic: str,
template: str | None = None,
context: dict[str, Any] | None = None,
) -> ResearchResult:
"""Convenience function to run research with default dependencies.
Creates a ResearchOrchestrator with the cascade router singleton
and default memory, then executes the pipeline.
"""
from infrastructure.router.cascade import get_router
cascade = get_router()
orchestrator = ResearchOrchestrator(cascade=cascade)
return await orchestrator.run(topic, template=template, context=context)

View File

@@ -0,0 +1,495 @@
"""Automated Skill Discovery Pipeline.
Monitors the agent's session logs for high-confidence successful outcomes,
uses the LLM router to deconstruct successful action sequences into
reusable skill templates, and stores discovered skills with metadata.
Notifies the dashboard when new skills are crystallized.
"""
import json
import logging
import sqlite3
import uuid
from collections.abc import Generator
from contextlib import closing, contextmanager
from dataclasses import dataclass, field
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from config import settings
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Database
# ---------------------------------------------------------------------------
DB_PATH = Path(settings.repo_root) / "data" / "skills.db"
_SCHEMA = """
CREATE TABLE IF NOT EXISTS discovered_skills (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
description TEXT DEFAULT '',
category TEXT DEFAULT 'general',
source_entries TEXT DEFAULT '[]',
template TEXT DEFAULT '',
confidence REAL DEFAULT 0.0,
status TEXT DEFAULT 'discovered',
created_at TEXT DEFAULT (datetime('now')),
updated_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_skills_status ON discovered_skills(status);
CREATE INDEX IF NOT EXISTS idx_skills_category ON discovered_skills(category);
CREATE INDEX IF NOT EXISTS idx_skills_created ON discovered_skills(created_at);
"""
VALID_STATUSES = {"discovered", "confirmed", "rejected", "archived"}
@contextmanager
def _get_db() -> Generator[sqlite3.Connection, None, None]:
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
with closing(sqlite3.connect(str(DB_PATH))) as conn:
conn.row_factory = sqlite3.Row
conn.execute(f"PRAGMA busy_timeout = {settings.db_busy_timeout_ms}")
conn.executescript(_SCHEMA)
yield conn
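Because the DDL uses `IF NOT EXISTS`, running `executescript` on every connection is idempotent. A minimal in-memory sketch of the same pattern, with a trimmed-down version of the schema (fewer columns than the real table, for brevity):

```python
import sqlite3

DDL = """
CREATE TABLE IF NOT EXISTS discovered_skills (
    id TEXT PRIMARY KEY,
    name TEXT NOT NULL,
    status TEXT DEFAULT 'discovered'
);
CREATE INDEX IF NOT EXISTS idx_skills_status ON discovered_skills(status);
"""

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.executescript(DDL)
conn.executescript(DDL)  # idempotent: safe to run on every new connection
conn.execute(
    "INSERT INTO discovered_skills (id, name) VALUES (?, ?)",
    ("skill_1", "Demo"),
)
row = conn.execute("SELECT * FROM discovered_skills").fetchone()
print(dict(row))  # status falls back to the column default
```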
# ---------------------------------------------------------------------------
# Data model
# ---------------------------------------------------------------------------
@dataclass
class DiscoveredSkill:
"""A skill extracted from successful agent actions."""
id: str = field(default_factory=lambda: f"skill_{uuid.uuid4().hex[:12]}")
name: str = ""
description: str = ""
category: str = "general"
source_entries: list[dict] = field(default_factory=list)
template: str = ""
confidence: float = 0.0
status: str = "discovered"
created_at: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
updated_at: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
def to_dict(self) -> dict[str, Any]:
return {
"id": self.id,
"name": self.name,
"description": self.description,
"category": self.category,
"source_entries": self.source_entries,
"template": self.template,
"confidence": self.confidence,
"status": self.status,
"created_at": self.created_at,
"updated_at": self.updated_at,
}
# ---------------------------------------------------------------------------
# Prompt template for LLM analysis
# ---------------------------------------------------------------------------
_ANALYSIS_PROMPT = """\
You are a skill extraction engine. Analyze the following sequence of \
successful agent actions and extract a reusable skill template.
Actions:
{actions}
Respond with a JSON object containing:
- "name": short skill name (2-5 words)
- "description": one-sentence description of what this skill does
- "category": one of "research", "coding", "devops", "communication", "analysis", "general"
- "template": a step-by-step template that generalizes this action sequence
- "confidence": your confidence that this is a genuinely reusable skill (0.0-1.0)
Respond ONLY with valid JSON, no markdown fences or extra text."""
# ---------------------------------------------------------------------------
# Core engine
# ---------------------------------------------------------------------------
class SkillDiscoveryEngine:
"""Scans session logs for successful action patterns and extracts skills."""
def __init__(
self,
confidence_threshold: float = 0.7,
min_actions: int = 2,
):
self.confidence_threshold = confidence_threshold
self.min_actions = min_actions
# -- Public API ---------------------------------------------------------
async def scan(self) -> list[DiscoveredSkill]:
"""Scan recent session logs and discover new skills.
Returns a list of newly discovered skills.
"""
entries = self._load_recent_successful_actions()
if len(entries) < self.min_actions:
logger.debug(
"Skill discovery: only %d actions found (need %d), skipping",
len(entries),
self.min_actions,
)
return []
# Group entries into action sequences (tool calls clustered together)
sequences = self._cluster_action_sequences(entries)
discovered: list[DiscoveredSkill] = []
for seq in sequences:
if len(seq) < self.min_actions:
continue
skill = await self._analyze_sequence(seq)
if skill and skill.confidence >= self.confidence_threshold:
# Check for duplicates
if not self._is_duplicate(skill):
self._save_skill(skill)
await self._notify(skill)
discovered.append(skill)
logger.info(
"Discovered skill: %s (confidence=%.2f)",
skill.name,
skill.confidence,
)
return discovered
def list_skills(
self,
status: str | None = None,
limit: int = 50,
) -> list[dict[str, Any]]:
"""Return discovered skills from the database."""
with _get_db() as conn:
if status and status in VALID_STATUSES:
rows = conn.execute(
"SELECT * FROM discovered_skills WHERE status = ? "
"ORDER BY created_at DESC LIMIT ?",
(status, limit),
).fetchall()
else:
rows = conn.execute(
"SELECT * FROM discovered_skills ORDER BY created_at DESC LIMIT ?",
(limit,),
).fetchall()
return [dict(r) for r in rows]
def get_skill(self, skill_id: str) -> dict[str, Any] | None:
"""Get a single skill by ID."""
with _get_db() as conn:
row = conn.execute(
"SELECT * FROM discovered_skills WHERE id = ?",
(skill_id,),
).fetchone()
return dict(row) if row else None
def update_status(self, skill_id: str, new_status: str) -> bool:
"""Update a skill's status (confirm, reject, archive)."""
if new_status not in VALID_STATUSES:
return False
with _get_db() as conn:
conn.execute(
"UPDATE discovered_skills SET status = ?, updated_at = ? WHERE id = ?",
(new_status, datetime.now(UTC).isoformat(), skill_id),
)
conn.commit()
return True
def skill_count(self) -> dict[str, int]:
"""Return counts of skills by status."""
with _get_db() as conn:
rows = conn.execute(
"SELECT status, COUNT(*) as cnt FROM discovered_skills GROUP BY status"
).fetchall()
return {r["status"]: r["cnt"] for r in rows}
# -- Internal -----------------------------------------------------------
def _load_recent_successful_actions(self, limit: int = 100) -> list[dict]:
"""Load recent successful tool calls from session logs."""
try:
from timmy.session_logger import get_session_logger
sl = get_session_logger()
entries = sl.get_recent_entries(limit=limit)
# Filter for successful tool calls and high-confidence messages
return [
e
for e in entries
if (e.get("type") == "tool_call")
or (
e.get("type") == "message"
and e.get("role") == "timmy"
and (e.get("confidence") or 0) >= 0.7
)
]
except Exception as exc:
logger.warning("Failed to load session entries: %s", exc)
return []
def _cluster_action_sequences(
self,
entries: list[dict],
max_gap_seconds: int = 300,
) -> list[list[dict]]:
"""Group entries into sequences based on temporal proximity."""
if not entries:
return []
from datetime import datetime as dt
sequences: list[list[dict]] = []
current_seq: list[dict] = [entries[0]]
for entry in entries[1:]:
try:
prev_ts = dt.fromisoformat(current_seq[-1].get("timestamp", ""))
curr_ts = dt.fromisoformat(entry.get("timestamp", ""))
gap = abs((curr_ts - prev_ts).total_seconds())
except (ValueError, TypeError):
gap = max_gap_seconds + 1
if gap <= max_gap_seconds:
current_seq.append(entry)
else:
if current_seq:
sequences.append(current_seq)
current_seq = [entry]
if current_seq:
sequences.append(current_seq)
return sequences
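The clustering rule is purely temporal: consecutive entries stay in one sequence while the gap is at most `max_gap_seconds`, and an unparseable timestamp forces a split. A self-contained sketch of the same logic:

```python
from datetime import datetime

def cluster(entries: list[dict], max_gap_seconds: int = 300) -> list[list[dict]]:
    """Group entries into sequences by temporal proximity."""
    if not entries:
        return []
    sequences: list[list[dict]] = []
    current: list[dict] = [entries[0]]
    for entry in entries[1:]:
        try:
            prev = datetime.fromisoformat(current[-1]["timestamp"])
            curr = datetime.fromisoformat(entry["timestamp"])
            gap = abs((curr - prev).total_seconds())
        except (ValueError, TypeError, KeyError):
            gap = max_gap_seconds + 1  # unparseable timestamp: start a new sequence
        if gap <= max_gap_seconds:
            current.append(entry)
        else:
            sequences.append(current)
            current = [entry]
    sequences.append(current)
    return sequences

entries = [
    {"timestamp": "2026-03-22T10:00:00"},
    {"timestamp": "2026-03-22T10:02:00"},  # 120 s gap, same sequence
    {"timestamp": "2026-03-22T10:30:00"},  # 1680 s gap, new sequence
]
print([len(s) for s in cluster(entries)])  # [2, 1]
```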
async def _analyze_sequence(self, sequence: list[dict]) -> DiscoveredSkill | None:
"""Use the LLM router to analyze an action sequence."""
actions_text = self._format_actions(sequence)
prompt = _ANALYSIS_PROMPT.format(actions=actions_text)
try:
from infrastructure.router.cascade import get_router
router = get_router()
response = await router.complete(
messages=[
{
"role": "system",
"content": "You extract reusable skills from agent actions.",
},
{"role": "user", "content": prompt},
],
)
content = response.get("content", "")
return self._parse_llm_response(content, sequence)
except Exception as exc:
logger.warning("LLM analysis failed, using heuristic: %s", exc)
return self._heuristic_extraction(sequence)
def _format_actions(self, sequence: list[dict]) -> str:
"""Format action sequence for the LLM prompt."""
lines = []
for i, entry in enumerate(sequence, 1):
etype = entry.get("type", "unknown")
if etype == "tool_call":
tool = entry.get("tool", "unknown")
result = (entry.get("result") or "")[:200]
lines.append(f"{i}. Tool: {tool} -> {result}")
lines.append(f"{i}. Tool: {tool} -> {result}")
elif etype == "message":
content = (entry.get("content") or "")[:200]
lines.append(f"{i}. Response: {content}")
elif etype == "decision":
decision = (entry.get("decision") or "")[:200]
lines.append(f"{i}. Decision: {decision}")
return "\n".join(lines)
def _parse_llm_response(
self,
content: str,
source_entries: list[dict],
) -> DiscoveredSkill | None:
"""Parse LLM JSON response into a DiscoveredSkill."""
try:
# Strip markdown fences if present
cleaned = content.strip()
if cleaned.startswith("```"):
cleaned = cleaned.split("\n", 1)[1] if "\n" in cleaned else cleaned[3:]
if cleaned.endswith("```"):
cleaned = cleaned[:-3]
cleaned = cleaned.strip()
data = json.loads(cleaned)
return DiscoveredSkill(
name=data.get("name", "Unnamed Skill"),
description=data.get("description", ""),
category=data.get("category", "general"),
template=data.get("template", ""),
confidence=float(data.get("confidence", 0.0)),
source_entries=source_entries[:5], # Keep first 5 for reference
)
except (json.JSONDecodeError, ValueError, TypeError) as exc:
logger.debug("Failed to parse LLM response: %s", exc)
return None
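Models frequently wrap JSON in Markdown fences despite the "no fences" instruction, so the parser strips them before `json.loads`. The stripping logic can be checked standalone:

```python
import json

def strip_fences(content: str) -> str:
    """Remove a surrounding ``` fence (with optional language tag) if present."""
    cleaned = content.strip()
    if cleaned.startswith("```"):
        # Drop the opening fence line, which may carry a language tag like ```json
        cleaned = cleaned.split("\n", 1)[1] if "\n" in cleaned else cleaned[3:]
    if cleaned.endswith("```"):
        cleaned = cleaned[:-3]
    return cleaned.strip()

raw = '```json\n{"name": "Demo Skill", "confidence": 0.8}\n```'
data = json.loads(strip_fences(raw))
print(data["name"])  # Demo Skill
```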
def _heuristic_extraction(self, sequence: list[dict]) -> DiscoveredSkill | None:
"""Fallback: extract skill from action patterns without LLM."""
tool_calls = [e for e in sequence if e.get("type") == "tool_call"]
if not tool_calls:
return None
# Name from the dominant tool
tool_names = [e.get("tool", "unknown") for e in tool_calls]
dominant_tool = max(set(tool_names), key=tool_names.count)
# Simple template from the tool sequence
steps = []
for i, tc in enumerate(tool_calls[:10], 1):
steps.append(f"Step {i}: Use {tc.get('tool', 'unknown')}")
return DiscoveredSkill(
name=f"{dominant_tool.replace('_', ' ').title()} Pattern",
description=f"Automated pattern using {dominant_tool} ({len(tool_calls)} steps)",
category="general",
template="\n".join(steps),
confidence=0.5, # Lower confidence for heuristic
source_entries=sequence[:5],
)
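The heuristic names the skill after the dominant tool, i.e. the most frequent tool in the sequence; that selection is a one-liner:

```python
tool_names = ["web_search", "web_fetch", "web_search", "memory_store"]
# max over the distinct names, keyed by how often each appears in the list
dominant = max(set(tool_names), key=tool_names.count)
print(dominant)  # web_search (2 occurrences)
```

Ties are broken arbitrarily by set iteration order, which is acceptable for a low-confidence fallback.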
def _is_duplicate(self, skill: DiscoveredSkill) -> bool:
"""Check whether a skill with the same name already exists (ignoring rejected)."""
with _get_db() as conn:
rows = conn.execute(
"SELECT name FROM discovered_skills WHERE name = ? AND status != 'rejected'",
(skill.name,),
).fetchall()
return len(rows) > 0
def _save_skill(self, skill: DiscoveredSkill) -> None:
"""Persist a discovered skill to the database."""
with _get_db() as conn:
conn.execute(
"""INSERT INTO discovered_skills
(id, name, description, category, source_entries,
template, confidence, status, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
skill.id,
skill.name,
skill.description,
skill.category,
json.dumps(skill.source_entries),
skill.template,
skill.confidence,
skill.status,
skill.created_at,
skill.updated_at,
),
)
conn.commit()
def _write_skill_file(self, skill: DiscoveredSkill) -> Path:
"""Write a skill template to the skills/ directory."""
skills_dir = Path(settings.repo_root) / "skills" / "discovered"
skills_dir.mkdir(parents=True, exist_ok=True)
# Sanitize the name so it is always a safe, flat filename
safe_name = "".join(
c if c.isalnum() or c in "_-" else "_"
for c in skill.name.lower().replace(" ", "_")
)
filename = safe_name + ".md"
filepath = skills_dir / filename
content = f"""# {skill.name}
**Category:** {skill.category}
**Confidence:** {skill.confidence:.0%}
**Discovered:** {skill.created_at[:10]}
**Status:** {skill.status}
## Description
{skill.description}
## Template
{skill.template}
"""
filepath.write_text(content)
logger.info("Wrote skill file: %s", filepath)
return filepath
async def _notify(self, skill: DiscoveredSkill) -> None:
"""Notify the dashboard about a newly discovered skill."""
# Push notification
try:
from infrastructure.notifications.push import notifier
notifier.notify(
title="Skill Discovered",
message=f"{skill.name} (confidence: {skill.confidence:.0%})",
category="system",
)
except Exception as exc:
logger.debug("Push notification failed: %s", exc)
# WebSocket broadcast
try:
from infrastructure.ws_manager.handler import ws_manager
await ws_manager.broadcast(
"skill_discovered",
{
"id": skill.id,
"name": skill.name,
"confidence": skill.confidence,
"category": skill.category,
},
)
except Exception as exc:
logger.debug("WebSocket broadcast failed: %s", exc)
# Event bus
try:
from infrastructure.events.bus import Event, get_event_bus
await get_event_bus().publish(
Event(
type="skill.discovered",
source="skill_discovery",
data=skill.to_dict(),
)
)
except Exception as exc:
logger.debug("Event bus publish failed: %s", exc)
# Write skill file to skills/ directory
try:
self._write_skill_file(skill)
except Exception as exc:
logger.debug("Skill file write failed: %s", exc)
# ---------------------------------------------------------------------------
# Singleton
# ---------------------------------------------------------------------------
_engine: SkillDiscoveryEngine | None = None
def get_skill_discovery_engine() -> SkillDiscoveryEngine:
"""Get or create the global skill discovery engine."""
global _engine
if _engine is None:
_engine = SkillDiscoveryEngine()
return _engine

View File

@@ -473,6 +473,69 @@ def consult_grok(query: str) -> str:
return response
def web_fetch(url: str, max_tokens: int = 4000) -> str:
"""Fetch a web page and return its main text content.
Downloads the URL, extracts readable text using trafilatura, and
truncates to a token budget. Use this to read full articles, docs,
or blog posts that web_search only returns snippets for.
Args:
url: The URL to fetch (must start with http:// or https://).
max_tokens: Maximum approximate token budget (default 4000).
Text is truncated to max_tokens * 4 characters.
Returns:
Extracted text content, or an error message on failure.
"""
if not url or not url.startswith(("http://", "https://")):
return f"Error: invalid URL — must start with http:// or https://: {url!r}"
try:
import requests as _requests
except ImportError:
return "Error: 'requests' package is not installed. Install with: pip install requests"
try:
import trafilatura
except ImportError:
return (
"Error: 'trafilatura' package is not installed. Install with: pip install trafilatura"
)
try:
resp = _requests.get(
url,
timeout=15,
headers={"User-Agent": "TimmyResearchBot/1.0"},
)
resp.raise_for_status()
except _requests.exceptions.Timeout:
return f"Error: request timed out after 15 seconds for {url}"
except _requests.exceptions.HTTPError as exc:
return f"Error: HTTP {exc.response.status_code} for {url}"
except _requests.exceptions.RequestException as exc:
return f"Error: failed to fetch {url}: {exc}"
text = trafilatura.extract(resp.text, include_tables=True, include_links=True)
if not text:
return f"Error: could not extract readable content from {url}"
char_budget = max_tokens * 4
if len(text) > char_budget:
text = text[:char_budget] + f"\n\n[…truncated to ~{max_tokens} tokens]"
return text
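The token budget uses the common rough heuristic of about 4 characters per token; the truncation arithmetic in isolation:

```python
def truncate_to_budget(text: str, max_tokens: int = 4000) -> str:
    """Truncate text to ~max_tokens using the 4-chars-per-token rule of thumb."""
    char_budget = max_tokens * 4
    if len(text) > char_budget:
        return text[:char_budget] + f"\n\n[…truncated to ~{max_tokens} tokens]"
    return text

short = truncate_to_budget("hello", max_tokens=100)
long_text = truncate_to_budget("a" * 20_000, max_tokens=100)
print(len(long_text))  # 400 chars of content plus the truncation marker
```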
def _register_web_fetch_tool(toolkit: Toolkit) -> None:
"""Register the web_fetch tool for full-page content extraction."""
try:
toolkit.register(web_fetch, name="web_fetch")
except Exception as exc:
logger.warning("Tool registration failed (web_fetch): %s", exc)
def _register_core_tools(toolkit: Toolkit, base_path: Path) -> None:
"""Register core execution and file tools."""
# Python execution
@@ -672,6 +735,7 @@ def create_full_toolkit(base_dir: str | Path | None = None):
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
_register_core_tools(toolkit, base_path)
_register_web_fetch_tool(toolkit)
_register_grok_tool(toolkit)
_register_memory_tools(toolkit)
_register_agentic_loop_tool(toolkit)
@@ -829,6 +893,11 @@ def _analysis_tool_catalog() -> dict:
"description": "Evaluate mathematical expressions with exact results",
"available_in": ["orchestrator"],
},
"web_fetch": {
"name": "Web Fetch",
"description": "Fetch a web page and extract clean readable text (trafilatura)",
"available_in": ["orchestrator"],
},
}

View File

@@ -242,6 +242,145 @@ class TestCloseAll:
conn.execute("SELECT 1")
class TestConnectionLeaks:
"""Test that connections do not leak."""
def test_get_connection_after_close_returns_fresh_connection(self, tmp_path):
"""After close, get_connection() returns a new working connection."""
pool = ConnectionPool(tmp_path / "test.db")
conn1 = pool.get_connection()
pool.close_connection()
conn2 = pool.get_connection()
assert conn2 is not conn1
# New connection must be usable
cursor = conn2.execute("SELECT 1")
assert cursor.fetchone()[0] == 1
pool.close_connection()
def test_context_manager_does_not_leak_connection(self, tmp_path):
"""After context manager exit, thread-local conn is cleared."""
pool = ConnectionPool(tmp_path / "test.db")
with pool.connection():
pass
# Thread-local should be cleaned up
assert pool._local.conn is None
def test_context_manager_exception_does_not_leak_connection(self, tmp_path):
"""Connection is cleaned up even when an exception occurs."""
pool = ConnectionPool(tmp_path / "test.db")
try:
with pool.connection():
raise RuntimeError("boom")
except RuntimeError:
pass
assert pool._local.conn is None
def test_threads_do_not_leak_into_each_other(self, tmp_path):
"""A connection opened in one thread is invisible to another."""
pool = ConnectionPool(tmp_path / "test.db")
# Open a connection on main thread
pool.get_connection()
visible_from_other_thread = []
def check():
has_conn = hasattr(pool._local, "conn") and pool._local.conn is not None
visible_from_other_thread.append(has_conn)
t = threading.Thread(target=check)
t.start()
t.join()
assert visible_from_other_thread == [False]
pool.close_connection()
def test_repeated_open_close_cycles(self, tmp_path):
"""Repeated open/close cycles do not accumulate leaked connections."""
pool = ConnectionPool(tmp_path / "test.db")
for _ in range(50):
with pool.connection() as conn:
conn.execute("SELECT 1")
# After each cycle, connection should be cleaned up
assert pool._local.conn is None
class TestPragmaApplication:
"""Test that SQLite pragmas can be applied and persist on pooled connections.
The codebase uses WAL journal mode and busy_timeout pragmas on connections
obtained from the pool. These tests verify that pattern works correctly.
"""
def test_wal_journal_mode_persists(self, tmp_path):
"""WAL journal mode set on a pooled connection persists for its lifetime."""
pool = ConnectionPool(tmp_path / "test.db")
conn = pool.get_connection()
conn.execute("PRAGMA journal_mode=WAL")
mode = conn.execute("PRAGMA journal_mode").fetchone()[0]
assert mode == "wal"
# Same connection should retain the pragma
same_conn = pool.get_connection()
mode2 = same_conn.execute("PRAGMA journal_mode").fetchone()[0]
assert mode2 == "wal"
pool.close_connection()
def test_busy_timeout_persists(self, tmp_path):
"""busy_timeout pragma set on a pooled connection persists."""
pool = ConnectionPool(tmp_path / "test.db")
conn = pool.get_connection()
conn.execute("PRAGMA busy_timeout=5000")
timeout = conn.execute("PRAGMA busy_timeout").fetchone()[0]
assert timeout == 5000
pool.close_connection()
def test_pragmas_apply_per_connection(self, tmp_path):
"""Pragmas set on one thread's connection are independent of another's."""
pool = ConnectionPool(tmp_path / "test.db")
conn_main = pool.get_connection()
conn_main.execute("PRAGMA cache_size=9999")
other_cache = []
def check_pragma():
conn = pool.get_connection()
# Don't set cache_size — should get the default, not 9999
val = conn.execute("PRAGMA cache_size").fetchone()[0]
other_cache.append(val)
pool.close_connection()
t = threading.Thread(target=check_pragma)
t.start()
t.join()
# Other thread's connection should NOT have our custom cache_size
assert other_cache[0] != 9999
pool.close_connection()
def test_session_pragma_resets_on_new_connection(self, tmp_path):
"""Session-level pragmas (cache_size) reset on a new connection."""
pool = ConnectionPool(tmp_path / "test.db")
conn1 = pool.get_connection()
conn1.execute("PRAGMA cache_size=9999")
assert conn1.execute("PRAGMA cache_size").fetchone()[0] == 9999
pool.close_connection()
conn2 = pool.get_connection()
cache = conn2.execute("PRAGMA cache_size").fetchone()[0]
# New connection gets default cache_size, not the previous value
assert cache != 9999
pool.close_connection()
def test_wal_mode_via_context_manager(self, tmp_path):
"""WAL mode can be set within a context manager block."""
pool = ConnectionPool(tmp_path / "test.db")
with pool.connection() as conn:
conn.execute("PRAGMA journal_mode=WAL")
mode = conn.execute("PRAGMA journal_mode").fetchone()[0]
assert mode == "wal"
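The key distinction these tests rely on is standard SQLite behavior: `journal_mode=WAL` is stored in the database file and survives reconnects, while `cache_size` (and `busy_timeout`) are per-connection session settings that reset on a fresh connection. A standalone check against a file-backed database:

```python
import os
import sqlite3
import tempfile

path = os.path.join(tempfile.mkdtemp(), "demo.db")

conn1 = sqlite3.connect(path)
conn1.execute("PRAGMA journal_mode=WAL")  # persisted in the database file
conn1.execute("PRAGMA cache_size=9999")   # session-only setting
conn1.close()

conn2 = sqlite3.connect(path)
mode = conn2.execute("PRAGMA journal_mode").fetchone()[0]
cache = conn2.execute("PRAGMA cache_size").fetchone()[0]
conn2.close()
print(mode, cache)  # 'wal' and the default cache_size, not 9999
```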
class TestIntegration:
"""Integration tests for real-world usage patterns."""

View File

@@ -0,0 +1,158 @@
"""Unit tests for the web_fetch tool in timmy.tools."""
from __future__ import annotations
from unittest.mock import MagicMock, patch
from timmy.tools import web_fetch
class TestWebFetch:
"""Tests for web_fetch function."""
def test_invalid_url_no_scheme(self):
"""URLs without http(s) scheme are rejected."""
result = web_fetch("example.com")
assert "Error: invalid URL" in result
def test_invalid_url_empty(self):
"""Empty URL is rejected."""
result = web_fetch("")
assert "Error: invalid URL" in result
def test_invalid_url_ftp(self):
"""Non-HTTP schemes are rejected."""
result = web_fetch("ftp://example.com")
assert "Error: invalid URL" in result
def test_successful_fetch(self):
"""Happy path: fetch + extract returns text."""
# web_fetch imports requests and trafilatura lazily inside the function,
# so patching sys.modules is sufficient; attribute-level patches are not.
mock_requests = MagicMock()
mock_trafilatura = MagicMock()
mock_resp = MagicMock()
mock_resp.text = "<html><body><p>Hello world</p></body></html>"
mock_requests.get.return_value = mock_resp
mock_requests.exceptions = _make_exceptions()
mock_trafilatura.extract.return_value = "Hello world"
with patch.dict(
"sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}
):
result = web_fetch("https://example.com")
assert result == "Hello world"
@patch.dict("sys.modules", {"requests": MagicMock(), "trafilatura": MagicMock()})
def test_truncation(self):
"""Long text is truncated to max_tokens * 4 chars."""
import sys
mock_trafilatura = sys.modules["trafilatura"]
mock_requests = sys.modules["requests"]
long_text = "a" * 20000
mock_resp = MagicMock()
mock_resp.text = "<html><body>" + long_text + "</body></html>"
mock_requests.get.return_value = mock_resp
mock_requests.exceptions = _make_exceptions()
mock_trafilatura.extract.return_value = long_text
result = web_fetch("https://example.com", max_tokens=100)
# 100 tokens * 4 chars = 400 chars max
assert len(result) < 500
assert "[…truncated" in result
@patch.dict("sys.modules", {"requests": MagicMock(), "trafilatura": MagicMock()})
def test_extraction_failure(self):
"""Returns error when trafilatura can't extract text."""
import sys
mock_trafilatura = sys.modules["trafilatura"]
mock_requests = sys.modules["requests"]
mock_resp = MagicMock()
mock_resp.text = "<html></html>"
mock_requests.get.return_value = mock_resp
mock_requests.exceptions = _make_exceptions()
mock_trafilatura.extract.return_value = None
result = web_fetch("https://example.com")
assert "Error: could not extract" in result
@patch.dict("sys.modules", {"trafilatura": MagicMock()})
def test_timeout(self):
"""Timeout errors are handled gracefully."""
mock_requests = MagicMock()
exc_mod = _make_exceptions()
mock_requests.exceptions = exc_mod
mock_requests.get.side_effect = exc_mod.Timeout("timed out")
with patch.dict("sys.modules", {"requests": mock_requests}):
result = web_fetch("https://example.com")
assert "timed out" in result
@patch.dict("sys.modules", {"trafilatura": MagicMock()})
def test_http_error(self):
"""HTTP errors (404, 500, etc.) are handled gracefully."""
mock_requests = MagicMock()
exc_mod = _make_exceptions()
mock_requests.exceptions = exc_mod
mock_response = MagicMock()
mock_response.status_code = 404
mock_requests.get.return_value.raise_for_status.side_effect = exc_mod.HTTPError(
response=mock_response
)
with patch.dict("sys.modules", {"requests": mock_requests}):
result = web_fetch("https://example.com/nope")
assert "404" in result
def test_missing_requests(self):
"""Graceful error when requests not installed."""
with patch.dict("sys.modules", {"requests": None}):
result = web_fetch("https://example.com")
assert "requests" in result and "not installed" in result
def test_missing_trafilatura(self):
"""Graceful error when trafilatura not installed."""
mock_requests = MagicMock()
with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": None}):
result = web_fetch("https://example.com")
assert "trafilatura" in result and "not installed" in result
def test_catalog_entry_exists(self):
"""web_fetch should appear in the tool catalog."""
from timmy.tools import get_all_available_tools
catalog = get_all_available_tools()
assert "web_fetch" in catalog
assert "orchestrator" in catalog["web_fetch"]["available_in"]
def _make_exceptions():
"""Create a mock exceptions module with real exception classes."""
class Timeout(Exception):
pass
class HTTPError(Exception):
def __init__(self, *args, response=None, **kwargs):
super().__init__(*args, **kwargs)
self.response = response
class RequestException(Exception):
pass
mod = MagicMock()
mod.Timeout = Timeout
mod.HTTPError = HTTPError
mod.RequestException = RequestException
return mod

View File

@@ -1,497 +0,0 @@
"""Unit tests for timmy.research — ResearchOrchestrator pipeline."""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from timmy.research import (
DEFAULT_QUERIES_PER_TOPIC,
MemoryInterface,
ResearchOrchestrator,
ResearchResult,
ResearchTools,
SearchSnippet,
_extract_action_items,
)
# ── Data structures ──────────────────────────────────────────────────────────
class TestResearchResult:
def test_defaults(self):
r = ResearchResult(topic="test", report="content")
assert r.topic == "test"
assert r.report == "content"
assert r.cache_hit is False
assert r.queries_generated == []
assert r.sources == []
assert r.action_items == []
assert r.duration_ms == 0.0
assert r.timestamp # non-empty
def test_with_data(self):
r = ResearchResult(
topic="AI",
report="report text",
queries_generated=["q1", "q2"],
sources=[{"url": "http://example.com", "title": "Test"}],
action_items=["Do X"],
cache_hit=True,
duration_ms=42.5,
)
assert r.cache_hit is True
assert len(r.sources) == 1
assert r.duration_ms == 42.5
class TestSearchSnippet:
def test_fields(self):
s = SearchSnippet(title="T", url="http://x.com", snippet="text")
assert s.relevance == 0.0
# ── _extract_action_items ────────────────────────────────────────────────────
class TestExtractActionItems:
def test_action_prefix(self):
report = "Some text\nACTION: Do the thing\nMore text"
items = _extract_action_items(report)
assert items == ["Do the thing"]
def test_todo_prefix(self):
report = "TODO: Fix the bug\nTodo: Also this"
items = _extract_action_items(report)
assert items == ["Fix the bug", "Also this"]
def test_checkbox(self):
report = "- [ ] Implement feature\n- [x] Already done"
items = _extract_action_items(report)
assert items == ["Implement feature"]
def test_mixed(self):
report = "ACTION: First\n- [ ] Second\nTODO: Third"
items = _extract_action_items(report)
assert items == ["First", "Second", "Third"]
def test_empty(self):
assert _extract_action_items("No actions here") == []
assert _extract_action_items("") == []
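The behavior these tests encode can be sketched as a minimal implementation, assuming only the three line shapes exercised above; the real `timmy.research._extract_action_items` may recognize more formats:

```python
def extract_action_items(report: str) -> list[str]:
    """Collect action items from a report, in document order.

    Recognized line shapes (mirroring the tests above):
      ACTION: <item>            - case-sensitive prefix
      TODO: <item>              - case-insensitive prefix
      - [ ] <item>              - unchecked checkbox
    Checked boxes ("- [x]") are treated as already done and skipped.
    """
    items = []
    for line in report.splitlines():
        stripped = line.strip()
        if stripped.startswith("ACTION:"):
            items.append(stripped[len("ACTION:"):].strip())
        elif stripped.lower().startswith("todo:"):
            items.append(stripped[len("todo:"):].strip())
        elif stripped.startswith("- [ ]"):
            items.append(stripped[len("- [ ]"):].strip())
    return items
```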
# ── MemoryInterface ──────────────────────────────────────────────────────────
class TestMemoryInterface:
def test_custom_fns(self):
search = MagicMock(return_value=[])
store = MagicMock()
mi = MemoryInterface(search_fn=search, store_fn=store)
assert mi.search_fn is search
assert mi.store_fn is store
def test_defaults_when_import_fails(self):
with patch.dict("sys.modules", {"timmy.memory_system": None}):
mi = MemoryInterface()
# Should have fallback callables
assert callable(mi.search_fn)
assert callable(mi.store_fn)
# Fallback search returns empty
assert mi.search_fn("test") == []
# ── ResearchOrchestrator ─────────────────────────────────────────────────────
def _make_cascade(**overrides):
"""Create a mock cascade router."""
cascade = AsyncMock()
cascade.complete = AsyncMock(
return_value={"content": overrides.get("content", "query1\nquery2\nquery3")}
)
return cascade
def _make_memory(search_results=None, score=0.0):
"""Create a mock memory interface."""
if search_results is None:
search_results = []
search_fn = MagicMock(return_value=search_results)
store_fn = MagicMock()
return MemoryInterface(search_fn=search_fn, store_fn=store_fn)
def _make_tools(search_results=None, fetch_content="Page content"):
"""Create mock research tools."""
web_search = MagicMock(
return_value=search_results
or [
{"title": "Result 1", "url": "http://a.com", "snippet": "Snippet 1"},
{"title": "Result 2", "url": "http://b.com", "snippet": "Snippet 2"},
]
)
web_fetch = MagicMock(return_value=fetch_content)
return ResearchTools(web_search=web_search, web_fetch=web_fetch)
class TestResearchOrchestratorInit:
def test_basic_init(self):
cascade = _make_cascade()
memory = _make_memory()
tools = _make_tools()
orch = ResearchOrchestrator(cascade=cascade, memory=memory, tools=tools)
assert orch.cascade is cascade
assert orch.memory is memory
assert orch.tools is tools
assert orch._metrics["research_cache_hit"] == 0
assert orch._metrics["research_api_call"] == 0
class TestCheckLocalKnowledge:
@pytest.mark.asyncio
async def test_cache_hit(self):
"""High-confidence memory result returns cached ResearchResult."""
entry = MagicMock()
entry.relevance_score = 0.90
entry.content = "Cached report"
memory = _make_memory(search_results=[entry])
cascade = _make_cascade()
orch = ResearchOrchestrator(cascade=cascade, memory=memory)
result = await orch._check_local_knowledge("test topic")
assert result is not None
assert result.cache_hit is True
assert result.report == "Cached report"
@pytest.mark.asyncio
async def test_cache_miss_low_score(self):
"""Low-confidence result returns None."""
entry = MagicMock()
entry.relevance_score = 0.5
entry.content = "Weak match"
memory = _make_memory(search_results=[entry])
cascade = _make_cascade()
orch = ResearchOrchestrator(cascade=cascade, memory=memory)
result = await orch._check_local_knowledge("test topic")
assert result is None
@pytest.mark.asyncio
async def test_cache_miss_empty(self):
"""No memory results returns None."""
memory = _make_memory(search_results=[])
cascade = _make_cascade()
orch = ResearchOrchestrator(cascade=cascade, memory=memory)
result = await orch._check_local_knowledge("test topic")
assert result is None
@pytest.mark.asyncio
async def test_exception_returns_none(self):
"""Memory search exception returns None gracefully."""
memory = MemoryInterface(
search_fn=MagicMock(side_effect=RuntimeError("db error")),
store_fn=MagicMock(),
)
cascade = _make_cascade()
orch = ResearchOrchestrator(cascade=cascade, memory=memory)
result = await orch._check_local_knowledge("test topic")
assert result is None
class TestGenerateQueries:
@pytest.mark.asyncio
async def test_parses_queries(self):
cascade = _make_cascade(content="query one\nquery two\nquery three")
orch = ResearchOrchestrator(cascade=cascade, memory=_make_memory())
queries = await orch._generate_queries("AI safety", None, None)
assert queries == ["query one", "query two", "query three"]
@pytest.mark.asyncio
async def test_strips_numbering(self):
cascade = _make_cascade(content="1. First query\n2. Second query\n3) Third")
orch = ResearchOrchestrator(cascade=cascade, memory=_make_memory())
queries = await orch._generate_queries("topic", None, None)
assert "First query" in queries
assert "Second query" in queries
assert "Third" in queries
@pytest.mark.asyncio
async def test_fallback_on_error(self):
cascade = AsyncMock()
cascade.complete = AsyncMock(side_effect=RuntimeError("LLM down"))
orch = ResearchOrchestrator(cascade=cascade, memory=_make_memory())
queries = await orch._generate_queries("fallback topic", None, None)
assert queries == ["fallback topic"]
@pytest.mark.asyncio
async def test_passes_cascade_tier(self):
cascade = _make_cascade(content="q1\nq2")
orch = ResearchOrchestrator(cascade=cascade, memory=_make_memory())
await orch._generate_queries("topic", None, "gpt-4")
call_kwargs = cascade.complete.call_args.kwargs
assert call_kwargs.get("model") == "gpt-4"
class TestSearch:
@pytest.mark.asyncio
async def test_collects_snippets(self):
tools = _make_tools()
orch = ResearchOrchestrator(
cascade=_make_cascade(), memory=_make_memory(), tools=tools
)
snippets = await orch._search(["q1", "q2"])
# 2 results per query, 2 queries, but deduplicated by URL
assert len(snippets) == 2 # same URLs returned for both queries
@pytest.mark.asyncio
async def test_no_search_tool(self):
tools = ResearchTools(web_search=None)
orch = ResearchOrchestrator(
cascade=_make_cascade(), memory=_make_memory(), tools=tools
)
snippets = await orch._search(["q1"])
assert snippets == []
@pytest.mark.asyncio
async def test_search_error_handled(self):
tools = ResearchTools(
web_search=MagicMock(side_effect=RuntimeError("network error"))
)
orch = ResearchOrchestrator(
cascade=_make_cascade(), memory=_make_memory(), tools=tools
)
snippets = await orch._search(["q1"])
assert snippets == []
class TestFetch:
@pytest.mark.asyncio
async def test_fetches_pages(self):
tools = _make_tools(fetch_content="Page body here")
orch = ResearchOrchestrator(
cascade=_make_cascade(), memory=_make_memory(), tools=tools
)
snippets = [
SearchSnippet(title="P1", url="http://a.com", snippet="s1"),
SearchSnippet(title="P2", url="http://b.com", snippet="s2"),
]
pages = await orch._fetch(snippets)
assert len(pages) == 2
assert pages[0].content == "Page body here"
@pytest.mark.asyncio
async def test_no_fetch_tool(self):
tools = ResearchTools(web_fetch=None)
orch = ResearchOrchestrator(
cascade=_make_cascade(), memory=_make_memory(), tools=tools
)
pages = await orch._fetch([SearchSnippet("T", "http://x.com", "s")])
assert pages == []
class TestSynthesize:
@pytest.mark.asyncio
async def test_produces_report(self):
cascade = _make_cascade(content="# Report\nKey findings here")
orch = ResearchOrchestrator(cascade=cascade, memory=_make_memory())
from timmy.research import FetchedPage
pages = [FetchedPage(url="http://x.com", title="X", content="content")]
report = await orch._synthesize("topic", None, pages, None)
assert "Report" in report
@pytest.mark.asyncio
async def test_fallback_on_error(self):
cascade = AsyncMock()
cascade.complete = AsyncMock(side_effect=RuntimeError("LLM error"))
orch = ResearchOrchestrator(cascade=cascade, memory=_make_memory())
from timmy.research import FetchedPage
pages = [FetchedPage(url="http://x.com", title="X", content="content")]
report = await orch._synthesize("topic", None, pages, None)
assert "Synthesis failed" in report
assert "topic" in report
class TestCrystallize:
@pytest.mark.asyncio
async def test_stores_in_memory(self):
memory = _make_memory()
orch = ResearchOrchestrator(cascade=_make_cascade(), memory=memory)
result = ResearchResult(topic="test", report="report text")
await orch._crystallize("test", result)
memory.store_fn.assert_called_once()
call_args = memory.store_fn.call_args
assert call_args.kwargs.get("context_type") == "research"
assert call_args.kwargs.get("source") == "research_orchestrator"
@pytest.mark.asyncio
async def test_store_error_handled(self):
memory = MemoryInterface(
search_fn=MagicMock(return_value=[]),
store_fn=MagicMock(side_effect=RuntimeError("db error")),
)
orch = ResearchOrchestrator(cascade=_make_cascade(), memory=memory)
result = ResearchResult(topic="test", report="report")
# Should not raise
await orch._crystallize("test", result)
class TestWriteArtifact:
@pytest.mark.asyncio
async def test_no_action_items_skips(self):
orch = ResearchOrchestrator(cascade=_make_cascade(), memory=_make_memory())
result = ResearchResult(topic="test", report="r", action_items=[])
# Should complete without any calls
await orch._write_artifact(result)
@pytest.mark.asyncio
async def test_creates_issues(self):
orch = ResearchOrchestrator(cascade=_make_cascade(), memory=_make_memory())
result = ResearchResult(
topic="test", report="r", action_items=["Fix the thing"]
)
with patch("timmy.research._create_gitea_issues") as mock_create:
await orch._write_artifact(result)
mock_create.assert_called_once_with(result)
class TestFullPipeline:
@pytest.mark.asyncio
async def test_cache_hit_short_circuits(self):
"""When memory has a high-confidence match, skip web search."""
entry = MagicMock()
entry.relevance_score = 0.95
entry.content = "Previously researched content"
memory = _make_memory(search_results=[entry])
cascade = _make_cascade()
tools = _make_tools()
orch = ResearchOrchestrator(cascade=cascade, memory=memory, tools=tools)
result = await orch.run("cached topic")
assert result.cache_hit is True
assert result.report == "Previously researched content"
# Cascade should NOT have been called (no query generation or synthesis)
cascade.complete.assert_not_called()
assert orch._metrics["research_cache_hit"] == 1
@pytest.mark.asyncio
async def test_full_pipeline_no_tools(self):
"""Pipeline completes even without web tools (graceful degradation)."""
memory = _make_memory()
cascade = AsyncMock()
# First call: generate queries, second: synthesize
cascade.complete = AsyncMock(
side_effect=[
{"content": "query 1\nquery 2"},
{"content": "# Report\nACTION: Do something"},
]
)
tools = ResearchTools() # No web tools
orch = ResearchOrchestrator(cascade=cascade, memory=memory, tools=tools)
with patch("timmy.research._create_gitea_issues"):
result = await orch.run("test topic")
assert result.topic == "test topic"
assert result.cache_hit is False
assert "Report" in result.report
assert result.action_items == ["Do something"]
assert result.duration_ms > 0
assert orch._metrics["research_api_call"] == 1
memory.store_fn.assert_called_once()
@pytest.mark.asyncio
async def test_full_pipeline_with_tools(self):
"""Full pipeline with search and fetch tools."""
memory = _make_memory()
cascade = AsyncMock()
cascade.complete = AsyncMock(
side_effect=[
{"content": "search query 1\nsearch query 2"},
{"content": "# Full Report\nTODO: Review findings"},
]
)
tools = _make_tools()
orch = ResearchOrchestrator(cascade=cascade, memory=memory, tools=tools)
with patch("timmy.research._create_gitea_issues"):
result = await orch.run("test topic")
assert result.topic == "test topic"
assert result.cache_hit is False
assert len(result.queries_generated) == 2
assert len(result.sources) > 0
assert result.action_items == ["Review findings"]
@pytest.mark.asyncio
async def test_get_metrics(self):
orch = ResearchOrchestrator(cascade=_make_cascade(), memory=_make_memory())
metrics = orch.get_metrics()
assert "research_cache_hit" in metrics
assert "research_api_call" in metrics
class TestCreateGiteaIssues:
def test_no_token_skips(self):
"""No Gitea token configured — silently skips."""
from timmy.research import _create_gitea_issues
result = ResearchResult(
topic="t", report="r", action_items=["item"]
)
mock_settings = MagicMock()
mock_settings.gitea_token = ""
mock_settings.gitea_url = ""
with patch("timmy.research.settings", mock_settings):
# Should not raise
_create_gitea_issues(result)
def test_creates_issue_on_success(self):
from timmy.research import _create_gitea_issues
result = ResearchResult(
topic="AI", report="r", action_items=["Deploy model"]
)
mock_settings = MagicMock()
mock_settings.gitea_token = "tok"
mock_settings.gitea_url = "http://localhost:3000"
mock_settings.gitea_repo = "owner/repo"
mock_resp = MagicMock()
mock_resp.status_code = 201
mock_requests_mod = MagicMock()
mock_requests_mod.post.return_value = mock_resp
with (
patch("timmy.research.settings", mock_settings),
patch.dict("sys.modules", {"requests": mock_requests_mod}),
):
_create_gitea_issues(result)
mock_requests_mod.post.assert_called_once()
call_args = mock_requests_mod.post.call_args
assert "[research]" in call_args.kwargs["json"]["title"]


@@ -0,0 +1,410 @@
"""Unit tests for the skill discovery pipeline.
Tests the discovery engine's core logic: action clustering, skill extraction,
database persistence, deduplication, and status management.
"""
from __future__ import annotations
import json
from datetime import datetime, timedelta
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from timmy.skill_discovery import (
DiscoveredSkill,
SkillDiscoveryEngine,
)
@pytest.fixture
def engine():
"""Create a fresh SkillDiscoveryEngine for each test."""
return SkillDiscoveryEngine(confidence_threshold=0.7, min_actions=2)
@pytest.fixture(autouse=True)
def temp_db(tmp_path, monkeypatch):
"""Use a temporary database for each test."""
db_path = tmp_path / "skills.db"
monkeypatch.setattr("timmy.skill_discovery.DB_PATH", db_path)
return db_path
# ---------------------------------------------------------------------------
# DiscoveredSkill dataclass
# ---------------------------------------------------------------------------
class TestDiscoveredSkill:
def test_defaults(self):
skill = DiscoveredSkill()
assert skill.name == ""
assert skill.status == "discovered"
assert skill.confidence == 0.0
assert skill.id.startswith("skill_")
def test_to_dict(self):
skill = DiscoveredSkill(name="Test Skill", confidence=0.85)
d = skill.to_dict()
assert d["name"] == "Test Skill"
assert d["confidence"] == 0.85
assert "id" in d
assert "created_at" in d
def test_custom_fields(self):
skill = DiscoveredSkill(
name="Code Review",
category="coding",
confidence=0.92,
template="Step 1: Read code\nStep 2: Analyze",
)
assert skill.category == "coding"
assert "Step 1" in skill.template
# ---------------------------------------------------------------------------
# Database operations
# ---------------------------------------------------------------------------
class TestDatabase:
def test_save_and_list(self, engine):
skill = DiscoveredSkill(
name="Git Workflow",
description="Automates git operations",
category="devops",
confidence=0.88,
)
engine._save_skill(skill)
skills = engine.list_skills()
assert len(skills) == 1
assert skills[0]["name"] == "Git Workflow"
assert skills[0]["category"] == "devops"
def test_list_by_status(self, engine):
s1 = DiscoveredSkill(name="Skill A", status="discovered")
s2 = DiscoveredSkill(name="Skill B", status="confirmed")
engine._save_skill(s1)
engine._save_skill(s2)
discovered = engine.list_skills(status="discovered")
assert len(discovered) == 1
assert discovered[0]["name"] == "Skill A"
confirmed = engine.list_skills(status="confirmed")
assert len(confirmed) == 1
assert confirmed[0]["name"] == "Skill B"
def test_get_skill(self, engine):
skill = DiscoveredSkill(name="Find Me")
engine._save_skill(skill)
found = engine.get_skill(skill.id)
assert found is not None
assert found["name"] == "Find Me"
def test_get_skill_not_found(self, engine):
assert engine.get_skill("nonexistent") is None
def test_update_status(self, engine):
skill = DiscoveredSkill(name="Status Test")
engine._save_skill(skill)
assert engine.update_status(skill.id, "confirmed")
found = engine.get_skill(skill.id)
assert found["status"] == "confirmed"
def test_update_invalid_status(self, engine):
skill = DiscoveredSkill(name="Invalid Status")
engine._save_skill(skill)
assert not engine.update_status(skill.id, "bogus")
def test_skill_count(self, engine):
engine._save_skill(DiscoveredSkill(name="A", status="discovered"))
engine._save_skill(DiscoveredSkill(name="B", status="discovered"))
engine._save_skill(DiscoveredSkill(name="C", status="confirmed"))
counts = engine.skill_count()
assert counts["discovered"] == 2
assert counts["confirmed"] == 1
def test_list_limit(self, engine):
for i in range(5):
engine._save_skill(DiscoveredSkill(name=f"Skill {i}"))
assert len(engine.list_skills(limit=3)) == 3
# ---------------------------------------------------------------------------
# Action clustering
# ---------------------------------------------------------------------------
class TestActionClustering:
def test_empty_entries(self, engine):
assert engine._cluster_action_sequences([]) == []
def test_single_sequence(self, engine):
now = datetime.now()
entries = [
{"type": "tool_call", "tool": "read", "timestamp": now.isoformat()},
{
"type": "tool_call",
"tool": "write",
"timestamp": (now + timedelta(seconds=30)).isoformat(),
},
]
sequences = engine._cluster_action_sequences(entries)
assert len(sequences) == 1
assert len(sequences[0]) == 2
def test_split_by_gap(self, engine):
now = datetime.now()
entries = [
{"type": "tool_call", "tool": "read", "timestamp": now.isoformat()},
{
"type": "tool_call",
"tool": "write",
"timestamp": (now + timedelta(seconds=600)).isoformat(),
},
]
sequences = engine._cluster_action_sequences(entries, max_gap_seconds=300)
assert len(sequences) == 2
def test_bad_timestamps(self, engine):
entries = [
{"type": "tool_call", "tool": "read", "timestamp": "not-a-date"},
{"type": "tool_call", "tool": "write", "timestamp": "also-bad"},
]
sequences = engine._cluster_action_sequences(entries)
# Should still produce sequences (split on bad parse)
assert len(sequences) >= 1
# ---------------------------------------------------------------------------
# LLM response parsing
# ---------------------------------------------------------------------------
class TestLLMParsing:
def test_parse_valid_json(self, engine):
response = json.dumps(
{
"name": "API Search",
"description": "Searches APIs efficiently",
"category": "research",
"template": "1. Identify API\n2. Call endpoint",
"confidence": 0.85,
}
)
skill = engine._parse_llm_response(response, [])
assert skill is not None
assert skill.name == "API Search"
assert skill.confidence == 0.85
assert skill.category == "research"
def test_parse_with_markdown_fences(self, engine):
response = '```json\n{"name": "Fenced", "confidence": 0.9}\n```'
skill = engine._parse_llm_response(response, [])
assert skill is not None
assert skill.name == "Fenced"
def test_parse_invalid_json(self, engine):
assert engine._parse_llm_response("not json", []) is None
def test_parse_empty(self, engine):
assert engine._parse_llm_response("", []) is None
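A sketch of fence-tolerant parsing consistent with these tests. The real `_parse_llm_response` also receives the action sequence and builds a `DiscoveredSkill`; this simplified version just returns the decoded dict or `None`:

```python
import json

FENCE = "`" * 3  # the literal ``` delimiter

def parse_llm_response(response: str):
    """Decode a skill-extraction reply, tolerating Markdown code fences
    around the JSON payload. Returns a dict, or None when empty/invalid."""
    text = response.strip()
    if text.startswith(FENCE):
        # Drop the opening fence line (with its optional "json" tag)...
        text = text.split("\n", 1)[1] if "\n" in text else ""
        # ...and everything from the closing fence onward.
        text = text.rsplit(FENCE, 1)[0]
    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        return None
    return data if isinstance(data, dict) else None
```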
# ---------------------------------------------------------------------------
# Heuristic extraction
# ---------------------------------------------------------------------------
class TestHeuristicExtraction:
def test_extract_from_tool_calls(self, engine):
seq = [
{"type": "tool_call", "tool": "git_commit", "result": "ok"},
{"type": "tool_call", "tool": "git_push", "result": "ok"},
{"type": "tool_call", "tool": "git_commit", "result": "ok"},
]
skill = engine._heuristic_extraction(seq)
assert skill is not None
assert "Git Commit" in skill.name
assert skill.confidence == 0.5
def test_extract_no_tool_calls(self, engine):
seq = [{"type": "message", "role": "user", "content": "hello"}]
assert engine._heuristic_extraction(seq) is None
# ---------------------------------------------------------------------------
# Deduplication
# ---------------------------------------------------------------------------
class TestDeduplication:
def test_not_duplicate(self, engine):
skill = DiscoveredSkill(name="Unique Skill")
assert not engine._is_duplicate(skill)
def test_is_duplicate(self, engine):
skill = DiscoveredSkill(name="Duplicate Check")
engine._save_skill(skill)
new_skill = DiscoveredSkill(name="Duplicate Check")
assert engine._is_duplicate(new_skill)
def test_rejected_not_duplicate(self, engine):
skill = DiscoveredSkill(name="Rejected Skill", status="rejected")
engine._save_skill(skill)
new_skill = DiscoveredSkill(name="Rejected Skill")
assert not engine._is_duplicate(new_skill)
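The dedup rule these tests pin down — same name blocks rediscovery unless the stored copy was rejected — can be expressed over a plain list of skill dicts. This is a sketch of the predicate only; the engine's `_is_duplicate` presumably queries the SQLite store instead:

```python
def is_duplicate(stored: list[dict], candidate_name: str) -> bool:
    """A candidate is a duplicate if a stored skill with the same name
    exists in any status other than "rejected". Rejected skills may be
    rediscovered, so they do not block the candidate."""
    return any(
        s["name"] == candidate_name and s["status"] != "rejected"
        for s in stored
    )
```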
# ---------------------------------------------------------------------------
# Format actions
# ---------------------------------------------------------------------------
class TestFormatActions:
def test_format_tool_call(self, engine):
seq = [{"type": "tool_call", "tool": "shell", "result": "output text"}]
text = engine._format_actions(seq)
assert "shell" in text
assert "output text" in text
def test_format_message(self, engine):
seq = [{"type": "message", "role": "timmy", "content": "I analyzed the code"}]
text = engine._format_actions(seq)
assert "I analyzed the code" in text
def test_format_decision(self, engine):
seq = [{"type": "decision", "decision": "Use async"}]
text = engine._format_actions(seq)
assert "Use async" in text
# ---------------------------------------------------------------------------
# Scan integration (mocked)
# ---------------------------------------------------------------------------
class TestScan:
@pytest.mark.asyncio
async def test_scan_too_few_actions(self, engine):
with patch.object(engine, "_load_recent_successful_actions", return_value=[]):
result = await engine.scan()
assert result == []
@pytest.mark.asyncio
async def test_scan_discovers_skill(self, engine):
now = datetime.now()
entries = [
{
"type": "tool_call",
"tool": "search",
"result": "found results",
"timestamp": now.isoformat(),
},
{
"type": "tool_call",
"tool": "analyze",
"result": "analysis complete",
"timestamp": (now + timedelta(seconds=10)).isoformat(),
},
{
"type": "tool_call",
"tool": "report",
"result": "report generated",
"timestamp": (now + timedelta(seconds=20)).isoformat(),
},
]
llm_response = json.dumps(
{
"name": "Research Pipeline",
"description": "Search, analyze, and report",
"category": "research",
"template": "1. Search\n2. Analyze\n3. Report",
"confidence": 0.9,
}
)
with (
patch.object(engine, "_load_recent_successful_actions", return_value=entries),
patch(
"infrastructure.router.cascade.get_router",
return_value=MagicMock(complete=AsyncMock(return_value={"content": llm_response})),
),
patch.object(engine, "_notify", new_callable=AsyncMock),
patch.object(engine, "_write_skill_file"),
):
result = await engine.scan()
assert len(result) == 1
assert result[0].name == "Research Pipeline"
assert result[0].confidence == 0.9
@pytest.mark.asyncio
async def test_scan_skips_low_confidence(self, engine):
now = datetime.now()
entries = [
{
"type": "tool_call",
"tool": "a",
"result": "ok",
"timestamp": now.isoformat(),
},
{
"type": "tool_call",
"tool": "b",
"result": "ok",
"timestamp": (now + timedelta(seconds=10)).isoformat(),
},
]
llm_response = json.dumps(
{"name": "Low Conf", "confidence": 0.3, "category": "general", "template": "..."}
)
with (
patch.object(engine, "_load_recent_successful_actions", return_value=entries),
patch(
"infrastructure.router.cascade.get_router",
return_value=MagicMock(complete=AsyncMock(return_value={"content": llm_response})),
),
):
result = await engine.scan()
assert result == []
@pytest.mark.asyncio
async def test_scan_falls_back_to_heuristic(self, engine):
engine.confidence_threshold = 0.4 # Lower for heuristic
now = datetime.now()
entries = [
{
"type": "tool_call",
"tool": "deploy",
"result": "ok",
"timestamp": now.isoformat(),
},
{
"type": "tool_call",
"tool": "deploy",
"result": "ok",
"timestamp": (now + timedelta(seconds=10)).isoformat(),
},
]
with (
patch.object(engine, "_load_recent_successful_actions", return_value=entries),
patch(
"infrastructure.router.cascade.get_router",
return_value=MagicMock(
complete=AsyncMock(side_effect=Exception("LLM unavailable"))
),
),
patch.object(engine, "_notify", new_callable=AsyncMock),
patch.object(engine, "_write_skill_file"),
):
result = await engine.scan()
assert len(result) == 1
assert "Deploy" in result[0].name
assert result[0].confidence == 0.5