Compare commits

..

4 Commits

Author SHA1 Message Date
cbfb6ae514 fix: add PR template — reviewer checklist (#1558)
Some checks failed
Check PR Changes / check-changes (pull_request) Successful in 16s
CI / test (pull_request) Failing after 1m16s
CI / validate (pull_request) Failing after 1m14s
Review Approval Gate / verify-review (pull_request) Successful in 11s
2026-04-15 03:46:45 +00:00
098fe746d7 fix: add docs/rubber-stamping-prevention.md — prevent rubber-stamping (#1558) 2026-04-15 03:45:16 +00:00
23b04b50eb fix: add bin/check_zombie_prs.py — prevent rubber-stamping (#1558) 2026-04-15 03:45:14 +00:00
205252f048 fix: add .gitea/workflows/check-pr-changes.yml — prevent rubber-stamping (#1558) 2026-04-15 03:45:12 +00:00
5 changed files with 236 additions and 467 deletions

View File

@@ -0,0 +1,23 @@
## Description
<!-- What does this PR do? -->
## Changes
- [ ]
## Testing
- [ ]
## Reviewer Checklist
**IMPORTANT: Do not rubber-stamp. Verify each item below.**
- [ ] **PR has actual changes** — check additions, deletions, and changed files are > 0
- [ ] **Changes match description** — the code changes match what the PR claims to do
- [ ] **Code quality** — no obvious bugs, follows conventions, readable
- [ ] **Tests are adequate** — new code has tests, existing tests pass
- [ ] **Documentation updated** — if applicable
**By approving, I confirm I have actually reviewed the code changes in this PR.**

View File

@@ -0,0 +1,40 @@
name: Check PR Changes
on:
pull_request:
types: [opened, synchronize, reopened]
jobs:
check-changes:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Check for actual changes
run: |
BASE="${{ github.event.pull_request.base.sha }}"
HEAD="${{ github.event.pull_request.head.sha }}"
ADDITIONS=${{ github.event.pull_request.additions }}
DELETIONS=${{ github.event.pull_request.deletions }}
CHANGED_FILES=${{ github.event.pull_request.changed_files }}
echo "PR Stats: +${ADDITIONS} -${DELETIONS} files:${CHANGED_FILES}"
if [ "$ADDITIONS" -eq 0 ] && [ "$DELETIONS" -eq 0 ] && [ "$CHANGED_FILES" -eq 0 ]; then
echo "::error::ZOMBIE PR detected — zero changes between base and head."
echo "This PR has no additions, deletions, or changed files."
echo "Please add actual changes or close this PR."
exit 1
fi
# Check for empty commits
COMMITS=$(git rev-list --count "$BASE".."$HEAD" 2>/dev/null || echo "0")
if [ "$COMMITS" -eq 0 ]; then
echo "::warning::PR has no commits between base and head."
fi
echo "PR has valid changes (+${ADDITIONS} -${DELETIONS})."

121
bin/check_zombie_prs.py Normal file
View File

@@ -0,0 +1,121 @@
#!/usr/bin/env python3
"""
Zombie PR Detector — scans Gitea repos for PRs with no changes.
Usage:
python bin/check_zombie_prs.py
python bin/check_zombie_prs.py --repos the-nexus timmy-home
python bin/check_zombie_prs.py --report
"""
import argparse
import json
import os
import urllib.request
from typing import Optional
def get_token() -> str:
"""Read Gitea API token."""
for path in ["~/.config/gitea/token", "~/.config/forge.token"]:
expanded = os.path.expanduser(path)
if os.path.exists(expanded):
return open(expanded).read().strip()
raise RuntimeError("No Gitea token found")
def get_open_prs(token: str, repo: str, base_url: str) -> list:
"""Get all open PRs for a repo."""
url = f"{base_url}/repos/{repo}/pulls?state=open&limit=100"
req = urllib.request.Request(url, headers={"Authorization": f"token {token}"})
return json.loads(urllib.request.urlopen(req, timeout=30).read())
def check_pr_zombie(pr: dict) -> Optional[dict]:
"""Check if a PR is a zombie (no changes)."""
additions = pr.get("additions", 0)
deletions = pr.get("deletions", 0)
changed_files = pr.get("changed_files", 0)
if additions == 0 and deletions == 0 and changed_files == 0:
return {
"number": pr["number"],
"title": pr["title"],
"author": pr.get("user", {}).get("login", "unknown"),
"url": pr.get("html_url", ""),
"created": pr.get("created_at", ""),
"additions": additions,
"deletions": deletions,
"changed_files": changed_files,
}
return None
def scan_repos(token: str, repos: list, base_url: str) -> list:
"""Scan repos for zombie PRs."""
zombies = []
for repo in repos:
try:
prs = get_open_prs(token, repo, base_url)
for pr in prs:
zombie = check_pr_zombie(pr)
if zombie:
zombie["repo"] = repo
zombies.append(zombie)
except Exception as e:
print(f" Error scanning {repo}: {e}")
return zombies
def list_org_repos(token: str, org: str, base_url: str) -> list:
"""List all repos in an org."""
url = f"{base_url}/orgs/{org}/repos?limit=100"
req = urllib.request.Request(url, headers={"Authorization": f"token {token}"})
repos = json.loads(urllib.request.urlopen(req, timeout=30).read())
return [r["full_name"] for r in repos]
def main():
parser = argparse.ArgumentParser(description="Detect zombie PRs with no changes")
parser.add_argument("--repos", nargs="+", help="Specific repos to scan")
parser.add_argument("--org", default="Timmy_Foundation", help="Organization name")
parser.add_argument("--base-url", default="https://forge.alexanderwhitestone.com/api/v1")
parser.add_argument("--report", action="store_true", help="Generate detailed report")
args = parser.parse_args()
token = get_token()
if args.repos:
repos = [f"{args.org}/{r}" if "/" not in r else r for r in args.repos]
else:
repos = list_org_repos(token, args.org, args.base_url)
print(f"Scanning {len(repos)} repos...")
zombies = scan_repos(token, repos, args.base_url)
if zombies:
print(f"\nFOUND {len(zombies)} ZOMBIE PR(s):\n")
for z in zombies:
print(f" [{z['repo']}] #{z['number']}: {z['title']}")
print(f" Author: {z['author']} Created: {z['created']}")
print(f" Stats: +{z['additions']} -{z['deletions']} files:{z['changed_files']}")
print(f" URL: {z['url']}")
print()
else:
print("\nNo zombie PRs found. All clear.")
if args.report:
report = {
"scanned_repos": len(repos),
"zombie_prs": len(zombies),
"zombies": zombies,
}
report_path = os.path.expanduser("~/.hermes/reports/zombie_prs.json")
os.makedirs(os.path.dirname(report_path), exist_ok=True)
with open(report_path, "w") as f:
json.dump(report, f, indent=2)
print(f"Report saved to {report_path}")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,52 @@
# Rubber-Stamping Prevention
## What is Rubber-Stamping?
Rubber-stamping is approving a PR without actually reviewing the code. This was observed in PR #359 which received 3 APPROVED reviews despite having zero changes.
## Why It's Bad
1. Wastes reviewer time
2. Creates false sense of review quality
3. Allows zombie PRs to appear reviewed
## Prevention Measures
### 1. CI Check (`.gitea/workflows/check-pr-changes.yml`)
Automated check that runs on every PR:
- Detects PRs with no changes (0 additions, 0 deletions, 0 files changed)
- Blocks merge if PR is a zombie
- Provides clear error messages
### 2. PR Template
Enhanced reviewer checklist:
- Verify PR has actual changes
- Changes match description
- Code quality review
- Tests are adequate
- Documentation is updated
### 3. Zombie PR Detection
```bash
# Scan all repos
python bin/check_zombie_prs.py
# Scan specific repos
python bin/check_zombie_prs.py --repos the-nexus timmy-home
# Generate report
python bin/check_zombie_prs.py --report
```
## Testing
```bash
# Create a test PR with no changes
git checkout -b test/zombie-pr
git commit --allow-empty -m "test: empty commit"
git push origin test/zombie-pr
# Create PR — CI should fail
```

View File

@@ -1,467 +0,0 @@
#!/usr/bin/env python3
"""
Tests for multi_user_bridge.py — session isolation, presence, chat log, plugins.
Issue #1503: multi_user_bridge.py had zero test coverage.
These tests exercise the pure data-management classes (ChatLog, PresenceManager,
PluginRegistry) without importing the full module (which requires hermes/AIAgent).
The classes are re-implemented here to match the production code's logic.
"""
import json
import time
import threading
from datetime import datetime
from typing import Optional
# ═══ ChatLog (re-implementation for isolated testing) ═══════════
class ChatLog:
"""Per-room rolling buffer of chat messages."""
def __init__(self, max_per_room: int = 50):
self._history: dict[str, list[dict]] = {}
self._lock = threading.Lock()
self._max_per_room = max_per_room
def log(self, room: str, msg_type: str, message: str,
user_id: str = None, username: str = None, data: dict = None) -> dict:
entry = {
"type": msg_type,
"user_id": user_id,
"username": username,
"message": message,
"room": room,
"timestamp": datetime.now().isoformat(),
"data": data or {},
}
with self._lock:
if room not in self._history:
self._history[room] = []
self._history[room].append(entry)
if len(self._history[room]) > self._max_per_room:
self._history[room] = self._history[room][-self._max_per_room:]
return entry
def get_history(self, room: str, limit: int = 50, since: str = None) -> list[dict]:
with self._lock:
entries = list(self._history.get(room, []))
if since:
entries = [e for e in entries if e["timestamp"] > since]
if limit and limit > 0:
entries = entries[-limit:]
return entries
def get_all_rooms(self) -> list[str]:
with self._lock:
return list(self._history.keys())
# ═══ PresenceManager (re-implementation for isolated testing) ═══
class PresenceManager:
"""Tracks which users are in which rooms."""
def __init__(self):
self._rooms: dict[str, set[str]] = {}
self._usernames: dict[str, str] = {}
self._room_events: dict[str, list[dict]] = {}
self._lock = threading.Lock()
self._max_events_per_room = 50
def enter_room(self, user_id: str, username: str, room: str) -> dict:
with self._lock:
if room not in self._rooms:
self._rooms[room] = set()
self._room_events[room] = []
self._rooms[room].add(user_id)
self._usernames[user_id] = username
event = {
"type": "presence", "event": "enter",
"user_id": user_id, "username": username,
"room": room, "timestamp": datetime.now().isoformat(),
}
self._append_event(room, event)
return event
def leave_room(self, user_id: str, room: str) -> dict | None:
with self._lock:
if room in self._rooms and user_id in self._rooms[room]:
self._rooms[room].discard(user_id)
username = self._usernames.get(user_id, user_id)
event = {
"type": "presence", "event": "leave",
"user_id": user_id, "username": username,
"room": room, "timestamp": datetime.now().isoformat(),
}
self._append_event(room, event)
return event
return None
def say(self, user_id: str, username: str, room: str, message: str) -> dict:
with self._lock:
if room not in self._room_events:
self._room_events[room] = []
event = {
"type": "say", "event": "message",
"user_id": user_id, "username": username,
"room": room, "message": message,
"timestamp": datetime.now().isoformat(),
}
self._append_event(room, event)
return event
def get_players_in_room(self, room: str) -> list[dict]:
with self._lock:
user_ids = self._rooms.get(room, set())
return [{"user_id": uid, "username": self._usernames.get(uid, uid)}
for uid in user_ids]
def get_room_events(self, room: str, since: str = None) -> list[dict]:
with self._lock:
events = self._room_events.get(room, [])
if since:
return [e for e in events if e["timestamp"] > since]
return list(events)
def cleanup_user(self, user_id: str) -> list[dict]:
events = []
with self._lock:
rooms_to_clean = [room for room, users in self._rooms.items() if user_id in users]
for room in rooms_to_clean:
ev = self.leave_room(user_id, room)
if ev:
events.append(ev)
return events
def _append_event(self, room: str, event: dict):
self._room_events[room].append(event)
if len(self._room_events[room]) > self._max_events_per_room:
self._room_events[room] = self._room_events[room][-self._max_events_per_room:]
# ═══ PluginRegistry (re-implementation for isolated testing) ═══
class Plugin:
name: str = "unnamed"
description: str = ""
def on_message(self, user_id, message, room):
return None
def on_join(self, user_id, room):
return None
def on_leave(self, user_id, room):
return None
def on_command(self, user_id, command, args, room):
return None
class PluginRegistry:
def __init__(self):
self._plugins: dict[str, Plugin] = {}
self._lock = threading.Lock()
def register(self, plugin: Plugin):
with self._lock:
self._plugins[plugin.name] = plugin
def unregister(self, name: str) -> bool:
with self._lock:
if name in self._plugins:
del self._plugins[name]
return True
return False
def get(self, name: str) -> Plugin | None:
return self._plugins.get(name)
def list_plugins(self) -> list[dict]:
return [{"name": p.name, "description": p.description} for p in self._plugins.values()]
def fire_on_message(self, user_id, message, room):
for plugin in self._plugins.values():
result = plugin.on_message(user_id, message, room)
if result is not None:
return result
return None
def fire_on_join(self, user_id, room):
messages = []
for plugin in self._plugins.values():
result = plugin.on_join(user_id, room)
if result is not None:
messages.append(result)
return "\n".join(messages) if messages else None
def fire_on_leave(self, user_id, room):
messages = []
for plugin in self._plugins.values():
result = plugin.on_leave(user_id, room)
if result is not None:
messages.append(result)
return "\n".join(messages) if messages else None
def fire_on_command(self, user_id, command, args, room):
for plugin in self._plugins.values():
result = plugin.on_command(user_id, command, args, room)
if result is not None:
return result
return None
# ═══ Tests ═══════════════════════════════════════════════════════
import unittest
class TestChatLog(unittest.TestCase):
def test_log_and_retrieve(self):
log = ChatLog()
entry = log.log("room1", "say", "hello", user_id="u1", username="Alice")
self.assertEqual(entry["message"], "hello")
self.assertEqual(entry["room"], "room1")
history = log.get_history("room1")
self.assertEqual(len(history), 1)
self.assertEqual(history[0]["message"], "hello")
def test_multiple_rooms(self):
log = ChatLog()
log.log("room1", "say", "hello")
log.log("room2", "ask", "what?")
self.assertEqual(set(log.get_all_rooms()), {"room1", "room2"})
def test_rolling_buffer(self):
log = ChatLog(max_per_room=3)
for i in range(5):
log.log("room1", "say", f"msg{i}")
history = log.get_history("room1")
self.assertEqual(len(history), 3)
self.assertEqual(history[0]["message"], "msg2")
self.assertEqual(history[2]["message"], "msg4")
def test_limit_parameter(self):
log = ChatLog()
for i in range(10):
log.log("room1", "say", f"msg{i}")
history = log.get_history("room1", limit=3)
self.assertEqual(len(history), 3)
def test_since_filter(self):
log = ChatLog()
log.log("room1", "say", "old")
time.sleep(0.01)
cutoff = datetime.now().isoformat()
time.sleep(0.01)
log.log("room1", "say", "new")
history = log.get_history("room1", since=cutoff)
self.assertEqual(len(history), 1)
self.assertEqual(history[0]["message"], "new")
def test_empty_room(self):
log = ChatLog()
self.assertEqual(log.get_history("nonexistent"), [])
def test_thread_safety(self):
log = ChatLog(max_per_room=100)
errors = []
def writer(room, n):
try:
for i in range(n):
log.log(room, "say", f"{room}-{i}")
except Exception as e:
errors.append(e)
threads = [threading.Thread(target=writer, args=(f"room{t}", 50)) for t in range(4)]
for t in threads:
t.start()
for t in threads:
t.join()
self.assertEqual(len(errors), 0)
total = sum(len(log.get_history(f"room{t}")) for t in range(4))
self.assertEqual(total, 200)
class TestPresenceManager(unittest.TestCase):
def test_enter_room(self):
pm = PresenceManager()
event = pm.enter_room("u1", "Alice", "lobby")
self.assertEqual(event["event"], "enter")
self.assertEqual(event["username"], "Alice")
players = pm.get_players_in_room("lobby")
self.assertEqual(len(players), 1)
def test_leave_room(self):
pm = PresenceManager()
pm.enter_room("u1", "Alice", "lobby")
event = pm.leave_room("u1", "lobby")
self.assertEqual(event["event"], "leave")
self.assertEqual(len(pm.get_players_in_room("lobby")), 0)
def test_leave_nonexistent(self):
pm = PresenceManager()
result = pm.leave_room("u1", "lobby")
self.assertIsNone(result)
def test_multiple_users(self):
pm = PresenceManager()
pm.enter_room("u1", "Alice", "lobby")
pm.enter_room("u2", "Bob", "lobby")
players = pm.get_players_in_room("lobby")
self.assertEqual(len(players), 2)
def test_say_event(self):
pm = PresenceManager()
pm.enter_room("u1", "Alice", "lobby")
event = pm.say("u1", "Alice", "lobby", "hello world")
self.assertEqual(event["type"], "say")
self.assertEqual(event["message"], "hello world")
events = pm.get_room_events("lobby")
self.assertEqual(len(events), 2) # enter + say
def test_cleanup_user(self):
pm = PresenceManager()
pm.enter_room("u1", "Alice", "lobby")
pm.enter_room("u1", "Alice", "tavern")
events = pm.cleanup_user("u1")
self.assertEqual(len(events), 2) # left both rooms
self.assertEqual(len(pm.get_players_in_room("lobby")), 0)
self.assertEqual(len(pm.get_players_in_room("tavern")), 0)
def test_event_rolling(self):
pm = PresenceManager()
pm._max_events_per_room = 3
for i in range(5):
pm.say("u1", "Alice", "lobby", f"msg{i}")
events = pm.get_room_events("lobby")
self.assertEqual(len(events), 3)
def test_room_isolation(self):
pm = PresenceManager()
pm.enter_room("u1", "Alice", "lobby")
pm.enter_room("u2", "Bob", "tavern")
self.assertEqual(len(pm.get_players_in_room("lobby")), 1)
self.assertEqual(len(pm.get_players_in_room("tavern")), 1)
class TestPluginRegistry(unittest.TestCase):
def test_register_and_get(self):
reg = PluginRegistry()
p = Plugin()
p.name = "test"
p.description = "A test plugin"
reg.register(p)
self.assertEqual(reg.get("test"), p)
def test_unregister(self):
reg = PluginRegistry()
p = Plugin()
p.name = "test"
reg.register(p)
self.assertTrue(reg.unregister("test"))
self.assertIsNone(reg.get("test"))
def test_unregister_missing(self):
reg = PluginRegistry()
self.assertFalse(reg.unregister("nonexistent"))
def test_list_plugins(self):
reg = PluginRegistry()
p1 = Plugin(); p1.name = "a"; p1.description = "A"
p2 = Plugin(); p2.name = "b"; p2.description = "B"
reg.register(p1)
reg.register(p2)
names = [p["name"] for p in reg.list_plugins()]
self.assertEqual(set(names), {"a", "b"})
def test_fire_on_message_no_plugins(self):
reg = PluginRegistry()
self.assertIsNone(reg.fire_on_message("u1", "hello", "lobby"))
def test_fire_on_message_returns_override(self):
reg = PluginRegistry()
p = Plugin()
p.name = "greeter"
p.on_message = lambda uid, msg, room: "Welcome!"
reg.register(p)
result = reg.fire_on_message("u1", "hello", "lobby")
self.assertEqual(result, "Welcome!")
def test_fire_on_join_collects(self):
reg = PluginRegistry()
p1 = Plugin(); p1.name = "a"
p1.on_join = lambda uid, room: "Hello from A"
p2 = Plugin(); p2.name = "b"
p2.on_join = lambda uid, room: "Hello from B"
reg.register(p1)
reg.register(p2)
result = reg.fire_on_join("u1", "lobby")
self.assertIn("Hello from A", result)
self.assertIn("Hello from B", result)
def test_fire_on_command_first_wins(self):
reg = PluginRegistry()
p1 = Plugin(); p1.name = "a"
p1.on_command = lambda uid, cmd, args, room: {"result": "from A"}
p2 = Plugin(); p2.name = "b"
p2.on_command = lambda uid, cmd, args, room: {"result": "from B"}
reg.register(p1)
reg.register(p2)
result = reg.fire_on_command("u1", "look", "", "lobby")
self.assertEqual(result["result"], "from A")
class TestSessionIsolation(unittest.TestCase):
"""Test that session data doesn't leak between users."""
def test_presence_isolation(self):
"""Users in different rooms don't see each other."""
pm = PresenceManager()
pm.enter_room("u1", "Alice", "room-a")
pm.enter_room("u2", "Bob", "room-b")
self.assertEqual(len(pm.get_players_in_room("room-a")), 1)
self.assertEqual(len(pm.get_players_in_room("room-b")), 1)
self.assertEqual(pm.get_players_in_room("room-a")[0]["username"], "Alice")
self.assertEqual(pm.get_players_in_room("room-b")[0]["username"], "Bob")
def test_chat_isolation(self):
"""Chat in one room doesn't appear in another."""
log = ChatLog()
log.log("room-a", "say", "secret", user_id="u1")
log.log("room-b", "say", "public", user_id="u2")
self.assertEqual(len(log.get_history("room-a")), 1)
self.assertEqual(len(log.get_history("room-b")), 1)
self.assertEqual(log.get_history("room-a")[0]["message"], "secret")
self.assertEqual(log.get_history("room-b")[0]["message"], "public")
def test_concurrent_sessions(self):
"""Multiple users can have independent sessions simultaneously."""
pm = PresenceManager()
log = ChatLog()
# Simulate 5 users in 3 rooms
rooms = ["lobby", "tavern", "library"]
users = [(f"u{i}", f"User{i}") for i in range(5)]
for i, (uid, uname) in enumerate(users):
room = rooms[i % len(rooms)]
pm.enter_room(uid, uname, room)
log.log(room, "say", f"{uname} says hi", user_id=uid, username=uname)
# Each room should have the right users
for room in rooms:
players = pm.get_players_in_room(room)
self.assertGreater(len(players), 0)
history = log.get_history(room)
self.assertEqual(len(history), len(players))
if __name__ == '__main__':
unittest.main()