refactor: split scorecard_service.py into focused modules (#1406)
Some checks failed
Tests / lint (pull_request) Successful in 22s
Tests / test (pull_request) Failing after 25m52s

Splits the 517-line scorecard_service.py monolith into a proper package:

- types.py: PeriodType, AgentMetrics, ScorecardSummary dataclasses
- validators.py: Input validation, period bounds, actor extraction
- aggregators.py: Event collection, metrics aggregation, token queries
- calculators.py: Pattern detection algorithms, score calculations
- formatters.py: Narrative bullet generation, display formatting
- core.py: Main ScorecardService orchestrator (generate_scorecard, generate_all_scorecards)
- __init__.py: Clean public API exports

Benefits:
- Testable components in isolation
- Clear separation of concerns
- More maintainable dashboard logic
- No breaking changes to public API

Fixes #1406
This commit is contained in:
kimi
2026-03-24 16:04:22 -04:00
parent 4d2aeb937f
commit 596daa26a6
12 changed files with 766 additions and 620 deletions

View File

@@ -8,7 +8,7 @@ from datetime import datetime
from fastapi import APIRouter, Query, Request
from fastapi.responses import HTMLResponse, JSONResponse
from dashboard.services.scorecard_service import (
from dashboard.services.scorecard import (
PeriodType,
ScorecardSummary,
generate_all_scorecards,

View File

@@ -1,6 +1,6 @@
"""Dashboard services for business logic."""
from dashboard.services.scorecard_service import (
from dashboard.services.scorecard import (
PeriodType,
ScorecardSummary,
generate_all_scorecards,

View File

@@ -0,0 +1,25 @@
"""Scorecard service package — track and summarize agent performance.
Generates daily/weekly scorecards showing:
- Issues touched, PRs opened/merged
- Tests affected, tokens earned/spent
- Pattern highlights (merge rate, activity quality)
"""
from __future__ import annotations
from dashboard.services.scorecard.core import (
generate_all_scorecards,
generate_scorecard,
get_tracked_agents,
)
from dashboard.services.scorecard.types import AgentMetrics, PeriodType, ScorecardSummary
__all__ = [
"AgentMetrics",
"generate_all_scorecards",
"generate_scorecard",
"get_tracked_agents",
"PeriodType",
"ScorecardSummary",
]

View File

@@ -0,0 +1,203 @@
"""Data aggregation logic for scorecard generation."""
from __future__ import annotations
import logging
from datetime import datetime
from typing import TYPE_CHECKING
from dashboard.services.scorecard.types import TRACKED_AGENTS, AgentMetrics
from dashboard.services.scorecard.validators import (
extract_actor_from_event,
is_tracked_agent,
)
from infrastructure.events.bus import get_event_bus
if TYPE_CHECKING:
from infrastructure.events.bus import Event
logger = logging.getLogger(__name__)
def collect_events_for_period(
start: datetime, end: datetime, agent_id: str | None = None
) -> list[Event]:
"""Collect events from the event bus for a time period.
Args:
start: Period start time
end: Period end time
agent_id: Optional agent filter
Returns:
List of matching events
"""
bus = get_event_bus()
events: list[Event] = []
# Query persisted events for relevant types
event_types = [
"gitea.push",
"gitea.issue.opened",
"gitea.issue.comment",
"gitea.pull_request",
"agent.task.completed",
"test.execution",
]
for event_type in event_types:
try:
type_events = bus.replay(
event_type=event_type,
source=agent_id,
limit=1000,
)
events.extend(type_events)
except Exception as exc:
logger.debug("Failed to replay events for %s: %s", event_type, exc)
# Filter by timestamp
filtered = []
for event in events:
try:
event_time = datetime.fromisoformat(event.timestamp.replace("Z", "+00:00"))
if start <= event_time < end:
filtered.append(event)
except (ValueError, AttributeError):
continue
return filtered
def aggregate_metrics(events: list[Event]) -> dict[str, AgentMetrics]:
"""Aggregate metrics from events grouped by agent.
Args:
events: List of events to process
Returns:
Dict mapping agent_id -> AgentMetrics
"""
metrics_by_agent: dict[str, AgentMetrics] = {}
for event in events:
actor = extract_actor_from_event(event)
# Skip non-agent events unless they explicitly have an agent_id
if not is_tracked_agent(actor) and "agent_id" not in event.data:
continue
if actor not in metrics_by_agent:
metrics_by_agent[actor] = AgentMetrics(agent_id=actor)
metrics = metrics_by_agent[actor]
# Process based on event type
event_type = event.type
if event_type == "gitea.push":
metrics.commits += event.data.get("num_commits", 1)
elif event_type == "gitea.issue.opened":
issue_num = event.data.get("issue_number", 0)
if issue_num:
metrics.issues_touched.add(issue_num)
elif event_type == "gitea.issue.comment":
metrics.comments += 1
issue_num = event.data.get("issue_number", 0)
if issue_num:
metrics.issues_touched.add(issue_num)
elif event_type == "gitea.pull_request":
pr_num = event.data.get("pr_number", 0)
action = event.data.get("action", "")
merged = event.data.get("merged", False)
if pr_num:
if action == "opened":
metrics.prs_opened.add(pr_num)
elif action == "closed" and merged:
metrics.prs_merged.add(pr_num)
# Also count as touched issue for tracking
metrics.issues_touched.add(pr_num)
elif event_type == "agent.task.completed":
# Extract test files from task data
affected = event.data.get("tests_affected", [])
for test in affected:
metrics.tests_affected.add(test)
# Token rewards from task completion
reward = event.data.get("token_reward", 0)
if reward:
metrics.tokens_earned += reward
elif event_type == "test.execution":
# Track test files that were executed
test_files = event.data.get("test_files", [])
for test in test_files:
metrics.tests_affected.add(test)
return metrics_by_agent
def query_token_transactions(agent_id: str, start: datetime, end: datetime) -> tuple[int, int]:
"""Query the lightning ledger for token transactions.
Args:
agent_id: The agent to query for
start: Period start
end: Period end
Returns:
Tuple of (tokens_earned, tokens_spent)
"""
try:
from lightning.ledger import get_transactions
transactions = get_transactions(limit=1000)
earned = 0
spent = 0
for tx in transactions:
# Filter by agent if specified
if tx.agent_id and tx.agent_id != agent_id:
continue
# Filter by timestamp
try:
tx_time = datetime.fromisoformat(tx.created_at.replace("Z", "+00:00"))
if not (start <= tx_time < end):
continue
except (ValueError, AttributeError):
continue
if tx.tx_type.value == "incoming":
earned += tx.amount_sats
else:
spent += tx.amount_sats
return earned, spent
except Exception as exc:
logger.debug("Failed to query token transactions: %s", exc)
return 0, 0
def ensure_all_tracked_agents(
metrics_by_agent: dict[str, AgentMetrics],
) -> dict[str, AgentMetrics]:
"""Ensure all tracked agents have metrics entries.
Args:
metrics_by_agent: Current metrics dictionary
Returns:
Updated metrics with all tracked agents included
"""
for agent_id in TRACKED_AGENTS:
if agent_id not in metrics_by_agent:
metrics_by_agent[agent_id] = AgentMetrics(agent_id=agent_id)
return metrics_by_agent

View File

@@ -0,0 +1,61 @@
"""Score calculation and pattern detection algorithms."""
from __future__ import annotations
from dashboard.services.scorecard.types import AgentMetrics
def calculate_pr_merge_rate(prs_opened: int, prs_merged: int) -> float:
"""Calculate PR merge rate.
Args:
prs_opened: Number of PRs opened
prs_merged: Number of PRs merged
Returns:
Merge rate between 0.0 and 1.0
"""
if prs_opened == 0:
return 0.0
return prs_merged / prs_opened
def detect_patterns(metrics: AgentMetrics) -> list[str]:
"""Detect interesting patterns in agent behavior.
Args:
metrics: The agent's metrics
Returns:
List of pattern descriptions
"""
patterns: list[str] = []
pr_opened = len(metrics.prs_opened)
merge_rate = metrics.pr_merge_rate
# Merge rate patterns
if pr_opened >= 3:
if merge_rate >= 0.8:
patterns.append("High merge rate with few failures — code quality focus.")
elif merge_rate <= 0.3:
patterns.append("Lots of noisy PRs, low merge rate — may need review support.")
# Activity patterns
if metrics.commits > 10 and pr_opened == 0:
patterns.append("High commit volume without PRs — working directly on main?")
if len(metrics.issues_touched) > 5 and metrics.comments == 0:
patterns.append("Touching many issues but low comment volume — silent worker.")
if metrics.comments > len(metrics.issues_touched) * 2:
patterns.append("Highly communicative — lots of discussion relative to work items.")
# Token patterns
net_tokens = metrics.tokens_earned - metrics.tokens_spent
if net_tokens > 100:
patterns.append("Strong token accumulation — high value delivery.")
elif net_tokens < -50:
patterns.append("High token spend — may be in experimentation phase.")
return patterns

View File

@@ -0,0 +1,129 @@
"""Core scorecard service — orchestrates scorecard generation."""
from __future__ import annotations
from datetime import datetime
from dashboard.services.scorecard.aggregators import (
aggregate_metrics,
collect_events_for_period,
ensure_all_tracked_agents,
query_token_transactions,
)
from dashboard.services.scorecard.calculators import detect_patterns
from dashboard.services.scorecard.formatters import generate_narrative_bullets
from dashboard.services.scorecard.types import (
TRACKED_AGENTS,
AgentMetrics,
PeriodType,
ScorecardSummary,
)
from dashboard.services.scorecard.validators import get_period_bounds
def generate_scorecard(
agent_id: str,
period_type: PeriodType = PeriodType.daily,
reference_date: datetime | None = None,
) -> ScorecardSummary | None:
"""Generate a scorecard for a single agent.
Args:
agent_id: The agent to generate scorecard for
period_type: daily or weekly
reference_date: The date to calculate from (defaults to now)
Returns:
ScorecardSummary or None if agent has no activity
"""
start, end = get_period_bounds(period_type, reference_date)
# Collect events
events = collect_events_for_period(start, end, agent_id)
# Aggregate metrics
all_metrics = aggregate_metrics(events)
# Get metrics for this specific agent
if agent_id not in all_metrics:
# Create empty metrics - still generate a scorecard
metrics = AgentMetrics(agent_id=agent_id)
else:
metrics = all_metrics[agent_id]
# Augment with token data from ledger
tokens_earned, tokens_spent = query_token_transactions(agent_id, start, end)
metrics.tokens_earned = max(metrics.tokens_earned, tokens_earned)
metrics.tokens_spent = max(metrics.tokens_spent, tokens_spent)
# Generate narrative and patterns
narrative = generate_narrative_bullets(metrics, period_type)
patterns = detect_patterns(metrics)
return ScorecardSummary(
agent_id=agent_id,
period_type=period_type,
period_start=start,
period_end=end,
metrics=metrics,
narrative_bullets=narrative,
patterns=patterns,
)
def generate_all_scorecards(
period_type: PeriodType = PeriodType.daily,
reference_date: datetime | None = None,
) -> list[ScorecardSummary]:
"""Generate scorecards for all tracked agents.
Args:
period_type: daily or weekly
reference_date: The date to calculate from (defaults to now)
Returns:
List of ScorecardSummary for all agents with activity
"""
start, end = get_period_bounds(period_type, reference_date)
# Collect all events
events = collect_events_for_period(start, end)
# Aggregate metrics for all agents
all_metrics = aggregate_metrics(events)
# Include tracked agents even if no activity
ensure_all_tracked_agents(all_metrics)
# Generate scorecards
scorecards: list[ScorecardSummary] = []
for agent_id, metrics in all_metrics.items():
# Augment with token data
tokens_earned, tokens_spent = query_token_transactions(agent_id, start, end)
metrics.tokens_earned = max(metrics.tokens_earned, tokens_earned)
metrics.tokens_spent = max(metrics.tokens_spent, tokens_spent)
narrative = generate_narrative_bullets(metrics, period_type)
patterns = detect_patterns(metrics)
scorecard = ScorecardSummary(
agent_id=agent_id,
period_type=period_type,
period_start=start,
period_end=end,
metrics=metrics,
narrative_bullets=narrative,
patterns=patterns,
)
scorecards.append(scorecard)
# Sort by agent_id for consistent ordering
scorecards.sort(key=lambda s: s.agent_id)
return scorecards
def get_tracked_agents() -> list[str]:
"""Return the list of tracked agent IDs."""
return sorted(TRACKED_AGENTS)

View File

@@ -0,0 +1,93 @@
"""Display formatting and narrative generation for scorecards."""
from __future__ import annotations
from dashboard.services.scorecard.types import AgentMetrics, PeriodType
def format_activity_summary(metrics: AgentMetrics) -> list[str]:
"""Format activity summary items.
Args:
metrics: The agent's metrics
Returns:
List of activity description strings
"""
activities = []
if metrics.commits:
activities.append(f"{metrics.commits} commit{'s' if metrics.commits != 1 else ''}")
if len(metrics.prs_opened):
activities.append(
f"{len(metrics.prs_opened)} PR{'s' if len(metrics.prs_opened) != 1 else ''} opened"
)
if len(metrics.prs_merged):
activities.append(
f"{len(metrics.prs_merged)} PR{'s' if len(metrics.prs_merged) != 1 else ''} merged"
)
if len(metrics.issues_touched):
activities.append(
f"{len(metrics.issues_touched)} issue{'s' if len(metrics.issues_touched) != 1 else ''} touched"
)
if metrics.comments:
activities.append(f"{metrics.comments} comment{'s' if metrics.comments != 1 else ''}")
return activities
def format_token_summary(tokens_earned: int, tokens_spent: int) -> str | None:
"""Format token summary text.
Args:
tokens_earned: Tokens earned
tokens_spent: Tokens spent
Returns:
Formatted token summary string or None if no token activity
"""
if not tokens_earned and not tokens_spent:
return None
net_tokens = tokens_earned - tokens_spent
if net_tokens > 0:
return f"Net earned {net_tokens} tokens ({tokens_earned} earned, {tokens_spent} spent)."
elif net_tokens < 0:
return f"Net spent {abs(net_tokens)} tokens ({tokens_earned} earned, {tokens_spent} spent)."
else:
return f"Balanced token flow ({tokens_earned} earned, {tokens_spent} spent)."
def generate_narrative_bullets(metrics: AgentMetrics, period_type: PeriodType) -> list[str]:
"""Generate narrative summary bullets for a scorecard.
Args:
metrics: The agent's metrics
period_type: daily or weekly
Returns:
List of narrative bullet points
"""
bullets: list[str] = []
period_label = "day" if period_type == PeriodType.daily else "week"
# Activity summary
activities = format_activity_summary(metrics)
if activities:
bullets.append(f"Active across {', '.join(activities)} this {period_label}.")
# Test activity
if len(metrics.tests_affected):
bullets.append(
f"Affected {len(metrics.tests_affected)} test file{'s' if len(metrics.tests_affected) != 1 else ''}."
)
# Token summary
token_summary = format_token_summary(metrics.tokens_earned, metrics.tokens_spent)
if token_summary:
bullets.append(token_summary)
# Handle empty case
if not bullets:
bullets.append(f"No recorded activity this {period_label}.")
return bullets

View File

@@ -0,0 +1,86 @@
"""Scorecard type definitions and data classes."""
from __future__ import annotations
from dataclasses import dataclass, field
from enum import StrEnum
from typing import Any
class PeriodType(StrEnum):
"""Scorecard reporting period type."""
daily = "daily"
weekly = "weekly"
# Bot/agent usernames to track
TRACKED_AGENTS = frozenset({"hermes", "kimi", "manus", "claude", "gemini"})
@dataclass
class AgentMetrics:
"""Raw metrics collected for an agent over a period."""
agent_id: str
issues_touched: set[int] = field(default_factory=set)
prs_opened: set[int] = field(default_factory=set)
prs_merged: set[int] = field(default_factory=set)
tests_affected: set[str] = field(default_factory=set)
tokens_earned: int = 0
tokens_spent: int = 0
commits: int = 0
comments: int = 0
@property
def pr_merge_rate(self) -> float:
"""Calculate PR merge rate (0.0 - 1.0)."""
opened = len(self.prs_opened)
if opened == 0:
return 0.0
return len(self.prs_merged) / opened
@dataclass
class ScorecardSummary:
"""A generated scorecard with narrative summary."""
agent_id: str
period_type: PeriodType
period_start: datetime
period_end: datetime
metrics: AgentMetrics
narrative_bullets: list[str] = field(default_factory=list)
patterns: list[str] = field(default_factory=list)
def to_dict(self) -> dict[str, Any]:
"""Convert scorecard to dictionary for JSON serialization."""
return {
"agent_id": self.agent_id,
"period_type": self.period_type.value,
"period_start": self.period_start.isoformat(),
"period_end": self.period_end.isoformat(),
"metrics": {
"issues_touched": len(self.metrics.issues_touched),
"prs_opened": len(self.metrics.prs_opened),
"prs_merged": len(self.metrics.prs_merged),
"pr_merge_rate": round(self.metrics.pr_merge_rate, 2),
"tests_affected": len(self.tests_affected),
"commits": self.metrics.commits,
"comments": self.metrics.comments,
"tokens_earned": self.metrics.tokens_earned,
"tokens_spent": self.metrics.tokens_spent,
"token_net": self.metrics.tokens_earned - self.metrics.tokens_spent,
},
"narrative_bullets": self.narrative_bullets,
"patterns": self.patterns,
}
@property
def tests_affected(self) -> set[str]:
"""Alias for metrics.tests_affected."""
return self.metrics.tests_affected
# Import datetime here to avoid issues with forward references
from datetime import datetime # noqa: E402

View File

@@ -0,0 +1,71 @@
"""Input validation utilities for scorecard operations."""
from __future__ import annotations
from datetime import UTC, datetime, timedelta
from typing import TYPE_CHECKING
from dashboard.services.scorecard.types import TRACKED_AGENTS, PeriodType
if TYPE_CHECKING:
from infrastructure.events.bus import Event
def is_tracked_agent(actor: str) -> bool:
"""Check if an actor is a tracked agent."""
return actor.lower() in TRACKED_AGENTS
def extract_actor_from_event(event: Event) -> str:
"""Extract the actor/agent from an event."""
# Try data fields first
if "actor" in event.data:
return event.data["actor"]
if "agent_id" in event.data:
return event.data["agent_id"]
# Fall back to source
return event.source
def get_period_bounds(
period_type: PeriodType, reference_date: datetime | None = None
) -> tuple[datetime, datetime]:
"""Calculate start and end timestamps for a period.
Args:
period_type: daily or weekly
reference_date: The date to calculate from (defaults to now)
Returns:
Tuple of (period_start, period_end) in UTC
"""
if reference_date is None:
reference_date = datetime.now(UTC)
# Normalize to start of day
end = reference_date.replace(hour=0, minute=0, second=0, microsecond=0)
if period_type == PeriodType.daily:
start = end - timedelta(days=1)
else: # weekly
start = end - timedelta(days=7)
return start, end
def validate_period_type(period: str) -> PeriodType:
"""Validate and convert a period string to PeriodType.
Args:
period: The period string to validate
Returns:
PeriodType enum value
Raises:
ValueError: If the period string is invalid
"""
try:
return PeriodType(period.lower())
except ValueError as exc:
raise ValueError(f"Invalid period '{period}'. Use 'daily' or 'weekly'.") from exc

View File

@@ -1,517 +0,0 @@
"""Agent scorecard service — track and summarize agent performance.
Generates daily/weekly scorecards showing:
- Issues touched, PRs opened/merged
- Tests affected, tokens earned/spent
- Pattern highlights (merge rate, activity quality)
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta
from enum import StrEnum
from typing import Any
from infrastructure.events.bus import Event, get_event_bus
logger = logging.getLogger(__name__)
# Bot/agent usernames to track
TRACKED_AGENTS = frozenset({"hermes", "kimi", "manus", "claude", "gemini"})
class PeriodType(StrEnum):
"""Scorecard reporting period type."""
daily = "daily"
weekly = "weekly"
@dataclass
class AgentMetrics:
"""Raw metrics collected for an agent over a period."""
agent_id: str
issues_touched: set[int] = field(default_factory=set)
prs_opened: set[int] = field(default_factory=set)
prs_merged: set[int] = field(default_factory=set)
tests_affected: set[str] = field(default_factory=set)
tokens_earned: int = 0
tokens_spent: int = 0
commits: int = 0
comments: int = 0
@property
def pr_merge_rate(self) -> float:
"""Calculate PR merge rate (0.0 - 1.0)."""
opened = len(self.prs_opened)
if opened == 0:
return 0.0
return len(self.prs_merged) / opened
@dataclass
class ScorecardSummary:
"""A generated scorecard with narrative summary."""
agent_id: str
period_type: PeriodType
period_start: datetime
period_end: datetime
metrics: AgentMetrics
narrative_bullets: list[str] = field(default_factory=list)
patterns: list[str] = field(default_factory=list)
def to_dict(self) -> dict[str, Any]:
"""Convert scorecard to dictionary for JSON serialization."""
return {
"agent_id": self.agent_id,
"period_type": self.period_type.value,
"period_start": self.period_start.isoformat(),
"period_end": self.period_end.isoformat(),
"metrics": {
"issues_touched": len(self.metrics.issues_touched),
"prs_opened": len(self.metrics.prs_opened),
"prs_merged": len(self.metrics.prs_merged),
"pr_merge_rate": round(self.metrics.pr_merge_rate, 2),
"tests_affected": len(self.tests_affected),
"commits": self.metrics.commits,
"comments": self.metrics.comments,
"tokens_earned": self.metrics.tokens_earned,
"tokens_spent": self.metrics.tokens_spent,
"token_net": self.metrics.tokens_earned - self.metrics.tokens_spent,
},
"narrative_bullets": self.narrative_bullets,
"patterns": self.patterns,
}
@property
def tests_affected(self) -> set[str]:
"""Alias for metrics.tests_affected."""
return self.metrics.tests_affected
def _get_period_bounds(
period_type: PeriodType, reference_date: datetime | None = None
) -> tuple[datetime, datetime]:
"""Calculate start and end timestamps for a period.
Args:
period_type: daily or weekly
reference_date: The date to calculate from (defaults to now)
Returns:
Tuple of (period_start, period_end) in UTC
"""
if reference_date is None:
reference_date = datetime.now(UTC)
# Normalize to start of day
end = reference_date.replace(hour=0, minute=0, second=0, microsecond=0)
if period_type == PeriodType.daily:
start = end - timedelta(days=1)
else: # weekly
start = end - timedelta(days=7)
return start, end
def _collect_events_for_period(
start: datetime, end: datetime, agent_id: str | None = None
) -> list[Event]:
"""Collect events from the event bus for a time period.
Args:
start: Period start time
end: Period end time
agent_id: Optional agent filter
Returns:
List of matching events
"""
bus = get_event_bus()
events: list[Event] = []
# Query persisted events for relevant types
event_types = [
"gitea.push",
"gitea.issue.opened",
"gitea.issue.comment",
"gitea.pull_request",
"agent.task.completed",
"test.execution",
]
for event_type in event_types:
try:
type_events = bus.replay(
event_type=event_type,
source=agent_id,
limit=1000,
)
events.extend(type_events)
except Exception as exc:
logger.debug("Failed to replay events for %s: %s", event_type, exc)
# Filter by timestamp
filtered = []
for event in events:
try:
event_time = datetime.fromisoformat(event.timestamp.replace("Z", "+00:00"))
if start <= event_time < end:
filtered.append(event)
except (ValueError, AttributeError):
continue
return filtered
def _extract_actor_from_event(event: Event) -> str:
"""Extract the actor/agent from an event."""
# Try data fields first
if "actor" in event.data:
return event.data["actor"]
if "agent_id" in event.data:
return event.data["agent_id"]
# Fall back to source
return event.source
def _is_tracked_agent(actor: str) -> bool:
"""Check if an actor is a tracked agent."""
return actor.lower() in TRACKED_AGENTS
def _aggregate_metrics(events: list[Event]) -> dict[str, AgentMetrics]:
"""Aggregate metrics from events grouped by agent.
Args:
events: List of events to process
Returns:
Dict mapping agent_id -> AgentMetrics
"""
metrics_by_agent: dict[str, AgentMetrics] = {}
for event in events:
actor = _extract_actor_from_event(event)
# Skip non-agent events unless they explicitly have an agent_id
if not _is_tracked_agent(actor) and "agent_id" not in event.data:
continue
if actor not in metrics_by_agent:
metrics_by_agent[actor] = AgentMetrics(agent_id=actor)
metrics = metrics_by_agent[actor]
# Process based on event type
event_type = event.type
if event_type == "gitea.push":
metrics.commits += event.data.get("num_commits", 1)
elif event_type == "gitea.issue.opened":
issue_num = event.data.get("issue_number", 0)
if issue_num:
metrics.issues_touched.add(issue_num)
elif event_type == "gitea.issue.comment":
metrics.comments += 1
issue_num = event.data.get("issue_number", 0)
if issue_num:
metrics.issues_touched.add(issue_num)
elif event_type == "gitea.pull_request":
pr_num = event.data.get("pr_number", 0)
action = event.data.get("action", "")
merged = event.data.get("merged", False)
if pr_num:
if action == "opened":
metrics.prs_opened.add(pr_num)
elif action == "closed" and merged:
metrics.prs_merged.add(pr_num)
# Also count as touched issue for tracking
metrics.issues_touched.add(pr_num)
elif event_type == "agent.task.completed":
# Extract test files from task data
affected = event.data.get("tests_affected", [])
for test in affected:
metrics.tests_affected.add(test)
# Token rewards from task completion
reward = event.data.get("token_reward", 0)
if reward:
metrics.tokens_earned += reward
elif event_type == "test.execution":
# Track test files that were executed
test_files = event.data.get("test_files", [])
for test in test_files:
metrics.tests_affected.add(test)
return metrics_by_agent
def _query_token_transactions(agent_id: str, start: datetime, end: datetime) -> tuple[int, int]:
"""Query the lightning ledger for token transactions.
Args:
agent_id: The agent to query for
start: Period start
end: Period end
Returns:
Tuple of (tokens_earned, tokens_spent)
"""
try:
from lightning.ledger import get_transactions
transactions = get_transactions(limit=1000)
earned = 0
spent = 0
for tx in transactions:
# Filter by agent if specified
if tx.agent_id and tx.agent_id != agent_id:
continue
# Filter by timestamp
try:
tx_time = datetime.fromisoformat(tx.created_at.replace("Z", "+00:00"))
if not (start <= tx_time < end):
continue
except (ValueError, AttributeError):
continue
if tx.tx_type.value == "incoming":
earned += tx.amount_sats
else:
spent += tx.amount_sats
return earned, spent
except Exception as exc:
logger.debug("Failed to query token transactions: %s", exc)
return 0, 0
def _generate_narrative_bullets(metrics: AgentMetrics, period_type: PeriodType) -> list[str]:
"""Generate narrative summary bullets for a scorecard.
Args:
metrics: The agent's metrics
period_type: daily or weekly
Returns:
List of narrative bullet points
"""
bullets: list[str] = []
period_label = "day" if period_type == PeriodType.daily else "week"
# Activity summary
activities = []
if metrics.commits:
activities.append(f"{metrics.commits} commit{'s' if metrics.commits != 1 else ''}")
if len(metrics.prs_opened):
activities.append(
f"{len(metrics.prs_opened)} PR{'s' if len(metrics.prs_opened) != 1 else ''} opened"
)
if len(metrics.prs_merged):
activities.append(
f"{len(metrics.prs_merged)} PR{'s' if len(metrics.prs_merged) != 1 else ''} merged"
)
if len(metrics.issues_touched):
activities.append(
f"{len(metrics.issues_touched)} issue{'s' if len(metrics.issues_touched) != 1 else ''} touched"
)
if metrics.comments:
activities.append(f"{metrics.comments} comment{'s' if metrics.comments != 1 else ''}")
if activities:
bullets.append(f"Active across {', '.join(activities)} this {period_label}.")
# Test activity
if len(metrics.tests_affected):
bullets.append(
f"Affected {len(metrics.tests_affected)} test file{'s' if len(metrics.tests_affected) != 1 else ''}."
)
# Token summary
net_tokens = metrics.tokens_earned - metrics.tokens_spent
if metrics.tokens_earned or metrics.tokens_spent:
if net_tokens > 0:
bullets.append(
f"Net earned {net_tokens} tokens ({metrics.tokens_earned} earned, {metrics.tokens_spent} spent)."
)
elif net_tokens < 0:
bullets.append(
f"Net spent {abs(net_tokens)} tokens ({metrics.tokens_earned} earned, {metrics.tokens_spent} spent)."
)
else:
bullets.append(
f"Balanced token flow ({metrics.tokens_earned} earned, {metrics.tokens_spent} spent)."
)
# Handle empty case
if not bullets:
bullets.append(f"No recorded activity this {period_label}.")
return bullets
def _detect_patterns(metrics: AgentMetrics) -> list[str]:
"""Detect interesting patterns in agent behavior.
Args:
metrics: The agent's metrics
Returns:
List of pattern descriptions
"""
patterns: list[str] = []
pr_opened = len(metrics.prs_opened)
merge_rate = metrics.pr_merge_rate
# Merge rate patterns
if pr_opened >= 3:
if merge_rate >= 0.8:
patterns.append("High merge rate with few failures — code quality focus.")
elif merge_rate <= 0.3:
patterns.append("Lots of noisy PRs, low merge rate — may need review support.")
# Activity patterns
if metrics.commits > 10 and pr_opened == 0:
patterns.append("High commit volume without PRs — working directly on main?")
if len(metrics.issues_touched) > 5 and metrics.comments == 0:
patterns.append("Touching many issues but low comment volume — silent worker.")
if metrics.comments > len(metrics.issues_touched) * 2:
patterns.append("Highly communicative — lots of discussion relative to work items.")
# Token patterns
net_tokens = metrics.tokens_earned - metrics.tokens_spent
if net_tokens > 100:
patterns.append("Strong token accumulation — high value delivery.")
elif net_tokens < -50:
patterns.append("High token spend — may be in experimentation phase.")
return patterns
def generate_scorecard(
agent_id: str,
period_type: PeriodType = PeriodType.daily,
reference_date: datetime | None = None,
) -> ScorecardSummary | None:
"""Generate a scorecard for a single agent.
Args:
agent_id: The agent to generate scorecard for
period_type: daily or weekly
reference_date: The date to calculate from (defaults to now)
Returns:
ScorecardSummary or None if agent has no activity
"""
start, end = _get_period_bounds(period_type, reference_date)
# Collect events
events = _collect_events_for_period(start, end, agent_id)
# Aggregate metrics
all_metrics = _aggregate_metrics(events)
# Get metrics for this specific agent
if agent_id not in all_metrics:
# Create empty metrics - still generate a scorecard
metrics = AgentMetrics(agent_id=agent_id)
else:
metrics = all_metrics[agent_id]
# Augment with token data from ledger
tokens_earned, tokens_spent = _query_token_transactions(agent_id, start, end)
metrics.tokens_earned = max(metrics.tokens_earned, tokens_earned)
metrics.tokens_spent = max(metrics.tokens_spent, tokens_spent)
# Generate narrative and patterns
narrative = _generate_narrative_bullets(metrics, period_type)
patterns = _detect_patterns(metrics)
return ScorecardSummary(
agent_id=agent_id,
period_type=period_type,
period_start=start,
period_end=end,
metrics=metrics,
narrative_bullets=narrative,
patterns=patterns,
)
def generate_all_scorecards(
period_type: PeriodType = PeriodType.daily,
reference_date: datetime | None = None,
) -> list[ScorecardSummary]:
"""Generate scorecards for all tracked agents.
Args:
period_type: daily or weekly
reference_date: The date to calculate from (defaults to now)
Returns:
List of ScorecardSummary for all agents with activity
"""
start, end = _get_period_bounds(period_type, reference_date)
# Collect all events
events = _collect_events_for_period(start, end)
# Aggregate metrics for all agents
all_metrics = _aggregate_metrics(events)
# Include tracked agents even if no activity
for agent_id in TRACKED_AGENTS:
if agent_id not in all_metrics:
all_metrics[agent_id] = AgentMetrics(agent_id=agent_id)
# Generate scorecards
scorecards: list[ScorecardSummary] = []
for agent_id, metrics in all_metrics.items():
# Augment with token data
tokens_earned, tokens_spent = _query_token_transactions(agent_id, start, end)
metrics.tokens_earned = max(metrics.tokens_earned, tokens_earned)
metrics.tokens_spent = max(metrics.tokens_spent, tokens_spent)
narrative = _generate_narrative_bullets(metrics, period_type)
patterns = _detect_patterns(metrics)
scorecard = ScorecardSummary(
agent_id=agent_id,
period_type=period_type,
period_start=start,
period_end=end,
metrics=metrics,
narrative_bullets=narrative,
patterns=patterns,
)
scorecards.append(scorecard)
# Sort by agent_id for consistent ordering
scorecards.sort(key=lambda s: s.agent_id)
return scorecards
def get_tracked_agents() -> list[str]:
"""Return the list of tracked agent IDs."""
return sorted(TRACKED_AGENTS)

View File

@@ -1,10 +1,10 @@
"""Unit tests for dashboard/services/scorecard_service.py.
"""Unit tests for dashboard/services/scorecard package.
Focuses on edge cases and scenarios not covered in test_scorecards.py:
- _aggregate_metrics: test.execution events, PR-closed-without-merge,
- aggregate_metrics: test.execution events, PR-closed-without-merge,
push default commit count, untracked agent with agent_id passthrough
- _detect_patterns: boundary conditions (< 3 PRs, exactly 3, exactly 80%)
- _generate_narrative_bullets: singular/plural forms
- detect_patterns: boundary conditions (< 3 PRs, exactly 3, exactly 80%)
- generate_narrative_bullets: singular/plural forms
- generate_scorecard: token augmentation max() logic
- ScorecardSummary.to_dict(): ISO timestamp format, tests_affected count
"""
@@ -18,31 +18,31 @@ import pytest
pytestmark = pytest.mark.unit
from dashboard.services.scorecard_service import (
from dashboard.services.scorecard import (
AgentMetrics,
PeriodType,
ScorecardSummary,
_aggregate_metrics,
_detect_patterns,
_generate_narrative_bullets,
generate_scorecard,
)
from dashboard.services.scorecard.aggregators import aggregate_metrics
from dashboard.services.scorecard.calculators import detect_patterns
from dashboard.services.scorecard.formatters import generate_narrative_bullets
from infrastructure.events.bus import Event
# ---------------------------------------------------------------------------
# _aggregate_metrics — edge cases
# aggregate_metrics — edge cases
# ---------------------------------------------------------------------------
class TestAggregateMetricsEdgeCases:
"""Edge cases for _aggregate_metrics not covered in test_scorecards.py."""
"""Edge cases for aggregate_metrics not covered in test_scorecards.py."""
def test_push_event_defaults_to_one_commit(self):
"""Push event with no num_commits key should count as 1 commit."""
events = [
Event(type="gitea.push", source="gitea", data={"actor": "claude"}),
]
result = _aggregate_metrics(events)
result = aggregate_metrics(events)
assert result["claude"].commits == 1
@@ -55,7 +55,7 @@ class TestAggregateMetricsEdgeCases:
data={"actor": "kimi", "pr_number": 99, "action": "closed", "merged": False},
),
]
result = _aggregate_metrics(events)
result = aggregate_metrics(events)
# PR was not merged — should not be in prs_merged
assert "kimi" in result
@@ -77,7 +77,7 @@ class TestAggregateMetricsEdgeCases:
},
),
]
result = _aggregate_metrics(events)
result = aggregate_metrics(events)
assert "gemini" in result
assert "tests/test_alpha.py" in result["gemini"].tests_affected
@@ -92,7 +92,7 @@ class TestAggregateMetricsEdgeCases:
data={"agent_id": "kimi", "tests_affected": [], "token_reward": 5},
),
]
result = _aggregate_metrics(events)
result = aggregate_metrics(events)
# kimi is tracked and agent_id is present in data
assert "kimi" in result
@@ -107,7 +107,7 @@ class TestAggregateMetricsEdgeCases:
data={"actor": "anon-bot", "num_commits": 10},
),
]
result = _aggregate_metrics(events)
result = aggregate_metrics(events)
assert "anon-bot" not in result
@@ -120,7 +120,7 @@ class TestAggregateMetricsEdgeCases:
data={"actor": "hermes", "issue_number": 0},
),
]
result = _aggregate_metrics(events)
result = aggregate_metrics(events)
assert "hermes" in result
assert len(result["hermes"].issues_touched) == 0
@@ -134,7 +134,7 @@ class TestAggregateMetricsEdgeCases:
data={"actor": "manus", "issue_number": 0},
),
]
result = _aggregate_metrics(events)
result = aggregate_metrics(events)
assert "manus" in result
assert result["manus"].comments == 1
@@ -149,7 +149,7 @@ class TestAggregateMetricsEdgeCases:
data={"agent_id": "claude", "tests_affected": [], "token_reward": 20},
),
]
result = _aggregate_metrics(events)
result = aggregate_metrics(events)
assert "claude" in result
assert len(result["claude"].tests_affected) == 0
@@ -161,7 +161,7 @@ class TestAggregateMetricsEdgeCases:
Event(type="gitea.push", source="gitea", data={"actor": "claude", "num_commits": 3}),
Event(type="gitea.push", source="gitea", data={"actor": "gemini", "num_commits": 7}),
]
result = _aggregate_metrics(events)
result = aggregate_metrics(events)
assert result["claude"].commits == 3
assert result["gemini"].commits == 7
@@ -175,7 +175,7 @@ class TestAggregateMetricsEdgeCases:
data={"actor": "kimi", "pr_number": 0, "action": "opened"},
),
]
result = _aggregate_metrics(events)
result = aggregate_metrics(events)
assert "kimi" in result
assert len(result["kimi"].prs_opened) == 0
@@ -192,7 +192,7 @@ class TestDetectPatternsBoundaries:
def test_no_patterns_with_empty_metrics(self):
"""Empty metrics should not trigger any patterns."""
metrics = AgentMetrics(agent_id="kimi")
patterns = _detect_patterns(metrics)
patterns = detect_patterns(metrics)
assert patterns == []
@@ -203,7 +203,7 @@ class TestDetectPatternsBoundaries:
prs_opened={1, 2},
prs_merged={1, 2}, # 100% rate but only 2 PRs
)
patterns = _detect_patterns(metrics)
patterns = detect_patterns(metrics)
# Should NOT trigger high-merge-rate pattern (< 3 PRs)
assert not any("High merge rate" in p for p in patterns)
@@ -216,7 +216,7 @@ class TestDetectPatternsBoundaries:
prs_opened={1, 2, 3},
prs_merged={1, 2, 3}, # 100% rate, 3 PRs
)
patterns = _detect_patterns(metrics)
patterns = detect_patterns(metrics)
assert any("High merge rate" in p for p in patterns)
@@ -227,7 +227,7 @@ class TestDetectPatternsBoundaries:
prs_opened={1, 2, 3, 4, 5},
prs_merged={1, 2, 3, 4}, # 80%
)
patterns = _detect_patterns(metrics)
patterns = detect_patterns(metrics)
assert any("High merge rate" in p for p in patterns)
@@ -238,7 +238,7 @@ class TestDetectPatternsBoundaries:
prs_opened={1, 2, 3, 4, 5, 6, 7}, # 7 PRs
prs_merged={1, 2, 3, 4, 5}, # ~71.4% — below 80%
)
patterns = _detect_patterns(metrics)
patterns = detect_patterns(metrics)
assert not any("High merge rate" in p for p in patterns)
@@ -249,7 +249,7 @@ class TestDetectPatternsBoundaries:
commits=10,
prs_opened=set(),
)
patterns = _detect_patterns(metrics)
patterns = detect_patterns(metrics)
assert not any("High commit volume" in p for p in patterns)
@@ -260,27 +260,27 @@ class TestDetectPatternsBoundaries:
commits=11,
prs_opened=set(),
)
patterns = _detect_patterns(metrics)
patterns = detect_patterns(metrics)
assert any("High commit volume without PRs" in p for p in patterns)
def test_token_accumulation_exact_boundary(self):
"""Net tokens = 100 does NOT trigger accumulation pattern (must be > 100)."""
metrics = AgentMetrics(agent_id="kimi", tokens_earned=100, tokens_spent=0)
patterns = _detect_patterns(metrics)
patterns = detect_patterns(metrics)
assert not any("Strong token accumulation" in p for p in patterns)
def test_token_spend_exact_boundary(self):
"""Net tokens = -50 does NOT trigger high spend pattern (must be < -50)."""
metrics = AgentMetrics(agent_id="kimi", tokens_earned=0, tokens_spent=50)
patterns = _detect_patterns(metrics)
patterns = detect_patterns(metrics)
assert not any("High token spend" in p for p in patterns)
# ---------------------------------------------------------------------------
# _generate_narrative_bullets — singular/plural
# generate_narrative_bullets — singular/plural
# ---------------------------------------------------------------------------
@@ -290,7 +290,7 @@ class TestGenerateNarrativeSingularPlural:
def test_singular_commit(self):
"""One commit should use singular form."""
metrics = AgentMetrics(agent_id="kimi", commits=1)
bullets = _generate_narrative_bullets(metrics, PeriodType.daily)
bullets = generate_narrative_bullets(metrics, PeriodType.daily)
activity = next((b for b in bullets if "Active across" in b), None)
assert activity is not None
@@ -300,7 +300,7 @@ class TestGenerateNarrativeSingularPlural:
def test_singular_pr_opened(self):
"""One opened PR should use singular form."""
metrics = AgentMetrics(agent_id="kimi", prs_opened={1})
bullets = _generate_narrative_bullets(metrics, PeriodType.daily)
bullets = generate_narrative_bullets(metrics, PeriodType.daily)
activity = next((b for b in bullets if "Active across" in b), None)
assert activity is not None
@@ -309,7 +309,7 @@ class TestGenerateNarrativeSingularPlural:
def test_singular_pr_merged(self):
"""One merged PR should use singular form."""
metrics = AgentMetrics(agent_id="kimi", prs_merged={1})
bullets = _generate_narrative_bullets(metrics, PeriodType.daily)
bullets = generate_narrative_bullets(metrics, PeriodType.daily)
activity = next((b for b in bullets if "Active across" in b), None)
assert activity is not None
@@ -318,7 +318,7 @@ class TestGenerateNarrativeSingularPlural:
def test_singular_issue_touched(self):
"""One issue touched should use singular form."""
metrics = AgentMetrics(agent_id="kimi", issues_touched={42})
bullets = _generate_narrative_bullets(metrics, PeriodType.daily)
bullets = generate_narrative_bullets(metrics, PeriodType.daily)
activity = next((b for b in bullets if "Active across" in b), None)
assert activity is not None
@@ -327,7 +327,7 @@ class TestGenerateNarrativeSingularPlural:
def test_singular_comment(self):
"""One comment should use singular form."""
metrics = AgentMetrics(agent_id="kimi", comments=1)
bullets = _generate_narrative_bullets(metrics, PeriodType.daily)
bullets = generate_narrative_bullets(metrics, PeriodType.daily)
activity = next((b for b in bullets if "Active across" in b), None)
assert activity is not None
@@ -336,14 +336,14 @@ class TestGenerateNarrativeSingularPlural:
def test_singular_test_file(self):
"""One test file should use singular form."""
metrics = AgentMetrics(agent_id="kimi", tests_affected={"test_foo.py"})
bullets = _generate_narrative_bullets(metrics, PeriodType.daily)
bullets = generate_narrative_bullets(metrics, PeriodType.daily)
assert any("1 test file." in b for b in bullets)
def test_weekly_period_label(self):
"""Weekly period uses 'week' label in no-activity message."""
metrics = AgentMetrics(agent_id="kimi")
bullets = _generate_narrative_bullets(metrics, PeriodType.weekly)
bullets = generate_narrative_bullets(metrics, PeriodType.weekly)
assert any("this week" in b for b in bullets)
@@ -366,11 +366,11 @@ class TestGenerateScorecardTokenAugmentation:
),
]
with patch(
"dashboard.services.scorecard_service._collect_events_for_period",
"dashboard.services.scorecard.core.collect_events_for_period",
return_value=events,
):
with patch(
"dashboard.services.scorecard_service._query_token_transactions",
"dashboard.services.scorecard.core.query_token_transactions",
return_value=(50, 0), # ledger says 50 earned
):
scorecard = generate_scorecard("kimi", PeriodType.daily)
@@ -388,11 +388,11 @@ class TestGenerateScorecardTokenAugmentation:
),
]
with patch(
"dashboard.services.scorecard_service._collect_events_for_period",
"dashboard.services.scorecard.core.collect_events_for_period",
return_value=events,
):
with patch(
"dashboard.services.scorecard_service._query_token_transactions",
"dashboard.services.scorecard.core.query_token_transactions",
return_value=(500, 100), # ledger says 500 earned, 100 spent
):
scorecard = generate_scorecard("kimi", PeriodType.daily)

View File

@@ -3,21 +3,22 @@
from datetime import UTC, datetime, timedelta
from unittest.mock import MagicMock, patch
from dashboard.services.scorecard_service import (
from dashboard.services.scorecard import (
AgentMetrics,
PeriodType,
ScorecardSummary,
_aggregate_metrics,
_detect_patterns,
_extract_actor_from_event,
_generate_narrative_bullets,
_get_period_bounds,
_is_tracked_agent,
_query_token_transactions,
generate_all_scorecards,
generate_scorecard,
get_tracked_agents,
)
from dashboard.services.scorecard.aggregators import aggregate_metrics, query_token_transactions
from dashboard.services.scorecard.calculators import detect_patterns
from dashboard.services.scorecard.formatters import generate_narrative_bullets
from dashboard.services.scorecard.validators import (
extract_actor_from_event,
get_period_bounds,
is_tracked_agent,
)
from infrastructure.events.bus import Event
@@ -27,7 +28,7 @@ class TestPeriodBounds:
def test_daily_period_bounds(self):
"""Test daily period returns correct 24-hour window."""
reference = datetime(2026, 3, 21, 12, 30, 45, tzinfo=UTC)
start, end = _get_period_bounds(PeriodType.daily, reference)
start, end = get_period_bounds(PeriodType.daily, reference)
assert end == datetime(2026, 3, 21, 0, 0, 0, tzinfo=UTC)
assert start == datetime(2026, 3, 20, 0, 0, 0, tzinfo=UTC)
@@ -36,7 +37,7 @@ class TestPeriodBounds:
def test_weekly_period_bounds(self):
"""Test weekly period returns correct 7-day window."""
reference = datetime(2026, 3, 21, 12, 30, 45, tzinfo=UTC)
start, end = _get_period_bounds(PeriodType.weekly, reference)
start, end = get_period_bounds(PeriodType.weekly, reference)
assert end == datetime(2026, 3, 21, 0, 0, 0, tzinfo=UTC)
assert start == datetime(2026, 3, 14, 0, 0, 0, tzinfo=UTC)
@@ -44,7 +45,7 @@ class TestPeriodBounds:
def test_default_reference_date(self):
"""Test default reference date uses current time."""
start, end = _get_period_bounds(PeriodType.daily)
start, end = get_period_bounds(PeriodType.daily)
now = datetime.now(UTC)
# End should be start of current day (midnight)
@@ -70,16 +71,16 @@ class TestTrackedAgents:
def test_is_tracked_agent_true(self):
"""Test _is_tracked_agent returns True for tracked agents."""
assert _is_tracked_agent("kimi") is True
assert _is_tracked_agent("KIMI") is True # case insensitive
assert _is_tracked_agent("claude") is True
assert _is_tracked_agent("hermes") is True
assert is_tracked_agent("kimi") is True
assert is_tracked_agent("KIMI") is True # case insensitive
assert is_tracked_agent("claude") is True
assert is_tracked_agent("hermes") is True
def test_is_tracked_agent_false(self):
"""Test _is_tracked_agent returns False for untracked agents."""
assert _is_tracked_agent("unknown") is False
assert _is_tracked_agent("rockachopa") is False
assert _is_tracked_agent("") is False
assert is_tracked_agent("unknown") is False
assert is_tracked_agent("rockachopa") is False
assert is_tracked_agent("") is False
class TestExtractActor:
@@ -88,22 +89,22 @@ class TestExtractActor:
def test_extract_from_actor_field(self):
"""Test extraction from data.actor field."""
event = Event(type="test", source="system", data={"actor": "kimi"})
assert _extract_actor_from_event(event) == "kimi"
assert extract_actor_from_event(event) == "kimi"
def test_extract_from_agent_id_field(self):
"""Test extraction from data.agent_id field."""
event = Event(type="test", source="system", data={"agent_id": "claude"})
assert _extract_actor_from_event(event) == "claude"
assert extract_actor_from_event(event) == "claude"
def test_extract_from_source_fallback(self):
"""Test fallback to event.source."""
event = Event(type="test", source="gemini", data={})
assert _extract_actor_from_event(event) == "gemini"
assert extract_actor_from_event(event) == "gemini"
def test_actor_priority_over_agent_id(self):
"""Test actor field takes priority over agent_id."""
event = Event(type="test", source="system", data={"actor": "kimi", "agent_id": "claude"})
assert _extract_actor_from_event(event) == "kimi"
assert extract_actor_from_event(event) == "kimi"
class TestAggregateMetrics:
@@ -111,7 +112,7 @@ class TestAggregateMetrics:
def test_empty_events(self):
"""Test aggregation with no events returns empty dict."""
result = _aggregate_metrics([])
result = aggregate_metrics([])
assert result == {}
def test_push_event_aggregation(self):
@@ -120,7 +121,7 @@ class TestAggregateMetrics:
Event(type="gitea.push", source="gitea", data={"actor": "kimi", "num_commits": 3}),
Event(type="gitea.push", source="gitea", data={"actor": "kimi", "num_commits": 2}),
]
result = _aggregate_metrics(events)
result = aggregate_metrics(events)
assert "kimi" in result
assert result["kimi"].commits == 5
@@ -139,7 +140,7 @@ class TestAggregateMetrics:
data={"actor": "claude", "issue_number": 101},
),
]
result = _aggregate_metrics(events)
result = aggregate_metrics(events)
assert "claude" in result
assert len(result["claude"].issues_touched) == 2
@@ -160,7 +161,7 @@ class TestAggregateMetrics:
data={"actor": "gemini", "issue_number": 101},
),
]
result = _aggregate_metrics(events)
result = aggregate_metrics(events)
assert "gemini" in result
assert result["gemini"].comments == 2
@@ -185,7 +186,7 @@ class TestAggregateMetrics:
data={"actor": "kimi", "pr_number": 51, "action": "opened"},
),
]
result = _aggregate_metrics(events)
result = aggregate_metrics(events)
assert "kimi" in result
assert len(result["kimi"].prs_opened) == 2
@@ -199,7 +200,7 @@ class TestAggregateMetrics:
type="gitea.push", source="gitea", data={"actor": "rockachopa", "num_commits": 5}
),
]
result = _aggregate_metrics(events)
result = aggregate_metrics(events)
assert "rockachopa" not in result
@@ -216,7 +217,7 @@ class TestAggregateMetrics:
},
),
]
result = _aggregate_metrics(events)
result = aggregate_metrics(events)
assert "kimi" in result
assert len(result["kimi"].tests_affected) == 2
@@ -253,7 +254,7 @@ class TestDetectPatterns:
prs_opened={1, 2, 3, 4, 5},
prs_merged={1, 2, 3, 4}, # 80% merge rate
)
patterns = _detect_patterns(metrics)
patterns = detect_patterns(metrics)
assert any("High merge rate" in p for p in patterns)
@@ -264,7 +265,7 @@ class TestDetectPatterns:
prs_opened={1, 2, 3, 4, 5},
prs_merged={1}, # 20% merge rate
)
patterns = _detect_patterns(metrics)
patterns = detect_patterns(metrics)
assert any("low merge rate" in p for p in patterns)
@@ -275,7 +276,7 @@ class TestDetectPatterns:
commits=15,
prs_opened=set(),
)
patterns = _detect_patterns(metrics)
patterns = detect_patterns(metrics)
assert any("High commit volume without PRs" in p for p in patterns)
@@ -286,7 +287,7 @@ class TestDetectPatterns:
issues_touched={1, 2, 3, 4, 5, 6},
comments=0,
)
patterns = _detect_patterns(metrics)
patterns = detect_patterns(metrics)
assert any("silent worker" in p for p in patterns)
@@ -297,7 +298,7 @@ class TestDetectPatterns:
issues_touched={1, 2}, # 2 issues
comments=10, # 5x comments per issue
)
patterns = _detect_patterns(metrics)
patterns = detect_patterns(metrics)
assert any("Highly communicative" in p for p in patterns)
@@ -308,7 +309,7 @@ class TestDetectPatterns:
tokens_earned=150,
tokens_spent=10,
)
patterns = _detect_patterns(metrics)
patterns = detect_patterns(metrics)
assert any("Strong token accumulation" in p for p in patterns)
@@ -319,7 +320,7 @@ class TestDetectPatterns:
tokens_earned=10,
tokens_spent=100,
)
patterns = _detect_patterns(metrics)
patterns = detect_patterns(metrics)
assert any("High token spend" in p for p in patterns)
@@ -330,7 +331,7 @@ class TestGenerateNarrative:
def test_empty_metrics_narrative(self):
"""Test narrative for empty metrics mentions no activity."""
metrics = AgentMetrics(agent_id="kimi")
bullets = _generate_narrative_bullets(metrics, PeriodType.daily)
bullets = generate_narrative_bullets(metrics, PeriodType.daily)
assert len(bullets) == 1
assert "No recorded activity" in bullets[0]
@@ -343,7 +344,7 @@ class TestGenerateNarrative:
prs_opened={1, 2},
prs_merged={1},
)
bullets = _generate_narrative_bullets(metrics, PeriodType.daily)
bullets = generate_narrative_bullets(metrics, PeriodType.daily)
activity_bullet = next((b for b in bullets if "Active across" in b), None)
assert activity_bullet is not None
@@ -357,7 +358,7 @@ class TestGenerateNarrative:
agent_id="kimi",
tests_affected={"test_a.py", "test_b.py"},
)
bullets = _generate_narrative_bullets(metrics, PeriodType.daily)
bullets = generate_narrative_bullets(metrics, PeriodType.daily)
assert any("2 test files" in b for b in bullets)
@@ -368,7 +369,7 @@ class TestGenerateNarrative:
tokens_earned=100,
tokens_spent=20,
)
bullets = _generate_narrative_bullets(metrics, PeriodType.daily)
bullets = generate_narrative_bullets(metrics, PeriodType.daily)
assert any("Net earned 80 tokens" in b for b in bullets)
@@ -379,7 +380,7 @@ class TestGenerateNarrative:
tokens_earned=20,
tokens_spent=100,
)
bullets = _generate_narrative_bullets(metrics, PeriodType.daily)
bullets = generate_narrative_bullets(metrics, PeriodType.daily)
assert any("Net spent 80 tokens" in b for b in bullets)
@@ -390,7 +391,7 @@ class TestGenerateNarrative:
tokens_earned=100,
tokens_spent=100,
)
bullets = _generate_narrative_bullets(metrics, PeriodType.daily)
bullets = generate_narrative_bullets(metrics, PeriodType.daily)
assert any("Balanced token flow" in b for b in bullets)
@@ -438,7 +439,7 @@ class TestQueryTokenTransactions:
def test_empty_ledger(self):
"""Test empty ledger returns zero values."""
with patch("lightning.ledger.get_transactions", return_value=[]):
earned, spent = _query_token_transactions("kimi", datetime.now(UTC), datetime.now(UTC))
earned, spent = query_token_transactions("kimi", datetime.now(UTC), datetime.now(UTC))
assert earned == 0
assert spent == 0
@@ -460,7 +461,7 @@ class TestQueryTokenTransactions:
),
]
with patch("lightning.ledger.get_transactions", return_value=mock_tx):
earned, spent = _query_token_transactions(
earned, spent = query_token_transactions(
"kimi", now - timedelta(hours=1), now + timedelta(hours=1)
)
assert earned == 100
@@ -478,7 +479,7 @@ class TestQueryTokenTransactions:
),
]
with patch("lightning.ledger.get_transactions", return_value=mock_tx):
earned, spent = _query_token_transactions(
earned, spent = query_token_transactions(
"kimi", now - timedelta(hours=1), now + timedelta(hours=1)
)
assert earned == 0 # Transaction was for claude, not kimi
@@ -497,7 +498,7 @@ class TestQueryTokenTransactions:
]
with patch("lightning.ledger.get_transactions", return_value=mock_tx):
# Query for today only
earned, spent = _query_token_transactions(
earned, spent = query_token_transactions(
"kimi", now - timedelta(hours=1), now + timedelta(hours=1)
)
assert earned == 0 # Transaction was 2 days ago
@@ -508,11 +509,9 @@ class TestGenerateScorecard:
def test_generate_scorecard_no_activity(self):
"""Test scorecard generation for agent with no activity."""
with patch(
"dashboard.services.scorecard_service._collect_events_for_period", return_value=[]
):
with patch("dashboard.services.scorecard.core.collect_events_for_period", return_value=[]):
with patch(
"dashboard.services.scorecard_service._query_token_transactions",
"dashboard.services.scorecard.core.query_token_transactions",
return_value=(0, 0),
):
scorecard = generate_scorecard("kimi", PeriodType.daily)
@@ -529,10 +528,10 @@ class TestGenerateScorecard:
Event(type="gitea.push", source="gitea", data={"actor": "kimi", "num_commits": 5}),
]
with patch(
"dashboard.services.scorecard_service._collect_events_for_period", return_value=events
"dashboard.services.scorecard.core.collect_events_for_period", return_value=events
):
with patch(
"dashboard.services.scorecard_service._query_token_transactions",
"dashboard.services.scorecard.core.query_token_transactions",
return_value=(100, 20),
):
scorecard = generate_scorecard("kimi", PeriodType.daily)
@@ -548,11 +547,9 @@ class TestGenerateAllScorecards:
def test_generates_for_all_tracked_agents(self):
"""Test all tracked agents get scorecards even with no activity."""
with patch(
"dashboard.services.scorecard_service._collect_events_for_period", return_value=[]
):
with patch("dashboard.services.scorecard.core.collect_events_for_period", return_value=[]):
with patch(
"dashboard.services.scorecard_service._query_token_transactions",
"dashboard.services.scorecard.core.query_token_transactions",
return_value=(0, 0),
):
scorecards = generate_all_scorecards(PeriodType.daily)
@@ -563,11 +560,9 @@ class TestGenerateAllScorecards:
def test_scorecards_sorted(self):
"""Test scorecards are sorted by agent_id."""
with patch(
"dashboard.services.scorecard_service._collect_events_for_period", return_value=[]
):
with patch("dashboard.services.scorecard.core.collect_events_for_period", return_value=[]):
with patch(
"dashboard.services.scorecard_service._query_token_transactions",
"dashboard.services.scorecard.core.query_token_transactions",
return_value=(0, 0),
):
scorecards = generate_all_scorecards(PeriodType.daily)