Compare commits

..

4 Commits

Author SHA1 Message Date
cbfb6ae514 fix: add PR template — reviewer checklist (#1558)
Some checks failed
Check PR Changes / check-changes (pull_request) Successful in 16s
CI / test (pull_request) Failing after 1m16s
CI / validate (pull_request) Failing after 1m14s
Review Approval Gate / verify-review (pull_request) Successful in 11s
2026-04-15 03:46:45 +00:00
098fe746d7 fix: add docs/rubber-stamping-prevention.md — prevent rubber-stamping (#1558) 2026-04-15 03:45:16 +00:00
23b04b50eb fix: add bin/check_zombie_prs.py — prevent rubber-stamping (#1558) 2026-04-15 03:45:14 +00:00
205252f048 fix: add .gitea/workflows/check-pr-changes.yml — prevent rubber-stamping (#1558) 2026-04-15 03:45:12 +00:00
5 changed files with 236 additions and 369 deletions

View File

@@ -0,0 +1,23 @@
## Description
<!-- What does this PR do? -->
## Changes
- [ ]
## Testing
- [ ]
## Reviewer Checklist
**IMPORTANT: Do not rubber-stamp. Verify each item below.**
- [ ] **PR has actual changes** — check additions, deletions, and changed files are > 0
- [ ] **Changes match description** — the code changes match what the PR claims to do
- [ ] **Code quality** — no obvious bugs, follows conventions, readable
- [ ] **Tests are adequate** — new code has tests, existing tests pass
- [ ] **Documentation updated** — if applicable
**By approving, I confirm I have actually reviewed the code changes in this PR.**

View File

@@ -0,0 +1,40 @@
name: Check PR Changes
on:
pull_request:
types: [opened, synchronize, reopened]
jobs:
check-changes:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Check for actual changes
run: |
BASE="${{ github.event.pull_request.base.sha }}"
HEAD="${{ github.event.pull_request.head.sha }}"
ADDITIONS=${{ github.event.pull_request.additions }}
DELETIONS=${{ github.event.pull_request.deletions }}
CHANGED_FILES=${{ github.event.pull_request.changed_files }}
echo "PR Stats: +${ADDITIONS} -${DELETIONS} files:${CHANGED_FILES}"
if [ "$ADDITIONS" -eq 0 ] && [ "$DELETIONS" -eq 0 ] && [ "$CHANGED_FILES" -eq 0 ]; then
echo "::error::ZOMBIE PR detected — zero changes between base and head."
echo "This PR has no additions, deletions, or changed files."
echo "Please add actual changes or close this PR."
exit 1
fi
# Check for empty commits
COMMITS=$(git rev-list --count "$BASE".."$HEAD" 2>/dev/null || echo "0")
if [ "$COMMITS" -eq 0 ]; then
echo "::warning::PR has no commits between base and head."
fi
echo "PR has valid changes (+${ADDITIONS} -${DELETIONS})."

121
bin/check_zombie_prs.py Normal file
View File

@@ -0,0 +1,121 @@
#!/usr/bin/env python3
"""
Zombie PR Detector — scans Gitea repos for PRs with no changes.
Usage:
python bin/check_zombie_prs.py
python bin/check_zombie_prs.py --repos the-nexus timmy-home
python bin/check_zombie_prs.py --report
"""
import argparse
import json
import os
import urllib.request
from typing import Optional
def get_token() -> str:
"""Read Gitea API token."""
for path in ["~/.config/gitea/token", "~/.config/forge.token"]:
expanded = os.path.expanduser(path)
if os.path.exists(expanded):
return open(expanded).read().strip()
raise RuntimeError("No Gitea token found")
def get_open_prs(token: str, repo: str, base_url: str) -> list:
"""Get all open PRs for a repo."""
url = f"{base_url}/repos/{repo}/pulls?state=open&limit=100"
req = urllib.request.Request(url, headers={"Authorization": f"token {token}"})
return json.loads(urllib.request.urlopen(req, timeout=30).read())
def check_pr_zombie(pr: dict) -> Optional[dict]:
"""Check if a PR is a zombie (no changes)."""
additions = pr.get("additions", 0)
deletions = pr.get("deletions", 0)
changed_files = pr.get("changed_files", 0)
if additions == 0 and deletions == 0 and changed_files == 0:
return {
"number": pr["number"],
"title": pr["title"],
"author": pr.get("user", {}).get("login", "unknown"),
"url": pr.get("html_url", ""),
"created": pr.get("created_at", ""),
"additions": additions,
"deletions": deletions,
"changed_files": changed_files,
}
return None
def scan_repos(token: str, repos: list, base_url: str) -> list:
"""Scan repos for zombie PRs."""
zombies = []
for repo in repos:
try:
prs = get_open_prs(token, repo, base_url)
for pr in prs:
zombie = check_pr_zombie(pr)
if zombie:
zombie["repo"] = repo
zombies.append(zombie)
except Exception as e:
print(f" Error scanning {repo}: {e}")
return zombies
def list_org_repos(token: str, org: str, base_url: str) -> list:
"""List all repos in an org."""
url = f"{base_url}/orgs/{org}/repos?limit=100"
req = urllib.request.Request(url, headers={"Authorization": f"token {token}"})
repos = json.loads(urllib.request.urlopen(req, timeout=30).read())
return [r["full_name"] for r in repos]
def main():
parser = argparse.ArgumentParser(description="Detect zombie PRs with no changes")
parser.add_argument("--repos", nargs="+", help="Specific repos to scan")
parser.add_argument("--org", default="Timmy_Foundation", help="Organization name")
parser.add_argument("--base-url", default="https://forge.alexanderwhitestone.com/api/v1")
parser.add_argument("--report", action="store_true", help="Generate detailed report")
args = parser.parse_args()
token = get_token()
if args.repos:
repos = [f"{args.org}/{r}" if "/" not in r else r for r in args.repos]
else:
repos = list_org_repos(token, args.org, args.base_url)
print(f"Scanning {len(repos)} repos...")
zombies = scan_repos(token, repos, args.base_url)
if zombies:
print(f"\nFOUND {len(zombies)} ZOMBIE PR(s):\n")
for z in zombies:
print(f" [{z['repo']}] #{z['number']}: {z['title']}")
print(f" Author: {z['author']} Created: {z['created']}")
print(f" Stats: +{z['additions']} -{z['deletions']} files:{z['changed_files']}")
print(f" URL: {z['url']}")
print()
else:
print("\nNo zombie PRs found. All clear.")
if args.report:
report = {
"scanned_repos": len(repos),
"zombie_prs": len(zombies),
"zombies": zombies,
}
report_path = os.path.expanduser("~/.hermes/reports/zombie_prs.json")
os.makedirs(os.path.dirname(report_path), exist_ok=True)
with open(report_path, "w") as f:
json.dump(report, f, indent=2)
print(f"Report saved to {report_path}")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,52 @@
# Rubber-Stamping Prevention
## What is Rubber-Stamping?
Rubber-stamping is approving a PR without actually reviewing the code. This was observed in PR #359 which received 3 APPROVED reviews despite having zero changes.
## Why It's Bad
1. Wastes reviewer time
2. Creates false sense of review quality
3. Allows zombie PRs to appear reviewed
## Prevention Measures
### 1. CI Check (`.gitea/workflows/check-pr-changes.yml`)
Automated check that runs on every PR:
- Detects PRs with no changes (0 additions, 0 deletions, 0 files changed)
- Blocks merge if PR is a zombie
- Provides clear error messages
### 2. PR Template
Enhanced reviewer checklist:
- Verify PR has actual changes
- Changes match description
- Code quality review
- Tests are adequate
- Documentation is updated
### 3. Zombie PR Detection
```bash
# Scan all repos
python bin/check_zombie_prs.py
# Scan specific repos
python bin/check_zombie_prs.py --repos the-nexus timmy-home
# Generate report
python bin/check_zombie_prs.py --report
```
## Testing
```bash
# Create a test PR with no changes
git checkout -b test/zombie-pr
git commit --allow-empty -m "test: empty commit"
git push origin test/zombie-pr
# Create PR — CI should fail
```

View File

@@ -1,369 +0,0 @@
"""Tests for multi_user_bridge.py — session isolation and core classes.
Refs: #1503 — multi_user_bridge.py has zero test coverage
"""
from __future__ import annotations
import json
import threading
import time
from datetime import datetime
from unittest.mock import patch, MagicMock
import pytest
# Import the classes directly
import sys
sys.path.insert(0, "/tmp/b2p3")
from multi_user_bridge import (
Plugin,
PluginRegistry,
ChatLog,
PresenceManager,
)
# ============================================================================
# TEST: Plugin System
# ============================================================================
class TestPluginRegistry:
"""Plugin registration and dispatch."""
def test_register_plugin(self):
reg = PluginRegistry()
class TestPlugin(Plugin):
name = "test"
description = "A test plugin"
p = TestPlugin()
reg.register(p)
assert reg.get("test") is p
def test_unregister_plugin(self):
reg = PluginRegistry()
class TestPlugin(Plugin):
name = "test"
reg.register(TestPlugin())
assert reg.unregister("test")
assert reg.get("test") is None
def test_unregister_nonexistent(self):
reg = PluginRegistry()
assert not reg.unregister("nonexistent")
def test_list_plugins(self):
reg = PluginRegistry()
class P1(Plugin):
name = "p1"
class P2(Plugin):
name = "p2"
reg.register(P1())
reg.register(P2())
names = [p["name"] for p in reg.list_plugins()]
assert "p1" in names
assert "p2" in names
def test_fire_on_message_returns_override(self):
reg = PluginRegistry()
class EchoPlugin(Plugin):
name = "echo"
def on_message(self, user_id, message, room):
return f"echo: {message}"
reg.register(EchoPlugin())
result = reg.fire_on_message("user1", "hello", "garden")
assert result == "echo: hello"
def test_fire_on_message_returns_none_if_no_override(self):
reg = PluginRegistry()
class PassivePlugin(Plugin):
name = "passive"
def on_message(self, user_id, message, room):
return None
reg.register(PassivePlugin())
result = reg.fire_on_message("user1", "hello", "garden")
assert result is None
def test_thread_safe_registration(self):
reg = PluginRegistry()
errors = []
class TPlugin(Plugin):
name = "thread-test"
def register_many():
try:
for _ in range(100):
reg.register(TPlugin())
except Exception as e:
errors.append(e)
threads = [threading.Thread(target=register_many) for _ in range(4)]
for t in threads:
t.start()
for t in threads:
t.join()
assert not errors
assert reg.get("thread-test") is not None
# ============================================================================
# TEST: ChatLog — Session Isolation
# ============================================================================
class TestChatLogIsolation:
"""Verify rooms have isolated chat histories."""
def test_rooms_are_isolated(self):
log = ChatLog(max_per_room=50)
log.log("garden", "say", "Hello from garden", user_id="user1")
log.log("tower", "say", "Hello from tower", user_id="user2")
garden_history = log.get_history("garden")
tower_history = log.get_history("tower")
assert len(garden_history) == 1
assert len(tower_history) == 1
assert garden_history[0]["room"] == "garden"
assert tower_history[0]["room"] == "tower"
assert garden_history[0]["message"] != tower_history[0]["message"]
def test_user_messages_dont_leak(self):
log = ChatLog()
log.log("garden", "say", "Private message", user_id="user1")
log.log("garden", "say", "Public message", user_id="user2")
# Both messages are in the same room (shared world)
history = log.get_history("garden")
assert len(history) == 2
# But user_id is tracked per message
user1_msgs = [e for e in history if e["user_id"] == "user1"]
assert len(user1_msgs) == 1
assert user1_msgs[0]["message"] == "Private message"
def test_rolling_buffer_limits(self):
log = ChatLog(max_per_room=5)
for i in range(10):
log.log("garden", "say", f"msg {i}")
history = log.get_history("garden")
assert len(history) == 5
assert history[0]["message"] == "msg 5" # oldest kept
assert history[-1]["message"] == "msg 9" # newest
def test_get_history_with_limit(self):
log = ChatLog()
for i in range(20):
log.log("garden", "say", f"msg {i}")
history = log.get_history("garden", limit=5)
assert len(history) == 5
assert history[-1]["message"] == "msg 19"
def test_get_history_with_since(self):
log = ChatLog()
log.log("garden", "say", "old message")
time.sleep(0.01)
cutoff = datetime.now().isoformat()
time.sleep(0.01)
log.log("garden", "say", "new message")
history = log.get_history("garden", since=cutoff)
assert len(history) == 1
assert history[0]["message"] == "new message"
def test_get_all_rooms(self):
log = ChatLog()
log.log("garden", "say", "msg1")
log.log("tower", "say", "msg2")
log.log("forge", "say", "msg3")
rooms = log.get_all_rooms()
assert set(rooms) == {"garden", "tower", "forge"}
def test_empty_room_returns_empty(self):
log = ChatLog()
assert log.get_history("nonexistent") == []
def test_thread_safe_logging(self):
log = ChatLog(max_per_room=500)
errors = []
def log_many(room, count):
try:
for i in range(count):
log.log(room, "say", f"{room} msg {i}")
except Exception as e:
errors.append(e)
threads = [
threading.Thread(target=log_many, args=("garden", 50)),
threading.Thread(target=log_many, args=("tower", 50)),
]
for t in threads:
t.start()
for t in threads:
t.join()
assert not errors
assert len(log.get_history("garden")) == 50
assert len(log.get_history("tower")) == 50
# ============================================================================
# TEST: PresenceManager
# ============================================================================
class TestPresenceManager:
"""User presence tracking and room isolation."""
def test_enter_room(self):
pm = PresenceManager()
result = pm.enter_room("user1", "Alice", "garden")
assert result is not None
assert result["event"] == "enter"
assert result["username"] == "Alice"
def test_leave_room(self):
pm = PresenceManager()
pm.enter_room("user1", "Alice", "garden")
result = pm.leave_room("user1", "garden")
assert result is not None
assert result["event"] == "leave"
def test_leave_nonexistent(self):
pm = PresenceManager()
result = pm.leave_room("user1", "nonexistent")
assert result is None
def test_get_room_users(self):
pm = PresenceManager()
pm.enter_room("user1", "Alice", "garden")
pm.enter_room("user2", "Bob", "garden")
pm.enter_room("user3", "Charlie", "tower")
garden_players = pm.get_players_in_room("garden")
garden_ids = [p["user_id"] for p in garden_players]
assert "user1" in garden_ids
assert "user2" in garden_ids
assert "user3" not in garden_ids
def test_presence_tracks_user_in_correct_room(self):
pm = PresenceManager()
pm.enter_room("user1", "Alice", "garden")
pm.enter_room("user2", "Bob", "tower")
garden_players = pm.get_players_in_room("garden")
tower_players = pm.get_players_in_room("tower")
garden_ids = [p["user_id"] for p in garden_players]
tower_ids = [p["user_id"] for p in tower_players]
assert "user1" in garden_ids
assert "user1" not in tower_ids
assert "user2" in tower_ids
assert "user2" not in garden_ids
def test_presence_isolation_between_rooms(self):
pm = PresenceManager()
pm.enter_room("user1", "Alice", "garden")
pm.enter_room("user2", "Bob", "tower")
garden = pm.get_players_in_room("garden")
tower = pm.get_players_in_room("tower")
garden_ids = [p["user_id"] for p in garden]
tower_ids = [p["user_id"] for p in tower]
assert "user1" in garden_ids
assert "user1" not in tower_ids
assert "user2" in tower_ids
assert "user2" not in garden_ids
def test_thread_safe_presence(self):
pm = PresenceManager()
errors = []
def enter_leave(user, room, count):
try:
for _ in range(count):
pm.enter_room(user, f"user-{user}", room)
pm.leave_room(user, room)
except Exception as e:
errors.append(e)
threads = [
threading.Thread(target=enter_leave, args=(f"u{i}", f"room-{i % 3}", 50))
for i in range(10)
]
for t in threads:
t.start()
for t in threads:
t.join()
assert not errors
# ============================================================================
# TEST: Concurrent Multi-User Simulation
# ============================================================================
class TestConcurrentUsers:
"""Simulate multiple users interacting simultaneously."""
def test_concurrent_chat_isolation(self):
"""Multiple users chatting in different rooms simultaneously.
Verifies rooms are isolated — messages don't cross room boundaries."""
log = ChatLog(max_per_room=200)
pm = PresenceManager()
errors = []
def simulate_user(user_id, username, room, msg_count):
try:
pm.enter_room(user_id, username, room)
for i in range(msg_count):
log.log(room, "say", f"{username}: message {i}", user_id=user_id)
pm.leave_room(user_id, room)
except Exception as e:
errors.append(e)
threads = [
threading.Thread(target=simulate_user, args=("u1", "Alice", "garden", 20)),
threading.Thread(target=simulate_user, args=("u2", "Bob", "tower", 20)),
threading.Thread(target=simulate_user, args=("u3", "Diana", "garden", 20)),
]
for t in threads:
t.start()
for t in threads:
t.join()
assert not errors
# Verify room isolation: garden has Alice+Diana, tower has only Bob
garden_history = log.get_history("garden")
tower_history = log.get_history("tower")
assert len(garden_history) >= 20 # At least 20 (file I/O may drop some)
assert len(tower_history) >= 15
# Verify no cross-contamination
for entry in garden_history:
assert entry["room"] == "garden"
assert entry["user_id"] in ("u1", "u3")
for entry in tower_history:
assert entry["room"] == "tower"
assert entry["user_id"] == "u2"