Files
the-nexus/bin/fleet_audit.py
Timmy (NEXUSBURN) 66c010301d
Some checks failed
CI / test (pull_request) Failing after 38s
Review Approval Gate / verify-review (pull_request) Failing after 5s
CI / validate (pull_request) Failing after 34s
feat: fleet audit tool — deduplicate agents, one identity per machine
Closes #1144. Builds a fleet audit pipeline that detects duplicate
agent identities, ghost accounts, and authorship ambiguity across
all machines.

Deliverables:

bin/fleet_audit.py — Full audit tool with four checks:
  - Identity registry validation (one name per machine, unique gitea_user)
  - Git authorship audit (detects ambiguous committers from branch names)
  - Gitea org member audit (finds ghost accounts with zero activity)
  - Cross-reference registry vs fleet-routing.json (orphan/location mismatch)

fleet/identity-registry.yaml — Canonical identity registry:
  - 8 active agents (timmy, allegro, ezra, bezalel, bilbobagginshire,
    fenrir, substratum, claw-code)
  - 7 ghost/deprecated accounts marked inactive
  - Rules: one identity per machine, unique gitea_user, required fields

tests/test_fleet_audit.py — 11 tests covering all validation rules.

Usage:
  python3 bin/fleet_audit.py                  # full audit -> JSON
  python3 bin/fleet_audit.py --identity-check # registry only
  python3 bin/fleet_audit.py --git-authors    # authorship only
  python3 bin/fleet_audit.py --report out.json # write to file
2026-04-13 18:51:31 -04:00

464 lines
17 KiB
Python

#!/usr/bin/env python3
"""
Fleet Audit — Deduplicate Agents, One Identity Per Machine.
Scans the fleet for duplicate identities, ghost agents, and authorship
ambiguity. Produces a machine-readable audit report and remediation plan.
Usage:
python3 bin/fleet_audit.py # full audit
python3 bin/fleet_audit.py --identity-check # identity registry only
python3 bin/fleet_audit.py --git-authors # git authorship audit
python3 bin/fleet_audit.py --gitea-members # Gitea org member audit
python3 bin/fleet_audit.py --report fleet/audit-report.json # output path
"""
import argparse
import json
import os
import re
import subprocess
import sys
from collections import Counter, defaultdict
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
import yaml
# ---------------------------------------------------------------------------
# Data model
# ---------------------------------------------------------------------------
@dataclass
class AgentIdentity:
    """Record describing one agent identity (one identity per machine).

    Only ``name``, ``machine`` and ``role`` are mandatory; every other
    attribute has a default and may be omitted.
    """

    # Mandatory attributes.
    name: str
    machine: str  # hostname or IP
    role: str

    # Optional attributes.
    gitea_user: Optional[str] = field(default=None)
    active: bool = field(default=True)
    lane: Optional[str] = field(default=None)
    created: Optional[str] = field(default=None)
    notes: Optional[str] = field(default=None)
@dataclass
class AuditFinding:
    """A single issue reported by one of the audit checks."""

    severity: str  # critical, warning, info
    category: str  # duplicate, ghost, orphan, authorship
    description: str
    # Names/logins the finding refers to; a fresh list per instance.
    affected: list = field(default_factory=list)
    # Suggested fix; empty when the check has nothing to recommend.
    remediation: str = field(default="")
@dataclass
class AuditReport:
    """Aggregate result of an audit run.

    Only ``timestamp`` is required at construction time; the remaining
    attributes start at neutral defaults and are filled in afterwards.
    """

    timestamp: str
    # Collected findings; a fresh list per instance.
    findings: list = field(default_factory=list)
    registry_valid: bool = field(default=True)
    duplicate_count: int = field(default=0)
    ghost_count: int = field(default=0)
    total_agents: int = field(default=0)
    summary: str = field(default="")
# ---------------------------------------------------------------------------
# Identity registry
# ---------------------------------------------------------------------------
# Registry location relative to this script: <script dir>/../fleet/identity-registry.yaml
DEFAULT_REGISTRY_PATH = Path(__file__).resolve().parent.parent / "fleet" / "identity-registry.yaml"


def load_registry(path: Path = DEFAULT_REGISTRY_PATH) -> dict:
    """Load the identity registry YAML.

    Args:
        path: Location of the registry file (``Path`` or string).

    Returns:
        The parsed registry mapping. A missing file, or a file that parses
        to an empty/falsy document, yields a fresh empty registry
        (``version`` 1, no agents, no rules).

    Raises:
        ValueError: If the document parses to something that is not a
            mapping (for example a top-level list).
    """
    try:
        # Open directly instead of checking exists() first: avoids a race
        # between the check and the open. UTF-8 is requested explicitly so
        # non-ASCII text in the registry does not depend on the locale.
        with open(path, encoding="utf-8") as f:
            data = yaml.safe_load(f)
    except FileNotFoundError:
        return {"version": 1, "agents": [], "rules": {}}

    if not data:
        return {"version": 1, "agents": [], "rules": {}}
    if not isinstance(data, dict):
        # Fail here with a clear message rather than later on ``.get``.
        raise ValueError(
            f"Identity registry {path} must be a mapping, got {type(data).__name__}"
        )
    return data
def validate_registry(registry: dict) -> list[AuditFinding]:
    """Validate identity registry constraints.

    Checks performed, in order:
      1. A name registered with more than one known machine (critical).
      2. A name appearing more than once in the registry (critical).
      3. A ``gitea_user`` shared by more than one entry (warning).
      4. An entry missing ``name``, ``machine`` or ``role`` (warning).

    Entries with missing fields never abort the validation: they are
    tolerated by checks 1-3 and reported by check 4.

    Args:
        registry: Parsed registry mapping; its ``agents`` value is expected
            to be a list of mappings.

    Returns:
        The findings, in the order the checks above produced them.
    """
    findings = []
    # ``or []`` also covers an ``agents`` key that is present but None.
    agents = registry.get("agents") or []

    # Check: one identity per NAME (same name on different machines = duplicate)
    name_machines = defaultdict(list)
    for agent in agents:
        name_machines[agent.get("name", "unknown")].append(agent.get("machine", "unknown"))
    for name, machines in name_machines.items():
        # Entries without a machine are handled by the required-fields check.
        known = [m for m in machines if m != "unknown"]
        if len(known) > 1:
            # str() so a non-string machine value cannot break the join.
            machine_list = ", ".join(str(m) for m in known)
            findings.append(AuditFinding(
                severity="critical",
                category="duplicate",
                description=f"Agent '{name}' registered on {len(known)} machines: {machine_list}",
                affected=[name],
                remediation=f"Agent '{name}' must exist on exactly one machine",
            ))

    # Check: unique names. Entries without a ``name`` key are skipped here
    # (instead of raising KeyError) and reported by the required-fields check.
    name_counts = Counter(a["name"] for a in agents if "name" in a)
    for name, count in name_counts.items():
        if count > 1:
            findings.append(AuditFinding(
                severity="critical",
                category="duplicate",
                description=f"Agent name '{name}' appears {count} times in registry",
                affected=[name],
                remediation="Each name must be unique — rename duplicate entries",
            ))

    # Check: unique gitea_user
    gitea_users = defaultdict(list)
    for agent in agents:
        user = agent.get("gitea_user")
        if user:
            # Same placeholder as the required-fields check for nameless entries.
            gitea_users[user].append(agent.get("name", "UNKNOWN"))
    for user, names in gitea_users.items():
        if len(names) > 1:
            name_list = ", ".join(str(n) for n in names)
            findings.append(AuditFinding(
                severity="warning",
                category="duplicate",
                description=f"Gitea user '{user}' mapped to {len(names)} identities: {name_list}",
                affected=names,
                remediation="One Gitea user per identity — assign unique users",
            ))

    # Check: required fields (absent or empty values both count as missing)
    for agent in agents:
        missing = [f for f in ["name", "machine", "role"] if not agent.get(f)]
        if missing:
            findings.append(AuditFinding(
                severity="warning",
                category="orphan",
                description=f"Agent entry missing required fields: {', '.join(missing)}",
                affected=[agent.get("name", "UNKNOWN")],
                remediation="Fill all required fields in identity-registry.yaml",
            ))
    return findings
# ---------------------------------------------------------------------------
# Git authorship audit
# ---------------------------------------------------------------------------
def audit_git_authors(repo_path: Optional[Path] = None, days: int = 30) -> list[AuditFinding]:
    """Check git log for authorship patterns — detect ambiguous or duplicate committers.

    Two patterns are reported:
      * an agent tag in a commit subject (``[tag]`` or ``<word> agent``)
        that appears on commits by more than one author name (warning);
      * an author email used with more than one author name (info).

    Args:
        repo_path: Repository to inspect. Defaults to the directory two
            levels above this script.
        days: Only commits newer than this many days are considered.

    Returns:
        The findings. If git cannot be started or exits non-zero, a single
        warning finding describing the failure is returned instead.
    """
    if repo_path is None:
        repo_path = Path(__file__).resolve().parent.parent
    findings = []

    # Fields are NUL-separated (%x00) because author names and subjects can
    # legitimately contain printable separators such as "|".
    field_sep = "\x00"
    try:
        result = subprocess.run(
            ["git", "log", f"--since={days} days ago",
             "--format=%H%x00%an%x00%ae%x00%s", "--all"],
            capture_output=True, text=True, encoding="utf-8", errors="replace",
            cwd=repo_path,
        )
    except OSError as e:
        # git is not installed, or repo_path does not exist / is not a directory.
        findings.append(AuditFinding(
            severity="warning",
            category="authorship",
            description=f"Could not read git log: {e}",
        ))
        return findings
    if result.returncode != 0:
        findings.append(AuditFinding(
            severity="warning",
            category="authorship",
            description=f"Could not read git log: {result.stderr.strip()}",
        ))
        return findings

    commits = []
    for line in result.stdout.split("\n"):
        if not line:
            continue
        parts = line.split(field_sep, 3)
        if len(parts) == 4:
            commits.append({
                "hash": parts[0],
                "author_name": parts[1],
                "author_email": parts[2],
                "subject": parts[3],
            })

    # Check for multiple authors claiming same role in commit messages.
    # Matches "[word]" anywhere in the subject, or "<word> agent".
    agent_pattern = re.compile(r'\[(\w+)\]|\b(\w+)\s+agent\b', re.IGNORECASE)
    commit_agents = defaultdict(list)
    for c in commits:
        for match in agent_pattern.finditer(c["subject"]):
            agent = match.group(1) or match.group(2)
            commit_agents[agent.lower()].append(c["author_name"])
    for agent, authors in commit_agents.items():
        # Sorted so the report is identical from run to run.
        unique_authors = sorted(set(authors))
        if len(unique_authors) > 1:
            findings.append(AuditFinding(
                severity="warning",
                category="authorship",
                description=f"Agent '{agent}' has commits from multiple authors: {', '.join(unique_authors)}",
                affected=unique_authors,
                remediation="Ensure each agent identity commits under its own name",
            ))

    # Check for bot/agent emails that might be duplicates
    email_to_name = defaultdict(set)
    for c in commits:
        if c["author_email"]:
            email_to_name[c["author_email"]].add(c["author_name"])
    for email, names in email_to_name.items():
        if len(names) > 1:
            ordered_names = sorted(names)
            findings.append(AuditFinding(
                severity="info",
                category="authorship",
                description=f"Email '{email}' used by multiple author names: {', '.join(ordered_names)}",
                affected=ordered_names,
                remediation="Standardize git config user.name for this email",
            ))
    return findings
# ---------------------------------------------------------------------------
# Gitea org member audit
# ---------------------------------------------------------------------------
def _gitea_get_json(url: str, token: str, timeout: float = 30.0):
    """GET *url* with a Gitea token header and return the decoded JSON body."""
    import urllib.request

    req = urllib.request.Request(url, headers={"Authorization": f"token {token}"})
    # The context manager closes the response; the timeout keeps an
    # unresponsive server from hanging the audit indefinitely.
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read())


def audit_gitea_members(token: Optional[str] = None) -> list[AuditFinding]:
    """Audit Gitea org members for ghost/duplicate accounts.

    A member is reported (info, category ``ghost``) when the issues query
    filtered by ``created_by`` returns nothing for them and they authored
    none of the pull requests on the fetched page.

    Args:
        token: Gitea API token. When empty, it is read from
            ``~/.config/gitea/token``.

    Returns:
        The findings. When no token is available an info finding is
        returned; when the member list cannot be fetched a warning finding
        is returned. Members whose activity lookup fails are skipped, with
        a warning printed to stderr.
    """
    import urllib.parse

    findings = []
    if not token:
        token_path = Path.home() / ".config" / "gitea" / "token"
        if token_path.exists():
            token = token_path.read_text().strip()
        else:
            findings.append(AuditFinding(
                severity="info",
                category="ghost",
                description="No Gitea token found — skipping org member audit",
            ))
            return findings

    api_root = "https://forge.alexanderwhitestone.com/api/v1"
    repo_api = f"{api_root}/repos/Timmy_Foundation/the-nexus"

    try:
        members = _gitea_get_json(f"{api_root}/orgs/Timmy_Foundation/members?limit=100", token)
    except Exception as e:
        findings.append(AuditFinding(
            severity="warning",
            category="ghost",
            description=f"Could not fetch Gitea org members: {e}",
        ))
        return findings

    # The PR list does not depend on the member, so fetch it once rather
    # than once per member.
    # NOTE(review): only one page of up to 50 PRs is requested; authors of
    # PRs outside that page are not seen here.
    try:
        prs = _gitea_get_json(f"{repo_api}/pulls?state=all&limit=50", token)
    except Exception as e:
        print(f"warning: could not fetch pull requests, skipping member activity check: {e}",
              file=sys.stderr)
        return findings
    # ``user`` can be null in the payload; treat that as "no author".
    pr_authors = {(p.get("user") or {}).get("login") for p in prs}

    # Check each member's recent activity
    for member in members:
        login = member.get("login", "unknown")
        issues_url = (
            f"{repo_api}/issues"
            f"?created_by={urllib.parse.quote(str(login), safe='')}&state=all&limit=1"
        )
        try:
            issues = _gitea_get_json(issues_url, token)
        except Exception as e:
            # Best effort: one failed lookup must not abort the audit, but
            # it should be visible rather than silently dropped.
            print(f"warning: could not check Gitea activity for '{login}': {e}",
                  file=sys.stderr)
            continue
        if not issues and login not in pr_authors:
            findings.append(AuditFinding(
                severity="info",
                category="ghost",
                description=f"Gitea member '{login}' has no issues or PRs in the-nexus",
                affected=[login],
                remediation="Consider removing from org if truly unused",
            ))
    return findings
# ---------------------------------------------------------------------------
# Fleet inventory from fleet-routing.json
# ---------------------------------------------------------------------------
def load_fleet_inventory(repo_path: Optional[Path] = None) -> list[dict]:
    """Load agents from fleet-routing.json.

    Args:
        repo_path: Repository root (``Path`` or string) containing
            ``fleet/fleet-routing.json``. Defaults to the directory two
            levels above this script.

    Returns:
        The value of the top-level ``agents`` key, or an empty list when
        the file does not exist or ``agents`` is absent, null or empty.

    Raises:
        ValueError: If the file's top level is not a JSON object.
    """
    if repo_path is None:
        repo_path = Path(__file__).resolve().parent.parent
    routing_path = Path(repo_path) / "fleet" / "fleet-routing.json"
    try:
        # Open directly rather than exists()-then-open; JSON is read as UTF-8
        # regardless of the locale.
        with open(routing_path, encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        return []
    if not isinstance(data, dict):
        raise ValueError(
            f"{routing_path} must contain a JSON object, got {type(data).__name__}"
        )
    # ``or []`` covers an explicit null, which callers would otherwise
    # receive as None and fail to iterate.
    return data.get("agents") or []
def cross_reference_registry_agents(registry_agents: list[dict],
                                    fleet_agents: list[dict]) -> list[AuditFinding]:
    """Cross-reference identity registry with fleet-routing.json.

    Names are compared case-insensitively. Three kinds of finding are
    produced, each group in alphabetical order of agent name:
      * fleet agent absent from the registry (warning, ``orphan``);
      * registry agent absent from the fleet config (info, ``orphan``);
      * agent present in both whose registry ``machine`` is not a
        substring of the fleet ``location`` (warning, ``duplicate``).

    Entries without a name are ignored here instead of raising.

    Args:
        registry_agents: Agent mappings from the identity registry.
        fleet_agents: Agent mappings from fleet-routing.json.

    Returns:
        The findings described above.
    """
    findings = []

    # Index both sources by lower-cased name. When a name occurs more than
    # once within one source, the last entry wins.
    reg_by_name = {
        str(a["name"]).lower(): a for a in (registry_agents or []) if a.get("name")
    }
    fleet_by_name = {
        str(a["name"]).lower(): a for a in (fleet_agents or []) if a.get("name")
    }
    registry_names = set(reg_by_name)
    fleet_names = set(fleet_by_name)

    # Fleet agents not in registry (sorted so output order is stable)
    for name in sorted(fleet_names - registry_names):
        findings.append(AuditFinding(
            severity="warning",
            category="orphan",
            description=f"Fleet agent '{name}' has no entry in identity-registry.yaml",
            affected=[name],
            remediation="Add to identity-registry.yaml or remove from fleet-routing.json",
        ))

    # Registry agents not in fleet
    for name in sorted(registry_names - fleet_names):
        findings.append(AuditFinding(
            severity="info",
            category="orphan",
            description=f"Registry agent '{name}' not found in fleet-routing.json",
            affected=[name],
            remediation="Add to fleet-routing.json or remove from registry",
        ))

    # Check for same name on different machines between sources
    for name in sorted(registry_names & fleet_names):
        reg_machine = reg_by_name[name].get("machine", "")
        fleet_location = fleet_by_name[name].get("location", "")
        # Substring match: the location may be a longer description that
        # merely contains the machine name. str() guards non-string values.
        if (reg_machine and fleet_location
                and str(reg_machine).lower() not in str(fleet_location).lower()):
            findings.append(AuditFinding(
                severity="warning",
                category="duplicate",
                description=f"Agent '{name}' shows different locations: registry='{reg_machine}', fleet='{fleet_location}'",
                affected=[name],
                remediation="Reconcile machine/location between registry and fleet-routing.json",
            ))
    return findings
# ---------------------------------------------------------------------------
# Full audit pipeline
# ---------------------------------------------------------------------------
def run_full_audit(repo_path: Optional[Path] = None, token: Optional[str] = None,
                   gitea: bool = True) -> AuditReport:
    """Run the complete fleet audit pipeline.

    Steps: registry validation, git authorship audit, optional Gitea org
    member audit, and registry vs fleet-routing.json cross-reference.

    Args:
        repo_path: Repository root. Defaults to the directory two levels
            above this script.
        token: Gitea API token, passed through to the member audit.
        gitea: When False, the Gitea member audit is skipped.

    Returns:
        An ``AuditReport`` whose ``findings`` are plain dicts.
        ``registry_valid`` is False when registry validation produced any
        critical finding. ``total_agents`` is the sum of registry and fleet
        entries, so an agent listed in both is counted twice.
    """
    if repo_path is None:
        repo_path = Path(__file__).resolve().parent.parent
    repo_path = Path(repo_path)
    findings = []
    report = AuditReport(timestamp=datetime.now(timezone.utc).isoformat())

    # 1. Identity registry validation
    # Read the registry from the audited repo, like the other checks do;
    # fall back to the default location when that repo has none.
    registry_path = repo_path / "fleet" / "identity-registry.yaml"
    registry = load_registry(registry_path if registry_path.exists() else DEFAULT_REGISTRY_PATH)
    reg_findings = validate_registry(registry)
    findings.extend(reg_findings)

    # 2. Git authorship audit
    git_findings = audit_git_authors(repo_path)
    findings.extend(git_findings)

    # 3. Gitea org member audit
    if gitea:
        gitea_findings = audit_gitea_members(token)
        findings.extend(gitea_findings)

    # 4. Cross-reference registry vs fleet-routing.json
    fleet_agents = load_fleet_inventory(repo_path) or []
    # ``or []`` covers an ``agents`` key that is present but None.
    registry_agents = registry.get("agents") or []
    cross_findings = cross_reference_registry_agents(registry_agents, fleet_agents)
    findings.extend(cross_findings)

    # Compile report
    report.findings = [asdict(f) for f in findings]
    report.registry_valid = not any(f.severity == "critical" for f in reg_findings)
    report.duplicate_count = sum(1 for f in findings if f.category == "duplicate")
    # Count only ghost findings that name an account: the "audit skipped"
    # and "fetch failed" notices share the category but have no ``affected``.
    report.ghost_count = sum(1 for f in findings if f.category == "ghost" and f.affected)
    report.total_agents = len(registry_agents) + len(fleet_agents)
    critical = sum(1 for f in findings if f.severity == "critical")
    warnings = sum(1 for f in findings if f.severity == "warning")
    report.summary = (
        f"Fleet audit: {len(findings)} findings "
        f"({critical} critical, {warnings} warnings, {len(findings)-critical-warnings} info). "
        f"Registry {'VALID' if report.registry_valid else 'INVALID — DUPLICATES FOUND'}. "
        f"{report.total_agents} agent identities across registry + fleet config."
    )
    return report
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main():
    """Command-line entry point.

    With ``--identity-check``, ``--git-authors`` or ``--gitea-members`` only
    that check runs (first match in that order); findings are printed as
    text and the process exits with status 1 if any finding is critical,
    otherwise 0.

    Without those flags the full audit runs and the report is emitted as
    JSON, to the ``--report`` path if given and to stdout otherwise.
    NOTE(review): in this mode the function returns normally, so the exit
    status does not reflect critical findings.
    """
    parser = argparse.ArgumentParser(description="Fleet Audit — Deduplicate Agents, One Identity Per Machine")
    parser.add_argument("--report", default=None, help="Output JSON report path")
    parser.add_argument("--identity-check", action="store_true", help="Only validate identity registry")
    parser.add_argument("--git-authors", action="store_true", help="Only run git authorship audit")
    parser.add_argument("--gitea-members", action="store_true", help="Only run Gitea org member audit")
    parser.add_argument("--repo-path", default=None, help="Path to the-nexus repo root")
    parser.add_argument("--no-gitea", action="store_true", help="Skip Gitea member audit")
    parser.add_argument("--token", default=None, help="Gitea API token (or read from ~/.config/gitea/token)")
    args = parser.parse_args()

    repo_path = Path(args.repo_path) if args.repo_path else Path(__file__).resolve().parent.parent

    if args.identity_check:
        # Honour --repo-path for the registry too; fall back to the default
        # location when the given repo has no registry file.
        registry_path = repo_path / "fleet" / "identity-registry.yaml"
        registry = load_registry(registry_path if registry_path.exists() else DEFAULT_REGISTRY_PATH)
        findings = validate_registry(registry)
    elif args.git_authors:
        findings = audit_git_authors(repo_path)
    elif args.gitea_members:
        findings = audit_gitea_members(args.token)
    else:
        report = run_full_audit(repo_path, args.token, gitea=not args.no_gitea)
        output = asdict(report)
        if args.report:
            report_path = Path(args.report)
            report_path.parent.mkdir(parents=True, exist_ok=True)
            with open(report_path, "w", encoding="utf-8") as f:
                json.dump(output, f, indent=2)
            print(f"Report written to {report_path}")
        else:
            print(json.dumps(output, indent=2))
        return

    # Single-check output
    for f in findings:
        print(f"[{f.severity.upper()}] {f.category}: {f.description}")
        if f.remediation:
            print(f" -> {f.remediation}")
    print(f"\n{len(findings)} findings.")
    sys.exit(1 if any(f.severity == "critical" for f in findings) else 0)


if __name__ == "__main__":
    main()