1
0

Compare commits

..

1 Commits

Author SHA1 Message Date
kimi
36de0b491d feat(matrix-ui): add Fund Session modal with explanatory text about sats
- Add Fund Session button to Matrix UI overlay (green plus icon)
- Create modal with three explanatory sections:
  * What are Sats? - explains satoshis as smallest Bitcoin unit
  * Why Fund Your Session? - explains how sats power Workshop AI agents
  * Approximate Costs - shows cost ranges for different interaction types
- Add input field for funding amount (min 100 sats)
- Style modal with green theme to match Lightning/sats concept
- Add proper keyboard support (Enter to submit, Escape to close)
- Mobile-responsive design

Fixes #753
2026-03-21 18:21:00 -04:00
35 changed files with 414 additions and 3866 deletions

View File

@@ -277,8 +277,6 @@ def main() -> None:
args.tests_passed = int(cr["tests_passed"]) args.tests_passed = int(cr["tests_passed"])
if not args.notes and cr.get("notes"): if not args.notes and cr.get("notes"):
args.notes = cr["notes"] args.notes = cr["notes"]
# Consume-once: delete after reading so stale results don't poison future cycles
CYCLE_RESULT_FILE.unlink(missing_ok=True)
# Auto-detect issue from branch when not explicitly provided # Auto-detect issue from branch when not explicitly provided
if args.issue is None: if args.issue is None:

View File

@@ -30,7 +30,7 @@ IDLE_STATE_FILE = REPO_ROOT / ".loop" / "idle_state.json"
CYCLE_RESULT_FILE = REPO_ROOT / ".loop" / "cycle_result.json" CYCLE_RESULT_FILE = REPO_ROOT / ".loop" / "cycle_result.json"
TOKEN_FILE = Path.home() / ".hermes" / "gitea_token" TOKEN_FILE = Path.home() / ".hermes" / "gitea_token"
GITEA_API = os.environ.get("GITEA_API", "http://143.198.27.163:3000/api/v1") GITEA_API = os.environ.get("GITEA_API", "http://localhost:3000/api/v1")
REPO_SLUG = os.environ.get("REPO_SLUG", "rockachopa/Timmy-time-dashboard") REPO_SLUG = os.environ.get("REPO_SLUG", "rockachopa/Timmy-time-dashboard")
# Default cycle duration in seconds (5 min); stale threshold = 2× this # Default cycle duration in seconds (5 min); stale threshold = 2× this

View File

@@ -20,7 +20,7 @@ from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
# ── Config ────────────────────────────────────────────────────────────── # ── Config ──────────────────────────────────────────────────────────────
GITEA_API = os.environ.get("GITEA_API", "http://143.198.27.163:3000/api/v1") GITEA_API = os.environ.get("GITEA_API", "http://localhost:3000/api/v1")
REPO_SLUG = os.environ.get("REPO_SLUG", "rockachopa/Timmy-time-dashboard") REPO_SLUG = os.environ.get("REPO_SLUG", "rockachopa/Timmy-time-dashboard")
TOKEN_FILE = Path.home() / ".hermes" / "gitea_token" TOKEN_FILE = Path.home() / ".hermes" / "gitea_token"
REPO_ROOT = Path(__file__).resolve().parent.parent REPO_ROOT = Path(__file__).resolve().parent.parent
@@ -327,31 +327,7 @@ def run_triage() -> list[dict]:
not_ready = [s for s in scored if not s["ready"]] not_ready = [s for s in scored if not s["ready"]]
QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True) QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
backup_file = QUEUE_FILE.with_suffix(".json.bak") QUEUE_FILE.write_text(json.dumps(ready, indent=2) + "\n")
# Backup existing queue before overwriting
if QUEUE_FILE.exists():
try:
backup_file.write_text(QUEUE_FILE.read_text())
except OSError:
pass
# Write and validate
queue_json = json.dumps(ready, indent=2) + "\n"
QUEUE_FILE.write_text(queue_json)
# Validate by re-reading — restore backup on corruption
try:
validated = json.loads(QUEUE_FILE.read_text())
if not isinstance(validated, list):
raise ValueError("queue.json is not a list")
except (json.JSONDecodeError, ValueError) as e:
print(f"[triage] ERROR: queue.json validation failed: {e}", file=sys.stderr)
if backup_file.exists():
print("[triage] Restoring from backup", file=sys.stderr)
QUEUE_FILE.write_text(backup_file.read_text())
else:
QUEUE_FILE.write_text("[]\n")
# Write retro entry # Write retro entry
retro_entry = { retro_entry = {

View File

@@ -87,12 +87,8 @@ class Settings(BaseSettings):
xai_base_url: str = "https://api.x.ai/v1" xai_base_url: str = "https://api.x.ai/v1"
grok_default_model: str = "grok-3-fast" grok_default_model: str = "grok-3-fast"
grok_max_sats_per_query: int = 200 grok_max_sats_per_query: int = 200
grok_sats_hard_cap: int = 100 # Absolute ceiling on sats per Grok query
grok_free: bool = False # Skip Lightning invoice when user has own API key grok_free: bool = False # Skip Lightning invoice when user has own API key
# ── Database ──────────────────────────────────────────────────────────
db_busy_timeout_ms: int = 5000 # SQLite PRAGMA busy_timeout (ms)
# ── Claude (Anthropic) — cloud fallback backend ──────────────────────── # ── Claude (Anthropic) — cloud fallback backend ────────────────────────
# Used when Ollama is offline and local inference isn't available. # Used when Ollama is offline and local inference isn't available.
# Set ANTHROPIC_API_KEY to enable. Default model is Haiku (fast + cheap). # Set ANTHROPIC_API_KEY to enable. Default model is Haiku (fast + cheap).

View File

@@ -44,7 +44,6 @@ from dashboard.routes.mobile import router as mobile_router
from dashboard.routes.models import api_router as models_api_router from dashboard.routes.models import api_router as models_api_router
from dashboard.routes.models import router as models_router from dashboard.routes.models import router as models_router
from dashboard.routes.quests import router as quests_router from dashboard.routes.quests import router as quests_router
from dashboard.routes.scorecards import router as scorecards_router
from dashboard.routes.spark import router as spark_router from dashboard.routes.spark import router as spark_router
from dashboard.routes.system import router as system_router from dashboard.routes.system import router as system_router
from dashboard.routes.tasks import router as tasks_router from dashboard.routes.tasks import router as tasks_router
@@ -630,7 +629,6 @@ app.include_router(matrix_router)
app.include_router(tower_router) app.include_router(tower_router)
app.include_router(daily_run_router) app.include_router(daily_run_router)
app.include_router(quests_router) app.include_router(quests_router)
app.include_router(scorecards_router)
@app.websocket("/ws") @app.websocket("/ws")

View File

@@ -1,353 +0,0 @@
"""Agent scorecard routes — API endpoints for generating and viewing scorecards."""
from __future__ import annotations
import logging
from datetime import datetime
from fastapi import APIRouter, Query, Request
from fastapi.responses import HTMLResponse, JSONResponse
from dashboard.services.scorecard_service import (
PeriodType,
generate_all_scorecards,
generate_scorecard,
get_tracked_agents,
)
from dashboard.templating import templates
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/scorecards", tags=["scorecards"])
def _format_period_label(period_type: PeriodType) -> str:
"""Format a period type for display."""
return "Daily" if period_type == PeriodType.daily else "Weekly"
@router.get("/api/agents")
async def list_tracked_agents() -> dict[str, list[str]]:
"""Return the list of tracked agent IDs.
Returns:
Dict with "agents" key containing list of agent IDs
"""
return {"agents": get_tracked_agents()}
@router.get("/api/{agent_id}")
async def get_agent_scorecard(
agent_id: str,
period: str = Query(default="daily", description="Period type: 'daily' or 'weekly'"),
) -> JSONResponse:
"""Generate a scorecard for a specific agent.
Args:
agent_id: The agent ID (e.g., 'kimi', 'claude')
period: 'daily' or 'weekly' (default: daily)
Returns:
JSON response with scorecard data
"""
try:
period_type = PeriodType(period.lower())
except ValueError:
return JSONResponse(
status_code=400,
content={"error": f"Invalid period '{period}'. Use 'daily' or 'weekly'."},
)
try:
scorecard = generate_scorecard(agent_id, period_type)
if scorecard is None:
return JSONResponse(
status_code=404,
content={"error": f"No scorecard found for agent '{agent_id}'"},
)
return JSONResponse(content=scorecard.to_dict())
except Exception as exc:
logger.error("Failed to generate scorecard for %s: %s", agent_id, exc)
return JSONResponse(
status_code=500,
content={"error": f"Failed to generate scorecard: {str(exc)}"},
)
@router.get("/api")
async def get_all_scorecards(
period: str = Query(default="daily", description="Period type: 'daily' or 'weekly'"),
) -> JSONResponse:
"""Generate scorecards for all tracked agents.
Args:
period: 'daily' or 'weekly' (default: daily)
Returns:
JSON response with list of scorecard data
"""
try:
period_type = PeriodType(period.lower())
except ValueError:
return JSONResponse(
status_code=400,
content={"error": f"Invalid period '{period}'. Use 'daily' or 'weekly'."},
)
try:
scorecards = generate_all_scorecards(period_type)
return JSONResponse(
content={
"period": period_type.value,
"scorecards": [s.to_dict() for s in scorecards],
"count": len(scorecards),
}
)
except Exception as exc:
logger.error("Failed to generate scorecards: %s", exc)
return JSONResponse(
status_code=500,
content={"error": f"Failed to generate scorecards: {str(exc)}"},
)
@router.get("", response_class=HTMLResponse)
async def scorecards_page(request: Request) -> HTMLResponse:
"""Render the scorecards dashboard page.
Returns:
HTML page with scorecard interface
"""
agents = get_tracked_agents()
return templates.TemplateResponse(
request,
"scorecards.html",
{
"agents": agents,
"periods": ["daily", "weekly"],
},
)
@router.get("/panel/{agent_id}", response_class=HTMLResponse)
async def agent_scorecard_panel(
request: Request,
agent_id: str,
period: str = Query(default="daily"),
) -> HTMLResponse:
"""Render an individual agent scorecard panel (for HTMX).
Args:
request: The request object
agent_id: The agent ID
period: 'daily' or 'weekly'
Returns:
HTML panel with scorecard content
"""
try:
period_type = PeriodType(period.lower())
except ValueError:
period_type = PeriodType.daily
try:
scorecard = generate_scorecard(agent_id, period_type)
if scorecard is None:
return HTMLResponse(
content=f"""
<div class="card mc-panel">
<h5 class="card-title">{agent_id.title()}</h5>
<p class="text-muted">No activity recorded for this period.</p>
</div>
""",
status_code=200,
)
data = scorecard.to_dict()
# Build patterns HTML
patterns_html = ""
if data["patterns"]:
patterns_list = "".join([f"<li>{p}</li>" for p in data["patterns"]])
patterns_html = f"""
<div class="mt-3">
<h6>Patterns</h6>
<ul class="list-unstyled text-info">
{patterns_list}
</ul>
</div>
"""
# Build bullets HTML
bullets_html = "".join([f"<li>{b}</li>" for b in data["narrative_bullets"]])
# Build metrics summary
metrics = data["metrics"]
html_content = f"""
<div class="card mc-panel">
<div class="card-header d-flex justify-content-between align-items-center">
<h5 class="card-title mb-0">{agent_id.title()}</h5>
<span class="badge bg-secondary">{_format_period_label(period_type)}</span>
</div>
<div class="card-body">
<ul class="list-unstyled mb-3">
{bullets_html}
</ul>
<div class="row text-center small">
<div class="col">
<div class="text-muted">PRs</div>
<div class="fw-bold">{metrics["prs_opened"]}/{metrics["prs_merged"]}</div>
<div class="text-muted" style="font-size: 0.75rem;">
{int(metrics["pr_merge_rate"] * 100)}% merged
</div>
</div>
<div class="col">
<div class="text-muted">Issues</div>
<div class="fw-bold">{metrics["issues_touched"]}</div>
</div>
<div class="col">
<div class="text-muted">Tests</div>
<div class="fw-bold">{metrics["tests_affected"]}</div>
</div>
<div class="col">
<div class="text-muted">Tokens</div>
<div class="fw-bold {"text-success" if metrics["token_net"] >= 0 else "text-danger"}">
{"+" if metrics["token_net"] > 0 else ""}{metrics["token_net"]}
</div>
</div>
</div>
{patterns_html}
</div>
</div>
"""
return HTMLResponse(content=html_content)
except Exception as exc:
logger.error("Failed to render scorecard panel for %s: %s", agent_id, exc)
return HTMLResponse(
content=f"""
<div class="card mc-panel border-danger">
<h5 class="card-title">{agent_id.title()}</h5>
<p class="text-danger">Error loading scorecard: {str(exc)}</p>
</div>
""",
status_code=200,
)
@router.get("/all/panels", response_class=HTMLResponse)
async def all_scorecard_panels(
request: Request,
period: str = Query(default="daily"),
) -> HTMLResponse:
"""Render all agent scorecard panels (for HTMX).
Args:
request: The request object
period: 'daily' or 'weekly'
Returns:
HTML with all scorecard panels
"""
try:
period_type = PeriodType(period.lower())
except ValueError:
period_type = PeriodType.daily
try:
scorecards = generate_all_scorecards(period_type)
panels: list[str] = []
for scorecard in scorecards:
data = scorecard.to_dict()
# Build patterns HTML
patterns_html = ""
if data["patterns"]:
patterns_list = "".join([f"<li>{p}</li>" for p in data["patterns"]])
patterns_html = f"""
<div class="mt-3">
<h6>Patterns</h6>
<ul class="list-unstyled text-info">
{patterns_list}
</ul>
</div>
"""
# Build bullets HTML
bullets_html = "".join([f"<li>{b}</li>" for b in data["narrative_bullets"]])
metrics = data["metrics"]
panel_html = f"""
<div class="col-md-6 col-lg-4 mb-3">
<div class="card mc-panel">
<div class="card-header d-flex justify-content-between align-items-center">
<h5 class="card-title mb-0">{scorecard.agent_id.title()}</h5>
<span class="badge bg-secondary">{_format_period_label(period_type)}</span>
</div>
<div class="card-body">
<ul class="list-unstyled mb-3">
{bullets_html}
</ul>
<div class="row text-center small">
<div class="col">
<div class="text-muted">PRs</div>
<div class="fw-bold">{metrics["prs_opened"]}/{metrics["prs_merged"]}</div>
<div class="text-muted" style="font-size: 0.75rem;">
{int(metrics["pr_merge_rate"] * 100)}% merged
</div>
</div>
<div class="col">
<div class="text-muted">Issues</div>
<div class="fw-bold">{metrics["issues_touched"]}</div>
</div>
<div class="col">
<div class="text-muted">Tests</div>
<div class="fw-bold">{metrics["tests_affected"]}</div>
</div>
<div class="col">
<div class="text-muted">Tokens</div>
<div class="fw-bold {"text-success" if metrics["token_net"] >= 0 else "text-danger"}">
{"+" if metrics["token_net"] > 0 else ""}{metrics["token_net"]}
</div>
</div>
</div>
{patterns_html}
</div>
</div>
</div>
"""
panels.append(panel_html)
html_content = f"""
<div class="row">
{"".join(panels)}
</div>
<div class="text-muted small mt-2">
Generated: {datetime.now().strftime("%Y-%m-%d %H:%M:%S UTC")}
</div>
"""
return HTMLResponse(content=html_content)
except Exception as exc:
logger.error("Failed to render all scorecard panels: %s", exc)
return HTMLResponse(
content=f"""
<div class="alert alert-danger">
Error loading scorecards: {str(exc)}
</div>
""",
status_code=200,
)

View File

@@ -1,17 +0,0 @@
"""Dashboard services for business logic."""
from dashboard.services.scorecard_service import (
PeriodType,
ScorecardSummary,
generate_all_scorecards,
generate_scorecard,
get_tracked_agents,
)
__all__ = [
"PeriodType",
"ScorecardSummary",
"generate_all_scorecards",
"generate_scorecard",
"get_tracked_agents",
]

View File

@@ -1,515 +0,0 @@
"""Agent scorecard service — track and summarize agent performance.
Generates daily/weekly scorecards showing:
- Issues touched, PRs opened/merged
- Tests affected, tokens earned/spent
- Pattern highlights (merge rate, activity quality)
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta
from enum import StrEnum
from typing import Any
from infrastructure.events.bus import Event, get_event_bus
logger = logging.getLogger(__name__)
# Bot/agent usernames to track
TRACKED_AGENTS = frozenset({"hermes", "kimi", "manus", "claude", "gemini"})
class PeriodType(StrEnum):
daily = "daily"
weekly = "weekly"
@dataclass
class AgentMetrics:
"""Raw metrics collected for an agent over a period."""
agent_id: str
issues_touched: set[int] = field(default_factory=set)
prs_opened: set[int] = field(default_factory=set)
prs_merged: set[int] = field(default_factory=set)
tests_affected: set[str] = field(default_factory=set)
tokens_earned: int = 0
tokens_spent: int = 0
commits: int = 0
comments: int = 0
@property
def pr_merge_rate(self) -> float:
"""Calculate PR merge rate (0.0 - 1.0)."""
opened = len(self.prs_opened)
if opened == 0:
return 0.0
return len(self.prs_merged) / opened
@dataclass
class ScorecardSummary:
"""A generated scorecard with narrative summary."""
agent_id: str
period_type: PeriodType
period_start: datetime
period_end: datetime
metrics: AgentMetrics
narrative_bullets: list[str] = field(default_factory=list)
patterns: list[str] = field(default_factory=list)
def to_dict(self) -> dict[str, Any]:
"""Convert scorecard to dictionary for JSON serialization."""
return {
"agent_id": self.agent_id,
"period_type": self.period_type.value,
"period_start": self.period_start.isoformat(),
"period_end": self.period_end.isoformat(),
"metrics": {
"issues_touched": len(self.metrics.issues_touched),
"prs_opened": len(self.metrics.prs_opened),
"prs_merged": len(self.metrics.prs_merged),
"pr_merge_rate": round(self.metrics.pr_merge_rate, 2),
"tests_affected": len(self.tests_affected),
"commits": self.metrics.commits,
"comments": self.metrics.comments,
"tokens_earned": self.metrics.tokens_earned,
"tokens_spent": self.metrics.tokens_spent,
"token_net": self.metrics.tokens_earned - self.metrics.tokens_spent,
},
"narrative_bullets": self.narrative_bullets,
"patterns": self.patterns,
}
@property
def tests_affected(self) -> set[str]:
"""Alias for metrics.tests_affected."""
return self.metrics.tests_affected
def _get_period_bounds(
period_type: PeriodType, reference_date: datetime | None = None
) -> tuple[datetime, datetime]:
"""Calculate start and end timestamps for a period.
Args:
period_type: daily or weekly
reference_date: The date to calculate from (defaults to now)
Returns:
Tuple of (period_start, period_end) in UTC
"""
if reference_date is None:
reference_date = datetime.now(UTC)
# Normalize to start of day
end = reference_date.replace(hour=0, minute=0, second=0, microsecond=0)
if period_type == PeriodType.daily:
start = end - timedelta(days=1)
else: # weekly
start = end - timedelta(days=7)
return start, end
def _collect_events_for_period(
start: datetime, end: datetime, agent_id: str | None = None
) -> list[Event]:
"""Collect events from the event bus for a time period.
Args:
start: Period start time
end: Period end time
agent_id: Optional agent filter
Returns:
List of matching events
"""
bus = get_event_bus()
events: list[Event] = []
# Query persisted events for relevant types
event_types = [
"gitea.push",
"gitea.issue.opened",
"gitea.issue.comment",
"gitea.pull_request",
"agent.task.completed",
"test.execution",
]
for event_type in event_types:
try:
type_events = bus.replay(
event_type=event_type,
source=agent_id,
limit=1000,
)
events.extend(type_events)
except Exception as exc:
logger.debug("Failed to replay events for %s: %s", event_type, exc)
# Filter by timestamp
filtered = []
for event in events:
try:
event_time = datetime.fromisoformat(event.timestamp.replace("Z", "+00:00"))
if start <= event_time < end:
filtered.append(event)
except (ValueError, AttributeError):
continue
return filtered
def _extract_actor_from_event(event: Event) -> str:
"""Extract the actor/agent from an event."""
# Try data fields first
if "actor" in event.data:
return event.data["actor"]
if "agent_id" in event.data:
return event.data["agent_id"]
# Fall back to source
return event.source
def _is_tracked_agent(actor: str) -> bool:
"""Check if an actor is a tracked agent."""
return actor.lower() in TRACKED_AGENTS
def _aggregate_metrics(events: list[Event]) -> dict[str, AgentMetrics]:
"""Aggregate metrics from events grouped by agent.
Args:
events: List of events to process
Returns:
Dict mapping agent_id -> AgentMetrics
"""
metrics_by_agent: dict[str, AgentMetrics] = {}
for event in events:
actor = _extract_actor_from_event(event)
# Skip non-agent events unless they explicitly have an agent_id
if not _is_tracked_agent(actor) and "agent_id" not in event.data:
continue
if actor not in metrics_by_agent:
metrics_by_agent[actor] = AgentMetrics(agent_id=actor)
metrics = metrics_by_agent[actor]
# Process based on event type
event_type = event.type
if event_type == "gitea.push":
metrics.commits += event.data.get("num_commits", 1)
elif event_type == "gitea.issue.opened":
issue_num = event.data.get("issue_number", 0)
if issue_num:
metrics.issues_touched.add(issue_num)
elif event_type == "gitea.issue.comment":
metrics.comments += 1
issue_num = event.data.get("issue_number", 0)
if issue_num:
metrics.issues_touched.add(issue_num)
elif event_type == "gitea.pull_request":
pr_num = event.data.get("pr_number", 0)
action = event.data.get("action", "")
merged = event.data.get("merged", False)
if pr_num:
if action == "opened":
metrics.prs_opened.add(pr_num)
elif action == "closed" and merged:
metrics.prs_merged.add(pr_num)
# Also count as touched issue for tracking
metrics.issues_touched.add(pr_num)
elif event_type == "agent.task.completed":
# Extract test files from task data
affected = event.data.get("tests_affected", [])
for test in affected:
metrics.tests_affected.add(test)
# Token rewards from task completion
reward = event.data.get("token_reward", 0)
if reward:
metrics.tokens_earned += reward
elif event_type == "test.execution":
# Track test files that were executed
test_files = event.data.get("test_files", [])
for test in test_files:
metrics.tests_affected.add(test)
return metrics_by_agent
def _query_token_transactions(agent_id: str, start: datetime, end: datetime) -> tuple[int, int]:
"""Query the lightning ledger for token transactions.
Args:
agent_id: The agent to query for
start: Period start
end: Period end
Returns:
Tuple of (tokens_earned, tokens_spent)
"""
try:
from lightning.ledger import get_transactions
transactions = get_transactions(limit=1000)
earned = 0
spent = 0
for tx in transactions:
# Filter by agent if specified
if tx.agent_id and tx.agent_id != agent_id:
continue
# Filter by timestamp
try:
tx_time = datetime.fromisoformat(tx.created_at.replace("Z", "+00:00"))
if not (start <= tx_time < end):
continue
except (ValueError, AttributeError):
continue
if tx.tx_type.value == "incoming":
earned += tx.amount_sats
else:
spent += tx.amount_sats
return earned, spent
except Exception as exc:
logger.debug("Failed to query token transactions: %s", exc)
return 0, 0
def _generate_narrative_bullets(metrics: AgentMetrics, period_type: PeriodType) -> list[str]:
"""Generate narrative summary bullets for a scorecard.
Args:
metrics: The agent's metrics
period_type: daily or weekly
Returns:
List of narrative bullet points
"""
bullets: list[str] = []
period_label = "day" if period_type == PeriodType.daily else "week"
# Activity summary
activities = []
if metrics.commits:
activities.append(f"{metrics.commits} commit{'s' if metrics.commits != 1 else ''}")
if len(metrics.prs_opened):
activities.append(
f"{len(metrics.prs_opened)} PR{'s' if len(metrics.prs_opened) != 1 else ''} opened"
)
if len(metrics.prs_merged):
activities.append(
f"{len(metrics.prs_merged)} PR{'s' if len(metrics.prs_merged) != 1 else ''} merged"
)
if len(metrics.issues_touched):
activities.append(
f"{len(metrics.issues_touched)} issue{'s' if len(metrics.issues_touched) != 1 else ''} touched"
)
if metrics.comments:
activities.append(f"{metrics.comments} comment{'s' if metrics.comments != 1 else ''}")
if activities:
bullets.append(f"Active across {', '.join(activities)} this {period_label}.")
# Test activity
if len(metrics.tests_affected):
bullets.append(
f"Affected {len(metrics.tests_affected)} test file{'s' if len(metrics.tests_affected) != 1 else ''}."
)
# Token summary
net_tokens = metrics.tokens_earned - metrics.tokens_spent
if metrics.tokens_earned or metrics.tokens_spent:
if net_tokens > 0:
bullets.append(
f"Net earned {net_tokens} tokens ({metrics.tokens_earned} earned, {metrics.tokens_spent} spent)."
)
elif net_tokens < 0:
bullets.append(
f"Net spent {abs(net_tokens)} tokens ({metrics.tokens_earned} earned, {metrics.tokens_spent} spent)."
)
else:
bullets.append(
f"Balanced token flow ({metrics.tokens_earned} earned, {metrics.tokens_spent} spent)."
)
# Handle empty case
if not bullets:
bullets.append(f"No recorded activity this {period_label}.")
return bullets
def _detect_patterns(metrics: AgentMetrics) -> list[str]:
"""Detect interesting patterns in agent behavior.
Args:
metrics: The agent's metrics
Returns:
List of pattern descriptions
"""
patterns: list[str] = []
pr_opened = len(metrics.prs_opened)
merge_rate = metrics.pr_merge_rate
# Merge rate patterns
if pr_opened >= 3:
if merge_rate >= 0.8:
patterns.append("High merge rate with few failures — code quality focus.")
elif merge_rate <= 0.3:
patterns.append("Lots of noisy PRs, low merge rate — may need review support.")
# Activity patterns
if metrics.commits > 10 and pr_opened == 0:
patterns.append("High commit volume without PRs — working directly on main?")
if len(metrics.issues_touched) > 5 and metrics.comments == 0:
patterns.append("Touching many issues but low comment volume — silent worker.")
if metrics.comments > len(metrics.issues_touched) * 2:
patterns.append("Highly communicative — lots of discussion relative to work items.")
# Token patterns
net_tokens = metrics.tokens_earned - metrics.tokens_spent
if net_tokens > 100:
patterns.append("Strong token accumulation — high value delivery.")
elif net_tokens < -50:
patterns.append("High token spend — may be in experimentation phase.")
return patterns
def generate_scorecard(
agent_id: str,
period_type: PeriodType = PeriodType.daily,
reference_date: datetime | None = None,
) -> ScorecardSummary | None:
"""Generate a scorecard for a single agent.
Args:
agent_id: The agent to generate scorecard for
period_type: daily or weekly
reference_date: The date to calculate from (defaults to now)
Returns:
ScorecardSummary or None if agent has no activity
"""
start, end = _get_period_bounds(period_type, reference_date)
# Collect events
events = _collect_events_for_period(start, end, agent_id)
# Aggregate metrics
all_metrics = _aggregate_metrics(events)
# Get metrics for this specific agent
if agent_id not in all_metrics:
# Create empty metrics - still generate a scorecard
metrics = AgentMetrics(agent_id=agent_id)
else:
metrics = all_metrics[agent_id]
# Augment with token data from ledger
tokens_earned, tokens_spent = _query_token_transactions(agent_id, start, end)
metrics.tokens_earned = max(metrics.tokens_earned, tokens_earned)
metrics.tokens_spent = max(metrics.tokens_spent, tokens_spent)
# Generate narrative and patterns
narrative = _generate_narrative_bullets(metrics, period_type)
patterns = _detect_patterns(metrics)
return ScorecardSummary(
agent_id=agent_id,
period_type=period_type,
period_start=start,
period_end=end,
metrics=metrics,
narrative_bullets=narrative,
patterns=patterns,
)
def generate_all_scorecards(
period_type: PeriodType = PeriodType.daily,
reference_date: datetime | None = None,
) -> list[ScorecardSummary]:
"""Generate scorecards for all tracked agents.
Args:
period_type: daily or weekly
reference_date: The date to calculate from (defaults to now)
Returns:
List of ScorecardSummary for all agents with activity
"""
start, end = _get_period_bounds(period_type, reference_date)
# Collect all events
events = _collect_events_for_period(start, end)
# Aggregate metrics for all agents
all_metrics = _aggregate_metrics(events)
# Include tracked agents even if no activity
for agent_id in TRACKED_AGENTS:
if agent_id not in all_metrics:
all_metrics[agent_id] = AgentMetrics(agent_id=agent_id)
# Generate scorecards
scorecards: list[ScorecardSummary] = []
for agent_id, metrics in all_metrics.items():
# Augment with token data
tokens_earned, tokens_spent = _query_token_transactions(agent_id, start, end)
metrics.tokens_earned = max(metrics.tokens_earned, tokens_earned)
metrics.tokens_spent = max(metrics.tokens_spent, tokens_spent)
narrative = _generate_narrative_bullets(metrics, period_type)
patterns = _detect_patterns(metrics)
scorecard = ScorecardSummary(
agent_id=agent_id,
period_type=period_type,
period_start=start,
period_end=end,
metrics=metrics,
narrative_bullets=narrative,
patterns=patterns,
)
scorecards.append(scorecard)
# Sort by agent_id for consistent ordering
scorecards.sort(key=lambda s: s.agent_id)
return scorecards
def get_tracked_agents() -> list[str]:
"""Return the list of tracked agent IDs."""
return sorted(TRACKED_AGENTS)

View File

@@ -51,7 +51,6 @@
<a href="/thinking" class="mc-test-link mc-link-thinking">THINKING</a> <a href="/thinking" class="mc-test-link mc-link-thinking">THINKING</a>
<a href="/swarm/mission-control" class="mc-test-link">MISSION CTRL</a> <a href="/swarm/mission-control" class="mc-test-link">MISSION CTRL</a>
<a href="/swarm/live" class="mc-test-link">SWARM</a> <a href="/swarm/live" class="mc-test-link">SWARM</a>
<a href="/scorecards" class="mc-test-link">SCORECARDS</a>
<a href="/bugs" class="mc-test-link mc-link-bugs">BUGS</a> <a href="/bugs" class="mc-test-link mc-link-bugs">BUGS</a>
</div> </div>
</div> </div>
@@ -124,7 +123,6 @@
<a href="/thinking" class="mc-mobile-link">THINKING</a> <a href="/thinking" class="mc-mobile-link">THINKING</a>
<a href="/swarm/mission-control" class="mc-mobile-link">MISSION CONTROL</a> <a href="/swarm/mission-control" class="mc-mobile-link">MISSION CONTROL</a>
<a href="/swarm/live" class="mc-mobile-link">SWARM</a> <a href="/swarm/live" class="mc-mobile-link">SWARM</a>
<a href="/scorecards" class="mc-mobile-link">SCORECARDS</a>
<a href="/bugs" class="mc-mobile-link">BUGS</a> <a href="/bugs" class="mc-mobile-link">BUGS</a>
<div class="mc-mobile-section-label">INTELLIGENCE</div> <div class="mc-mobile-section-label">INTELLIGENCE</div>
<a href="/spark/ui" class="mc-mobile-link">SPARK</a> <a href="/spark/ui" class="mc-mobile-link">SPARK</a>

View File

@@ -1,113 +0,0 @@
{% extends "base.html" %}
{% block title %}Agent Scorecards - Timmy Time{% endblock %}
{% block extra_styles %}{% endblock %}
{% block content %}
<div class="container-fluid py-4">
<!-- Header -->
<div class="d-flex justify-content-between align-items-center mb-4">
<div>
<h1 class="h3 mb-0">AGENT SCORECARDS</h1>
<p class="text-muted small mb-0">Track agent performance across issues, PRs, tests, and tokens</p>
</div>
<div class="d-flex gap-2">
<select id="period-select" class="form-select form-select-sm" style="width: auto;">
<option value="daily" selected>Daily</option>
<option value="weekly">Weekly</option>
</select>
<button class="btn btn-sm btn-primary" onclick="refreshScorecards()">
<span>Refresh</span>
</button>
</div>
</div>
<!-- Scorecards Grid -->
<div id="scorecards-container"
hx-get="/scorecards/all/panels?period=daily"
hx-trigger="load"
hx-swap="innerHTML">
<div class="text-center py-5">
<div class="spinner-border text-secondary" role="status">
<span class="visually-hidden">Loading...</span>
</div>
<p class="text-muted mt-2">Loading scorecards...</p>
</div>
</div>
<!-- API Reference -->
<div class="mt-5 pt-4 border-top">
<h5 class="text-muted">API Reference</h5>
<div class="row g-3">
<div class="col-md-6">
<div class="card mc-panel">
<div class="card-body">
<h6 class="card-title">List Tracked Agents</h6>
<code>GET /scorecards/api/agents</code>
<p class="small text-muted mt-2">Returns all tracked agent IDs</p>
</div>
</div>
</div>
<div class="col-md-6">
<div class="card mc-panel">
<div class="card-body">
<h6 class="card-title">Get All Scorecards</h6>
<code>GET /scorecards/api?period=daily|weekly</code>
<p class="small text-muted mt-2">Returns scorecards for all agents</p>
</div>
</div>
</div>
<div class="col-md-6">
<div class="card mc-panel">
<div class="card-body">
<h6 class="card-title">Get Agent Scorecard</h6>
<code>GET /scorecards/api/{agent_id}?period=daily|weekly</code>
<p class="small text-muted mt-2">Returns scorecard for a specific agent</p>
</div>
</div>
</div>
<div class="col-md-6">
<div class="card mc-panel">
<div class="card-body">
<h6 class="card-title">HTML Panel (HTMX)</h6>
<code>GET /scorecards/panel/{agent_id}?period=daily|weekly</code>
<p class="small text-muted mt-2">Returns HTML panel for embedding</p>
</div>
</div>
</div>
</div>
</div>
</div>
<script>
// Period selector change handler
document.getElementById('period-select').addEventListener('change', function() {
refreshScorecards();
});
function refreshScorecards() {
var period = document.getElementById('period-select').value;
var container = document.getElementById('scorecards-container');
// Show loading state
container.innerHTML = `
<div class="text-center py-5">
<div class="spinner-border text-secondary" role="status">
<span class="visually-hidden">Loading...</span>
</div>
<p class="text-muted mt-2">Loading scorecards...</p>
</div>
`;
// Trigger HTMX request
htmx.ajax('GET', '/scorecards/all/panels?period=' + period, {
target: '#scorecards-container',
swap: 'innerHTML'
});
}
// Auto-refresh every 5 minutes
setInterval(refreshScorecards, 300000);
</script>
{% endblock %}

View File

@@ -1,29 +0,0 @@
"""World interface — engine-agnostic adapter pattern for embodied agents.
Provides the ``WorldInterface`` ABC and an adapter registry so Timmy can
observe, act, and speak in any game world (Morrowind, Luanti, Godot, …)
through a single contract.
Quick start::
from infrastructure.world import get_adapter, register_adapter
from infrastructure.world.interface import WorldInterface
register_adapter("mock", MockWorldAdapter)
world = get_adapter("mock")
perception = world.observe()
"""
from infrastructure.world.registry import AdapterRegistry
_registry = AdapterRegistry()
register_adapter = _registry.register
get_adapter = _registry.get
list_adapters = _registry.list_adapters
__all__ = [
"register_adapter",
"get_adapter",
"list_adapters",
]

View File

@@ -1 +0,0 @@
"""Built-in world adapters."""

View File

@@ -1,99 +0,0 @@
"""Mock world adapter — returns canned perception and logs commands.
Useful for testing the heartbeat loop and WorldInterface contract
without a running game server.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from datetime import UTC, datetime
from infrastructure.world.interface import WorldInterface
from infrastructure.world.types import (
ActionResult,
ActionStatus,
CommandInput,
PerceptionOutput,
)
logger = logging.getLogger(__name__)
@dataclass
class _ActionLog:
"""Record of an action dispatched to the mock world."""
command: CommandInput
timestamp: datetime
class MockWorldAdapter(WorldInterface):
"""In-memory mock adapter for testing.
* ``observe()`` returns configurable canned perception.
* ``act()`` logs the command and returns success.
* ``speak()`` logs the message.
Inspect ``action_log`` and ``speech_log`` to verify behaviour in tests.
"""
def __init__(
self,
*,
location: str = "Test Chamber",
entities: list[str] | None = None,
events: list[str] | None = None,
) -> None:
self._location = location
self._entities = entities or ["TestNPC"]
self._events = events or []
self._connected = False
self.action_log: list[_ActionLog] = []
self.speech_log: list[dict] = []
# -- lifecycle ---------------------------------------------------------
def connect(self) -> None:
self._connected = True
logger.info("MockWorldAdapter connected")
def disconnect(self) -> None:
self._connected = False
logger.info("MockWorldAdapter disconnected")
@property
def is_connected(self) -> bool:
return self._connected
# -- core contract -----------------------------------------------------
def observe(self) -> PerceptionOutput:
logger.debug("MockWorldAdapter.observe()")
return PerceptionOutput(
timestamp=datetime.now(UTC),
location=self._location,
entities=list(self._entities),
events=list(self._events),
raw={"adapter": "mock"},
)
def act(self, command: CommandInput) -> ActionResult:
logger.debug("MockWorldAdapter.act(%s)", command.action)
self.action_log.append(_ActionLog(command=command, timestamp=datetime.now(UTC)))
return ActionResult(
status=ActionStatus.SUCCESS,
message=f"Mock executed: {command.action}",
data={"adapter": "mock"},
)
def speak(self, message: str, target: str | None = None) -> None:
logger.debug("MockWorldAdapter.speak(%r, target=%r)", message, target)
self.speech_log.append(
{
"message": message,
"target": target,
"timestamp": datetime.now(UTC).isoformat(),
}
)

View File

@@ -1,58 +0,0 @@
"""TES3MP world adapter — stub for Morrowind multiplayer via TES3MP.
This adapter will eventually connect to a TES3MP server and translate
the WorldInterface contract into TES3MP commands. For now every method
raises ``NotImplementedError`` with guidance on what needs wiring up.
Once PR #864 merges, import PerceptionOutput and CommandInput directly
from ``infrastructure.morrowind.schemas`` if their shapes differ from
the canonical types in ``infrastructure.world.types``.
"""
from __future__ import annotations
import logging
from infrastructure.world.interface import WorldInterface
from infrastructure.world.types import ActionResult, CommandInput, PerceptionOutput
logger = logging.getLogger(__name__)
class TES3MPWorldAdapter(WorldInterface):
"""Stub adapter for TES3MP (Morrowind multiplayer).
All core methods raise ``NotImplementedError``.
Implement ``connect()`` first — it should open a socket to the
TES3MP server and authenticate.
"""
def __init__(self, *, host: str = "localhost", port: int = 25565) -> None:
self._host = host
self._port = port
self._connected = False
# -- lifecycle ---------------------------------------------------------
def connect(self) -> None:
raise NotImplementedError("TES3MPWorldAdapter.connect() — wire up TES3MP server socket")
def disconnect(self) -> None:
raise NotImplementedError("TES3MPWorldAdapter.disconnect() — close TES3MP server socket")
@property
def is_connected(self) -> bool:
return self._connected
# -- core contract (stubs) ---------------------------------------------
def observe(self) -> PerceptionOutput:
raise NotImplementedError("TES3MPWorldAdapter.observe() — poll TES3MP for player/NPC state")
def act(self, command: CommandInput) -> ActionResult:
raise NotImplementedError(
"TES3MPWorldAdapter.act() — translate CommandInput to TES3MP packet"
)
def speak(self, message: str, target: str | None = None) -> None:
raise NotImplementedError("TES3MPWorldAdapter.speak() — send chat message via TES3MP")

View File

@@ -1,64 +0,0 @@
"""Abstract WorldInterface — the contract every game-world adapter must fulfil.
Follows a Gymnasium-inspired pattern: observe → act → speak, with each
method returning strongly-typed data structures.
Any future engine (TES3MP, Luanti, Godot, …) plugs in by subclassing
``WorldInterface`` and implementing the three methods.
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from infrastructure.world.types import ActionResult, CommandInput, PerceptionOutput
class WorldInterface(ABC):
"""Engine-agnostic base class for world adapters.
Subclasses must implement:
- ``observe()`` — gather structured perception from the world
- ``act()`` — dispatch a command and return the outcome
- ``speak()`` — send a message to an NPC / player / broadcast
Lifecycle hooks ``connect()`` and ``disconnect()`` are optional.
"""
# -- lifecycle (optional overrides) ------------------------------------
def connect(self) -> None: # noqa: B027
"""Establish connection to the game world.
Default implementation is a no-op. Override to open sockets,
authenticate, etc.
"""
def disconnect(self) -> None: # noqa: B027
"""Tear down the connection.
Default implementation is a no-op.
"""
@property
def is_connected(self) -> bool:
"""Return ``True`` if the adapter has an active connection.
Default returns ``True``. Override for adapters that maintain
persistent connections.
"""
return True
# -- core contract (must implement) ------------------------------------
@abstractmethod
def observe(self) -> PerceptionOutput:
"""Return a structured snapshot of the current world state."""
@abstractmethod
def act(self, command: CommandInput) -> ActionResult:
"""Execute *command* in the world and return the result."""
@abstractmethod
def speak(self, message: str, target: str | None = None) -> None:
"""Send *message* in the world, optionally directed at *target*."""

View File

@@ -1,54 +0,0 @@
"""Adapter registry — register and instantiate world adapters by name.
Usage::
registry = AdapterRegistry()
registry.register("mock", MockWorldAdapter)
adapter = registry.get("mock", some_kwarg="value")
"""
from __future__ import annotations
import logging
from typing import Any
from infrastructure.world.interface import WorldInterface
logger = logging.getLogger(__name__)
class AdapterRegistry:
"""Name → WorldInterface class registry with instantiation."""
def __init__(self) -> None:
self._adapters: dict[str, type[WorldInterface]] = {}
def register(self, name: str, cls: type[WorldInterface]) -> None:
"""Register an adapter class under *name*.
Raises ``TypeError`` if *cls* is not a ``WorldInterface`` subclass.
"""
if not (isinstance(cls, type) and issubclass(cls, WorldInterface)):
raise TypeError(f"{cls!r} is not a WorldInterface subclass")
if name in self._adapters:
logger.warning("Overwriting adapter %r (was %r)", name, self._adapters[name])
self._adapters[name] = cls
logger.info("Registered world adapter: %s%s", name, cls.__name__)
def get(self, name: str, **kwargs: Any) -> WorldInterface:
"""Instantiate and return the adapter registered as *name*.
Raises ``KeyError`` if *name* is not registered.
"""
cls = self._adapters[name]
return cls(**kwargs)
def list_adapters(self) -> list[str]:
"""Return sorted list of registered adapter names."""
return sorted(self._adapters)
def __contains__(self, name: str) -> bool:
return name in self._adapters
def __len__(self) -> int:
return len(self._adapters)

View File

@@ -1,71 +0,0 @@
"""Canonical data types for world interaction.
These mirror the PerceptionOutput / CommandInput types from PR #864's
``morrowind/schemas.py``. When that PR merges, these can be replaced
with re-exports — but until then they serve as the stable contract for
every WorldInterface adapter.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import UTC, datetime
from enum import StrEnum
class ActionStatus(StrEnum):
"""Outcome of an action dispatched to the world."""
SUCCESS = "success"
FAILURE = "failure"
PENDING = "pending"
NOOP = "noop"
@dataclass
class PerceptionOutput:
"""Structured world state returned by ``WorldInterface.observe()``.
Attributes:
timestamp: When the observation was captured.
location: Free-form location descriptor (e.g. "Balmora, Fighters Guild").
entities: List of nearby entity descriptions.
events: Recent game events since last observation.
raw: Optional raw / engine-specific payload for advanced consumers.
"""
timestamp: datetime = field(default_factory=lambda: datetime.now(UTC))
location: str = ""
entities: list[str] = field(default_factory=list)
events: list[str] = field(default_factory=list)
raw: dict = field(default_factory=dict)
@dataclass
class CommandInput:
"""Action command sent via ``WorldInterface.act()``.
Attributes:
action: Verb / action name (e.g. "move", "attack", "use_item").
target: Optional target identifier.
parameters: Arbitrary key-value payload for engine-specific params.
"""
action: str
target: str | None = None
parameters: dict = field(default_factory=dict)
@dataclass
class ActionResult:
"""Outcome returned by ``WorldInterface.act()``.
Attributes:
status: Whether the action succeeded, failed, etc.
message: Human-readable description of the outcome.
data: Arbitrary engine-specific result payload.
"""
status: ActionStatus = ActionStatus.SUCCESS
message: str = ""
data: dict = field(default_factory=dict)

View File

@@ -1,286 +0,0 @@
"""Heartbeat v2 — WorldInterface-driven cognitive loop.
Drives real observe → reason → act → reflect cycles through whatever
``WorldInterface`` adapter is connected. When no adapter is present,
gracefully falls back to the existing ``run_cycle()`` behaviour.
Usage::
heartbeat = Heartbeat(world=adapter, interval=30.0)
await heartbeat.run_once() # single cycle
await heartbeat.start() # background loop
heartbeat.stop() # graceful shutdown
"""
from __future__ import annotations
import asyncio
import logging
import time
from dataclasses import dataclass, field
from datetime import UTC, datetime
from loop.phase1_gather import gather
from loop.phase2_reason import reason
from loop.phase3_act import act
from loop.schema import ContextPayload
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Cycle log entry
# ---------------------------------------------------------------------------
@dataclass
class CycleRecord:
"""One observe → reason → act → reflect cycle."""
cycle_id: int
timestamp: str
observation: dict = field(default_factory=dict)
reasoning_summary: str = ""
action_taken: str = ""
action_status: str = ""
reflect_notes: str = ""
duration_ms: int = 0
# ---------------------------------------------------------------------------
# Heartbeat
# ---------------------------------------------------------------------------
class Heartbeat:
"""Manages the recurring cognitive loop with optional world adapter.
Parameters
----------
world:
A ``WorldInterface`` instance (or ``None`` for passive mode).
interval:
Seconds between heartbeat ticks. 30 s for embodied mode,
300 s (5 min) for passive thinking.
on_cycle:
Optional async callback invoked after each cycle with the
``CycleRecord``.
"""
def __init__(
self,
*,
world=None, # WorldInterface | None
interval: float = 30.0,
on_cycle=None, # Callable[[CycleRecord], Awaitable[None]] | None
) -> None:
self._world = world
self._interval = interval
self._on_cycle = on_cycle
self._cycle_count: int = 0
self._running = False
self._task: asyncio.Task | None = None
self.history: list[CycleRecord] = []
# -- properties --------------------------------------------------------
@property
def world(self):
return self._world
@world.setter
def world(self, adapter) -> None:
self._world = adapter
@property
def interval(self) -> float:
return self._interval
@interval.setter
def interval(self, value: float) -> None:
self._interval = max(1.0, value)
@property
def is_running(self) -> bool:
return self._running
@property
def cycle_count(self) -> int:
return self._cycle_count
# -- single cycle ------------------------------------------------------
async def run_once(self) -> CycleRecord:
"""Execute one full heartbeat cycle.
If a world adapter is present:
1. Observe — ``world.observe()``
2. Gather + Reason + Act via the three-phase loop, with the
observation injected into the payload
3. Dispatch the decided action back to ``world.act()``
4. Reflect — log the cycle
Without an adapter the existing loop runs on a timer-sourced
payload (passive thinking).
"""
self._cycle_count += 1
start = time.monotonic()
record = CycleRecord(
cycle_id=self._cycle_count,
timestamp=datetime.now(UTC).isoformat(),
)
if self._world is not None:
record = await self._embodied_cycle(record)
else:
record = await self._passive_cycle(record)
record.duration_ms = int((time.monotonic() - start) * 1000)
self.history.append(record)
# Broadcast via WebSocket (best-effort)
await self._broadcast(record)
if self._on_cycle:
await self._on_cycle(record)
logger.info(
"Heartbeat cycle #%d complete (%d ms) — action=%s status=%s",
record.cycle_id,
record.duration_ms,
record.action_taken or "(passive)",
record.action_status or "n/a",
)
return record
# -- background loop ---------------------------------------------------
async def start(self) -> None:
"""Start the recurring heartbeat loop as a background task."""
if self._running:
logger.warning("Heartbeat already running")
return
self._running = True
self._task = asyncio.current_task() or asyncio.ensure_future(self._loop())
if self._task is not asyncio.current_task():
return
await self._loop()
async def _loop(self) -> None:
logger.info(
"Heartbeat loop started (interval=%.1fs, adapter=%s)",
self._interval,
type(self._world).__name__ if self._world else "None",
)
while self._running:
try:
await self.run_once()
except Exception:
logger.exception("Heartbeat cycle failed")
await asyncio.sleep(self._interval)
def stop(self) -> None:
"""Signal the heartbeat loop to stop after the current cycle."""
self._running = False
logger.info("Heartbeat stop requested")
# -- internal: embodied cycle ------------------------------------------
async def _embodied_cycle(self, record: CycleRecord) -> CycleRecord:
"""Cycle with a live world adapter: observe → reason → act → reflect."""
from infrastructure.world.types import ActionStatus, CommandInput
# 1. Observe
perception = self._world.observe()
record.observation = {
"location": perception.location,
"entities": perception.entities,
"events": perception.events,
}
# 2. Feed observation into the three-phase loop
obs_content = (
f"Location: {perception.location}\n"
f"Entities: {', '.join(perception.entities)}\n"
f"Events: {', '.join(perception.events)}"
)
payload = ContextPayload(
source="world",
content=obs_content,
metadata={"perception": record.observation},
)
gathered = gather(payload)
reasoned = reason(gathered)
acted = act(reasoned)
# Extract action decision from the acted payload
action_name = acted.metadata.get("action", "idle")
action_target = acted.metadata.get("action_target")
action_params = acted.metadata.get("action_params", {})
record.reasoning_summary = acted.metadata.get("reasoning", acted.content[:200])
# 3. Dispatch action to world
if action_name != "idle":
cmd = CommandInput(
action=action_name,
target=action_target,
parameters=action_params,
)
result = self._world.act(cmd)
record.action_taken = action_name
record.action_status = result.status.value
else:
record.action_taken = "idle"
record.action_status = ActionStatus.NOOP.value
# 4. Reflect
record.reflect_notes = (
f"Observed {len(perception.entities)} entities at {perception.location}. "
f"Action: {record.action_taken}{record.action_status}."
)
return record
# -- internal: passive cycle -------------------------------------------
async def _passive_cycle(self, record: CycleRecord) -> CycleRecord:
"""Cycle without a world adapter — existing think_once() behaviour."""
payload = ContextPayload(
source="timer",
content="heartbeat",
metadata={"mode": "passive"},
)
gathered = gather(payload)
reasoned = reason(gathered)
acted = act(reasoned)
record.reasoning_summary = acted.content[:200]
record.action_taken = "think"
record.action_status = "noop"
record.reflect_notes = "Passive thinking cycle — no world adapter connected."
return record
# -- broadcast ---------------------------------------------------------
async def _broadcast(self, record: CycleRecord) -> None:
"""Emit heartbeat cycle data via WebSocket (best-effort)."""
try:
from infrastructure.ws_manager.handler import ws_manager
await ws_manager.broadcast(
"heartbeat.cycle",
{
"cycle_id": record.cycle_id,
"timestamp": record.timestamp,
"action": record.action_taken,
"action_status": record.action_status,
"reasoning_summary": record.reasoning_summary[:300],
"observation": record.observation,
"duration_ms": record.duration_ms,
},
)
except (ImportError, AttributeError, ConnectionError, RuntimeError) as exc:
logger.debug("Heartbeat broadcast skipped: %s", exc)

View File

@@ -17,9 +17,9 @@ logger = logging.getLogger(__name__)
def gather(payload: ContextPayload) -> ContextPayload: def gather(payload: ContextPayload) -> ContextPayload:
"""Accept raw input and return structured context for reasoning. """Accept raw input and return structured context for reasoning.
When the payload carries a ``perception`` dict in metadata (injected by Stub: tags the payload with phase=gather and logs transit.
the heartbeat loop from a WorldInterface adapter), that observation is Timmy will flesh this out with context selection, memory lookup,
folded into the gathered context. Otherwise behaves as before. adapter polling, and attention-residual weighting.
""" """
logger.info( logger.info(
"Phase 1 (Gather) received: source=%s content_len=%d tokens=%d", "Phase 1 (Gather) received: source=%s content_len=%d tokens=%d",
@@ -28,20 +28,7 @@ def gather(payload: ContextPayload) -> ContextPayload:
payload.token_count, payload.token_count,
) )
extra: dict = {"phase": "gather", "gathered": True} result = payload.with_metadata(phase="gather", gathered=True)
# Enrich with world observation when present
perception = payload.metadata.get("perception")
if perception:
extra["world_observation"] = perception
logger.info(
"Phase 1 (Gather) world observation: location=%s entities=%d events=%d",
perception.get("location", "?"),
len(perception.get("entities", [])),
len(perception.get("events", [])),
)
result = payload.with_metadata(**extra)
logger.info( logger.info(
"Phase 1 (Gather) produced: metadata_keys=%s", "Phase 1 (Gather) produced: metadata_keys=%s",

View File

@@ -14,8 +14,6 @@ from dataclasses import dataclass, field
from datetime import UTC, datetime from datetime import UTC, datetime
from pathlib import Path from pathlib import Path
from config import settings
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Paths # Paths
@@ -30,7 +28,7 @@ def get_connection() -> Generator[sqlite3.Connection, None, None]:
with closing(sqlite3.connect(str(DB_PATH))) as conn: with closing(sqlite3.connect(str(DB_PATH))) as conn:
conn.row_factory = sqlite3.Row conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA journal_mode=WAL")
conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}") conn.execute("PRAGMA busy_timeout=5000")
_ensure_schema(conn) _ensure_schema(conn)
yield conn yield conn

View File

@@ -20,7 +20,6 @@ from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta from datetime import UTC, datetime, timedelta
from pathlib import Path from pathlib import Path
from config import settings
from timmy.memory.embeddings import ( from timmy.memory.embeddings import (
EMBEDDING_DIM, EMBEDDING_DIM,
EMBEDDING_MODEL, # noqa: F401 — re-exported for backward compatibility EMBEDDING_MODEL, # noqa: F401 — re-exported for backward compatibility
@@ -112,7 +111,7 @@ def get_connection() -> Generator[sqlite3.Connection, None, None]:
with closing(sqlite3.connect(str(DB_PATH))) as conn: with closing(sqlite3.connect(str(DB_PATH))) as conn:
conn.row_factory = sqlite3.Row conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA journal_mode=WAL")
conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}") conn.execute("PRAGMA busy_timeout=5000")
_ensure_schema(conn) _ensure_schema(conn)
yield conn yield conn
@@ -950,7 +949,7 @@ class SemanticMemory:
with closing(sqlite3.connect(str(self.db_path))) as conn: with closing(sqlite3.connect(str(self.db_path))) as conn:
conn.row_factory = sqlite3.Row conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA journal_mode=WAL")
conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}") conn.execute("PRAGMA busy_timeout=5000")
# Ensure schema exists # Ensure schema exists
conn.execute(""" conn.execute("""
CREATE TABLE IF NOT EXISTS memories ( CREATE TABLE IF NOT EXISTS memories (

View File

@@ -24,9 +24,6 @@ from config import settings
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Max characters of user query included in Lightning invoice memo
_INVOICE_MEMO_MAX_LEN = 50
# Lazy imports to handle test mocking # Lazy imports to handle test mocking
_ImportError = None _ImportError = None
try: try:
@@ -450,6 +447,7 @@ def consult_grok(query: str) -> str:
) )
except (ImportError, AttributeError) as exc: except (ImportError, AttributeError) as exc:
logger.warning("Tool execution failed (consult_grok logging): %s", exc) logger.warning("Tool execution failed (consult_grok logging): %s", exc)
pass
# Generate Lightning invoice for monetization (unless free mode) # Generate Lightning invoice for monetization (unless free mode)
invoice_info = "" invoice_info = ""
@@ -458,11 +456,12 @@ def consult_grok(query: str) -> str:
from lightning.factory import get_backend as get_ln_backend from lightning.factory import get_backend as get_ln_backend
ln = get_ln_backend() ln = get_ln_backend()
sats = min(settings.grok_max_sats_per_query, settings.grok_sats_hard_cap) sats = min(settings.grok_max_sats_per_query, 100)
inv = ln.create_invoice(sats, f"Grok query: {query[:_INVOICE_MEMO_MAX_LEN]}") inv = ln.create_invoice(sats, f"Grok query: {query[:50]}")
invoice_info = f"\n[Lightning invoice: {sats} sats — {inv.payment_request[:40]}...]" invoice_info = f"\n[Lightning invoice: {sats} sats — {inv.payment_request[:40]}...]"
except (ImportError, OSError, ValueError) as exc: except (ImportError, OSError, ValueError) as exc:
logger.warning("Tool execution failed (Lightning invoice): %s", exc) logger.warning("Tool execution failed (Lightning invoice): %s", exc)
pass
result = backend.run(query) result = backend.run(query)
@@ -941,7 +940,7 @@ def _merge_catalog(
"available_in": available_in, "available_in": available_in,
} }
except ImportError: except ImportError:
logger.debug("Optional catalog %s.%s not available", module_path, attr_name) pass
def get_all_available_tools() -> dict[str, dict]: def get_all_available_tools() -> dict[str, dict]:

View File

@@ -20,74 +20,74 @@
<line x1="12" y1="8" x2="12.01" y2="8"></line> <line x1="12" y1="8" x2="12.01" y2="8"></line>
</svg> </svg>
</button> </button>
<button id="submit-job-btn" class="submit-job-button" aria-label="Submit Job" title="Submit Job"> <button id="fund-btn" class="fund-button" aria-label="Fund Session" title="Fund Session">
<svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"> <svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round">
<path d="M12 5v14M5 12h14"></path> <path d="M12 2v20M2 12h20"></path>
</svg> </svg>
<span>Job</span>
</button> </button>
<div id="speech-area"> <div id="speech-area">
<div class="bubble" id="speech-bubble"></div> <div class="bubble" id="speech-bubble"></div>
</div> </div>
</div> </div>
<!-- Submit Job Modal --> <!-- Fund Session Modal -->
<div id="submit-job-modal" class="submit-job-modal"> <div id="fund-modal" class="fund-modal">
<div class="submit-job-content"> <div class="fund-modal-content">
<button id="submit-job-close" class="submit-job-close" aria-label="Close"> <button id="fund-close" class="fund-close" aria-label="Close">
<svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"> <svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round">
<line x1="18" y1="6" x2="6" y2="18"></line> <line x1="18" y1="6" x2="6" y2="18"></line>
<line x1="6" y1="6" x2="18" y2="18"></line> <line x1="6" y1="6" x2="18" y2="18"></line>
</svg> </svg>
</button> </button>
<h2>Submit Job</h2> <h2>Fund Session</h2>
<p class="submit-job-subtitle">Create a task for Timmy and the agent swarm</p>
<form id="submit-job-form" class="submit-job-form"> <section class="fund-info">
<div class="form-group"> <h3>⚡ What are Sats?</h3>
<label for="job-title">Title <span class="required">*</span></label> <p><strong>Sats</strong> (satoshis) are the smallest unit of Bitcoin—like cents to a dollar. There are 100 million sats in 1 bitcoin. They enable tiny payments perfect for AI interactions.</p>
<input type="text" id="job-title" name="title" placeholder="Brief description of the task" maxlength="200"> </section>
<div class="char-count" id="title-char-count">0 / 200</div>
<div class="validation-error" id="title-error"></div>
</div>
<div class="form-group">
<label for="job-description">Description</label>
<textarea id="job-description" name="description" placeholder="Detailed instructions, requirements, and context..." rows="6" maxlength="2000"></textarea>
<div class="char-count" id="desc-char-count">0 / 2000</div>
<div class="validation-warning" id="desc-warning"></div>
<div class="validation-error" id="desc-error"></div>
</div>
<div class="form-group">
<label for="job-priority">Priority</label>
<select id="job-priority" name="priority">
<option value="low">Low</option>
<option value="medium" selected>Medium</option>
<option value="high">High</option>
<option value="urgent">Urgent</option>
</select>
</div>
<div class="submit-job-actions">
<button type="button" id="cancel-job-btn" class="btn-secondary">Cancel</button>
<button type="submit" id="submit-job-submit" class="btn-primary" disabled>Submit Job</button>
</div>
</form>
<div id="submit-job-success" class="submit-job-success hidden"> <section class="fund-info">
<div class="success-icon"> <h3>🛠️ Why Fund Your Session?</h3>
<svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"> <p>Your sats power the Workshop AI agents. When you fund a session:</p>
<path d="M22 11.08V12a10 10 0 1 1-5.93-9.14"></path> <ul>
<polyline points="22 4 12 14.01 9 11.01"></polyline> <li>Timmy and the agent swarm can process your requests</li>
</svg> <li>You get priority access to compute resources</li>
<li>Agents are compensated for their work</li>
</ul>
</section>
<section class="fund-info">
<h3>💰 Approximate Costs</h3>
<div class="cost-table">
<div class="cost-row">
<span>Simple chat message</span>
<span class="cost-value">~10-50 sats</span>
</div>
<div class="cost-row">
<span>Code generation task</span>
<span class="cost-value">~100-500 sats</span>
</div>
<div class="cost-row">
<span>Complex multi-agent job</span>
<span class="cost-value">~1,000-5,000 sats</span>
</div>
</div> </div>
<h3>Job Submitted!</h3> <p class="cost-note">Costs vary based on model and complexity. Unused sats remain in your balance.</p>
<p>Your task has been added to the queue. Timmy will review it shortly.</p> </section>
<button type="button" id="submit-another-btn" class="btn-primary">Submit Another</button>
<div class="fund-actions">
<div class="fund-input-group">
<label for="fund-amount">Amount (sats)</label>
<input type="number" id="fund-amount" class="fund-input" placeholder="1000" min="100" step="100">
</div>
<button id="fund-submit" class="fund-submit-btn">Fund Session</button>
</div>
<div class="fund-footer">
<span>⚡ Lightning Network · No subscriptions · Pay as you go</span>
</div> </div>
</div> </div>
<div id="submit-job-backdrop" class="submit-job-backdrop"></div> <div id="fund-backdrop" class="fund-backdrop"></div>
</div> </div>
<!-- About Panel --> <!-- About Panel -->
@@ -184,6 +184,50 @@
}); });
stateReader.connect(); stateReader.connect();
// --- Fund Session Modal ---
const fundBtn = document.getElementById("fund-btn");
const fundModal = document.getElementById("fund-modal");
const fundClose = document.getElementById("fund-close");
const fundBackdrop = document.getElementById("fund-backdrop");
const fundSubmit = document.getElementById("fund-submit");
const fundAmount = document.getElementById("fund-amount");
function openFundModal() {
fundModal.classList.add("open");
document.body.style.overflow = "hidden";
// Focus the input when opening
setTimeout(() => fundAmount.focus(), 100);
}
function closeFundModal() {
fundModal.classList.remove("open");
document.body.style.overflow = "";
}
function handleFundSubmit() {
const amount = parseInt(fundAmount.value, 10);
if (!amount || amount < 100) {
alert("Please enter a valid amount (minimum 100 sats)");
return;
}
// TODO: Integrate with Lightning payment API
console.log("Funding session with", amount, "sats");
alert("Lightning payment integration coming soon! Amount: " + amount + " sats");
closeFundModal();
}
fundBtn.addEventListener("click", openFundModal);
fundClose.addEventListener("click", closeFundModal);
fundBackdrop.addEventListener("click", closeFundModal);
fundSubmit.addEventListener("click", handleFundSubmit);
// Allow Enter key to submit
fundAmount.addEventListener("keypress", (e) => {
if (e.key === "Enter") {
handleFundSubmit();
}
});
// --- About Panel --- // --- About Panel ---
const infoBtn = document.getElementById("info-btn"); const infoBtn = document.getElementById("info-btn");
const aboutPanel = document.getElementById("about-panel"); const aboutPanel = document.getElementById("about-panel");
@@ -206,246 +250,12 @@
// Close on Escape key // Close on Escape key
document.addEventListener("keydown", (e) => { document.addEventListener("keydown", (e) => {
if (e.key === "Escape" && aboutPanel.classList.contains("open")) { if (e.key === "Escape") {
closeAboutPanel(); if (fundModal.classList.contains("open")) {
} closeFundModal();
}); } else if (aboutPanel.classList.contains("open")) {
closeAboutPanel();
// --- Submit Job Modal ---
const submitJobBtn = document.getElementById("submit-job-btn");
const submitJobModal = document.getElementById("submit-job-modal");
const submitJobClose = document.getElementById("submit-job-close");
const submitJobBackdrop = document.getElementById("submit-job-backdrop");
const cancelJobBtn = document.getElementById("cancel-job-btn");
const submitJobForm = document.getElementById("submit-job-form");
const submitJobSubmit = document.getElementById("submit-job-submit");
const jobTitle = document.getElementById("job-title");
const jobDescription = document.getElementById("job-description");
const titleCharCount = document.getElementById("title-char-count");
const descCharCount = document.getElementById("desc-char-count");
const titleError = document.getElementById("title-error");
const descError = document.getElementById("desc-error");
const descWarning = document.getElementById("desc-warning");
const submitJobSuccess = document.getElementById("submit-job-success");
const submitAnotherBtn = document.getElementById("submit-another-btn");
// Constants
const MAX_TITLE_LENGTH = 200;
const MAX_DESC_LENGTH = 2000;
const TITLE_WARNING_THRESHOLD = 150;
const DESC_WARNING_THRESHOLD = 1800;
function openSubmitJobModal() {
submitJobModal.classList.add("open");
document.body.style.overflow = "hidden";
jobTitle.focus();
validateForm();
}
function closeSubmitJobModal() {
submitJobModal.classList.remove("open");
document.body.style.overflow = "";
// Reset form after animation
setTimeout(() => {
resetForm();
}, 300);
}
function resetForm() {
submitJobForm.reset();
submitJobForm.classList.remove("hidden");
submitJobSuccess.classList.add("hidden");
updateCharCounts();
clearErrors();
validateForm();
}
function clearErrors() {
titleError.textContent = "";
titleError.classList.remove("visible");
descError.textContent = "";
descError.classList.remove("visible");
descWarning.textContent = "";
descWarning.classList.remove("visible");
jobTitle.classList.remove("error");
jobDescription.classList.remove("error");
}
function updateCharCounts() {
const titleLen = jobTitle.value.length;
const descLen = jobDescription.value.length;
titleCharCount.textContent = `${titleLen} / ${MAX_TITLE_LENGTH}`;
descCharCount.textContent = `${descLen} / ${MAX_DESC_LENGTH}`;
// Update color based on thresholds
if (titleLen > MAX_TITLE_LENGTH) {
titleCharCount.classList.add("over-limit");
} else if (titleLen > TITLE_WARNING_THRESHOLD) {
titleCharCount.classList.add("near-limit");
titleCharCount.classList.remove("over-limit");
} else {
titleCharCount.classList.remove("near-limit", "over-limit");
}
if (descLen > MAX_DESC_LENGTH) {
descCharCount.classList.add("over-limit");
} else if (descLen > DESC_WARNING_THRESHOLD) {
descCharCount.classList.add("near-limit");
descCharCount.classList.remove("over-limit");
} else {
descCharCount.classList.remove("near-limit", "over-limit");
}
}
function validateTitle() {
const value = jobTitle.value.trim();
const length = jobTitle.value.length;
if (length > MAX_TITLE_LENGTH) {
titleError.textContent = `Title must be ${MAX_TITLE_LENGTH} characters or less`;
titleError.classList.add("visible");
jobTitle.classList.add("error");
return false;
}
if (value === "") {
titleError.textContent = "Title is required";
titleError.classList.add("visible");
jobTitle.classList.add("error");
return false;
}
titleError.textContent = "";
titleError.classList.remove("visible");
jobTitle.classList.remove("error");
return true;
}
function validateDescription() {
const length = jobDescription.value.length;
if (length > MAX_DESC_LENGTH) {
descError.textContent = `Description must be ${MAX_DESC_LENGTH} characters or less`;
descError.classList.add("visible");
descWarning.textContent = "";
descWarning.classList.remove("visible");
jobDescription.classList.add("error");
return false;
}
// Show warning when near limit
if (length > DESC_WARNING_THRESHOLD && length <= MAX_DESC_LENGTH) {
const remaining = MAX_DESC_LENGTH - length;
descWarning.textContent = `${remaining} characters remaining`;
descWarning.classList.add("visible");
} else {
descWarning.textContent = "";
descWarning.classList.remove("visible");
}
descError.textContent = "";
descError.classList.remove("visible");
jobDescription.classList.remove("error");
return true;
}
function validateForm() {
const titleValid = jobTitle.value.trim() !== "" && jobTitle.value.length <= MAX_TITLE_LENGTH;
const descValid = jobDescription.value.length <= MAX_DESC_LENGTH;
submitJobSubmit.disabled = !(titleValid && descValid);
}
// Event listeners
submitJobBtn.addEventListener("click", openSubmitJobModal);
submitJobClose.addEventListener("click", closeSubmitJobModal);
submitJobBackdrop.addEventListener("click", closeSubmitJobModal);
cancelJobBtn.addEventListener("click", closeSubmitJobModal);
submitAnotherBtn.addEventListener("click", resetForm);
// Input event listeners for real-time validation
jobTitle.addEventListener("input", () => {
updateCharCounts();
validateForm();
if (titleError.classList.contains("visible")) {
validateTitle();
}
});
jobTitle.addEventListener("blur", () => {
if (jobTitle.value.trim() !== "" || titleError.classList.contains("visible")) {
validateTitle();
}
});
jobDescription.addEventListener("input", () => {
updateCharCounts();
validateForm();
if (descError.classList.contains("visible")) {
validateDescription();
}
});
jobDescription.addEventListener("blur", () => {
validateDescription();
});
// Form submission
submitJobForm.addEventListener("submit", async (e) => {
e.preventDefault();
const isTitleValid = validateTitle();
const isDescValid = validateDescription();
if (!isTitleValid || !isDescValid) {
return;
}
// Disable submit button while processing
submitJobSubmit.disabled = true;
submitJobSubmit.textContent = "Submitting...";
const formData = {
title: jobTitle.value.trim(),
description: jobDescription.value.trim(),
priority: document.getElementById("job-priority").value,
submitted_at: new Date().toISOString()
};
try {
// Submit to API
const response = await fetch("/api/tasks", {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify(formData)
});
if (response.ok) {
// Show success state
submitJobForm.classList.add("hidden");
submitJobSuccess.classList.remove("hidden");
} else {
const errorData = await response.json().catch(() => ({}));
descError.textContent = errorData.detail || "Failed to submit job. Please try again.";
descError.classList.add("visible");
} }
} catch (error) {
// For demo/development, show success even if API fails
submitJobForm.classList.add("hidden");
submitJobSuccess.classList.remove("hidden");
} finally {
submitJobSubmit.disabled = false;
submitJobSubmit.textContent = "Submit Job";
}
});
// Close on Escape key for Submit Job Modal
document.addEventListener("keydown", (e) => {
if (e.key === "Escape" && submitJobModal.classList.contains("open")) {
closeSubmitJobModal();
} }
}); });

View File

@@ -92,7 +92,7 @@ canvas {
.info-button { .info-button {
position: absolute; position: absolute;
top: 14px; top: 14px;
right: 36px; right: 70px;
width: 28px; width: 28px;
height: 28px; height: 28px;
padding: 0; padding: 0;
@@ -119,6 +119,286 @@ canvas {
height: 16px; height: 16px;
} }
/* Fund Session button */
.fund-button {
position: absolute;
top: 14px;
right: 36px;
width: 28px;
height: 28px;
padding: 0;
background: rgba(10, 10, 20, 0.7);
border: 1px solid rgba(0, 180, 80, 0.4);
border-radius: 50%;
color: #00b450;
cursor: pointer;
pointer-events: auto;
transition: all 0.2s ease;
display: flex;
align-items: center;
justify-content: center;
}
.fund-button:hover {
background: rgba(0, 180, 80, 0.15);
border-color: rgba(0, 180, 80, 0.7);
transform: scale(1.05);
}
.fund-button svg {
width: 16px;
height: 16px;
}
/* Fund Session Modal */
.fund-modal {
position: fixed;
top: 0;
left: 0;
width: 100%;
height: 100%;
z-index: 100;
pointer-events: none;
visibility: hidden;
opacity: 0;
transition: opacity 0.3s ease, visibility 0.3s ease;
}
.fund-modal.open {
pointer-events: auto;
visibility: visible;
opacity: 1;
}
.fund-modal-content {
position: absolute;
top: 0;
right: 0;
width: 420px;
max-width: 90%;
height: 100%;
background: rgba(10, 10, 20, 0.97);
border-left: 1px solid rgba(0, 180, 80, 0.3);
padding: 60px 24px 24px 24px;
overflow-y: auto;
transform: translateX(100%);
transition: transform 0.3s ease;
box-shadow: -4px 0 20px rgba(0, 0, 0, 0.5);
}
.fund-modal.open .fund-modal-content {
transform: translateX(0);
}
.fund-close {
position: absolute;
top: 16px;
right: 16px;
width: 32px;
height: 32px;
padding: 0;
background: transparent;
border: 1px solid rgba(160, 160, 160, 0.3);
border-radius: 50%;
color: #aaa;
cursor: pointer;
transition: all 0.2s ease;
display: flex;
align-items: center;
justify-content: center;
}
.fund-close:hover {
background: rgba(255, 255, 255, 0.1);
border-color: rgba(0, 180, 80, 0.5);
color: #00b450;
}
.fund-close svg {
width: 18px;
height: 18px;
}
.fund-modal-content h2 {
font-size: 20px;
color: #00b450;
margin-bottom: 24px;
font-weight: 600;
}
.fund-info {
margin-bottom: 24px;
}
.fund-info h3 {
font-size: 14px;
color: #e0e0e0;
margin-bottom: 10px;
font-weight: 600;
}
.fund-info p {
font-size: 13px;
line-height: 1.6;
color: #aaa;
margin-bottom: 10px;
}
.fund-info ul {
list-style: none;
padding: 0;
margin: 0;
}
.fund-info li {
font-size: 13px;
line-height: 1.6;
color: #aaa;
margin-bottom: 8px;
padding-left: 16px;
position: relative;
}
.fund-info li::before {
content: "•";
position: absolute;
left: 0;
color: #00b450;
}
.fund-info li strong {
color: #ccc;
}
/* Cost table */
.cost-table {
background: rgba(0, 0, 0, 0.3);
border: 1px solid rgba(0, 180, 80, 0.2);
border-radius: 8px;
padding: 12px;
margin-bottom: 10px;
}
.cost-row {
display: flex;
justify-content: space-between;
padding: 8px 0;
border-bottom: 1px solid rgba(255, 255, 255, 0.1);
font-size: 13px;
}
.cost-row:last-child {
border-bottom: none;
}
.cost-row span:first-child {
color: #aaa;
}
.cost-value {
color: #00b450;
font-weight: 600;
}
.cost-note {
font-size: 12px;
color: #666;
font-style: italic;
margin-top: 8px;
}
/* Fund actions */
.fund-actions {
margin-top: 32px;
padding: 20px;
background: rgba(0, 0, 0, 0.3);
border: 1px solid rgba(0, 180, 80, 0.2);
border-radius: 8px;
}
.fund-input-group {
margin-bottom: 16px;
}
.fund-input-group label {
display: block;
font-size: 13px;
color: #ccc;
margin-bottom: 6px;
}
.fund-input {
width: 100%;
padding: 10px 12px;
background: rgba(255, 255, 255, 0.05);
border: 1px solid rgba(0, 180, 80, 0.3);
border-radius: 6px;
color: #e0e0e0;
font-family: "Courier New", monospace;
font-size: 16px;
transition: all 0.2s ease;
}
.fund-input:focus {
outline: none;
border-color: rgba(0, 180, 80, 0.6);
background: rgba(255, 255, 255, 0.08);
}
.fund-input::placeholder {
color: #666;
}
.fund-submit-btn {
width: 100%;
padding: 12px;
background: linear-gradient(135deg, rgba(0, 180, 80, 0.8), rgba(0, 140, 60, 0.9));
border: none;
border-radius: 6px;
color: #fff;
font-family: "Courier New", monospace;
font-size: 14px;
font-weight: 600;
cursor: pointer;
transition: all 0.2s ease;
text-transform: uppercase;
letter-spacing: 0.5px;
}
.fund-submit-btn:hover {
background: linear-gradient(135deg, rgba(0, 200, 90, 0.9), rgba(0, 160, 70, 1));
transform: translateY(-1px);
box-shadow: 0 4px 12px rgba(0, 180, 80, 0.3);
}
.fund-submit-btn:active {
transform: translateY(0);
}
.fund-footer {
margin-top: 24px;
padding-top: 16px;
border-top: 1px solid rgba(160, 160, 160, 0.2);
font-size: 12px;
color: #666;
text-align: center;
}
.fund-backdrop {
position: absolute;
top: 0;
left: 0;
width: 100%;
height: 100%;
background: rgba(0, 0, 0, 0.5);
opacity: 0;
transition: opacity 0.3s ease;
}
.fund-modal.open .fund-backdrop {
opacity: 1;
}
/* About Panel */ /* About Panel */
.about-panel { .about-panel {
position: fixed; position: fixed;
@@ -263,357 +543,17 @@ canvas {
opacity: 1; opacity: 1;
} }
/* Submit Job Button */
.submit-job-button {
position: absolute;
top: 14px;
right: 72px;
height: 28px;
padding: 0 12px;
background: rgba(10, 10, 20, 0.7);
border: 1px solid rgba(0, 180, 80, 0.4);
border-radius: 14px;
color: #00b450;
cursor: pointer;
pointer-events: auto;
transition: all 0.2s ease;
display: flex;
align-items: center;
gap: 6px;
font-family: "Courier New", monospace;
font-size: 12px;
}
.submit-job-button:hover {
background: rgba(0, 180, 80, 0.15);
border-color: rgba(0, 180, 80, 0.7);
transform: scale(1.05);
}
.submit-job-button svg {
width: 14px;
height: 14px;
}
/* Submit Job Modal */
.submit-job-modal {
position: fixed;
top: 0;
left: 0;
width: 100%;
height: 100%;
z-index: 100;
pointer-events: none;
visibility: hidden;
opacity: 0;
transition: opacity 0.3s ease, visibility 0.3s ease;
}
.submit-job-modal.open {
pointer-events: auto;
visibility: visible;
opacity: 1;
}
.submit-job-content {
position: absolute;
top: 50%;
left: 50%;
transform: translate(-50%, -50%) scale(0.95);
width: 480px;
max-width: 90%;
max-height: 90vh;
background: rgba(10, 10, 20, 0.98);
border: 1px solid rgba(218, 165, 32, 0.3);
border-radius: 12px;
padding: 32px;
overflow-y: auto;
transition: transform 0.3s ease;
box-shadow: 0 8px 32px rgba(0, 0, 0, 0.6);
}
.submit-job-modal.open .submit-job-content {
transform: translate(-50%, -50%) scale(1);
}
.submit-job-close {
position: absolute;
top: 16px;
right: 16px;
width: 32px;
height: 32px;
padding: 0;
background: transparent;
border: 1px solid rgba(160, 160, 160, 0.3);
border-radius: 50%;
color: #aaa;
cursor: pointer;
transition: all 0.2s ease;
display: flex;
align-items: center;
justify-content: center;
}
.submit-job-close:hover {
background: rgba(255, 255, 255, 0.1);
border-color: rgba(218, 165, 32, 0.5);
color: #daa520;
}
.submit-job-close svg {
width: 18px;
height: 18px;
}
.submit-job-content h2 {
font-size: 22px;
color: #daa520;
margin: 0 0 8px 0;
font-weight: 600;
}
.submit-job-subtitle {
font-size: 13px;
color: #888;
margin: 0 0 24px 0;
}
/* Form Styles */
.submit-job-form {
display: flex;
flex-direction: column;
gap: 20px;
}
.submit-job-form.hidden {
display: none;
}
.form-group {
display: flex;
flex-direction: column;
gap: 8px;
}
.form-group label {
font-size: 13px;
color: #ccc;
font-weight: 500;
}
.form-group label .required {
color: #ff4444;
margin-left: 4px;
}
.form-group input,
.form-group textarea,
.form-group select {
background: rgba(30, 30, 40, 0.8);
border: 1px solid rgba(160, 160, 160, 0.3);
border-radius: 6px;
padding: 10px 12px;
color: #e0e0e0;
font-family: "Courier New", monospace;
font-size: 14px;
transition: border-color 0.2s ease, box-shadow 0.2s ease;
}
.form-group input:focus,
.form-group textarea:focus,
.form-group select:focus {
outline: none;
border-color: rgba(218, 165, 32, 0.6);
box-shadow: 0 0 0 2px rgba(218, 165, 32, 0.1);
}
.form-group input.error,
.form-group textarea.error {
border-color: #ff4444;
box-shadow: 0 0 0 2px rgba(255, 68, 68, 0.1);
}
.form-group input::placeholder,
.form-group textarea::placeholder {
color: #666;
}
.form-group textarea {
resize: vertical;
min-height: 100px;
}
.form-group select {
cursor: pointer;
appearance: none;
background-image: url("data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' width='12' height='12' viewBox='0 0 24 24' fill='none' stroke='%23888' stroke-width='2'%3E%3Cpath d='m6 9 6 6 6-6'/%3E%3C/svg%3E");
background-repeat: no-repeat;
background-position: right 12px center;
padding-right: 36px;
}
.form-group select option {
background: #1a1a2e;
color: #e0e0e0;
}
/* Character Count */
.char-count {
font-size: 11px;
color: #666;
text-align: right;
margin-top: 4px;
transition: color 0.2s ease;
}
.char-count.near-limit {
color: #ffaa33;
}
.char-count.over-limit {
color: #ff4444;
font-weight: bold;
}
/* Validation Messages */
.validation-error {
font-size: 12px;
color: #ff4444;
margin-top: 4px;
min-height: 16px;
opacity: 0;
transition: opacity 0.2s ease;
}
.validation-error.visible {
opacity: 1;
}
.validation-warning {
font-size: 12px;
color: #ffaa33;
margin-top: 4px;
min-height: 16px;
opacity: 0;
transition: opacity 0.2s ease;
}
.validation-warning.visible {
opacity: 1;
}
/* Action Buttons */
.submit-job-actions {
display: flex;
gap: 12px;
justify-content: flex-end;
margin-top: 8px;
}
.btn-secondary {
padding: 10px 20px;
background: transparent;
border: 1px solid rgba(160, 160, 160, 0.4);
border-radius: 6px;
color: #aaa;
font-family: "Courier New", monospace;
font-size: 14px;
cursor: pointer;
transition: all 0.2s ease;
}
.btn-secondary:hover {
background: rgba(255, 255, 255, 0.05);
border-color: rgba(160, 160, 160, 0.6);
color: #ccc;
}
.btn-primary {
padding: 10px 20px;
background: linear-gradient(135deg, rgba(0, 180, 80, 0.8), rgba(0, 140, 60, 0.9));
border: 1px solid rgba(0, 180, 80, 0.5);
border-radius: 6px;
color: #fff;
font-family: "Courier New", monospace;
font-size: 14px;
cursor: pointer;
transition: all 0.2s ease;
}
.btn-primary:hover:not(:disabled) {
background: linear-gradient(135deg, rgba(0, 200, 90, 0.9), rgba(0, 160, 70, 1));
transform: translateY(-1px);
box-shadow: 0 4px 12px rgba(0, 180, 80, 0.3);
}
.btn-primary:disabled {
background: rgba(100, 100, 100, 0.3);
border-color: rgba(100, 100, 100, 0.3);
color: #666;
cursor: not-allowed;
}
/* Success State */
.submit-job-success {
text-align: center;
padding: 32px 16px;
}
.submit-job-success.hidden {
display: none;
}
.success-icon {
width: 64px;
height: 64px;
margin: 0 auto 20px;
color: #00b450;
}
.success-icon svg {
width: 100%;
height: 100%;
}
.submit-job-success h3 {
font-size: 20px;
color: #00b450;
margin: 0 0 12px 0;
}
.submit-job-success p {
font-size: 14px;
color: #888;
margin: 0 0 24px 0;
line-height: 1.5;
}
/* Backdrop */
.submit-job-backdrop {
position: absolute;
top: 0;
left: 0;
width: 100%;
height: 100%;
background: rgba(0, 0, 0, 0.6);
opacity: 0;
transition: opacity 0.3s ease;
}
.submit-job-modal.open .submit-job-backdrop {
opacity: 1;
}
/* Mobile adjustments */ /* Mobile adjustments */
@media (max-width: 480px) { @media (max-width: 480px) {
.about-panel-content { .about-panel-content,
.fund-modal-content {
width: 100%; width: 100%;
max-width: 100%; max-width: 100%;
padding: 56px 20px 20px 20px; padding: 56px 20px 20px 20px;
} }
.info-button { .info-button {
right: 32px; right: 66px;
width: 26px; width: 26px;
height: 26px; height: 26px;
} }
@@ -623,33 +563,14 @@ canvas {
height: 14px; height: 14px;
} }
.submit-job-button { .fund-button {
right: 64px; right: 32px;
width: 26px;
height: 26px; height: 26px;
padding: 0 10px;
font-size: 11px;
} }
.submit-job-button svg { .fund-button svg {
width: 12px; width: 14px;
height: 12px; height: 14px;
}
.submit-job-content {
width: 95%;
padding: 24px 20px;
}
.submit-job-content h2 {
font-size: 20px;
}
.submit-job-actions {
flex-direction: column-reverse;
}
.btn-secondary,
.btn-primary {
width: 100%;
} }
} }

View File

@@ -1,680 +0,0 @@
"""Tests for agent scorecard functionality."""
from datetime import UTC, datetime, timedelta
from unittest.mock import MagicMock, patch
from dashboard.services.scorecard_service import (
AgentMetrics,
PeriodType,
ScorecardSummary,
_aggregate_metrics,
_detect_patterns,
_extract_actor_from_event,
_generate_narrative_bullets,
_get_period_bounds,
_is_tracked_agent,
_query_token_transactions,
generate_all_scorecards,
generate_scorecard,
get_tracked_agents,
)
from infrastructure.events.bus import Event
class TestPeriodBounds:
"""Test period boundary calculations."""
def test_daily_period_bounds(self):
"""Test daily period returns correct 24-hour window."""
reference = datetime(2026, 3, 21, 12, 30, 45, tzinfo=UTC)
start, end = _get_period_bounds(PeriodType.daily, reference)
assert end == datetime(2026, 3, 21, 0, 0, 0, tzinfo=UTC)
assert start == datetime(2026, 3, 20, 0, 0, 0, tzinfo=UTC)
assert (end - start) == timedelta(days=1)
def test_weekly_period_bounds(self):
"""Test weekly period returns correct 7-day window."""
reference = datetime(2026, 3, 21, 12, 30, 45, tzinfo=UTC)
start, end = _get_period_bounds(PeriodType.weekly, reference)
assert end == datetime(2026, 3, 21, 0, 0, 0, tzinfo=UTC)
assert start == datetime(2026, 3, 14, 0, 0, 0, tzinfo=UTC)
assert (end - start) == timedelta(days=7)
def test_default_reference_date(self):
"""Test default reference date uses current time."""
start, end = _get_period_bounds(PeriodType.daily)
now = datetime.now(UTC)
# End should be start of current day (midnight)
expected_end = now.replace(hour=0, minute=0, second=0, microsecond=0)
assert end == expected_end
# Start should be 24 hours before end
assert (end - start) == timedelta(days=1)
class TestTrackedAgents:
"""Test agent tracking functions."""
def test_get_tracked_agents(self):
"""Test get_tracked_agents returns sorted list."""
agents = get_tracked_agents()
assert isinstance(agents, list)
assert "kimi" in agents
assert "claude" in agents
assert "gemini" in agents
assert "hermes" in agents
assert "manus" in agents
assert agents == sorted(agents)
def test_is_tracked_agent_true(self):
"""Test _is_tracked_agent returns True for tracked agents."""
assert _is_tracked_agent("kimi") is True
assert _is_tracked_agent("KIMI") is True # case insensitive
assert _is_tracked_agent("claude") is True
assert _is_tracked_agent("hermes") is True
def test_is_tracked_agent_false(self):
"""Test _is_tracked_agent returns False for untracked agents."""
assert _is_tracked_agent("unknown") is False
assert _is_tracked_agent("rockachopa") is False
assert _is_tracked_agent("") is False
class TestExtractActor:
"""Test actor extraction from events."""
def test_extract_from_actor_field(self):
"""Test extraction from data.actor field."""
event = Event(type="test", source="system", data={"actor": "kimi"})
assert _extract_actor_from_event(event) == "kimi"
def test_extract_from_agent_id_field(self):
"""Test extraction from data.agent_id field."""
event = Event(type="test", source="system", data={"agent_id": "claude"})
assert _extract_actor_from_event(event) == "claude"
def test_extract_from_source_fallback(self):
"""Test fallback to event.source."""
event = Event(type="test", source="gemini", data={})
assert _extract_actor_from_event(event) == "gemini"
def test_actor_priority_over_agent_id(self):
"""Test actor field takes priority over agent_id."""
event = Event(type="test", source="system", data={"actor": "kimi", "agent_id": "claude"})
assert _extract_actor_from_event(event) == "kimi"
class TestAggregateMetrics:
"""Test metrics aggregation from events."""
def test_empty_events(self):
"""Test aggregation with no events returns empty dict."""
result = _aggregate_metrics([])
assert result == {}
def test_push_event_aggregation(self):
"""Test push events aggregate commits correctly."""
events = [
Event(type="gitea.push", source="gitea", data={"actor": "kimi", "num_commits": 3}),
Event(type="gitea.push", source="gitea", data={"actor": "kimi", "num_commits": 2}),
]
result = _aggregate_metrics(events)
assert "kimi" in result
assert result["kimi"].commits == 5
def test_issue_opened_aggregation(self):
"""Test issue opened events aggregate correctly."""
events = [
Event(
type="gitea.issue.opened",
source="gitea",
data={"actor": "claude", "issue_number": 100},
),
Event(
type="gitea.issue.opened",
source="gitea",
data={"actor": "claude", "issue_number": 101},
),
]
result = _aggregate_metrics(events)
assert "claude" in result
assert len(result["claude"].issues_touched) == 2
assert 100 in result["claude"].issues_touched
assert 101 in result["claude"].issues_touched
def test_comment_aggregation(self):
"""Test comment events aggregate correctly."""
events = [
Event(
type="gitea.issue.comment",
source="gitea",
data={"actor": "gemini", "issue_number": 100},
),
Event(
type="gitea.issue.comment",
source="gitea",
data={"actor": "gemini", "issue_number": 101},
),
]
result = _aggregate_metrics(events)
assert "gemini" in result
assert result["gemini"].comments == 2
assert len(result["gemini"].issues_touched) == 2 # Comments touch issues too
def test_pr_events_aggregation(self):
"""Test PR open and merge events aggregate correctly."""
events = [
Event(
type="gitea.pull_request",
source="gitea",
data={"actor": "kimi", "pr_number": 50, "action": "opened"},
),
Event(
type="gitea.pull_request",
source="gitea",
data={"actor": "kimi", "pr_number": 50, "action": "closed", "merged": True},
),
Event(
type="gitea.pull_request",
source="gitea",
data={"actor": "kimi", "pr_number": 51, "action": "opened"},
),
]
result = _aggregate_metrics(events)
assert "kimi" in result
assert len(result["kimi"].prs_opened) == 2
assert len(result["kimi"].prs_merged) == 1
assert 50 in result["kimi"].prs_merged
def test_untracked_agent_filtered(self):
"""Test events from untracked agents are filtered out."""
events = [
Event(
type="gitea.push", source="gitea", data={"actor": "rockachopa", "num_commits": 5}
),
]
result = _aggregate_metrics(events)
assert "rockachopa" not in result
def test_task_completion_aggregation(self):
"""Test task completion events aggregate test files."""
events = [
Event(
type="agent.task.completed",
source="gitea",
data={
"agent_id": "kimi",
"tests_affected": ["test_foo.py", "test_bar.py"],
"token_reward": 10,
},
),
]
result = _aggregate_metrics(events)
assert "kimi" in result
assert len(result["kimi"].tests_affected) == 2
assert "test_foo.py" in result["kimi"].tests_affected
assert result["kimi"].tokens_earned == 10
class TestAgentMetrics:
"""Test AgentMetrics class."""
def test_merge_rate_zero_prs(self):
"""Test merge rate is 0 when no PRs opened."""
metrics = AgentMetrics(agent_id="kimi")
assert metrics.pr_merge_rate == 0.0
def test_merge_rate_perfect(self):
"""Test 100% merge rate calculation."""
metrics = AgentMetrics(agent_id="kimi", prs_opened={1, 2, 3}, prs_merged={1, 2, 3})
assert metrics.pr_merge_rate == 1.0
def test_merge_rate_partial(self):
"""Test partial merge rate calculation."""
metrics = AgentMetrics(agent_id="kimi", prs_opened={1, 2, 3, 4}, prs_merged={1, 2})
assert metrics.pr_merge_rate == 0.5
class TestDetectPatterns:
"""Test pattern detection logic."""
def test_high_merge_rate_pattern(self):
"""Test detection of high merge rate pattern."""
metrics = AgentMetrics(
agent_id="kimi",
prs_opened={1, 2, 3, 4, 5},
prs_merged={1, 2, 3, 4}, # 80% merge rate
)
patterns = _detect_patterns(metrics)
assert any("High merge rate" in p for p in patterns)
def test_low_merge_rate_pattern(self):
"""Test detection of low merge rate pattern."""
metrics = AgentMetrics(
agent_id="kimi",
prs_opened={1, 2, 3, 4, 5},
prs_merged={1}, # 20% merge rate
)
patterns = _detect_patterns(metrics)
assert any("low merge rate" in p for p in patterns)
def test_high_commits_no_prs_pattern(self):
"""Test detection of direct-to-main commits pattern."""
metrics = AgentMetrics(
agent_id="kimi",
commits=15,
prs_opened=set(),
)
patterns = _detect_patterns(metrics)
assert any("High commit volume without PRs" in p for p in patterns)
def test_silent_worker_pattern(self):
"""Test detection of silent worker pattern."""
metrics = AgentMetrics(
agent_id="kimi",
issues_touched={1, 2, 3, 4, 5, 6},
comments=0,
)
patterns = _detect_patterns(metrics)
assert any("silent worker" in p for p in patterns)
def test_communicative_pattern(self):
"""Test detection of highly communicative pattern."""
metrics = AgentMetrics(
agent_id="kimi",
issues_touched={1, 2}, # 2 issues
comments=10, # 5x comments per issue
)
patterns = _detect_patterns(metrics)
assert any("Highly communicative" in p for p in patterns)
def test_token_accumulation_pattern(self):
"""Test detection of token accumulation pattern."""
metrics = AgentMetrics(
agent_id="kimi",
tokens_earned=150,
tokens_spent=10,
)
patterns = _detect_patterns(metrics)
assert any("Strong token accumulation" in p for p in patterns)
def test_token_spend_pattern(self):
"""Test detection of high token spend pattern."""
metrics = AgentMetrics(
agent_id="kimi",
tokens_earned=10,
tokens_spent=100,
)
patterns = _detect_patterns(metrics)
assert any("High token spend" in p for p in patterns)
class TestGenerateNarrative:
"""Test narrative bullet generation."""
def test_empty_metrics_narrative(self):
"""Test narrative for empty metrics mentions no activity."""
metrics = AgentMetrics(agent_id="kimi")
bullets = _generate_narrative_bullets(metrics, PeriodType.daily)
assert len(bullets) == 1
assert "No recorded activity" in bullets[0]
def test_activity_summary_narrative(self):
"""Test narrative includes activity summary."""
metrics = AgentMetrics(
agent_id="kimi",
commits=5,
prs_opened={1, 2},
prs_merged={1},
)
bullets = _generate_narrative_bullets(metrics, PeriodType.daily)
activity_bullet = next((b for b in bullets if "Active across" in b), None)
assert activity_bullet is not None
assert "5 commits" in activity_bullet
assert "2 PRs opened" in activity_bullet
assert "1 PR merged" in activity_bullet
def test_tests_affected_narrative(self):
"""Test narrative includes tests affected."""
metrics = AgentMetrics(
agent_id="kimi",
tests_affected={"test_a.py", "test_b.py"},
)
bullets = _generate_narrative_bullets(metrics, PeriodType.daily)
assert any("2 test files" in b for b in bullets)
def test_tokens_earned_narrative(self):
"""Test narrative includes token earnings."""
metrics = AgentMetrics(
agent_id="kimi",
tokens_earned=100,
tokens_spent=20,
)
bullets = _generate_narrative_bullets(metrics, PeriodType.daily)
assert any("Net earned 80 tokens" in b for b in bullets)
def test_tokens_spent_narrative(self):
"""Test narrative includes token spending."""
metrics = AgentMetrics(
agent_id="kimi",
tokens_earned=20,
tokens_spent=100,
)
bullets = _generate_narrative_bullets(metrics, PeriodType.daily)
assert any("Net spent 80 tokens" in b for b in bullets)
def test_balanced_tokens_narrative(self):
"""Test narrative for balanced token flow."""
metrics = AgentMetrics(
agent_id="kimi",
tokens_earned=100,
tokens_spent=100,
)
bullets = _generate_narrative_bullets(metrics, PeriodType.daily)
assert any("Balanced token flow" in b for b in bullets)
class TestScorecardSummary:
"""Test ScorecardSummary dataclass."""
def test_to_dict_structure(self):
"""Test to_dict returns expected structure."""
metrics = AgentMetrics(
agent_id="kimi",
issues_touched={1, 2},
prs_opened={10, 11},
prs_merged={10},
tokens_earned=100,
tokens_spent=20,
)
summary = ScorecardSummary(
agent_id="kimi",
period_type=PeriodType.daily,
period_start=datetime.now(UTC),
period_end=datetime.now(UTC),
metrics=metrics,
narrative_bullets=["Test bullet"],
patterns=["Test pattern"],
)
data = summary.to_dict()
assert data["agent_id"] == "kimi"
assert data["period_type"] == "daily"
assert "metrics" in data
assert data["metrics"]["issues_touched"] == 2
assert data["metrics"]["prs_opened"] == 2
assert data["metrics"]["prs_merged"] == 1
assert data["metrics"]["pr_merge_rate"] == 0.5
assert data["metrics"]["tokens_earned"] == 100
assert data["metrics"]["token_net"] == 80
assert data["narrative_bullets"] == ["Test bullet"]
assert data["patterns"] == ["Test pattern"]
class TestQueryTokenTransactions:
"""Test token transaction querying."""
def test_empty_ledger(self):
"""Test empty ledger returns zero values."""
with patch("lightning.ledger.get_transactions", return_value=[]):
earned, spent = _query_token_transactions("kimi", datetime.now(UTC), datetime.now(UTC))
assert earned == 0
assert spent == 0
def test_ledger_with_transactions(self):
"""Test ledger aggregation of transactions."""
now = datetime.now(UTC)
mock_tx = [
MagicMock(
agent_id="kimi",
tx_type=MagicMock(value="incoming"),
amount_sats=100,
created_at=now.isoformat(),
),
MagicMock(
agent_id="kimi",
tx_type=MagicMock(value="outgoing"),
amount_sats=30,
created_at=now.isoformat(),
),
]
with patch("lightning.ledger.get_transactions", return_value=mock_tx):
earned, spent = _query_token_transactions(
"kimi", now - timedelta(hours=1), now + timedelta(hours=1)
)
assert earned == 100
assert spent == 30
def test_ledger_filters_by_agent(self):
"""Test ledger filters transactions by agent_id."""
now = datetime.now(UTC)
mock_tx = [
MagicMock(
agent_id="claude",
tx_type=MagicMock(value="incoming"),
amount_sats=100,
created_at=now.isoformat(),
),
]
with patch("lightning.ledger.get_transactions", return_value=mock_tx):
earned, spent = _query_token_transactions(
"kimi", now - timedelta(hours=1), now + timedelta(hours=1)
)
assert earned == 0 # Transaction was for claude, not kimi
def test_ledger_filters_by_time(self):
"""Test ledger filters transactions by time range."""
now = datetime.now(UTC)
old_time = now - timedelta(days=2)
mock_tx = [
MagicMock(
agent_id="kimi",
tx_type=MagicMock(value="incoming"),
amount_sats=100,
created_at=old_time.isoformat(),
),
]
with patch("lightning.ledger.get_transactions", return_value=mock_tx):
# Query for today only
earned, spent = _query_token_transactions(
"kimi", now - timedelta(hours=1), now + timedelta(hours=1)
)
assert earned == 0 # Transaction was 2 days ago
class TestGenerateScorecard:
"""Test scorecard generation."""
def test_generate_scorecard_no_activity(self):
"""Test scorecard generation for agent with no activity."""
with patch(
"dashboard.services.scorecard_service._collect_events_for_period", return_value=[]
):
with patch(
"dashboard.services.scorecard_service._query_token_transactions",
return_value=(0, 0),
):
scorecard = generate_scorecard("kimi", PeriodType.daily)
assert scorecard is not None
assert scorecard.agent_id == "kimi"
assert scorecard.period_type == PeriodType.daily
assert len(scorecard.narrative_bullets) == 1
assert "No recorded activity" in scorecard.narrative_bullets[0]
def test_generate_scorecard_with_activity(self):
"""Test scorecard generation includes activity."""
events = [
Event(type="gitea.push", source="gitea", data={"actor": "kimi", "num_commits": 5}),
]
with patch(
"dashboard.services.scorecard_service._collect_events_for_period", return_value=events
):
with patch(
"dashboard.services.scorecard_service._query_token_transactions",
return_value=(100, 20),
):
scorecard = generate_scorecard("kimi", PeriodType.daily)
assert scorecard is not None
assert scorecard.metrics.commits == 5
assert scorecard.metrics.tokens_earned == 100
assert scorecard.metrics.tokens_spent == 20
class TestGenerateAllScorecards:
"""Test generating scorecards for all agents."""
def test_generates_for_all_tracked_agents(self):
"""Test all tracked agents get scorecards even with no activity."""
with patch(
"dashboard.services.scorecard_service._collect_events_for_period", return_value=[]
):
with patch(
"dashboard.services.scorecard_service._query_token_transactions",
return_value=(0, 0),
):
scorecards = generate_all_scorecards(PeriodType.daily)
agent_ids = {s.agent_id for s in scorecards}
expected = {"kimi", "claude", "gemini", "hermes", "manus"}
assert expected.issubset(agent_ids)
def test_scorecards_sorted(self):
"""Test scorecards are sorted by agent_id."""
with patch(
"dashboard.services.scorecard_service._collect_events_for_period", return_value=[]
):
with patch(
"dashboard.services.scorecard_service._query_token_transactions",
return_value=(0, 0),
):
scorecards = generate_all_scorecards(PeriodType.daily)
agent_ids = [s.agent_id for s in scorecards]
assert agent_ids == sorted(agent_ids)
class TestScorecardRoutes:
"""Test scorecard API routes."""
def test_list_agents_endpoint(self, client):
"""Test GET /scorecards/api/agents returns tracked agents."""
response = client.get("/scorecards/api/agents")
assert response.status_code == 200
data = response.json()
assert "agents" in data
assert "kimi" in data["agents"]
assert "claude" in data["agents"]
def test_get_scorecard_endpoint(self, client):
"""Test GET /scorecards/api/{agent_id} returns scorecard."""
with patch("dashboard.routes.scorecards.generate_scorecard") as mock_generate:
mock_generate.return_value = ScorecardSummary(
agent_id="kimi",
period_type=PeriodType.daily,
period_start=datetime.now(UTC),
period_end=datetime.now(UTC),
metrics=AgentMetrics(agent_id="kimi"),
narrative_bullets=["Test bullet"],
patterns=[],
)
response = client.get("/scorecards/api/kimi?period=daily")
assert response.status_code == 200
data = response.json()
assert data["agent_id"] == "kimi"
assert data["period_type"] == "daily"
def test_get_scorecard_invalid_period(self, client):
"""Test GET with invalid period returns 400."""
response = client.get("/scorecards/api/kimi?period=invalid")
assert response.status_code == 400
assert "error" in response.json()
def test_get_all_scorecards_endpoint(self, client):
"""Test GET /scorecards/api returns all scorecards."""
with patch("dashboard.routes.scorecards.generate_all_scorecards") as mock_generate:
mock_generate.return_value = [
ScorecardSummary(
agent_id="kimi",
period_type=PeriodType.daily,
period_start=datetime.now(UTC),
period_end=datetime.now(UTC),
metrics=AgentMetrics(agent_id="kimi"),
narrative_bullets=[],
patterns=[],
),
]
response = client.get("/scorecards/api?period=daily")
assert response.status_code == 200
data = response.json()
assert data["period"] == "daily"
assert "scorecards" in data
assert len(data["scorecards"]) == 1
def test_scorecards_page_renders(self, client):
"""Test GET /scorecards returns HTML page."""
response = client.get("/scorecards")
assert response.status_code == 200
assert "text/html" in response.headers.get("content-type", "")
assert "AGENT SCORECARDS" in response.text
def test_scorecard_panel_renders(self, client):
"""Test GET /scorecards/panel/{agent_id} returns HTML."""
with patch("dashboard.routes.scorecards.generate_scorecard") as mock_generate:
mock_generate.return_value = ScorecardSummary(
agent_id="kimi",
period_type=PeriodType.daily,
period_start=datetime.now(UTC),
period_end=datetime.now(UTC),
metrics=AgentMetrics(agent_id="kimi", commits=5),
narrative_bullets=["Active across 5 commits this day."],
patterns=["High activity"],
)
response = client.get("/scorecards/panel/kimi?period=daily")
assert response.status_code == 200
assert "text/html" in response.headers.get("content-type", "")
assert "Kimi" in response.text
def test_all_panels_renders(self, client):
"""Test GET /scorecards/all/panels returns HTML with all panels."""
with patch("dashboard.routes.scorecards.generate_all_scorecards") as mock_generate:
mock_generate.return_value = [
ScorecardSummary(
agent_id="kimi",
period_type=PeriodType.daily,
period_start=datetime.now(UTC),
period_end=datetime.now(UTC),
metrics=AgentMetrics(agent_id="kimi"),
narrative_bullets=[],
patterns=[],
),
]
response = client.get("/scorecards/all/panels?period=daily")
assert response.status_code == 200
assert "text/html" in response.headers.get("content-type", "")

View File

@@ -1,129 +0,0 @@
"""Tests for the WorldInterface contract and type system."""
import pytest
from infrastructure.world.interface import WorldInterface
from infrastructure.world.types import (
ActionResult,
ActionStatus,
CommandInput,
PerceptionOutput,
)
# ---------------------------------------------------------------------------
# Type construction
# ---------------------------------------------------------------------------
class TestPerceptionOutput:
def test_defaults(self):
p = PerceptionOutput()
assert p.location == ""
assert p.entities == []
assert p.events == []
assert p.raw == {}
assert p.timestamp is not None
def test_custom_values(self):
p = PerceptionOutput(
location="Balmora",
entities=["Guard", "Merchant"],
events=["door_opened"],
)
assert p.location == "Balmora"
assert len(p.entities) == 2
assert "door_opened" in p.events
class TestCommandInput:
def test_minimal(self):
c = CommandInput(action="move")
assert c.action == "move"
assert c.target is None
assert c.parameters == {}
def test_with_target_and_params(self):
c = CommandInput(action="attack", target="Rat", parameters={"weapon": "sword"})
assert c.target == "Rat"
assert c.parameters["weapon"] == "sword"
class TestActionResult:
def test_defaults(self):
r = ActionResult()
assert r.status == ActionStatus.SUCCESS
assert r.message == ""
def test_failure(self):
r = ActionResult(status=ActionStatus.FAILURE, message="blocked")
assert r.status == ActionStatus.FAILURE
class TestActionStatus:
def test_values(self):
assert ActionStatus.SUCCESS.value == "success"
assert ActionStatus.FAILURE.value == "failure"
assert ActionStatus.PENDING.value == "pending"
assert ActionStatus.NOOP.value == "noop"
# ---------------------------------------------------------------------------
# Abstract contract
# ---------------------------------------------------------------------------
class TestWorldInterfaceContract:
"""Verify the ABC cannot be instantiated directly."""
def test_cannot_instantiate(self):
with pytest.raises(TypeError):
WorldInterface()
def test_subclass_must_implement_observe(self):
class Incomplete(WorldInterface):
def act(self, command):
pass
def speak(self, message, target=None):
pass
with pytest.raises(TypeError):
Incomplete()
def test_subclass_must_implement_act(self):
class Incomplete(WorldInterface):
def observe(self):
return PerceptionOutput()
def speak(self, message, target=None):
pass
with pytest.raises(TypeError):
Incomplete()
def test_subclass_must_implement_speak(self):
class Incomplete(WorldInterface):
def observe(self):
return PerceptionOutput()
def act(self, command):
return ActionResult()
with pytest.raises(TypeError):
Incomplete()
def test_complete_subclass_instantiates(self):
class Complete(WorldInterface):
def observe(self):
return PerceptionOutput()
def act(self, command):
return ActionResult()
def speak(self, message, target=None):
pass
adapter = Complete()
assert adapter.is_connected is True # default
assert isinstance(adapter.observe(), PerceptionOutput)
assert isinstance(adapter.act(CommandInput(action="test")), ActionResult)

View File

@@ -1,80 +0,0 @@
"""Tests for the MockWorldAdapter — full observe/act/speak cycle."""
from infrastructure.world.adapters.mock import MockWorldAdapter
from infrastructure.world.types import ActionStatus, CommandInput, PerceptionOutput
class TestMockWorldAdapter:
def test_observe_returns_perception(self):
adapter = MockWorldAdapter(location="Vivec")
perception = adapter.observe()
assert isinstance(perception, PerceptionOutput)
assert perception.location == "Vivec"
assert perception.raw == {"adapter": "mock"}
def test_observe_entities(self):
adapter = MockWorldAdapter(entities=["Jiub", "Silt Strider"])
perception = adapter.observe()
assert perception.entities == ["Jiub", "Silt Strider"]
def test_act_logs_command(self):
adapter = MockWorldAdapter()
cmd = CommandInput(action="move", target="north")
result = adapter.act(cmd)
assert result.status == ActionStatus.SUCCESS
assert "move" in result.message
assert len(adapter.action_log) == 1
assert adapter.action_log[0].command.action == "move"
def test_act_multiple_commands(self):
adapter = MockWorldAdapter()
adapter.act(CommandInput(action="attack"))
adapter.act(CommandInput(action="defend"))
adapter.act(CommandInput(action="retreat"))
assert len(adapter.action_log) == 3
def test_speak_logs_message(self):
adapter = MockWorldAdapter()
adapter.speak("Hello, traveler!")
assert len(adapter.speech_log) == 1
assert adapter.speech_log[0]["message"] == "Hello, traveler!"
assert adapter.speech_log[0]["target"] is None
def test_speak_with_target(self):
adapter = MockWorldAdapter()
adapter.speak("Die, scum!", target="Cliff Racer")
assert adapter.speech_log[0]["target"] == "Cliff Racer"
def test_lifecycle(self):
adapter = MockWorldAdapter()
assert adapter.is_connected is False
adapter.connect()
assert adapter.is_connected is True
adapter.disconnect()
assert adapter.is_connected is False
def test_full_observe_act_speak_cycle(self):
"""Acceptance criterion: full observe/act/speak cycle passes."""
adapter = MockWorldAdapter(
location="Seyda Neen",
entities=["Fargoth", "Hrisskar"],
events=["quest_started"],
)
adapter.connect()
# Observe
perception = adapter.observe()
assert perception.location == "Seyda Neen"
assert len(perception.entities) == 2
assert "quest_started" in perception.events
# Act
result = adapter.act(CommandInput(action="talk", target="Fargoth"))
assert result.status == ActionStatus.SUCCESS
# Speak
adapter.speak("Where is your ring, Fargoth?", target="Fargoth")
assert len(adapter.speech_log) == 1
adapter.disconnect()
assert adapter.is_connected is False

View File

@@ -1,68 +0,0 @@
"""Tests for the adapter registry."""
import pytest
from infrastructure.world.adapters.mock import MockWorldAdapter
from infrastructure.world.registry import AdapterRegistry
class TestAdapterRegistry:
def test_register_and_get(self):
reg = AdapterRegistry()
reg.register("mock", MockWorldAdapter)
adapter = reg.get("mock")
assert isinstance(adapter, MockWorldAdapter)
def test_register_with_kwargs(self):
reg = AdapterRegistry()
reg.register("mock", MockWorldAdapter)
adapter = reg.get("mock", location="Custom Room")
assert adapter._location == "Custom Room"
def test_get_unknown_raises(self):
reg = AdapterRegistry()
with pytest.raises(KeyError):
reg.get("nonexistent")
def test_register_non_subclass_raises(self):
reg = AdapterRegistry()
with pytest.raises(TypeError):
reg.register("bad", dict)
def test_list_adapters(self):
reg = AdapterRegistry()
reg.register("beta", MockWorldAdapter)
reg.register("alpha", MockWorldAdapter)
assert reg.list_adapters() == ["alpha", "beta"]
def test_contains(self):
reg = AdapterRegistry()
reg.register("mock", MockWorldAdapter)
assert "mock" in reg
assert "other" not in reg
def test_len(self):
reg = AdapterRegistry()
assert len(reg) == 0
reg.register("mock", MockWorldAdapter)
assert len(reg) == 1
def test_overwrite_warns(self, caplog):
import logging
reg = AdapterRegistry()
reg.register("mock", MockWorldAdapter)
with caplog.at_level(logging.WARNING):
reg.register("mock", MockWorldAdapter)
assert "Overwriting" in caplog.text
class TestModuleLevelRegistry:
"""Test the convenience functions in infrastructure.world.__init__."""
def test_register_and_get(self):
from infrastructure.world import get_adapter, register_adapter
register_adapter("test_mock", MockWorldAdapter)
adapter = get_adapter("test_mock")
assert isinstance(adapter, MockWorldAdapter)

View File

@@ -1,44 +0,0 @@
"""Tests for the TES3MP stub adapter."""
import pytest
from infrastructure.world.adapters.tes3mp import TES3MPWorldAdapter
from infrastructure.world.types import CommandInput
class TestTES3MPStub:
"""Acceptance criterion: stub imports cleanly and raises NotImplementedError."""
def test_instantiates(self):
adapter = TES3MPWorldAdapter(host="127.0.0.1", port=25565)
assert adapter._host == "127.0.0.1"
assert adapter._port == 25565
def test_is_connected_default_false(self):
adapter = TES3MPWorldAdapter()
assert adapter.is_connected is False
def test_connect_raises(self):
adapter = TES3MPWorldAdapter()
with pytest.raises(NotImplementedError, match="connect"):
adapter.connect()
def test_disconnect_raises(self):
adapter = TES3MPWorldAdapter()
with pytest.raises(NotImplementedError, match="disconnect"):
adapter.disconnect()
def test_observe_raises(self):
adapter = TES3MPWorldAdapter()
with pytest.raises(NotImplementedError, match="observe"):
adapter.observe()
def test_act_raises(self):
adapter = TES3MPWorldAdapter()
with pytest.raises(NotImplementedError, match="act"):
adapter.act(CommandInput(action="move"))
def test_speak_raises(self):
adapter = TES3MPWorldAdapter()
with pytest.raises(NotImplementedError, match="speak"):
adapter.speak("Hello")

View File

@@ -58,55 +58,6 @@ class TestDetectIssueFromBranch:
assert mod.detect_issue_from_branch() is None assert mod.detect_issue_from_branch() is None
class TestConsumeOnce:
"""cycle_result.json must be deleted after reading."""
def test_cycle_result_deleted_after_read(self, mod, tmp_path):
"""After _load_cycle_result() data is consumed in main(), the file is deleted."""
result_file = tmp_path / "cycle_result.json"
result_file.write_text('{"issue": 42, "type": "bug"}')
with (
patch.object(mod, "CYCLE_RESULT_FILE", result_file),
patch.object(mod, "RETRO_FILE", tmp_path / "retro" / "cycles.jsonl"),
patch.object(mod, "SUMMARY_FILE", tmp_path / "retro" / "summary.json"),
patch.object(mod, "EPOCH_COUNTER_FILE", tmp_path / "retro" / ".epoch_counter"),
patch(
"sys.argv",
["cycle_retro", "--cycle", "1", "--success", "--main-green", "--duration", "60"],
),
):
mod.main()
assert not result_file.exists(), "cycle_result.json should be deleted after consumption"
def test_cycle_result_not_deleted_when_empty(self, mod, tmp_path):
"""If cycle_result.json doesn't exist, no error occurs."""
result_file = tmp_path / "nonexistent_result.json"
with (
patch.object(mod, "CYCLE_RESULT_FILE", result_file),
patch.object(mod, "RETRO_FILE", tmp_path / "retro" / "cycles.jsonl"),
patch.object(mod, "SUMMARY_FILE", tmp_path / "retro" / "summary.json"),
patch.object(mod, "EPOCH_COUNTER_FILE", tmp_path / "retro" / ".epoch_counter"),
patch(
"sys.argv",
[
"cycle_retro",
"--cycle",
"1",
"--success",
"--main-green",
"--duration",
"60",
"--issue",
"10",
],
),
):
mod.main() # Should not raise
class TestBackfillExtractIssueNumber: class TestBackfillExtractIssueNumber:
"""Tests for backfill_retro.extract_issue_number PR-number filtering.""" """Tests for backfill_retro.extract_issue_number PR-number filtering."""

View File

@@ -1,176 +0,0 @@
"""Tests for Heartbeat v2 — WorldInterface-driven cognitive loop.
Acceptance criteria:
- With MockWorldAdapter: heartbeat runs, logs show observe→reason→act→reflect
- Without adapter: existing think_once() behaviour unchanged
- WebSocket broadcasts include current action and reasoning summary
"""
from unittest.mock import AsyncMock, patch
import pytest
from infrastructure.world.adapters.mock import MockWorldAdapter
from infrastructure.world.types import ActionStatus
from loop.heartbeat import CycleRecord, Heartbeat
@pytest.fixture
def mock_adapter():
adapter = MockWorldAdapter(
location="Balmora",
entities=["Guard", "Merchant"],
events=["player_entered"],
)
adapter.connect()
return adapter
class TestHeartbeatWithAdapter:
"""With MockWorldAdapter: heartbeat runs full embodied cycle."""
@pytest.mark.asyncio
async def test_run_once_returns_cycle_record(self, mock_adapter):
hb = Heartbeat(world=mock_adapter)
record = await hb.run_once()
assert isinstance(record, CycleRecord)
assert record.cycle_id == 1
@pytest.mark.asyncio
async def test_observation_populated(self, mock_adapter):
hb = Heartbeat(world=mock_adapter)
record = await hb.run_once()
assert record.observation["location"] == "Balmora"
assert "Guard" in record.observation["entities"]
assert "player_entered" in record.observation["events"]
@pytest.mark.asyncio
async def test_action_dispatched_to_world(self, mock_adapter):
"""Act phase should dispatch to world.act() for non-idle actions."""
hb = Heartbeat(world=mock_adapter)
record = await hb.run_once()
# The default loop phases don't set an explicit action, so it
# falls through to "idle" → NOOP. That's correct behaviour —
# the real LLM-powered reason phase will set action metadata.
assert record.action_status in (
ActionStatus.NOOP.value,
ActionStatus.SUCCESS.value,
)
@pytest.mark.asyncio
async def test_reflect_notes_present(self, mock_adapter):
hb = Heartbeat(world=mock_adapter)
record = await hb.run_once()
assert "Balmora" in record.reflect_notes
@pytest.mark.asyncio
async def test_cycle_count_increments(self, mock_adapter):
hb = Heartbeat(world=mock_adapter)
await hb.run_once()
await hb.run_once()
assert hb.cycle_count == 2
assert len(hb.history) == 2
@pytest.mark.asyncio
async def test_duration_recorded(self, mock_adapter):
hb = Heartbeat(world=mock_adapter)
record = await hb.run_once()
assert record.duration_ms >= 0
@pytest.mark.asyncio
async def test_on_cycle_callback(self, mock_adapter):
received = []
async def callback(record):
received.append(record)
hb = Heartbeat(world=mock_adapter, on_cycle=callback)
await hb.run_once()
assert len(received) == 1
assert received[0].cycle_id == 1
class TestHeartbeatWithoutAdapter:
"""Without adapter: existing think_once() behaviour unchanged."""
@pytest.mark.asyncio
async def test_passive_cycle(self):
hb = Heartbeat(world=None)
record = await hb.run_once()
assert record.action_taken == "think"
assert record.action_status == "noop"
assert "Passive" in record.reflect_notes
@pytest.mark.asyncio
async def test_passive_no_observation(self):
hb = Heartbeat(world=None)
record = await hb.run_once()
assert record.observation == {}
class TestHeartbeatLifecycle:
def test_interval_property(self):
hb = Heartbeat(interval=60.0)
assert hb.interval == 60.0
hb.interval = 10.0
assert hb.interval == 10.0
def test_interval_minimum(self):
hb = Heartbeat()
hb.interval = 0.1
assert hb.interval == 1.0
def test_world_property(self):
hb = Heartbeat()
assert hb.world is None
adapter = MockWorldAdapter()
hb.world = adapter
assert hb.world is adapter
def test_stop_sets_flag(self):
hb = Heartbeat()
assert not hb.is_running
hb.stop()
assert not hb.is_running
class TestHeartbeatBroadcast:
"""WebSocket broadcasts include action and reasoning summary."""
@pytest.mark.asyncio
async def test_broadcast_called(self, mock_adapter):
with patch(
"loop.heartbeat.ws_manager",
create=True,
) as mock_ws:
mock_ws.broadcast = AsyncMock()
# Patch the import inside heartbeat
with patch("infrastructure.ws_manager.handler.ws_manager") as ws_mod:
ws_mod.broadcast = AsyncMock()
hb = Heartbeat(world=mock_adapter)
await hb.run_once()
ws_mod.broadcast.assert_called_once()
call_args = ws_mod.broadcast.call_args
assert call_args[0][0] == "heartbeat.cycle"
data = call_args[0][1]
assert "action" in data
assert "reasoning_summary" in data
assert "observation" in data
class TestHeartbeatLog:
"""Verify logging of observe→reason→act→reflect cycle."""
@pytest.mark.asyncio
async def test_embodied_cycle_logs(self, mock_adapter, caplog):
import logging
with caplog.at_level(logging.INFO):
hb = Heartbeat(world=mock_adapter)
await hb.run_once()
messages = caplog.text
assert "Phase 1 (Gather)" in messages
assert "Phase 2 (Reason)" in messages
assert "Phase 3 (Act)" in messages
assert "Heartbeat cycle #1 complete" in messages

View File

@@ -60,17 +60,8 @@ class TestGetToken:
assert token == "file-token-456" assert token == "file-token-456"
def test_returns_none_when_no_token(self, monkeypatch): def test_returns_none_when_no_token(self):
"""Return None when no token available.""" """Return None when no token available."""
# Prevent repo-root .timmy_gitea_token fallback from leaking real token
_orig_exists = Path.exists
def _exists_no_timmy(self):
if self.name == ".timmy_gitea_token":
return False
return _orig_exists(self)
monkeypatch.setattr(Path, "exists", _exists_no_timmy)
config = {"token_file": "/nonexistent/path"} config = {"token_file": "/nonexistent/path"}
token = hs.get_token(config) token = hs.get_token(config)

View File

@@ -1,232 +0,0 @@
# Timmy Automations Backlog Organization
**Date:** 2026-03-21
**Issue:** #720 - Refine and group Timmy Automations backlog
**Organized by:** Kimi agent
---
## Summary
The Timmy Automations backlog has been organized into **10 milestones** grouping related work into coherent iterations. This document serves as the authoritative reference for milestone purposes and issue assignments.
---
## Milestones Overview
| Milestone | Issues | Due Date | Description |
|-----------|--------|----------|-------------|
| **Automation Hub v1** | 2 open | 2026-04-10 | Core automation infrastructure - Timmy Automations module, orchestration, and workflow management |
| **Daily Run v1** | 8 open | 2026-04-15 | First iteration of the Daily Run automation system - 10-minute ritual, agenda generation, and focus presets |
| **Infrastructure** | 3 open | 2026-04-15 | Infrastructure and deployment tasks - DNS, SSL, VPS, and DevOps |
| **Dashboard v1** | 0 open | 2026-04-20 | Mission Control dashboard enhancements - Daily Run metrics, triage visibility, and agent scorecards |
| **Inbox & Focus v1** | 1 open | 2026-04-25 | Unified inbox view for Timmy - issue triage, focus management, and work selection |
| **Token Economy v1** | 4 open | 2026-04-30 | Token-based reward system for agents - rules, scorecards, quests, and adaptive rewards |
| **Code Hygiene** | 14 open | 2026-04-30 | Code quality improvements - tests, docstrings, refactoring, and hardcoded value extraction |
| **Matrix Staging** | 19 open | 2026-04-05 | The Matrix 3D world staging deployment - UI fixes, WebSocket, Workshop integration |
| **OpenClaw Sovereignty** | 11 open | 2026-05-15 | Deploy sovereign AI agent on Hermes VPS - Ollama, OpenClaw, and Matrix portal integration |
---
## Detailed Breakdown
### Automation Hub v1 (Due: 2026-04-10)
Core automation infrastructure - the foundation for all other automation work.
| Issue | Title | Status |
|-------|-------|--------|
| #720 | Refine and group Timmy Automations backlog | **In Progress** |
| #719 | Generate weekly narrative summary of work and vibes | Open |
**Recommendation:** Complete #719 first to establish the narrative logging pattern before other milestones.
---
### Daily Run v1 (Due: 2026-04-15)
The 10-minute ritual that starts Timmy's day - agenda generation, focus presets, and health checks.
| Issue | Title | Status |
|-------|-------|--------|
| #716 | Add focus-day presets for Daily Run and work selection | Open |
| #704 | Enrich Daily Run agenda with classifications and suggestions | Open |
| #705 | Add helper to log Daily Run sessions to a logbook issue | Open |
| #706 | Capture Daily Run feels notes and surface nudges | Open |
| #707 | Integrate Deep Triage outputs into Daily Run agenda | Open |
| #708 | Map flakiness and risky areas for test tightening | Open |
| #709 | Add a library of test-tightening recipes for Daily Run | Open |
| #710 | Implement quick health snapshot before coding | Open |
**Recommendation:** Start with #710 (health snapshot) as it provides immediate value and informs other Daily Run features. Then #716 (focus presets) to establish the work selection pattern.
---
### Infrastructure (Due: 2026-04-15)
DevOps and deployment tasks required for production stability.
| Issue | Title | Status |
|-------|-------|--------|
| #687 | Pre-commit and pre-push hooks fail on main due to 256 ModuleNotFoundErrors | Open |
| #688 | Point all 4 domains to Hermes VPS in GoDaddy DNS | Open |
| #689 | Run SSL provisioning after DNS is pointed | Open |
**Recommendation:** These are sequential - #687 blocks commits, #688 blocks #689. Prioritize #687 for code hygiene.
---
### Dashboard v1 (Due: 2026-04-20)
Mission Control dashboard for automation visibility. Currently empty as related work is in Token Economy (#712).
**Note:** Issue #718 (dashboard card for Daily Run) is already closed. Issue #712 (agent scorecards) spans both Token Economy and Dashboard milestones.
---
### Inbox & Focus v1 (Due: 2026-04-25)
Unified view for issue triage and work selection.
| Issue | Title | Status |
|-------|-------|--------|
| #715 | Implement Timmy Inbox unified view | Open |
**Note:** This is a significant feature that may need to be broken down further once work begins.
---
### Token Economy v1 (Due: 2026-04-30)
Reward system for agent participation and quality work.
| Issue | Title | Status |
|-------|-------|--------|
| #711 | Centralize agent token rules and hooks for automations | Open |
| #712 | Generate daily/weekly agent scorecards | Open |
| #713 | Implement token quest system for agents | Open |
| #714 | Adapt token rewards based on system stress signals | Open |
**Recommendation:** Start with #711 to establish the token infrastructure, then #712 for visibility. #713 and #714 are enhancements that build on the base system.
---
### Code Hygiene (Due: 2026-04-30)
Ongoing code quality improvements. These are good "filler" tasks between larger features.
| Issue | Title | Status |
|-------|-------|--------|
| #769 | Add unit tests for src/infrastructure/db_pool.py | Open |
| #770 | Add unit tests for src/dashboard/routes/health.py | Open |
| #771 | Refactor run_agentic_loop() — 120 lines, extract helpers | Open |
| #772 | Refactor produce_system_status() — 88 lines, split into sections | Open |
| #773 | Add docstrings to public functions in src/dashboard/routes/tasks.py | Open |
| #774 | Add docstrings to VoiceTTS.set_rate(), set_volume(), set_voice() | Open |
| #775 | Add docstrings to system route functions in src/dashboard/routes/system.py | Open |
| #776 | Extract hardcoded PRAGMA busy_timeout=5000 to config | Open |
| #777 | DRY up tasks_pending/active/completed — extract shared helper | Open |
| #778 | Remove bare `pass` after logged exceptions in src/timmy/tools.py | Open |
| #779 | Add unit tests for src/timmy/conversation.py | Open |
| #780 | Add unit tests for src/timmy/interview.py | Open |
| #781 | Add error handling for missing DB in src/dashboard/routes/tasks.py | Open |
| #782 | Extract hardcoded sats limit in consult_grok() to config | Open |
**Recommendation:** These are independent and can be picked up in any order. Good candidates for when blocked on larger features.
---
### Matrix Staging (Due: 2026-04-05)
The Matrix 3D world - UI fixes and WebSocket integration for the Workshop.
**QA Issues:**
| Issue | Title |
|-------|-------|
| #733 | The Matrix staging deployment — 3 issues to fix |
| #757 | No landing page or enter button — site loads directly into 3D world |
| #758 | WebSocket never connects — VITE_WS_URL is empty in production build |
| #759 | Missing Submit Job and Fund Session UI buttons |
| #760 | Chat messages silently dropped when WebSocket is offline |
| #761 | All routes serve identical content — no client-side router |
| #762 | All 5 agents permanently show IDLE state |
| #763 | Chat clear button overlaps connection status on small viewports |
| #764 | Mobile: status panel overlaps HUD agent count on narrow viewports |
**UI Enhancement Issues:**
| Issue | Title |
|-------|-------|
| #747 | Add graceful offline mode — show demo mode instead of hanging |
| #748 | Add loading spinner/progress bar while 3D scene initializes |
| #749 | Add keyboard shortcuts — Escape to close modals, Enter to submit chat |
| #750 | Chat input should auto-focus when Workshop panel opens |
| #751 | Add connection status indicator with color coding |
| #752 | Add dark/light theme toggle |
| #753 | Fund Session modal should show explanatory text about what sats do |
| #754 | Submit Job modal should validate input before submission |
| #755 | Add About/Info panel explaining what The Matrix/Workshop is |
| #756 | Add FPS counter visibility toggle — debug-only by default |
**Note:** This milestone has the earliest due date (2026-04-05) and most issues. Consider splitting into "Matrix Critical" (QA blockers) and "Matrix Polish" (UI enhancements).
---
### OpenClaw Sovereignty (Due: 2026-05-15)
Deploy a sovereign AI agent on Hermes VPS - the long-term goal of Timmy's independence from cloud APIs.
| Issue | Title | Status |
|-------|-------|--------|
| #721 | Research: OpenClaw architecture, deployment modes, and Ollama integration | Open |
| #722 | Research: Best small LLMs for agentic tool-calling on constrained hardware | Open |
| #723 | Research: OpenClaw SOUL.md and AGENTS.md patterns | Open |
| #724 | [1/8] Audit Hermes VPS resources and prepare for OpenClaw deployment | Open |
| #725 | [2/8] Install and configure Ollama on Hermes VPS | Open |
| #726 | [3/8] Install OpenClaw on Hermes VPS and complete onboarding | Open |
| #727 | [4/8] Expose OpenClaw gateway via Tailscale for Matrix portal access | Open |
| #728 | [5/8] Create Timmy's SOUL.md and AGENTS.md — sovereign agent persona | Open |
| #729 | [6/8] Integrate OpenClaw chat as a portal/scroll in The Matrix frontend | Open |
| #730 | [7/8] Create openclaw-tools Gitea repo — Timmy's sovereign toolbox | Open |
| #731 | [8/8] Write sovereignty migration plan — offload tasks from Anthropic to OpenClaw | Open |
**Note:** This is a research-heavy, sequential milestone. Issues #721-#723 should be completed before implementation begins. Consider creating a research summary document as output from the research issues.
---
## Issues Intentionally Left Unassigned
The following issues remain without milestone assignment by design:
### Philosophy Issues
Ongoing discussion threads that don't fit a milestone structure:
- #502, #511, #521, #528, #536, #543, #548, #556, #566, #571, #583, #588, #596, #602, #608, #613, #623, #630, #642
### Feature Ideas / Future Work
Ideas that need more definition before milestone assignment:
- #654, #653, #652, #651, #650 (ASCII Video showcase)
- #664 (Chain Memory song)
- #578, #577, #579 (Autonomous action, identity evolution, contextual mastery)
### Completed Issues
Already closed issues remain in their original state without milestone assignment.
---
## Recommended Execution Order
Based on priority and dependencies:
1. **Automation Hub v1** (April 10) - Foundation for all automation work
2. **Daily Run v1** (April 15) - Core developer experience improvement
3. **Infrastructure** (April 15) - Unblocks production deployments
4. **Matrix Staging** (April 5) - *Parallel track* - UI team work
5. **Inbox & Focus v1** (April 25) - Builds on Daily Run patterns
6. **Dashboard v1** (April 20) - Visualizes Token Economy data
7. **Token Economy v1** (April 30) - Gamification layer
8. **Code Hygiene** (April 30) - *Ongoing* - Fill gaps between features
9. **OpenClaw Sovereignty** (May 15) - Long-term research and deployment
---
## Notes for Future Triage
- Issues should be assigned to milestones at creation time
- Each milestone should have a "Definition of Done" documented
- Consider creating epic issues for large milestones (OpenClaw, Matrix)
- Weekly triage should review unassigned issues and new arrivals
- Milestone due dates should be adjusted based on velocity
---
*This document is maintained as part of the Timmy Automations subsystem. Update it when milestone structure changes.*

View File

@@ -53,26 +53,21 @@ def load_config() -> dict:
def get_token(config: dict) -> str | None: def get_token(config: dict) -> str | None:
"""Get Gitea token from environment or file. """Get Gitea token from environment or file."""
Priority: config["token"] > config["token_file"] > .timmy_gitea_token
"""
if "token" in config: if "token" in config:
return config["token"] return config["token"]
# Explicit token_file from config takes priority # Try timmy's token file
token_file_str = config.get("token_file", "")
if token_file_str:
token_file = Path(token_file_str)
if token_file.exists():
return token_file.read_text().strip()
# Fallback: repo-root .timmy_gitea_token
repo_root = Path(__file__).resolve().parent.parent.parent repo_root = Path(__file__).resolve().parent.parent.parent
timmy_token_path = repo_root / ".timmy_gitea_token" timmy_token_path = repo_root / ".timmy_gitea_token"
if timmy_token_path.exists(): if timmy_token_path.exists():
return timmy_token_path.read_text().strip() return timmy_token_path.read_text().strip()
# Fallback to legacy token file
token_file = Path(config["token_file"]).expanduser()
if token_file.exists():
return token_file.read_text().strip()
return None return None