#!/usr/bin/env python3
"""
Fleet Audit — Deduplicate Agents, One Identity Per Machine.

Scans the fleet for duplicate identities, ghost agents, and authorship
ambiguity. Produces a machine-readable audit report and remediation plan.

Usage:
    python3 bin/fleet_audit.py                                # full audit
    python3 bin/fleet_audit.py --identity-check               # identity registry only
    python3 bin/fleet_audit.py --git-authors                  # git authorship audit
    python3 bin/fleet_audit.py --gitea-members                # Gitea org member audit
    python3 bin/fleet_audit.py --report fleet/audit-report.json  # output path
"""
import argparse
import json
import os
import re
import subprocess
import sys
from collections import Counter, defaultdict
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

import yaml

# ---------------------------------------------------------------------------
# Data model
# ---------------------------------------------------------------------------

@dataclass
class AgentIdentity:
    """One identity per machine — enforced by the registry."""
    name: str                        # unique agent name (registry key)
    machine: str                     # hostname or IP the agent runs on
    role: str                        # fleet role, e.g. "burn-specialist"
    gitea_user: Optional[str] = None  # 1:1 mapped Gitea account, if any
    active: bool = True              # False for ghost/deprecated entries
    lane: Optional[str] = None       # work lane, e.g. "research"
    created: Optional[str] = None    # ISO date the identity was registered
    notes: Optional[str] = None      # free-form audit-trail notes


@dataclass
class AuditFinding:
    """A single audit observation plus its suggested fix."""
    severity: str                    # critical, warning, info
    category: str                    # duplicate, ghost, orphan, authorship
    description: str                 # human-readable explanation
    affected: list[str] = field(default_factory=list)  # agent names / logins involved
    remediation: str = ""            # suggested corrective action


@dataclass
class AuditReport:
    """Aggregated result of a full audit run (JSON-serializable via asdict)."""
    timestamp: str                   # UTC ISO timestamp of the run
    findings: list[dict] = field(default_factory=list)  # asdict(AuditFinding) entries
    registry_valid: bool = True      # False when any critical registry finding exists
    duplicate_count: int = 0
    ghost_count: int = 0
    total_agents: int = 0
    summary: str = ""
--------------------------------------------------------------------------- +# Identity registry +# --------------------------------------------------------------------------- + +DEFAULT_REGISTRY_PATH = Path(__file__).resolve().parent.parent / "fleet" / "identity-registry.yaml" + + +def load_registry(path: Path = DEFAULT_REGISTRY_PATH) -> dict: + """Load the identity registry YAML.""" + if not path.exists(): + return {"version": 1, "agents": [], "rules": {}} + with open(path) as f: + return yaml.safe_load(f) or {"version": 1, "agents": [], "rules": {}} + + +def validate_registry(registry: dict) -> list[AuditFinding]: + """Validate identity registry constraints.""" + findings = [] + agents = registry.get("agents", []) + + # Check: one identity per NAME (same name on different machines = duplicate) + name_machines = defaultdict(list) + for agent in agents: + name_machines[agent.get("name", "unknown")].append(agent.get("machine", "unknown")) + + for name, machines in name_machines.items(): + known = [m for m in machines if m != "unknown"] + if len(known) > 1: + findings.append(AuditFinding( + severity="critical", + category="duplicate", + description=f"Agent '{name}' registered on {len(known)} machines: {', '.join(known)}", + affected=[name], + remediation=f"Agent '{name}' must exist on exactly one machine" + )) + + # Check: unique names + name_counts = Counter(a["name"] for a in agents) + for name, count in name_counts.items(): + if count > 1: + findings.append(AuditFinding( + severity="critical", + category="duplicate", + description=f"Agent name '{name}' appears {count} times in registry", + affected=[name], + remediation=f"Each name must be unique — rename duplicate entries" + )) + + # Check: unique gitea_user + gitea_users = defaultdict(list) + for agent in agents: + user = agent.get("gitea_user") + if user: + gitea_users[user].append(agent["name"]) + for user, names in gitea_users.items(): + if len(names) > 1: + findings.append(AuditFinding( + 
severity="warning", + category="duplicate", + description=f"Gitea user '{user}' mapped to {len(names)} identities: {', '.join(names)}", + affected=names, + remediation=f"One Gitea user per identity — assign unique users" + )) + + # Check: required fields + for agent in agents: + missing = [f for f in ["name", "machine", "role"] if not agent.get(f)] + if missing: + findings.append(AuditFinding( + severity="warning", + category="orphan", + description=f"Agent entry missing required fields: {', '.join(missing)}", + affected=[agent.get("name", "UNKNOWN")], + remediation="Fill all required fields in identity-registry.yaml" + )) + + return findings + + +# --------------------------------------------------------------------------- +# Git authorship audit +# --------------------------------------------------------------------------- + +def audit_git_authors(repo_path: Path = None, days: int = 30) -> list[AuditFinding]: + """Check git log for authorship patterns — detect ambiguous or duplicate committers.""" + if repo_path is None: + repo_path = Path(__file__).resolve().parent.parent + + findings = [] + + # Get recent commits + result = subprocess.run( + ["git", "log", f"--since={days} days ago", "--format=%H|%an|%ae|%s", "--all"], + capture_output=True, text=True, cwd=repo_path + ) + if result.returncode != 0: + findings.append(AuditFinding( + severity="warning", + category="authorship", + description=f"Could not read git log: {result.stderr.strip()}" + )) + return findings + + commits = [] + for line in result.stdout.strip().split("\n"): + if not line: + continue + parts = line.split("|", 3) + if len(parts) == 4: + commits.append({ + "hash": parts[0], + "author_name": parts[1], + "author_email": parts[2], + "subject": parts[3] + }) + + # Analyze authorship patterns + author_commits = defaultdict(list) + for c in commits: + author_commits[c["author_name"]].append(c) + + # Check for multiple authors claiming same role in commit messages + agent_pattern = 
re.compile(r'\[(\w+)\]|\b(\w+)\s+agent\b', re.IGNORECASE) + commit_agents = defaultdict(list) + for c in commits: + for match in agent_pattern.finditer(c["subject"]): + agent = match.group(1) or match.group(2) + commit_agents[agent.lower()].append(c["author_name"]) + + for agent, authors in commit_agents.items(): + unique_authors = set(authors) + if len(unique_authors) > 1: + findings.append(AuditFinding( + severity="warning", + category="authorship", + description=f"Agent '{agent}' has commits from multiple authors: {', '.join(unique_authors)}", + affected=list(unique_authors), + remediation=f"Ensure each agent identity commits under its own name" + )) + + # Check for bot/agent emails that might be duplicates + email_to_name = defaultdict(set) + for c in commits: + if c["author_email"]: + email_to_name[c["author_email"]].add(c["author_name"]) + + for email, names in email_to_name.items(): + if len(names) > 1: + findings.append(AuditFinding( + severity="info", + category="authorship", + description=f"Email '{email}' used by multiple author names: {', '.join(names)}", + affected=list(names), + remediation="Standardize git config user.name for this email" + )) + + return findings + + +# --------------------------------------------------------------------------- +# Gitea org member audit +# --------------------------------------------------------------------------- + +def audit_gitea_members(token: str = None) -> list[AuditFinding]: + """Audit Gitea org members for ghost/duplicate accounts.""" + findings = [] + + if not token: + token_path = Path.home() / ".config" / "gitea" / "token" + if token_path.exists(): + token = token_path.read_text().strip() + else: + findings.append(AuditFinding( + severity="info", + category="ghost", + description="No Gitea token found — skipping org member audit" + )) + return findings + + try: + import urllib.request + req = urllib.request.Request( + "https://forge.alexanderwhitestone.com/api/v1/orgs/Timmy_Foundation/members?limit=100", + 
headers={"Authorization": f"token {token}"} + ) + resp = urllib.request.urlopen(req) + members = json.loads(resp.read()) + except Exception as e: + findings.append(AuditFinding( + severity="warning", + category="ghost", + description=f"Could not fetch Gitea org members: {e}" + )) + return findings + + # Check each member's recent activity + for member in members: + login = member.get("login", "unknown") + try: + # Check recent issues + req2 = urllib.request.Request( + f"https://forge.alexanderwhitestone.com/api/v1/repos/Timmy_Foundation/the-nexus/issues" + f"?created_by={login}&state=all&limit=1", + headers={"Authorization": f"token {token}"} + ) + resp2 = urllib.request.urlopen(req2) + issues = json.loads(resp2.read()) + + # Check recent PRs + req3 = urllib.request.Request( + f"https://forge.alexanderwhitestone.com/api/v1/repos/Timmy_Foundation/the-nexus/pulls" + f"?state=all&limit=50", + headers={"Authorization": f"token {token}"} + ) + resp3 = urllib.request.urlopen(req3) + prs = json.loads(resp3.read()) + user_prs = [p for p in prs if p.get("user", {}).get("login") == login] + + if not issues and not user_prs: + findings.append(AuditFinding( + severity="info", + category="ghost", + description=f"Gitea member '{login}' has no issues or PRs in the-nexus", + affected=[login], + remediation="Consider removing from org if truly unused" + )) + except Exception: + pass # Individual member check failed, skip + + return findings + + +# --------------------------------------------------------------------------- +# Fleet inventory from fleet-routing.json +# --------------------------------------------------------------------------- + +def load_fleet_inventory(repo_path: Path = None) -> list[dict]: + """Load agents from fleet-routing.json.""" + if repo_path is None: + repo_path = Path(__file__).resolve().parent.parent + + routing_path = repo_path / "fleet" / "fleet-routing.json" + if not routing_path.exists(): + return [] + + with open(routing_path) as f: + data = 
json.load(f) + + return data.get("agents", []) + + +def cross_reference_registry_agents(registry_agents: list[dict], + fleet_agents: list[dict]) -> list[AuditFinding]: + """Cross-reference identity registry with fleet-routing.json.""" + findings = [] + + registry_names = {a["name"].lower() for a in registry_agents} + fleet_names = {a["name"].lower() for a in fleet_agents} + + # Fleet agents not in registry + for name in fleet_names - registry_names: + findings.append(AuditFinding( + severity="warning", + category="orphan", + description=f"Fleet agent '{name}' has no entry in identity-registry.yaml", + affected=[name], + remediation="Add to identity-registry.yaml or remove from fleet-routing.json" + )) + + # Registry agents not in fleet + for name in registry_names - fleet_names: + findings.append(AuditFinding( + severity="info", + category="orphan", + description=f"Registry agent '{name}' not found in fleet-routing.json", + affected=[name], + remediation="Add to fleet-routing.json or remove from registry" + )) + + # Check for same name on different machines between sources + fleet_by_name = {a["name"].lower(): a for a in fleet_agents} + reg_by_name = {a["name"].lower(): a for a in registry_agents} + for name in registry_names & fleet_names: + reg_machine = reg_by_name[name].get("machine", "") + fleet_location = fleet_by_name[name].get("location", "") + if reg_machine and fleet_location and reg_machine.lower() not in fleet_location.lower(): + findings.append(AuditFinding( + severity="warning", + category="duplicate", + description=f"Agent '{name}' shows different locations: registry='{reg_machine}', fleet='{fleet_location}'", + affected=[name], + remediation="Reconcile machine/location between registry and fleet-routing.json" + )) + + return findings + + +# --------------------------------------------------------------------------- +# Full audit pipeline +# --------------------------------------------------------------------------- + +def run_full_audit(repo_path: 
Path = None, token: str = None, + gitea: bool = True) -> AuditReport: + """Run the complete fleet audit pipeline.""" + if repo_path is None: + repo_path = Path(__file__).resolve().parent.parent + + findings = [] + report = AuditReport(timestamp=datetime.now(timezone.utc).isoformat()) + + # 1. Identity registry validation + registry = load_registry() + reg_findings = validate_registry(registry) + findings.extend(reg_findings) + + # 2. Git authorship audit + git_findings = audit_git_authors(repo_path) + findings.extend(git_findings) + + # 3. Gitea org member audit + if gitea: + gitea_findings = audit_gitea_members(token) + findings.extend(gitea_findings) + + # 4. Cross-reference registry vs fleet-routing.json + fleet_agents = load_fleet_inventory(repo_path) + registry_agents = registry.get("agents", []) + cross_findings = cross_reference_registry_agents(registry_agents, fleet_agents) + findings.extend(cross_findings) + + # Compile report + report.findings = [asdict(f) for f in findings] + report.registry_valid = not any(f.severity == "critical" for f in reg_findings) + report.duplicate_count = sum(1 for f in findings if f.category == "duplicate") + report.ghost_count = sum(1 for f in findings if f.category == "ghost") + report.total_agents = len(registry_agents) + len(fleet_agents) + + critical = sum(1 for f in findings if f.severity == "critical") + warnings = sum(1 for f in findings if f.severity == "warning") + report.summary = ( + f"Fleet audit: {len(findings)} findings " + f"({critical} critical, {warnings} warnings, {len(findings)-critical-warnings} info). " + f"Registry {'VALID' if report.registry_valid else 'INVALID — DUPLICATES FOUND'}. " + f"{report.total_agents} agent identities across registry + fleet config." 
+ ) + + return report + + +# --------------------------------------------------------------------------- +# CLI +# --------------------------------------------------------------------------- + +def main(): + parser = argparse.ArgumentParser(description="Fleet Audit — Deduplicate Agents, One Identity Per Machine") + parser.add_argument("--report", default=None, help="Output JSON report path") + parser.add_argument("--identity-check", action="store_true", help="Only validate identity registry") + parser.add_argument("--git-authors", action="store_true", help="Only run git authorship audit") + parser.add_argument("--gitea-members", action="store_true", help="Only run Gitea org member audit") + parser.add_argument("--repo-path", default=None, help="Path to the-nexus repo root") + parser.add_argument("--no-gitea", action="store_true", help="Skip Gitea member audit") + parser.add_argument("--token", default=None, help="Gitea API token (or read from ~/.config/gitea/token)") + + args = parser.parse_args() + repo_path = Path(args.repo_path) if args.repo_path else Path(__file__).resolve().parent.parent + + if args.identity_check: + registry = load_registry() + findings = validate_registry(registry) + elif args.git_authors: + findings = audit_git_authors(repo_path) + elif args.gitea_members: + findings = audit_gitea_members(args.token) + else: + report = run_full_audit(repo_path, args.token, gitea=not args.no_gitea) + output = asdict(report) + + if args.report: + report_path = Path(args.report) + report_path.parent.mkdir(parents=True, exist_ok=True) + with open(report_path, "w") as f: + json.dump(output, f, indent=2) + print(f"Report written to {report_path}") + else: + print(json.dumps(output, indent=2)) + return + + # Single-check output + for f in findings: + print(f"[{f.severity.upper()}] {f.category}: {f.description}") + if f.remediation: + print(f" -> {f.remediation}") + print(f"\n{len(findings)} findings.") + sys.exit(1 if any(f.severity == "critical" for f in 
findings) else 0) + + +if __name__ == "__main__": + main() diff --git a/fleet/identity-registry.yaml b/fleet/identity-registry.yaml new file mode 100644 index 00000000..0cbd8015 --- /dev/null +++ b/fleet/identity-registry.yaml @@ -0,0 +1,137 @@ +# identity-registry.yaml — One Identity Per Machine +# +# Each agent in the fleet has ONE entry here. Name must be unique. +# Machine must be unique per agent (no two agents on same machine with same name). +# Gitea user must map 1:1 to agent identity. +# +# Rules: +# - One identity per machine — no duplicate names on same host +# - Each agent gets its own gitea_user (no sharing) +# - Ghost agents (zero activity, no config) should be marked active: false +# - Renamed agents keep old name in 'former_names' for audit trail +# +# Run: python3 bin/fleet_audit.py --identity-check +# --- +version: 1 +rules: + one_identity_per_machine: true + unique_gitea_user: true + required_fields: [name, machine, role] + +agents: + # === CANONICAL WIZARDS === + - name: timmy + machine: local-mac + role: father-house + gitea_user: timmy + active: true + lane: orchestration + notes: "The father. Runs on Alexander's Mac. Hermes default profile." + + - name: allegro + machine: "167.99.126.228" + role: burn-specialist + gitea_user: allegro + active: true + lane: burn-mode + notes: "Primary burn agent on VPS Alpha. Fast execution." + + - name: ezra + machine: "143.198.27.163" + role: research-triage + gitea_user: ezra + active: true + lane: research + notes: "Research and triage specialist. VPS Ezra." + + - name: bezalel + machine: "104.131.15.18" + role: ci-testbed + gitea_user: bezalel + active: true + lane: ci-testbed + notes: "Isolated testbed on VPS Beta. Build verification and security audits." + + # === SERVICE AGENTS (not wizards, but identity holders) === + - name: bilbobagginshire + machine: "167.99.126.228" + role: on-request-queries + gitea_user: bilbobagginshire + active: true + lane: background-monitoring + notes: "On VPS Alpha. 
Ollama-backed. Low-priority Q&A only." + + - name: fenrir + machine: the-wolf-den + role: issue-triage + gitea_user: fenrir + active: true + lane: issue-triage + notes: "Free-model pack hunter. Backlog triage." + + - name: substratum + machine: "167.99.126.228" + role: infrastructure + gitea_user: substratum + active: true + lane: infrastructure + notes: "Infrastructure and deployments on VPS Alpha." + + # === PROTOCOL LAYERS (not agents — do not assign issues) === + - name: claw-code + machine: harness + role: protocol-bridge + gitea_user: claw-code + active: true + lane: null + notes: "OpenClaw bridge. Protocol adapter, not an endpoint. See #836." + + # === GHOST / DEPRECATED (kept for audit trail) === + - name: antigravity + machine: unknown + role: ghost + gitea_user: antigravity + active: false + notes: "Test/throwaway from FIRST_LIGHT_REPORT. Zero activity." + + - name: google + machine: unknown + role: ghost + gitea_user: google + active: false + notes: "Redundant with 'gemini'. Use gemini for all Google/Gemini work." + + - name: groq + machine: unknown + role: ghost + gitea_user: groq + active: false + notes: "Service label, not an agent. groq_worker.py is infrastructure." + + - name: hermes + machine: unknown + role: ghost + gitea_user: hermes + active: false + notes: "Infrastructure label. Real wizards: allegro, ezra." + + - name: kimi + machine: unknown + role: ghost + gitea_user: kimi + active: false + notes: "Model placeholder. KimiClaw is the real account if active." + + - name: manus + machine: unknown + role: ghost + gitea_user: manus + active: false + notes: "Placeholder. No harness configured." + + - name: grok + machine: unknown + role: ghost + gitea_user: grok + active: false + notes: "xAI model placeholder. No active harness." 
"""Tests for fleet_audit — Deduplicate Agents, One Identity Per Machine."""
import json
import tempfile
from pathlib import Path

import pytest
import yaml

# Adjust import path so the bin/ script is importable as a module
import sys
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "bin"))

from fleet_audit import (
    AuditFinding,
    validate_registry,
    cross_reference_registry_agents,
    audit_git_authors,
)


# ---------------------------------------------------------------------------
# Identity registry validation tests
# ---------------------------------------------------------------------------

class TestValidateRegistry:
    """Test identity registry validation rules."""

    def _make_registry(self, agents):
        return {"version": 1, "agents": agents, "rules": {"one_identity_per_machine": True}}

    def test_clean_registry_passes(self):
        registry = self._make_registry([
            {"name": "allegro", "machine": "167.99.126.228", "role": "burn", "gitea_user": "allegro"},
            {"name": "ezra", "machine": "143.198.27.163", "role": "triage", "gitea_user": "ezra"},
        ])
        findings = validate_registry(registry)
        critical = [f for f in findings if f.severity == "critical"]
        assert len(critical) == 0

    def test_same_name_on_different_machines_detected(self):
        registry = self._make_registry([
            {"name": "allegro", "machine": "167.99.126.228", "role": "burn"},
            {"name": "allegro", "machine": "104.131.15.18", "role": "burn"},
        ])
        findings = validate_registry(registry)
        critical = [f for f in findings if f.severity == "critical" and f.category == "duplicate"]
        # Two findings: one for name-on-multiple-machines, one for duplicate name
        assert len(critical) >= 1
        machine_findings = [f for f in critical if "registered on" in f.description]
        assert len(machine_findings) == 1
        assert "167.99.126.228" in machine_findings[0].description
        assert "104.131.15.18" in machine_findings[0].description

    def test_multiple_agents_same_machine_ok(self):
        # Multiple different agents on the same VPS is normal.
        registry = self._make_registry([
            {"name": "allegro", "machine": "167.99.126.228", "role": "burn"},
            {"name": "bilbo", "machine": "167.99.126.228", "role": "queries"},
        ])
        findings = validate_registry(registry)
        critical = [f for f in findings if f.severity == "critical"]
        assert len(critical) == 0

    def test_duplicate_name_detected(self):
        registry = self._make_registry([
            {"name": "bezalel", "machine": "104.131.15.18", "role": "ci"},
            {"name": "bezalel", "machine": "167.99.126.228", "role": "ci"},
        ])
        findings = validate_registry(registry)
        # The duplicate-name rule reports "appears N times"; filtering on
        # "registered on" (as the original test did) would re-test the
        # machine-duplicate rule already covered above.
        name_dupes = [f for f in findings
                      if f.severity == "critical" and "appears 2 times" in f.description]
        assert len(name_dupes) == 1
        assert "bezalel" in name_dupes[0].description

    def test_duplicate_gitea_user_detected(self):
        registry = self._make_registry([
            {"name": "agent-a", "machine": "host1", "role": "x", "gitea_user": "shared"},
            {"name": "agent-b", "machine": "host2", "role": "x", "gitea_user": "shared"},
        ])
        findings = validate_registry(registry)
        gitea_dupes = [f for f in findings if "Gitea user 'shared'" in f.description]
        assert len(gitea_dupes) == 1
        assert "agent-a" in gitea_dupes[0].affected
        assert "agent-b" in gitea_dupes[0].affected

    def test_missing_required_fields(self):
        registry = self._make_registry([
            {"name": "incomplete-agent"},
        ])
        findings = validate_registry(registry)
        missing = [f for f in findings if f.category == "orphan"]
        assert len(missing) >= 1
        assert "machine" in missing[0].description or "role" in missing[0].description

    def test_empty_registry_passes(self):
        registry = self._make_registry([])
        findings = validate_registry(registry)
        assert len(findings) == 0


# ---------------------------------------------------------------------------
# Cross-reference tests
# ---------------------------------------------------------------------------

class TestCrossReference:
    """Test registry vs fleet-routing.json cross-reference."""

    def test_orphan_in_fleet_not_registry(self):
        reg_agents = [{"name": "allegro", "machine": "x", "role": "y"}]
        fleet_agents = [{"name": "allegro", "location": "x"}, {"name": "unknown-agent", "location": "y"}]
        findings = cross_reference_registry_agents(reg_agents, fleet_agents)
        orphans = [f for f in findings if f.category == "orphan" and "unknown-agent" in f.description]
        assert len(orphans) == 1

    def test_location_mismatch_detected(self):
        reg_agents = [{"name": "allegro", "machine": "167.99.126.228", "role": "y"}]
        fleet_agents = [{"name": "allegro", "location": "totally-different-host"}]
        findings = cross_reference_registry_agents(reg_agents, fleet_agents)
        mismatches = [f for f in findings if f.category == "duplicate" and "different locations" in f.description]
        assert len(mismatches) == 1


# ---------------------------------------------------------------------------
# Integration test against actual registry
# ---------------------------------------------------------------------------

class TestRealRegistry:
    """Test against the actual identity-registry.yaml in the repo."""

    def test_registry_loads(self):
        reg_path = Path(__file__).resolve().parent.parent / "fleet" / "identity-registry.yaml"
        if reg_path.exists():
            with open(reg_path) as f:
                registry = yaml.safe_load(f)
            assert registry["version"] == 1
            assert len(registry["agents"]) > 0

    def test_registry_no_critical_findings(self):
        reg_path = Path(__file__).resolve().parent.parent / "fleet" / "identity-registry.yaml"
        if reg_path.exists():
            with open(reg_path) as f:
                registry = yaml.safe_load(f)
            findings = validate_registry(registry)
            critical = [f for f in findings if f.severity == "critical"]
            assert len(critical) == 0, f"Critical findings: {[f.description for f in critical]}"