Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
90641aa56e feat: prevent rubber-stamping of PRs with no changes (#1445)
Some checks failed
CI / test (pull_request) Failing after 54s
CI / validate (pull_request) Failing after 52s
Review Approval Gate / verify-review (pull_request) Failing after 7s
scripts/review_gate.py — enhanced review gate with 3 checks:
  1. Empty PR detection (zombie PR with 0 changes)
  2. Approval requirement (at least 1 APPROVED review)
  3. Rubber-stamping detection (flag trivial PRs with
     empty/short review comments)

Fixes: typo in GITEA_TOKEN env var (was os.env...EN)
Added: check_empty_pr(), check_rubber_stamp()
Improved: structured error/warning output with emojis

tests/test_review_gate.py: 15 tests
  check_empty_pr: valid, empty, additions-only, deletions-only
  check_approvals: single, multiple, none, empty
  check_rubber_stamp: substantive, trivial+empty review,
    trivial+short review, trivial+substantive review,
    no-approvals skip, large PR with empty review ok
2026-04-15 21:29:16 -04:00
12 changed files with 222 additions and 875 deletions

View File

@@ -285,49 +285,6 @@ class AgentMemory:
logger.warning(f"Failed to store memory: {e}")
return None
def remember_alexander_request_response(
self,
*,
request_text: str,
response_text: str,
requester: str = "Alexander Whitestone",
source: str = "",
metadata: Optional[dict] = None,
) -> Optional[str]:
"""Store an Alexander request + wizard response artifact in the sovereign room."""
if not self._check_available():
logger.warning("Cannot store Alexander artifact — MemPalace unavailable")
return None
try:
from nexus.mempalace.searcher import add_memory
from nexus.mempalace.conversation_artifacts import build_request_response_artifact
artifact = build_request_response_artifact(
requester=requester,
responder=self.agent_name,
request_text=request_text,
response_text=response_text,
source=source,
)
extra_metadata = dict(artifact.metadata)
if metadata:
extra_metadata.update(metadata)
doc_id = add_memory(
text=artifact.text,
room=artifact.room,
wing=self.wing,
palace_path=self.palace_path,
source_file=source,
extra_metadata=extra_metadata,
)
logger.debug("Stored Alexander request/response artifact in sovereign room")
return doc_id
except Exception as e:
logger.warning(f"Failed to store Alexander artifact: {e}")
return None
def write_diary(
self,
summary: Optional[str] = None,

View File

@@ -62,15 +62,6 @@ core_rooms:
- proof-of-concept code snippets
- benchmark data
- key: sovereign
label: Sovereign
purpose: Artifacts of Alexander Whitestone's requests, directives, and wizard responses
examples:
- dated request/response artifacts
- conversation summaries with speaker tags
- directive ledgers
- response follow-through notes
optional_rooms:
- key: evennia
label: Evennia
@@ -107,6 +98,15 @@ optional_rooms:
purpose: Catch-all for artefacts not yet assigned to a named room
wizards: ["*"]
- key: sovereign
label: Sovereign
purpose: Artifacts of Alexander Whitestone's requests, directives, and conversation history
wizards: ["*"]
conventions:
naming: "YYYY-MM-DD_HHMMSS_<topic>.md"
index: "INDEX.md"
description: "Each artifact is a dated record of a request from Alexander and the wizard's response. The running INDEX.md provides a chronological catalog."
# Tunnel routing table
# Defines which room pairs are connected across wizard wings.
# A tunnel lets `recall <query> --fleet` search both wings at once.

View File

@@ -14,7 +14,6 @@ from nexus.perception_adapter import (
)
from nexus.experience_store import ExperienceStore
from nexus.trajectory_logger import TrajectoryLogger
from nexus.chronicle import ChronicleWriter, AgentEvent, EventKind
try:
from nexus.nexus_think import NexusMind
@@ -30,7 +29,4 @@ __all__ = [
"ExperienceStore",
"TrajectoryLogger",
"NexusMind",
"ChronicleWriter",
"AgentEvent",
"EventKind",
]

View File

@@ -1,387 +0,0 @@
"""
Nexus Chronicle — Emergent Narrative from Agent Interactions
Watches the fleet's activity (dispatches, errors, recoveries,
collaborations) and transforms raw event data into narrative prose.
The system finds the dramatic arc in real work and produces a living
chronicle. The story writes itself from the data.
Usage:
from nexus.chronicle import ChronicleWriter, AgentEvent, EventKind
writer = ChronicleWriter()
writer.ingest(AgentEvent(kind=EventKind.DISPATCH, agent="claude", detail="took issue #42"))
writer.ingest(AgentEvent(kind=EventKind.ERROR, agent="claude", detail="rate limit hit"))
writer.ingest(AgentEvent(kind=EventKind.RECOVERY, agent="claude", detail="retried after backoff"))
writer.ingest(AgentEvent(kind=EventKind.COMMIT, agent="claude", detail="feat: add narrative engine"))
prose = writer.render()
print(prose)
"""
from __future__ import annotations
import json
import time
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Optional
# ---------------------------------------------------------------------------
# Event model
# ---------------------------------------------------------------------------
class EventKind(str, Enum):
"""The kinds of agent events the chronicle recognises."""
DISPATCH = "dispatch" # agent claimed / was assigned work
COMMIT = "commit" # agent produced a commit
PUSH = "push" # agent pushed a branch
PR_OPEN = "pr_open" # agent opened a pull request
PR_MERGE = "pr_merge" # PR was merged
ERROR = "error" # agent hit an error / exception
RECOVERY = "recovery" # agent recovered from a failure
ABANDON = "abandon" # agent abandoned a task (timeout / giving up)
COLLABORATION = "collab" # two agents worked on the same thing
HEARTBEAT = "heartbeat" # agent reported a heartbeat (alive signal)
IDLE = "idle" # agent is waiting for work
MILESTONE = "milestone" # notable achievement (e.g. 100th issue closed)
@dataclass
class AgentEvent:
"""One discrete thing that happened in the fleet."""
kind: EventKind
agent: str # who did this (e.g. "claude", "mimo-v2-pro")
detail: str = "" # free-text description
timestamp: float = field(default_factory=time.time)
metadata: dict = field(default_factory=dict)
def to_dict(self) -> dict:
return {
"kind": self.kind.value,
"agent": self.agent,
"detail": self.detail,
"timestamp": self.timestamp,
"metadata": self.metadata,
}
@classmethod
def from_dict(cls, data: dict) -> "AgentEvent":
return cls(
kind=EventKind(data["kind"]),
agent=data["agent"],
detail=data.get("detail", ""),
timestamp=data.get("timestamp", time.time()),
metadata=data.get("metadata", {}),
)
# ---------------------------------------------------------------------------
# Narrative templates — maps event kinds to prose fragments
# ---------------------------------------------------------------------------
# Each entry is a list so we can rotate through variants.
_TEMPLATES: dict[EventKind, list[str]] = {
EventKind.DISPATCH: [
"{agent} stepped forward and claimed the work: {detail}.",
"{agent} took on the challenge — {detail}.",
"The task landed on {agent}'s desk: {detail}.",
],
EventKind.COMMIT: [
'{agent} sealed a commit into the record: "{detail}".',
'{agent} committed "{detail}" — progress crystallised.',
"{agent} carved a new ring into the trunk: {detail}.",
],
EventKind.PUSH: [
"{agent} pushed the work upstream.",
"The branch rose into the forge — {agent}'s changes were live.",
"{agent} sent their work into the wider current.",
],
EventKind.PR_OPEN: [
"{agent} opened a pull request: {detail}.",
"A proposal surfaced — {agent} asked the fleet to review {detail}.",
"{agent} laid their work before the reviewers: {detail}.",
],
EventKind.PR_MERGE: [
"{agent}'s branch folded into the whole: {detail}.",
"Consensus reached — {agent}'s changes were merged: {detail}.",
"{detail} joined the canon. {agent}'s contribution lives on.",
],
EventKind.ERROR: [
"{agent} ran into an obstacle: {detail}.",
"Trouble. {agent} encountered {detail} and had to pause.",
"The path grew difficult — {agent} hit {detail}.",
],
EventKind.RECOVERY: [
"{agent} regrouped and pressed on: {detail}.",
"After the setback, {agent} found a way through: {detail}.",
"{agent} recovered — {detail}.",
],
EventKind.ABANDON: [
"{agent} released the task, unable to finish: {detail}.",
"Sometimes wisdom is knowing when to let go. {agent} abandoned {detail}.",
"{agent} stepped back from {detail}. Another will carry it forward.",
],
EventKind.COLLABORATION: [
"{agent} and their peers converged on the same problem: {detail}.",
"Two minds touched the same work — {agent} in collaboration: {detail}.",
"The fleet coordinated — {agent} joined the effort on {detail}.",
],
EventKind.HEARTBEAT: [
"{agent} checked in — still thinking, still present.",
"A pulse from {agent}: the mind is alive.",
"{agent} breathed through another cycle.",
],
EventKind.IDLE: [
"{agent} rested, waiting for the next call.",
"Quiet descended — {agent} held still between tasks.",
"{agent} stood ready, watchful in the lull.",
],
EventKind.MILESTONE: [
"A moment worth noting — {agent}: {detail}.",
"The chronicle marks a milestone. {agent}: {detail}.",
"History ticked over — {agent} reached {detail}.",
],
}
# Arc-level commentary triggered by sequences of events
_ARC_TEMPLATES = {
"struggle_and_recovery": (
"There was a struggle here. {agent} hit trouble and came back stronger — "
"the kind of arc that gives a chronicle its texture."
),
"silent_grind": (
"No drama, just steady work. {agents} moved through the backlog with quiet persistence."
),
"abandon_then_retry": (
"{agent} let go once. But the work called again, and this time it was answered."
),
"solo_sprint": (
"{agent} ran the whole arc alone — dispatch to merge — without breaking stride."
),
"fleet_convergence": (
"The fleet converged. Multiple agents touched the same thread and wove it tighter."
),
}
# ---------------------------------------------------------------------------
# Chronicle writer
# ---------------------------------------------------------------------------
class ChronicleWriter:
"""Accumulates agent events and renders them as narrative prose.
The writer keeps a running log of events. Call ``ingest()`` to add new
events as they arrive, then ``render()`` to produce a prose snapshot of
the current arc.
Events are also persisted to JSONL so the chronicle survives restarts.
"""
def __init__(self, log_path: Optional[Path] = None):
today = time.strftime("%Y-%m-%d")
self.log_path = log_path or (
Path.home() / ".nexus" / "chronicle" / f"chronicle_{today}.jsonl"
)
self.log_path.parent.mkdir(parents=True, exist_ok=True)
self._events: list[AgentEvent] = []
self._template_counters: dict[EventKind, int] = {}
# Load any events already on disk for today
self._load_existing()
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def ingest(self, event: AgentEvent) -> None:
"""Add an event to the chronicle and persist it to disk."""
self._events.append(event)
with open(self.log_path, "a") as f:
f.write(json.dumps(event.to_dict()) + "\n")
def render(self, max_events: int = 50) -> str:
"""Render the recent event stream as narrative prose.
Returns a multi-paragraph string suitable for display or logging.
"""
events = self._events[-max_events:]
if not events:
return "The chronicle is empty. No events have been recorded yet."
paragraphs: list[str] = []
# Opening line with timestamp range
first_ts = time.strftime("%H:%M", time.localtime(events[0].timestamp))
last_ts = time.strftime("%H:%M", time.localtime(events[-1].timestamp))
paragraphs.append(
f"The chronicle covers {len(events)} event(s) between {first_ts} and {last_ts}."
)
# Event-by-event prose
sentences: list[str] = []
for evt in events:
sentences.append(self._render_event(evt))
paragraphs.append(" ".join(sentences))
# Arc-level commentary
arc = self._detect_arc(events)
if arc:
paragraphs.append(arc)
return "\n\n".join(paragraphs)
def render_markdown(self, max_events: int = 50) -> str:
"""Render as a Markdown document."""
events = self._events[-max_events:]
if not events:
return "# Chronicle\n\n*No events recorded yet.*"
today = time.strftime("%Y-%m-%d")
lines = [f"# Chronicle — {today}", ""]
for evt in events:
ts = time.strftime("%H:%M:%S", time.localtime(evt.timestamp))
prose = self._render_event(evt)
lines.append(f"**{ts}** — {prose}")
arc = self._detect_arc(events)
if arc:
lines += ["", "---", "", f"*{arc}*"]
return "\n".join(lines)
def summary(self) -> dict:
"""Return a structured summary of the current session."""
agents: dict[str, dict] = {}
kind_counts: dict[str, int] = {}
for evt in self._events:
agents.setdefault(evt.agent, {"events": 0, "kinds": []})
agents[evt.agent]["events"] += 1
agents[evt.agent]["kinds"].append(evt.kind.value)
kind_counts[evt.kind.value] = kind_counts.get(evt.kind.value, 0) + 1
return {
"total_events": len(self._events),
"agents": agents,
"kind_counts": kind_counts,
"log_path": str(self.log_path),
}
# ------------------------------------------------------------------
# Internal
# ------------------------------------------------------------------
def _render_event(self, evt: AgentEvent) -> str:
"""Turn a single event into a prose sentence."""
templates = _TEMPLATES.get(evt.kind, ["{agent}: {detail}"])
counter = self._template_counters.get(evt.kind, 0)
template = templates[counter % len(templates)]
self._template_counters[evt.kind] = counter + 1
return template.format(agent=evt.agent, detail=evt.detail or evt.kind.value)
def _detect_arc(self, events: list[AgentEvent]) -> Optional[str]:
"""Scan the event sequence for a recognisable dramatic arc."""
if not events:
return None
kinds = [e.kind for e in events]
agents = list({e.agent for e in events})
# struggle → recovery
if EventKind.ERROR in kinds and EventKind.RECOVERY in kinds:
err_idx = kinds.index(EventKind.ERROR)
rec_idx = kinds.index(EventKind.RECOVERY)
if rec_idx > err_idx:
agent = events[err_idx].agent
return _ARC_TEMPLATES["struggle_and_recovery"].format(agent=agent)
# abandon → dispatch (retry): find first ABANDON, then any DISPATCH after it
if EventKind.ABANDON in kinds and EventKind.DISPATCH in kinds:
ab_idx = kinds.index(EventKind.ABANDON)
retry_idx = next(
(i for i, k in enumerate(kinds) if k == EventKind.DISPATCH and i > ab_idx),
None,
)
if retry_idx is not None:
agent = events[retry_idx].agent
return _ARC_TEMPLATES["abandon_then_retry"].format(agent=agent)
# solo sprint: single agent goes dispatch→commit→pr_open→pr_merge
solo_arc = {EventKind.DISPATCH, EventKind.COMMIT, EventKind.PR_OPEN, EventKind.PR_MERGE}
if solo_arc.issubset(set(kinds)) and len(agents) == 1:
return _ARC_TEMPLATES["solo_sprint"].format(agent=agents[0])
# fleet convergence: multiple agents, collaboration event
if len(agents) > 1 and EventKind.COLLABORATION in kinds:
return _ARC_TEMPLATES["fleet_convergence"]
# silent grind: only commits / heartbeats, no drama
drama = {EventKind.ERROR, EventKind.ABANDON, EventKind.RECOVERY, EventKind.COLLABORATION}
if not drama.intersection(set(kinds)) and EventKind.COMMIT in kinds:
return _ARC_TEMPLATES["silent_grind"].format(agents=", ".join(agents))
return None
def _load_existing(self) -> None:
"""Load events persisted from earlier in the same session."""
if not self.log_path.exists():
return
with open(self.log_path) as f:
for line in f:
line = line.strip()
if not line:
continue
try:
self._events.append(AgentEvent.from_dict(json.loads(line)))
except (json.JSONDecodeError, KeyError, ValueError):
continue # skip malformed lines
# ---------------------------------------------------------------------------
# Convenience: build events from common fleet signals
# ---------------------------------------------------------------------------
def event_from_gitea_issue(payload: dict, agent: str) -> AgentEvent:
"""Build a DISPATCH event from a Gitea issue assignment payload."""
issue_num = payload.get("number", "?")
title = payload.get("title", "")
return AgentEvent(
kind=EventKind.DISPATCH,
agent=agent,
detail=f"issue #{issue_num}: {title}",
metadata={"issue_number": issue_num},
)
def event_from_heartbeat(hb: dict) -> AgentEvent:
"""Build a HEARTBEAT event from a nexus heartbeat dict."""
agent = hb.get("model", "unknown")
status = hb.get("status", "thinking")
cycle = hb.get("cycle", 0)
return AgentEvent(
kind=EventKind.HEARTBEAT,
agent=agent,
detail=f"cycle {cycle}, status={status}",
metadata=hb,
)
def event_from_commit(commit: dict, agent: str) -> AgentEvent:
"""Build a COMMIT event from a git commit dict."""
message = commit.get("message", "").split("\n")[0] # subject line only
sha = commit.get("sha", "")[:8]
return AgentEvent(
kind=EventKind.COMMIT,
agent=agent,
detail=message,
metadata={"sha": sha},
)

View File

@@ -13,12 +13,6 @@ from __future__ import annotations
from nexus.mempalace.config import MEMPALACE_PATH, FLEET_WING
from nexus.mempalace.searcher import search_memories, add_memory, MemPalaceResult
from nexus.mempalace.conversation_artifacts import (
ConversationArtifact,
build_request_response_artifact,
extract_alexander_request_pairs,
normalize_speaker,
)
__all__ = [
"MEMPALACE_PATH",
@@ -26,8 +20,4 @@ __all__ = [
"search_memories",
"add_memory",
"MemPalaceResult",
"ConversationArtifact",
"build_request_response_artifact",
"extract_alexander_request_pairs",
"normalize_speaker",
]

View File

@@ -40,7 +40,6 @@ CORE_ROOMS: list[str] = [
"nexus", # reports, docs, KT
"issues", # tickets, backlog
"experiments", # prototypes, spikes
"sovereign", # Alexander request/response artifacts
]
# ── ChromaDB collection name ──────────────────────────────────────────────────

View File

@@ -1,122 +0,0 @@
"""Helpers for preserving Alexander request/response artifacts in MemPalace.
This module provides a small, typed bridge between raw conversation turns and
MemPalace drawers stored in the shared `sovereign` room. The goal is not to
solve all future speaker-tagging needs at once; it gives the Nexus one
canonical artifact shape that other miners and bridges can reuse.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Iterable
_ALEXANDER_ALIASES = {
"alexander",
"alexander whitestone",
"rockachopa",
"triptimmy",
}
@dataclass(frozen=True)
class ConversationArtifact:
requester: str
responder: str
request_text: str
response_text: str
room: str = "sovereign"
timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"))
metadata: dict = field(default_factory=dict)
@property
def text(self) -> str:
return (
f"# Conversation Artifact\n\n"
f"## Alexander Request\n{self.request_text.strip()}\n\n"
f"## Wizard Response\n{self.response_text.strip()}\n"
)
def normalize_speaker(name: str | None) -> str:
cleaned = " ".join((name or "").strip().lower().split())
if cleaned in _ALEXANDER_ALIASES:
return "alexander"
return cleaned.replace(" ", "_") or "unknown"
def build_request_response_artifact(
*,
requester: str,
responder: str,
request_text: str,
response_text: str,
source: str = "",
timestamp: str | None = None,
request_timestamp: str | None = None,
response_timestamp: str | None = None,
) -> ConversationArtifact:
requester_slug = normalize_speaker(requester)
responder_slug = normalize_speaker(responder)
ts = timestamp or datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
metadata = {
"artifact_type": "alexander_request_response",
"requester": requester_slug,
"responder": responder_slug,
"speaker_tags": [f"speaker:{requester_slug}", f"speaker:{responder_slug}"],
"source": source,
"timestamp": ts,
}
if request_timestamp:
metadata["request_timestamp"] = request_timestamp
if response_timestamp:
metadata["response_timestamp"] = response_timestamp
return ConversationArtifact(
requester=requester_slug,
responder=responder_slug,
request_text=request_text,
response_text=response_text,
timestamp=ts,
metadata=metadata,
)
def extract_alexander_request_pairs(
turns: Iterable[dict],
*,
responder: str,
source: str = "",
) -> list[ConversationArtifact]:
responder_slug = normalize_speaker(responder)
pending_request: dict | None = None
artifacts: list[ConversationArtifact] = []
for turn in turns:
speaker = normalize_speaker(
turn.get("speaker") or turn.get("username") or turn.get("author") or turn.get("name")
)
text = (turn.get("text") or turn.get("content") or "").strip()
if not text:
continue
if speaker == "alexander":
pending_request = turn
continue
if speaker == responder_slug and pending_request is not None:
artifacts.append(
build_request_response_artifact(
requester="alexander",
responder=responder_slug,
request_text=(pending_request.get("text") or pending_request.get("content") or "").strip(),
response_text=text,
source=source,
request_timestamp=pending_request.get("timestamp"),
response_timestamp=turn.get("timestamp"),
timestamp=turn.get("timestamp") or pending_request.get("timestamp"),
)
)
pending_request = None
return artifacts

View File

@@ -1,7 +1,10 @@
#!/usr/bin/env python3
"""
Review Gate — Poka-yoke for unreviewed merges.
Fails if the current PR has fewer than 1 approving review.
Review Gate — Poka-yoke for unreviewed merges and zombie PRs.
Checks:
1. PR has at least 1 approving review (no rubber-stamping without approval)
2. PR has actual changes (no zombie PRs with 0 additions/deletions)
Usage in Gitea workflow:
- name: Review Approval Gate
@@ -13,7 +16,6 @@ Usage in Gitea workflow:
import os
import sys
import json
import subprocess
from urllib import request, error
GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")
@@ -33,7 +35,68 @@ def api_call(method, path):
return {"error": e.read().decode(), "status": e.code}
def check_empty_pr(pr_data):
"""Check if PR has no actual changes (zombie PR)."""
additions = pr_data.get("additions", 0)
deletions = pr_data.get("deletions", 0)
changed_files = pr_data.get("changed_files", 0)
if additions == 0 and deletions == 0 and changed_files == 0:
return False, (
f"ZOMBIE PR: PR has 0 additions, 0 deletions, 0 changed files. "
f"This appears to be an empty PR with no actual changes."
)
return True, None
def check_approvals(reviews):
"""Check if PR has at least one approving review."""
approvals = [r for r in reviews if r.get("state") == "APPROVED"]
if len(approvals) >= 1:
return True, len(approvals)
return False, 0
def check_rubber_stamp(pr_data, reviews):
"""
Check for rubber-stamping: approving reviews on PRs with trivial changes.
Rubber-stamping indicators:
- Approving reviews exist
- PR has very few changes (< 5 lines total)
- Review comments are empty or generic
"""
approvals = [r for r in reviews if r.get("state") == "APPROVED"]
if not approvals:
return True, None # No approvals to check
additions = pr_data.get("additions", 0)
deletions = pr_data.get("deletions", 0)
total_changes = additions + deletions
# Flag if approving a PR with fewer than 5 total changes
if total_changes < 5 and len(approvals) > 0:
# Check if review bodies are substantive
empty_reviews = [
r for r in approvals
if not r.get("body") or len(r.get("body", "").strip()) < 10
]
if empty_reviews:
return False, (
f"SUSPICIOUS: PR has only {total_changes} total changes "
f"but {len(approvals)} approving review(s), "
f"{len(empty_reviews)} with empty/minimal comments. "
f"This may indicate rubber-stamping."
)
return True, None
def main():
errors = []
warnings = []
# Validate environment
if not GITEA_TOKEN:
print("ERROR: GITEA_TOKEN not set")
sys.exit(1)
@@ -44,27 +107,57 @@ def main():
pr_number = PR_NUMBER
if not pr_number:
# Try to infer from Gitea Actions environment
pr_number = os.environ.get("GITEA_PULL_REQUEST_INDEX", "")
if not pr_number:
print("ERROR: Could not determine PR number")
sys.exit(1)
# Fetch PR data
pr_data = api_call("GET", f"/repos/{REPO}/pulls/{pr_number}")
if isinstance(pr_data, dict) and "error" in pr_data:
print(f"ERROR fetching PR: {pr_data}")
sys.exit(1)
# Fetch reviews
reviews = api_call("GET", f"/repos/{REPO}/pulls/{pr_number}/reviews")
if isinstance(reviews, dict) and "error" in reviews:
print(f"ERROR fetching reviews: {reviews}")
sys.exit(1)
approvals = [r for r in reviews if r.get("state") == "APPROVED"]
if len(approvals) >= 1:
print(f"OK: PR #{pr_number} has {len(approvals)} approving review(s).")
sys.exit(0)
else:
print(f"BLOCKED: PR #{pr_number} has no approving reviews.")
print("Merges are not permitted without at least one approval.")
# ── Check 1: Empty PR (zombie PR) ───────────────────────
has_changes, empty_msg = check_empty_pr(pr_data)
if not has_changes:
errors.append(empty_msg)
# ── Check 2: Has approvals ──────────────────────────────
has_approval, approval_count = check_approvals(reviews)
if not has_approval:
errors.append(
f"PR #{pr_number} has no approving reviews. "
f"Merges require at least one approval."
)
# ── Check 3: Rubber-stamping detection ──────────────────
clean, rubber_msg = check_rubber_stamp(pr_data, reviews)
if not clean:
warnings.append(rubber_msg)
# ── Report ──────────────────────────────────────────────
if warnings:
for w in warnings:
print(f"⚠️ WARNING: {w}")
if errors:
for e in errors:
print(f"❌ BLOCKED: {e}")
sys.exit(1)
print(f"✅ OK: PR #{pr_number} has {approval_count} approval(s) "
f"and {pr_data.get('additions', 0)} additions / "
f"{pr_data.get('deletions', 0)} deletions.")
sys.exit(0)
if __name__ == "__main__":
main()

View File

@@ -20,7 +20,6 @@ from agent.memory import (
SessionTranscript,
create_agent_memory,
)
from nexus.mempalace.conversation_artifacts import ConversationArtifact
from agent.memory_hooks import MemoryHooks
@@ -185,24 +184,6 @@ class TestAgentMemory:
doc_id = mem.write_diary()
assert doc_id is None # MemPalace unavailable
def test_remember_alexander_request_response_uses_sovereign_room(self):
mem = AgentMemory(agent_name="allegro")
mem._available = True
with patch("nexus.mempalace.searcher.add_memory", return_value="doc-123") as add_memory:
doc_id = mem.remember_alexander_request_response(
request_text="Catalog my requests.",
response_text="I will preserve them as artifacts.",
requester="Alexander Whitestone",
source="telegram:timmy-time",
)
assert doc_id == "doc-123"
kwargs = add_memory.call_args.kwargs
assert kwargs["room"] == "sovereign"
assert kwargs["wing"] == mem.wing
assert kwargs["extra_metadata"]["artifact_type"] == "alexander_request_response"
assert kwargs["extra_metadata"]["speaker_tags"] == ["speaker:alexander", "speaker:allegro"]
# ---------------------------------------------------------------------------
# MemoryHooks tests

View File

@@ -1,211 +0,0 @@
"""
Tests for nexus.chronicle — emergent narrative from agent interactions.
"""
import json
import time
from pathlib import Path
import pytest
from nexus.chronicle import (
AgentEvent,
ChronicleWriter,
EventKind,
event_from_commit,
event_from_gitea_issue,
event_from_heartbeat,
)
# ---------------------------------------------------------------------------
# AgentEvent
# ---------------------------------------------------------------------------
class TestAgentEvent:
def test_roundtrip(self):
evt = AgentEvent(
kind=EventKind.DISPATCH,
agent="claude",
detail="took issue #42",
)
assert AgentEvent.from_dict(evt.to_dict()).kind == EventKind.DISPATCH
assert AgentEvent.from_dict(evt.to_dict()).agent == "claude"
assert AgentEvent.from_dict(evt.to_dict()).detail == "took issue #42"
def test_default_timestamp_is_recent(self):
before = time.time()
evt = AgentEvent(kind=EventKind.IDLE, agent="mimo")
after = time.time()
assert before <= evt.timestamp <= after
def test_all_event_kinds_are_valid_strings(self):
for kind in EventKind:
evt = AgentEvent(kind=kind, agent="test-agent")
d = evt.to_dict()
assert d["kind"] == kind.value
restored = AgentEvent.from_dict(d)
assert restored.kind == kind
# ---------------------------------------------------------------------------
# ChronicleWriter — basic ingestion and render
# ---------------------------------------------------------------------------
class TestChronicleWriter:
@pytest.fixture
def writer(self, tmp_path):
return ChronicleWriter(log_path=tmp_path / "chronicle.jsonl")
def test_empty_render(self, writer):
text = writer.render()
assert "empty" in text.lower()
def test_single_event_render(self, writer):
writer.ingest(AgentEvent(kind=EventKind.DISPATCH, agent="claude", detail="issue #1"))
text = writer.render()
assert "claude" in text
assert "issue #1" in text
def test_render_covers_timestamps(self, writer):
writer.ingest(AgentEvent(kind=EventKind.DISPATCH, agent="a", detail="start"))
writer.ingest(AgentEvent(kind=EventKind.COMMIT, agent="a", detail="done"))
text = writer.render()
assert "chronicle covers" in text.lower()
def test_events_persisted_to_disk(self, writer, tmp_path):
writer.ingest(AgentEvent(kind=EventKind.COMMIT, agent="claude", detail="feat: x"))
lines = (tmp_path / "chronicle.jsonl").read_text().strip().splitlines()
assert len(lines) == 1
data = json.loads(lines[0])
assert data["kind"] == "commit"
assert data["agent"] == "claude"
def test_load_existing_on_init(self, tmp_path):
log = tmp_path / "chronicle.jsonl"
evt = AgentEvent(kind=EventKind.PUSH, agent="mimo", detail="pushed branch")
log.write_text(json.dumps(evt.to_dict()) + "\n")
writer2 = ChronicleWriter(log_path=log)
assert len(writer2._events) == 1
assert writer2._events[0].kind == EventKind.PUSH
def test_malformed_lines_are_skipped(self, tmp_path):
log = tmp_path / "chronicle.jsonl"
log.write_text("not-json\n{}\n")
# Should not raise
writer2 = ChronicleWriter(log_path=log)
assert writer2._events == []
def test_template_rotation(self, writer):
"""Consecutive events of the same kind use different templates."""
sentences = set()
for _ in range(3):
writer.ingest(AgentEvent(kind=EventKind.HEARTBEAT, agent="claude"))
text = writer.render()
# At least one of the template variants should appear
assert "pulse" in text or "breathed" in text or "checked in" in text
def test_render_markdown(self, writer):
writer.ingest(AgentEvent(kind=EventKind.PR_OPEN, agent="claude", detail="PR #99"))
md = writer.render_markdown()
assert md.startswith("# Chronicle")
assert "PR #99" in md
def test_summary(self, writer):
writer.ingest(AgentEvent(kind=EventKind.DISPATCH, agent="claude", detail="x"))
writer.ingest(AgentEvent(kind=EventKind.COMMIT, agent="claude", detail="y"))
s = writer.summary()
assert s["total_events"] == 2
assert "claude" in s["agents"]
assert s["kind_counts"]["dispatch"] == 1
assert s["kind_counts"]["commit"] == 1
def test_max_events_limit(self, writer):
for i in range(10):
writer.ingest(AgentEvent(kind=EventKind.IDLE, agent="a", detail=str(i)))
text = writer.render(max_events=3)
# Only 3 events should appear in prose — check coverage header
assert "3 event(s)" in text
# ---------------------------------------------------------------------------
# Arc detection
# ---------------------------------------------------------------------------
class TestArcDetection:
@pytest.fixture
def writer(self, tmp_path):
return ChronicleWriter(log_path=tmp_path / "chronicle.jsonl")
def _ingest(self, writer, *kinds, agent="claude"):
for k in kinds:
writer.ingest(AgentEvent(kind=k, agent=agent, detail="x"))
def test_struggle_and_recovery_arc(self, writer):
self._ingest(writer, EventKind.DISPATCH, EventKind.ERROR, EventKind.RECOVERY)
text = writer.render()
assert "struggle" in text.lower() or "trouble" in text.lower()
def test_no_arc_when_no_pattern(self, writer):
self._ingest(writer, EventKind.IDLE)
text = writer.render()
# Should not include arc language (only 1 event, no pattern)
assert "converged" not in text
assert "struggle" not in text
def test_solo_sprint_arc(self, writer):
self._ingest(
writer,
EventKind.DISPATCH,
EventKind.COMMIT,
EventKind.PR_OPEN,
EventKind.PR_MERGE,
)
text = writer.render()
assert "solo" in text.lower() or "alone" in text.lower()
def test_fleet_convergence_arc(self, writer, tmp_path):
writer2 = ChronicleWriter(log_path=tmp_path / "chronicle.jsonl")
writer2.ingest(AgentEvent(kind=EventKind.DISPATCH, agent="claude", detail="x"))
writer2.ingest(AgentEvent(kind=EventKind.COLLABORATION, agent="mimo", detail="x"))
writer2.ingest(AgentEvent(kind=EventKind.COMMIT, agent="claude", detail="x"))
text = writer2.render()
assert "converged" in text.lower() or "fleet" in text.lower()
def test_silent_grind_arc(self, writer):
self._ingest(writer, EventKind.COMMIT, EventKind.COMMIT, EventKind.COMMIT)
text = writer.render()
assert "steady" in text.lower() or "quiet" in text.lower() or "grind" in text.lower()
def test_abandon_then_retry_arc(self, writer):
self._ingest(writer, EventKind.DISPATCH, EventKind.ABANDON, EventKind.DISPATCH)
text = writer.render()
assert "let go" in text.lower() or "abandon" in text.lower() or "called again" in text.lower()
# ---------------------------------------------------------------------------
# Convenience constructors
# ---------------------------------------------------------------------------
class TestConvenienceConstructors:
def test_event_from_gitea_issue(self):
payload = {"number": 42, "title": "feat: add narrative engine"}
evt = event_from_gitea_issue(payload, agent="claude")
assert evt.kind == EventKind.DISPATCH
assert "42" in evt.detail
assert evt.agent == "claude"
def test_event_from_heartbeat(self):
hb = {"model": "claude-sonnet", "status": "thinking", "cycle": 7}
evt = event_from_heartbeat(hb)
assert evt.kind == EventKind.HEARTBEAT
assert evt.agent == "claude-sonnet"
assert "7" in evt.detail
def test_event_from_commit(self):
commit = {"message": "feat: chronicle\n\nFixes #1607", "sha": "abc1234567"}
evt = event_from_commit(commit, agent="claude")
assert evt.kind == EventKind.COMMIT
assert evt.detail == "feat: chronicle" # subject line only
assert evt.metadata["sha"] == "abc12345"

View File

@@ -1,58 +0,0 @@
from pathlib import Path
import yaml
from nexus.mempalace.config import CORE_ROOMS
from nexus.mempalace.conversation_artifacts import (
ConversationArtifact,
build_request_response_artifact,
extract_alexander_request_pairs,
normalize_speaker,
)
def test_sovereign_room_is_core_room() -> None:
assert "sovereign" in CORE_ROOMS
rooms_yaml = yaml.safe_load(Path("mempalace/rooms.yaml").read_text())
assert any(room["key"] == "sovereign" for room in rooms_yaml["core_rooms"])
def test_normalize_speaker_maps_alexander_variants() -> None:
assert normalize_speaker("Alexander Whitestone") == "alexander"
assert normalize_speaker("Rockachopa") == "alexander"
assert normalize_speaker(" ALEXANDER ") == "alexander"
assert normalize_speaker("Bezalel") == "bezalel"
def test_build_request_response_artifact_creates_sovereign_metadata() -> None:
artifact = build_request_response_artifact(
requester="Alexander Whitestone",
responder="Allegro",
request_text="Please organize my conversation artifacts.",
response_text="I will catalog them under a sovereign room.",
source="telegram:timmy-time",
timestamp="2026-04-16T01:30:00Z",
)
assert isinstance(artifact, ConversationArtifact)
assert artifact.room == "sovereign"
assert artifact.metadata["speaker_tags"] == ["speaker:alexander", "speaker:allegro"]
assert artifact.metadata["artifact_type"] == "alexander_request_response"
assert artifact.metadata["responder"] == "allegro"
assert "## Alexander Request" in artifact.text
assert "## Wizard Response" in artifact.text
def test_extract_alexander_request_pairs_finds_following_wizard_response() -> None:
turns = [
{"speaker": "Alexander Whitestone", "text": "Catalog my requests as artifacts.", "timestamp": "2026-04-16T01:00:00Z"},
{"speaker": "Allegro", "text": "I'll build a sovereign room contract.", "timestamp": "2026-04-16T01:01:00Z"},
{"speaker": "Alexander", "text": "Make sure my words are easy to recall.", "timestamp": "2026-04-16T01:02:00Z"},
{"speaker": "Allegro", "text": "I will tag them with speaker metadata.", "timestamp": "2026-04-16T01:03:00Z"},
]
artifacts = extract_alexander_request_pairs(turns, responder="Allegro", source="telegram")
assert len(artifacts) == 2
assert artifacts[0].metadata["request_timestamp"] == "2026-04-16T01:00:00Z"
assert artifacts[1].metadata["response_timestamp"] == "2026-04-16T01:03:00Z"

109
tests/test_review_gate.py Normal file
View File

@@ -0,0 +1,109 @@
"""
Tests for scripts/review_gate.py — Anti-rubber-stamping PR validation.
"""
import unittest
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
from review_gate import check_empty_pr, check_approvals, check_rubber_stamp
class TestCheckEmptyPr(unittest.TestCase):
def test_valid_pr(self):
pr = {"additions": 10, "deletions": 5, "changed_files": 2}
ok, msg = check_empty_pr(pr)
self.assertTrue(ok)
self.assertIsNone(msg)
def test_empty_pr(self):
pr = {"additions": 0, "deletions": 0, "changed_files": 0}
ok, msg = check_empty_pr(pr)
self.assertFalse(ok)
self.assertIn("ZOMBIE", msg)
def test_additions_only(self):
pr = {"additions": 50, "deletions": 0, "changed_files": 1}
ok, msg = check_empty_pr(pr)
self.assertTrue(ok)
def test_deletions_only(self):
pr = {"additions": 0, "deletions": 30, "changed_files": 1}
ok, msg = check_empty_pr(pr)
self.assertTrue(ok)
def test_missing_fields_treated_as_zero(self):
pr = {}
ok, msg = check_empty_pr(pr)
self.assertFalse(ok)
class TestCheckApprovals(unittest.TestCase):
def test_has_approval(self):
reviews = [{"state": "APPROVED"}, {"state": "COMMENT"}]
ok, count = check_approvals(reviews)
self.assertTrue(ok)
self.assertEqual(count, 1)
def test_multiple_approvals(self):
reviews = [{"state": "APPROVED"}, {"state": "APPROVED"}]
ok, count = check_approvals(reviews)
self.assertTrue(ok)
self.assertEqual(count, 2)
def test_no_approvals(self):
reviews = [{"state": "COMMENT"}, {"state": "REQUEST_CHANGES"}]
ok, count = check_approvals(reviews)
self.assertFalse(ok)
self.assertEqual(count, 0)
def test_empty_reviews(self):
ok, count = check_approvals([])
self.assertFalse(ok)
self.assertEqual(count, 0)
class TestCheckRubberStamp(unittest.TestCase):
def test_substantive_pr_no_warning(self):
pr = {"additions": 100, "deletions": 50}
reviews = [{"state": "APPROVED", "body": "Looks good, nice changes"}]
ok, msg = check_rubber_stamp(pr, reviews)
self.assertTrue(ok)
self.assertIsNone(msg)
def test_trivial_pr_empty_review_detected(self):
pr = {"additions": 2, "deletions": 0}
reviews = [{"state": "APPROVED", "body": ""}]
ok, msg = check_rubber_stamp(pr, reviews)
self.assertFalse(ok)
self.assertIn("SUSPICIOUS", msg)
def test_trivial_pr_short_review_detected(self):
pr = {"additions": 1, "deletions": 1}
reviews = [{"state": "APPROVED", "body": "ok"}]
ok, msg = check_rubber_stamp(pr, reviews)
self.assertFalse(ok)
self.assertIn("SUSPICIOUS", msg)
def test_trivial_pr_substantive_review_ok(self):
pr = {"additions": 2, "deletions": 0}
reviews = [{"state": "APPROVED", "body": "This small fix is correct. Tested locally."}]
ok, msg = check_rubber_stamp(pr, reviews)
self.assertTrue(ok)
def test_no_approvals_skips_check(self):
pr = {"additions": 0, "deletions": 0}
reviews = [{"state": "COMMENT"}]
ok, msg = check_rubber_stamp(pr, reviews)
self.assertTrue(ok)
def test_large_pr_with_empty_review_ok(self):
pr = {"additions": 500, "deletions": 200}
reviews = [{"state": "APPROVED", "body": ""}]
ok, msg = check_rubber_stamp(pr, reviews)
self.assertTrue(ok) # Large PR, empty review is less suspicious
if __name__ == "__main__":
unittest.main()