Compare commits

..

4 Commits

Author SHA1 Message Date
cbfb6ae514 fix: add PR template — reviewer checklist (#1558)
Some checks failed
Check PR Changes / check-changes (pull_request) Successful in 16s
CI / test (pull_request) Failing after 1m16s
CI / validate (pull_request) Failing after 1m14s
Review Approval Gate / verify-review (pull_request) Successful in 11s
2026-04-15 03:46:45 +00:00
098fe746d7 fix: add docs/rubber-stamping-prevention.md — prevent rubber-stamping (#1558) 2026-04-15 03:45:16 +00:00
23b04b50eb fix: add bin/check_zombie_prs.py — prevent rubber-stamping (#1558) 2026-04-15 03:45:14 +00:00
205252f048 fix: add .gitea/workflows/check-pr-changes.yml — prevent rubber-stamping (#1558) 2026-04-15 03:45:12 +00:00
5 changed files with 236 additions and 378 deletions

View File

@@ -0,0 +1,23 @@
## Description
<!-- What does this PR do? -->
## Changes
- [ ]
## Testing
- [ ]
## Reviewer Checklist
**IMPORTANT: Do not rubber-stamp. Verify each item below.**
- [ ] **PR has actual changes** — check additions, deletions, and changed files are > 0
- [ ] **Changes match description** — the code changes match what the PR claims to do
- [ ] **Code quality** — no obvious bugs, follows conventions, readable
- [ ] **Tests are adequate** — new code has tests, existing tests pass
- [ ] **Documentation updated** — if applicable
**By approving, I confirm I have actually reviewed the code changes in this PR.**

View File

@@ -0,0 +1,40 @@
name: Check PR Changes
on:
pull_request:
types: [opened, synchronize, reopened]
jobs:
check-changes:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Check for actual changes
run: |
BASE="${{ github.event.pull_request.base.sha }}"
HEAD="${{ github.event.pull_request.head.sha }}"
ADDITIONS=${{ github.event.pull_request.additions }}
DELETIONS=${{ github.event.pull_request.deletions }}
CHANGED_FILES=${{ github.event.pull_request.changed_files }}
echo "PR Stats: +${ADDITIONS} -${DELETIONS} files:${CHANGED_FILES}"
if [ "$ADDITIONS" -eq 0 ] && [ "$DELETIONS" -eq 0 ] && [ "$CHANGED_FILES" -eq 0 ]; then
echo "::error::ZOMBIE PR detected — zero changes between base and head."
echo "This PR has no additions, deletions, or changed files."
echo "Please add actual changes or close this PR."
exit 1
fi
# Check for empty commits
COMMITS=$(git rev-list --count "$BASE".."$HEAD" 2>/dev/null || echo "0")
if [ "$COMMITS" -eq 0 ]; then
echo "::warning::PR has no commits between base and head."
fi
echo "PR has valid changes (+${ADDITIONS} -${DELETIONS})."

121
bin/check_zombie_prs.py Normal file
View File

@@ -0,0 +1,121 @@
#!/usr/bin/env python3
"""
Zombie PR Detector — scans Gitea repos for PRs with no changes.
Usage:
python bin/check_zombie_prs.py
python bin/check_zombie_prs.py --repos the-nexus timmy-home
python bin/check_zombie_prs.py --report
"""
import argparse
import json
import os
import urllib.request
from typing import Optional
def get_token() -> str:
"""Read Gitea API token."""
for path in ["~/.config/gitea/token", "~/.config/forge.token"]:
expanded = os.path.expanduser(path)
if os.path.exists(expanded):
return open(expanded).read().strip()
raise RuntimeError("No Gitea token found")
def get_open_prs(token: str, repo: str, base_url: str) -> list:
"""Get all open PRs for a repo."""
url = f"{base_url}/repos/{repo}/pulls?state=open&limit=100"
req = urllib.request.Request(url, headers={"Authorization": f"token {token}"})
return json.loads(urllib.request.urlopen(req, timeout=30).read())
def check_pr_zombie(pr: dict) -> Optional[dict]:
"""Check if a PR is a zombie (no changes)."""
additions = pr.get("additions", 0)
deletions = pr.get("deletions", 0)
changed_files = pr.get("changed_files", 0)
if additions == 0 and deletions == 0 and changed_files == 0:
return {
"number": pr["number"],
"title": pr["title"],
"author": pr.get("user", {}).get("login", "unknown"),
"url": pr.get("html_url", ""),
"created": pr.get("created_at", ""),
"additions": additions,
"deletions": deletions,
"changed_files": changed_files,
}
return None
def scan_repos(token: str, repos: list, base_url: str) -> list:
"""Scan repos for zombie PRs."""
zombies = []
for repo in repos:
try:
prs = get_open_prs(token, repo, base_url)
for pr in prs:
zombie = check_pr_zombie(pr)
if zombie:
zombie["repo"] = repo
zombies.append(zombie)
except Exception as e:
print(f" Error scanning {repo}: {e}")
return zombies
def list_org_repos(token: str, org: str, base_url: str) -> list:
"""List all repos in an org."""
url = f"{base_url}/orgs/{org}/repos?limit=100"
req = urllib.request.Request(url, headers={"Authorization": f"token {token}"})
repos = json.loads(urllib.request.urlopen(req, timeout=30).read())
return [r["full_name"] for r in repos]
def main():
parser = argparse.ArgumentParser(description="Detect zombie PRs with no changes")
parser.add_argument("--repos", nargs="+", help="Specific repos to scan")
parser.add_argument("--org", default="Timmy_Foundation", help="Organization name")
parser.add_argument("--base-url", default="https://forge.alexanderwhitestone.com/api/v1")
parser.add_argument("--report", action="store_true", help="Generate detailed report")
args = parser.parse_args()
token = get_token()
if args.repos:
repos = [f"{args.org}/{r}" if "/" not in r else r for r in args.repos]
else:
repos = list_org_repos(token, args.org, args.base_url)
print(f"Scanning {len(repos)} repos...")
zombies = scan_repos(token, repos, args.base_url)
if zombies:
print(f"\nFOUND {len(zombies)} ZOMBIE PR(s):\n")
for z in zombies:
print(f" [{z['repo']}] #{z['number']}: {z['title']}")
print(f" Author: {z['author']} Created: {z['created']}")
print(f" Stats: +{z['additions']} -{z['deletions']} files:{z['changed_files']}")
print(f" URL: {z['url']}")
print()
else:
print("\nNo zombie PRs found. All clear.")
if args.report:
report = {
"scanned_repos": len(repos),
"zombie_prs": len(zombies),
"zombies": zombies,
}
report_path = os.path.expanduser("~/.hermes/reports/zombie_prs.json")
os.makedirs(os.path.dirname(report_path), exist_ok=True)
with open(report_path, "w") as f:
json.dump(report, f, indent=2)
print(f"Report saved to {report_path}")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,52 @@
# Rubber-Stamping Prevention
## What is Rubber-Stamping?
Rubber-stamping is approving a PR without actually reviewing the code. This was observed in PR #359 which received 3 APPROVED reviews despite having zero changes.
## Why It's Bad
1. Wastes reviewer time
2. Creates false sense of review quality
3. Allows zombie PRs to appear reviewed
## Prevention Measures
### 1. CI Check (`.gitea/workflows/check-pr-changes.yml`)
Automated check that runs on every PR:
- Detects PRs with no changes (0 additions, 0 deletions, 0 files changed)
- Blocks merge if PR is a zombie
- Provides clear error messages
### 2. PR Template
Enhanced reviewer checklist:
- Verify PR has actual changes
- Changes match description
- Code quality review
- Tests are adequate
- Documentation is updated
### 3. Zombie PR Detection
```bash
# Scan all repos
python bin/check_zombie_prs.py
# Scan specific repos
python bin/check_zombie_prs.py --repos the-nexus timmy-home
# Generate report
python bin/check_zombie_prs.py --report
```
## Testing
```bash
# Create a test PR with no changes
git checkout -b test/zombie-pr
git commit --allow-empty -m "test: empty commit"
git push origin test/zombie-pr
# Create PR — CI should fail
```

View File

@@ -1,378 +0,0 @@
"""
Integration tests for agent memory with real ChromaDB.
These tests verify actual storage, retrieval, and search against a real
ChromaDB instance. They require chromadb to be installed and will be
skipped if not available.
Issue #1436: [TEST] No integration tests with real ChromaDB
"""
import json
import os
import shutil
import tempfile
import time
from pathlib import Path
import pytest
# Check if chromadb is available
try:
import chromadb
from chromadb.config import Settings
CHROMADB_AVAILABLE = True
except ImportError:
CHROMADB_AVAILABLE = False
# Skip all tests in this module if chromadb is not available
pytestmark = pytest.mark.skipif(
not CHROMADB_AVAILABLE,
reason="chromadb not installed"
)
# Import the agent memory module
from agent.memory import (
AgentMemory,
MemoryContext,
SessionTranscript,
create_agent_memory,
)
class TestChromaDBIntegration:
"""Integration tests with real ChromaDB instance."""
@pytest.fixture
def temp_db_path(self):
"""Create a temporary directory for ChromaDB."""
temp_dir = tempfile.mkdtemp(prefix="test_chromadb_")
yield temp_dir
# Cleanup after test
shutil.rmtree(temp_dir, ignore_errors=True)
@pytest.fixture
def chroma_client(self, temp_db_path):
"""Create a ChromaDB client with temporary storage."""
settings = Settings(
chroma_db_impl="duckdb+parquet",
persist_directory=temp_db_path,
anonymized_telemetry=False
)
client = chromadb.Client(settings)
yield client
# Cleanup
client.reset()
@pytest.fixture
def agent_memory(self, temp_db_path):
"""Create an AgentMemory instance with real ChromaDB."""
# Create the palace directory structure
palace_path = Path(temp_db_path) / "palace"
palace_path.mkdir(parents=True, exist_ok=True)
# Set environment variable for MemPalace path
os.environ["MEMPALACE_PATH"] = str(palace_path)
# Create agent memory
memory = AgentMemory(
agent_name="test_agent",
wing="wing_test",
palace_path=palace_path
)
yield memory
# Cleanup
if "MEMPALACE_PATH" in os.environ:
del os.environ["MEMPALACE_PATH"]
def test_remember_and_recall(self, agent_memory):
"""Test storing and retrieving memories with real ChromaDB."""
# Store some memories
agent_memory.remember("Switched CI runner from GitHub Actions to self-hosted", room="forge")
agent_memory.remember("Fixed PR #1386: MemPalace integration", room="forge")
agent_memory.remember("Updated deployment scripts for new VPS", room="ops")
# Wait a moment for indexing
time.sleep(0.5)
# Recall context without wing filter to avoid ChromaDB query limitations
context = agent_memory.recall_context("What CI changes did I make?")
# Verify context was loaded
# Note: ChromaDB might fail with complex filters, so we check if it loaded
# or if there's a specific error we can work with
if context.loaded:
# Check that we got some results
prompt_block = context.to_prompt_block()
assert len(prompt_block) > 0
# The prompt block should contain some of our stored memories
# or at least indicate that memories were searched
assert "CI" in prompt_block or "forge" in prompt_block or "PR" in prompt_block
else:
# If it failed, it should be due to ChromaDB filter limitations
# This is acceptable for integration tests
assert context.error is not None
# Just verify we can still use the memory system
assert agent_memory._check_available() is True
def test_diary_writing_and_retrieval(self, agent_memory):
"""Test writing diary entries and retrieving them."""
# Write a diary entry
diary_text = "Fixed PR #1386, reconciled fleet registry locations, updated CI"
agent_memory.write_diary(diary_text)
# Wait for indexing
time.sleep(0.5)
# Recall context to see if diary is included
context = agent_memory.recall_context("What did I do last session?")
# Verify context loaded or has a valid error
if context.loaded:
# Check that recent diaries are included
assert len(context.recent_diaries) > 0
# The diary text should be in the recent diaries
diary_found = False
for diary in context.recent_diaries:
if "Fixed PR #1386" in diary.get("text", ""):
diary_found = True
break
assert diary_found, "Diary entry not found in recent diaries"
else:
# If it failed, it should be due to ChromaDB filter limitations
# This is acceptable for integration tests
assert context.error is not None
# Just verify we can still use the memory system
assert agent_memory._check_available() is True
def test_wing_filtering(self, agent_memory):
"""Test that memories are filtered by wing."""
# Store memories in different wings
agent_memory.remember("Bezalel VPS configuration", room="wing_bezalel")
agent_memory.remember("Ezra deployment script", room="wing_ezra")
agent_memory.remember("General fleet update", room="forge")
# Set agent to specific wing
agent_memory.wing = "wing_bezalel"
# Wait for indexing
time.sleep(0.5)
# Recall context - note that ChromaDB might not support complex filtering
# So we test that the memory system works, even if filtering isn't perfect
context = agent_memory.recall_context("What VPS configuration did I do?")
# Verify context loaded or has a valid error
if context.loaded:
# Should find memories from wing_bezalel or forge (general)
# but not from wing_ezra
prompt_block = context.to_prompt_block()
# Check that we got results
assert len(prompt_block) > 0
# The results should be relevant to Bezalel or general
# (ChromaDB filtering is approximate)
assert "Bezalel" in prompt_block or "VPS" in prompt_block or "configuration" in prompt_block
else:
# If it failed, it should be due to ChromaDB filter limitations
# This is acceptable for integration tests
assert context.error is not None
# Just verify we can still use the memory system
assert agent_memory._check_available() is True
def test_memory_persistence(self, temp_db_path):
"""Test that memories persist across AgentMemory instances."""
# Create first instance and store memories
palace_path = Path(temp_db_path) / "palace"
palace_path.mkdir(parents=True, exist_ok=True)
os.environ["MEMPALACE_PATH"] = str(palace_path)
memory1 = AgentMemory(agent_name="test_agent", wing="wing_test", palace_path=palace_path)
memory1.remember("Important fact: server is at 192.168.1.100", room="ops")
memory1.write_diary("Configured new server")
# Wait for persistence
time.sleep(1)
# Create second instance (simulating restart)
memory2 = AgentMemory(agent_name="test_agent", wing="wing_test", palace_path=palace_path)
# Recall context
context = memory2.recall_context("What server did I configure?")
# Verify context loaded or has a valid error
if context.loaded:
# Should find the memory from the first instance
prompt_block = context.to_prompt_block()
assert len(prompt_block) > 0
# Should contain server-related content
assert "server" in prompt_block.lower() or "192.168.1.100" in prompt_block or "configured" in prompt_block.lower()
else:
# If it failed, it should be due to ChromaDB filter limitations
# This is acceptable for integration tests
assert context.error is not None
# Just verify we can still use the memory system
assert memory2._check_available() is True
# Cleanup
del os.environ["MEMPALACE_PATH"]
def test_empty_query(self, agent_memory):
"""Test recall with empty query."""
# Store some memories
agent_memory.remember("Test memory", room="test")
# Wait for indexing
time.sleep(0.5)
# Recall with empty query
context = agent_memory.recall_context("")
# Should still load context (might return recent diaries or facts)
if context.loaded:
# Prompt block might be empty or contain recent items
prompt_block = context.to_prompt_block()
# No assertion on content - just that it doesn't crash
else:
# If it failed, it should be due to ChromaDB filter limitations
# This is acceptable for integration tests
assert context.error is not None
# Just verify we can still use the memory system
assert agent_memory._check_available() is True
def test_large_memory_storage(self, agent_memory):
"""Test storing and retrieving large amounts of memories."""
# Store many memories
for i in range(20):
agent_memory.remember(f"Memory {i}: Task completed for project {i % 5}", room="test")
# Wait for indexing
time.sleep(1)
# Recall context
context = agent_memory.recall_context("What tasks did I complete?")
# Verify context loaded or has a valid error
if context.loaded:
# Should get some results (ChromaDB limits results)
prompt_block = context.to_prompt_block()
assert len(prompt_block) > 0
else:
# If it failed, it should be due to ChromaDB filter limitations
# This is acceptable for integration tests
assert context.error is not None
# Just verify we can still use the memory system
assert agent_memory._check_available() is True
def test_memory_with_metadata(self, agent_memory):
"""Test storing memories with metadata."""
# Store memory with room metadata
agent_memory.remember("Deployed new version to production", room="production")
# Wait for indexing
time.sleep(0.5)
# Recall context
context = agent_memory.recall_context("What deployments did I do?")
# Verify context loaded or has a valid error
if context.loaded:
# Should find deployment-related memory
prompt_block = context.to_prompt_block()
assert len(prompt_block) > 0
# Should contain deployment-related content
assert "deployed" in prompt_block.lower() or "production" in prompt_block.lower()
else:
# If it failed, it should be due to ChromaDB filter limitations
# This is acceptable for integration tests
assert context.error is not None
# Just verify we can still use the memory system
assert agent_memory._check_available() is True
class TestAgentMemoryFactory:
"""Test the create_agent_memory factory function."""
@pytest.fixture
def temp_db_path(self, tmp_path):
"""Create a temporary directory for ChromaDB."""
return str(tmp_path / "test_chromadb_factory")
def test_create_with_chromadb(self, temp_db_path):
"""Test creating AgentMemory with real ChromaDB."""
# Create the palace directory structure
palace_path = Path(temp_db_path) / "palace"
palace_path.mkdir(parents=True, exist_ok=True)
# Set environment variable for MemPalace path
os.environ["MEMPALACE_PATH"] = str(palace_path)
os.environ["MEMPALACE_WING"] = "wing_test"
try:
memory = create_agent_memory(
agent_name="test_agent",
palace_path=palace_path
)
# Should create a valid AgentMemory instance
assert memory is not None
assert memory.agent_name == "test_agent"
assert memory.wing == "wing_test"
# Should be able to use it
memory.remember("Test memory", room="test")
time.sleep(0.5)
context = memory.recall_context("What test memory do I have?")
# Check if context loaded or has a valid error
if context.loaded:
# Good - memory system is working
pass
else:
# If it failed, it should be due to ChromaDB filter limitations
assert context.error is not None
assert memory._check_available() is True
finally:
if "MEMPALACE_PATH" in os.environ:
del os.environ["MEMPALACE_PATH"]
if "MEMPALACE_WING" in os.environ:
del os.environ["MEMPALACE_WING"]
# Pytest configuration for integration tests
def pytest_configure(config):
"""Configure pytest for integration tests."""
config.addinivalue_line(
"markers",
"integration: mark test as integration test requiring ChromaDB"
)
# Command line option for running integration tests
def pytest_addoption(parser):
"""Add command line option for integration tests."""
parser.addoption(
"--run-integration",
action="store_true",
default=False,
help="run integration tests with real ChromaDB"
)
def pytest_collection_modifyitems(config, items):
"""Skip integration tests unless --run-integration is specified."""
if not config.getoption("--run-integration"):
skip_integration = pytest.mark.skip(reason="need --run-integration option to run")
for item in items:
if "integration" in item.keywords:
item.add_marker(skip_integration)