Compare commits

..

1 Commits

Author SHA1 Message Date
Timmy
52e3f6a253 fix: add test suite for quality gate modules (#629)
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 35s
Smoke Test / smoke (pull_request) Failing after 25s
Validate Config / YAML Lint (pull_request) Failing after 17s
Validate Config / JSON Validate (pull_request) Successful in 16s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 1m23s
Validate Config / Shell Script Lint (pull_request) Failing after 54s
Validate Config / Cron Syntax Check (pull_request) Successful in 12s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 12s
Validate Config / Playbook Schema Validation (pull_request) Successful in 23s
PR Checklist / pr-checklist (pull_request) Failing after 6m34s
Validate Config / Python Test Suite (pull_request) Has been cancelled
Architecture Lint / Lint Repository (pull_request) Has been cancelled
34 tests covering both quality gate modules:

ci_automation_gate.py (17 tests):
- Trailing whitespace detection and auto-fix
- Missing final newline detection and fix
- JS/TS function length warnings (20 lines) and failures (50 lines)
- Python files excluded from JS function length check
- File type filtering (.py, .js, .ts checked; .md, .json skipped)
- Directory traversal with node_modules and .git exclusions
- Exit codes (0 for clean, 1 for failures)

task_gate.py (17 tests):
- Filter tag detection (EPIC, DO NOT CLOSE, PERMANENT, MORNING REPORT)
- Agent lane checking (no config, missing agent, present agent)
- Pre-task gate: issue not found, filter tags, assigned agents,
  existing PRs, clean pass
- Post-task gate: missing branch, naming convention warning,
  no commits failure
- Integration: gate runs on its own source files

Closes #629
2026-04-14 21:18:39 -04:00
8 changed files with 468 additions and 307 deletions

View File

@@ -1,4 +1,3 @@
#!/usr/bin/env python3
"""
Full Nostr agent-to-agent communication demo - FINAL WORKING
"""

View File

@@ -1,271 +0,0 @@
#!/usr/bin/env python3
"""
Pre-Flight Provider Check Script
Issue #508: [Robustness] Credential drain detection — provider health checks
Pre-flight check before session launch: verifies provider credentials and balance.
Usage:
python3 preflight-provider-check.py # Check all providers
python3 preflight-provider-check.py --launch # Check and return exit code
python3 preflight-provider-check.py --balance # Check OpenRouter balance
"""
import os, sys, json, yaml, urllib.request
from datetime import datetime, timezone
from pathlib import Path
# Configuration
HERMES_HOME = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes"))
LOG_DIR = Path.home() / ".local" / "timmy" / "fleet-health"
LOG_FILE = LOG_DIR / "preflight-check.log"
def log(msg):
"""Log message to file and optionally console."""
timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
log_entry = "[" + timestamp + "] " + msg
LOG_DIR.mkdir(parents=True, exist_ok=True)
with open(LOG_FILE, "a") as f:
f.write(log_entry + "\n")
if "--quiet" not in sys.argv:
print(log_entry)
def get_provider_api_key(provider):
"""Get API key for a provider from .env or environment."""
env_file = HERMES_HOME / ".env"
if env_file.exists():
with open(env_file) as f:
for line in f:
line = line.strip()
if line.startswith(provider.upper() + "_API_KEY="):
return line.split("=", 1)[1].strip().strip("'\"")
return os.environ.get(provider.upper() + "_API_KEY")
def check_openrouter_balance(api_key):
"""Check OpenRouter balance via /api/v1/auth/key."""
if not api_key:
return False, "No API key", 0
try:
req = urllib.request.Request(
"https://openrouter.ai/api/v1/auth/key",
headers={"Authorization": "Bearer " + api_key}
)
resp = urllib.request.urlopen(req, timeout=10)
data = json.loads(resp.read())
# Check for credits
credits = data.get("data", {}).get("limit", 0)
usage = data.get("data", {}).get("usage", 0)
remaining = credits - usage if credits else None
if remaining is not None and remaining <= 0:
return False, "No credits remaining", 0
elif remaining is not None:
return True, "Credits available", remaining
else:
return True, "Unlimited or unknown balance", None
except urllib.error.HTTPError as e:
if e.code == 401:
return False, "Invalid API key", 0
else:
return False, "HTTP " + str(e.code), 0
except Exception as e:
return False, str(e)[:100], 0
def check_nous_key(api_key):
"""Check Nous API key with minimal test call."""
if not api_key:
return False, "No API key"
try:
req = urllib.request.Request(
"https://inference.nousresearch.com/v1/models",
headers={"Authorization": "Bearer " + api_key}
)
resp = urllib.request.urlopen(req, timeout=10)
if resp.status == 200:
return True, "Valid key"
else:
return False, "HTTP " + str(resp.status)
except urllib.error.HTTPError as e:
if e.code == 401:
return False, "Invalid API key"
elif e.code == 403:
return False, "Forbidden"
else:
return False, "HTTP " + str(e.code)
except Exception as e:
return False, str(e)[:100]
def check_anthropic_key(api_key):
"""Check Anthropic API key with minimal test call."""
if not api_key:
return False, "No API key"
try:
req = urllib.request.Request(
"https://api.anthropic.com/v1/models",
headers={
"x-api-key": api_key,
"anthropic-version": "2023-06-01"
}
)
resp = urllib.request.urlopen(req, timeout=10)
if resp.status == 200:
return True, "Valid key"
else:
return False, "HTTP " + str(resp.status)
except urllib.error.HTTPError as e:
if e.code == 401:
return False, "Invalid API key"
elif e.code == 403:
return False, "Forbidden"
else:
return False, "HTTP " + str(e.code)
except Exception as e:
return False, str(e)[:100]
def check_ollama():
"""Check if Ollama is running."""
try:
req = urllib.request.Request("http://localhost:11434/api/tags")
resp = urllib.request.urlopen(req, timeout=5)
if resp.status == 200:
data = json.loads(resp.read())
models = data.get("models", [])
return True, str(len(models)) + " models loaded"
else:
return False, "HTTP " + str(resp.status)
except Exception as e:
return False, str(e)[:100]
def get_configured_provider():
"""Get the configured provider from global config."""
config_file = HERMES_HOME / "config.yaml"
if not config_file.exists():
return None
try:
with open(config_file) as f:
config = yaml.safe_load(f)
model_config = config.get("model", {})
if isinstance(model_config, dict):
return model_config.get("provider")
except:
pass
return None
def run_preflight_check():
"""Run pre-flight check on all providers."""
log("=== Pre-Flight Provider Check ===")
results = {}
# Check OpenRouter
or_key = get_provider_api_key("openrouter")
or_ok, or_msg, or_balance = check_openrouter_balance(or_key)
results["openrouter"] = {"healthy": or_ok, "message": or_msg, "balance": or_balance}
# Check Nous
nous_key = get_provider_api_key("nous")
nous_ok, nous_msg = check_nous_key(nous_key)
results["nous"] = {"healthy": nous_ok, "message": nous_msg}
# Check Anthropic
anthropic_key = get_provider_api_key("anthropic")
anthropic_ok, anthropic_msg = check_anthropic_key(anthropic_key)
results["anthropic"] = {"healthy": anthropic_ok, "message": anthropic_msg}
# Check Ollama
ollama_ok, ollama_msg = check_ollama()
results["ollama"] = {"healthy": ollama_ok, "message": ollama_msg}
# Get configured provider
configured = get_configured_provider()
# Summary
healthy_count = sum(1 for r in results.values() if r["healthy"])
total_count = len(results)
log("Results: " + str(healthy_count) + "/" + str(total_count) + " providers healthy")
for provider, result in results.items():
status = "HEALTHY" if result["healthy"] else "UNHEALTHY"
extra = ""
if provider == "openrouter" and result.get("balance") is not None:
extra = " (balance: " + str(result["balance"]) + ")"
log(" " + provider + ": " + status + " - " + result["message"] + extra)
if configured:
log("Configured provider: " + configured)
if configured in results and not results[configured]["healthy"]:
log("WARNING: Configured provider " + configured + " is UNHEALTHY!")
return results, configured
def check_launch_readiness():
"""Check if we're ready to launch sessions."""
results, configured = run_preflight_check()
# Check if configured provider is healthy
if configured and configured in results:
if not results[configured]["healthy"]:
log("LAUNCH BLOCKED: Configured provider " + configured + " is unhealthy")
return False, configured + " is unhealthy"
# Check if at least one provider is healthy
healthy_providers = [p for p, r in results.items() if r["healthy"]]
if not healthy_providers:
log("LAUNCH BLOCKED: No healthy providers available")
return False, "No healthy providers"
log("LAUNCH READY: " + str(len(healthy_providers)) + " healthy providers available")
return True, "Ready"
def show_balance():
"""Show OpenRouter balance."""
api_key = get_provider_api_key("openrouter")
if not api_key:
print("No OpenRouter API key found")
return
ok, msg, balance = check_openrouter_balance(api_key)
if ok:
if balance is not None:
print("OpenRouter balance: " + str(balance) + " credits")
else:
print("OpenRouter: " + msg)
else:
print("OpenRouter: " + msg)
def main():
if "--balance" in sys.argv:
show_balance()
elif "--launch" in sys.argv:
ready, message = check_launch_readiness()
if ready:
print("READY")
sys.exit(0)
else:
print("BLOCKED: " + message)
sys.exit(1)
else:
run_preflight_check()
if __name__ == "__main__":
main()

View File

@@ -1,4 +1,3 @@
#!/usr/bin/env python3
"""
Soul Eval Gate — The Conscience of the Training Pipeline

View File

@@ -196,37 +196,7 @@
"paused_reason": null,
"skills": [],
"skill": null
},
{
"id": "tmux-supervisor-513",
"name": "Autonomous Cron Supervisor",
"prompt": "Load the tmux-supervisor skill and execute the monitoring protocol.\n\nCheck both `dev` and `timmy` tmux sessions for idle panes. Only send Telegram notifications on actionable events (idle, overflow, failure). Be silent when all agents are working.\n\nSteps:\n1. List all tmux sessions (skip 'Alexander')\n2. For each session, list windows and panes\n3. Capture each pane and classify state (idle vs active)\n4. For idle panes: read context, craft context-aware prompt\n5. Send /queue prompts to idle panes\n6. Verify prompts landed\n7. Only notify via Telegram if:\n - A pane was prompted (idle detected)\n - A pane shows context overflow (>80%)\n - A pane is stuck or crashed\n8. If all panes are active: respond with [SILENT]",
"schedule": {
"kind": "interval",
"minutes": 7,
"display": "every 7m"
},
"schedule_display": "every 7m",
"repeat": {
"times": null,
"completed": 0
},
"enabled": true,
"created_at": "2026-04-15T03:00:00.000000+00:00",
"next_run_at": null,
"last_run_at": null,
"last_status": null,
"last_error": null,
"deliver": "telegram",
"origin": null,
"state": "scheduled",
"paused_at": null,
"paused_reason": null,
"skills": [
"tmux-supervisor"
],
"skill": "tmux-supervisor"
}
],
"updated_at": "2026-04-13T02:00:00+00:00"
}
}

View File

@@ -1,4 +1,3 @@
#!/usr/bin/env python3
import json
from hermes_tools import browser_navigate, browser_vision

View File

@@ -1,4 +1,3 @@
#!/usr/bin/env python3
import json
from hermes_tools import browser_navigate, browser_vision

View File

@@ -1,4 +1,3 @@
#!/usr/bin/env python3
import json
from hermes_tools import browser_navigate, browser_vision

467
tests/test_quality_gate.py Normal file
View File

@@ -0,0 +1,467 @@
"""Tests for the Quality Gate modules.
Tests for:
- ci_automation_gate.py: linting, function length, auto-fix, counters
- task_gate.py: pre/post task gate logic, lane checking, filter tags
Refs: #629
"""
import json
import os
import sys
import tempfile
from pathlib import Path
from unittest.mock import patch, MagicMock
# Add scripts/ to path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "scripts"))
from ci_automation_gate import QualityGate
# ===========================================================================
# CI AUTOMATION GATE TESTS
# ===========================================================================
# -- helpers ---------------------------------------------------------------
def _write_file(dirpath, relpath, content):
"""Write a file in a temp directory and return its Path."""
p = Path(dirpath) / relpath
p.parent.mkdir(parents=True, exist_ok=True)
p.write_text(content)
return p
def _run_gate_on_file(dirpath, relpath, content, fix=False):
"""Write a file, run QualityGate on it, return the gate instance."""
p = _write_file(dirpath, relpath, content)
gate = QualityGate(fix=fix)
gate.check_file(p)
return gate
# -- trailing whitespace ---------------------------------------------------
def test_trailing_whitespace_warns():
"""Lines with trailing whitespace should produce a warning."""
with tempfile.TemporaryDirectory() as tmp:
gate = _run_gate_on_file(tmp, "test.py", "x = 1 \ny = 2\n")
assert gate.warnings >= 1, "Expected warning for trailing whitespace"
def test_trailing_whitespace_fixes():
"""With fix=True, trailing whitespace should be removed."""
with tempfile.TemporaryDirectory() as tmp:
p = _write_file(tmp, "test.py", "x = 1 \ny = 2\n")
gate = QualityGate(fix=True)
gate.check_file(p)
fixed = p.read_text()
assert "x = 1 \n" not in fixed, "Trailing whitespace should be removed"
assert fixed == "x = 1\ny = 2\n"
def test_clean_file_no_warnings():
"""A clean file should produce no warnings."""
with tempfile.TemporaryDirectory() as tmp:
gate = _run_gate_on_file(tmp, "test.py", "x = 1\ny = 2\n")
assert gate.warnings == 0
assert gate.failures == 0
# -- missing final newline -------------------------------------------------
def test_missing_final_newline_warns():
"""File without trailing newline should warn."""
with tempfile.TemporaryDirectory() as tmp:
gate = _run_gate_on_file(tmp, "test.py", "x = 1")
assert gate.warnings >= 1, "Expected warning for missing final newline"
def test_missing_final_newline_fixed():
"""With fix=True, missing final newline should be added."""
with tempfile.TemporaryDirectory() as tmp:
p = _write_file(tmp, "test.py", "x = 1")
gate = QualityGate(fix=True)
gate.check_file(p)
fixed = p.read_text()
assert fixed.endswith("\n"), "Fixed file should end with newline"
# -- function length (JS/TS) -----------------------------------------------
def test_short_function_passes():
"""A short JS function should not warn or fail."""
with tempfile.TemporaryDirectory() as tmp:
code = "function hello() {\n return 1;\n}\n"
gate = _run_gate_on_file(tmp, "test.js", code)
assert gate.failures == 0
assert gate.warnings == 0
def test_medium_function_warns():
"""JS function over 20 lines should warn."""
body = "\n".join(f" console.log({i});" for i in range(22))
code = f"function big() {{\n{body}\n}}\n"
with tempfile.TemporaryDirectory() as tmp:
gate = _run_gate_on_file(tmp, "test.js", code)
assert gate.warnings >= 1, "Expected warning for function over 20 lines"
def test_long_function_fails():
"""JS function over 50 lines should fail."""
body = "\n".join(f" console.log({i});" for i in range(52))
code = f"function huge() {{\n{body}\n}}\n"
with tempfile.TemporaryDirectory() as tmp:
gate = _run_gate_on_file(tmp, "test.js", code)
assert gate.failures >= 1, "Expected failure for function over 50 lines"
def test_python_function_length_not_checked():
"""Python functions should not be checked by the JS regex."""
body = "\n".join(f" print({i})" for i in range(60))
code = f"def huge():\n{body}\n"
with tempfile.TemporaryDirectory() as tmp:
gate = _run_gate_on_file(tmp, "test.py", code)
assert gate.failures == 0, "Python functions should not trigger JS length check"
# -- file type filtering ---------------------------------------------------
def test_non_code_file_skipped():
"""Non-code files (.md, .json, .txt) should be skipped."""
with tempfile.TemporaryDirectory() as tmp:
gate = _run_gate_on_file(tmp, "README.md", "# Title \ntrailing ws\n")
assert gate.warnings == 0, "Markdown files should be skipped"
assert gate.failures == 0
def test_typescript_checked():
"""TypeScript files should be checked."""
with tempfile.TemporaryDirectory() as tmp:
gate = _run_gate_on_file(tmp, "test.ts", "x = 1 \n")
assert gate.warnings >= 1, "TypeScript files should be checked"
# -- directory traversal ---------------------------------------------------
def test_run_scans_directory():
"""Gate.run() should scan all files in a directory tree."""
with tempfile.TemporaryDirectory() as tmp:
_write_file(tmp, "clean.py", "x = 1\n")
_write_file(tmp, "dirty.js", "x = 1 \n")
_write_file(tmp, "sub/nested.ts", "y = 2 \n")
gate = QualityGate()
gate.run(tmp)
assert gate.warnings >= 2, "Should find trailing whitespace in both dirty files"
def test_run_skips_node_modules():
"""Gate.run() should skip node_modules directories."""
with tempfile.TemporaryDirectory() as tmp:
_write_file(tmp, "clean.py", "x = 1\n")
_write_file(tmp, "node_modules/pkg/index.js", "x = 1 \n")
gate = QualityGate()
gate.run(tmp)
assert gate.warnings == 0, "node_modules should be skipped"
def test_run_skips_git_dir():
"""Gate.run() should skip .git directories."""
with tempfile.TemporaryDirectory() as tmp:
_write_file(tmp, "clean.py", "x = 1\n")
_write_file(tmp, ".git/hooks/pre-commit", "x = 1 \n")
gate = QualityGate()
gate.run(tmp)
assert gate.warnings == 0, ".git should be skipped"
# -- exit code -------------------------------------------------------------
def test_failures_cause_exit_code_1():
"""Gate with failures should exit with code 1."""
import subprocess
with tempfile.TemporaryDirectory() as tmp:
body = "\n".join(f" console.log({i});" for i in range(52))
_write_file(tmp, "huge.js", f"function f() {{\n{body}\n}}\n")
r = subprocess.run(
[sys.executable, str(Path(__file__).resolve().parent.parent / "scripts" / "ci_automation_gate.py"), tmp],
capture_output=True, text=True
)
assert r.returncode == 1, f"Expected exit 1, got {r.returncode}"
def test_clean_directory_exits_0():
"""Gate on clean directory should exit 0."""
import subprocess
with tempfile.TemporaryDirectory() as tmp:
_write_file(tmp, "clean.py", "x = 1\ny = 2\n")
r = subprocess.run(
[sys.executable, str(Path(__file__).resolve().parent.parent / "scripts" / "ci_automation_gate.py"), tmp],
capture_output=True, text=True
)
assert r.returncode == 0, f"Expected exit 0, got {r.returncode}"
# ===========================================================================
# TASK GATE TESTS
# ===========================================================================
# Import task_gate functions directly — test the pure logic
from task_gate import check_agent_lane, FILTER_TAGS, AGENT_USERNAMES
# -- filter tags -----------------------------------------------------------
def test_epic_tag_filtered():
"""Issues with [EPIC] tag should be filtered."""
title = "[EPIC] Build the thing"
for tag in FILTER_TAGS:
tag_clean = tag.upper().replace("[", "").replace("]", "")
if tag_clean in title.upper():
return # Found
assert False, "EPIC tag should be detected by FILTER_TAGS"
def test_permanent_tag_filtered():
"""Issues with [DO NOT CLOSE] tag should be filtered."""
title = "[DO NOT CLOSE] Keep this open forever"
title_upper = title.upper()
matched = any(
tag.upper().replace("[", "").replace("]", "") in title_upper
for tag in FILTER_TAGS
)
assert matched, "[DO NOT CLOSE] should be filtered"
def test_normal_title_not_filtered():
"""Normal issue titles should not be filtered."""
title = "Fix the login bug in auth.py"
title_upper = title.upper()
matched = any(
tag.upper().replace("[", "").replace("]", "") in title_upper
for tag in FILTER_TAGS
)
assert not matched, "Normal title should not be filtered"
def test_morning_report_filtered():
"""[MORNING REPORT] issues should be filtered."""
title = "[MORNING REPORT] Fleet status 2026-04-13"
title_upper = title.upper()
matched = any(
tag.upper().replace("[", "").replace("]", "") in title_upper
for tag in FILTER_TAGS
)
assert matched, "[MORNING REPORT] should be filtered"
# -- agent lane checker ----------------------------------------------------
def test_lane_check_no_config():
"""With no lane config, lane check should pass."""
ok, msg = check_agent_lane("groq", "Fix bug", [], {})
assert ok
assert "No lane config" in msg
def test_lane_check_agent_not_in_config():
"""Agent not in lane config should pass."""
lanes = {"ezra": ["docs"]}
ok, msg = check_agent_lane("groq", "Fix bug", [], lanes)
assert ok
assert "No lanes defined" in msg
def test_lane_check_agent_in_config():
"""Agent in lane config should return their lanes."""
lanes = {"groq": ["code", "infra"]}
ok, msg = check_agent_lane("groq", "Fix bug", [], lanes)
assert ok
assert "groq" in msg
assert "code" in msg
# -- agent usernames -------------------------------------------------------
def test_known_agents_in_usernames():
"""Core agent usernames should be registered."""
assert "groq" in AGENT_USERNAMES
assert "ezra" in AGENT_USERNAMES
assert "bezalel" in AGENT_USERNAMES
assert "timmy" in AGENT_USERNAMES
assert "codex-agent" in AGENT_USERNAMES
# -- pre-task gate (mocked API) -------------------------------------------
def test_pre_task_gate_issue_not_found():
"""Pre-task gate should fail if issue doesn't exist."""
from task_gate import pre_task_gate
with patch("task_gate.gitea_get", return_value=None):
passed, msgs = pre_task_gate("timmy-config", 99999, "groq")
assert not passed
assert any("not found" in m for m in msgs)
def test_pre_task_gate_filter_tag_blocks():
"""Pre-task gate should block filtered issues."""
from task_gate import pre_task_gate
mock_issue = {
"title": "[EPIC] Big thing",
"assignees": [],
"labels": [],
}
def mock_gitea_get(path):
if "issues/100" in path:
return mock_issue
if "branches" in path:
return []
if "pulls" in path:
return []
return None
with patch("task_gate.gitea_get", side_effect=mock_gitea_get):
passed, msgs = pre_task_gate("timmy-config", 100, "groq")
assert not passed
assert any("filter" in m.lower() for m in msgs)
def test_pre_task_gate_assigned_agent_blocks():
"""Pre-task gate should block issues assigned to other agents."""
from task_gate import pre_task_gate
mock_issue = {
"title": "Fix bug",
"assignees": [{"login": "ezra"}],
"labels": [],
}
def mock_gitea_get(path):
if "issues/100" in path:
return mock_issue
if "branches" in path:
return []
if "pulls" in path:
return []
return None
with patch("task_gate.gitea_get", side_effect=mock_gitea_get):
passed, msgs = pre_task_gate("timmy-config", 100, "groq")
assert not passed
assert any("Already assigned" in m for m in msgs)
def test_pre_task_gate_existing_pr_blocks():
"""Pre-task gate should block issues with existing PRs."""
from task_gate import pre_task_gate
mock_issue = {
"title": "Fix bug",
"assignees": [],
"labels": [],
}
mock_prs = [{"number": 50, "title": "Fix for #100", "body": "Closes #100"}]
def mock_gitea_get(path):
if "issues/100" in path:
return mock_issue
if "branches" in path:
return []
if "pulls" in path:
return mock_prs
return None
with patch("task_gate.gitea_get", side_effect=mock_gitea_get):
passed, msgs = pre_task_gate("timmy-config", 100, "groq")
assert not passed
assert any("Open PR" in m for m in msgs)
def test_pre_task_gate_clean_passes():
"""Pre-task gate should pass for clean issues."""
from task_gate import pre_task_gate
def mock_gitea_get(path):
if "issues/100" in path:
return {"title": "Fix bug", "assignees": [], "labels": []}
if "branches" in path:
return []
if "pulls" in path:
return []
return None
with patch("task_gate.gitea_get", side_effect=mock_gitea_get):
passed, msgs = pre_task_gate("timmy-config", 100, "groq")
assert passed
# -- post-task gate (mocked API) ------------------------------------------
def test_post_task_gate_missing_branch():
"""Post-task gate should fail if branch doesn't exist."""
from task_gate import post_task_gate
with patch("task_gate.gitea_get", return_value=None):
passed, msgs = post_task_gate("timmy-config", 100, "groq", "groq/fix-100")
assert not passed
assert any("does not exist" in m for m in msgs)
def test_post_task_gate_no_agent_prefix_warns():
"""Post-task gate should warn if branch doesn't start with agent name."""
from task_gate import post_task_gate
def mock_gitea_get(path):
if "branches/fix-100" in path:
return {"name": "fix-100"}
if "compare" in path:
return {"commits": [{"id": "abc"}], "diff_files": ["file.py"]}
if "pulls" in path:
return []
return None
with patch("task_gate.gitea_get", side_effect=mock_gitea_get):
passed, msgs = post_task_gate("timmy-config", 100, "groq", "fix-100")
assert passed # Warning, not failure
assert any("doesn't start with agent" in m or "convention" in m for m in msgs)
def test_post_task_gate_no_commits_fails():
"""Post-task gate should fail if branch has no commits ahead of main."""
from task_gate import post_task_gate
def mock_gitea_get(path):
if "branches/" in path:
return {"name": "groq/fix-100"}
if "compare" in path:
return {"commits": [], "diff_files": []}
if "pulls" in path:
return []
return None
with patch("task_gate.gitea_get", side_effect=mock_gitea_get):
passed, msgs = post_task_gate("timmy-config", 100, "groq", "groq/fix-100")
assert not passed
assert any("no commits" in m.lower() for m in msgs)
# ===========================================================================
# INTEGRATION: gate on real script files
# ===========================================================================
def test_ci_gate_on_actual_task_gate():
"""Run QualityGate on task_gate.py itself — should pass."""
gate_path = Path(__file__).resolve().parent.parent / "scripts" / "task_gate.py"
if gate_path.exists():
gate = QualityGate()
gate.check_file(gate_path)
assert gate.failures == 0, f"task_gate.py should pass quality gate, got {gate.failures} failures"
def test_ci_gate_on_actual_ci_automation_gate():
"""Run QualityGate on ci_automation_gate.py itself — should pass."""
gate_path = Path(__file__).resolve().parent.parent / "scripts" / "ci_automation_gate.py"
if gate_path.exists():
gate = QualityGate()
gate.check_file(gate_path)
assert gate.failures == 0, f"ci_automation_gate.py should pass quality gate, got {gate.failures} failures"