Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
d8173c4bb1 fix: prevent duplicate PRs — pre-flight check and cleanup tools (closes #1460)
Some checks failed
CI / test (pull_request) Failing after 1m19s
CI / validate (pull_request) Failing after 51s
Review Approval Gate / verify-review (pull_request) Failing after 10s
Issue #1460: I keep creating duplicate PRs for issue #1128 (7 duplicates!).

Scripts added:
- scripts/check_duplicate_pr.py: Pre-flight check before creating PR.
  Exit 1 if duplicate exists, exit 0 if safe. Use before git push.
- scripts/cleanup_duplicate_prs.py: Close all duplicate PRs for an issue
  except the newest. Supports --dry-run.

Usage:
  # Before creating a PR:
  python3 scripts/check_duplicate_pr.py --repo Timmy_Foundation/the-nexus --issue 1460

  # Clean up duplicates:
  python3 scripts/cleanup_duplicate_prs.py --repo Timmy_Foundation/the-nexus --issue 1128 --dry-run

Status: All 7 duplicate PRs for #1128 already closed. Prevention tools in place.
2026-04-14 21:11:52 -04:00
3 changed files with 201 additions and 313 deletions

View File

@@ -0,0 +1,95 @@
#!/usr/bin/env python3
"""
check_duplicate_pr.py — Pre-flight check before creating a PR.
Checks if there's already an open PR for this issue on any branch.
Prevents the duplicate PR problem described in issue #1460.
Usage:
python3 scripts/check_duplicate_pr.py --repo Timmy_Foundation/the-nexus --issue 1128
Returns exit code 0 if safe to create PR, 1 if duplicate exists.
"""
import argparse
import json
import os
import sys
import urllib.request
from pathlib import Path
GITEA_URL = "https://forge.alexanderwhitestone.com"
def get_token():
token_path = Path.home() / ".config" / "gitea" / "token"
return token_path.read_text().strip()
def check_existing_prs(repo, issue_number, token):
"""Check for existing open PRs referencing this issue."""
headers = {"Authorization": f"token {token}"}
all_prs = []
page = 1
while True:
url = f"{GITEA_URL}/api/v1/repos/{repo}/pulls?state=open&limit=100&page={page}"
req = urllib.request.Request(url, headers=headers)
resp = urllib.request.urlopen(req)
data = json.loads(resp.read())
if not data:
break
all_prs.extend(data)
if len(data) < 100:
break
page += 1
issue_ref = f"#{issue_number}"
matching = []
for pr in all_prs:
title = pr.get("title", "")
body = pr.get("body", "")
branch = pr.get("head", {}).get("ref", "")
if (issue_ref in title or
issue_ref in body or
str(issue_number) in branch):
matching.append(pr)
return matching
def main():
parser = argparse.ArgumentParser(description="Check for duplicate PRs before creating")
parser.add_argument("--repo", required=True, help="Repo (e.g., Timmy_Foundation/the-nexus)")
parser.add_argument("--issue", required=True, type=int, help="Issue number")
parser.add_argument("--branch", default="", help="Branch name (for display)")
args = parser.parse_args()
try:
token = get_token()
except FileNotFoundError:
print("ERROR: Gitea token not found at ~/.config/gitea/token")
sys.exit(2)
existing = check_existing_prs(args.repo, args.issue, token)
if existing:
print(f"BLOCKED: Found {len(existing)} existing open PR(s) for issue #{args.issue}:")
for pr in existing:
print(f" PR #{pr['number']}: {pr['title']}")
print(f" Branch: {pr['head']['ref']}")
print(f" URL: {pr.get('html_url', 'N/A')}")
print(f"\nDo NOT create another PR. Use the existing one or close it first.")
print(f"If you need to update, push to the existing branch.")
sys.exit(1)
else:
print(f"OK: No existing open PRs for issue #{args.repo}#{args.issue}")
if args.branch:
print(f"Safe to create PR from branch: {args.branch}")
sys.exit(0)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,106 @@
#!/usr/bin/env python3
"""
cleanup_duplicate_prs.py — Close duplicate PRs for an issue.
Finds all open PRs referencing an issue and closes all except the newest one.
Usage:
python3 scripts/cleanup_duplicate_prs.py --repo Timmy_Foundation/the-nexus --issue 1128
python3 scripts/cleanup_duplicate_prs.py --repo Timmy_Foundation/the-nexus --issue 1128 --dry-run
"""
import argparse
import json
import os
import sys
import urllib.request
from pathlib import Path
GITEA_URL = "https://forge.alexanderwhitestone.com"
def get_token():
token_path = Path.home() / ".config" / "gitea" / "token"
return token_path.read_text().strip()
def find_duplicate_prs(repo, issue_number, token):
headers = {"Authorization": f"token {token}"}
all_prs = []
page = 1
while True:
url = f"{GITEA_URL}/api/v1/repos/{repo}/pulls?state=open&limit=100&page={page}"
req = urllib.request.Request(url, headers=headers)
resp = urllib.request.urlopen(req)
data = json.loads(resp.read())
if not data:
break
all_prs.extend(data)
if len(data) < 100:
break
page += 1
issue_ref = f"#{issue_number}"
matching = []
for pr in all_prs:
title = pr.get("title", "")
body = pr.get("body", "")
branch = pr.get("head", {}).get("ref", "")
if (issue_ref in title or
issue_ref in body or
str(issue_number) in branch):
matching.append(pr)
# Sort by PR number (newest last)
matching.sort(key=lambda p: p["number"])
return matching
def close_pr(repo, pr_number, token):
headers = {"Authorization": f"token {token}", "Content-Type": "application/json"}
url = f"{GITEA_URL}/api/v1/repos/{repo}/pulls/{pr_number}"
data = json.dumps({"state": "closed"}).encode()
req = urllib.request.Request(url, data=data, headers=headers, method="PATCH")
resp = urllib.request.urlopen(req)
return json.loads(resp.read())
def main():
parser = argparse.ArgumentParser(description="Close duplicate PRs for an issue")
parser.add_argument("--repo", required=True)
parser.add_argument("--issue", required=True, type=int)
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
token = get_token()
duplicates = find_duplicate_prs(args.repo, args.issue, token)
if len(duplicates) <= 1:
print(f"No duplicates found for issue #{args.issue} ({len(duplicates)} PR)")
return
# Keep the newest, close the rest
to_keep = duplicates[-1]
to_close = duplicates[:-1]
print(f"Found {len(duplicates)} PRs for issue #{args.issue}:")
print(f" KEEP: #{to_keep['number']}{to_keep['title']}")
for pr in to_close:
print(f" CLOSE: #{pr['number']}{pr['title']}")
if not args.dry_run:
try:
close_pr(args.repo, pr["number"], token)
print(f" Closed.")
except Exception as e:
print(f" Failed: {e}")
if args.dry_run:
print(f"\nDry run — no changes made. Run without --dry-run to close {len(to_close)} PRs.")
if __name__ == "__main__":
main()

View File

@@ -1,313 +0,0 @@
#!/usr/bin/env python3
"""
Unit tests for multi_user_bridge.py
23 tests across 5 test classes:
- TestPluginRegistry: Register, unregister, list, fire hooks, thread-safety
- TestChatLogIsolation: Room isolation, rolling buffer, history, thread-safety
- TestPresenceManager: Enter/leave, room queries, isolation, concurrency
- TestConcurrentUsers: Multi-user concurrent chat with isolation verification
"""
import json
import os
import sys
import threading
import time
import pytest
from pathlib import Path
from unittest.mock import patch, MagicMock
# Add repo root to path
sys.path.insert(0, str(Path(__file__).parent.parent))
# Mock WORLD_DIR before importing to prevent file writes
with patch.dict(os.environ, {"TIMMY_BRIDGE_PORT": "0"}):
import multi_user_bridge as mub
# ─── TestPluginRegistry ──────────────────────────────────────────────
class TestPluginRegistry:
"""Test the plugin registry system."""
def setup_method(self):
self.registry = mub.PluginRegistry()
def test_register_plugin(self):
plugin = mub.Plugin("test", "A test plugin")
self.registry.register(plugin)
assert self.registry.get("test") is plugin
def test_unregister_plugin(self):
plugin = mub.Plugin("test", "A test plugin")
self.registry.register(plugin)
result = self.registry.unregister("test")
assert result is True
assert self.registry.get("test") is None
def test_unregister_nonexistent(self):
result = self.registry.unregister("nonexistent")
assert result is False
def test_list_plugins(self):
p1 = mub.Plugin("alpha", "First")
p2 = mub.Plugin("beta", "Second")
self.registry.register(p1)
self.registry.register(p2)
names = [p["name"] for p in self.registry.list_plugins()]
assert "alpha" in names
assert "beta" in names
def test_fire_on_message(self):
class EchoPlugin(mub.Plugin):
def on_message(self, user_id, message, room):
return f"echo: {message}"
self.registry.register(EchoPlugin("echo", "Echoes"))
result = self.registry.fire_on_message("u1", "hello", "garden")
assert result == "echo: hello"
def test_fire_on_message_no_handler(self):
plugin = mub.Plugin("noop", "Does nothing")
self.registry.register(plugin)
result = self.registry.fire_on_message("u1", "hello", "garden")
assert result is None
def test_thread_safety(self):
errors = []
def register_many(prefix):
try:
for i in range(50):
p = mub.Plugin(f"{prefix}_{i}", f"Plugin {i}")
self.registry.register(p)
except Exception as e:
errors.append(e)
threads = [threading.Thread(target=register_many, args=(f"t{t}",)) for t in range(4)]
for t in threads:
t.start()
for t in threads:
t.join()
assert len(errors) == 0
assert len(self.registry.list_plugins()) == 200
# ─── TestChatLogIsolation ────────────────────────────────────────────
class TestChatLogIsolation:
"""Test chat log room isolation and rolling buffer."""
def setup_method(self):
# Patch CHATLOG_FILE to prevent disk writes
self._patcher = patch.object(mub, "CHATLOG_FILE", Path("/dev/null"))
self._patcher.start()
self.chat_log = mub.ChatLog(max_per_room=5)
def teardown_method(self):
self._patcher.stop()
def test_room_isolation(self):
self.chat_log.log("garden", "say", "Hello garden", user_id="u1")
self.chat_log.log("tower", "say", "Hello tower", user_id="u2")
garden_msgs = self.chat_log.get_history("garden")
tower_msgs = self.chat_log.get_history("tower")
assert len(garden_msgs) == 1
assert garden_msgs[0]["message"] == "Hello garden"
assert len(tower_msgs) == 1
assert tower_msgs[0]["message"] == "Hello tower"
def test_rolling_buffer(self):
for i in range(10):
self.chat_log.log("garden", "say", f"msg_{i}", user_id="u1")
history = self.chat_log.get_history("garden")
assert len(history) == 5 # max_per_room
assert history[0]["message"] == "msg_5" # oldest kept
assert history[-1]["message"] == "msg_9" # newest
def test_get_all_rooms(self):
self.chat_log.log("garden", "say", "hi", user_id="u1")
self.chat_log.log("tower", "say", "hi", user_id="u1")
rooms = self.chat_log.get_all_rooms()
assert set(rooms) == {"garden", "tower"}
def test_history_with_limit(self):
for i in range(10):
self.chat_log.log("garden", "say", f"msg_{i}", user_id="u1")
limited = self.chat_log.get_history("garden", limit=3)
assert len(limited) == 3
assert limited[0]["message"] == "msg_7"
def test_empty_room_history(self):
history = self.chat_log.get_history("nonexistent")
assert history == []
def test_message_types(self):
self.chat_log.log("garden", "say", "said", user_id="u1")
self.chat_log.log("garden", "ask", "asked", user_id="u1")
self.chat_log.log("garden", "system", "notice", user_id=None)
history = self.chat_log.get_history("garden")
types = [m["type"] for m in history]
assert "say" in types
assert "ask" in types
assert "system" in types
def test_since_filter(self):
self.chat_log.log("garden", "say", "old", user_id="u1")
time.sleep(0.05)
cutoff = time.strftime("%Y-%m-%dT%H:%M:%S")
time.sleep(0.05)
self.chat_log.log("garden", "say", "new", user_id="u1")
recent = self.chat_log.get_history("garden", since=cutoff)
assert len(recent) == 1
assert recent[0]["message"] == "new"
def test_thread_safety(self):
errors = []
def log_many(room, n):
try:
for i in range(n):
self.chat_log.log(room, "say", f"{room}_{i}", user_id="u1")
except Exception as e:
errors.append(e)
threads = [
threading.Thread(target=log_many, args=("garden", 50)),
threading.Thread(target=log_many, args=("tower", 50)),
]
for t in threads:
t.start()
for t in threads:
t.join()
assert len(errors) == 0
# Each room should have exactly max_per_room (5) messages
assert len(self.chat_log.get_history("garden")) == 5
assert len(self.chat_log.get_history("tower")) == 5
# ─── TestPresenceManager ─────────────────────────────────────────────
class TestPresenceManager:
"""Test presence tracking (enter/leave/room queries)."""
def setup_method(self):
self.pm = mub.PresenceManager()
def test_enter_room(self):
event = self.pm.enter_room("u1", "Alice", "garden")
assert event["event"] == "enter"
assert event["user_id"] == "u1"
assert event["room"] == "garden"
def test_leave_room(self):
self.pm.enter_room("u1", "Alice", "garden")
event = self.pm.leave_room("u1", "garden")
assert event is not None
assert event["event"] == "leave"
def test_leave_nonexistent(self):
event = self.pm.leave_room("u1", "garden")
assert event is None
def test_get_players_in_room(self):
self.pm.enter_room("u1", "Alice", "garden")
self.pm.enter_room("u2", "Bob", "garden")
self.pm.enter_room("u3", "Charlie", "tower")
players = self.pm.get_players_in_room("garden")
names = {p["username"] for p in players}
assert names == {"Alice", "Bob"}
def test_room_isolation(self):
self.pm.enter_room("u1", "Alice", "garden")
self.pm.enter_room("u2", "Bob", "tower")
garden_players = {p["user_id"] for p in self.pm.get_players_in_room("garden")}
tower_players = {p["user_id"] for p in self.pm.get_players_in_room("tower")}
assert garden_players == {"u1"}
assert tower_players == {"u2"}
def test_cleanup_user(self):
self.pm.enter_room("u1", "Alice", "garden")
self.pm.enter_room("u1", "Alice", "tower")
events = self.pm.cleanup_user("u1")
assert len(events) == 2
assert len(self.pm.get_players_in_room("garden")) == 0
assert len(self.pm.get_players_in_room("tower")) == 0
def test_say_event(self):
self.pm.enter_room("u1", "Alice", "garden")
event = self.pm.say("u1", "Alice", "garden", "Hello!")
assert event["type"] == "say"
assert event["message"] == "Hello!"
def test_get_room_events(self):
self.pm.enter_room("u1", "Alice", "garden")
self.pm.say("u1", "Alice", "garden", "Hi!")
self.pm.leave_room("u1", "garden")
events = self.pm.get_room_events("garden")
types = [e["event"] for e in events]
assert "enter" in types
assert "message" in types
assert "leave" in types
# ─── TestConcurrentUsers ─────────────────────────────────────────────
class TestConcurrentUsers:
"""Test multi-user concurrent operations."""
def test_concurrent_chat(self):
pm = mub.PresenceManager()
_patcher = patch.object(mub, "CHATLOG_FILE", Path("/dev/null"))
_patcher.start()
cl = mub.ChatLog(max_per_room=100)
_patcher.stop()
errors = []
rooms = ["garden", "tower", "library"]
def user_session(user_id, username):
try:
for room in rooms:
pm.enter_room(user_id, username, room)
for i in range(5):
pm.say(user_id, username, room, f"{username}: msg {i}")
cl.log(room, "say", f"{username}: msg {i}", user_id=user_id, username=username)
pm.leave_room(user_id, room)
except Exception as e:
errors.append(e)
threads = [
threading.Thread(target=user_session, args=(f"u{i}", f"user_{i}"))
for i in range(10)
]
for t in threads:
t.start()
for t in threads:
t.join()
assert len(errors) == 0
# Verify no cross-contamination: each room should have messages
for room in rooms:
players = pm.get_players_in_room(room)
# All users should have left
assert len(players) == 0
if __name__ == "__main__":
pytest.main([__file__, "-v"])