"""Agent scorecard service — track and summarize agent performance.
|
|
|
|
Generates daily/weekly scorecards showing:
|
|
- Issues touched, PRs opened/merged
|
|
- Tests affected, tokens earned/spent
|
|
- Pattern highlights (merge rate, activity quality)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from dataclasses import dataclass, field
|
|
from datetime import UTC, datetime, timedelta
|
|
from enum import StrEnum
|
|
from typing import Any
|
|
|
|
from infrastructure.events.bus import Event, get_event_bus
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Bot/agent usernames to track
|
|
TRACKED_AGENTS = frozenset({"hermes", "kimi", "manus", "claude", "gemini"})
|
|
|
|
|
|
class PeriodType(StrEnum):
    daily = "daily"
    weekly = "weekly"


@dataclass
class AgentMetrics:
    """Raw metrics collected for an agent over a period."""

    agent_id: str
    issues_touched: set[int] = field(default_factory=set)
    prs_opened: set[int] = field(default_factory=set)
    prs_merged: set[int] = field(default_factory=set)
    tests_affected: set[str] = field(default_factory=set)
    tokens_earned: int = 0
    tokens_spent: int = 0
    commits: int = 0
    comments: int = 0

    @property
    def pr_merge_rate(self) -> float:
        """Calculate PR merge rate (0.0 - 1.0)."""
        opened = len(self.prs_opened)
        if opened == 0:
            return 0.0
        return len(self.prs_merged) / opened


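# Illustrative behaviour of pr_merge_rate (a sketch with made-up values, not
# part of the service logic): three merged out of four opened gives 0.75, and
# an agent with no PRs opened is reported as 0.0 rather than raising
# ZeroDivisionError.
#
#     AgentMetrics(agent_id="kimi", prs_opened={1, 2, 3, 4}, prs_merged={1, 3, 4}).pr_merge_rate
#     # -> 0.75
#     AgentMetrics(agent_id="kimi").pr_merge_rate
#     # -> 0.0

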
@dataclass
class ScorecardSummary:
    """A generated scorecard with narrative summary."""

    agent_id: str
    period_type: PeriodType
    period_start: datetime
    period_end: datetime
    metrics: AgentMetrics
    narrative_bullets: list[str] = field(default_factory=list)
    patterns: list[str] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        """Convert scorecard to dictionary for JSON serialization."""
        return {
            "agent_id": self.agent_id,
            "period_type": self.period_type.value,
            "period_start": self.period_start.isoformat(),
            "period_end": self.period_end.isoformat(),
            "metrics": {
                "issues_touched": len(self.metrics.issues_touched),
                "prs_opened": len(self.metrics.prs_opened),
                "prs_merged": len(self.metrics.prs_merged),
                "pr_merge_rate": round(self.metrics.pr_merge_rate, 2),
                "tests_affected": len(self.metrics.tests_affected),
                "commits": self.metrics.commits,
                "comments": self.metrics.comments,
                "tokens_earned": self.metrics.tokens_earned,
                "tokens_spent": self.metrics.tokens_spent,
                "token_net": self.metrics.tokens_earned - self.metrics.tokens_spent,
            },
            "narrative_bullets": self.narrative_bullets,
            "patterns": self.patterns,
        }

    @property
    def tests_affected(self) -> set[str]:
        """Alias for metrics.tests_affected."""
        return self.metrics.tests_affected


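# Shape of ScorecardSummary.to_dict() output (illustrative values only; the
# real numbers come from the aggregation below):
#
#     {
#         "agent_id": "kimi",
#         "period_type": "daily",
#         "period_start": "2024-06-11T00:00:00+00:00",
#         "period_end": "2024-06-12T00:00:00+00:00",
#         "metrics": {"issues_touched": 2, "prs_opened": 1, "prs_merged": 1,
#                     "pr_merge_rate": 1.0, "tests_affected": 3, "commits": 4,
#                     "comments": 2, "tokens_earned": 50, "tokens_spent": 10,
#                     "token_net": 40},
#         "narrative_bullets": ["..."],
#         "patterns": ["..."],
#     }

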
def _get_period_bounds(
    period_type: PeriodType, reference_date: datetime | None = None
) -> tuple[datetime, datetime]:
    """Calculate start and end timestamps for a period.

    The period ends at the start of the reference day, so a daily scorecard
    covers the previous full UTC day and a weekly one the previous seven days.

    Args:
        period_type: daily or weekly
        reference_date: The date to calculate from (defaults to now)

    Returns:
        Tuple of (period_start, period_end) in UTC
    """
    if reference_date is None:
        reference_date = datetime.now(UTC)

    # Normalize to start of day
    end = reference_date.replace(hour=0, minute=0, second=0, microsecond=0)

    if period_type == PeriodType.daily:
        start = end - timedelta(days=1)
    else:  # weekly
        start = end - timedelta(days=7)

    return start, end


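# Example bounds for an illustrative reference date:
#
#     _get_period_bounds(PeriodType.daily, datetime(2024, 6, 12, 15, 30, tzinfo=UTC))
#     # -> (datetime(2024, 6, 11, tzinfo=UTC), datetime(2024, 6, 12, tzinfo=UTC))
#     _get_period_bounds(PeriodType.weekly, datetime(2024, 6, 12, 15, 30, tzinfo=UTC))
#     # -> (datetime(2024, 6, 5, tzinfo=UTC), datetime(2024, 6, 12, tzinfo=UTC))

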
def _collect_events_for_period(
    start: datetime, end: datetime, agent_id: str | None = None
) -> list[Event]:
    """Collect events from the event bus for a time period.

    Args:
        start: Period start time
        end: Period end time
        agent_id: Optional agent filter

    Returns:
        List of matching events
    """
    bus = get_event_bus()
    events: list[Event] = []

    # Query persisted events for relevant types
    event_types = [
        "gitea.push",
        "gitea.issue.opened",
        "gitea.issue.comment",
        "gitea.pull_request",
        "agent.task.completed",
        "test.execution",
    ]

    for event_type in event_types:
        try:
            type_events = bus.replay(
                event_type=event_type,
                source=agent_id,
                limit=1000,
            )
            events.extend(type_events)
        except Exception as exc:
            logger.debug("Failed to replay events for %s: %s", event_type, exc)

    # Filter by timestamp
    filtered = []
    for event in events:
        try:
            event_time = datetime.fromisoformat(event.timestamp.replace("Z", "+00:00"))
            if start <= event_time < end:
                filtered.append(event)
        except (ValueError, AttributeError):
            continue

    return filtered


def _extract_actor_from_event(event: Event) -> str:
    """Extract the actor/agent from an event."""
    # Try data fields first
    if "actor" in event.data:
        return event.data["actor"]
    if "agent_id" in event.data:
        return event.data["agent_id"]
    # Fall back to source
    return event.source


def _is_tracked_agent(actor: str) -> bool:
    """Check if an actor is a tracked agent."""
    return actor.lower() in TRACKED_AGENTS


def _aggregate_metrics(events: list[Event]) -> dict[str, AgentMetrics]:
    """Aggregate metrics from events grouped by agent.

    Args:
        events: List of events to process

    Returns:
        Dict mapping agent_id -> AgentMetrics
    """
    metrics_by_agent: dict[str, AgentMetrics] = {}

    for event in events:
        actor = _extract_actor_from_event(event)

        # Skip non-agent events unless they explicitly have an agent_id
        if not _is_tracked_agent(actor) and "agent_id" not in event.data:
            continue

        if actor not in metrics_by_agent:
            metrics_by_agent[actor] = AgentMetrics(agent_id=actor)

        metrics = metrics_by_agent[actor]

        # Process based on event type
        event_type = event.type

        if event_type == "gitea.push":
            metrics.commits += event.data.get("num_commits", 1)

        elif event_type == "gitea.issue.opened":
            issue_num = event.data.get("issue_number", 0)
            if issue_num:
                metrics.issues_touched.add(issue_num)

        elif event_type == "gitea.issue.comment":
            metrics.comments += 1
            issue_num = event.data.get("issue_number", 0)
            if issue_num:
                metrics.issues_touched.add(issue_num)

        elif event_type == "gitea.pull_request":
            pr_num = event.data.get("pr_number", 0)
            action = event.data.get("action", "")
            merged = event.data.get("merged", False)

            if pr_num:
                if action == "opened":
                    metrics.prs_opened.add(pr_num)
                elif action == "closed" and merged:
                    metrics.prs_merged.add(pr_num)
                # Also count as touched issue for tracking
                metrics.issues_touched.add(pr_num)

        elif event_type == "agent.task.completed":
            # Extract test files from task data
            affected = event.data.get("tests_affected", [])
            for test in affected:
                metrics.tests_affected.add(test)

            # Token rewards from task completion
            reward = event.data.get("token_reward", 0)
            if reward:
                metrics.tokens_earned += reward

        elif event_type == "test.execution":
            # Track test files that were executed
            test_files = event.data.get("test_files", [])
            for test in test_files:
                metrics.tests_affected.add(test)

    return metrics_by_agent


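# Sketch of how a single event feeds the aggregation above (assumes Event can
# be built with type/source/data keyword arguments; the payload values are
# made up):
#
#     Event(type="gitea.pull_request", source="kimi",
#           data={"pr_number": 42, "action": "closed", "merged": True})
#     # -> metrics_by_agent["kimi"].prs_merged == {42}
#     #    metrics_by_agent["kimi"].issues_touched == {42}

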
def _query_token_transactions(agent_id: str, start: datetime, end: datetime) -> tuple[int, int]:
    """Query the lightning ledger for token transactions.

    Args:
        agent_id: The agent to query for
        start: Period start
        end: Period end

    Returns:
        Tuple of (tokens_earned, tokens_spent)
    """
    try:
        from lightning.ledger import get_transactions

        transactions = get_transactions(limit=1000)

        earned = 0
        spent = 0

        for tx in transactions:
            # Skip transactions attributed to a different agent
            if tx.agent_id and tx.agent_id != agent_id:
                continue

            # Filter by timestamp
            try:
                tx_time = datetime.fromisoformat(tx.created_at.replace("Z", "+00:00"))
                if not (start <= tx_time < end):
                    continue
            except (ValueError, AttributeError):
                continue

            if tx.tx_type.value == "incoming":
                earned += tx.amount_sats
            else:
                spent += tx.amount_sats

        return earned, spent

    except Exception as exc:
        logger.debug("Failed to query token transactions: %s", exc)
        return 0, 0


def _generate_narrative_bullets(metrics: AgentMetrics, period_type: PeriodType) -> list[str]:
    """Generate narrative summary bullets for a scorecard.

    Args:
        metrics: The agent's metrics
        period_type: daily or weekly

    Returns:
        List of narrative bullet points
    """
    bullets: list[str] = []
    period_label = "day" if period_type == PeriodType.daily else "week"

    # Activity summary
    activities = []
    if metrics.commits:
        activities.append(f"{metrics.commits} commit{'s' if metrics.commits != 1 else ''}")
    if len(metrics.prs_opened):
        activities.append(
            f"{len(metrics.prs_opened)} PR{'s' if len(metrics.prs_opened) != 1 else ''} opened"
        )
    if len(metrics.prs_merged):
        activities.append(
            f"{len(metrics.prs_merged)} PR{'s' if len(metrics.prs_merged) != 1 else ''} merged"
        )
    if len(metrics.issues_touched):
        activities.append(
            f"{len(metrics.issues_touched)} issue{'s' if len(metrics.issues_touched) != 1 else ''} touched"
        )
    if metrics.comments:
        activities.append(f"{metrics.comments} comment{'s' if metrics.comments != 1 else ''}")

    if activities:
        bullets.append(f"Active across {', '.join(activities)} this {period_label}.")

    # Test activity
    if len(metrics.tests_affected):
        bullets.append(
            f"Affected {len(metrics.tests_affected)} test file{'s' if len(metrics.tests_affected) != 1 else ''}."
        )

    # Token summary
    net_tokens = metrics.tokens_earned - metrics.tokens_spent
    if metrics.tokens_earned or metrics.tokens_spent:
        if net_tokens > 0:
            bullets.append(
                f"Net earned {net_tokens} tokens ({metrics.tokens_earned} earned, {metrics.tokens_spent} spent)."
            )
        elif net_tokens < 0:
            bullets.append(
                f"Net spent {abs(net_tokens)} tokens ({metrics.tokens_earned} earned, {metrics.tokens_spent} spent)."
            )
        else:
            bullets.append(
                f"Balanced token flow ({metrics.tokens_earned} earned, {metrics.tokens_spent} spent)."
            )

    # Handle empty case
    if not bullets:
        bullets.append(f"No recorded activity this {period_label}.")

    return bullets


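# Example output (illustrative metrics: 3 commits, one PR opened and merged,
# the PR counted as a touched issue, 20 tokens earned, weekly period):
#
#     ["Active across 3 commits, 1 PR opened, 1 PR merged, 1 issue touched this week.",
#      "Net earned 20 tokens (20 earned, 0 spent)."]

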
def _detect_patterns(metrics: AgentMetrics) -> list[str]:
    """Detect interesting patterns in agent behavior.

    Args:
        metrics: The agent's metrics

    Returns:
        List of pattern descriptions
    """
    patterns: list[str] = []

    pr_opened = len(metrics.prs_opened)
    merge_rate = metrics.pr_merge_rate

    # Merge rate patterns
    if pr_opened >= 3:
        if merge_rate >= 0.8:
            patterns.append("High merge rate with few failures — code quality focus.")
        elif merge_rate <= 0.3:
            patterns.append("Lots of noisy PRs, low merge rate — may need review support.")

    # Activity patterns
    if metrics.commits > 10 and pr_opened == 0:
        patterns.append("High commit volume without PRs — working directly on main?")

    if len(metrics.issues_touched) > 5 and metrics.comments == 0:
        patterns.append("Touching many issues but low comment volume — silent worker.")

    if metrics.comments > len(metrics.issues_touched) * 2:
        patterns.append("Highly communicative — lots of discussion relative to work items.")

    # Token patterns
    net_tokens = metrics.tokens_earned - metrics.tokens_spent
    if net_tokens > 100:
        patterns.append("Strong token accumulation — high value delivery.")
    elif net_tokens < -50:
        patterns.append("High token spend — may be in experimentation phase.")

    return patterns


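# Example output (illustrative metrics: 4 PRs opened, all merged, net +150 tokens):
#
#     ["High merge rate with few failures — code quality focus.",
#      "Strong token accumulation — high value delivery."]

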
def generate_scorecard(
    agent_id: str,
    period_type: PeriodType = PeriodType.daily,
    reference_date: datetime | None = None,
) -> ScorecardSummary:
    """Generate a scorecard for a single agent.

    Args:
        agent_id: The agent to generate scorecard for
        period_type: daily or weekly
        reference_date: The date to calculate from (defaults to now)

    Returns:
        ScorecardSummary for the agent (generated even if there was no activity)
    """
    start, end = _get_period_bounds(period_type, reference_date)

    # Collect events
    events = _collect_events_for_period(start, end, agent_id)

    # Aggregate metrics
    all_metrics = _aggregate_metrics(events)

    # Get metrics for this specific agent
    if agent_id not in all_metrics:
        # Create empty metrics - still generate a scorecard
        metrics = AgentMetrics(agent_id=agent_id)
    else:
        metrics = all_metrics[agent_id]

    # Augment with token data from ledger
    tokens_earned, tokens_spent = _query_token_transactions(agent_id, start, end)
    metrics.tokens_earned = max(metrics.tokens_earned, tokens_earned)
    metrics.tokens_spent = max(metrics.tokens_spent, tokens_spent)

    # Generate narrative and patterns
    narrative = _generate_narrative_bullets(metrics, period_type)
    patterns = _detect_patterns(metrics)

    return ScorecardSummary(
        agent_id=agent_id,
        period_type=period_type,
        period_start=start,
        period_end=end,
        metrics=metrics,
        narrative_bullets=narrative,
        patterns=patterns,
    )


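# Example usage (a sketch; "kimi" is simply one of the TRACKED_AGENTS entries):
#
#     card = generate_scorecard("kimi", PeriodType.weekly)
#     print(card.to_dict()["metrics"]["pr_merge_rate"])

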
def generate_all_scorecards(
    period_type: PeriodType = PeriodType.daily,
    reference_date: datetime | None = None,
) -> list[ScorecardSummary]:
    """Generate scorecards for all tracked agents.

    Args:
        period_type: daily or weekly
        reference_date: The date to calculate from (defaults to now)

    Returns:
        List of ScorecardSummary covering every tracked agent (even with no
        activity) plus any other agent seen in the period's events
    """
    start, end = _get_period_bounds(period_type, reference_date)

    # Collect all events
    events = _collect_events_for_period(start, end)

    # Aggregate metrics for all agents
    all_metrics = _aggregate_metrics(events)

    # Include tracked agents even if no activity
    for agent_id in TRACKED_AGENTS:
        if agent_id not in all_metrics:
            all_metrics[agent_id] = AgentMetrics(agent_id=agent_id)

    # Generate scorecards
    scorecards: list[ScorecardSummary] = []

    for agent_id, metrics in all_metrics.items():
        # Augment with token data
        tokens_earned, tokens_spent = _query_token_transactions(agent_id, start, end)
        metrics.tokens_earned = max(metrics.tokens_earned, tokens_earned)
        metrics.tokens_spent = max(metrics.tokens_spent, tokens_spent)

        narrative = _generate_narrative_bullets(metrics, period_type)
        patterns = _detect_patterns(metrics)

        scorecard = ScorecardSummary(
            agent_id=agent_id,
            period_type=period_type,
            period_start=start,
            period_end=end,
            metrics=metrics,
            narrative_bullets=narrative,
            patterns=patterns,
        )
        scorecards.append(scorecard)

    # Sort by agent_id for consistent ordering
    scorecards.sort(key=lambda s: s.agent_id)

    return scorecards


def get_tracked_agents() -> list[str]:
    """Return the list of tracked agent IDs."""
    return sorted(TRACKED_AGENTS)


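if __name__ == "__main__":
    # Minimal manual smoke test (an illustrative sketch, not part of the service
    # API). It assumes the event bus and, optionally, the lightning ledger are
    # importable and configured in this environment; with no recorded events each
    # scorecard simply reports "No recorded activity".
    import json

    for card in generate_all_scorecards(PeriodType.daily):
        print(json.dumps(card.to_dict(), indent=2))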