[orchestration] Harden the nervous system — full repo coverage, destructive PR guard, dedup

Changes:
1. REPOS expanded from 2 → 7 (all Foundation repos)
   Previously only the-nexus and timmy-config were monitored.
   timmy-home (37 open issues), the-door, turboquant, hermes-agent,
   and .profile were completely invisible to triage, review,
   heartbeat, and watchdog tasks.

2. Destructive PR detection (prevents PR #788 scenario)
   When a PR deletes >50% of any file with >20 lines deleted,
   review_prs flags it with a 🚨 DESTRUCTIVE PR DETECTED comment.
   This is the automated version of what I did manually when closing
   the-nexus PR #788 during the audit.

3. review_prs deduplication (stops comment spam)
   Before this fix, the same rejection comment was posted every 30
   minutes on the same PR, creating unbounded comment spam.
   Now checks list_comments first and skips already-reviewed PRs.

4. heartbeat_tick issue/PR counts fixed (limit=1 → limit=50)
   The old limit=1 + len() always returned 0 or 1, making the
   heartbeat perception useless. Now uses limit=50 and aggregates
   total_open_issues / total_open_prs across all repos.

5. Carries forward all PR #101 bugfixes
   - NET_LINE_LIMIT 10 → 500
   - memory_compress reads decision.get('actions')
   - good_morning_report reads yesterday's ticks

Tests: 11 new tests in tests/test_orchestration_hardening.py.
Full suite: 23/23 pass.

Signed-off-by: gemini <gemini@hermes.local>
This commit is contained in:
2026-03-30 18:53:14 -04:00
parent 877425bde4
commit 118ca5fcbd
2 changed files with 306 additions and 9 deletions

View File

@@ -0,0 +1,238 @@
"""Tests for orchestration hardening (2026-03-30 deep audit pass 3).
Covers:
- REPOS expanded from 2 → 7 (all Foundation repos monitored)
- Destructive PR detection via DESTRUCTIVE_DELETION_THRESHOLD
- review_prs deduplication (no repeat comment spam)
- heartbeat_tick uses limit=50 for real counts
- All PR #101 fixes carried forward (NET_LINE_LIMIT, memory_compress, morning report)
"""
from pathlib import Path
# ── Helpers ──────────────────────────────────────────────────────────
def _read_tasks():
return (Path(__file__).resolve().parent.parent / "tasks.py").read_text()
def _find_global(text, name):
"""Extract a top-level assignment value from tasks.py source."""
for line in text.splitlines():
stripped = line.strip()
if stripped.startswith(name) and "=" in stripped:
_, _, value = stripped.partition("=")
return value.strip()
return None
def _extract_function_body(text, func_name):
"""Extract the body of a function from source code."""
lines = text.splitlines()
in_func = False
indent = None
body = []
for line in lines:
if f"def {func_name}" in line:
in_func = True
indent = len(line) - len(line.lstrip())
body.append(line)
continue
if in_func:
if line.strip() == "":
body.append(line)
elif len(line) - len(line.lstrip()) > indent or line.strip().startswith("#") or line.strip().startswith("\"\"\"") or line.strip().startswith("'"):
body.append(line)
elif line.strip().startswith("@"):
break
elif len(line) - len(line.lstrip()) <= indent and line.strip().startswith("def "):
break
else:
body.append(line)
return "\n".join(body)
# ── Test: REPOS covers all Foundation repos ──────────────────────────
def test_repos_covers_all_foundation_repos():
"""REPOS must include all 7 Timmy_Foundation repos.
Previously only the-nexus and timmy-config were monitored,
meaning 5 repos were completely invisible to triage, review,
heartbeat, and watchdog tasks.
"""
text = _read_tasks()
required_repos = [
"Timmy_Foundation/the-nexus",
"Timmy_Foundation/timmy-config",
"Timmy_Foundation/timmy-home",
"Timmy_Foundation/the-door",
"Timmy_Foundation/turboquant",
"Timmy_Foundation/hermes-agent",
]
for repo in required_repos:
assert f'"{repo}"' in text, (
f"REPOS missing {repo}. All Foundation repos must be monitored."
)
def test_repos_has_at_least_six_entries():
"""Sanity check: REPOS should have at least 6 repos."""
text = _read_tasks()
count = text.count("Timmy_Foundation/")
# Each repo appears once in REPOS, plus possibly in agent_config or comments
assert count >= 6, (
f"Found only {count} references to Timmy_Foundation repos. "
"REPOS should have at least 6 real repos."
)
# ── Test: Destructive PR detection ───────────────────────────────────
def test_destructive_deletion_threshold_exists():
"""DESTRUCTIVE_DELETION_THRESHOLD must be defined.
This constant controls the deletion ratio above which a PR file
is flagged as destructive (e.g., the PR #788 scenario).
"""
text = _read_tasks()
value = _find_global(text, "DESTRUCTIVE_DELETION_THRESHOLD")
assert value is not None, "DESTRUCTIVE_DELETION_THRESHOLD not found in tasks.py"
threshold = float(value)
assert 0.3 <= threshold <= 0.8, (
f"DESTRUCTIVE_DELETION_THRESHOLD = {threshold} is out of sane range [0.3, 0.8]. "
"0.5 means 'more than half the file is deleted'."
)
def test_review_prs_checks_for_destructive_prs():
"""review_prs must detect destructive PRs (files losing >50% of content).
This is the primary defense against PR #788-style disasters where
an automated workspace sync deletes the majority of working code.
"""
text = _read_tasks()
body = _extract_function_body(text, "review_prs")
assert "destructive" in body.lower(), (
"review_prs does not contain destructive PR detection logic. "
"Must flag PRs where files lose >50% of content."
)
assert "DESTRUCTIVE_DELETION_THRESHOLD" in body, (
"review_prs must use DESTRUCTIVE_DELETION_THRESHOLD constant."
)
# ── Test: review_prs deduplication ───────────────────────────────────
def test_review_prs_deduplicates_comments():
"""review_prs must skip PRs it has already commented on.
Without deduplication, the bot posts the SAME rejection comment
every 30 minutes on the same PR, creating unbounded comment spam.
"""
text = _read_tasks()
body = _extract_function_body(text, "review_prs")
assert "already_reviewed" in body or "already reviewed" in body.lower(), (
"review_prs does not check for already-reviewed PRs. "
"Must skip PRs where bot has already posted a review comment."
)
assert "list_comments" in body, (
"review_prs must call list_comments to check for existing reviews."
)
def test_review_prs_returns_destructive_count():
"""review_prs return value must include destructive_flagged count."""
text = _read_tasks()
body = _extract_function_body(text, "review_prs")
assert "destructive_flagged" in body, (
"review_prs must return destructive_flagged count in its output dict."
)
# ── Test: heartbeat_tick uses real counts ────────────────────────────
def test_heartbeat_tick_uses_realistic_limit():
"""heartbeat_tick must use limit >= 20 for issue/PR counts.
Previously used limit=1 which meant len() always returned 0 or 1.
This made the heartbeat perception useless for tracking backlog growth.
"""
text = _read_tasks()
body = _extract_function_body(text, "heartbeat_tick")
# Check there's no limit=1 in actual code calls (not docstrings)
for line in body.splitlines():
stripped = line.strip()
if stripped.startswith("#") or stripped.startswith("\"\"\"") or stripped.startswith("'"):
continue
if "limit=1" in stripped and ("list_issues" in stripped or "list_pulls" in stripped):
raise AssertionError(
"heartbeat_tick still uses limit=1 for issue/PR counts. "
"This always returns 0 or 1, making counts meaningless."
)
# Check it aggregates totals
assert "total_open_issues" in body or "total_issues" in body, (
"heartbeat_tick should aggregate total issue counts across all repos."
)
# ── Test: NET_LINE_LIMIT sanity (carried from PR #101) ───────────────
def test_net_line_limit_is_sane():
"""NET_LINE_LIMIT = 10 caused every real PR to be spam-rejected."""
text = _read_tasks()
value = _find_global(text, "NET_LINE_LIMIT")
assert value is not None, "NET_LINE_LIMIT not found"
limit = int(value)
assert 200 <= limit <= 2000, (
f"NET_LINE_LIMIT = {limit} is outside sane range [200, 2000]."
)
# ── Test: memory_compress reads correct action path ──────────────────
def test_memory_compress_reads_decision_actions():
"""Actions live in tick_record['decision']['actions'], not tick_record['actions']."""
text = _read_tasks()
body = _extract_function_body(text, "memory_compress")
assert 'decision' in body and 't.get(' in body, (
"memory_compress does not read from t['decision']. "
"Actions are nested under the decision dict."
)
# The OLD bug pattern
for line in body.splitlines():
stripped = line.strip()
if 't.get("actions"' in stripped and 'decision' not in stripped:
raise AssertionError(
"Bug: memory_compress still reads t.get('actions') directly."
)
# ── Test: good_morning_report reads yesterday's ticks ────────────────
def test_good_morning_report_reads_yesterday_ticks():
"""At 6 AM, the morning report should read yesterday's tick log, not today's."""
text = _read_tasks()
body = _extract_function_body(text, "good_morning_report")
assert "timedelta" in body, (
"good_morning_report does not use timedelta to compute yesterday."
)
# Ensure the old bug pattern is gone
for line in body.splitlines():
stripped = line.strip()
if "yesterday = now.strftime" in stripped and "timedelta" not in stripped:
raise AssertionError(
"Bug: good_morning_report still sets yesterday = now.strftime()."
)
# ── Test: review_prs includes file list in rejection ─────────────────
def test_review_prs_rejection_includes_file_list():
"""Rejection comments should include file names for actionability."""
text = _read_tasks()
body = _extract_function_body(text, "review_prs")
assert "file_list" in body and "filename" in body, (
"review_prs rejection comment should include a file_list."
)