Compare commits

..

7 Commits

Author SHA1 Message Date
6cb611f2b5 test: deploy config validator tests (#690)
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 36s
Smoke Test / smoke (pull_request) Failing after 20s
Validate Config / YAML Lint (pull_request) Failing after 16s
Validate Config / JSON Validate (pull_request) Successful in 10s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 1m15s
PR Checklist / pr-checklist (pull_request) Failing after 8m16s
Validate Config / Shell Script Lint (pull_request) Failing after 1m9s
Validate Config / Cron Syntax Check (pull_request) Successful in 18s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 17s
Validate Config / Playbook Schema Validation (pull_request) Successful in 32s
Architecture Lint / Lint Repository (pull_request) Has been cancelled
Validate Config / Python Test Suite (pull_request) Has been cancelled
2026-04-15 03:23:06 +00:00
477fc6ce6f feat: sidecar config validation on deploy (#690) 2026-04-15 03:21:37 +00:00
d120526244 fix: add python3 shebang to scripts/visual_pr_reviewer.py (#681) 2026-04-15 02:57:53 +00:00
8596ff761b fix: add python3 shebang to scripts/diagram_meaning_extractor.py (#681) 2026-04-15 02:57:40 +00:00
7553fd4f3e fix: add python3 shebang to scripts/captcha_bypass_handler.py (#681) 2026-04-15 02:57:25 +00:00
71082fe06f fix: add python3 shebang to bin/soul_eval_gate.py (#681) 2026-04-15 02:57:14 +00:00
6d678e938e fix: add python3 shebang to bin/nostr-agent-demo.py (#681) 2026-04-15 02:57:00 +00:00
9 changed files with 483 additions and 523 deletions

View File

@@ -1,3 +1,4 @@
#!/usr/bin/env python3
"""
Full Nostr agent-to-agent communication demo - FINAL WORKING
"""

View File

@@ -1,3 +1,4 @@
#!/usr/bin/env python3
"""
Soul Eval Gate — The Conscience of the Training Pipeline

View File

@@ -1,3 +1,4 @@
#!/usr/bin/env python3
import json
from hermes_tools import browser_navigate, browser_vision

View File

@@ -0,0 +1,336 @@
#!/usr/bin/env python3
"""
deploy_config_validator.py — Pre-deploy config validation for timmy-config sidecar.
Validates YAML config before writing during deploy. Checks:
1. YAML syntax (pyyaml safe_load)
2. Required keys exist for the config type
3. Value types match expected schema
4. No banned providers referenced
5. Provider chain is well-formed
Usage:
# Validate a config file before deploy
python3 scripts/deploy_config_validator.py config.yaml
# Validate stdin (piped from deploy script)
cat config.yaml | python3 scripts/deploy_config_validator.py -
# Validate with expected type
python3 scripts/deploy_config_validator.py --type hermes config.yaml
# JSON output for CI/CD
python3 scripts/deploy_config_validator.py --json config.yaml
Exit codes:
0 — config is valid
1 — validation failed (errors printed to stderr)
2 — usage error
"""
import argparse
import json
import sys
from pathlib import Path
from typing import Any
try:
import yaml
except ImportError:
print("ERROR: PyYAML not installed. Run: pip install pyyaml", file=sys.stderr)
sys.exit(2)
# ── Schema Definitions ────────────────────────────────────────────────────────
# Required keys per config type.
# Each schema maps key -> {"type": expected Python type, "description": human text}.
# "any" is the fallback schema with no requirements (generic validation).
REQUIRED_KEYS = {
    "hermes": {
        "providers": {"type": list, "description": "List of provider configurations"},
    },
    "wizard": {
        "providers": {"type": list, "description": "List of provider configurations"},
    },
    "ansible_inventory": {
        "all": {"type": dict, "description": "Top-level inventory structure"},
    },
    "cron": {
        "jobs": {"type": list, "description": "List of cron job definitions"},
    },
    "playbook": {
        "name": {"type": str, "description": "Playbook name"},
    },
    "any": {},  # No required keys for generic validation
}
# Provider schema — each provider must have these keys (all string-valued).
PROVIDER_REQUIRED = {"name", "model", "base_url"}
# Optional provider keys and the type(s) each may hold; a tuple means any of them.
PROVIDER_ALLOWED_TYPES = {
    "name": str,
    "model": str,
    "base_url": str,
    "api_key_env": str,
    "timeout": (int, float),
    "reason": str,
}
# Banned provider patterns (from ansible inventory).
# BANNED_PROVIDERS are substring-matched against the provider name;
# BANNED_MODEL_PATTERNS are fnmatch globs matched against the model string.
BANNED_PROVIDERS = {"anthropic", "claude"}
BANNED_MODEL_PATTERNS = ["claude-*", "anthropic/*", "*sonnet*", "*opus*", "*haiku*"]
# ── Validators ────────────────────────────────────────────────────────────────
class ValidationError:
    """A single validation finding: a config path, a message, and a severity."""

    # Display labels for each known severity; anything else renders as "???".
    _PREFIXES = {"error": "ERROR", "warning": "WARN", "info": "INFO"}

    def __init__(self, path: str, message: str, severity: str = "error"):
        self.path = path
        self.message = message
        self.severity = severity

    def __str__(self):
        label = self._PREFIXES.get(self.severity, "???")
        return f"[{label}] {self.path}: {self.message}"
def validate_yaml_syntax(text: str) -> tuple[Any | None, list[ValidationError]]:
    """Parse *text* as YAML.

    Returns (parsed_data, errors). parsed_data is None when parsing failed
    or when the document is empty/null; tab characters only add warnings.
    """
    errors: list[ValidationError] = []

    # YAML forbids tabs for indentation — warn on any tab character.
    for lineno, line in enumerate(text.splitlines(), 1):
        if "\t" in line:
            errors.append(ValidationError(
                f"line {lineno}",
                "contains tab character (use spaces for YAML)",
                "warning",
            ))

    try:
        parsed = yaml.safe_load(text)
    except yaml.YAMLError as exc:
        # Use the parser's problem mark for a precise location when available.
        mark = getattr(exc, "problem_mark", None)
        if mark:
            location = f"line {mark.line + 1}, col {mark.column + 1}"
            errors.append(ValidationError(location, f"YAML syntax error: {exc.problem}"))
        else:
            errors.append(ValidationError("(file)", f"YAML syntax error: {exc}"))
        return None, errors

    if parsed is None:
        errors.append(ValidationError("(file)", "empty or null config", "warning"))
        return None, errors
    return parsed, errors
def validate_required_keys(data: dict, config_type: str) -> list[ValidationError]:
    """Verify every key required for *config_type* exists with the right type."""
    errors: list[ValidationError] = []
    # Unknown types fall back to the empty "any" schema (no requirements).
    schema = REQUIRED_KEYS.get(config_type, REQUIRED_KEYS["any"])
    for key, spec in schema.items():
        if key not in data:
            errors.append(ValidationError(
                f".{key}",
                f"required key missing: {key} ({spec['description']})"
            ))
            continue
        if not isinstance(data[key], spec["type"]):
            errors.append(ValidationError(
                f".{key}",
                f"expected {spec['type'].__name__}, got {type(data[key]).__name__}"
            ))
    return errors
def validate_provider_chain(data: dict) -> list[ValidationError]:
    """Validate provider configurations.

    Checks each entry of data["providers"] for: required string keys, banned
    provider names (substring match), banned model globs, and optional-key
    types. An empty chain is itself an error (no inference backend).
    Returns a list of ValidationError (errors and warnings).
    """
    import fnmatch  # hoisted: the original re-imported this inside the provider loop

    errors: list[ValidationError] = []
    providers = data.get("providers", [])
    if not isinstance(providers, list):
        return errors  # Caught by required_keys check
    for i, provider in enumerate(providers):
        path = f".providers[{i}]"
        if not isinstance(provider, dict):
            errors.append(ValidationError(path, "provider must be a dict"))
            continue
        # Check required provider keys
        for key in PROVIDER_REQUIRED:
            if key not in provider:
                errors.append(ValidationError(f"{path}.{key}", f"provider missing required key: {key}"))
            elif not isinstance(provider[key], str):
                errors.append(ValidationError(
                    f"{path}.{key}",
                    f"expected string, got {type(provider[key]).__name__}"
                ))
        # Check for banned providers. str() guards against non-string values,
        # which would otherwise crash .lower() (the type error is already
        # reported above).
        name = str(provider.get("name", "")).lower()
        model = str(provider.get("model", "")).lower()
        for banned in BANNED_PROVIDERS:
            if banned in name:
                errors.append(ValidationError(
                    f"{path}.name",
                    f"banned provider: '{provider.get('name')}' (contains '{banned}')"
                ))
        for pattern in BANNED_MODEL_PATTERNS:
            if fnmatch.fnmatch(model, pattern.lower()):
                errors.append(ValidationError(
                    f"{path}.model",
                    f"banned model pattern: '{provider.get('model')}' matches '{pattern}'"
                ))
        # Check optional-key value types.
        for key, val in provider.items():
            expected = PROVIDER_ALLOWED_TYPES.get(key)
            if expected and not isinstance(val, expected):
                # BUG FIX: the original conditional was inverted — a tuple of
                # types (e.g. timeout's (int, float)) raised AttributeError on
                # .__name__, and a single type rendered as "<class 'str'>".
                exp_name = (expected.__name__ if isinstance(expected, type)
                            else "/".join(t.__name__ for t in expected))
                errors.append(ValidationError(
                    f"{path}.{key}",
                    f"expected {exp_name}, got {type(val).__name__}",
                    "warning"
                ))
    # Check provider chain has at least one entry
    if not providers:
        errors.append(ValidationError(".providers", "provider chain is empty — no inference available"))
    return errors
def validate_value_types(data: dict, path: str = "") -> list[ValidationError]:
    """Recursively walk the config and warn on obviously wrong value types."""
    errors: list[ValidationError] = []

    if isinstance(data, dict):
        for key, val in data.items():
            child = f"{path}.{key}" if path else f".{key}"
            if val is not None:
                # Port/timeout-style keys should hold numbers.
                if key in ("port", "api_port", "hermes_port", "timeout") and not isinstance(val, (int, float)):
                    errors.append(ValidationError(child, f"expected number, got {type(val).__name__}", "warning"))
                # URL-ish keys should look like HTTP(S) URLs.
                if key in ("base_url", "gitea_url", "url") and isinstance(val, str) \
                        and not val.startswith(("http://", "https://")):
                    errors.append(ValidationError(child, "URL should start with http:// or https://", "warning"))
            errors.extend(validate_value_types(val, child))
    elif isinstance(data, list):
        for idx, item in enumerate(data):
            errors.extend(validate_value_types(item, f"{path}[{idx}]"))
    return errors
def validate_config(text: str, config_type: str = "any") -> list[ValidationError]:
    """Run the full validation pipeline on raw YAML *text*.

    Order: syntax → required keys → provider chain (when present) → value
    types. Stops early when the text does not parse to a usable dict.
    """
    parsed, errors = validate_yaml_syntax(text)
    if parsed is None:
        # Nothing parsed — syntax problems are all we can report.
        return errors
    if not isinstance(parsed, dict):
        if config_type != "any":
            errors.append(ValidationError(
                "(file)",
                f"expected dict for {config_type} config, got {type(parsed).__name__}"
            ))
        return errors
    errors.extend(validate_required_keys(parsed, config_type))
    if "providers" in parsed:
        errors.extend(validate_provider_chain(parsed))
    errors.extend(validate_value_types(parsed))
    return errors
# ── Auto-detect config type ───────────────────────────────────────────────────
def detect_config_type(data: dict) -> str:
    """Heuristically infer the config type from its top-level keys."""
    has_providers = "providers" in data
    if has_providers and "display" in data:
        return "hermes"
    if has_providers and "wizard_name" in data:
        return "wizard"
    if "all" in data and "children" in data.get("all", {}):
        return "ansible_inventory"
    if "jobs" in data:
        return "cron"
    if "name" in data and "hosts" in data:
        return "playbook"
    # Nothing matched — treat as a generic config.
    return "any"
# ── CLI ───────────────────────────────────────────────────────────────────────
def main():
    """CLI entry point: read a config (file or stdin), validate it, report.

    Exit codes: 0 valid, 1 validation errors, 2 usage error / missing file.
    """
    parser = argparse.ArgumentParser(description="Pre-deploy config validation")
    parser.add_argument("file", help="Config file to validate (use - for stdin)")
    parser.add_argument("--type", choices=list(REQUIRED_KEYS.keys()),
                        help="Expected config type (auto-detected if omitted)")
    parser.add_argument("--json", action="store_true", help="JSON output")
    args = parser.parse_args()

    # Read input from stdin or a file path.
    if args.file == "-":
        text = sys.stdin.read()
        filename = "<stdin>"
    else:
        path = Path(args.file)
        if not path.exists():
            print(f"ERROR: File not found: {path}", file=sys.stderr)
            sys.exit(2)
        text = path.read_text(encoding="utf-8", errors="replace")
        filename = str(path)

    # Detect type when not given explicitly (best-effort pre-parse).
    config_type = args.type
    if not config_type:
        data, _ = validate_yaml_syntax(text)
        if data and isinstance(data, dict):
            config_type = detect_config_type(data)
        else:
            config_type = "any"

    errors = validate_config(text, config_type)

    # Output
    if args.json:
        result = {
            "file": filename,
            "type": config_type,
            "valid": not any(e.severity == "error" for e in errors),
            "error_count": sum(1 for e in errors if e.severity == "error"),
            "warning_count": sum(1 for e in errors if e.severity == "warning"),
            "errors": [{"path": e.path, "message": e.message, "severity": e.severity} for e in errors],
        }
        print(json.dumps(result, indent=2))
    else:
        # BUG FIX: the human-readable messages printed the literal string
        # "(unknown)" instead of the name of the file that was validated.
        if errors:
            print(f"Config validation FAILED: {filename} (type: {config_type})", file=sys.stderr)
            for e in errors:
                print(f" {e}", file=sys.stderr)
        else:
            print(f"Config validation PASSED: {filename} (type: {config_type})")

    # Warnings alone do not fail the deploy — only severity "error" does.
    if any(e.severity == "error" for e in errors):
        sys.exit(1)
    sys.exit(0)


if __name__ == "__main__":
    main()

View File

@@ -1,3 +1,4 @@
#!/usr/bin/env python3
import json
from hermes_tools import browser_navigate, browser_vision

View File

@@ -1,362 +0,0 @@
#!/usr/bin/env python3
"""PR Triage Automation -- Categorize, deduplicate, report, auto-merge (#659).
Enhancements over base implementation:
- Auto-merge for safe PRs (training data with passing tests)
- --all-repos flag for org-wide triage
- JSON output with structured data
- Age-based risk scoring
- Better duplicate detection (title similarity)
- Tests in tests/test_pr_triage.py
Usage:
python scripts/pr_triage.py --repo hermes-agent
python scripts/pr_triage.py --repo hermes-agent --json
python scripts/pr_triage.py --repo hermes-agent --auto-merge --dry-run
python scripts/pr_triage.py --all-repos --owner Timmy_Foundation
"""
import argparse
import json
import os
import re
import sys
import subprocess
from collections import Counter, defaultdict
from datetime import datetime, timezone
from difflib import SequenceMatcher
from urllib.request import Request, urlopen
from urllib.error import HTTPError
def _token():
t = os.environ.get("GITEA_TOKEN", "")
if not t:
p = os.path.expanduser("~/.config/gitea/token")
if os.path.exists(p):
t = open(p).read().strip()
return t
def _api(url, token, method="GET", data=None):
    # Minimal Gitea REST helper: returns parsed JSON on success, or None on
    # any HTTP error (callers treat None as "request failed" — best-effort).
    h = {"Authorization": "token " + token, "Accept": "application/json"}
    body = json.dumps(data).encode() if data else None
    if data:
        h["Content-Type"] = "application/json"
    req = Request(url, data=body, headers=h, method=method)
    try:
        return json.loads(urlopen(req, timeout=30).read())
    except HTTPError:
        # Deliberate swallow: a 4xx/5xx becomes None rather than an exception.
        return None
def fetch_prs(base, token, owner, repo):
    """Return all open pull requests for owner/repo, following pagination."""
    collected = []
    page = 1
    while True:
        batch = _api(f"{base}/api/v1/repos/{owner}/{repo}/pulls?state=open&limit=50&page={page}", token)
        if not batch:
            return collected
        collected.extend(batch)
        # A short page means we have reached the end.
        if len(batch) < 50:
            return collected
        page += 1
def fetch_issues(base, token, owner, repo):
    """Return open (non-PR) issues for owner/repo, keyed by issue number."""
    issues = {}
    page = 1
    while True:
        batch = _api(f"{base}/api/v1/repos/{owner}/{repo}/issues?state=open&limit=50&page={page}", token)
        if not batch:
            return issues
        for item in batch:
            # Gitea's issues endpoint also returns PRs; keep real issues only.
            if "pull_request" not in item:
                issues[item["number"]] = item
        if len(batch) < 50:
            return issues
        page += 1
def fetch_repos(base, token, owner):
    """Return the names of every repository in the *owner* org (paginated)."""
    names = []
    page = 1
    while True:
        batch = _api(f"{base}/api/v1/orgs/{owner}/repos?limit=50&page={page}", token)
        if not batch:
            return names
        names.extend(r["name"] for r in batch)
        if len(batch) < 50:
            return names
        page += 1
def categorize(pr):
    """Bucket a PR into a coarse category from its title, body, and labels.

    First matching keyword wins; anything unmatched falls through to "other".
    """
    label_text = " ".join(l.get("name", "") for l in pr.get("labels", []))
    haystack = (pr.get("title", "") + " " + pr.get("body", "") + " " + label_text).lower()
    keyword_map = [
        ("training data", "training-data"), ("dpo", "training-data"), ("grpo", "training-data"),
        ("fix:", "bug-fix"), ("bug", "bug-fix"), ("hotfix", "bug-fix"),
        ("feat:", "feature"), ("feature", "feature"), ("enhancement", "feature"),
        ("refactor", "maintenance"), ("cleanup", "maintenance"), ("chore:", "maintenance"),
        ("doc", "documentation"), ("test", "testing"), ("ci", "infrastructure"),
        ("infra", "infrastructure"), ("deploy", "infrastructure"),
    ]
    return next((cat for kw, cat in keyword_map if kw in haystack), "other")
def refs(pr):
    """Extract every '#NNN' issue number referenced in a PR's title or body."""
    text = pr.get("title", "") + " " + pr.get("body", "")
    return [int(num) for num in re.findall(r"#(\d+)", text)]
def find_duplicates(prs):
    """Find likely-duplicate PR groups using two signals.

    1. "ref"        — multiple PRs referencing the same issue number.
    2. "similarity" — title SequenceMatcher ratio above 0.85 for a pair not
                      already grouped by a shared ref.

    Returns a list of group dicts; each PR group/pair is reported once.
    """
    # Group PRs by every issue number they reference.
    by_ref = defaultdict(list)
    for pr in prs:
        for ref in refs(pr):
            by_ref[ref].append(pr)

    # DEAD CODE REMOVED: the original also built a normalized-title index
    # ("by_title") here that was never read by either detection pass.

    dup_groups = []
    seen = set()

    # Ref-based duplicates: more than one PR pointing at the same issue.
    for ref, group in by_ref.items():
        if len(group) > 1:
            key = tuple(sorted(p["number"] for p in group))
            if key not in seen:
                seen.add(key)
                dup_groups.append({"type": "ref", "ref": ref, "prs": group})

    # Title-similarity duplicates (threshold 0.85), skipping pairs already
    # grouped above.
    for i, first in enumerate(prs):
        for second in prs[i + 1:]:
            key = tuple(sorted([first["number"], second["number"]]))
            if key in seen:
                continue
            ratio = SequenceMatcher(None, first.get("title", "").lower(),
                                    second.get("title", "").lower()).ratio()
            if ratio > 0.85:
                seen.add(key)
                dup_groups.append({"type": "similarity", "similarity": round(ratio, 2),
                                   "prs": [first, second]})
    return dup_groups
def health(pr, issues):
    """Compute a health/risk summary dict for one PR.

    *issues* maps open issue numbers to issue dicts; referenced numbers not
    present there are treated as closed. Risk score is capped at 100.
    """
    referenced = refs(pr)
    now = datetime.now(timezone.utc)
    created = datetime.fromisoformat(pr["created_at"].replace("Z", "+00:00"))
    updated = datetime.fromisoformat(pr["updated_at"].replace("Z", "+00:00"))
    age_days = (now - created).days
    stale_days = (now - updated).days

    # Risk: age (≤30) + staleness (≤40) + 10 if no issue refs
    # + 15 if known merge conflicts; capped at 100.
    risk = min(age_days, 30) + min(stale_days * 2, 40)
    if not referenced:
        risk += 10
    if pr.get("mergeable") is False:
        risk += 15
    risk = min(risk, 100)

    return {
        "pr": pr["number"],
        "title": pr["title"],
        "head": pr["head"]["ref"],
        "category": categorize(pr),
        "refs": referenced,
        "open_issues": [n for n in referenced if n in issues],
        "closed_issues": [n for n in referenced if n not in issues],
        "age_days": age_days,
        "stale_days": stale_days,
        "risk_score": risk,
        "mergeable": pr.get("mergeable"),
        "author": pr.get("user", {}).get("login", ""),
        "labels": [l.get("name", "") for l in pr.get("labels", [])],
    }
def is_safe_to_merge(h):
    """Return (ok, reason): whether a health record qualifies for auto-merge.

    Only fresh, low-risk, conflict-free training-data PRs are considered safe.
    """
    rejections = [
        (h["category"] != "training-data", "not training-data"),
        (h["mergeable"] is False, "has conflicts"),
        (h["mergeable"] is None, "mergeable status unknown"),
        (h["stale_days"] > 30, f"too stale ({h['stale_days']}d)"),
        (h["risk_score"] > 50, f"risk too high ({h['risk_score']})"),
    ]
    for rejected, reason in rejections:
        if rejected:
            return False, reason
    return True, "safe"
def auto_merge(base, token, owner, repo, pr_num, dry_run=True):
    """Merge PR *pr_num* via the Gitea API; in dry-run mode, do nothing."""
    if dry_run:
        return {"merged": False, "dry_run": True, "pr": pr_num}
    payload = {
        "MergeTitleField": "auto",
        "MergeMessageField": "auto",
        "Do": "merge",
    }
    endpoint = f"{base}/api/v1/repos/{owner}/{repo}/pulls/{pr_num}/merge"
    result = _api(endpoint, token, method="POST", data=payload)
    # _api returns None on HTTP error, so "merged" reflects API success.
    return {"merged": result is not None, "pr": pr_num, "result": result}
def report(repo, checks, dups):
    """Render a Markdown triage report for one repo.

    checks: list of health() dicts; dups: groups from find_duplicates().
    Returns the full report as one newline-joined string.
    """
    lines = [
        f"# PR Triage -- {repo}",
        f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M')}",
        f"Open PRs: {len(checks)}",
        "",
        "## Summary",
        "",
    ]
    # Category breakdown table.
    cats = Counter(h["category"] for h in checks)
    lines.append("| Category | Count |")
    lines.append("|----------|-------|")
    for c, n in cats.most_common():
        lines.append(f"| {c} | {n} |")
    # Headline counts: stale (>7d since update), high-risk, auto-merge-safe.
    stale = [h for h in checks if h["stale_days"] > 7]
    high_risk = [h for h in checks if h["risk_score"] > 50]
    safe_merge = [h for h in checks if is_safe_to_merge(h)[0]]
    lines.extend([
        "",
        f"Stale (>7d): {len(stale)}",
        f"High risk (>50): {len(high_risk)}",
        f"Safe to merge: {len(safe_merge)}",
        f"Duplicate groups: {len(dups)}",
        "",
    ])
    if safe_merge:
        lines.append("## Safe to Auto-Merge")
        for h in safe_merge:
            # reason is always "safe" here since safe_merge is pre-filtered.
            ok, reason = is_safe_to_merge(h)
            lines.append(f"- #{h['pr']}: {h['title'][:60]} ({reason})")
        lines.append("")
    if dups:
        lines.append("## Duplicates")
        for g in dups:
            pr_nums = [str(p["number"]) for p in g["prs"]]
            lines.append(f"[{g['type']}] PRs {', '.join('#' + n for n in pr_nums)}:")
            for p in g["prs"]:
                lines.append(f" - #{p['number']}: {p['title']}")
            lines.append("")
    if stale:
        lines.append("## Stale (>7d)")
        # Most-stale first.
        for h in sorted(stale, key=lambda x: x["stale_days"], reverse=True):
            lines.append(f"- #{h['pr']}: {h['title'][:50]} -- {h['stale_days']}d (risk: {h['risk_score']})")
        lines.append("")
    # Full table of every open PR, sorted by PR number.
    lines.append("## All PRs")
    lines.append("| # | Title | Category | Age | Stale | Risk | Merge |")
    lines.append("|---|-------|----------|-----|-------|------|-------|")
    for h in sorted(checks, key=lambda x: x["pr"]):
        m = "Y" if h["mergeable"] else ("N" if h["mergeable"] is False else "?")
        s = f"{h['stale_days']}d" if h["stale_days"] > 7 else "-"
        lines.append(f"| {h['pr']} | {h['title'][:45]} | {h['category']} | {h['age_days']}d | {s} | {h['risk_score']} | {m} |")
    return "\n".join(lines)
def main():
    """CLI entry point: triage one repo (or the whole org), report, optionally merge."""
    p = argparse.ArgumentParser(description="PR Triage Automation")
    p.add_argument("--base-url", default="https://forge.alexanderwhitestone.com")
    p.add_argument("--owner", default="Timmy_Foundation")
    p.add_argument("--repo", default="")
    p.add_argument("--all-repos", action="store_true", help="Triage all org repos")
    p.add_argument("--json", action="store_true", dest="js")
    p.add_argument("--output", default="")
    p.add_argument("--auto-merge", action="store_true", help="Auto-merge safe PRs")
    p.add_argument("--dry-run", action="store_true", help="Show what would be merged without merging")
    a = p.parse_args()
    token = _token()
    if not token:
        print("No token"); sys.exit(1)
    # Org-wide mode: aggregate health records and duplicates across every repo.
    if a.all_repos:
        repos = fetch_repos(a.base_url, token, a.owner)
        all_checks = []
        all_dups = []
        for repo in repos:
            prs = fetch_prs(a.base_url, token, a.owner, repo)
            issues = fetch_issues(a.base_url, token, a.owner, repo)
            checks = [health(pr, issues) for pr in prs]
            dups = find_duplicates(prs)
            # Tag each record with its repo so the org view stays attributable.
            for c in checks:
                c["repo"] = repo
            all_checks.extend(checks)
            all_dups.extend(dups)
        if a.js:
            print(json.dumps({"repos": repos, "prs": all_checks, "duplicates_count": len(all_dups)}, indent=2))
        else:
            print(f"Org-wide triage: {len(repos)} repos, {len(all_checks)} PRs, {len(all_dups)} duplicate groups")
            cats = Counter(h["category"] for h in all_checks)
            for c, n in cats.most_common():
                print(f" {c}: {n}")
        return
    # Single-repo mode: infer owner/repo from the git remote when not given.
    repo = a.repo
    if not repo:
        try:
            remote = subprocess.check_output(["git", "remote", "get-url", "origin"], text=True).strip()
            m = re.search(r"[/:](\w[\w-]*)/(\w[\w-]*?)(?:\.git)?$", remote)
            if m:
                a.owner, repo = m.group(1), m.group(2)
        except Exception:
            # Not a git checkout (or git missing) — fall through to the error below.
            pass
    if not repo:
        print("No repo specified"); sys.exit(1)
    print(f"Triaging {a.owner}/{repo}...", file=sys.stderr)
    prs = fetch_prs(a.base_url, token, a.owner, repo)
    issues = fetch_issues(a.base_url, token, a.owner, repo)
    checks = [health(pr, issues) for pr in prs]
    dups = find_duplicates(prs)
    # Auto-merge pass: only PRs that is_safe_to_merge() approves; --dry-run lists only.
    merge_results = []
    if a.auto_merge or a.dry_run:
        safe = [h for h in checks if is_safe_to_merge(h)[0]]
        if safe:
            print(f"Auto-merge: {len(safe)} safe PRs ({'dry-run' if a.dry_run else 'live'})", file=sys.stderr)
        for h in safe:
            result = auto_merge(a.base_url, token, a.owner, repo, h["pr"], dry_run=a.dry_run)
            merge_results.append(result)
            status = "WOULD MERGE" if a.dry_run else ("MERGED" if result["merged"] else "FAILED")
            print(f" #{h['pr']}: {status}", file=sys.stderr)
    if a.js:
        out = {
            "repo": repo, "prs": checks,
            "duplicates": [{"type": g["type"], "prs": [p["number"] for p in g["prs"]]} for g in dups],
            "merge_results": merge_results,
        }
        print(json.dumps(out, indent=2))
    else:
        r = report(repo, checks, dups)
        print(r)
        if a.output:
            with open(a.output, "w") as f:
                f.write(r)
    # Final one-line summary always goes to stderr.
    print(f"\n{len(checks)} PRs, {len(dups)} duplicate groups, {len(merge_results)} merges",
          file=sys.stderr)


if __name__ == "__main__":
    main()

View File

@@ -1,3 +1,4 @@
#!/usr/bin/env python3
import json
from hermes_tools import browser_navigate, browser_vision

View File

@@ -0,0 +1,142 @@
#!/usr/bin/env python3
"""Tests for deploy_config_validator.py"""
import json
import sys
import os
import pytest
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from scripts.deploy_config_validator import (
validate_yaml_syntax,
validate_required_keys,
validate_provider_chain,
validate_value_types,
validate_config,
detect_config_type,
ValidationError,
)
class TestYAMLSyntax:
    """Behaviour of validate_yaml_syntax: parsing, empty docs, tab warnings."""

    def test_valid_yaml(self):
        parsed, errs = validate_yaml_syntax("key: value\nlist:\n - a\n - b\n")
        assert parsed is not None
        assert len(errs) == 0

    def test_invalid_yaml(self):
        parsed, errs = validate_yaml_syntax("key: [unclosed")
        assert parsed is None
        assert len(errs) > 0

    def test_empty_yaml(self):
        parsed, errs = validate_yaml_syntax("")
        assert parsed is None
        assert any("empty" in err.message for err in errs)

    def test_tabs_warning(self):
        _, errs = validate_yaml_syntax("key:\tvalue\n")
        assert any("tab" in err.message for err in errs)
class TestRequiredKeys:
    """Behaviour of validate_required_keys against the hermes schema."""

    def test_missing_key(self):
        errs = validate_required_keys({}, "hermes")
        assert any("providers" in err.message for err in errs)

    def test_wrong_type(self):
        errs = validate_required_keys({"providers": "not-a-list"}, "hermes")
        assert any("expected list" in err.message for err in errs)

    def test_valid(self):
        errs = validate_required_keys({"providers": []}, "hermes")
        missing = [err for err in errs if "providers" in err.message and "missing" in err.message]
        assert not missing
class TestProviderChain:
    """Behaviour of validate_provider_chain: required keys, bans, empty chain."""

    def test_empty_providers(self):
        errs = validate_provider_chain({"providers": []})
        assert any("empty" in err.message for err in errs)

    def test_missing_name(self):
        provider = {"model": "test", "base_url": "http://x"}
        errs = validate_provider_chain({"providers": [provider]})
        assert any("name" in err.message and "missing" in err.message for err in errs)

    def test_banned_provider(self):
        provider = {"name": "anthropic", "model": "claude-3", "base_url": "http://x"}
        errs = validate_provider_chain({"providers": [provider]})
        assert any("banned provider" in err.message for err in errs)

    def test_banned_model(self):
        provider = {"name": "test", "model": "claude-sonnet-4", "base_url": "http://x"}
        errs = validate_provider_chain({"providers": [provider]})
        assert any("banned model" in err.message for err in errs)

    def test_valid_providers(self):
        provider = {"name": "kimi-coding", "model": "kimi-k2.5", "base_url": "https://api.kimi.com/v1"}
        errs = validate_provider_chain({"providers": [provider]})
        assert not [err for err in errs if err.severity == "error"]
class TestValueTypes:
    """Behaviour of validate_value_types heuristics."""

    def test_string_port(self):
        errs = validate_value_types({"port": "8080"})
        assert any("port" in err.path and "number" in err.message for err in errs)

    def test_valid_port(self):
        errs = validate_value_types({"port": 8080})
        assert not [err for err in errs if "port" in err.path]

    def test_bad_url(self):
        errs = validate_value_types({"base_url": "not-a-url"})
        assert any("URL" in err.message for err in errs)
class TestDetectConfigType:
    """Behaviour of detect_config_type heuristics."""

    def test_hermes(self):
        assert detect_config_type({"providers": [], "display": {}}) == "hermes"

    def test_ansible(self):
        assert detect_config_type({"all": {"children": {"wizards": {}}}}) == "ansible_inventory"

    def test_unknown(self):
        assert detect_config_type({"random": "data"}) == "any"
class TestFullValidation:
    """End-to-end validate_config runs over raw YAML text."""

    VALID_HERMES = """
providers:
  - name: kimi-coding
    model: kimi-k2.5
    base_url: https://api.kimi.com/coding/v1
    timeout: 120
display:
  skin: default
"""

    BANNED_HERMES = """
providers:
  - name: anthropic
    model: claude-sonnet-4
    base_url: https://api.anthropic.com
"""

    def test_valid_hermes_config(self):
        errs = validate_config(self.VALID_HERMES, "hermes")
        assert not any(err.severity == "error" for err in errs)

    def test_banned_provider_catches(self):
        errs = validate_config(self.BANNED_HERMES, "hermes")
        assert any("banned" in err.message for err in errs)

    def test_missing_providers(self):
        errs = validate_config("display:\n skin: default\n", "hermes")
        assert any("providers" in err.message and "missing" in err.message for err in errs)

View File

@@ -1,161 +0,0 @@
"""Tests for PR triage automation (#659)."""
from __future__ import annotations
import pytest
from datetime import datetime, timezone, timedelta
from scripts.pr_triage import categorize, refs, find_duplicates, health, is_safe_to_merge
class TestCategorize:
    """PR categorization from title/body/labels."""

    @staticmethod
    def _pr(title):
        # Minimal PR dict with only a title set.
        return {"title": title, "body": "", "labels": []}

    def test_training_data(self):
        assert categorize(self._pr("Add DPO training data")) == "training-data"

    def test_bug_fix(self):
        assert categorize(self._pr("fix: resolve crash on startup")) == "bug-fix"

    def test_feature(self):
        assert categorize(self._pr("feat: add dark mode")) == "feature"

    def test_maintenance(self):
        assert categorize(self._pr("refactor: simplify auth flow")) == "maintenance"

    def test_other(self):
        assert categorize(self._pr("Update readme")) == "other"
class TestRefs:
    """Issue reference extraction from PR title and body."""

    def test_extracts_from_title(self):
        assert refs({"title": "fix: resolve #123", "body": ""}) == [123]

    def test_extracts_from_body(self):
        assert refs({"title": "Fix", "body": "Closes #456, refs #789"}) == [456, 789]

    def test_no_refs(self):
        assert refs({"title": "Fix", "body": "No issue refs"}) == []

    def test_multiple_refs(self):
        # Title refs come before body refs, in document order.
        assert refs({"title": "#1 and #2", "body": "Also #3"}) == [1, 2, 3]
class TestFindDuplicates:
    """Duplicate PR detection via shared refs and title similarity."""

    def test_ref_based_duplicates(self):
        groups = find_duplicates([
            {"number": 1, "title": "Fix #100", "body": "Closes #100"},
            {"number": 2, "title": "Fix #100 too", "body": "Closes #100"},
        ])
        assert len(groups) == 1
        assert groups[0]["type"] == "ref"

    def test_title_similarity_duplicates(self):
        groups = find_duplicates([
            {"number": 1, "title": "feat: add dark mode support", "body": ""},
            {"number": 2, "title": "feat: add dark mode support", "body": "different body"},
        ])
        assert len(groups) >= 1
        assert any(g["type"] == "similarity" for g in groups)

    def test_no_duplicates(self):
        groups = find_duplicates([
            {"number": 1, "title": "Fix auth bug", "body": "Closes #100"},
            {"number": 2, "title": "Add dark mode", "body": "Closes #200"},
        ])
        assert len(groups) == 0
class TestHealth:
    """PR health assessment: refs, age, staleness, risk score."""

    @staticmethod
    def _make_pr(**overrides):
        # Fresh, mergeable PR created "now"; overrides replace any field.
        now = datetime.now(timezone.utc).isoformat()
        base = {
            "number": 1,
            "title": "test",
            "body": "Closes #100",
            "created_at": now,
            "updated_at": now,
            "head": {"ref": "fix/test"},
            "mergeable": True,
            "user": {"login": "agent"},
            "labels": [],
        }
        return {**base, **overrides}

    def test_basic_health(self):
        h = health(self._make_pr(), {100: {"number": 100}})
        assert h["pr"] == 1
        assert h["refs"] == [100]
        assert h["open_issues"] == [100]
        assert h["age_days"] == 0

    def test_stale_detection(self):
        old = (datetime.now(timezone.utc) - timedelta(days=30)).isoformat()
        h = health(self._make_pr(created_at=old, updated_at=old), {})
        assert h["stale_days"] >= 29
        assert h["risk_score"] > 30
class TestIsSafeToMerge:
    """Auto-merge safety checks over a synthetic health record."""

    @staticmethod
    def _make_health(**overrides):
        # A record that passes every safety check; overrides flip one at a time.
        base = {
            "pr": 1, "title": "test", "head": "fix/test",
            "category": "training-data", "refs": [100],
            "open_issues": [100], "closed_issues": [],
            "age_days": 1, "stale_days": 1,
            "risk_score": 10, "mergeable": True,
            "author": "agent", "labels": [],
        }
        return {**base, **overrides}

    def test_safe_training_data(self):
        ok, _ = is_safe_to_merge(self._make_health())
        assert ok

    def test_unsafe_not_training(self):
        ok, reason = is_safe_to_merge(self._make_health(category="bug-fix"))
        assert not ok
        assert "not training-data" in reason

    def test_unsafe_conflicts(self):
        ok, reason = is_safe_to_merge(self._make_health(mergeable=False))
        assert not ok
        assert "conflicts" in reason

    def test_unsafe_too_stale(self):
        ok, reason = is_safe_to_merge(self._make_health(stale_days=31))
        assert not ok
        assert "stale" in reason

    def test_unsafe_high_risk(self):
        ok, reason = is_safe_to_merge(self._make_health(risk_score=60))
        assert not ok
        assert "risk" in reason