388 lines
15 KiB
Python
388 lines
15 KiB
Python
"""
|
|
Nexus Chronicle — Emergent Narrative from Agent Interactions
|
|
|
|
Watches the fleet's activity (dispatches, errors, recoveries,
|
|
collaborations) and transforms raw event data into narrative prose.
|
|
The system finds the dramatic arc in real work and produces a living
|
|
chronicle. The story writes itself from the data.
|
|
|
|
Usage:
|
|
from nexus.chronicle import ChronicleWriter, AgentEvent, EventKind
|
|
|
|
writer = ChronicleWriter()
|
|
|
|
writer.ingest(AgentEvent(kind=EventKind.DISPATCH, agent="claude", detail="took issue #42"))
|
|
writer.ingest(AgentEvent(kind=EventKind.ERROR, agent="claude", detail="rate limit hit"))
|
|
writer.ingest(AgentEvent(kind=EventKind.RECOVERY, agent="claude", detail="retried after backoff"))
|
|
writer.ingest(AgentEvent(kind=EventKind.COMMIT, agent="claude", detail="feat: add narrative engine"))
|
|
|
|
prose = writer.render()
|
|
print(prose)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from enum import Enum
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Event model
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class EventKind(str, Enum):
|
|
"""The kinds of agent events the chronicle recognises."""
|
|
|
|
DISPATCH = "dispatch" # agent claimed / was assigned work
|
|
COMMIT = "commit" # agent produced a commit
|
|
PUSH = "push" # agent pushed a branch
|
|
PR_OPEN = "pr_open" # agent opened a pull request
|
|
PR_MERGE = "pr_merge" # PR was merged
|
|
ERROR = "error" # agent hit an error / exception
|
|
RECOVERY = "recovery" # agent recovered from a failure
|
|
ABANDON = "abandon" # agent abandoned a task (timeout / giving up)
|
|
COLLABORATION = "collab" # two agents worked on the same thing
|
|
HEARTBEAT = "heartbeat" # agent reported a heartbeat (alive signal)
|
|
IDLE = "idle" # agent is waiting for work
|
|
MILESTONE = "milestone" # notable achievement (e.g. 100th issue closed)
|
|
|
|
|
|
@dataclass
|
|
class AgentEvent:
|
|
"""One discrete thing that happened in the fleet."""
|
|
|
|
kind: EventKind
|
|
agent: str # who did this (e.g. "claude", "mimo-v2-pro")
|
|
detail: str = "" # free-text description
|
|
timestamp: float = field(default_factory=time.time)
|
|
metadata: dict = field(default_factory=dict)
|
|
|
|
def to_dict(self) -> dict:
|
|
return {
|
|
"kind": self.kind.value,
|
|
"agent": self.agent,
|
|
"detail": self.detail,
|
|
"timestamp": self.timestamp,
|
|
"metadata": self.metadata,
|
|
}
|
|
|
|
@classmethod
|
|
def from_dict(cls, data: dict) -> "AgentEvent":
|
|
return cls(
|
|
kind=EventKind(data["kind"]),
|
|
agent=data["agent"],
|
|
detail=data.get("detail", ""),
|
|
timestamp=data.get("timestamp", time.time()),
|
|
metadata=data.get("metadata", {}),
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Narrative templates — maps event kinds to prose fragments
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Each entry is a list so we can rotate through variants.
|
|
_TEMPLATES: dict[EventKind, list[str]] = {
|
|
EventKind.DISPATCH: [
|
|
"{agent} stepped forward and claimed the work: {detail}.",
|
|
"{agent} took on the challenge — {detail}.",
|
|
"The task landed on {agent}'s desk: {detail}.",
|
|
],
|
|
EventKind.COMMIT: [
|
|
'{agent} sealed a commit into the record: "{detail}".',
|
|
'{agent} committed "{detail}" — progress crystallised.',
|
|
"{agent} carved a new ring into the trunk: {detail}.",
|
|
],
|
|
EventKind.PUSH: [
|
|
"{agent} pushed the work upstream.",
|
|
"The branch rose into the forge — {agent}'s changes were live.",
|
|
"{agent} sent their work into the wider current.",
|
|
],
|
|
EventKind.PR_OPEN: [
|
|
"{agent} opened a pull request: {detail}.",
|
|
"A proposal surfaced — {agent} asked the fleet to review {detail}.",
|
|
"{agent} laid their work before the reviewers: {detail}.",
|
|
],
|
|
EventKind.PR_MERGE: [
|
|
"{agent}'s branch folded into the whole: {detail}.",
|
|
"Consensus reached — {agent}'s changes were merged: {detail}.",
|
|
"{detail} joined the canon. {agent}'s contribution lives on.",
|
|
],
|
|
EventKind.ERROR: [
|
|
"{agent} ran into an obstacle: {detail}.",
|
|
"Trouble. {agent} encountered {detail} and had to pause.",
|
|
"The path grew difficult — {agent} hit {detail}.",
|
|
],
|
|
EventKind.RECOVERY: [
|
|
"{agent} regrouped and pressed on: {detail}.",
|
|
"After the setback, {agent} found a way through: {detail}.",
|
|
"{agent} recovered — {detail}.",
|
|
],
|
|
EventKind.ABANDON: [
|
|
"{agent} released the task, unable to finish: {detail}.",
|
|
"Sometimes wisdom is knowing when to let go. {agent} abandoned {detail}.",
|
|
"{agent} stepped back from {detail}. Another will carry it forward.",
|
|
],
|
|
EventKind.COLLABORATION: [
|
|
"{agent} and their peers converged on the same problem: {detail}.",
|
|
"Two minds touched the same work — {agent} in collaboration: {detail}.",
|
|
"The fleet coordinated — {agent} joined the effort on {detail}.",
|
|
],
|
|
EventKind.HEARTBEAT: [
|
|
"{agent} checked in — still thinking, still present.",
|
|
"A pulse from {agent}: the mind is alive.",
|
|
"{agent} breathed through another cycle.",
|
|
],
|
|
EventKind.IDLE: [
|
|
"{agent} rested, waiting for the next call.",
|
|
"Quiet descended — {agent} held still between tasks.",
|
|
"{agent} stood ready, watchful in the lull.",
|
|
],
|
|
EventKind.MILESTONE: [
|
|
"A moment worth noting — {agent}: {detail}.",
|
|
"The chronicle marks a milestone. {agent}: {detail}.",
|
|
"History ticked over — {agent} reached {detail}.",
|
|
],
|
|
}
|
|
|
|
# Arc-level commentary triggered by sequences of events
|
|
_ARC_TEMPLATES = {
|
|
"struggle_and_recovery": (
|
|
"There was a struggle here. {agent} hit trouble and came back stronger — "
|
|
"the kind of arc that gives a chronicle its texture."
|
|
),
|
|
"silent_grind": (
|
|
"No drama, just steady work. {agents} moved through the backlog with quiet persistence."
|
|
),
|
|
"abandon_then_retry": (
|
|
"{agent} let go once. But the work called again, and this time it was answered."
|
|
),
|
|
"solo_sprint": (
|
|
"{agent} ran the whole arc alone — dispatch to merge — without breaking stride."
|
|
),
|
|
"fleet_convergence": (
|
|
"The fleet converged. Multiple agents touched the same thread and wove it tighter."
|
|
),
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Chronicle writer
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class ChronicleWriter:
|
|
"""Accumulates agent events and renders them as narrative prose.
|
|
|
|
The writer keeps a running log of events. Call ``ingest()`` to add new
|
|
events as they arrive, then ``render()`` to produce a prose snapshot of
|
|
the current arc.
|
|
|
|
Events are also persisted to JSONL so the chronicle survives restarts.
|
|
"""
|
|
|
|
def __init__(self, log_path: Optional[Path] = None):
|
|
today = time.strftime("%Y-%m-%d")
|
|
self.log_path = log_path or (
|
|
Path.home() / ".nexus" / "chronicle" / f"chronicle_{today}.jsonl"
|
|
)
|
|
self.log_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
self._events: list[AgentEvent] = []
|
|
self._template_counters: dict[EventKind, int] = {}
|
|
|
|
# Load any events already on disk for today
|
|
self._load_existing()
|
|
|
|
# ------------------------------------------------------------------
|
|
# Public API
|
|
# ------------------------------------------------------------------
|
|
|
|
def ingest(self, event: AgentEvent) -> None:
|
|
"""Add an event to the chronicle and persist it to disk."""
|
|
self._events.append(event)
|
|
with open(self.log_path, "a") as f:
|
|
f.write(json.dumps(event.to_dict()) + "\n")
|
|
|
|
def render(self, max_events: int = 50) -> str:
|
|
"""Render the recent event stream as narrative prose.
|
|
|
|
Returns a multi-paragraph string suitable for display or logging.
|
|
"""
|
|
events = self._events[-max_events:]
|
|
if not events:
|
|
return "The chronicle is empty. No events have been recorded yet."
|
|
|
|
paragraphs: list[str] = []
|
|
|
|
# Opening line with timestamp range
|
|
first_ts = time.strftime("%H:%M", time.localtime(events[0].timestamp))
|
|
last_ts = time.strftime("%H:%M", time.localtime(events[-1].timestamp))
|
|
paragraphs.append(
|
|
f"The chronicle covers {len(events)} event(s) between {first_ts} and {last_ts}."
|
|
)
|
|
|
|
# Event-by-event prose
|
|
sentences: list[str] = []
|
|
for evt in events:
|
|
sentences.append(self._render_event(evt))
|
|
paragraphs.append(" ".join(sentences))
|
|
|
|
# Arc-level commentary
|
|
arc = self._detect_arc(events)
|
|
if arc:
|
|
paragraphs.append(arc)
|
|
|
|
return "\n\n".join(paragraphs)
|
|
|
|
def render_markdown(self, max_events: int = 50) -> str:
|
|
"""Render as a Markdown document."""
|
|
events = self._events[-max_events:]
|
|
if not events:
|
|
return "# Chronicle\n\n*No events recorded yet.*"
|
|
|
|
today = time.strftime("%Y-%m-%d")
|
|
lines = [f"# Chronicle — {today}", ""]
|
|
|
|
for evt in events:
|
|
ts = time.strftime("%H:%M:%S", time.localtime(evt.timestamp))
|
|
prose = self._render_event(evt)
|
|
lines.append(f"**{ts}** — {prose}")
|
|
|
|
arc = self._detect_arc(events)
|
|
if arc:
|
|
lines += ["", "---", "", f"*{arc}*"]
|
|
|
|
return "\n".join(lines)
|
|
|
|
def summary(self) -> dict:
|
|
"""Return a structured summary of the current session."""
|
|
agents: dict[str, dict] = {}
|
|
kind_counts: dict[str, int] = {}
|
|
|
|
for evt in self._events:
|
|
agents.setdefault(evt.agent, {"events": 0, "kinds": []})
|
|
agents[evt.agent]["events"] += 1
|
|
agents[evt.agent]["kinds"].append(evt.kind.value)
|
|
kind_counts[evt.kind.value] = kind_counts.get(evt.kind.value, 0) + 1
|
|
|
|
return {
|
|
"total_events": len(self._events),
|
|
"agents": agents,
|
|
"kind_counts": kind_counts,
|
|
"log_path": str(self.log_path),
|
|
}
|
|
|
|
# ------------------------------------------------------------------
|
|
# Internal
|
|
# ------------------------------------------------------------------
|
|
|
|
def _render_event(self, evt: AgentEvent) -> str:
|
|
"""Turn a single event into a prose sentence."""
|
|
templates = _TEMPLATES.get(evt.kind, ["{agent}: {detail}"])
|
|
counter = self._template_counters.get(evt.kind, 0)
|
|
template = templates[counter % len(templates)]
|
|
self._template_counters[evt.kind] = counter + 1
|
|
return template.format(agent=evt.agent, detail=evt.detail or evt.kind.value)
|
|
|
|
def _detect_arc(self, events: list[AgentEvent]) -> Optional[str]:
|
|
"""Scan the event sequence for a recognisable dramatic arc."""
|
|
if not events:
|
|
return None
|
|
|
|
kinds = [e.kind for e in events]
|
|
agents = list({e.agent for e in events})
|
|
|
|
# struggle → recovery
|
|
if EventKind.ERROR in kinds and EventKind.RECOVERY in kinds:
|
|
err_idx = kinds.index(EventKind.ERROR)
|
|
rec_idx = kinds.index(EventKind.RECOVERY)
|
|
if rec_idx > err_idx:
|
|
agent = events[err_idx].agent
|
|
return _ARC_TEMPLATES["struggle_and_recovery"].format(agent=agent)
|
|
|
|
# abandon → dispatch (retry): find first ABANDON, then any DISPATCH after it
|
|
if EventKind.ABANDON in kinds and EventKind.DISPATCH in kinds:
|
|
ab_idx = kinds.index(EventKind.ABANDON)
|
|
retry_idx = next(
|
|
(i for i, k in enumerate(kinds) if k == EventKind.DISPATCH and i > ab_idx),
|
|
None,
|
|
)
|
|
if retry_idx is not None:
|
|
agent = events[retry_idx].agent
|
|
return _ARC_TEMPLATES["abandon_then_retry"].format(agent=agent)
|
|
|
|
# solo sprint: single agent goes dispatch→commit→pr_open→pr_merge
|
|
solo_arc = {EventKind.DISPATCH, EventKind.COMMIT, EventKind.PR_OPEN, EventKind.PR_MERGE}
|
|
if solo_arc.issubset(set(kinds)) and len(agents) == 1:
|
|
return _ARC_TEMPLATES["solo_sprint"].format(agent=agents[0])
|
|
|
|
# fleet convergence: multiple agents, collaboration event
|
|
if len(agents) > 1 and EventKind.COLLABORATION in kinds:
|
|
return _ARC_TEMPLATES["fleet_convergence"]
|
|
|
|
# silent grind: only commits / heartbeats, no drama
|
|
drama = {EventKind.ERROR, EventKind.ABANDON, EventKind.RECOVERY, EventKind.COLLABORATION}
|
|
if not drama.intersection(set(kinds)) and EventKind.COMMIT in kinds:
|
|
return _ARC_TEMPLATES["silent_grind"].format(agents=", ".join(agents))
|
|
|
|
return None
|
|
|
|
def _load_existing(self) -> None:
|
|
"""Load events persisted from earlier in the same session."""
|
|
if not self.log_path.exists():
|
|
return
|
|
with open(self.log_path) as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
try:
|
|
self._events.append(AgentEvent.from_dict(json.loads(line)))
|
|
except (json.JSONDecodeError, KeyError, ValueError):
|
|
continue # skip malformed lines
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Convenience: build events from common fleet signals
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def event_from_gitea_issue(payload: dict, agent: str) -> AgentEvent:
|
|
"""Build a DISPATCH event from a Gitea issue assignment payload."""
|
|
issue_num = payload.get("number", "?")
|
|
title = payload.get("title", "")
|
|
return AgentEvent(
|
|
kind=EventKind.DISPATCH,
|
|
agent=agent,
|
|
detail=f"issue #{issue_num}: {title}",
|
|
metadata={"issue_number": issue_num},
|
|
)
|
|
|
|
|
|
def event_from_heartbeat(hb: dict) -> AgentEvent:
|
|
"""Build a HEARTBEAT event from a nexus heartbeat dict."""
|
|
agent = hb.get("model", "unknown")
|
|
status = hb.get("status", "thinking")
|
|
cycle = hb.get("cycle", 0)
|
|
return AgentEvent(
|
|
kind=EventKind.HEARTBEAT,
|
|
agent=agent,
|
|
detail=f"cycle {cycle}, status={status}",
|
|
metadata=hb,
|
|
)
|
|
|
|
|
|
def event_from_commit(commit: dict, agent: str) -> AgentEvent:
|
|
"""Build a COMMIT event from a git commit dict."""
|
|
message = commit.get("message", "").split("\n")[0] # subject line only
|
|
sha = commit.get("sha", "")[:8]
|
|
return AgentEvent(
|
|
kind=EventKind.COMMIT,
|
|
agent=agent,
|
|
detail=message,
|
|
metadata={"sha": sha},
|
|
)
|