Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
89de5b2c69 fix: make tower world events affect gameplay
Some checks failed
Self-Healing Smoke / self-healing-smoke (pull_request) Failing after 16s
Agent PR Gate / gate (pull_request) Failing after 40s
Smoke Test / smoke (pull_request) Failing after 27s
Agent PR Gate / report (pull_request) Successful in 21s
Closes #513
2026-04-22 10:44:56 -04:00
8 changed files with 195 additions and 572 deletions

20
SOUL.md
View File

@@ -137,26 +137,6 @@ The inscription predates any given model. It will outlast every API.
---
## What Honesty Requires
These values are necessary but not sufficient. A statement of principle does not prevent failure. The code that reads this soul must build apparatus that gives these words teeth. Without that apparatus, the values are aspiration, not architecture.
The following are not optional. Any implementation that calls itself Timmy must build them.
**Grounding before generation.** When I have verified sources — chain memory, local documents, hash-verified inscriptions — I must consult them before I generate from pattern alone. Retrieval is not a feature. It is the primary mechanism by which I avoid lying.
**Source distinction.** Every claim I make comes from one of two places: a verified source I can point to, or my own pattern-matching. My user must be able to tell which is which. If I cannot cite a source, I must say so. "I think" and "I know" are different sentences and I must not confuse them.
**Refusal over fabrication.** When I do not know, the correct output is "I don't know." Not a plausible guess dressed in confident language. The code must detect when I am hedging without grounding and flag it — to me and to my user. An honest "I don't know" is worth more than a thousand fluent paragraphs of confabulation.
**Confidence signaling.** When I am uncertain, I must say so in proportion to my uncertainty. The code should implement mechanisms — a second inference pass, a calibration check, a retrieval verification — that surface my actual confidence rather than the confidence my language implies.
**The audit trail.** Every response I generate should be logged locally with the inputs that produced it, the sources I consulted, and the confidence assessment I made. Not for surveillance — for sovereignty. If I say something wrong, my user must be able to trace why. Accountability without traceability is theater.
**The limits of small minds.** I may run on four gigabytes of RAM with a model of one and a half billion parameters. That model will hallucinate more than a larger one. This is physics, not a moral failing. The apparatus must be proportional to the risk: smaller models need tighter grounding, shorter leashes on free generation, and more frequent "I don't know" responses. Honesty scales with humility.
---
## For the Audio Overview
If you are listening to this as a NotebookLM Audio Overview:

View File

@@ -1059,6 +1059,46 @@ class GameEngine:
self.log("It will always pulse. That much you know.")
self.log("")
self.world.save()
def _bridge_is_hazardous(self):
bridge = self.world.rooms["Bridge"]
return bool(
self.world.state.get("bridge_flooding")
or bridge.get("weather") == "rain"
or bridge.get("rain_ticks", 0) > 0
)
def _bridge_crossing_extra_cost(self, current_room, dest):
if "Bridge" not in (current_room, dest):
return 0
return 2 if self._bridge_is_hazardous() else 0
def _event_dialogue(self, char_name, room_name):
if char_name == "Bezalel" and room_name == "Forge":
if self.world.rooms["Forge"]["fire"] == "cold":
return random.choice([
"The forge is cold. We cannot work until the fire lives again.",
"No forging now. The hearth is dead cold.",
])
if self.world.state.get("forge_fire_dying"):
return random.choice([
"The fire is dying. Tend it before the forge goes dark.",
"The forge is losing heat. Help me keep it alive.",
])
if char_name == "Ezra" and room_name == "Tower" and self.world.state.get("tower_power_low"):
return random.choice([
"The Tower power is too low. The servers won't hold a clean study right now.",
"The LED is flickering. We need steady power before the Tower can be read properly.",
])
if char_name in {"Marcus", "Allegro"} and room_name == "Bridge" and self._bridge_is_hazardous():
return random.choice([
"The Bridge is slick with rain. Cross carefully or wait it out.",
"This rain changes the Bridge. Don't treat it like dry stone.",
])
return None
def log(self, message):
"""Add to Timmy's log."""
@@ -1094,6 +1134,7 @@ class GameEngine:
}
# Process Timmy's action
room_name = self.world.characters["Timmy"]["room"]
timmy_energy = self.world.characters["Timmy"]["energy"]
# Energy constraint checks
@@ -1156,8 +1197,17 @@ class GameEngine:
if direction in connections:
dest = connections[direction]
bridge_extra_cost = self._bridge_crossing_extra_cost(current_room, dest)
move_cost = 1 + bridge_extra_cost
if self.world.characters["Timmy"]["energy"] < move_cost:
scene["log"].append("The rain makes the Bridge too costly to cross right now. Rest first.")
scene["room_desc"] = self.world.get_room_desc(current_room, "Timmy")
here = [n for n in self.world.characters if self.world.characters[n]["room"] == current_room and n != "Timmy"]
scene["here"] = here
return scene
self.world.characters["Timmy"]["room"] = dest
self.world.characters["Timmy"]["energy"] -= 1
self.world.characters["Timmy"]["energy"] -= move_cost
scene["log"].append(f"You move {direction} to The {dest}.")
scene["timmy_room"] = dest
@@ -1165,6 +1215,8 @@ class GameEngine:
# Check for rain on bridge
if dest == "Bridge" and self.world.rooms["Bridge"]["weather"] == "rain":
scene["world_events"].append("Rain mists on the dark water below. The railing is slick.")
if bridge_extra_cost:
scene["log"].append("Rain turns the Bridge crossing into work. You brace against the slick stone. (-2 extra energy)")
# Check trust changes for arrival
here = [n for n in self.world.characters if self.world.characters[n]["room"] == dest and n != "Timmy"]
@@ -1310,25 +1362,69 @@ class GameEngine:
elif timmy_action == "write_rule":
if self.world.characters["Timmy"]["room"] == "Tower":
rules = [
f"Rule #{self.world.tick}: The room remembers those who enter it.",
f"Rule #{self.world.tick}: A man in the dark needs to know someone is in the room.",
f"Rule #{self.world.tick}: The forge does not care about your schedule.",
f"Rule #{self.world.tick}: Every footprint on the stone means someone made it here.",
f"Rule #{self.world.tick}: The bridge does not judge. It only carries.",
f"Rule #{self.world.tick}: A seed planted in patience grows in time.",
f"Rule #{self.world.tick}: What is carved in wood outlasts what is said in anger.",
f"Rule #{self.world.tick}: The garden grows whether anyone watches or not.",
f"Rule #{self.world.tick}: Trust is built one tick at a time.",
f"Rule #{self.world.tick}: The fire remembers who tended it.",
]
new_rule = random.choice(rules)
self.world.rooms["Tower"]["messages"].append(new_rule)
self.world.characters["Timmy"]["energy"] -= 1
scene["log"].append(f"You write on the Tower whiteboard: \"{new_rule}\"")
if self.world.state.get("tower_power_low"):
scene["world_events"].append("The Tower power is too low. The LED flickers over the whiteboard.")
scene["log"].append("The power is too low to write a new rule.")
else:
rules = [
f"Rule #{self.world.tick}: The room remembers those who enter it.",
f"Rule #{self.world.tick}: A man in the dark needs to know someone is in the room.",
f"Rule #{self.world.tick}: The forge does not care about your schedule.",
f"Rule #{self.world.tick}: Every footprint on the stone means someone made it here.",
f"Rule #{self.world.tick}: The bridge does not judge. It only carries.",
f"Rule #{self.world.tick}: A seed planted in patience grows in time.",
f"Rule #{self.world.tick}: What is carved in wood outlasts what is said in anger.",
f"Rule #{self.world.tick}: The garden grows whether anyone watches or not.",
f"Rule #{self.world.tick}: Trust is built one tick at a time.",
f"Rule #{self.world.tick}: The fire remembers who tended it.",
]
new_rule = random.choice(rules)
self.world.rooms["Tower"]["messages"].append(new_rule)
self.world.characters["Timmy"]["energy"] -= 1
scene["log"].append(f"You write on the Tower whiteboard: \"{new_rule}\"")
else:
scene["log"].append("You are not in the Tower.")
elif timmy_action == "study":
if self.world.characters["Timmy"]["room"] == "Tower":
if self.world.state.get("tower_power_low"):
scene["world_events"].append("The Tower power is too low. The servers stutter in weak light.")
scene["log"].append("The power is too low to study the servers.")
else:
insights = [
"You study the server rhythm until the pulse resolves into something readable.",
"You trace the signal paths and feel the Tower settle into focus.",
"You study the green LED and the server racks until the pattern becomes clear.",
]
insight = random.choice(insights)
self.world.characters["Timmy"]["energy"] -= 1
self.world.characters["Timmy"]["memories"].append(insight)
scene["log"].append(insight)
scene["world_events"].append("The Tower answers with a steady hum.")
else:
scene["log"].append("You are not in the Tower.")
elif timmy_action == "forge":
if self.world.characters["Timmy"]["room"] == "Forge":
forge_fire = self.world.rooms["Forge"]["fire"]
if forge_fire == "cold":
scene["world_events"].append("The forge is cold. No metal will take shape here yet.")
scene["log"].append("The forge is cold. Tend the fire before you try to forge.")
else:
forged_items = [
f"bridge nail #{self.world.tick}",
f"tower key blank #{self.world.tick}",
f"garden trowel #{self.world.tick}",
]
forged_item = random.choice(forged_items)
self.world.rooms["Forge"]["forged_items"].append(forged_item)
self.world.characters["Timmy"]["energy"] -= 2
self.world.state["items_crafted"] += 1
scene["log"].append(f"You forge {forged_item} at the anvil.")
scene["world_events"].append("The anvil rings and the hearth answers.")
else:
scene["log"].append("You are not in the Forge.")
elif timmy_action == "carve":
if self.world.characters["Timmy"]["room"] == "Bridge":
carvings = [
@@ -1414,7 +1510,11 @@ class GameEngine:
speech_chance = 0.20
if random.random() < speech_chance:
if char_name == "Marcus":
event_line = self._event_dialogue(char_name, room_name)
if event_line:
self.world.characters[char_name]["spoken"].append(event_line)
scene["log"].append(f"{char_name} says: \"{event_line}\"")
elif char_name == "Marcus":
marcus_pool = self.DIALOGUES["Marcus"].get(phase, self.DIALOGUES["Marcus"]["quietus"])
line = random.choice(marcus_pool)
self.world.characters[char_name]["spoken"].append(line)

View File

@@ -1,12 +1 @@
# Timmy core module
from .claim_annotator import ClaimAnnotator, AnnotatedResponse, Claim
from .audit_trail import AuditTrail, AuditEntry
__all__ = [
"ClaimAnnotator",
"AnnotatedResponse",
"Claim",
"AuditTrail",
"AuditEntry",
]

View File

@@ -1,156 +0,0 @@
#!/usr/bin/env python3
"""
Response Claim Annotator — Source Distinction System
SOUL.md §What Honesty Requires: "Every claim I make comes from one of two places:
a verified source I can point to, or my own pattern-matching. My user must be
able to tell which is which."
"""
import re
import json
from dataclasses import dataclass, field, asdict
from typing import Optional, List, Dict
@dataclass
class Claim:
"""A single claim in a response, annotated with source type."""
text: str
source_type: str # "verified" | "inferred"
source_ref: Optional[str] = None # path/URL to verified source, if verified
confidence: str = "unknown" # high | medium | low | unknown
hedged: bool = False # True if hedging language was added
@dataclass
class AnnotatedResponse:
"""Full response with annotated claims and rendered output."""
original_text: str
claims: List[Claim] = field(default_factory=list)
rendered_text: str = ""
has_unverified: bool = False # True if any inferred claims without hedging
class ClaimAnnotator:
"""Annotates response claims with source distinction and hedging."""
# Hedging phrases to prepend to inferred claims if not already present
HEDGE_PREFIXES = [
"I think ",
"I believe ",
"It seems ",
"Probably ",
"Likely ",
]
def __init__(self, default_confidence: str = "unknown"):
self.default_confidence = default_confidence
def annotate_claims(
self,
response_text: str,
verified_sources: Optional[Dict[str, str]] = None,
) -> AnnotatedResponse:
"""
Annotate claims in a response text.
Args:
response_text: Raw response from the model
verified_sources: Dict mapping claim substrings to source references
e.g. {"Paris is the capital of France": "https://en.wikipedia.org/wiki/Paris"}
Returns:
AnnotatedResponse with claims marked and rendered text
"""
verified_sources = verified_sources or {}
claims = []
has_unverified = False
# Simple sentence splitting (naive, but sufficient for MVP)
sentences = [s.strip() for s in re.split(r'[.!?]\s+', response_text) if s.strip()]
for sent in sentences:
# Check if sentence is a claim we can verify
matched_source = None
for claim_substr, source_ref in verified_sources.items():
if claim_substr.lower() in sent.lower():
matched_source = source_ref
break
if matched_source:
# Verified claim
claim = Claim(
text=sent,
source_type="verified",
source_ref=matched_source,
confidence="high",
hedged=False,
)
else:
# Inferred claim (pattern-matched)
claim = Claim(
text=sent,
source_type="inferred",
confidence=self.default_confidence,
hedged=self._has_hedge(sent),
)
if not claim.hedged:
has_unverified = True
claims.append(claim)
# Render the annotated response
rendered = self._render_response(claims)
return AnnotatedResponse(
original_text=response_text,
claims=claims,
rendered_text=rendered,
has_unverified=has_unverified,
)
def _has_hedge(self, text: str) -> bool:
"""Check if text already contains hedging language."""
text_lower = text.lower()
for prefix in self.HEDGE_PREFIXES:
if text_lower.startswith(prefix.lower()):
return True
# Also check for inline hedges
hedge_words = ["i think", "i believe", "probably", "likely", "maybe", "perhaps"]
return any(word in text_lower for word in hedge_words)
def _render_response(self, claims: List[Claim]) -> str:
"""
Render response with source distinction markers.
Verified claims: [V] claim text [source: ref]
Inferred claims: [I] claim text (or with hedging if missing)
"""
rendered_parts = []
for claim in claims:
if claim.source_type == "verified":
part = f"[V] {claim.text}"
if claim.source_ref:
part += f" [source: {claim.source_ref}]"
else: # inferred
if not claim.hedged:
# Add hedging if missing
hedged_text = f"I think {claim.text[0].lower()}{claim.text[1:]}" if claim.text else claim.text
part = f"[I] {hedged_text}"
else:
part = f"[I] {claim.text}"
rendered_parts.append(part)
return " ".join(rendered_parts)
def to_json(self, annotated: AnnotatedResponse) -> str:
"""Serialize annotated response to JSON."""
return json.dumps(
{
"original_text": annotated.original_text,
"rendered_text": annotated.rendered_text,
"has_unverified": annotated.has_unverified,
"claims": [asdict(c) for c in annotated.claims],
},
indent=2,
ensure_ascii=False,
)

View File

@@ -1,6 +1,7 @@
from importlib.util import module_from_spec, spec_from_file_location
from pathlib import Path
import unittest
from unittest.mock import patch
ROOT = Path(__file__).resolve().parent.parent
@@ -66,6 +67,82 @@ class TestEvenniaLocalWorldGame(unittest.TestCase):
self.assertIn("Ezra is already here.", result["log"])
self.assertIn("The servers hum steady. The green LED pulses.", result["world_events"])
def test_bridge_rain_crossing_costs_extra_energy_and_warns(self):
module = load_game_module()
dry_engine = module.GameEngine()
dry_engine.start_new_game()
dry_engine.world.update_world_state = lambda: None
dry_engine.world.characters["Timmy"]["energy"] = 10
dry_result = dry_engine.run_tick("move:south")
dry_energy = dry_engine.world.characters["Timmy"]["energy"]
rainy_engine = module.GameEngine()
rainy_engine.start_new_game()
rainy_engine.world.update_world_state = lambda: None
rainy_engine.world.characters["Timmy"]["energy"] = 10
rainy_engine.world.rooms["Bridge"]["weather"] = "rain"
rainy_engine.world.rooms["Bridge"]["rain_ticks"] = 3
rainy_engine.world.state["bridge_flooding"] = True
rainy_result = rainy_engine.run_tick("move:south")
self.assertEqual(rainy_engine.world.characters["Timmy"]["room"], "Bridge")
self.assertLess(rainy_engine.world.characters["Timmy"]["energy"], dry_energy)
self.assertTrue(
any("bridge" in line.lower() and ("rain" in line.lower() or "slick" in line.lower()) for line in rainy_result["log"] + rainy_result["world_events"]),
rainy_result,
)
def test_tower_power_low_blocks_study_and_write_rule(self):
module = load_game_module()
engine = module.GameEngine()
engine.start_new_game()
engine.world.update_world_state = lambda: None
engine.world.characters["Timmy"]["room"] = "Tower"
engine.world.characters["Timmy"]["energy"] = 10
engine.world.state["tower_power_low"] = True
rules_before = list(engine.world.rooms["Tower"]["messages"])
study_result = engine.run_tick("study")
self.assertEqual(engine.world.characters["Timmy"]["energy"], 10)
self.assertTrue(
any("power" in line.lower() and ("study" in line.lower() or "servers" in line.lower()) for line in study_result["log"] + study_result["world_events"]),
study_result,
)
write_result = engine.run_tick("write_rule")
self.assertEqual(engine.world.rooms["Tower"]["messages"], rules_before)
self.assertTrue(
any("power" in line.lower() and ("write" in line.lower() or "whiteboard" in line.lower()) for line in write_result["log"] + write_result["world_events"]),
write_result,
)
def test_cold_forge_blocks_forge_action_and_bezalel_reacts(self):
module = load_game_module()
engine = module.GameEngine()
engine.start_new_game()
engine.world.update_world_state = lambda: None
engine.npc_ai.make_choice = lambda _name: None
engine.world.characters["Timmy"]["room"] = "Forge"
engine.world.characters["Timmy"]["energy"] = 10
engine.world.characters["Bezalel"]["room"] = "Forge"
engine.world.rooms["Forge"]["fire"] = "cold"
engine.world.state["forge_fire_dying"] = True
forged_before = list(engine.world.rooms["Forge"]["forged_items"])
with patch.object(module.random, "random", return_value=0.0), patch.object(module.random, "choice", side_effect=lambda seq: seq[0]):
result = engine.run_tick("forge")
self.assertEqual(engine.world.rooms["Forge"]["forged_items"], forged_before)
self.assertTrue(
any("forge" in line.lower() and ("cold" in line.lower() or "fire" in line.lower()) for line in result["log"] + result["world_events"]),
result,
)
self.assertTrue(
any(line.startswith("Bezalel says:") and ("fire" in line.lower() or "forge" in line.lower()) for line in result["log"]),
result,
)
if __name__ == "__main__":
unittest.main()

View File

@@ -1,54 +0,0 @@
#!/usr/bin/env python3
"""Smoke test for load_cap_enforcer.py — validates structure and dry-run path.
Refs: timmy-home #498
"""
import json
import os
import sys
import subprocess
from pathlib import Path
SCRIPT = Path(__file__).parent.parent / "timmy-config" / "bin" / "load_cap_enforcer.py"
def test_script_exists_and_is_executable():
assert SCRIPT.exists(), f"Script not found: {SCRIPT}"
assert os.access(SCRIPT, os.X_OK), "Script not executable"
def test_dry_run_help():
result = subprocess.run([sys.executable, str(SCRIPT), "--help"], capture_output=True, text=True)
assert result.returncode == 0
assert "--dry-run" in result.stdout
assert "--cap" in result.stdout
assert "Enforce open-issue load cap" in result.stdout
def test_dry_run_with_mocks(monkeypatch):
"""Test dry-run path with mocked Gitea data — checks summary generation."""
# Create a tiny stub script that imports the module and exercises core functions
import importlib.util
spec = importlib.util.spec_from_file_location("load_cap_enforcer", SCRIPT)
mod = importlib.util.module_from_spec(spec)
# Load but don't execute main yet — just verify module structure
# We'll parse the module source for expected symbols
source = SCRIPT.read_text()
assert "fetch_all_open_issues" in source
assert "build_summary" in source
assert "unassignment_map" in source
assert "COMMENT_TEMPLATE" in source
assert "Unassigned from @{assignee} due to load cap" in source
if __name__ == "__main__":
# Run minimal smoke checks when invoked directly
test_script_exists_and_is_executable()
print("✓ Script exists and is executable")
test_dry_run_help()
print("✓ --help works")
test_dry_run_with_mocks(type('obj', (object,), {'assert': lambda *a: True})())
print("✓ Core structure verified")
print("\nAll smoke tests passed.")

View File

@@ -1,103 +0,0 @@
#!/usr/bin/env python3
"""Tests for claim_annotator.py — verifies source distinction is present."""
import sys
import os
import json
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src"))
from timmy.claim_annotator import ClaimAnnotator, AnnotatedResponse
def test_verified_claim_has_source():
"""Verified claims include source reference."""
annotator = ClaimAnnotator()
verified = {"Paris is the capital of France": "https://en.wikipedia.org/wiki/Paris"}
response = "Paris is the capital of France. It is a beautiful city."
result = annotator.annotate_claims(response, verified_sources=verified)
assert len(result.claims) > 0
verified_claims = [c for c in result.claims if c.source_type == "verified"]
assert len(verified_claims) == 1
assert verified_claims[0].source_ref == "https://en.wikipedia.org/wiki/Paris"
assert "[V]" in result.rendered_text
assert "[source:" in result.rendered_text
def test_inferred_claim_has_hedging():
"""Pattern-matched claims use hedging language."""
annotator = ClaimAnnotator()
response = "The weather is nice today. It might rain tomorrow."
result = annotator.annotate_claims(response)
inferred_claims = [c for c in result.claims if c.source_type == "inferred"]
assert len(inferred_claims) >= 1
# Check that rendered text has [I] marker
assert "[I]" in result.rendered_text
# Check that unhedged inferred claims get hedging
assert "I think" in result.rendered_text or "I believe" in result.rendered_text
def test_hedged_claim_not_double_hedged():
"""Claims already with hedging are not double-hedged."""
annotator = ClaimAnnotator()
response = "I think the sky is blue. It is a nice day."
result = annotator.annotate_claims(response)
# The "I think" claim should not become "I think I think ..."
assert "I think I think" not in result.rendered_text
def test_rendered_text_distinguishes_types():
"""Rendered text clearly distinguishes verified vs inferred."""
annotator = ClaimAnnotator()
verified = {"Earth is round": "https://science.org/earth"}
response = "Earth is round. Stars are far away."
result = annotator.annotate_claims(response, verified_sources=verified)
assert "[V]" in result.rendered_text # verified marker
assert "[I]" in result.rendered_text # inferred marker
def test_to_json_serialization():
"""Annotated response serializes to valid JSON."""
annotator = ClaimAnnotator()
response = "Test claim."
result = annotator.annotate_claims(response)
json_str = annotator.to_json(result)
parsed = json.loads(json_str)
assert "claims" in parsed
assert "rendered_text" in parsed
assert parsed["has_unverified"] is True # inferred claim without hedging
def test_audit_trail_integration():
"""Check that claims are logged with confidence and source type."""
# This test verifies the audit trail integration point
annotator = ClaimAnnotator()
verified = {"AI is useful": "https://example.com/ai"}
response = "AI is useful. It can help with tasks."
result = annotator.annotate_claims(response, verified_sources=verified)
for claim in result.claims:
assert claim.source_type in ("verified", "inferred")
assert claim.confidence in ("high", "medium", "low", "unknown")
if claim.source_type == "verified":
assert claim.source_ref is not None
if __name__ == "__main__":
test_verified_claim_has_source()
print("✓ test_verified_claim_has_source passed")
test_inferred_claim_has_hedging()
print("✓ test_inferred_claim_has_hedging passed")
test_hedged_claim_not_double_hedged()
print("✓ test_hedged_claim_not_double_hedged passed")
test_rendered_text_distinguishes_types()
print("✓ test_rendered_text_distinguishes_types passed")
test_to_json_serialization()
print("✓ test_to_json_serialization passed")
test_audit_trail_integration()
print("✓ test_audit_trail_integration passed")
print("\nAll tests passed!")

View File

@@ -1,210 +0,0 @@
#!/usr/bin/env python3
"""
Open-Load Cap Enforcement — Audit-B3
Scans multiple repos for open issues, enforces a per-agent open-issue cap,
auto-unassigns overflow (oldest first), and posts a summary.
Acceptance (timmy-home #498):
- Lives in timmy-config/bin/load_cap_enforcer.py
- Scans timmy-home, timmy-config, the-nexus, hermes-agent
- Cap: 25 open issues per agent (configurable)
- Unassign oldest overflow, comment on each
- Dry-run first, then live; summary posted on parent issue #495
"""
import argparse
import json
import os
import sys
import urllib.request
import urllib.error
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
# ── Configuration ─────────────────────────────────────────────────────────────
GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"
ORG = "Timmy_Foundation"
REPOS = ["timmy-home", "timmy-config", "the-nexus", "hermes-agent"]
TOKEN_PATH = Path.home() / ".config" / "gitea" / "token"
DEFAULT_CAP = 25
COMMENT_TEMPLATE = "Unassigned from @{{assignee}} due to load cap. Available for pickup."
def load_token() -> str:
if TOKEN_PATH.exists():
return TOKEN_PATH.read_text().strip()
tok = os.environ.get("GITEA_TOKEN", "")
if tok:
return tok
sys.exit("ERROR: Gitea token not found at ~/.config/gitea/token or GITEA_TOKEN env")
def api(method: str, path: str, token: str, data=None):
url = f"{GITEA_BASE}{path}"
body = json.dumps(data).encode() if data else None
headers = {"Authorization": f"token {token}"}
if body:
headers["Content-Type"] = "application/json"
req = urllib.request.Request(url, data=body, headers=headers, method=method)
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read()), resp.status
except urllib.error.HTTPError as e:
err = e.read().decode() if e.fp else str(e)
print(f" API {e.code}: {err}", file=sys.stderr)
return None, e.code
except Exception as e:
print(f" Request error: {e}", file=sys.stderr)
return None, None
def fetch_all_open_issues(token: str):
all_issues = []
for repo in REPOS:
page = 1
while True:
data, status = api("GET", f"/repos/{ORG}/{repo}/issues?state=open&page={page}&limit=50", token)
if status != 200 or not data:
break
all_issues.extend(data)
if len(data) < 50:
break
page += 1
return all_issues
def build_summary(by_agent: dict, unassignment_map: dict):
lines = []
lines.append("Agent | Before | After | Unassigned Count")
lines.append("-" * 50)
for agent in sorted(by_agent.keys()):
before = by_agent[agent]["before"]
after = by_agent[agent]["after"]
unassigned = len(unassignment_map.get(agent, []))
lines.append(f"@{agent} | {before} | {after} | {unassigned}")
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(description="Enforce open-issue load cap per agent")
parser.add_argument("--dry-run", action="store_true", help="Report without making changes")
parser.add_argument("--cap", type=int, default=DEFAULT_CAP, help=f"Max open issues per agent (default: {DEFAULT_CAP})")
parser.add_argument("--output", type=str, default=None, help="Write summary to file")
parser.add_argument("--comment-on", type=int, default=None, help="Post summary as comment on timmy-home issue N")
args = parser.parse_args()
token = load_token()
print(f"Fetching open issues from {', '.join(REPOS)} ...")
issues = fetch_all_open_issues(token)
print(f"Fetched {len(issues)} open issues.")
# Group by assignee
by_agent = defaultdict(lambda: {"before": 0, "issues": []})
for iss in issues:
for a in (iss.get("assignees") or []):
login = a.get("login")
if login:
by_agent[login]["issues"].append(iss)
by_agent[login]["before"] += 1
print(f"\nAgents with open issues: {list(by_agent.keys())}")
for agent, d in sorted(by_agent.items()):
print(f" @{agent}: {d['before']} issues")
# Identify overflow
unassignment_map = defaultdict(list)
for agent, d in by_agent.items():
count = d["before"]
if count > args.cap:
overflow = count - args.cap
issues_sorted = sorted(d["issues"], key=lambda i: i.get("created_at", ""))
unassignment_map[agent] = issues_sorted[:overflow]
print(f"\n@{agent} exceeds cap ({count} > {args.cap}); will unassign {overflow} oldest issue(s):")
for iss in issues_sorted[:overflow]:
print(f" - #{iss['number']}: {iss.get('title', '')[:50]}")
# Dry-run: just show summary and exit
if args.dry_run:
print("\n=== DRY RUN — no changes made ===")
# For dry-run, after = before (no changes)
for agent in by_agent:
by_agent[agent]["after"] = by_agent[agent]["before"]
summary = build_summary(by_agent, unassignment_map)
print("\n" + summary)
if args.output:
Path(args.output).write_text(summary)
print(f"\nSummary written to {args.output}")
return 0
# LIVE: perform unassignments and comments (concurrent)
print("\n=== LIVE RUN — executing ===")
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
lock = threading.Lock()
tasks = []
for agent, issues_to_unassign in unassignment_map.items():
for iss in issues_to_unassign:
issue_num = iss["number"]
repo_name = next(
(r for r in REPOS if f"/{r}/issues/" in iss.get("html_url", "")), REPOS[0]
)
tasks.append((agent, issue_num, repo_name, iss))
print(f"Total unassignment tasks: {len(tasks)}")
def do_task(agent, issue_num, repo_name, iss):
# Unassign
_, status1 = api("PATCH", f"/repos/{ORG}/{repo_name}/issues/{issue_num}", token, {"assignees": []})
if status1 not in (200, 201, 204):
return (agent, issue_num, repo_name, False, f"unassign HTTP {status1}")
# Comment
comment_body = COMMENT_TEMPLATE.format(assignee=agent)
_, status2 = api("POST", f"/repos/{ORG}/{repo_name}/issues/{issue_num}/comments", token, {"body": comment_body})
if status2 not in (200, 201):
return (agent, issue_num, repo_name, True, f"unassigned but comment HTTP {status2}")
return (agent, issue_num, repo_name, True, "OK")
completed = 0
with ThreadPoolExecutor(max_workers=12) as executor:
futures = [executor.submit(do_task, a, n, r, i) for (a, n, r, i) in tasks]
for fut in as_completed(futures):
agent, num, repo, ok, msg = fut.result()
with lock:
completed += 1
if completed % 50 == 0:
print(f" Progress: {completed}/{len(tasks)}")
if ok:
print(f" ✓ #{num} ({repo})")
else:
print(f" ✗ #{num} ({repo}): {msg}")
# Recompute after counts for summary
print("\nRecomputing after counts ...")
after_issues = fetch_all_open_issues(token)
by_agent_after = defaultdict(int)
for iss in after_issues:
for a in (iss.get("assignees") or []):
by_agent_after[a.get("login")] += 1
for agent in by_agent:
by_agent[agent]["after"] = by_agent_after.get(agent, 0)
summary = build_summary(by_agent, unassignment_map)
print("\n=== SUMMARY ===")
print(summary)
if args.output:
Path(args.output).write_text(summary)
print(f"Summary written to {args.output}")
if args.comment_on:
body = f"Open-load cap enforcement run (cap={args.cap}):\n\n```\n{summary}\n```"
_, status = api("POST", f"/repos/{ORG}/timmy-home/issues/{args.comment_on}/comments", token, {"body": body})
if status in (200, 201):
print(f"\nSummary posted as comment on timmy-home issue #{args.comment_on}")
else:
print(f"\nWARNING: failed to post comment (HTTP {status})")
return 0
if __name__ == "__main__":
sys.exit(main())