Compare commits
1 Commits
feat/1541-
...
fix/issue-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d8173c4bb1 |
95
scripts/check_duplicate_pr.py
Normal file
95
scripts/check_duplicate_pr.py
Normal file
@@ -0,0 +1,95 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
check_duplicate_pr.py — Pre-flight check before creating a PR.
|
||||
|
||||
Checks if there's already an open PR for this issue on any branch.
|
||||
Prevents the duplicate PR problem described in issue #1460.
|
||||
|
||||
Usage:
|
||||
python3 scripts/check_duplicate_pr.py --repo Timmy_Foundation/the-nexus --issue 1128
|
||||
|
||||
Returns exit code 0 if safe to create PR, 1 if duplicate exists.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import urllib.request
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
GITEA_URL = "https://forge.alexanderwhitestone.com"
|
||||
|
||||
|
||||
def get_token():
|
||||
token_path = Path.home() / ".config" / "gitea" / "token"
|
||||
return token_path.read_text().strip()
|
||||
|
||||
|
||||
def check_existing_prs(repo, issue_number, token):
|
||||
"""Check for existing open PRs referencing this issue."""
|
||||
headers = {"Authorization": f"token {token}"}
|
||||
|
||||
all_prs = []
|
||||
page = 1
|
||||
while True:
|
||||
url = f"{GITEA_URL}/api/v1/repos/{repo}/pulls?state=open&limit=100&page={page}"
|
||||
req = urllib.request.Request(url, headers=headers)
|
||||
resp = urllib.request.urlopen(req)
|
||||
data = json.loads(resp.read())
|
||||
if not data:
|
||||
break
|
||||
all_prs.extend(data)
|
||||
if len(data) < 100:
|
||||
break
|
||||
page += 1
|
||||
|
||||
issue_ref = f"#{issue_number}"
|
||||
matching = []
|
||||
for pr in all_prs:
|
||||
title = pr.get("title", "")
|
||||
body = pr.get("body", "")
|
||||
branch = pr.get("head", {}).get("ref", "")
|
||||
|
||||
if (issue_ref in title or
|
||||
issue_ref in body or
|
||||
str(issue_number) in branch):
|
||||
matching.append(pr)
|
||||
|
||||
return matching
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Check for duplicate PRs before creating")
|
||||
parser.add_argument("--repo", required=True, help="Repo (e.g., Timmy_Foundation/the-nexus)")
|
||||
parser.add_argument("--issue", required=True, type=int, help="Issue number")
|
||||
parser.add_argument("--branch", default="", help="Branch name (for display)")
|
||||
args = parser.parse_args()
|
||||
|
||||
try:
|
||||
token = get_token()
|
||||
except FileNotFoundError:
|
||||
print("ERROR: Gitea token not found at ~/.config/gitea/token")
|
||||
sys.exit(2)
|
||||
|
||||
existing = check_existing_prs(args.repo, args.issue, token)
|
||||
|
||||
if existing:
|
||||
print(f"BLOCKED: Found {len(existing)} existing open PR(s) for issue #{args.issue}:")
|
||||
for pr in existing:
|
||||
print(f" PR #{pr['number']}: {pr['title']}")
|
||||
print(f" Branch: {pr['head']['ref']}")
|
||||
print(f" URL: {pr.get('html_url', 'N/A')}")
|
||||
print(f"\nDo NOT create another PR. Use the existing one or close it first.")
|
||||
print(f"If you need to update, push to the existing branch.")
|
||||
sys.exit(1)
|
||||
else:
|
||||
print(f"OK: No existing open PRs for issue #{args.repo}#{args.issue}")
|
||||
if args.branch:
|
||||
print(f"Safe to create PR from branch: {args.branch}")
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
106
scripts/cleanup_duplicate_prs.py
Normal file
106
scripts/cleanup_duplicate_prs.py
Normal file
@@ -0,0 +1,106 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
cleanup_duplicate_prs.py — Close duplicate PRs for an issue.
|
||||
|
||||
Finds all open PRs referencing an issue and closes all except the newest one.
|
||||
|
||||
Usage:
|
||||
python3 scripts/cleanup_duplicate_prs.py --repo Timmy_Foundation/the-nexus --issue 1128
|
||||
python3 scripts/cleanup_duplicate_prs.py --repo Timmy_Foundation/the-nexus --issue 1128 --dry-run
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import urllib.request
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
GITEA_URL = "https://forge.alexanderwhitestone.com"
|
||||
|
||||
|
||||
def get_token():
|
||||
token_path = Path.home() / ".config" / "gitea" / "token"
|
||||
return token_path.read_text().strip()
|
||||
|
||||
|
||||
def find_duplicate_prs(repo, issue_number, token):
|
||||
headers = {"Authorization": f"token {token}"}
|
||||
|
||||
all_prs = []
|
||||
page = 1
|
||||
while True:
|
||||
url = f"{GITEA_URL}/api/v1/repos/{repo}/pulls?state=open&limit=100&page={page}"
|
||||
req = urllib.request.Request(url, headers=headers)
|
||||
resp = urllib.request.urlopen(req)
|
||||
data = json.loads(resp.read())
|
||||
if not data:
|
||||
break
|
||||
all_prs.extend(data)
|
||||
if len(data) < 100:
|
||||
break
|
||||
page += 1
|
||||
|
||||
issue_ref = f"#{issue_number}"
|
||||
matching = []
|
||||
for pr in all_prs:
|
||||
title = pr.get("title", "")
|
||||
body = pr.get("body", "")
|
||||
branch = pr.get("head", {}).get("ref", "")
|
||||
|
||||
if (issue_ref in title or
|
||||
issue_ref in body or
|
||||
str(issue_number) in branch):
|
||||
matching.append(pr)
|
||||
|
||||
# Sort by PR number (newest last)
|
||||
matching.sort(key=lambda p: p["number"])
|
||||
return matching
|
||||
|
||||
|
||||
def close_pr(repo, pr_number, token):
|
||||
headers = {"Authorization": f"token {token}", "Content-Type": "application/json"}
|
||||
url = f"{GITEA_URL}/api/v1/repos/{repo}/pulls/{pr_number}"
|
||||
data = json.dumps({"state": "closed"}).encode()
|
||||
req = urllib.request.Request(url, data=data, headers=headers, method="PATCH")
|
||||
resp = urllib.request.urlopen(req)
|
||||
return json.loads(resp.read())
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Close duplicate PRs for an issue")
|
||||
parser.add_argument("--repo", required=True)
|
||||
parser.add_argument("--issue", required=True, type=int)
|
||||
parser.add_argument("--dry-run", action="store_true")
|
||||
args = parser.parse_args()
|
||||
|
||||
token = get_token()
|
||||
duplicates = find_duplicate_prs(args.repo, args.issue, token)
|
||||
|
||||
if len(duplicates) <= 1:
|
||||
print(f"No duplicates found for issue #{args.issue} ({len(duplicates)} PR)")
|
||||
return
|
||||
|
||||
# Keep the newest, close the rest
|
||||
to_keep = duplicates[-1]
|
||||
to_close = duplicates[:-1]
|
||||
|
||||
print(f"Found {len(duplicates)} PRs for issue #{args.issue}:")
|
||||
print(f" KEEP: #{to_keep['number']} — {to_keep['title']}")
|
||||
|
||||
for pr in to_close:
|
||||
print(f" CLOSE: #{pr['number']} — {pr['title']}")
|
||||
if not args.dry_run:
|
||||
try:
|
||||
close_pr(args.repo, pr["number"], token)
|
||||
print(f" Closed.")
|
||||
except Exception as e:
|
||||
print(f" Failed: {e}")
|
||||
|
||||
if args.dry_run:
|
||||
print(f"\nDry run — no changes made. Run without --dry-run to close {len(to_close)} PRs.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,313 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Unit tests for multi_user_bridge.py
|
||||
|
||||
23 tests across 5 test classes:
|
||||
- TestPluginRegistry: Register, unregister, list, fire hooks, thread-safety
|
||||
- TestChatLogIsolation: Room isolation, rolling buffer, history, thread-safety
|
||||
- TestPresenceManager: Enter/leave, room queries, isolation, concurrency
|
||||
- TestConcurrentUsers: Multi-user concurrent chat with isolation verification
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
import pytest
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
# Add repo root to path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
# Mock WORLD_DIR before importing to prevent file writes
|
||||
with patch.dict(os.environ, {"TIMMY_BRIDGE_PORT": "0"}):
|
||||
import multi_user_bridge as mub
|
||||
|
||||
|
||||
# ─── TestPluginRegistry ──────────────────────────────────────────────
|
||||
|
||||
class TestPluginRegistry:
|
||||
"""Test the plugin registry system."""
|
||||
|
||||
def setup_method(self):
|
||||
self.registry = mub.PluginRegistry()
|
||||
|
||||
def test_register_plugin(self):
|
||||
plugin = mub.Plugin("test", "A test plugin")
|
||||
self.registry.register(plugin)
|
||||
assert self.registry.get("test") is plugin
|
||||
|
||||
def test_unregister_plugin(self):
|
||||
plugin = mub.Plugin("test", "A test plugin")
|
||||
self.registry.register(plugin)
|
||||
result = self.registry.unregister("test")
|
||||
assert result is True
|
||||
assert self.registry.get("test") is None
|
||||
|
||||
def test_unregister_nonexistent(self):
|
||||
result = self.registry.unregister("nonexistent")
|
||||
assert result is False
|
||||
|
||||
def test_list_plugins(self):
|
||||
p1 = mub.Plugin("alpha", "First")
|
||||
p2 = mub.Plugin("beta", "Second")
|
||||
self.registry.register(p1)
|
||||
self.registry.register(p2)
|
||||
names = [p["name"] for p in self.registry.list_plugins()]
|
||||
assert "alpha" in names
|
||||
assert "beta" in names
|
||||
|
||||
def test_fire_on_message(self):
|
||||
class EchoPlugin(mub.Plugin):
|
||||
def on_message(self, user_id, message, room):
|
||||
return f"echo: {message}"
|
||||
|
||||
self.registry.register(EchoPlugin("echo", "Echoes"))
|
||||
result = self.registry.fire_on_message("u1", "hello", "garden")
|
||||
assert result == "echo: hello"
|
||||
|
||||
def test_fire_on_message_no_handler(self):
|
||||
plugin = mub.Plugin("noop", "Does nothing")
|
||||
self.registry.register(plugin)
|
||||
result = self.registry.fire_on_message("u1", "hello", "garden")
|
||||
assert result is None
|
||||
|
||||
def test_thread_safety(self):
|
||||
errors = []
|
||||
|
||||
def register_many(prefix):
|
||||
try:
|
||||
for i in range(50):
|
||||
p = mub.Plugin(f"{prefix}_{i}", f"Plugin {i}")
|
||||
self.registry.register(p)
|
||||
except Exception as e:
|
||||
errors.append(e)
|
||||
|
||||
threads = [threading.Thread(target=register_many, args=(f"t{t}",)) for t in range(4)]
|
||||
for t in threads:
|
||||
t.start()
|
||||
for t in threads:
|
||||
t.join()
|
||||
assert len(errors) == 0
|
||||
assert len(self.registry.list_plugins()) == 200
|
||||
|
||||
|
||||
# ─── TestChatLogIsolation ────────────────────────────────────────────
|
||||
|
||||
class TestChatLogIsolation:
|
||||
"""Test chat log room isolation and rolling buffer."""
|
||||
|
||||
def setup_method(self):
|
||||
# Patch CHATLOG_FILE to prevent disk writes
|
||||
self._patcher = patch.object(mub, "CHATLOG_FILE", Path("/dev/null"))
|
||||
self._patcher.start()
|
||||
self.chat_log = mub.ChatLog(max_per_room=5)
|
||||
|
||||
def teardown_method(self):
|
||||
self._patcher.stop()
|
||||
|
||||
def test_room_isolation(self):
|
||||
self.chat_log.log("garden", "say", "Hello garden", user_id="u1")
|
||||
self.chat_log.log("tower", "say", "Hello tower", user_id="u2")
|
||||
|
||||
garden_msgs = self.chat_log.get_history("garden")
|
||||
tower_msgs = self.chat_log.get_history("tower")
|
||||
|
||||
assert len(garden_msgs) == 1
|
||||
assert garden_msgs[0]["message"] == "Hello garden"
|
||||
assert len(tower_msgs) == 1
|
||||
assert tower_msgs[0]["message"] == "Hello tower"
|
||||
|
||||
def test_rolling_buffer(self):
|
||||
for i in range(10):
|
||||
self.chat_log.log("garden", "say", f"msg_{i}", user_id="u1")
|
||||
|
||||
history = self.chat_log.get_history("garden")
|
||||
assert len(history) == 5 # max_per_room
|
||||
assert history[0]["message"] == "msg_5" # oldest kept
|
||||
assert history[-1]["message"] == "msg_9" # newest
|
||||
|
||||
def test_get_all_rooms(self):
|
||||
self.chat_log.log("garden", "say", "hi", user_id="u1")
|
||||
self.chat_log.log("tower", "say", "hi", user_id="u1")
|
||||
rooms = self.chat_log.get_all_rooms()
|
||||
assert set(rooms) == {"garden", "tower"}
|
||||
|
||||
def test_history_with_limit(self):
|
||||
for i in range(10):
|
||||
self.chat_log.log("garden", "say", f"msg_{i}", user_id="u1")
|
||||
|
||||
limited = self.chat_log.get_history("garden", limit=3)
|
||||
assert len(limited) == 3
|
||||
assert limited[0]["message"] == "msg_7"
|
||||
|
||||
def test_empty_room_history(self):
|
||||
history = self.chat_log.get_history("nonexistent")
|
||||
assert history == []
|
||||
|
||||
def test_message_types(self):
|
||||
self.chat_log.log("garden", "say", "said", user_id="u1")
|
||||
self.chat_log.log("garden", "ask", "asked", user_id="u1")
|
||||
self.chat_log.log("garden", "system", "notice", user_id=None)
|
||||
|
||||
history = self.chat_log.get_history("garden")
|
||||
types = [m["type"] for m in history]
|
||||
assert "say" in types
|
||||
assert "ask" in types
|
||||
assert "system" in types
|
||||
|
||||
def test_since_filter(self):
|
||||
self.chat_log.log("garden", "say", "old", user_id="u1")
|
||||
time.sleep(0.05)
|
||||
cutoff = time.strftime("%Y-%m-%dT%H:%M:%S")
|
||||
time.sleep(0.05)
|
||||
self.chat_log.log("garden", "say", "new", user_id="u1")
|
||||
|
||||
recent = self.chat_log.get_history("garden", since=cutoff)
|
||||
assert len(recent) == 1
|
||||
assert recent[0]["message"] == "new"
|
||||
|
||||
def test_thread_safety(self):
|
||||
errors = []
|
||||
|
||||
def log_many(room, n):
|
||||
try:
|
||||
for i in range(n):
|
||||
self.chat_log.log(room, "say", f"{room}_{i}", user_id="u1")
|
||||
except Exception as e:
|
||||
errors.append(e)
|
||||
|
||||
threads = [
|
||||
threading.Thread(target=log_many, args=("garden", 50)),
|
||||
threading.Thread(target=log_many, args=("tower", 50)),
|
||||
]
|
||||
for t in threads:
|
||||
t.start()
|
||||
for t in threads:
|
||||
t.join()
|
||||
|
||||
assert len(errors) == 0
|
||||
# Each room should have exactly max_per_room (5) messages
|
||||
assert len(self.chat_log.get_history("garden")) == 5
|
||||
assert len(self.chat_log.get_history("tower")) == 5
|
||||
|
||||
|
||||
# ─── TestPresenceManager ─────────────────────────────────────────────
|
||||
|
||||
class TestPresenceManager:
|
||||
"""Test presence tracking (enter/leave/room queries)."""
|
||||
|
||||
def setup_method(self):
|
||||
self.pm = mub.PresenceManager()
|
||||
|
||||
def test_enter_room(self):
|
||||
event = self.pm.enter_room("u1", "Alice", "garden")
|
||||
assert event["event"] == "enter"
|
||||
assert event["user_id"] == "u1"
|
||||
assert event["room"] == "garden"
|
||||
|
||||
def test_leave_room(self):
|
||||
self.pm.enter_room("u1", "Alice", "garden")
|
||||
event = self.pm.leave_room("u1", "garden")
|
||||
assert event is not None
|
||||
assert event["event"] == "leave"
|
||||
|
||||
def test_leave_nonexistent(self):
|
||||
event = self.pm.leave_room("u1", "garden")
|
||||
assert event is None
|
||||
|
||||
def test_get_players_in_room(self):
|
||||
self.pm.enter_room("u1", "Alice", "garden")
|
||||
self.pm.enter_room("u2", "Bob", "garden")
|
||||
self.pm.enter_room("u3", "Charlie", "tower")
|
||||
|
||||
players = self.pm.get_players_in_room("garden")
|
||||
names = {p["username"] for p in players}
|
||||
assert names == {"Alice", "Bob"}
|
||||
|
||||
def test_room_isolation(self):
|
||||
self.pm.enter_room("u1", "Alice", "garden")
|
||||
self.pm.enter_room("u2", "Bob", "tower")
|
||||
|
||||
garden_players = {p["user_id"] for p in self.pm.get_players_in_room("garden")}
|
||||
tower_players = {p["user_id"] for p in self.pm.get_players_in_room("tower")}
|
||||
|
||||
assert garden_players == {"u1"}
|
||||
assert tower_players == {"u2"}
|
||||
|
||||
def test_cleanup_user(self):
|
||||
self.pm.enter_room("u1", "Alice", "garden")
|
||||
self.pm.enter_room("u1", "Alice", "tower")
|
||||
|
||||
events = self.pm.cleanup_user("u1")
|
||||
assert len(events) == 2
|
||||
|
||||
assert len(self.pm.get_players_in_room("garden")) == 0
|
||||
assert len(self.pm.get_players_in_room("tower")) == 0
|
||||
|
||||
def test_say_event(self):
|
||||
self.pm.enter_room("u1", "Alice", "garden")
|
||||
event = self.pm.say("u1", "Alice", "garden", "Hello!")
|
||||
assert event["type"] == "say"
|
||||
assert event["message"] == "Hello!"
|
||||
|
||||
def test_get_room_events(self):
|
||||
self.pm.enter_room("u1", "Alice", "garden")
|
||||
self.pm.say("u1", "Alice", "garden", "Hi!")
|
||||
self.pm.leave_room("u1", "garden")
|
||||
|
||||
events = self.pm.get_room_events("garden")
|
||||
types = [e["event"] for e in events]
|
||||
assert "enter" in types
|
||||
assert "message" in types
|
||||
assert "leave" in types
|
||||
|
||||
|
||||
# ─── TestConcurrentUsers ─────────────────────────────────────────────
|
||||
|
||||
class TestConcurrentUsers:
|
||||
"""Test multi-user concurrent operations."""
|
||||
|
||||
def test_concurrent_chat(self):
|
||||
pm = mub.PresenceManager()
|
||||
_patcher = patch.object(mub, "CHATLOG_FILE", Path("/dev/null"))
|
||||
_patcher.start()
|
||||
cl = mub.ChatLog(max_per_room=100)
|
||||
_patcher.stop()
|
||||
|
||||
errors = []
|
||||
rooms = ["garden", "tower", "library"]
|
||||
|
||||
def user_session(user_id, username):
|
||||
try:
|
||||
for room in rooms:
|
||||
pm.enter_room(user_id, username, room)
|
||||
for i in range(5):
|
||||
pm.say(user_id, username, room, f"{username}: msg {i}")
|
||||
cl.log(room, "say", f"{username}: msg {i}", user_id=user_id, username=username)
|
||||
pm.leave_room(user_id, room)
|
||||
except Exception as e:
|
||||
errors.append(e)
|
||||
|
||||
threads = [
|
||||
threading.Thread(target=user_session, args=(f"u{i}", f"user_{i}"))
|
||||
for i in range(10)
|
||||
]
|
||||
for t in threads:
|
||||
t.start()
|
||||
for t in threads:
|
||||
t.join()
|
||||
|
||||
assert len(errors) == 0
|
||||
|
||||
# Verify no cross-contamination: each room should have messages
|
||||
for room in rooms:
|
||||
players = pm.get_players_in_room(room)
|
||||
# All users should have left
|
||||
assert len(players) == 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__, "-v"])
|
||||
Reference in New Issue
Block a user