Compare commits
1 Commits
fix/issue-
...
feat/1541-
| Author | SHA1 | Date | |
|---|---|---|---|
| 64a46e6b37 |
@@ -1,135 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
# claim-issue.sh — Claim a Gitea issue with duplicate-PR detection
|
||||
#
|
||||
# Before an agent starts work on an issue, this script checks:
|
||||
# 1. Is the issue already assigned?
|
||||
# 2. Do open PRs already reference this issue?
|
||||
# 3. Is the issue closed?
|
||||
#
|
||||
# Only proceeds to assign if all checks pass.
|
||||
#
|
||||
# Usage:
|
||||
# ./scripts/claim-issue.sh <issue_number> [repo] [assignee]
|
||||
#
|
||||
# Exit codes:
|
||||
# 0 — Claimed successfully
|
||||
# 1 — BLOCKED (duplicate PR exists, already assigned, or issue closed)
|
||||
# 2 — Error (missing args, API failure)
|
||||
#
|
||||
# Issue #1492: Duplicate-PR detection in agent claim workflow.
|
||||
# Issue #1480: The meta-problem this prevents.
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
set -euo pipefail
|
||||
|
||||
ISSUE_NUM="${1:-}"
|
||||
REPO="${2:-Timmy_Foundation/the-nexus}"
|
||||
ASSIGNEE="${3:-timmy}"
|
||||
|
||||
if [ -z "$ISSUE_NUM" ]; then
|
||||
echo "Usage: $0 <issue_number> [repo] [assignee]"
|
||||
echo "Example: $0 1128"
|
||||
echo " $0 1339 Timmy_Foundation/the-nexus allegro"
|
||||
exit 2
|
||||
fi
|
||||
|
||||
GITEA_URL="${GITEA_URL:-https://forge.alexanderwhitestone.com}"
|
||||
GITEA_TOKEN="${GITEA_TOKEN:-}"
|
||||
if [ -z "$GITEA_TOKEN" ]; then
|
||||
TOKEN_FILE="${HOME}/.config/gitea/token"
|
||||
if [ -f "$TOKEN_FILE" ]; then
|
||||
GITEA_TOKEN=$(cat "$TOKEN_FILE" | tr -d '[:space:]')
|
||||
fi
|
||||
fi
|
||||
|
||||
if [ -z "$GITEA_TOKEN" ]; then
|
||||
echo "Error: No GITEA_TOKEN. Set env var or create ~/.config/gitea/token"
|
||||
exit 2
|
||||
fi
|
||||
|
||||
API="$GITEA_URL/api/v1"
|
||||
AUTH="Authorization: token $GITEA_TOKEN"
|
||||
|
||||
log() { echo "[$(date -u +%H:%M:%S)] $*"; }
|
||||
|
||||
echo "═══ Claim Issue #$ISSUE_NUM ═══"
|
||||
echo ""
|
||||
|
||||
# ── Step 1: Fetch the issue ──────────────────────────────────
|
||||
ISSUE=$(curl -s -H "$AUTH" "$API/repos/$REPO/issues/$ISSUE_NUM")
|
||||
|
||||
if echo "$ISSUE" | jq -e '.message' > /dev/null 2>&1; then
|
||||
ERROR=$(echo "$ISSUE" | jq -r '.message')
|
||||
echo "✗ Error fetching issue: $ERROR"
|
||||
exit 2
|
||||
fi
|
||||
|
||||
ISSUE_STATE=$(echo "$ISSUE" | jq -r '.state')
|
||||
ISSUE_TITLE=$(echo "$ISSUE" | jq -r '.title')
|
||||
ISSUE_ASSIGNEES=$(echo "$ISSUE" | jq -r '.assignees // [] | map(.login) | join(", ")')
|
||||
|
||||
echo "Issue: #$ISSUE_NUM — $ISSUE_TITLE"
|
||||
echo "State: $ISSUE_STATE"
|
||||
echo "Assignees: ${ISSUE_ASSIGNEES:-none}"
|
||||
echo ""
|
||||
|
||||
# ── Step 2: Check if issue is CLOSED ────────────────────────
|
||||
if [ "$ISSUE_STATE" = "closed" ]; then
|
||||
echo "✗ BLOCKED: Issue #$ISSUE_NUM is CLOSED."
|
||||
echo " Do not work on closed issues."
|
||||
exit 1
|
||||
fi
|
||||
log "✓ Issue is open"
|
||||
|
||||
# ── Step 3: Check if already assigned to someone else ───────
|
||||
if [ -n "$ISSUE_ASSIGNEES" ] && [ "$ISSUE_ASSIGNEES" != "null" ]; then
|
||||
if echo "$ISSUE_ASSIGNEES" | grep -qi "$ASSIGNEE"; then
|
||||
log "✓ Already assigned to $ASSIGNEE — proceeding"
|
||||
else
|
||||
echo "✗ BLOCKED: Issue #$ISSUE_NUM is assigned to: $ISSUE_ASSIGNEES"
|
||||
echo " Not assigned to $ASSIGNEE. Do not work on others' issues."
|
||||
exit 1
|
||||
fi
|
||||
else
|
||||
log "✓ Issue is unassigned"
|
||||
fi
|
||||
|
||||
# ── Step 4: Check for existing open PRs ─────────────────────
|
||||
OPEN_PRS=$(curl -s -H "$AUTH" "$API/repos/$REPO/pulls?state=open&limit=100")
|
||||
|
||||
ISSUE_STR="#$ISSUE_NUM"
|
||||
DUPLICATES=$(echo "$OPEN_PRS" | jq -r ".[] | select(.title | test(\"$ISSUE_STR\"; \"i\") or (.body // \"\") | test(\"$ISSUE_STR\"; \"i\")) | \" PR #\\(.number): \\(.title) [\\(.head.ref)] (\\(.created_at[:10]))\"")
|
||||
|
||||
if [ -n "$DUPLICATES" ]; then
|
||||
echo "✗ BLOCKED: Open PRs already exist for issue #$ISSUE_NUM:"
|
||||
echo ""
|
||||
echo "$DUPLICATES"
|
||||
echo ""
|
||||
echo "Options:"
|
||||
echo " 1. Review and merge an existing PR"
|
||||
echo " 2. Close duplicates: ./scripts/cleanup-duplicate-prs.sh --close"
|
||||
echo " 3. Push to an existing branch"
|
||||
echo ""
|
||||
echo "Do NOT create a new PR. See #1492."
|
||||
exit 1
|
||||
fi
|
||||
log "✓ No existing open PRs"
|
||||
|
||||
# ── Step 5: Assign the issue ────────────────────────────────
|
||||
log "Assigning issue #$ISSUE_NUM to $ASSIGNEE..."
|
||||
|
||||
ASSIGN_RESULT=$(curl -s -X POST -H "$AUTH" -H "Content-Type: application/json" \
|
||||
-d "{\"assignees\":[\"$ASSIGNEE\"]}" \
|
||||
"$API/repos/$REPO/issues/$ISSUE_NUM/assignees")
|
||||
|
||||
if echo "$ASSIGN_RESULT" | jq -e '.number' > /dev/null 2>&1; then
|
||||
echo ""
|
||||
echo "✓ CLAIMED: Issue #$ISSUE_NUM assigned to $ASSIGNEE"
|
||||
echo " Safe to proceed with implementation."
|
||||
exit 0
|
||||
else
|
||||
ERROR=$(echo "$ASSIGN_RESULT" | jq -r '.message // "unknown error"')
|
||||
echo "⚠ Issue passed all checks but assignment failed: $ERROR"
|
||||
echo " Proceed with caution — another agent may claim this."
|
||||
exit 0
|
||||
fi
|
||||
@@ -1,135 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
claim_issue.py — Claim a Gitea issue with duplicate-PR detection.
|
||||
|
||||
Before an agent starts work, checks:
|
||||
1. Is the issue open?
|
||||
2. Is it already assigned to someone else?
|
||||
3. Do open PRs already reference this issue?
|
||||
|
||||
Only assigns if all checks pass.
|
||||
|
||||
Usage:
|
||||
python3 scripts/claim_issue.py 1492
|
||||
python3 scripts/claim_issue.py 1492 Timmy_Foundation/the-nexus allegro
|
||||
|
||||
Exit codes:
|
||||
0 — Claimed (or safe to proceed)
|
||||
1 — BLOCKED (duplicate PR, assigned to other, or issue closed)
|
||||
2 — Error
|
||||
|
||||
Issue #1492: Duplicate-PR detection in agent claim workflow.
|
||||
"""
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import urllib.request
|
||||
|
||||
|
||||
def claim_issue(issue_num: int, repo: str = "Timmy_Foundation/the-nexus",
|
||||
assignee: str = "timmy", token: str = None) -> dict:
|
||||
"""Claim an issue with duplicate-PR detection.
|
||||
|
||||
Returns dict with:
|
||||
claimed (bool): True if safe to proceed
|
||||
reason (str): Why blocked or claimed
|
||||
existing_prs (list): Any existing PRs for this issue
|
||||
"""
|
||||
gitea_url = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com")
|
||||
token = token or os.environ.get("GITEA_TOKEN", "")
|
||||
if not token:
|
||||
token_path = os.path.expanduser("~/.config/gitea/token")
|
||||
if os.path.exists(token_path):
|
||||
token = open(token_path).read().strip()
|
||||
|
||||
if not token:
|
||||
return {"claimed": False, "reason": "No GITEA_TOKEN", "existing_prs": []}
|
||||
|
||||
headers = {"Authorization": f"token {token}"}
|
||||
api = f"{gitea_url}/api/v1/repos/{repo}"
|
||||
|
||||
# Fetch issue
|
||||
try:
|
||||
req = urllib.request.Request(f"{api}/issues/{issue_num}", headers=headers)
|
||||
with urllib.request.urlopen(req, timeout=10) as resp:
|
||||
issue = json.loads(resp.read())
|
||||
except Exception as e:
|
||||
return {"claimed": False, "reason": f"API error: {e}", "existing_prs": []}
|
||||
|
||||
# Check state
|
||||
if issue.get("state") == "closed":
|
||||
return {"claimed": False, "reason": f"Issue #{issue_num} is CLOSED", "existing_prs": []}
|
||||
|
||||
# Check assignees
|
||||
assignees = [a["login"] for a in (issue.get("assignees") or [])]
|
||||
if assignees and assignee not in assignees:
|
||||
return {"claimed": False,
|
||||
"reason": f"Assigned to {', '.join(assignees)}, not {assignee}",
|
||||
"existing_prs": []}
|
||||
|
||||
# Check for existing PRs
|
||||
try:
|
||||
req = urllib.request.Request(f"{api}/pulls?state=open&limit=100", headers=headers)
|
||||
with urllib.request.urlopen(req, timeout=10) as resp:
|
||||
prs = json.loads(resp.read())
|
||||
except Exception:
|
||||
prs = []
|
||||
|
||||
issue_str = f"#{issue_num}"
|
||||
matches = []
|
||||
for pr in prs:
|
||||
title = pr.get("title", "")
|
||||
body = pr.get("body") or ""
|
||||
if issue_str in title or issue_str in body:
|
||||
matches.append({
|
||||
"number": pr["number"],
|
||||
"title": title,
|
||||
"branch": pr["head"]["ref"],
|
||||
"created": pr["created_at"][:10],
|
||||
})
|
||||
|
||||
if matches:
|
||||
lines = [f"BLOCKED: {len(matches)} existing PR(s) for #{issue_num}:"]
|
||||
for m in matches:
|
||||
lines.append(f" PR #{m['number']}: {m['title']} [{m['branch']}]")
|
||||
return {"claimed": False, "reason": "\n".join(lines), "existing_prs": matches}
|
||||
|
||||
# All checks passed — assign
|
||||
try:
|
||||
data = json.dumps({"assignees": [assignee]}).encode()
|
||||
req = urllib.request.Request(
|
||||
f"{api}/issues/{issue_num}/assignees",
|
||||
data=data, headers={**headers, "Content-Type": "application/json"},
|
||||
method="POST"
|
||||
)
|
||||
urllib.request.urlopen(req, timeout=10)
|
||||
return {"claimed": True,
|
||||
"reason": f"Issue #{issue_num} claimed by {assignee}",
|
||||
"existing_prs": []}
|
||||
except Exception as e:
|
||||
return {"claimed": True,
|
||||
"reason": f"Checks passed but assignment failed: {e}",
|
||||
"existing_prs": []}
|
||||
|
||||
|
||||
def main():
|
||||
if len(sys.argv) < 2:
|
||||
print("Usage: claim_issue.py <issue_number> [repo] [assignee]")
|
||||
print("Example: claim_issue.py 1492")
|
||||
print(" claim_issue.py 1339 Timmy_Foundation/the-nexus allegro")
|
||||
sys.exit(2)
|
||||
|
||||
issue_num = int(sys.argv[1])
|
||||
repo = sys.argv[2] if len(sys.argv) > 2 else "Timmy_Foundation/the-nexus"
|
||||
assignee = sys.argv[3] if len(sys.argv) > 3 else "timmy"
|
||||
|
||||
result = claim_issue(issue_num, repo, assignee)
|
||||
print(result["reason"])
|
||||
|
||||
if not result["claimed"]:
|
||||
sys.exit(1)
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
313
tests/test_multi_user_bridge.py
Normal file
313
tests/test_multi_user_bridge.py
Normal file
@@ -0,0 +1,313 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Unit tests for multi_user_bridge.py
|
||||
|
||||
23 tests across 5 test classes:
|
||||
- TestPluginRegistry: Register, unregister, list, fire hooks, thread-safety
|
||||
- TestChatLogIsolation: Room isolation, rolling buffer, history, thread-safety
|
||||
- TestPresenceManager: Enter/leave, room queries, isolation, concurrency
|
||||
- TestConcurrentUsers: Multi-user concurrent chat with isolation verification
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
import pytest
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
# Add repo root to path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
# Mock WORLD_DIR before importing to prevent file writes
|
||||
with patch.dict(os.environ, {"TIMMY_BRIDGE_PORT": "0"}):
|
||||
import multi_user_bridge as mub
|
||||
|
||||
|
||||
# ─── TestPluginRegistry ──────────────────────────────────────────────
|
||||
|
||||
class TestPluginRegistry:
|
||||
"""Test the plugin registry system."""
|
||||
|
||||
def setup_method(self):
|
||||
self.registry = mub.PluginRegistry()
|
||||
|
||||
def test_register_plugin(self):
|
||||
plugin = mub.Plugin("test", "A test plugin")
|
||||
self.registry.register(plugin)
|
||||
assert self.registry.get("test") is plugin
|
||||
|
||||
def test_unregister_plugin(self):
|
||||
plugin = mub.Plugin("test", "A test plugin")
|
||||
self.registry.register(plugin)
|
||||
result = self.registry.unregister("test")
|
||||
assert result is True
|
||||
assert self.registry.get("test") is None
|
||||
|
||||
def test_unregister_nonexistent(self):
|
||||
result = self.registry.unregister("nonexistent")
|
||||
assert result is False
|
||||
|
||||
def test_list_plugins(self):
|
||||
p1 = mub.Plugin("alpha", "First")
|
||||
p2 = mub.Plugin("beta", "Second")
|
||||
self.registry.register(p1)
|
||||
self.registry.register(p2)
|
||||
names = [p["name"] for p in self.registry.list_plugins()]
|
||||
assert "alpha" in names
|
||||
assert "beta" in names
|
||||
|
||||
def test_fire_on_message(self):
|
||||
class EchoPlugin(mub.Plugin):
|
||||
def on_message(self, user_id, message, room):
|
||||
return f"echo: {message}"
|
||||
|
||||
self.registry.register(EchoPlugin("echo", "Echoes"))
|
||||
result = self.registry.fire_on_message("u1", "hello", "garden")
|
||||
assert result == "echo: hello"
|
||||
|
||||
def test_fire_on_message_no_handler(self):
|
||||
plugin = mub.Plugin("noop", "Does nothing")
|
||||
self.registry.register(plugin)
|
||||
result = self.registry.fire_on_message("u1", "hello", "garden")
|
||||
assert result is None
|
||||
|
||||
def test_thread_safety(self):
|
||||
errors = []
|
||||
|
||||
def register_many(prefix):
|
||||
try:
|
||||
for i in range(50):
|
||||
p = mub.Plugin(f"{prefix}_{i}", f"Plugin {i}")
|
||||
self.registry.register(p)
|
||||
except Exception as e:
|
||||
errors.append(e)
|
||||
|
||||
threads = [threading.Thread(target=register_many, args=(f"t{t}",)) for t in range(4)]
|
||||
for t in threads:
|
||||
t.start()
|
||||
for t in threads:
|
||||
t.join()
|
||||
assert len(errors) == 0
|
||||
assert len(self.registry.list_plugins()) == 200
|
||||
|
||||
|
||||
# ─── TestChatLogIsolation ────────────────────────────────────────────
|
||||
|
||||
class TestChatLogIsolation:
|
||||
"""Test chat log room isolation and rolling buffer."""
|
||||
|
||||
def setup_method(self):
|
||||
# Patch CHATLOG_FILE to prevent disk writes
|
||||
self._patcher = patch.object(mub, "CHATLOG_FILE", Path("/dev/null"))
|
||||
self._patcher.start()
|
||||
self.chat_log = mub.ChatLog(max_per_room=5)
|
||||
|
||||
def teardown_method(self):
|
||||
self._patcher.stop()
|
||||
|
||||
def test_room_isolation(self):
|
||||
self.chat_log.log("garden", "say", "Hello garden", user_id="u1")
|
||||
self.chat_log.log("tower", "say", "Hello tower", user_id="u2")
|
||||
|
||||
garden_msgs = self.chat_log.get_history("garden")
|
||||
tower_msgs = self.chat_log.get_history("tower")
|
||||
|
||||
assert len(garden_msgs) == 1
|
||||
assert garden_msgs[0]["message"] == "Hello garden"
|
||||
assert len(tower_msgs) == 1
|
||||
assert tower_msgs[0]["message"] == "Hello tower"
|
||||
|
||||
def test_rolling_buffer(self):
|
||||
for i in range(10):
|
||||
self.chat_log.log("garden", "say", f"msg_{i}", user_id="u1")
|
||||
|
||||
history = self.chat_log.get_history("garden")
|
||||
assert len(history) == 5 # max_per_room
|
||||
assert history[0]["message"] == "msg_5" # oldest kept
|
||||
assert history[-1]["message"] == "msg_9" # newest
|
||||
|
||||
def test_get_all_rooms(self):
|
||||
self.chat_log.log("garden", "say", "hi", user_id="u1")
|
||||
self.chat_log.log("tower", "say", "hi", user_id="u1")
|
||||
rooms = self.chat_log.get_all_rooms()
|
||||
assert set(rooms) == {"garden", "tower"}
|
||||
|
||||
def test_history_with_limit(self):
|
||||
for i in range(10):
|
||||
self.chat_log.log("garden", "say", f"msg_{i}", user_id="u1")
|
||||
|
||||
limited = self.chat_log.get_history("garden", limit=3)
|
||||
assert len(limited) == 3
|
||||
assert limited[0]["message"] == "msg_7"
|
||||
|
||||
def test_empty_room_history(self):
|
||||
history = self.chat_log.get_history("nonexistent")
|
||||
assert history == []
|
||||
|
||||
def test_message_types(self):
|
||||
self.chat_log.log("garden", "say", "said", user_id="u1")
|
||||
self.chat_log.log("garden", "ask", "asked", user_id="u1")
|
||||
self.chat_log.log("garden", "system", "notice", user_id=None)
|
||||
|
||||
history = self.chat_log.get_history("garden")
|
||||
types = [m["type"] for m in history]
|
||||
assert "say" in types
|
||||
assert "ask" in types
|
||||
assert "system" in types
|
||||
|
||||
def test_since_filter(self):
|
||||
self.chat_log.log("garden", "say", "old", user_id="u1")
|
||||
time.sleep(0.05)
|
||||
cutoff = time.strftime("%Y-%m-%dT%H:%M:%S")
|
||||
time.sleep(0.05)
|
||||
self.chat_log.log("garden", "say", "new", user_id="u1")
|
||||
|
||||
recent = self.chat_log.get_history("garden", since=cutoff)
|
||||
assert len(recent) == 1
|
||||
assert recent[0]["message"] == "new"
|
||||
|
||||
def test_thread_safety(self):
|
||||
errors = []
|
||||
|
||||
def log_many(room, n):
|
||||
try:
|
||||
for i in range(n):
|
||||
self.chat_log.log(room, "say", f"{room}_{i}", user_id="u1")
|
||||
except Exception as e:
|
||||
errors.append(e)
|
||||
|
||||
threads = [
|
||||
threading.Thread(target=log_many, args=("garden", 50)),
|
||||
threading.Thread(target=log_many, args=("tower", 50)),
|
||||
]
|
||||
for t in threads:
|
||||
t.start()
|
||||
for t in threads:
|
||||
t.join()
|
||||
|
||||
assert len(errors) == 0
|
||||
# Each room should have exactly max_per_room (5) messages
|
||||
assert len(self.chat_log.get_history("garden")) == 5
|
||||
assert len(self.chat_log.get_history("tower")) == 5
|
||||
|
||||
|
||||
# ─── TestPresenceManager ─────────────────────────────────────────────
|
||||
|
||||
class TestPresenceManager:
|
||||
"""Test presence tracking (enter/leave/room queries)."""
|
||||
|
||||
def setup_method(self):
|
||||
self.pm = mub.PresenceManager()
|
||||
|
||||
def test_enter_room(self):
|
||||
event = self.pm.enter_room("u1", "Alice", "garden")
|
||||
assert event["event"] == "enter"
|
||||
assert event["user_id"] == "u1"
|
||||
assert event["room"] == "garden"
|
||||
|
||||
def test_leave_room(self):
|
||||
self.pm.enter_room("u1", "Alice", "garden")
|
||||
event = self.pm.leave_room("u1", "garden")
|
||||
assert event is not None
|
||||
assert event["event"] == "leave"
|
||||
|
||||
def test_leave_nonexistent(self):
|
||||
event = self.pm.leave_room("u1", "garden")
|
||||
assert event is None
|
||||
|
||||
def test_get_players_in_room(self):
|
||||
self.pm.enter_room("u1", "Alice", "garden")
|
||||
self.pm.enter_room("u2", "Bob", "garden")
|
||||
self.pm.enter_room("u3", "Charlie", "tower")
|
||||
|
||||
players = self.pm.get_players_in_room("garden")
|
||||
names = {p["username"] for p in players}
|
||||
assert names == {"Alice", "Bob"}
|
||||
|
||||
def test_room_isolation(self):
|
||||
self.pm.enter_room("u1", "Alice", "garden")
|
||||
self.pm.enter_room("u2", "Bob", "tower")
|
||||
|
||||
garden_players = {p["user_id"] for p in self.pm.get_players_in_room("garden")}
|
||||
tower_players = {p["user_id"] for p in self.pm.get_players_in_room("tower")}
|
||||
|
||||
assert garden_players == {"u1"}
|
||||
assert tower_players == {"u2"}
|
||||
|
||||
def test_cleanup_user(self):
|
||||
self.pm.enter_room("u1", "Alice", "garden")
|
||||
self.pm.enter_room("u1", "Alice", "tower")
|
||||
|
||||
events = self.pm.cleanup_user("u1")
|
||||
assert len(events) == 2
|
||||
|
||||
assert len(self.pm.get_players_in_room("garden")) == 0
|
||||
assert len(self.pm.get_players_in_room("tower")) == 0
|
||||
|
||||
def test_say_event(self):
|
||||
self.pm.enter_room("u1", "Alice", "garden")
|
||||
event = self.pm.say("u1", "Alice", "garden", "Hello!")
|
||||
assert event["type"] == "say"
|
||||
assert event["message"] == "Hello!"
|
||||
|
||||
def test_get_room_events(self):
|
||||
self.pm.enter_room("u1", "Alice", "garden")
|
||||
self.pm.say("u1", "Alice", "garden", "Hi!")
|
||||
self.pm.leave_room("u1", "garden")
|
||||
|
||||
events = self.pm.get_room_events("garden")
|
||||
types = [e["event"] for e in events]
|
||||
assert "enter" in types
|
||||
assert "message" in types
|
||||
assert "leave" in types
|
||||
|
||||
|
||||
# ─── TestConcurrentUsers ─────────────────────────────────────────────
|
||||
|
||||
class TestConcurrentUsers:
|
||||
"""Test multi-user concurrent operations."""
|
||||
|
||||
def test_concurrent_chat(self):
|
||||
pm = mub.PresenceManager()
|
||||
_patcher = patch.object(mub, "CHATLOG_FILE", Path("/dev/null"))
|
||||
_patcher.start()
|
||||
cl = mub.ChatLog(max_per_room=100)
|
||||
_patcher.stop()
|
||||
|
||||
errors = []
|
||||
rooms = ["garden", "tower", "library"]
|
||||
|
||||
def user_session(user_id, username):
|
||||
try:
|
||||
for room in rooms:
|
||||
pm.enter_room(user_id, username, room)
|
||||
for i in range(5):
|
||||
pm.say(user_id, username, room, f"{username}: msg {i}")
|
||||
cl.log(room, "say", f"{username}: msg {i}", user_id=user_id, username=username)
|
||||
pm.leave_room(user_id, room)
|
||||
except Exception as e:
|
||||
errors.append(e)
|
||||
|
||||
threads = [
|
||||
threading.Thread(target=user_session, args=(f"u{i}", f"user_{i}"))
|
||||
for i in range(10)
|
||||
]
|
||||
for t in threads:
|
||||
t.start()
|
||||
for t in threads:
|
||||
t.join()
|
||||
|
||||
assert len(errors) == 0
|
||||
|
||||
# Verify no cross-contamination: each room should have messages
|
||||
for room in rooms:
|
||||
players = pm.get_players_in_room(room)
|
||||
# All users should have left
|
||||
assert len(players) == 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__, "-v"])
|
||||
Reference in New Issue
Block a user