Compare commits

..

1 Commits

Author SHA1 Message Date
64a46e6b37 test: add unit tests for multi_user_bridge.py (#1541)
Some checks failed
CI / test (pull_request) Failing after 1m24s
Review Approval Gate / verify-review (pull_request) Successful in 14s
CI / validate (pull_request) Failing after 1m33s
2026-04-15 04:03:47 +00:00
3 changed files with 313 additions and 270 deletions

View File

@@ -1,135 +0,0 @@
#!/usr/bin/env bash
# ═══════════════════════════════════════════════════════════════
# claim-issue.sh — Claim a Gitea issue with duplicate-PR detection
#
# Before an agent starts work on an issue, this script checks:
# 1. Is the issue already assigned?
# 2. Do open PRs already reference this issue?
# 3. Is the issue closed?
#
# Only proceeds to assign if all checks pass.
#
# Usage:
# ./scripts/claim-issue.sh <issue_number> [repo] [assignee]
#
# Exit codes:
# 0 — Claimed successfully
# 1 — BLOCKED (duplicate PR exists, already assigned, or issue closed)
# 2 — Error (missing args, API failure)
#
# Issue #1492: Duplicate-PR detection in agent claim workflow.
# Issue #1480: The meta-problem this prevents.
# ═══════════════════════════════════════════════════════════════
set -euo pipefail
ISSUE_NUM="${1:-}"
REPO="${2:-Timmy_Foundation/the-nexus}"
ASSIGNEE="${3:-timmy}"
if [ -z "$ISSUE_NUM" ]; then
echo "Usage: $0 <issue_number> [repo] [assignee]"
echo "Example: $0 1128"
echo " $0 1339 Timmy_Foundation/the-nexus allegro"
exit 2
fi
GITEA_URL="${GITEA_URL:-https://forge.alexanderwhitestone.com}"
GITEA_TOKEN="${GITEA_TOKEN:-}"
if [ -z "$GITEA_TOKEN" ]; then
TOKEN_FILE="${HOME}/.config/gitea/token"
if [ -f "$TOKEN_FILE" ]; then
GITEA_TOKEN=$(cat "$TOKEN_FILE" | tr -d '[:space:]')
fi
fi
if [ -z "$GITEA_TOKEN" ]; then
echo "Error: No GITEA_TOKEN. Set env var or create ~/.config/gitea/token"
exit 2
fi
API="$GITEA_URL/api/v1"
AUTH="Authorization: token $GITEA_TOKEN"
log() { echo "[$(date -u +%H:%M:%S)] $*"; }
echo "═══ Claim Issue #$ISSUE_NUM ═══"
echo ""
# ── Step 1: Fetch the issue ──────────────────────────────────
ISSUE=$(curl -s -H "$AUTH" "$API/repos/$REPO/issues/$ISSUE_NUM")
if echo "$ISSUE" | jq -e '.message' > /dev/null 2>&1; then
ERROR=$(echo "$ISSUE" | jq -r '.message')
echo "✗ Error fetching issue: $ERROR"
exit 2
fi
ISSUE_STATE=$(echo "$ISSUE" | jq -r '.state')
ISSUE_TITLE=$(echo "$ISSUE" | jq -r '.title')
ISSUE_ASSIGNEES=$(echo "$ISSUE" | jq -r '.assignees // [] | map(.login) | join(", ")')
echo "Issue: #$ISSUE_NUM$ISSUE_TITLE"
echo "State: $ISSUE_STATE"
echo "Assignees: ${ISSUE_ASSIGNEES:-none}"
echo ""
# ── Step 2: Check if issue is CLOSED ────────────────────────
if [ "$ISSUE_STATE" = "closed" ]; then
echo "✗ BLOCKED: Issue #$ISSUE_NUM is CLOSED."
echo " Do not work on closed issues."
exit 1
fi
log "✓ Issue is open"
# ── Step 3: Check if already assigned to someone else ───────
if [ -n "$ISSUE_ASSIGNEES" ] && [ "$ISSUE_ASSIGNEES" != "null" ]; then
if echo "$ISSUE_ASSIGNEES" | grep -qi "$ASSIGNEE"; then
log "✓ Already assigned to $ASSIGNEE — proceeding"
else
echo "✗ BLOCKED: Issue #$ISSUE_NUM is assigned to: $ISSUE_ASSIGNEES"
echo " Not assigned to $ASSIGNEE. Do not work on others' issues."
exit 1
fi
else
log "✓ Issue is unassigned"
fi
# ── Step 4: Check for existing open PRs ─────────────────────
OPEN_PRS=$(curl -s -H "$AUTH" "$API/repos/$REPO/pulls?state=open&limit=100")
ISSUE_STR="#$ISSUE_NUM"
DUPLICATES=$(echo "$OPEN_PRS" | jq -r ".[] | select(.title | test(\"$ISSUE_STR\"; \"i\") or (.body // \"\") | test(\"$ISSUE_STR\"; \"i\")) | \" PR #\\(.number): \\(.title) [\\(.head.ref)] (\\(.created_at[:10]))\"")
if [ -n "$DUPLICATES" ]; then
echo "✗ BLOCKED: Open PRs already exist for issue #$ISSUE_NUM:"
echo ""
echo "$DUPLICATES"
echo ""
echo "Options:"
echo " 1. Review and merge an existing PR"
echo " 2. Close duplicates: ./scripts/cleanup-duplicate-prs.sh --close"
echo " 3. Push to an existing branch"
echo ""
echo "Do NOT create a new PR. See #1492."
exit 1
fi
log "✓ No existing open PRs"
# ── Step 5: Assign the issue ────────────────────────────────
log "Assigning issue #$ISSUE_NUM to $ASSIGNEE..."
ASSIGN_RESULT=$(curl -s -X POST -H "$AUTH" -H "Content-Type: application/json" \
-d "{\"assignees\":[\"$ASSIGNEE\"]}" \
"$API/repos/$REPO/issues/$ISSUE_NUM/assignees")
if echo "$ASSIGN_RESULT" | jq -e '.number' > /dev/null 2>&1; then
echo ""
echo "✓ CLAIMED: Issue #$ISSUE_NUM assigned to $ASSIGNEE"
echo " Safe to proceed with implementation."
exit 0
else
ERROR=$(echo "$ASSIGN_RESULT" | jq -r '.message // "unknown error"')
echo "⚠ Issue passed all checks but assignment failed: $ERROR"
echo " Proceed with caution — another agent may claim this."
exit 0
fi

View File

@@ -1,135 +0,0 @@
#!/usr/bin/env python3
"""
claim_issue.py — Claim a Gitea issue with duplicate-PR detection.
Before an agent starts work, checks:
1. Is the issue open?
2. Is it already assigned to someone else?
3. Do open PRs already reference this issue?
Only assigns if all checks pass.
Usage:
python3 scripts/claim_issue.py 1492
python3 scripts/claim_issue.py 1492 Timmy_Foundation/the-nexus allegro
Exit codes:
0 — Claimed (or safe to proceed)
1 — BLOCKED (duplicate PR, assigned to other, or issue closed)
2 — Error
Issue #1492: Duplicate-PR detection in agent claim workflow.
"""
import json
import os
import sys
import urllib.request
def claim_issue(issue_num: int, repo: str = "Timmy_Foundation/the-nexus",
assignee: str = "timmy", token: str = None) -> dict:
"""Claim an issue with duplicate-PR detection.
Returns dict with:
claimed (bool): True if safe to proceed
reason (str): Why blocked or claimed
existing_prs (list): Any existing PRs for this issue
"""
gitea_url = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com")
token = token or os.environ.get("GITEA_TOKEN", "")
if not token:
token_path = os.path.expanduser("~/.config/gitea/token")
if os.path.exists(token_path):
token = open(token_path).read().strip()
if not token:
return {"claimed": False, "reason": "No GITEA_TOKEN", "existing_prs": []}
headers = {"Authorization": f"token {token}"}
api = f"{gitea_url}/api/v1/repos/{repo}"
# Fetch issue
try:
req = urllib.request.Request(f"{api}/issues/{issue_num}", headers=headers)
with urllib.request.urlopen(req, timeout=10) as resp:
issue = json.loads(resp.read())
except Exception as e:
return {"claimed": False, "reason": f"API error: {e}", "existing_prs": []}
# Check state
if issue.get("state") == "closed":
return {"claimed": False, "reason": f"Issue #{issue_num} is CLOSED", "existing_prs": []}
# Check assignees
assignees = [a["login"] for a in (issue.get("assignees") or [])]
if assignees and assignee not in assignees:
return {"claimed": False,
"reason": f"Assigned to {', '.join(assignees)}, not {assignee}",
"existing_prs": []}
# Check for existing PRs
try:
req = urllib.request.Request(f"{api}/pulls?state=open&limit=100", headers=headers)
with urllib.request.urlopen(req, timeout=10) as resp:
prs = json.loads(resp.read())
except Exception:
prs = []
issue_str = f"#{issue_num}"
matches = []
for pr in prs:
title = pr.get("title", "")
body = pr.get("body") or ""
if issue_str in title or issue_str in body:
matches.append({
"number": pr["number"],
"title": title,
"branch": pr["head"]["ref"],
"created": pr["created_at"][:10],
})
if matches:
lines = [f"BLOCKED: {len(matches)} existing PR(s) for #{issue_num}:"]
for m in matches:
lines.append(f" PR #{m['number']}: {m['title']} [{m['branch']}]")
return {"claimed": False, "reason": "\n".join(lines), "existing_prs": matches}
# All checks passed — assign
try:
data = json.dumps({"assignees": [assignee]}).encode()
req = urllib.request.Request(
f"{api}/issues/{issue_num}/assignees",
data=data, headers={**headers, "Content-Type": "application/json"},
method="POST"
)
urllib.request.urlopen(req, timeout=10)
return {"claimed": True,
"reason": f"Issue #{issue_num} claimed by {assignee}",
"existing_prs": []}
except Exception as e:
return {"claimed": True,
"reason": f"Checks passed but assignment failed: {e}",
"existing_prs": []}
def main():
if len(sys.argv) < 2:
print("Usage: claim_issue.py <issue_number> [repo] [assignee]")
print("Example: claim_issue.py 1492")
print(" claim_issue.py 1339 Timmy_Foundation/the-nexus allegro")
sys.exit(2)
issue_num = int(sys.argv[1])
repo = sys.argv[2] if len(sys.argv) > 2 else "Timmy_Foundation/the-nexus"
assignee = sys.argv[3] if len(sys.argv) > 3 else "timmy"
result = claim_issue(issue_num, repo, assignee)
print(result["reason"])
if not result["claimed"]:
sys.exit(1)
sys.exit(0)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,313 @@
#!/usr/bin/env python3
"""
Unit tests for multi_user_bridge.py
23 tests across 5 test classes:
- TestPluginRegistry: Register, unregister, list, fire hooks, thread-safety
- TestChatLogIsolation: Room isolation, rolling buffer, history, thread-safety
- TestPresenceManager: Enter/leave, room queries, isolation, concurrency
- TestConcurrentUsers: Multi-user concurrent chat with isolation verification
"""
import json
import os
import sys
import threading
import time
import pytest
from pathlib import Path
from unittest.mock import patch, MagicMock
# Add repo root to path
sys.path.insert(0, str(Path(__file__).parent.parent))
# Mock WORLD_DIR before importing to prevent file writes
with patch.dict(os.environ, {"TIMMY_BRIDGE_PORT": "0"}):
import multi_user_bridge as mub
# ─── TestPluginRegistry ──────────────────────────────────────────────
class TestPluginRegistry:
"""Test the plugin registry system."""
def setup_method(self):
self.registry = mub.PluginRegistry()
def test_register_plugin(self):
plugin = mub.Plugin("test", "A test plugin")
self.registry.register(plugin)
assert self.registry.get("test") is plugin
def test_unregister_plugin(self):
plugin = mub.Plugin("test", "A test plugin")
self.registry.register(plugin)
result = self.registry.unregister("test")
assert result is True
assert self.registry.get("test") is None
def test_unregister_nonexistent(self):
result = self.registry.unregister("nonexistent")
assert result is False
def test_list_plugins(self):
p1 = mub.Plugin("alpha", "First")
p2 = mub.Plugin("beta", "Second")
self.registry.register(p1)
self.registry.register(p2)
names = [p["name"] for p in self.registry.list_plugins()]
assert "alpha" in names
assert "beta" in names
def test_fire_on_message(self):
class EchoPlugin(mub.Plugin):
def on_message(self, user_id, message, room):
return f"echo: {message}"
self.registry.register(EchoPlugin("echo", "Echoes"))
result = self.registry.fire_on_message("u1", "hello", "garden")
assert result == "echo: hello"
def test_fire_on_message_no_handler(self):
plugin = mub.Plugin("noop", "Does nothing")
self.registry.register(plugin)
result = self.registry.fire_on_message("u1", "hello", "garden")
assert result is None
def test_thread_safety(self):
errors = []
def register_many(prefix):
try:
for i in range(50):
p = mub.Plugin(f"{prefix}_{i}", f"Plugin {i}")
self.registry.register(p)
except Exception as e:
errors.append(e)
threads = [threading.Thread(target=register_many, args=(f"t{t}",)) for t in range(4)]
for t in threads:
t.start()
for t in threads:
t.join()
assert len(errors) == 0
assert len(self.registry.list_plugins()) == 200
# ─── TestChatLogIsolation ────────────────────────────────────────────
class TestChatLogIsolation:
"""Test chat log room isolation and rolling buffer."""
def setup_method(self):
# Patch CHATLOG_FILE to prevent disk writes
self._patcher = patch.object(mub, "CHATLOG_FILE", Path("/dev/null"))
self._patcher.start()
self.chat_log = mub.ChatLog(max_per_room=5)
def teardown_method(self):
self._patcher.stop()
def test_room_isolation(self):
self.chat_log.log("garden", "say", "Hello garden", user_id="u1")
self.chat_log.log("tower", "say", "Hello tower", user_id="u2")
garden_msgs = self.chat_log.get_history("garden")
tower_msgs = self.chat_log.get_history("tower")
assert len(garden_msgs) == 1
assert garden_msgs[0]["message"] == "Hello garden"
assert len(tower_msgs) == 1
assert tower_msgs[0]["message"] == "Hello tower"
def test_rolling_buffer(self):
for i in range(10):
self.chat_log.log("garden", "say", f"msg_{i}", user_id="u1")
history = self.chat_log.get_history("garden")
assert len(history) == 5 # max_per_room
assert history[0]["message"] == "msg_5" # oldest kept
assert history[-1]["message"] == "msg_9" # newest
def test_get_all_rooms(self):
self.chat_log.log("garden", "say", "hi", user_id="u1")
self.chat_log.log("tower", "say", "hi", user_id="u1")
rooms = self.chat_log.get_all_rooms()
assert set(rooms) == {"garden", "tower"}
def test_history_with_limit(self):
for i in range(10):
self.chat_log.log("garden", "say", f"msg_{i}", user_id="u1")
limited = self.chat_log.get_history("garden", limit=3)
assert len(limited) == 3
assert limited[0]["message"] == "msg_7"
def test_empty_room_history(self):
history = self.chat_log.get_history("nonexistent")
assert history == []
def test_message_types(self):
self.chat_log.log("garden", "say", "said", user_id="u1")
self.chat_log.log("garden", "ask", "asked", user_id="u1")
self.chat_log.log("garden", "system", "notice", user_id=None)
history = self.chat_log.get_history("garden")
types = [m["type"] for m in history]
assert "say" in types
assert "ask" in types
assert "system" in types
def test_since_filter(self):
self.chat_log.log("garden", "say", "old", user_id="u1")
time.sleep(0.05)
cutoff = time.strftime("%Y-%m-%dT%H:%M:%S")
time.sleep(0.05)
self.chat_log.log("garden", "say", "new", user_id="u1")
recent = self.chat_log.get_history("garden", since=cutoff)
assert len(recent) == 1
assert recent[0]["message"] == "new"
def test_thread_safety(self):
errors = []
def log_many(room, n):
try:
for i in range(n):
self.chat_log.log(room, "say", f"{room}_{i}", user_id="u1")
except Exception as e:
errors.append(e)
threads = [
threading.Thread(target=log_many, args=("garden", 50)),
threading.Thread(target=log_many, args=("tower", 50)),
]
for t in threads:
t.start()
for t in threads:
t.join()
assert len(errors) == 0
# Each room should have exactly max_per_room (5) messages
assert len(self.chat_log.get_history("garden")) == 5
assert len(self.chat_log.get_history("tower")) == 5
# ─── TestPresenceManager ─────────────────────────────────────────────
class TestPresenceManager:
"""Test presence tracking (enter/leave/room queries)."""
def setup_method(self):
self.pm = mub.PresenceManager()
def test_enter_room(self):
event = self.pm.enter_room("u1", "Alice", "garden")
assert event["event"] == "enter"
assert event["user_id"] == "u1"
assert event["room"] == "garden"
def test_leave_room(self):
self.pm.enter_room("u1", "Alice", "garden")
event = self.pm.leave_room("u1", "garden")
assert event is not None
assert event["event"] == "leave"
def test_leave_nonexistent(self):
event = self.pm.leave_room("u1", "garden")
assert event is None
def test_get_players_in_room(self):
self.pm.enter_room("u1", "Alice", "garden")
self.pm.enter_room("u2", "Bob", "garden")
self.pm.enter_room("u3", "Charlie", "tower")
players = self.pm.get_players_in_room("garden")
names = {p["username"] for p in players}
assert names == {"Alice", "Bob"}
def test_room_isolation(self):
self.pm.enter_room("u1", "Alice", "garden")
self.pm.enter_room("u2", "Bob", "tower")
garden_players = {p["user_id"] for p in self.pm.get_players_in_room("garden")}
tower_players = {p["user_id"] for p in self.pm.get_players_in_room("tower")}
assert garden_players == {"u1"}
assert tower_players == {"u2"}
def test_cleanup_user(self):
self.pm.enter_room("u1", "Alice", "garden")
self.pm.enter_room("u1", "Alice", "tower")
events = self.pm.cleanup_user("u1")
assert len(events) == 2
assert len(self.pm.get_players_in_room("garden")) == 0
assert len(self.pm.get_players_in_room("tower")) == 0
def test_say_event(self):
self.pm.enter_room("u1", "Alice", "garden")
event = self.pm.say("u1", "Alice", "garden", "Hello!")
assert event["type"] == "say"
assert event["message"] == "Hello!"
def test_get_room_events(self):
self.pm.enter_room("u1", "Alice", "garden")
self.pm.say("u1", "Alice", "garden", "Hi!")
self.pm.leave_room("u1", "garden")
events = self.pm.get_room_events("garden")
types = [e["event"] for e in events]
assert "enter" in types
assert "message" in types
assert "leave" in types
# ─── TestConcurrentUsers ─────────────────────────────────────────────
class TestConcurrentUsers:
"""Test multi-user concurrent operations."""
def test_concurrent_chat(self):
pm = mub.PresenceManager()
_patcher = patch.object(mub, "CHATLOG_FILE", Path("/dev/null"))
_patcher.start()
cl = mub.ChatLog(max_per_room=100)
_patcher.stop()
errors = []
rooms = ["garden", "tower", "library"]
def user_session(user_id, username):
try:
for room in rooms:
pm.enter_room(user_id, username, room)
for i in range(5):
pm.say(user_id, username, room, f"{username}: msg {i}")
cl.log(room, "say", f"{username}: msg {i}", user_id=user_id, username=username)
pm.leave_room(user_id, room)
except Exception as e:
errors.append(e)
threads = [
threading.Thread(target=user_session, args=(f"u{i}", f"user_{i}"))
for i in range(10)
]
for t in threads:
t.start()
for t in threads:
t.join()
assert len(errors) == 0
# Verify no cross-contamination: each room should have messages
for room in rooms:
players = pm.get_players_in_room(room)
# All users should have left
assert len(players) == 0
if __name__ == "__main__":
pytest.main([__file__, "-v"])