Compare commits

...

1 Commits

Author SHA1 Message Date
Alexander Whitestone
560454528a feat: emergent narrative engine from agent interactions (#1607)
Some checks failed
CI / test (pull_request) Failing after 58s
Review Approval Gate / verify-review (pull_request) Failing after 7s
CI / validate (pull_request) Failing after 56s
Add nexus/chronicle.py — a living chronicle that watches fleet events
(dispatches, commits, errors, recoveries, collaborations) and renders
them as narrative prose.

- AgentEvent dataclass captures every fleet signal with kind, agent,
  detail, timestamp, and metadata
- ChronicleWriter ingests events, persists to daily JSONL, and renders
  prose via rotating template banks
- Arc detection finds dramatic patterns (struggle-and-recovery, solo
  sprint, fleet convergence, silent grind, abandon-then-retry) and
  appends arc-level commentary to the rendered chronicle
- render() and render_markdown() produce human-readable output;
  summary() returns structured stats
- Convenience constructors (event_from_gitea_issue, event_from_heartbeat,
  event_from_commit) bridge existing fleet signals into the chronicle
- 22 tests cover ingestion, persistence, template rotation, arc
  detection, markdown rendering, and edge cases

Fixes #1607

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-17 01:19:40 -04:00
3 changed files with 602 additions and 0 deletions

View File

@@ -14,6 +14,7 @@ from nexus.perception_adapter import (
)
from nexus.experience_store import ExperienceStore
from nexus.trajectory_logger import TrajectoryLogger
from nexus.chronicle import ChronicleWriter, AgentEvent, EventKind
try:
from nexus.nexus_think import NexusMind
@@ -29,4 +30,7 @@ __all__ = [
"ExperienceStore",
"TrajectoryLogger",
"NexusMind",
"ChronicleWriter",
"AgentEvent",
"EventKind",
]

387
nexus/chronicle.py Normal file
View File

@@ -0,0 +1,387 @@
"""
Nexus Chronicle — Emergent Narrative from Agent Interactions
Watches the fleet's activity (dispatches, errors, recoveries,
collaborations) and transforms raw event data into narrative prose.
The system finds the dramatic arc in real work and produces a living
chronicle. The story writes itself from the data.
Usage:
from nexus.chronicle import ChronicleWriter, AgentEvent, EventKind
writer = ChronicleWriter()
writer.ingest(AgentEvent(kind=EventKind.DISPATCH, agent="claude", detail="took issue #42"))
writer.ingest(AgentEvent(kind=EventKind.ERROR, agent="claude", detail="rate limit hit"))
writer.ingest(AgentEvent(kind=EventKind.RECOVERY, agent="claude", detail="retried after backoff"))
writer.ingest(AgentEvent(kind=EventKind.COMMIT, agent="claude", detail="feat: add narrative engine"))
prose = writer.render()
print(prose)
"""
from __future__ import annotations
import json
import time
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Optional
# ---------------------------------------------------------------------------
# Event model
# ---------------------------------------------------------------------------
class EventKind(str, Enum):
"""The kinds of agent events the chronicle recognises."""
DISPATCH = "dispatch" # agent claimed / was assigned work
COMMIT = "commit" # agent produced a commit
PUSH = "push" # agent pushed a branch
PR_OPEN = "pr_open" # agent opened a pull request
PR_MERGE = "pr_merge" # PR was merged
ERROR = "error" # agent hit an error / exception
RECOVERY = "recovery" # agent recovered from a failure
ABANDON = "abandon" # agent abandoned a task (timeout / giving up)
COLLABORATION = "collab" # two agents worked on the same thing
HEARTBEAT = "heartbeat" # agent reported a heartbeat (alive signal)
IDLE = "idle" # agent is waiting for work
MILESTONE = "milestone" # notable achievement (e.g. 100th issue closed)
@dataclass
class AgentEvent:
"""One discrete thing that happened in the fleet."""
kind: EventKind
agent: str # who did this (e.g. "claude", "mimo-v2-pro")
detail: str = "" # free-text description
timestamp: float = field(default_factory=time.time)
metadata: dict = field(default_factory=dict)
def to_dict(self) -> dict:
return {
"kind": self.kind.value,
"agent": self.agent,
"detail": self.detail,
"timestamp": self.timestamp,
"metadata": self.metadata,
}
@classmethod
def from_dict(cls, data: dict) -> "AgentEvent":
return cls(
kind=EventKind(data["kind"]),
agent=data["agent"],
detail=data.get("detail", ""),
timestamp=data.get("timestamp", time.time()),
metadata=data.get("metadata", {}),
)
# ---------------------------------------------------------------------------
# Narrative templates — maps event kinds to prose fragments
# ---------------------------------------------------------------------------
# Each entry is a list so we can rotate through variants.
_TEMPLATES: dict[EventKind, list[str]] = {
EventKind.DISPATCH: [
"{agent} stepped forward and claimed the work: {detail}.",
"{agent} took on the challenge — {detail}.",
"The task landed on {agent}'s desk: {detail}.",
],
EventKind.COMMIT: [
'{agent} sealed a commit into the record: "{detail}".',
'{agent} committed "{detail}" — progress crystallised.',
"{agent} carved a new ring into the trunk: {detail}.",
],
EventKind.PUSH: [
"{agent} pushed the work upstream.",
"The branch rose into the forge — {agent}'s changes were live.",
"{agent} sent their work into the wider current.",
],
EventKind.PR_OPEN: [
"{agent} opened a pull request: {detail}.",
"A proposal surfaced — {agent} asked the fleet to review {detail}.",
"{agent} laid their work before the reviewers: {detail}.",
],
EventKind.PR_MERGE: [
"{agent}'s branch folded into the whole: {detail}.",
"Consensus reached — {agent}'s changes were merged: {detail}.",
"{detail} joined the canon. {agent}'s contribution lives on.",
],
EventKind.ERROR: [
"{agent} ran into an obstacle: {detail}.",
"Trouble. {agent} encountered {detail} and had to pause.",
"The path grew difficult — {agent} hit {detail}.",
],
EventKind.RECOVERY: [
"{agent} regrouped and pressed on: {detail}.",
"After the setback, {agent} found a way through: {detail}.",
"{agent} recovered — {detail}.",
],
EventKind.ABANDON: [
"{agent} released the task, unable to finish: {detail}.",
"Sometimes wisdom is knowing when to let go. {agent} abandoned {detail}.",
"{agent} stepped back from {detail}. Another will carry it forward.",
],
EventKind.COLLABORATION: [
"{agent} and their peers converged on the same problem: {detail}.",
"Two minds touched the same work — {agent} in collaboration: {detail}.",
"The fleet coordinated — {agent} joined the effort on {detail}.",
],
EventKind.HEARTBEAT: [
"{agent} checked in — still thinking, still present.",
"A pulse from {agent}: the mind is alive.",
"{agent} breathed through another cycle.",
],
EventKind.IDLE: [
"{agent} rested, waiting for the next call.",
"Quiet descended — {agent} held still between tasks.",
"{agent} stood ready, watchful in the lull.",
],
EventKind.MILESTONE: [
"A moment worth noting — {agent}: {detail}.",
"The chronicle marks a milestone. {agent}: {detail}.",
"History ticked over — {agent} reached {detail}.",
],
}
# Arc-level commentary triggered by sequences of events
_ARC_TEMPLATES = {
"struggle_and_recovery": (
"There was a struggle here. {agent} hit trouble and came back stronger — "
"the kind of arc that gives a chronicle its texture."
),
"silent_grind": (
"No drama, just steady work. {agents} moved through the backlog with quiet persistence."
),
"abandon_then_retry": (
"{agent} let go once. But the work called again, and this time it was answered."
),
"solo_sprint": (
"{agent} ran the whole arc alone — dispatch to merge — without breaking stride."
),
"fleet_convergence": (
"The fleet converged. Multiple agents touched the same thread and wove it tighter."
),
}
# ---------------------------------------------------------------------------
# Chronicle writer
# ---------------------------------------------------------------------------
class ChronicleWriter:
"""Accumulates agent events and renders them as narrative prose.
The writer keeps a running log of events. Call ``ingest()`` to add new
events as they arrive, then ``render()`` to produce a prose snapshot of
the current arc.
Events are also persisted to JSONL so the chronicle survives restarts.
"""
def __init__(self, log_path: Optional[Path] = None):
today = time.strftime("%Y-%m-%d")
self.log_path = log_path or (
Path.home() / ".nexus" / "chronicle" / f"chronicle_{today}.jsonl"
)
self.log_path.parent.mkdir(parents=True, exist_ok=True)
self._events: list[AgentEvent] = []
self._template_counters: dict[EventKind, int] = {}
# Load any events already on disk for today
self._load_existing()
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def ingest(self, event: AgentEvent) -> None:
"""Add an event to the chronicle and persist it to disk."""
self._events.append(event)
with open(self.log_path, "a") as f:
f.write(json.dumps(event.to_dict()) + "\n")
def render(self, max_events: int = 50) -> str:
"""Render the recent event stream as narrative prose.
Returns a multi-paragraph string suitable for display or logging.
"""
events = self._events[-max_events:]
if not events:
return "The chronicle is empty. No events have been recorded yet."
paragraphs: list[str] = []
# Opening line with timestamp range
first_ts = time.strftime("%H:%M", time.localtime(events[0].timestamp))
last_ts = time.strftime("%H:%M", time.localtime(events[-1].timestamp))
paragraphs.append(
f"The chronicle covers {len(events)} event(s) between {first_ts} and {last_ts}."
)
# Event-by-event prose
sentences: list[str] = []
for evt in events:
sentences.append(self._render_event(evt))
paragraphs.append(" ".join(sentences))
# Arc-level commentary
arc = self._detect_arc(events)
if arc:
paragraphs.append(arc)
return "\n\n".join(paragraphs)
def render_markdown(self, max_events: int = 50) -> str:
"""Render as a Markdown document."""
events = self._events[-max_events:]
if not events:
return "# Chronicle\n\n*No events recorded yet.*"
today = time.strftime("%Y-%m-%d")
lines = [f"# Chronicle — {today}", ""]
for evt in events:
ts = time.strftime("%H:%M:%S", time.localtime(evt.timestamp))
prose = self._render_event(evt)
lines.append(f"**{ts}** — {prose}")
arc = self._detect_arc(events)
if arc:
lines += ["", "---", "", f"*{arc}*"]
return "\n".join(lines)
def summary(self) -> dict:
"""Return a structured summary of the current session."""
agents: dict[str, dict] = {}
kind_counts: dict[str, int] = {}
for evt in self._events:
agents.setdefault(evt.agent, {"events": 0, "kinds": []})
agents[evt.agent]["events"] += 1
agents[evt.agent]["kinds"].append(evt.kind.value)
kind_counts[evt.kind.value] = kind_counts.get(evt.kind.value, 0) + 1
return {
"total_events": len(self._events),
"agents": agents,
"kind_counts": kind_counts,
"log_path": str(self.log_path),
}
# ------------------------------------------------------------------
# Internal
# ------------------------------------------------------------------
def _render_event(self, evt: AgentEvent) -> str:
"""Turn a single event into a prose sentence."""
templates = _TEMPLATES.get(evt.kind, ["{agent}: {detail}"])
counter = self._template_counters.get(evt.kind, 0)
template = templates[counter % len(templates)]
self._template_counters[evt.kind] = counter + 1
return template.format(agent=evt.agent, detail=evt.detail or evt.kind.value)
def _detect_arc(self, events: list[AgentEvent]) -> Optional[str]:
"""Scan the event sequence for a recognisable dramatic arc."""
if not events:
return None
kinds = [e.kind for e in events]
agents = list({e.agent for e in events})
# struggle → recovery
if EventKind.ERROR in kinds and EventKind.RECOVERY in kinds:
err_idx = kinds.index(EventKind.ERROR)
rec_idx = kinds.index(EventKind.RECOVERY)
if rec_idx > err_idx:
agent = events[err_idx].agent
return _ARC_TEMPLATES["struggle_and_recovery"].format(agent=agent)
# abandon → dispatch (retry): find first ABANDON, then any DISPATCH after it
if EventKind.ABANDON in kinds and EventKind.DISPATCH in kinds:
ab_idx = kinds.index(EventKind.ABANDON)
retry_idx = next(
(i for i, k in enumerate(kinds) if k == EventKind.DISPATCH and i > ab_idx),
None,
)
if retry_idx is not None:
agent = events[retry_idx].agent
return _ARC_TEMPLATES["abandon_then_retry"].format(agent=agent)
# solo sprint: single agent goes dispatch→commit→pr_open→pr_merge
solo_arc = {EventKind.DISPATCH, EventKind.COMMIT, EventKind.PR_OPEN, EventKind.PR_MERGE}
if solo_arc.issubset(set(kinds)) and len(agents) == 1:
return _ARC_TEMPLATES["solo_sprint"].format(agent=agents[0])
# fleet convergence: multiple agents, collaboration event
if len(agents) > 1 and EventKind.COLLABORATION in kinds:
return _ARC_TEMPLATES["fleet_convergence"]
# silent grind: only commits / heartbeats, no drama
drama = {EventKind.ERROR, EventKind.ABANDON, EventKind.RECOVERY, EventKind.COLLABORATION}
if not drama.intersection(set(kinds)) and EventKind.COMMIT in kinds:
return _ARC_TEMPLATES["silent_grind"].format(agents=", ".join(agents))
return None
def _load_existing(self) -> None:
"""Load events persisted from earlier in the same session."""
if not self.log_path.exists():
return
with open(self.log_path) as f:
for line in f:
line = line.strip()
if not line:
continue
try:
self._events.append(AgentEvent.from_dict(json.loads(line)))
except (json.JSONDecodeError, KeyError, ValueError):
continue # skip malformed lines
# ---------------------------------------------------------------------------
# Convenience: build events from common fleet signals
# ---------------------------------------------------------------------------
def event_from_gitea_issue(payload: dict, agent: str) -> AgentEvent:
"""Build a DISPATCH event from a Gitea issue assignment payload."""
issue_num = payload.get("number", "?")
title = payload.get("title", "")
return AgentEvent(
kind=EventKind.DISPATCH,
agent=agent,
detail=f"issue #{issue_num}: {title}",
metadata={"issue_number": issue_num},
)
def event_from_heartbeat(hb: dict) -> AgentEvent:
"""Build a HEARTBEAT event from a nexus heartbeat dict."""
agent = hb.get("model", "unknown")
status = hb.get("status", "thinking")
cycle = hb.get("cycle", 0)
return AgentEvent(
kind=EventKind.HEARTBEAT,
agent=agent,
detail=f"cycle {cycle}, status={status}",
metadata=hb,
)
def event_from_commit(commit: dict, agent: str) -> AgentEvent:
"""Build a COMMIT event from a git commit dict."""
message = commit.get("message", "").split("\n")[0] # subject line only
sha = commit.get("sha", "")[:8]
return AgentEvent(
kind=EventKind.COMMIT,
agent=agent,
detail=message,
metadata={"sha": sha},
)

211
tests/test_chronicle.py Normal file
View File

@@ -0,0 +1,211 @@
"""
Tests for nexus.chronicle — emergent narrative from agent interactions.
"""
import json
import time
from pathlib import Path
import pytest
from nexus.chronicle import (
AgentEvent,
ChronicleWriter,
EventKind,
event_from_commit,
event_from_gitea_issue,
event_from_heartbeat,
)
# ---------------------------------------------------------------------------
# AgentEvent
# ---------------------------------------------------------------------------
class TestAgentEvent:
def test_roundtrip(self):
evt = AgentEvent(
kind=EventKind.DISPATCH,
agent="claude",
detail="took issue #42",
)
assert AgentEvent.from_dict(evt.to_dict()).kind == EventKind.DISPATCH
assert AgentEvent.from_dict(evt.to_dict()).agent == "claude"
assert AgentEvent.from_dict(evt.to_dict()).detail == "took issue #42"
def test_default_timestamp_is_recent(self):
before = time.time()
evt = AgentEvent(kind=EventKind.IDLE, agent="mimo")
after = time.time()
assert before <= evt.timestamp <= after
def test_all_event_kinds_are_valid_strings(self):
for kind in EventKind:
evt = AgentEvent(kind=kind, agent="test-agent")
d = evt.to_dict()
assert d["kind"] == kind.value
restored = AgentEvent.from_dict(d)
assert restored.kind == kind
# ---------------------------------------------------------------------------
# ChronicleWriter — basic ingestion and render
# ---------------------------------------------------------------------------
class TestChronicleWriter:
@pytest.fixture
def writer(self, tmp_path):
return ChronicleWriter(log_path=tmp_path / "chronicle.jsonl")
def test_empty_render(self, writer):
text = writer.render()
assert "empty" in text.lower()
def test_single_event_render(self, writer):
writer.ingest(AgentEvent(kind=EventKind.DISPATCH, agent="claude", detail="issue #1"))
text = writer.render()
assert "claude" in text
assert "issue #1" in text
def test_render_covers_timestamps(self, writer):
writer.ingest(AgentEvent(kind=EventKind.DISPATCH, agent="a", detail="start"))
writer.ingest(AgentEvent(kind=EventKind.COMMIT, agent="a", detail="done"))
text = writer.render()
assert "chronicle covers" in text.lower()
def test_events_persisted_to_disk(self, writer, tmp_path):
writer.ingest(AgentEvent(kind=EventKind.COMMIT, agent="claude", detail="feat: x"))
lines = (tmp_path / "chronicle.jsonl").read_text().strip().splitlines()
assert len(lines) == 1
data = json.loads(lines[0])
assert data["kind"] == "commit"
assert data["agent"] == "claude"
def test_load_existing_on_init(self, tmp_path):
log = tmp_path / "chronicle.jsonl"
evt = AgentEvent(kind=EventKind.PUSH, agent="mimo", detail="pushed branch")
log.write_text(json.dumps(evt.to_dict()) + "\n")
writer2 = ChronicleWriter(log_path=log)
assert len(writer2._events) == 1
assert writer2._events[0].kind == EventKind.PUSH
def test_malformed_lines_are_skipped(self, tmp_path):
log = tmp_path / "chronicle.jsonl"
log.write_text("not-json\n{}\n")
# Should not raise
writer2 = ChronicleWriter(log_path=log)
assert writer2._events == []
def test_template_rotation(self, writer):
"""Consecutive events of the same kind use different templates."""
sentences = set()
for _ in range(3):
writer.ingest(AgentEvent(kind=EventKind.HEARTBEAT, agent="claude"))
text = writer.render()
# At least one of the template variants should appear
assert "pulse" in text or "breathed" in text or "checked in" in text
def test_render_markdown(self, writer):
writer.ingest(AgentEvent(kind=EventKind.PR_OPEN, agent="claude", detail="PR #99"))
md = writer.render_markdown()
assert md.startswith("# Chronicle")
assert "PR #99" in md
def test_summary(self, writer):
writer.ingest(AgentEvent(kind=EventKind.DISPATCH, agent="claude", detail="x"))
writer.ingest(AgentEvent(kind=EventKind.COMMIT, agent="claude", detail="y"))
s = writer.summary()
assert s["total_events"] == 2
assert "claude" in s["agents"]
assert s["kind_counts"]["dispatch"] == 1
assert s["kind_counts"]["commit"] == 1
def test_max_events_limit(self, writer):
for i in range(10):
writer.ingest(AgentEvent(kind=EventKind.IDLE, agent="a", detail=str(i)))
text = writer.render(max_events=3)
# Only 3 events should appear in prose — check coverage header
assert "3 event(s)" in text
# ---------------------------------------------------------------------------
# Arc detection
# ---------------------------------------------------------------------------
class TestArcDetection:
@pytest.fixture
def writer(self, tmp_path):
return ChronicleWriter(log_path=tmp_path / "chronicle.jsonl")
def _ingest(self, writer, *kinds, agent="claude"):
for k in kinds:
writer.ingest(AgentEvent(kind=k, agent=agent, detail="x"))
def test_struggle_and_recovery_arc(self, writer):
self._ingest(writer, EventKind.DISPATCH, EventKind.ERROR, EventKind.RECOVERY)
text = writer.render()
assert "struggle" in text.lower() or "trouble" in text.lower()
def test_no_arc_when_no_pattern(self, writer):
self._ingest(writer, EventKind.IDLE)
text = writer.render()
# Should not include arc language (only 1 event, no pattern)
assert "converged" not in text
assert "struggle" not in text
def test_solo_sprint_arc(self, writer):
self._ingest(
writer,
EventKind.DISPATCH,
EventKind.COMMIT,
EventKind.PR_OPEN,
EventKind.PR_MERGE,
)
text = writer.render()
assert "solo" in text.lower() or "alone" in text.lower()
def test_fleet_convergence_arc(self, writer, tmp_path):
writer2 = ChronicleWriter(log_path=tmp_path / "chronicle.jsonl")
writer2.ingest(AgentEvent(kind=EventKind.DISPATCH, agent="claude", detail="x"))
writer2.ingest(AgentEvent(kind=EventKind.COLLABORATION, agent="mimo", detail="x"))
writer2.ingest(AgentEvent(kind=EventKind.COMMIT, agent="claude", detail="x"))
text = writer2.render()
assert "converged" in text.lower() or "fleet" in text.lower()
def test_silent_grind_arc(self, writer):
self._ingest(writer, EventKind.COMMIT, EventKind.COMMIT, EventKind.COMMIT)
text = writer.render()
assert "steady" in text.lower() or "quiet" in text.lower() or "grind" in text.lower()
def test_abandon_then_retry_arc(self, writer):
self._ingest(writer, EventKind.DISPATCH, EventKind.ABANDON, EventKind.DISPATCH)
text = writer.render()
assert "let go" in text.lower() or "abandon" in text.lower() or "called again" in text.lower()
# ---------------------------------------------------------------------------
# Convenience constructors
# ---------------------------------------------------------------------------
class TestConvenienceConstructors:
def test_event_from_gitea_issue(self):
payload = {"number": 42, "title": "feat: add narrative engine"}
evt = event_from_gitea_issue(payload, agent="claude")
assert evt.kind == EventKind.DISPATCH
assert "42" in evt.detail
assert evt.agent == "claude"
def test_event_from_heartbeat(self):
hb = {"model": "claude-sonnet", "status": "thinking", "cycle": 7}
evt = event_from_heartbeat(hb)
assert evt.kind == EventKind.HEARTBEAT
assert evt.agent == "claude-sonnet"
assert "7" in evt.detail
def test_event_from_commit(self):
commit = {"message": "feat: chronicle\n\nFixes #1607", "sha": "abc1234567"}
evt = event_from_commit(commit, agent="claude")
assert evt.kind == EventKind.COMMIT
assert evt.detail == "feat: chronicle" # subject line only
assert evt.metadata["sha"] == "abc12345"