forked from Rockachopa/Timmy-time-dashboard
Compare commits
1 Commits
claude/iss
...
gemini/iss
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f0bf778197 |
@@ -289,14 +289,6 @@ class Settings(BaseSettings):
|
||||
thinking_memory_check_every: int = 50  # check memory status every Nth thought
thinking_idle_timeout_minutes: int = 60  # pause thoughts after N minutes without user input

# ── Dreaming Mode ─────────────────────────────────────────────────
# When enabled, the agent replays past sessions during idle time to
# simulate alternative actions and propose behavioural rules.
dreaming_enabled: bool = True  # master switch, checked by the dreaming scheduler
dreaming_idle_threshold_minutes: int = 10  # idle minutes before dreaming starts
dreaming_cycle_seconds: int = 600  # seconds between dream attempts
dreaming_timeout_seconds: int = 60  # max LLM call time per dream cycle

# ── Gitea Integration ─────────────────────────────────────────────
# Local Gitea instance for issue tracking and self-improvement.
# These values are passed as env vars to the gitea-mcp server process.
|
||||
@@ -35,7 +35,6 @@ from dashboard.routes.chat_api_v1 import router as chat_api_v1_router
|
||||
from dashboard.routes.daily_run import router as daily_run_router
|
||||
from dashboard.routes.db_explorer import router as db_explorer_router
|
||||
from dashboard.routes.discord import router as discord_router
|
||||
from dashboard.routes.dreaming import router as dreaming_router
|
||||
from dashboard.routes.experiments import router as experiments_router
|
||||
from dashboard.routes.grok import router as grok_router
|
||||
from dashboard.routes.health import router as health_router
|
||||
@@ -220,36 +219,6 @@ async def _loop_qa_scheduler() -> None:
|
||||
await asyncio.sleep(interval)
|
||||
|
||||
|
||||
async def _dreaming_scheduler() -> None:
    """Background task: run idle-time dreaming cycles.

    When the system has been idle for ``dreaming_idle_threshold_minutes``,
    the dreaming engine replays a past session and simulates alternatives.

    Runs forever; each iteration attempts one dream cycle (when enabled)
    and then sleeps for ``dreaming_cycle_seconds``.
    """
    from timmy.dreaming import dreaming_engine

    await asyncio.sleep(15)  # Stagger after loop QA scheduler

    while True:
        try:
            if settings.dreaming_enabled:
                # Grace period beyond the engine's own timeout so it can
                # finish cleanly before we abandon the call.
                await asyncio.wait_for(
                    dreaming_engine.dream_once(),
                    timeout=settings.dreaming_timeout_seconds + 10,
                )
        except (TimeoutError, asyncio.TimeoutError):
            # asyncio.wait_for raises asyncio.TimeoutError, which is only an
            # alias of the builtin TimeoutError on Python >= 3.11 — catch
            # both names so the handler also fires on 3.10.
            logger.warning(
                "Dreaming cycle timed out after %ds",
                settings.dreaming_timeout_seconds,
            )
        except asyncio.CancelledError:
            raise  # propagate task cancellation during shutdown
        except Exception as exc:
            logger.error("Dreaming scheduler error: %s", exc)

        await asyncio.sleep(settings.dreaming_cycle_seconds)
|
||||
|
||||
|
||||
_PRESENCE_POLL_SECONDS = 30
|
||||
_PRESENCE_INITIAL_DELAY = 3
|
||||
|
||||
@@ -410,7 +379,6 @@ def _startup_background_tasks() -> list[asyncio.Task]:
|
||||
asyncio.create_task(_briefing_scheduler()),
|
||||
asyncio.create_task(_thinking_scheduler()),
|
||||
asyncio.create_task(_loop_qa_scheduler()),
|
||||
asyncio.create_task(_dreaming_scheduler()),
|
||||
asyncio.create_task(_presence_watcher()),
|
||||
asyncio.create_task(_start_chat_integrations_background()),
|
||||
]
|
||||
@@ -673,7 +641,6 @@ app.include_router(daily_run_router)
|
||||
app.include_router(quests_router)
|
||||
app.include_router(scorecards_router)
|
||||
app.include_router(sovereignty_metrics_router)
|
||||
app.include_router(dreaming_router)
|
||||
|
||||
|
||||
@app.websocket("/ws")
|
||||
|
||||
@@ -1,85 +0,0 @@
|
||||
"""Dreaming mode dashboard routes.
|
||||
|
||||
GET /dreaming/api/status — JSON status of the dreaming engine
|
||||
GET /dreaming/api/recent — JSON list of recent dream records
|
||||
POST /dreaming/api/trigger — Manually trigger a dream cycle (for testing)
|
||||
GET /dreaming/partial — HTMX partial: dreaming status panel
|
||||
"""
|
||||
|
||||
import logging
|
||||
|
||||
from fastapi import APIRouter, Request
|
||||
from fastapi.responses import HTMLResponse, JSONResponse
|
||||
|
||||
from dashboard.templating import templates
|
||||
from timmy.dreaming import dreaming_engine
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
router = APIRouter(prefix="/dreaming", tags=["dreaming"])
|
||||
|
||||
|
||||
@router.get("/api/status", response_class=JSONResponse)
async def dreaming_status():
    """Report the dreaming engine's current state as a JSON payload."""
    status = dreaming_engine.get_status()
    return status
|
||||
|
||||
|
||||
@router.get("/api/recent", response_class=JSONResponse)
async def dreaming_recent(limit: int = 10):
    """Return the most recent dream records, serialised for JSON."""
    records = []
    for dream in dreaming_engine.get_recent_dreams(limit=limit):
        # Truncate the long free-text fields; full records stay in storage.
        records.append(
            {
                "id": dream.id,
                "session_excerpt": dream.session_excerpt[:200],
                "decision_point": dream.decision_point[:200],
                "simulation": dream.simulation,
                "proposed_rule": dream.proposed_rule,
                "created_at": dream.created_at,
            }
        )
    return records
|
||||
|
||||
|
||||
@router.post("/api/trigger", response_class=JSONResponse)
async def dreaming_trigger():
    """Manually trigger a dream cycle (bypasses idle check).

    Useful for testing and manual inspection. Forces idle state temporarily
    by back-dating the engine's last-activity timestamp, then restores it.

    Returns:
        ``{"status": "ok", ...}`` with the dream's id/rule/simulation when a
        dream was produced, or ``{"status": "skipped", ...}`` otherwise.
    """
    from datetime import UTC, datetime, timedelta

    from config import settings

    # Temporarily back-date last activity to appear idle.
    # NOTE(review): this mutates the engine's private state and is not
    # concurrency-safe — a scheduler cycle running concurrently would see
    # the forged timestamp. Acceptable for a manual test endpoint; confirm.
    original_time = dreaming_engine._last_activity_time
    dreaming_engine._last_activity_time = datetime.now(UTC) - timedelta(
        minutes=settings.dreaming_idle_threshold_minutes + 1
    )

    try:
        dream = await dreaming_engine.dream_once()
    finally:
        # Always restore the real timestamp, even if dream_once() raises.
        dreaming_engine._last_activity_time = original_time

    if dream:
        return {
            "status": "ok",
            "dream_id": dream.id,
            "proposed_rule": dream.proposed_rule,
            "simulation": dream.simulation[:200],
        }
    return {"status": "skipped", "reason": "No dream produced (no sessions or LLM unavailable)"}
|
||||
|
||||
|
||||
@router.get("/partial", response_class=HTMLResponse)
async def dreaming_partial(request: Request):
    """Render the HTMX dreaming-status panel partial."""
    context = {
        "status": dreaming_engine.get_status(),
        "recent_dreams": dreaming_engine.get_recent_dreams(limit=5),
    }
    return templates.TemplateResponse(
        request,
        "partials/dreaming_status.html",
        context,
    )
|
||||
@@ -1,32 +0,0 @@
|
||||
{# Dreaming-mode status panel (HTMX partial).
   Renders exactly one of four mutually exclusive states — disabled,
   actively dreaming, idle (dream pending), or standby — followed by a
   short history of recent dreams when any exist. #}
{% if not status.enabled %}
<div class="dream-disabled text-muted small">Dreaming mode disabled</div>
{% elif status.dreaming %}
<div class="dream-active">
<span class="dream-pulse"></span>
<span class="dream-label">DREAMING</span>
<div class="dream-summary">{{ status.current_summary }}</div>
</div>
{% elif status.idle %}
<div class="dream-idle">
<span class="dream-dot dream-dot-idle"></span>
<span class="dream-label-idle">IDLE</span>
<span class="dream-idle-meta">{{ status.idle_minutes }}m — dream cycle pending</span>
</div>
{% else %}
{# Standby: shows the countdown until the idle threshold is reached. #}
<div class="dream-standby">
<span class="dream-dot dream-dot-standby"></span>
<span class="dream-label-standby">STANDBY</span>
<span class="dream-idle-meta">idle in {{ status.idle_threshold_minutes - status.idle_minutes }}m</span>
</div>
{% endif %}

{% if recent_dreams %}
<div class="dream-history mt-2">
{% for d in recent_dreams %}
<div class="dream-record">
<div class="dream-rule">{{ d.proposed_rule if d.proposed_rule else "No rule extracted" }}</div>
{# created_at is sliced to "YYYY-MM-DD HH:MM" for display. #}
<div class="dream-meta">{{ d.created_at[:16] | replace("T", " ") }}</div>
</div>
{% endfor %}
</div>
{% endif %}
|
||||
@@ -1,801 +0,0 @@
|
||||
"""Agent dispatcher — route tasks to Claude Code, Kimi, APIs, or Timmy itself.
|
||||
|
||||
Timmy's dispatch system: knows what agents are available, what they're good
|
||||
at, and how to send them work. Uses Gitea labels and issue comments to assign
|
||||
tasks and track completion.
|
||||
|
||||
Dispatch flow:
|
||||
1. Match task type to agent strengths
|
||||
2. Check agent availability (idle or working?)
|
||||
3. Dispatch task with full context (issue link, requirements, criteria)
|
||||
4. Log assignment as a Gitea comment
|
||||
5. Monitor for completion or timeout
|
||||
6. Review output quality
|
||||
7. If output fails QA → reassign or escalate
|
||||
|
||||
Agent interfaces:
|
||||
- Claude Code → ``claude-ready`` Gitea label + issue comment
|
||||
- Kimi Code → ``kimi-ready`` Gitea label + issue comment
|
||||
- Agent APIs → HTTP POST to external endpoint
|
||||
- Timmy (self) → direct local invocation
|
||||
|
||||
Usage::
|
||||
|
||||
from timmy.dispatcher import dispatch_task, TaskType, AgentType
|
||||
|
||||
result = await dispatch_task(
|
||||
issue_number=1072,
|
||||
task_type=TaskType.ARCHITECTURE,
|
||||
title="Design the LLM router",
|
||||
description="We need a cascade router...",
|
||||
acceptance_criteria=["Failover works", "Metrics exposed"],
|
||||
)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from typing import Any
|
||||
|
||||
from config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Enumerations
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class AgentType(str, Enum):
    """Known agents in the swarm.

    Subclasses ``str`` so values serialise directly into JSON payloads
    and log messages without an explicit ``.value`` access.
    """

    CLAUDE_CODE = "claude_code"  # dispatched via Gitea "claude-ready" label
    KIMI_CODE = "kimi_code"  # dispatched via Gitea "kimi-ready" label
    AGENT_API = "agent_api"  # external HTTP agents (POST to an endpoint)
    TIMMY = "timmy"  # local self-dispatch; also the routing fallback
|
||||
|
||||
|
||||
class TaskType(str, Enum):
    """Categories of engineering work.

    Grouped by the agent that handles them best; the authoritative mapping
    is ``_TASK_ROUTING`` below.
    """

    # Claude Code strengths
    ARCHITECTURE = "architecture"
    REFACTORING = "refactoring"
    COMPLEX_REASONING = "complex_reasoning"
    CODE_REVIEW = "code_review"

    # Kimi Code strengths
    PARALLEL_IMPLEMENTATION = "parallel_implementation"
    ROUTINE_CODING = "routine_coding"  # also the fallback in infer_task_type()
    FAST_ITERATION = "fast_iteration"

    # Agent API strengths
    RESEARCH = "research"
    ANALYSIS = "analysis"
    SPECIALIZED = "specialized"

    # Timmy strengths
    TRIAGE = "triage"
    PLANNING = "planning"
    CREATIVE = "creative"
    ORCHESTRATION = "orchestration"
|
||||
|
||||
|
||||
class DispatchStatus(str, Enum):
    """Lifecycle state of a dispatched task."""

    PENDING = "pending"  # not yet assigned (not set within this module)
    ASSIGNED = "assigned"  # handed to an agent; counts as success
    IN_PROGRESS = "in_progress"  # agent working (not set within this module)
    COMPLETED = "completed"  # issue closed; counts as success
    FAILED = "failed"  # a dispatch attempt failed
    ESCALATED = "escalated"  # all retries exhausted; manual intervention
    TIMED_OUT = "timed_out"  # completion polling exceeded max_wait
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Agent registry
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@dataclass
class AgentSpec:
    """Capabilities and limits for a single agent."""

    name: AgentType  # canonical agent identifier
    display_name: str  # human-readable name used in comments and logs
    strengths: frozenset[TaskType]  # task types this agent is preferred for
    gitea_label: str | None  # label to apply when dispatching (None = no label)
    max_concurrent: int = 1  # advisory limit — not enforced in this module
    interface: str = "gitea"  # "gitea" | "api" | "local"
    api_endpoint: str | None = None  # for interface="api"
|
||||
|
||||
|
||||
#: Authoritative agent registry — all known agents and their capabilities.
#: Keyed by AgentType; consulted by dispatch_task() and _dispatch_via_gitea().
AGENT_REGISTRY: dict[AgentType, AgentSpec] = {
    # Deep-reasoning work, assigned through the "claude-ready" label.
    AgentType.CLAUDE_CODE: AgentSpec(
        name=AgentType.CLAUDE_CODE,
        display_name="Claude Code",
        strengths=frozenset(
            {
                TaskType.ARCHITECTURE,
                TaskType.REFACTORING,
                TaskType.COMPLEX_REASONING,
                TaskType.CODE_REVIEW,
            }
        ),
        gitea_label="claude-ready",
        max_concurrent=1,
        interface="gitea",
    ),
    # Fast/parallel implementation work, assigned through "kimi-ready".
    AgentType.KIMI_CODE: AgentSpec(
        name=AgentType.KIMI_CODE,
        display_name="Kimi Code",
        strengths=frozenset(
            {
                TaskType.PARALLEL_IMPLEMENTATION,
                TaskType.ROUTINE_CODING,
                TaskType.FAST_ITERATION,
            }
        ),
        gitea_label="kimi-ready",
        max_concurrent=1,
        interface="gitea",
    ),
    # External HTTP agents — no Gitea label; dispatched by POSTing a payload.
    AgentType.AGENT_API: AgentSpec(
        name=AgentType.AGENT_API,
        display_name="Agent API",
        strengths=frozenset(
            {
                TaskType.RESEARCH,
                TaskType.ANALYSIS,
                TaskType.SPECIALIZED,
            }
        ),
        gitea_label=None,
        max_concurrent=5,
        interface="api",
    ),
    # Timmy itself — local stub dispatch; also the routing fallback.
    AgentType.TIMMY: AgentSpec(
        name=AgentType.TIMMY,
        display_name="Timmy",
        strengths=frozenset(
            {
                TaskType.TRIAGE,
                TaskType.PLANNING,
                TaskType.CREATIVE,
                TaskType.ORCHESTRATION,
            }
        ),
        gitea_label=None,
        max_concurrent=1,
        interface="local",
    ),
}
|
||||
|
||||
#: Map from task type to preferred agent (primary routing table).
#: select_agent() falls back to AgentType.TIMMY for any missing key.
_TASK_ROUTING: dict[TaskType, AgentType] = {
    # Deep reasoning → Claude Code
    TaskType.ARCHITECTURE: AgentType.CLAUDE_CODE,
    TaskType.REFACTORING: AgentType.CLAUDE_CODE,
    TaskType.COMPLEX_REASONING: AgentType.CLAUDE_CODE,
    TaskType.CODE_REVIEW: AgentType.CLAUDE_CODE,
    # Fast implementation → Kimi Code
    TaskType.PARALLEL_IMPLEMENTATION: AgentType.KIMI_CODE,
    TaskType.ROUTINE_CODING: AgentType.KIMI_CODE,
    TaskType.FAST_ITERATION: AgentType.KIMI_CODE,
    # Research/analysis → external API agents
    TaskType.RESEARCH: AgentType.AGENT_API,
    TaskType.ANALYSIS: AgentType.AGENT_API,
    TaskType.SPECIALIZED: AgentType.AGENT_API,
    # Coordination → Timmy itself
    TaskType.TRIAGE: AgentType.TIMMY,
    TaskType.PLANNING: AgentType.TIMMY,
    TaskType.CREATIVE: AgentType.TIMMY,
    TaskType.ORCHESTRATION: AgentType.TIMMY,
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Dispatch result
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@dataclass
class DispatchResult:
    """Outcome of a dispatch call."""

    task_type: TaskType  # resolved (possibly inferred) task category
    agent: AgentType  # agent the task was routed to
    issue_number: int | None  # Gitea issue, when one was involved
    status: DispatchStatus  # final lifecycle state of the dispatch
    comment_id: int | None = None  # ID of the assignment comment, if posted
    label_applied: str | None = None  # Gitea label applied, if any
    error: str | None = None  # human-readable failure reason
    retry_count: int = 0  # which attempt produced this result (0-based)
    metadata: dict[str, Any] = field(default_factory=dict)  # interface-specific extras

    @property
    def success(self) -> bool:  # noqa: D401
        """True when the task was assigned or completed."""
        return self.status in (DispatchStatus.ASSIGNED, DispatchStatus.COMPLETED)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Routing logic
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def select_agent(task_type: TaskType) -> AgentType:
    """Pick the preferred agent for *task_type* from the routing table.

    Args:
        task_type: The category of engineering work to be done.

    Returns:
        The :class:`AgentType` best suited to handle this task; Timmy is
        the fallback for task types with no dedicated routing entry.
    """
    try:
        return _TASK_ROUTING[task_type]
    except KeyError:
        return AgentType.TIMMY
|
||||
|
||||
|
||||
#: Keyword signals for task-type inference, checked in priority order.
#: Hoisted to module scope so the table is built once at import time
#: instead of on every infer_task_type() call.
_TASK_TYPE_SIGNALS: list[tuple[TaskType, frozenset[str]]] = [
    (TaskType.ARCHITECTURE, frozenset({"architect", "design", "adr", "system design", "schema"})),
    (TaskType.REFACTORING, frozenset({"refactor", "clean up", "cleanup", "reorganise", "reorganize"})),
    (TaskType.CODE_REVIEW, frozenset({"review", "pr review", "pull request review", "audit"})),
    (TaskType.COMPLEX_REASONING, frozenset({"complex", "hard problem", "debug", "investigate", "diagnose"})),
    (TaskType.RESEARCH, frozenset({"research", "survey", "literature", "benchmark", "analyse", "analyze"})),
    (TaskType.ANALYSIS, frozenset({"analysis", "profil", "trace", "metric", "performance"})),
    (TaskType.TRIAGE, frozenset({"triage", "classify", "prioritise", "prioritize"})),
    (TaskType.PLANNING, frozenset({"plan", "roadmap", "milestone", "epic", "spike"})),
    (TaskType.CREATIVE, frozenset({"creative", "persona", "story", "write", "draft"})),
    (TaskType.ORCHESTRATION, frozenset({"orchestrat", "coordinat", "swarm", "dispatch"})),
    (TaskType.PARALLEL_IMPLEMENTATION, frozenset({"parallel", "concurrent", "batch"})),
    (TaskType.FAST_ITERATION, frozenset({"quick", "fast", "iterate", "prototype", "poc"})),
]


def infer_task_type(title: str, description: str = "") -> TaskType:
    """Heuristic: guess the most appropriate :class:`TaskType` from text.

    Scans *title* and *description* for keyword signals (substring match,
    case-insensitive) and returns the first group with a hit, in the order
    of ``_TASK_TYPE_SIGNALS``. Falls back to :attr:`TaskType.ROUTINE_CODING`.

    NOTE: matching is plain substring, so short keywords can fire inside
    longer words (e.g. "plan" inside "explain") — a deliberate, cheap
    trade-off for a best-effort classifier.

    Args:
        title: Short task title.
        description: Longer task description (optional).

    Returns:
        The inferred :class:`TaskType`.
    """
    text = (title + " " + description).lower()

    for task_type, keywords in _TASK_TYPE_SIGNALS:
        if any(kw in text for kw in keywords):
            return task_type

    return TaskType.ROUTINE_CODING
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Gitea helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def _post_gitea_comment(
    client: Any,
    base_url: str,
    repo: str,
    headers: dict[str, str],
    issue_number: int,
    body: str,
) -> int | None:
    """Post *body* as a comment on a Gitea issue.

    Args:
        client: An async HTTP client (httpx-compatible).
        base_url: Gitea API base URL (``.../api/v1``).
        repo: ``owner/name`` repository slug.
        headers: Auth headers for the Gitea API.
        issue_number: Issue to comment on.
        body: Markdown comment body.

    Returns:
        The new comment's ID on success; ``None`` on any failure (non-2xx
        response or transport error — both are logged, never raised).
    """
    comments_url = f"{base_url}/repos/{repo}/issues/{issue_number}/comments"
    try:
        response = await client.post(comments_url, headers=headers, json={"body": body})
        if response.status_code in (200, 201):
            return response.json().get("id")
        logger.warning(
            "Comment on #%s returned %s: %s",
            issue_number,
            response.status_code,
            response.text[:200],
        )
    except Exception as exc:
        logger.warning("Failed to post comment on #%s: %s", issue_number, exc)
    return None
|
||||
|
||||
|
||||
async def _apply_gitea_label(
    client: Any,
    base_url: str,
    repo: str,
    headers: dict[str, str],
    issue_number: int,
    label_name: str,
    label_color: str = "#0075ca",
) -> bool:
    """Ensure *label_name* exists and apply it to an issue.

    Three sequential steps, each failing soft (log + return False):
      1. Resolve the label ID from the repo's existing labels.
      2. Create the label (with *label_color*) if it doesn't exist.
      3. Attach it to the issue by ID.

    Args:
        client: An async HTTP client (httpx-compatible).
        base_url: Gitea API base URL (``.../api/v1``).
        repo: ``owner/name`` repository slug.
        headers: Auth headers for the Gitea API.
        issue_number: Issue to label.
        label_name: Label to resolve/create and apply.
        label_color: Colour used only when the label must be created.

    Returns:
        True if the label was successfully applied.
    """
    # Resolve or create the label
    # NOTE(review): only the first page of /labels is inspected — confirm
    # the repo's label count stays under Gitea's default page size.
    label_id: int | None = None
    try:
        resp = await client.get(f"{base_url}/repos/{repo}/labels", headers=headers)
        if resp.status_code == 200:
            for lbl in resp.json():
                if lbl.get("name") == label_name:
                    label_id = lbl["id"]
                    break
    except Exception as exc:
        logger.warning("Failed to list labels: %s", exc)
        return False

    if label_id is None:
        try:
            resp = await client.post(
                f"{base_url}/repos/{repo}/labels",
                headers=headers,
                json={"name": label_name, "color": label_color},
            )
            if resp.status_code in (200, 201):
                label_id = resp.json().get("id")
        except Exception as exc:
            logger.warning("Failed to create label %r: %s", label_name, exc)
            return False

    # Creation returned non-2xx (or no id) — give up quietly.
    if label_id is None:
        return False

    # Apply label to the issue
    try:
        resp = await client.post(
            f"{base_url}/repos/{repo}/issues/{issue_number}/labels",
            headers=headers,
            json={"labels": [label_id]},
        )
        return resp.status_code in (200, 201)
    except Exception as exc:
        logger.warning("Failed to apply label %r to #%s: %s", label_name, issue_number, exc)
        return False
|
||||
|
||||
|
||||
async def _poll_issue_completion(
    issue_number: int,
    poll_interval: int = 60,
    max_wait: int = 7200,
) -> DispatchStatus:
    """Poll a Gitea issue until closed (completed) or timeout.

    Args:
        issue_number: Gitea issue to watch.
        poll_interval: Seconds between polls.
        max_wait: Maximum total seconds to wait.

    Returns:
        :attr:`DispatchStatus.COMPLETED` if the issue was closed,
        :attr:`DispatchStatus.TIMED_OUT` otherwise, or
        :attr:`DispatchStatus.FAILED` when httpx is not installed.
    """
    # Import lazily so the module stays importable without httpx.
    try:
        import httpx
    except ImportError as exc:
        logger.warning("poll_issue_completion: missing dependency: %s", exc)
        return DispatchStatus.FAILED

    base_url = f"{settings.gitea_url}/api/v1"
    repo = settings.gitea_repo
    headers = {"Authorization": f"token {settings.gitea_token}"}
    issue_url = f"{base_url}/repos/{repo}/issues/{issue_number}"

    elapsed = 0
    # Reuse one client (and its connection pool) for the whole polling
    # session instead of opening a fresh client on every iteration.
    async with httpx.AsyncClient(timeout=10) as client:
        while elapsed < max_wait:
            try:
                resp = await client.get(issue_url, headers=headers)
                if resp.status_code == 200 and resp.json().get("state") == "closed":
                    logger.info("Issue #%s closed — task completed", issue_number)
                    return DispatchStatus.COMPLETED
            except Exception as exc:
                # Transient poll errors are logged and retried, never raised.
                logger.warning("Poll error for issue #%s: %s", issue_number, exc)

            await asyncio.sleep(poll_interval)
            elapsed += poll_interval

    logger.warning("Timed out waiting for issue #%s after %ss", issue_number, max_wait)
    return DispatchStatus.TIMED_OUT
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Core dispatch functions
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def _dispatch_via_gitea(
    agent: AgentType,
    issue_number: int,
    title: str,
    description: str,
    acceptance_criteria: list[str],
) -> DispatchResult:
    """Assign a task by applying a Gitea label and posting an assignment comment.

    Succeeds (ASSIGNED) when *either* the label or the comment lands;
    fails only when both do, or when prerequisites are missing.

    Args:
        agent: Target agent.
        issue_number: Gitea issue to assign.
        title: Short task title.
        description: Full task description.
        acceptance_criteria: List of acceptance criteria strings.

    Returns:
        :class:`DispatchResult` describing the outcome.
    """
    # Import lazily so the module stays importable without httpx.
    try:
        import httpx
    except ImportError as exc:
        return DispatchResult(
            # Task type can't be inferred meaningfully here; use the default.
            task_type=TaskType.ROUTINE_CODING,
            agent=agent,
            issue_number=issue_number,
            status=DispatchStatus.FAILED,
            error=f"Missing dependency: {exc}",
        )

    spec = AGENT_REGISTRY[agent]
    task_type = infer_task_type(title, description)

    # Bail out before any network calls when Gitea isn't configured.
    if not settings.gitea_enabled or not settings.gitea_token:
        return DispatchResult(
            task_type=task_type,
            agent=agent,
            issue_number=issue_number,
            status=DispatchStatus.FAILED,
            error="Gitea integration not configured (no token or disabled).",
        )

    base_url = f"{settings.gitea_url}/api/v1"
    repo = settings.gitea_repo
    headers = {
        "Authorization": f"token {settings.gitea_token}",
        "Content-Type": "application/json",
    }

    comment_id: int | None = None
    label_applied: str | None = None

    async with httpx.AsyncClient(timeout=15) as client:
        # 1. Apply agent label (if applicable)
        if spec.gitea_label:
            ok = await _apply_gitea_label(
                client, base_url, repo, headers, issue_number, spec.gitea_label
            )
            if ok:
                label_applied = spec.gitea_label
                logger.info(
                    "Applied label %r to issue #%s for %s",
                    spec.gitea_label,
                    issue_number,
                    spec.display_name,
                )
            else:
                # Non-fatal: the assignment comment below may still succeed.
                logger.warning(
                    "Could not apply label %r to issue #%s",
                    spec.gitea_label,
                    issue_number,
                )

        # 2. Post assignment comment
        criteria_md = "\n".join(f"- {c}" for c in acceptance_criteria) if acceptance_criteria else "_None specified_"
        comment_body = (
            f"## Assigned to {spec.display_name}\n\n"
            f"**Task type:** `{task_type.value}`\n\n"
            f"**Description:**\n{description}\n\n"
            f"**Acceptance criteria:**\n{criteria_md}\n\n"
            f"---\n*Dispatched by Timmy agent dispatcher.*"
        )
        comment_id = await _post_gitea_comment(
            client, base_url, repo, headers, issue_number, comment_body
        )

    # Either channel succeeding counts as a successful assignment.
    if comment_id is not None or label_applied is not None:
        logger.info(
            "Dispatched issue #%s to %s (label=%r, comment=%s)",
            issue_number,
            spec.display_name,
            label_applied,
            comment_id,
        )
        return DispatchResult(
            task_type=task_type,
            agent=agent,
            issue_number=issue_number,
            status=DispatchStatus.ASSIGNED,
            comment_id=comment_id,
            label_applied=label_applied,
        )

    return DispatchResult(
        task_type=task_type,
        agent=agent,
        issue_number=issue_number,
        status=DispatchStatus.FAILED,
        error="Failed to apply label and post comment — check Gitea connectivity.",
    )
|
||||
|
||||
|
||||
async def _dispatch_via_api(
    agent: AgentType,
    title: str,
    description: str,
    acceptance_criteria: list[str],
    issue_number: int | None = None,
    endpoint: str | None = None,
) -> DispatchResult:
    """Dispatch a task to an external HTTP API agent.

    Args:
        agent: Target agent.
        title: Short task title.
        description: Task description.
        acceptance_criteria: List of acceptance criteria.
        issue_number: Optional Gitea issue for cross-referencing.
        endpoint: Override API endpoint URL (uses spec default if omitted).

    Returns:
        :class:`DispatchResult` describing the outcome.
    """
    spec = AGENT_REGISTRY[agent]
    task_type = infer_task_type(title, description)
    url = endpoint or spec.api_endpoint

    if not url:
        return DispatchResult(
            task_type=task_type,
            agent=agent,
            issue_number=issue_number,
            status=DispatchStatus.FAILED,
            error=f"No API endpoint configured for agent {agent.value}.",
        )

    payload = {
        "title": title,
        "description": description,
        "acceptance_criteria": acceptance_criteria,
        "issue_number": issue_number,
        "agent": agent.value,
        "task_type": task_type.value,
    }

    try:
        # Import lazily so the module stays importable without httpx.
        import httpx

        async with httpx.AsyncClient(timeout=30) as client:
            resp = await client.post(url, json=payload)

        if resp.status_code in (200, 201, 202):
            logger.info("Dispatched %r to API agent %s at %s", title[:60], agent.value, url)
            # Guard the JSON parse: a 2xx response with a non-JSON body must
            # not turn a successful dispatch into a FAILED result.
            try:
                response_data = resp.json() if resp.content else {}
            except ValueError:
                response_data = {"raw": resp.text[:200]}
            return DispatchResult(
                task_type=task_type,
                agent=agent,
                issue_number=issue_number,
                status=DispatchStatus.ASSIGNED,
                metadata={"response": response_data},
            )

        return DispatchResult(
            task_type=task_type,
            agent=agent,
            issue_number=issue_number,
            status=DispatchStatus.FAILED,
            error=f"API agent returned {resp.status_code}: {resp.text[:200]}",
        )
    except Exception as exc:
        logger.warning("API dispatch to %s failed: %s", url, exc)
        return DispatchResult(
            task_type=task_type,
            agent=agent,
            issue_number=issue_number,
            status=DispatchStatus.FAILED,
            error=str(exc),
        )
|
||||
|
||||
|
||||
async def _dispatch_local(
    title: str,
    description: str = "",
    acceptance_criteria: list[str] | None = None,
    issue_number: int | None = None,
) -> DispatchResult:
    """Handle a task locally — Timmy processes it directly.

    This is a lightweight stub: dispatch is assumed to succeed, and real
    execution is expected to be wired into the agentic loop or a dedicated
    Timmy tool later.

    Args:
        title: Short task title.
        description: Task description.
        acceptance_criteria: Acceptance criteria list.
        issue_number: Optional Gitea issue number for logging.

    Returns:
        :class:`DispatchResult` with ASSIGNED status (local execution is
        assumed to succeed at dispatch time).
    """
    inferred = infer_task_type(title, description)
    logger.info(
        "Timmy handling task locally: %r (issue #%s)", title[:60], issue_number
    )
    result = DispatchResult(
        task_type=inferred,
        agent=AgentType.TIMMY,
        issue_number=issue_number,
        status=DispatchStatus.ASSIGNED,
        metadata={"local": True, "description": description},
    )
    return result
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public entry point
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def dispatch_task(
    title: str,
    description: str = "",
    acceptance_criteria: list[str] | None = None,
    task_type: TaskType | None = None,
    agent: AgentType | None = None,
    issue_number: int | None = None,
    api_endpoint: str | None = None,
    max_retries: int = 1,
) -> DispatchResult:
    """Route a task to the best available agent.

    This is the primary entry point. Callers can either specify the
    *agent* and *task_type* explicitly or let the dispatcher infer them
    from the *title* and *description*.

    Args:
        title: Short human-readable task title.
        description: Full task description with context.
        acceptance_criteria: List of acceptance criteria strings.
        task_type: Override automatic task type inference.
        agent: Override automatic agent selection.
        issue_number: Gitea issue number to log the assignment on.
        api_endpoint: Override API endpoint for AGENT_API dispatches.
        max_retries: Number of retry attempts on failure (default 1;
            negative values are treated as 0).

    Returns:
        :class:`DispatchResult` describing the final dispatch outcome.

    Example::

        result = await dispatch_task(
            issue_number=1072,
            title="Build the cascade LLM router",
            description="We need automatic failover...",
            acceptance_criteria=["Circuit breaker works", "Metrics exposed"],
        )
        if result.success:
            print(f"Assigned to {result.agent.value}")
    """
    criteria = acceptance_criteria or []

    if not title.strip():
        return DispatchResult(
            task_type=task_type or TaskType.ROUTINE_CODING,
            agent=agent or AgentType.TIMMY,
            issue_number=issue_number,
            status=DispatchStatus.FAILED,
            error="`title` is required.",
        )

    resolved_type = task_type or infer_task_type(title, description)
    resolved_agent = agent or select_agent(resolved_type)

    logger.info(
        "Dispatching task %r → %s (type=%s, issue=#%s)",
        title[:60],
        resolved_agent.value,
        resolved_type.value,
        issue_number,
    )

    spec = AGENT_REGISTRY[resolved_agent]

    # Clamp so a negative max_retries still yields exactly one attempt —
    # otherwise the loop below would never run and there would be no
    # result to return.
    attempts = max(0, max_retries) + 1

    last_result: DispatchResult | None = None
    for attempt in range(attempts):
        if attempt > 0:
            logger.info("Retry %d/%d for task %r", attempt, max_retries, title[:60])

        # Route by the agent's declared interface; Gitea dispatch needs an
        # issue to attach the label/comment to, otherwise fall back to local.
        if spec.interface == "gitea" and issue_number is not None:
            result = await _dispatch_via_gitea(
                resolved_agent, issue_number, title, description, criteria
            )
        elif spec.interface == "api":
            result = await _dispatch_via_api(
                resolved_agent, title, description, criteria, issue_number, api_endpoint
            )
        else:
            result = await _dispatch_local(title, description, criteria, issue_number)

        result.retry_count = attempt
        last_result = result

        if result.success:
            return result

        logger.warning(
            "Dispatch attempt %d failed for task %r: %s",
            attempt + 1,
            title[:60],
            result.error,
        )

    # All attempts exhausted — escalate. The loop runs at least once, so
    # last_result is set; use an explicit check rather than `assert`,
    # which would be stripped under `python -O`.
    if last_result is None:  # defensive; unreachable
        raise RuntimeError("dispatch loop produced no result")

    last_result.status = DispatchStatus.ESCALATED
    logger.error(
        "Task %r escalated after %d failed attempt(s): %s",
        title[:60],
        attempts,
        last_result.error,
    )

    # Try to log the escalation on the issue
    if issue_number is not None:
        await _log_escalation(issue_number, resolved_agent, last_result.error or "unknown error")

    return last_result
|
||||
|
||||
|
||||
async def _log_escalation(
    issue_number: int,
    agent: AgentType,
    error: str,
    attempts: int = 1,
) -> None:
    """Post an escalation notice as a comment on the Gitea issue.

    Best-effort: any failure (Gitea disabled, missing token, network
    error) is logged at WARNING level and swallowed so that escalation
    logging can never break the dispatch flow itself.

    Args:
        issue_number: Gitea issue to comment on.
        agent: Agent the task could not be assigned to.
        error: Error message from the final failed dispatch attempt.
        attempts: Number of dispatch attempts that failed. Defaults to 1
            for backward compatibility with existing callers; pass the
            real count (e.g. ``max_retries + 1``) for an accurate notice.
    """
    try:
        import httpx

        # Nothing to do when the Gitea integration is not configured.
        if not settings.gitea_enabled or not settings.gitea_token:
            return

        base_url = f"{settings.gitea_url}/api/v1"
        repo = settings.gitea_repo
        headers = {
            "Authorization": f"token {settings.gitea_token}",
            "Content-Type": "application/json",
        }
        # Previously this message hard-coded "1 attempt(s)" regardless of
        # how many attempts actually failed; it now reports `attempts`.
        body = (
            f"## Dispatch Escalated\n\n"
            f"Could not assign to **{AGENT_REGISTRY[agent].display_name}** "
            f"after {attempts} attempt(s).\n\n"
            f"**Error:** {error}\n\n"
            f"Manual intervention required.\n\n"
            f"---\n*Timmy agent dispatcher.*"
        )
        async with httpx.AsyncClient(timeout=10) as client:
            await _post_gitea_comment(
                client, base_url, repo, headers, issue_number, body
            )
    except Exception as exc:
        logger.warning("Failed to post escalation comment: %s", exc)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Monitoring helper
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def wait_for_completion(
    issue_number: int,
    poll_interval: int = 60,
    max_wait: int = 7200,
) -> DispatchStatus:
    """Wait synchronously for an assigned Gitea issue to be resolved.

    Thin wrapper over the issue-polling helper: blocks until the issue is
    closed or the timeout elapses. Intended for synchronous orchestration
    where the caller must not proceed until the assigned agent finishes.

    Args:
        issue_number: Gitea issue to monitor.
        poll_interval: Seconds to sleep between status polls.
        max_wait: Upper bound on total waiting time in seconds (2 h default).

    Returns:
        :attr:`DispatchStatus.COMPLETED` if the issue closed in time,
        otherwise :attr:`DispatchStatus.TIMED_OUT`.
    """
    outcome = await _poll_issue_completion(issue_number, poll_interval, max_wait)
    return outcome
|
||||
@@ -1,434 +0,0 @@
|
||||
"""Dreaming Mode — idle-time session replay and counterfactual simulation.
|
||||
|
||||
When the dashboard has been idle for a configurable period, this engine
|
||||
selects a past chat session, identifies key agent response points, and
|
||||
asks the LLM to simulate alternative approaches. Insights are stored as
|
||||
proposed rules that can feed the auto-crystallizer or memory system.
|
||||
|
||||
Usage::
|
||||
|
||||
from timmy.dreaming import dreaming_engine
|
||||
|
||||
# Run one dream cycle (called by the background scheduler)
|
||||
await dreaming_engine.dream_once()
|
||||
|
||||
# Query recent dreams
|
||||
dreams = dreaming_engine.get_recent_dreams(limit=10)
|
||||
|
||||
# Get current status dict for API/dashboard
|
||||
status = dreaming_engine.get_status()
|
||||
"""
|
||||
|
||||
import logging
|
||||
import re
|
||||
import sqlite3
|
||||
import uuid
|
||||
from collections.abc import Generator
|
||||
from contextlib import closing, contextmanager
|
||||
from dataclasses import dataclass
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_DEFAULT_DB = Path("data/dreams.db")
|
||||
|
||||
# Strip <think> tags from reasoning model output
|
||||
_THINK_TAG_RE = re.compile(r"<think>.*?</think>\s*", re.DOTALL)
|
||||
|
||||
# Minimum messages in a session to be worth replaying
|
||||
_MIN_SESSION_MESSAGES = 3
|
||||
|
||||
# Gap in seconds between messages that signals a new session
|
||||
_SESSION_GAP_SECONDS = 1800 # 30 minutes
|
||||
|
||||
|
||||
@dataclass
class DreamRecord:
    """A single completed dream cycle.

    Maps one-to-one onto a row of the ``dreams`` SQLite table; every
    field is a plain string so records round-trip through sqlite3
    without any conversion.
    """

    id: str  # UUID4 string (primary key in the dreams table)
    session_excerpt: str  # Short excerpt from the replayed session
    decision_point: str  # The agent message that was re-simulated
    simulation: str  # The alternative response generated
    proposed_rule: str  # Rule extracted from the simulation (may be empty)
    created_at: str  # ISO-8601 UTC timestamp of when the dream was stored
|
||||
|
||||
|
||||
@contextmanager
def _get_conn(db_path: Path = _DEFAULT_DB) -> Generator[sqlite3.Connection, None, None]:
    """Yield a SQLite connection with the ``dreams`` schema ensured.

    Creates the parent directory, the table, and the created_at index
    on every call (all DDL is idempotent); the connection is closed
    automatically when the context exits.
    """
    db_path.parent.mkdir(parents=True, exist_ok=True)

    schema = """
            CREATE TABLE IF NOT EXISTS dreams (
                id TEXT PRIMARY KEY,
                session_excerpt TEXT NOT NULL,
                decision_point TEXT NOT NULL,
                simulation TEXT NOT NULL,
                proposed_rule TEXT NOT NULL DEFAULT '',
                created_at TEXT NOT NULL
            )
        """
    index_ddl = "CREATE INDEX IF NOT EXISTS idx_dreams_time ON dreams(created_at)"

    with closing(sqlite3.connect(str(db_path))) as connection:
        # Row factory lets callers read columns by name (row["id"], …).
        connection.row_factory = sqlite3.Row
        connection.execute(schema)
        connection.execute(index_ddl)
        connection.commit()
        yield connection
|
||||
|
||||
|
||||
def _row_to_dream(row: sqlite3.Row) -> DreamRecord:
    """Hydrate a :class:`DreamRecord` from one ``dreams`` table row."""
    columns = (
        "id",
        "session_excerpt",
        "decision_point",
        "simulation",
        "proposed_rule",
        "created_at",
    )
    # Field names match column names exactly, so a keyword splat suffices.
    return DreamRecord(**{name: row[name] for name in columns})
|
||||
|
||||
|
||||
class DreamingEngine:
    """Idle-time dreaming engine — replays sessions and simulates alternatives.

    Lifecycle: a background scheduler calls :meth:`dream_once` periodically;
    the cycle only runs when dreaming is enabled in settings and the system
    has been idle past the configured threshold. Each cycle selects a past
    chat session, asks the LLM for an alternative response at one decision
    point, extracts a behaviour rule, stores the result in SQLite, and
    broadcasts progress over WebSocket.
    """

    def __init__(self, db_path: Path = _DEFAULT_DB) -> None:
        self._db_path = db_path
        # Idle tracking starts "now" so the engine never dreams immediately
        # after process start.
        self._last_activity_time: datetime = datetime.now(UTC)
        self._is_dreaming: bool = False  # re-entrancy guard for dream_once
        self._current_dream_summary: str = ""  # human-readable progress text
        self._dreaming_agent = None  # Lazy-initialised on first LLM call

    # ── Public API ────────────────────────────────────────────────────────

    def record_activity(self) -> None:
        """Reset the idle timer — call this on every user/agent interaction."""
        self._last_activity_time = datetime.now(UTC)

    def is_idle(self) -> bool:
        """Return True if the system has been idle long enough to start dreaming.

        A threshold of 0 (or negative) disables idleness entirely.
        """
        threshold = settings.dreaming_idle_threshold_minutes
        if threshold <= 0:
            return False
        return datetime.now(UTC) - self._last_activity_time > timedelta(minutes=threshold)

    def get_status(self) -> dict[str, Any]:
        """Return a status dict suitable for API/dashboard consumption."""
        return {
            "enabled": settings.dreaming_enabled,
            "dreaming": self._is_dreaming,
            "idle": self.is_idle(),
            "current_summary": self._current_dream_summary,
            "idle_minutes": int(
                (datetime.now(UTC) - self._last_activity_time).total_seconds() / 60
            ),
            "idle_threshold_minutes": settings.dreaming_idle_threshold_minutes,
            "dream_count": self.count_dreams(),
        }

    async def dream_once(self) -> DreamRecord | None:
        """Execute one dream cycle.

        Returns the stored DreamRecord, or None if the cycle was skipped
        (not idle, dreaming disabled, no suitable session, or LLM error).
        """
        if not settings.dreaming_enabled:
            return None

        if not self.is_idle():
            logger.debug(
                "Dreaming skipped — system active (idle for %d min, threshold %d min)",
                int((datetime.now(UTC) - self._last_activity_time).total_seconds() / 60),
                settings.dreaming_idle_threshold_minutes,
            )
            return None

        # Guard against overlapping cycles if the scheduler fires again
        # while a previous dream is still running.
        if self._is_dreaming:
            logger.debug("Dreaming skipped — cycle already in progress")
            return None

        self._is_dreaming = True
        self._current_dream_summary = "Selecting a past session…"
        await self._broadcast_status()

        try:
            return await self._run_dream_cycle()
        except Exception as exc:
            logger.warning("Dream cycle failed: %s", exc)
            return None
        finally:
            # Always clear the in-progress flag and push a final status,
            # even when the cycle raised.
            self._is_dreaming = False
            self._current_dream_summary = ""
            await self._broadcast_status()

    def get_recent_dreams(self, limit: int = 20) -> list[DreamRecord]:
        """Retrieve the most recent dream records, newest first."""
        with _get_conn(self._db_path) as conn:
            rows = conn.execute(
                "SELECT * FROM dreams ORDER BY created_at DESC LIMIT ?",
                (limit,),
            ).fetchall()
            return [_row_to_dream(r) for r in rows]

    def count_dreams(self) -> int:
        """Return total number of stored dream records."""
        with _get_conn(self._db_path) as conn:
            row = conn.execute("SELECT COUNT(*) AS c FROM dreams").fetchone()
            return row["c"] if row else 0

    # ── Private helpers ───────────────────────────────────────────────────

    async def _run_dream_cycle(self) -> DreamRecord | None:
        """Core dream logic: select → simulate → store."""
        # 1. Select a past session from the chat log
        session = await self._select_session()
        if not session:
            logger.debug("No suitable chat session found for dreaming")
            self._current_dream_summary = "No past sessions to replay"
            return None

        decision_point, session_excerpt = session

        self._current_dream_summary = f"Simulating alternative for: {decision_point[:60]}…"
        await self._broadcast_status()

        # 2. Simulate an alternative response
        simulation = await self._simulate_alternative(decision_point, session_excerpt)
        if not simulation:
            logger.debug("Dream simulation produced no output")
            return None

        # 3. Extract a proposed rule
        proposed_rule = await self._extract_rule(decision_point, simulation)

        # 4. Store and broadcast
        dream = self._store_dream(
            session_excerpt=session_excerpt,
            decision_point=decision_point,
            simulation=simulation,
            proposed_rule=proposed_rule,
        )

        self._current_dream_summary = f"Dream complete: {proposed_rule[:80]}" if proposed_rule else "Dream complete"

        logger.info(
            "Dream [%s]: replayed session, proposed rule: %s",
            dream.id[:8],
            proposed_rule[:80] if proposed_rule else "(none)",
        )

        await self._broadcast_status()
        await self._broadcast_dream(dream)
        return dream

    async def _select_session(self) -> tuple[str, str] | None:
        """Select a past chat session and return (decision_point, session_excerpt).

        Uses the SQLite chat store. Groups messages into sessions by time
        gap. Picks a random session with enough messages, then selects one
        agent response as the decision point.

        Returns None (never raises) when no usable session exists or the
        chat store cannot be read.
        """
        try:
            from infrastructure.chat_store import DB_PATH

            if not DB_PATH.exists():
                return None

            import asyncio
            # Blocking SQLite read is pushed off the event loop.
            rows = await asyncio.to_thread(self._load_chat_rows)
            if not rows:
                return None

            sessions = self._group_into_sessions(rows)
            if not sessions:
                return None

            # Filter sessions with enough messages
            valid = [s for s in sessions if len(s) >= _MIN_SESSION_MESSAGES]
            if not valid:
                return None

            import random
            session = random.choice(valid)  # noqa: S311 (not cryptographic)

            # Build a short text excerpt (last N messages)
            excerpt_msgs = session[-6:]
            excerpt = "\n".join(
                f"{m['role'].upper()}: {m['content'][:200]}" for m in excerpt_msgs
            )

            # Find agent responses as candidate decision points
            agent_msgs = [m for m in session if m["role"] in ("agent", "assistant")]
            if not agent_msgs:
                return None

            decision = random.choice(agent_msgs)  # noqa: S311
            return decision["content"], excerpt

        except Exception as exc:
            logger.warning("Session selection failed: %s", exc)
            return None

    def _load_chat_rows(self) -> list[dict]:
        """Synchronously load all chat messages from SQLite, oldest first."""
        from infrastructure.chat_store import DB_PATH

        with closing(sqlite3.connect(str(DB_PATH))) as conn:
            conn.row_factory = sqlite3.Row
            rows = conn.execute(
                "SELECT role, content, timestamp FROM chat_messages "
                "ORDER BY timestamp ASC"
            ).fetchall()
            return [dict(r) for r in rows]

    def _group_into_sessions(self, rows: list[dict]) -> list[list[dict]]:
        """Group chat rows into sessions based on time gaps.

        Consecutive messages more than ``_SESSION_GAP_SECONDS`` apart start
        a new session. Rows are assumed already sorted by timestamp (the
        loader orders them ASC).
        """
        if not rows:
            return []

        sessions: list[list[dict]] = []
        current: list[dict] = [rows[0]]

        for prev, curr in zip(rows, rows[1:], strict=False):
            try:
                # NOTE(review): assumes timestamps are ISO-8601 strings,
                # possibly with a trailing "Z" — unparseable values fall
                # back to gap=0 (same session).
                t_prev = datetime.fromisoformat(prev["timestamp"].replace("Z", "+00:00"))
                t_curr = datetime.fromisoformat(curr["timestamp"].replace("Z", "+00:00"))
                gap = (t_curr - t_prev).total_seconds()
            except Exception:
                gap = 0

            if gap > _SESSION_GAP_SECONDS:
                sessions.append(current)
                current = [curr]
            else:
                current.append(curr)

        sessions.append(current)
        return sessions

    async def _simulate_alternative(
        self, decision_point: str, session_excerpt: str
    ) -> str:
        """Ask the LLM to simulate an alternative response.

        Returns the simulation text with any <think> reasoning tags
        stripped, or "" on LLM failure.
        """
        prompt = (
            "You are Timmy, a sovereign AI agent in a dreaming state.\n"
            "You are replaying a past conversation and exploring what you could "
            "have done differently at a key decision point.\n\n"
            "PAST SESSION EXCERPT:\n"
            f"{session_excerpt}\n\n"
            "KEY DECISION POINT (your past response):\n"
            f"{decision_point[:500]}\n\n"
            "TASK: In 2-3 sentences, describe ONE concrete alternative approach "
            "you could have taken at this decision point that would have been "
            "more helpful, more accurate, or more efficient.\n"
            "Be specific — reference the actual content of the conversation.\n"
            "Do NOT include meta-commentary about dreaming or this exercise.\n\n"
            "Alternative approach:"
        )

        raw = await self._call_agent(prompt)
        return _THINK_TAG_RE.sub("", raw).strip() if raw else ""

    async def _extract_rule(self, decision_point: str, simulation: str) -> str:
        """Extract a proposed behaviour rule from the simulation.

        Returns at most the first line of the LLM output, capped at 200
        characters; "" when the LLM produced nothing.
        """
        prompt = (
            "Given this pair of agent responses:\n\n"
            f"ORIGINAL: {decision_point[:300]}\n\n"
            f"IMPROVED ALTERNATIVE: {simulation[:400]}\n\n"
            "Extract ONE concise rule (max 20 words) that captures what to do "
            "differently next time. Format: 'When X, do Y instead of Z.'\n"
            "Rule:"
        )

        raw = await self._call_agent(prompt)
        rule = _THINK_TAG_RE.sub("", raw).strip() if raw else ""
        # Keep only the first sentence/line
        rule = rule.split("\n")[0].strip().rstrip(".")
        return rule[:200]  # Safety cap

    async def _call_agent(self, prompt: str) -> str:
        """Call the Timmy agent for a dreaming prompt (skip MCP, bounded timeout).

        The agent instance is created lazily on first use and cached on
        the engine. Returns "" on timeout or any agent error.
        """
        import asyncio

        if self._dreaming_agent is None:
            from timmy.agent import create_timmy

            self._dreaming_agent = create_timmy(skip_mcp=True)

        try:
            # Bound the LLM call so a stuck dream cannot wedge the scheduler.
            async with asyncio.timeout(settings.dreaming_timeout_seconds):
                run = await self._dreaming_agent.arun(prompt, stream=False)
        except TimeoutError:
            logger.warning("Dreaming LLM call timed out after %ds", settings.dreaming_timeout_seconds)
            return ""
        except Exception as exc:
            logger.warning("Dreaming LLM call failed: %s", exc)
            return ""

        raw = run.content if hasattr(run, "content") else str(run)
        return raw or ""

    def _store_dream(
        self,
        *,
        session_excerpt: str,
        decision_point: str,
        simulation: str,
        proposed_rule: str,
    ) -> DreamRecord:
        """Persist one dream cycle's outputs and return the stored record."""
        dream = DreamRecord(
            id=str(uuid.uuid4()),
            session_excerpt=session_excerpt,
            decision_point=decision_point,
            simulation=simulation,
            proposed_rule=proposed_rule,
            created_at=datetime.now(UTC).isoformat(),
        )
        with _get_conn(self._db_path) as conn:
            conn.execute(
                """
                INSERT INTO dreams
                    (id, session_excerpt, decision_point, simulation, proposed_rule, created_at)
                VALUES (?, ?, ?, ?, ?, ?)
                """,
                (
                    dream.id,
                    dream.session_excerpt,
                    dream.decision_point,
                    dream.simulation,
                    dream.proposed_rule,
                    dream.created_at,
                ),
            )
            conn.commit()
        return dream

    async def _broadcast_status(self) -> None:
        """Push current dreaming status via WebSocket (best-effort)."""
        try:
            from infrastructure.ws_manager.handler import ws_manager

            await ws_manager.broadcast("dreaming_state", self.get_status())
        except Exception as exc:
            # Broadcast failures are non-fatal — dashboards just miss an update.
            logger.debug("Dreaming status broadcast failed: %s", exc)

    async def _broadcast_dream(self, dream: DreamRecord) -> None:
        """Push a completed dream record via WebSocket (best-effort)."""
        try:
            from infrastructure.ws_manager.handler import ws_manager

            await ws_manager.broadcast(
                "dreaming_complete",
                {
                    "id": dream.id,
                    "proposed_rule": dream.proposed_rule,
                    "simulation": dream.simulation[:200],
                    "created_at": dream.created_at,
                },
            )
        except Exception as exc:
            logger.debug("Dreaming complete broadcast failed: %s", exc)
|
||||
|
||||
|
||||
# Module-level singleton shared by the background scheduler and API routes.
dreaming_engine = DreamingEngine()
|
||||
@@ -2547,44 +2547,3 @@
|
||||
.tower-adv-title { font-size: 0.85rem; font-weight: 600; color: var(--text-bright); }
.tower-adv-detail { font-size: 0.8rem; color: var(--text); margin-top: 2px; }
.tower-adv-action { font-size: 0.75rem; color: var(--green); margin-top: 4px; font-style: italic; }

/* ═══════════════════════════════════════════════════════════════
   Dreaming Mode
   ═══════════════════════════════════════════════════════════════ */

/* Active-dream banner: pulsing dot + label + progress summary. */
.dream-active {
    display: flex; align-items: center; gap: 8px;
    padding: 6px 0;
}
.dream-label { font-size: 0.75rem; font-weight: 700; color: var(--purple); letter-spacing: 0.12em; }
.dream-summary { font-size: 0.75rem; color: var(--text-dim); font-style: italic; flex: 1; }

/* Pulsing indicator shown while a dream cycle is running. */
.dream-pulse {
    display: inline-block; width: 8px; height: 8px; border-radius: 50%;
    background: var(--purple);
    animation: dream-pulse 1.8s ease-in-out infinite;
}
@keyframes dream-pulse {
    0%, 100% { opacity: 1; transform: scale(1); }
    50% { opacity: 0.4; transform: scale(0.7); }
}

/* Static state dots: amber = idle (eligible to dream), dim = standby. */
.dream-dot {
    display: inline-block; width: 7px; height: 7px; border-radius: 50%;
}
.dream-dot-idle { background: var(--amber); }
.dream-dot-standby { background: var(--text-dim); }

.dream-idle, .dream-standby {
    display: flex; align-items: center; gap: 6px; padding: 4px 0;
}
.dream-label-idle { font-size: 0.7rem; font-weight: 700; color: var(--amber); letter-spacing: 0.1em; }
.dream-label-standby { font-size: 0.7rem; font-weight: 700; color: var(--text-dim); letter-spacing: 0.1em; }
.dream-idle-meta { font-size: 0.7rem; color: var(--text-dim); }

/* Scrollable list of past dream records (rule + metadata per row). */
.dream-history { border-top: 1px solid var(--border); padding-top: 6px; }
.dream-record { padding: 4px 0; border-bottom: 1px solid var(--border); }
.dream-record:last-child { border-bottom: none; }
.dream-rule { font-size: 0.75rem; color: var(--text); font-style: italic; }
.dream-meta { font-size: 0.65rem; color: var(--text-dim); margin-top: 2px; }
||||
|
||||
@@ -1,503 +0,0 @@
|
||||
"""Tests for the agent dispatcher (timmy.dispatcher)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from timmy.dispatcher import (
|
||||
AGENT_REGISTRY,
|
||||
AgentType,
|
||||
DispatchResult,
|
||||
DispatchStatus,
|
||||
TaskType,
|
||||
_dispatch_local,
|
||||
_dispatch_via_api,
|
||||
_dispatch_via_gitea,
|
||||
dispatch_task,
|
||||
infer_task_type,
|
||||
select_agent,
|
||||
wait_for_completion,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Agent registry
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestAgentRegistry:
    """Static sanity checks on AGENT_REGISTRY's completeness and invariants."""

    def test_all_agents_present(self):
        # Every AgentType enum member must have a registry entry.
        for member in AgentType:
            assert member in AGENT_REGISTRY, f"AgentType.{member.name} missing from registry"

    def test_agent_specs_have_display_names(self):
        for agent, spec in AGENT_REGISTRY.items():
            assert spec.display_name, f"{agent} has empty display_name"

    def test_gitea_agents_have_labels(self):
        # Gitea-dispatched agents are addressed via an issue label.
        for agent, spec in AGENT_REGISTRY.items():
            if spec.interface == "gitea":
                assert spec.gitea_label, f"{agent} is gitea interface but has no label"

    def test_non_gitea_agents_have_no_labels(self):
        for agent, spec in AGENT_REGISTRY.items():
            if spec.interface not in ("gitea",):
                # api and local agents may have no label
                assert spec.gitea_label is None or spec.interface == "gitea"

    def test_max_concurrent_positive(self):
        for agent, spec in AGENT_REGISTRY.items():
            assert spec.max_concurrent >= 1, f"{agent} has max_concurrent < 1"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# select_agent
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestSelectAgent:
    """Pin the task-type → agent routing table of select_agent."""

    def test_architecture_routes_to_claude(self):
        assert select_agent(TaskType.ARCHITECTURE) == AgentType.CLAUDE_CODE

    def test_refactoring_routes_to_claude(self):
        assert select_agent(TaskType.REFACTORING) == AgentType.CLAUDE_CODE

    def test_code_review_routes_to_claude(self):
        assert select_agent(TaskType.CODE_REVIEW) == AgentType.CLAUDE_CODE

    def test_routine_coding_routes_to_kimi(self):
        assert select_agent(TaskType.ROUTINE_CODING) == AgentType.KIMI_CODE

    def test_fast_iteration_routes_to_kimi(self):
        assert select_agent(TaskType.FAST_ITERATION) == AgentType.KIMI_CODE

    def test_research_routes_to_agent_api(self):
        assert select_agent(TaskType.RESEARCH) == AgentType.AGENT_API

    def test_triage_routes_to_timmy(self):
        assert select_agent(TaskType.TRIAGE) == AgentType.TIMMY

    def test_planning_routes_to_timmy(self):
        assert select_agent(TaskType.PLANNING) == AgentType.TIMMY
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# infer_task_type
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestInferTaskType:
    """Keyword-based inference of TaskType from title and description."""

    def test_architecture_keyword(self):
        assert infer_task_type("Design the LLM router architecture") == TaskType.ARCHITECTURE

    def test_refactor_keyword(self):
        assert infer_task_type("Refactor the auth middleware") == TaskType.REFACTORING

    def test_code_review_keyword(self):
        assert infer_task_type("Review PR for cascade router") == TaskType.CODE_REVIEW

    def test_research_keyword(self):
        assert infer_task_type("Research embedding models") == TaskType.RESEARCH

    def test_triage_keyword(self):
        assert infer_task_type("Triage open issues") == TaskType.TRIAGE

    def test_planning_keyword(self):
        assert infer_task_type("Plan the v2.0 roadmap") == TaskType.PLANNING

    def test_fallback_returns_routine_coding(self):
        # No keyword matches → default bucket is ROUTINE_CODING.
        assert infer_task_type("Do the thing") == TaskType.ROUTINE_CODING

    def test_description_contributes_to_inference(self):
        # Keywords in the description are considered, not just the title.
        result = infer_task_type("Implement feature", "We need to refactor the old code")
        assert result == TaskType.REFACTORING

    def test_case_insensitive(self):
        assert infer_task_type("ARCHITECTURE DESIGN") == TaskType.ARCHITECTURE
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# DispatchResult
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestDispatchResult:
    """DispatchResult.success derives from status: ASSIGNED/COMPLETED are
    successes; FAILED/ESCALATED are not."""

    def test_success_when_assigned(self):
        r = DispatchResult(
            task_type=TaskType.ROUTINE_CODING,
            agent=AgentType.KIMI_CODE,
            issue_number=1,
            status=DispatchStatus.ASSIGNED,
        )
        assert r.success is True

    def test_success_when_completed(self):
        r = DispatchResult(
            task_type=TaskType.ROUTINE_CODING,
            agent=AgentType.KIMI_CODE,
            issue_number=1,
            status=DispatchStatus.COMPLETED,
        )
        assert r.success is True

    def test_not_success_when_failed(self):
        r = DispatchResult(
            task_type=TaskType.ROUTINE_CODING,
            agent=AgentType.KIMI_CODE,
            issue_number=1,
            status=DispatchStatus.FAILED,
        )
        assert r.success is False

    def test_not_success_when_escalated(self):
        r = DispatchResult(
            task_type=TaskType.ROUTINE_CODING,
            agent=AgentType.KIMI_CODE,
            issue_number=1,
            status=DispatchStatus.ESCALATED,
        )
        assert r.success is False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _dispatch_local
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestDispatchLocal:
    """_dispatch_local assigns tasks to the in-process TIMMY agent."""

    async def test_returns_assigned(self):
        result = await _dispatch_local(
            title="Plan the migration",
            description="We need a plan.",
            acceptance_criteria=["Plan is documented"],
            issue_number=42,
        )
        assert result.status == DispatchStatus.ASSIGNED
        assert result.agent == AgentType.TIMMY
        assert result.issue_number == 42

    async def test_infers_task_type(self):
        # "Plan" in the title should classify the task as PLANNING.
        result = await _dispatch_local(
            title="Plan the sprint",
            description="",
            acceptance_criteria=[],
        )
        assert result.task_type == TaskType.PLANNING

    async def test_no_issue_number(self):
        # issue_number is optional and defaults to None on the result.
        result = await _dispatch_local(title="Do something", description="")
        assert result.issue_number is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _dispatch_via_api
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestDispatchViaApi:
    """_dispatch_via_api POSTs the task to an HTTP endpoint; httpx is mocked."""

    async def test_no_endpoint_returns_failed(self):
        # Without an endpoint there is nothing to call — immediate failure.
        result = await _dispatch_via_api(
            agent=AgentType.AGENT_API,
            title="Analyse logs",
            description="",
            acceptance_criteria=[],
        )
        assert result.status == DispatchStatus.FAILED
        assert "No API endpoint" in (result.error or "")

    async def test_successful_api_call(self):
        # 202 Accepted with a JSON body counts as a successful hand-off.
        mock_resp = MagicMock()
        mock_resp.status_code = 202
        mock_resp.content = b'{"ok": true}'
        mock_resp.json.return_value = {"ok": True}

        mock_client = AsyncMock()
        mock_client.__aenter__ = AsyncMock(return_value=mock_client)
        mock_client.__aexit__ = AsyncMock(return_value=False)
        mock_client.post = AsyncMock(return_value=mock_resp)

        with patch("httpx.AsyncClient", return_value=mock_client):
            result = await _dispatch_via_api(
                agent=AgentType.AGENT_API,
                title="Analyse logs",
                description="Look at the logs",
                acceptance_criteria=["Report produced"],
                endpoint="http://fake-agent/dispatch",
            )

        assert result.status == DispatchStatus.ASSIGNED
        assert result.agent == AgentType.AGENT_API

    async def test_api_error_returns_failed(self):
        # A 5xx from the endpoint surfaces as FAILED with the status code
        # embedded in the error message.
        mock_resp = MagicMock()
        mock_resp.status_code = 500
        mock_resp.text = "Internal Server Error"

        mock_client = AsyncMock()
        mock_client.__aenter__ = AsyncMock(return_value=mock_client)
        mock_client.__aexit__ = AsyncMock(return_value=False)
        mock_client.post = AsyncMock(return_value=mock_resp)

        with patch("httpx.AsyncClient", return_value=mock_client):
            result = await _dispatch_via_api(
                agent=AgentType.AGENT_API,
                title="Analyse logs",
                description="",
                acceptance_criteria=[],
                endpoint="http://fake-agent/dispatch",
            )

        assert result.status == DispatchStatus.FAILED
        assert "500" in (result.error or "")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _dispatch_via_gitea
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Shared stand-in for timmy.dispatcher.settings with Gitea fully configured;
# patched in by the TestDispatchViaGitea cases below.
_GITEA_SETTINGS = MagicMock(
    gitea_enabled=True,
    gitea_token="test-token",
    gitea_url="http://gitea.test",
    gitea_repo="owner/repo",
)
|
||||
|
||||
|
||||
class TestDispatchViaGitea:
    """_dispatch_via_gitea labels a Gitea issue and posts a task comment.

    The httpx client is mocked; POST side_effect sequences mirror the
    exact call order (create label → apply label → comment, or
    apply label → comment when the label already exists).
    """

    def _make_client(self, label_list=None, label_create_status=201, comment_status=201):
        """Build a mock httpx.AsyncClient for Gitea interactions."""
        label_resp = MagicMock()
        label_resp.status_code = 200
        label_resp.json.return_value = label_list or []

        create_label_resp = MagicMock()
        create_label_resp.status_code = label_create_status
        create_label_resp.json.return_value = {"id": 99}

        apply_label_resp = MagicMock()
        apply_label_resp.status_code = 201

        comment_resp = MagicMock()
        comment_resp.status_code = comment_status
        comment_resp.json.return_value = {"id": 7}

        client = AsyncMock()
        client.__aenter__ = AsyncMock(return_value=client)
        client.__aexit__ = AsyncMock(return_value=False)
        client.get = AsyncMock(return_value=label_resp)
        # Order matters: create label, apply label, then comment.
        client.post = AsyncMock(side_effect=[create_label_resp, apply_label_resp, comment_resp])
        return client

    async def test_successful_gitea_dispatch(self):
        client = self._make_client()
        with (
            patch("httpx.AsyncClient", return_value=client),
            patch("timmy.dispatcher.settings", _GITEA_SETTINGS),
        ):
            result = await _dispatch_via_gitea(
                agent=AgentType.CLAUDE_CODE,
                issue_number=1072,
                title="Design the router",
                description="We need a cascade router.",
                acceptance_criteria=["Failover works"],
            )

        assert result.success
        assert result.agent == AgentType.CLAUDE_CODE
        assert result.issue_number == 1072
        assert result.status == DispatchStatus.ASSIGNED

    async def test_no_gitea_token_returns_failed(self):
        # Missing token → dispatch refuses before any HTTP traffic.
        bad_settings = MagicMock(gitea_enabled=True, gitea_token="", gitea_url="http://x", gitea_repo="a/b")
        with patch("timmy.dispatcher.settings", bad_settings):
            result = await _dispatch_via_gitea(
                agent=AgentType.CLAUDE_CODE,
                issue_number=1,
                title="Some task",
                description="",
                acceptance_criteria=[],
            )
        assert result.status == DispatchStatus.FAILED
        assert "not configured" in (result.error or "").lower()

    async def test_gitea_disabled_returns_failed(self):
        bad_settings = MagicMock(gitea_enabled=False, gitea_token="tok", gitea_url="http://x", gitea_repo="a/b")
        with patch("timmy.dispatcher.settings", bad_settings):
            result = await _dispatch_via_gitea(
                agent=AgentType.CLAUDE_CODE,
                issue_number=1,
                title="Some task",
                description="",
                acceptance_criteria=[],
            )
        assert result.status == DispatchStatus.FAILED

    async def test_existing_label_reused(self):
        """When the label already exists, it should be reused (no creation call)."""
        label_resp = MagicMock()
        label_resp.status_code = 200
        label_resp.json.return_value = [{"name": "claude-ready", "id": 55}]

        apply_resp = MagicMock()
        apply_resp.status_code = 201

        comment_resp = MagicMock()
        comment_resp.status_code = 201
        comment_resp.json.return_value = {"id": 8}

        client = AsyncMock()
        client.__aenter__ = AsyncMock(return_value=client)
        client.__aexit__ = AsyncMock(return_value=False)
        client.get = AsyncMock(return_value=label_resp)
        # Only two POSTs expected here: apply existing label, then comment.
        client.post = AsyncMock(side_effect=[apply_resp, comment_resp])

        with (
            patch("httpx.AsyncClient", return_value=client),
            patch("timmy.dispatcher.settings", _GITEA_SETTINGS),
        ):
            result = await _dispatch_via_gitea(
                agent=AgentType.CLAUDE_CODE,
                issue_number=10,
                title="Architecture task",
                description="",
                acceptance_criteria=[],
            )

        assert result.success
        # Should only have 2 POST calls: apply label + comment (no label creation)
        assert client.post.call_count == 2
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# dispatch_task (integration-style)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestDispatchTask:
|
||||
async def test_empty_title_returns_failed(self):
|
||||
result = await dispatch_task(title=" ")
|
||||
assert result.status == DispatchStatus.FAILED
|
||||
assert "`title` is required" in (result.error or "")
|
||||
|
||||
async def test_local_dispatch_for_timmy_task(self):
|
||||
result = await dispatch_task(
|
||||
title="Triage the open issues",
|
||||
description="We have 40 open issues.",
|
||||
acceptance_criteria=["Issues are labelled"],
|
||||
task_type=TaskType.TRIAGE,
|
||||
)
|
||||
assert result.agent == AgentType.TIMMY
|
||||
assert result.success
|
||||
|
||||
async def test_explicit_agent_override(self):
|
||||
"""Caller can force a specific agent regardless of task type."""
|
||||
result = await dispatch_task(
|
||||
title="Triage the open issues",
|
||||
agent=AgentType.TIMMY,
|
||||
)
|
||||
assert result.agent == AgentType.TIMMY
|
||||
|
||||
async def test_gitea_dispatch_when_issue_provided(self):
|
||||
client_mock = AsyncMock()
|
||||
client_mock.__aenter__ = AsyncMock(return_value=client_mock)
|
||||
client_mock.__aexit__ = AsyncMock(return_value=False)
|
||||
client_mock.get = AsyncMock(return_value=MagicMock(status_code=200, json=MagicMock(return_value=[])))
|
||||
create_resp = MagicMock(status_code=201, json=MagicMock(return_value={"id": 1}))
|
||||
apply_resp = MagicMock(status_code=201)
|
||||
comment_resp = MagicMock(status_code=201, json=MagicMock(return_value={"id": 5}))
|
||||
client_mock.post = AsyncMock(side_effect=[create_resp, apply_resp, comment_resp])
|
||||
|
||||
with (
|
||||
patch("httpx.AsyncClient", return_value=client_mock),
|
||||
patch("timmy.dispatcher.settings", _GITEA_SETTINGS),
|
||||
):
|
||||
result = await dispatch_task(
|
||||
title="Design the cascade router",
|
||||
description="Architecture task.",
|
||||
task_type=TaskType.ARCHITECTURE,
|
||||
issue_number=1072,
|
||||
)
|
||||
|
||||
assert result.agent == AgentType.CLAUDE_CODE
|
||||
assert result.success
|
||||
|
||||
async def test_escalation_after_max_retries(self):
|
||||
"""If all attempts fail, the result is ESCALATED."""
|
||||
with (
|
||||
patch("timmy.dispatcher._dispatch_via_gitea", new_callable=AsyncMock) as mock_dispatch,
|
||||
patch("timmy.dispatcher._log_escalation", new_callable=AsyncMock),
|
||||
):
|
||||
mock_dispatch.return_value = DispatchResult(
|
||||
task_type=TaskType.ARCHITECTURE,
|
||||
agent=AgentType.CLAUDE_CODE,
|
||||
issue_number=1,
|
||||
status=DispatchStatus.FAILED,
|
||||
error="Gitea offline",
|
||||
)
|
||||
result = await dispatch_task(
|
||||
title="Design router",
|
||||
task_type=TaskType.ARCHITECTURE,
|
||||
issue_number=1,
|
||||
max_retries=1,
|
||||
)
|
||||
|
||||
assert result.status == DispatchStatus.ESCALATED
|
||||
assert mock_dispatch.call_count == 2 # initial + 1 retry
|
||||
|
||||
async def test_no_retry_on_success(self):
|
||||
with patch("timmy.dispatcher._dispatch_via_gitea", new_callable=AsyncMock) as mock_dispatch:
|
||||
mock_dispatch.return_value = DispatchResult(
|
||||
task_type=TaskType.ARCHITECTURE,
|
||||
agent=AgentType.CLAUDE_CODE,
|
||||
issue_number=1,
|
||||
status=DispatchStatus.ASSIGNED,
|
||||
comment_id=42,
|
||||
label_applied="claude-ready",
|
||||
)
|
||||
result = await dispatch_task(
|
||||
title="Design router",
|
||||
task_type=TaskType.ARCHITECTURE,
|
||||
issue_number=1,
|
||||
max_retries=2,
|
||||
)
|
||||
|
||||
assert result.success
|
||||
assert mock_dispatch.call_count == 1 # no retries needed
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# wait_for_completion
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestWaitForCompletion:
|
||||
async def test_returns_completed_when_issue_closed(self):
|
||||
closed_resp = MagicMock(
|
||||
status_code=200,
|
||||
json=MagicMock(return_value={"state": "closed"}),
|
||||
)
|
||||
client_mock = AsyncMock()
|
||||
client_mock.__aenter__ = AsyncMock(return_value=client_mock)
|
||||
client_mock.__aexit__ = AsyncMock(return_value=False)
|
||||
client_mock.get = AsyncMock(return_value=closed_resp)
|
||||
|
||||
with (
|
||||
patch("httpx.AsyncClient", return_value=client_mock),
|
||||
patch("timmy.dispatcher.settings", _GITEA_SETTINGS),
|
||||
):
|
||||
status = await wait_for_completion(issue_number=42, poll_interval=0, max_wait=5)
|
||||
|
||||
assert status == DispatchStatus.COMPLETED
|
||||
|
||||
async def test_returns_timed_out_when_still_open(self):
|
||||
open_resp = MagicMock(
|
||||
status_code=200,
|
||||
json=MagicMock(return_value={"state": "open"}),
|
||||
)
|
||||
client_mock = AsyncMock()
|
||||
client_mock.__aenter__ = AsyncMock(return_value=client_mock)
|
||||
client_mock.__aexit__ = AsyncMock(return_value=False)
|
||||
client_mock.get = AsyncMock(return_value=open_resp)
|
||||
|
||||
with (
|
||||
patch("httpx.AsyncClient", return_value=client_mock),
|
||||
patch("timmy.dispatcher.settings", _GITEA_SETTINGS),
|
||||
patch("asyncio.sleep", new_callable=AsyncMock),
|
||||
):
|
||||
status = await wait_for_completion(issue_number=42, poll_interval=1, max_wait=2)
|
||||
|
||||
assert status == DispatchStatus.TIMED_OUT
|
||||
@@ -1,217 +0,0 @@
|
||||
"""Unit tests for the Dreaming mode engine."""
|
||||
|
||||
import sqlite3
|
||||
from contextlib import closing
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from timmy.dreaming import _SESSION_GAP_SECONDS, DreamingEngine, DreamRecord
|
||||
|
||||
pytestmark = pytest.mark.unit
|
||||
|
||||
# ── Fixtures ──────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def tmp_dreams_db(tmp_path):
|
||||
"""Return a temporary path for the dreams database."""
|
||||
return tmp_path / "dreams.db"
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def engine(tmp_dreams_db):
|
||||
"""DreamingEngine backed by a temp database."""
|
||||
return DreamingEngine(db_path=tmp_dreams_db)
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def chat_db(tmp_path):
|
||||
"""Create a minimal chat database with some messages."""
|
||||
db_path = tmp_path / "chat.db"
|
||||
with closing(sqlite3.connect(str(db_path))) as conn:
|
||||
conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS chat_messages (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
role TEXT NOT NULL,
|
||||
content TEXT NOT NULL,
|
||||
timestamp TEXT NOT NULL,
|
||||
source TEXT NOT NULL DEFAULT 'browser'
|
||||
)
|
||||
""")
|
||||
now = datetime.now(UTC)
|
||||
messages = [
|
||||
("user", "Hello, can you help me?", (now - timedelta(hours=2)).isoformat()),
|
||||
("agent", "Of course! What do you need?", (now - timedelta(hours=2, seconds=-5)).isoformat()),
|
||||
("user", "How does Python handle errors?", (now - timedelta(hours=2, seconds=-60)).isoformat()),
|
||||
("agent", "Python uses try/except blocks.", (now - timedelta(hours=2, seconds=-120)).isoformat()),
|
||||
("user", "Thanks!", (now - timedelta(hours=2, seconds=-180)).isoformat()),
|
||||
]
|
||||
conn.executemany(
|
||||
"INSERT INTO chat_messages (role, content, timestamp) VALUES (?, ?, ?)",
|
||||
messages,
|
||||
)
|
||||
conn.commit()
|
||||
return db_path
|
||||
|
||||
|
||||
# ── Idle detection ─────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestIdleDetection:
|
||||
def test_not_idle_immediately(self, engine):
|
||||
assert engine.is_idle() is False
|
||||
|
||||
def test_idle_after_threshold(self, engine):
|
||||
engine._last_activity_time = datetime.now(UTC) - timedelta(minutes=20)
|
||||
with patch("timmy.dreaming.settings") as mock_settings:
|
||||
mock_settings.dreaming_idle_threshold_minutes = 10
|
||||
assert engine.is_idle() is True
|
||||
|
||||
def test_not_idle_when_threshold_zero(self, engine):
|
||||
engine._last_activity_time = datetime.now(UTC) - timedelta(hours=99)
|
||||
with patch("timmy.dreaming.settings") as mock_settings:
|
||||
mock_settings.dreaming_idle_threshold_minutes = 0
|
||||
assert engine.is_idle() is False
|
||||
|
||||
def test_record_activity_resets_timer(self, engine):
|
||||
engine._last_activity_time = datetime.now(UTC) - timedelta(minutes=30)
|
||||
engine.record_activity()
|
||||
with patch("timmy.dreaming.settings") as mock_settings:
|
||||
mock_settings.dreaming_idle_threshold_minutes = 10
|
||||
assert engine.is_idle() is False
|
||||
|
||||
|
||||
# ── Status dict ───────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestGetStatus:
|
||||
def test_status_shape(self, engine):
|
||||
with patch("timmy.dreaming.settings") as mock_settings:
|
||||
mock_settings.dreaming_enabled = True
|
||||
mock_settings.dreaming_idle_threshold_minutes = 10
|
||||
status = engine.get_status()
|
||||
assert "enabled" in status
|
||||
assert "dreaming" in status
|
||||
assert "idle" in status
|
||||
assert "dream_count" in status
|
||||
assert "idle_minutes" in status
|
||||
|
||||
def test_dream_count_starts_at_zero(self, engine):
|
||||
with patch("timmy.dreaming.settings") as mock_settings:
|
||||
mock_settings.dreaming_enabled = True
|
||||
mock_settings.dreaming_idle_threshold_minutes = 10
|
||||
assert engine.get_status()["dream_count"] == 0
|
||||
|
||||
|
||||
# ── Session grouping ──────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestGroupIntoSessions:
|
||||
def test_single_session(self, engine):
|
||||
now = datetime.now(UTC)
|
||||
rows = [
|
||||
{"role": "user", "content": "hi", "timestamp": now.isoformat()},
|
||||
{"role": "agent", "content": "hello", "timestamp": (now + timedelta(seconds=10)).isoformat()},
|
||||
]
|
||||
sessions = engine._group_into_sessions(rows)
|
||||
assert len(sessions) == 1
|
||||
assert len(sessions[0]) == 2
|
||||
|
||||
def test_splits_on_large_gap(self, engine):
|
||||
now = datetime.now(UTC)
|
||||
gap = _SESSION_GAP_SECONDS + 100
|
||||
rows = [
|
||||
{"role": "user", "content": "hi", "timestamp": now.isoformat()},
|
||||
{"role": "agent", "content": "hello", "timestamp": (now + timedelta(seconds=gap)).isoformat()},
|
||||
]
|
||||
sessions = engine._group_into_sessions(rows)
|
||||
assert len(sessions) == 2
|
||||
|
||||
def test_empty_input(self, engine):
|
||||
assert engine._group_into_sessions([]) == []
|
||||
|
||||
|
||||
# ── Dream storage ─────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestDreamStorage:
|
||||
def test_store_and_retrieve(self, engine):
|
||||
dream = engine._store_dream(
|
||||
session_excerpt="User asked about Python.",
|
||||
decision_point="Python uses try/except blocks.",
|
||||
simulation="I could have given a code example.",
|
||||
proposed_rule="When explaining errors, include a code snippet.",
|
||||
)
|
||||
assert dream.id
|
||||
assert dream.proposed_rule == "When explaining errors, include a code snippet."
|
||||
|
||||
retrieved = engine.get_recent_dreams(limit=1)
|
||||
assert len(retrieved) == 1
|
||||
assert retrieved[0].id == dream.id
|
||||
|
||||
def test_count_increments(self, engine):
|
||||
assert engine.count_dreams() == 0
|
||||
engine._store_dream(
|
||||
session_excerpt="test", decision_point="test", simulation="test", proposed_rule="test"
|
||||
)
|
||||
assert engine.count_dreams() == 1
|
||||
|
||||
|
||||
# ── dream_once integration ─────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestDreamOnce:
|
||||
@pytest.mark.asyncio
|
||||
async def test_skips_when_disabled(self, engine):
|
||||
with patch("timmy.dreaming.settings") as mock_settings:
|
||||
mock_settings.dreaming_enabled = False
|
||||
result = await engine.dream_once()
|
||||
assert result is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_skips_when_not_idle(self, engine):
|
||||
engine._last_activity_time = datetime.now(UTC)
|
||||
with patch("timmy.dreaming.settings") as mock_settings:
|
||||
mock_settings.dreaming_enabled = True
|
||||
mock_settings.dreaming_idle_threshold_minutes = 60
|
||||
result = await engine.dream_once()
|
||||
assert result is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_skips_when_already_dreaming(self, engine):
|
||||
engine._is_dreaming = True
|
||||
with patch("timmy.dreaming.settings") as mock_settings:
|
||||
mock_settings.dreaming_enabled = True
|
||||
mock_settings.dreaming_idle_threshold_minutes = 0
|
||||
result = await engine.dream_once()
|
||||
# Reset for cleanliness
|
||||
engine._is_dreaming = False
|
||||
assert result is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_dream_produces_record_when_idle(self, engine, chat_db):
|
||||
"""Full cycle: idle + chat data + mocked LLM → produces DreamRecord."""
|
||||
engine._last_activity_time = datetime.now(UTC) - timedelta(hours=1)
|
||||
|
||||
with (
|
||||
patch("timmy.dreaming.settings") as mock_settings,
|
||||
patch("timmy.dreaming.DreamingEngine._call_agent", new_callable=AsyncMock) as mock_agent,
|
||||
patch("infrastructure.chat_store.DB_PATH", chat_db),
|
||||
):
|
||||
mock_settings.dreaming_enabled = True
|
||||
mock_settings.dreaming_idle_threshold_minutes = 10
|
||||
mock_settings.dreaming_timeout_seconds = 30
|
||||
mock_agent.side_effect = [
|
||||
"I could have provided a concrete try/except example.", # simulation
|
||||
"When explaining errors, always include a runnable code snippet.", # rule
|
||||
]
|
||||
|
||||
result = await engine.dream_once()
|
||||
|
||||
assert result is not None
|
||||
assert isinstance(result, DreamRecord)
|
||||
assert result.simulation
|
||||
assert result.proposed_rule
|
||||
assert engine.count_dreams() == 1
|
||||
15
tox.ini
15
tox.ini
@@ -47,10 +47,12 @@ commands =
|
||||
# ── Test Environments ────────────────────────────────────────────────────────
|
||||
|
||||
[testenv:unit]
|
||||
description = Fast unit tests — only tests marked @pytest.mark.unit
|
||||
description = Fast tests — excludes e2e, functional, and external services
|
||||
commands =
|
||||
pytest tests/ -q --tb=short \
|
||||
-m "unit and not ollama and not docker and not selenium and not external_api and not skip_ci and not slow" \
|
||||
--ignore=tests/e2e \
|
||||
--ignore=tests/functional \
|
||||
-m "not ollama and not docker and not selenium and not external_api and not skip_ci and not slow" \
|
||||
-n auto --dist worksteal
|
||||
|
||||
[testenv:integration]
|
||||
@@ -127,6 +129,15 @@ commands =
|
||||
-p no:xdist \
|
||||
-m "not ollama and not docker and not selenium and not external_api and not slow"
|
||||
|
||||
[testenv:coverage-parallel]
|
||||
description = Parallel coverage report
|
||||
commands =
|
||||
pytest tests/ -q --tb=short \
|
||||
--cov=src \
|
||||
--cov-report=term-missing \
|
||||
-n auto --dist worksteal \
|
||||
-m "not ollama and not docker and not selenium and not external_api and not slow"
|
||||
|
||||
# ── Pre-push (mirrors CI exactly) ────────────────────────────────────────────
|
||||
|
||||
[testenv:pre-push]
|
||||
|
||||
Reference in New Issue
Block a user