feat: fleet audit tool — deduplicate agents, one identity per machine

Closes #1144. Builds a fleet audit pipeline that detects duplicate
agent identities, ghost accounts, and authorship ambiguity across
all machines.

Deliverables:

bin/fleet_audit.py — Full audit tool with four checks:
  - Identity registry validation (one name per machine, unique gitea_user)
  - Git authorship audit (detects ambiguous committers from branch names)
  - Gitea org member audit (finds ghost accounts with zero activity)
  - Cross-reference registry vs fleet-routing.json (orphan/location mismatch)

fleet/identity-registry.yaml — Canonical identity registry:
  - 8 active agents (timmy, allegro, ezra, bezalel, bilbobagginshire,
    fenrir, substratum, claw-code)
  - 7 ghost/deprecated accounts marked inactive
  - Rules: one identity per machine, unique gitea_user, required fields

tests/test_fleet_audit.py — 11 tests covering all validation rules.

Usage:
  python3 bin/fleet_audit.py                  # full audit -> JSON
  python3 bin/fleet_audit.py --identity-check # registry only
  python3 bin/fleet_audit.py --git-authors    # authorship only
  python3 bin/fleet_audit.py --report out.json # write to file
This commit is contained in:
Timmy (NEXUSBURN)
2026-04-13 18:51:31 -04:00
committed by Alexander Whitestone
parent 9ce8c0b5a7
commit 4215ef786f
3 changed files with 743 additions and 0 deletions

463
bin/fleet_audit.py Normal file
View File

@@ -0,0 +1,463 @@
#!/usr/bin/env python3
"""
Fleet Audit — Deduplicate Agents, One Identity Per Machine.
Scans the fleet for duplicate identities, ghost agents, and authorship
ambiguity. Produces a machine-readable audit report and remediation plan.
Usage:
python3 bin/fleet_audit.py # full audit
python3 bin/fleet_audit.py --identity-check # identity registry only
python3 bin/fleet_audit.py --git-authors # git authorship audit
python3 bin/fleet_audit.py --gitea-members # Gitea org member audit
python3 bin/fleet_audit.py --report fleet/audit-report.json # output path
"""
import argparse
import json
import os
import re
import subprocess
import sys
from collections import Counter, defaultdict
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
import yaml
# ---------------------------------------------------------------------------
# Data model
# ---------------------------------------------------------------------------
@dataclass
class AgentIdentity:
    """One identity per machine — enforced by the registry."""
    # NOTE(review): this dataclass mirrors the entry shape in
    # fleet/identity-registry.yaml. The audit functions below operate on raw
    # dicts, so this class currently documents the expected schema rather
    # than being instantiated by the pipeline.
    name: str  # unique agent name across the fleet
    machine: str # hostname or IP
    role: str  # e.g. burn-specialist, research-triage, ci-testbed
    gitea_user: Optional[str] = None  # 1:1 Gitea account — no sharing
    active: bool = True  # False marks ghost/deprecated entries
    lane: Optional[str] = None  # work lane (orchestration, research, ...)
    created: Optional[str] = None  # creation date string, if recorded
    notes: Optional[str] = None  # free-form audit-trail notes
@dataclass
class AuditFinding:
    """A single audit observation with a severity and a suggested fix."""
    severity: str  # critical, warning, info
    category: str  # duplicate, ghost, orphan, authorship
    description: str  # human-readable summary of the problem
    affected: list[str] = field(default_factory=list)  # agent/user names involved
    remediation: str = ""  # suggested fix; empty when none applies
@dataclass
class AuditReport:
    """Aggregated, JSON-serializable result of a full fleet audit run."""
    timestamp: str  # UTC ISO-8601 time the audit started
    findings: list = field(default_factory=list)  # asdict()'d AuditFinding entries
    registry_valid: bool = True  # False when registry validation found criticals
    duplicate_count: int = 0  # findings with category == "duplicate"
    ghost_count: int = 0  # findings with category == "ghost"
    total_agents: int = 0  # registry entries + fleet-routing entries
    summary: str = ""  # one-line human-readable summary
# ---------------------------------------------------------------------------
# Identity registry
# ---------------------------------------------------------------------------
DEFAULT_REGISTRY_PATH = Path(__file__).resolve().parent.parent / "fleet" / "identity-registry.yaml"


def load_registry(path: Path = DEFAULT_REGISTRY_PATH) -> dict:
    """Read the identity registry YAML.

    Returns a minimal empty registry dict when the file is missing or the
    YAML document is empty; otherwise the parsed document as-is.
    """
    empty_registry = {"version": 1, "agents": [], "rules": {}}
    if not path.exists():
        return empty_registry
    with open(path) as fh:
        parsed = yaml.safe_load(fh)
    return parsed or empty_registry
def validate_registry(registry: dict) -> "list[AuditFinding]":
    """Validate identity registry constraints.

    Rules checked, in order:
      1. one identity per name — a name bound to multiple known machines
         is a critical duplicate;
      2. agent names are unique within the registry;
      3. each gitea_user backs at most one identity;
      4. every entry carries the required fields (name, machine, role).

    Returns a list of AuditFinding; empty when the registry is clean.
    """
    findings = []
    agents = registry.get("agents", [])
    # Rule 1: same name registered on different machines = duplicated agent.
    name_machines = defaultdict(list)
    for agent in agents:
        name_machines[agent.get("name", "unknown")].append(agent.get("machine", "unknown"))
    for name, machines in name_machines.items():
        # Deduplicate while preserving order; "unknown" placeholders are
        # handled by the required-fields check, not here.
        known = list(dict.fromkeys(m for m in machines if m != "unknown"))
        if len(known) > 1:
            findings.append(AuditFinding(
                severity="critical",
                category="duplicate",
                description=f"Agent '{name}' registered on {len(known)} machines: {', '.join(known)}",
                affected=[name],
                remediation=f"Agent '{name}' must exist on exactly one machine"
            ))
    # Rule 2: unique names. BUGFIX: use .get() so an entry missing "name"
    # falls through to the required-fields check instead of raising KeyError.
    name_counts = Counter(a.get("name") for a in agents if a.get("name"))
    for name, count in name_counts.items():
        if count > 1:
            findings.append(AuditFinding(
                severity="critical",
                category="duplicate",
                description=f"Agent name '{name}' appears {count} times in registry",
                affected=[name],
                remediation="Each name must be unique — rename duplicate entries"
            ))
    # Rule 3: one Gitea account per identity (warning, not critical).
    gitea_users = defaultdict(list)
    for agent in agents:
        user = agent.get("gitea_user")
        if user:
            # BUGFIX: .get() — a gitea_user on a nameless entry must not crash.
            gitea_users[user].append(agent.get("name", "UNKNOWN"))
    for user, names in gitea_users.items():
        if len(names) > 1:
            findings.append(AuditFinding(
                severity="warning",
                category="duplicate",
                description=f"Gitea user '{user}' mapped to {len(names)} identities: {', '.join(names)}",
                affected=names,
                remediation="One Gitea user per identity — assign unique users"
            ))
    # Rule 4: required fields present and non-empty.
    for agent in agents:
        missing = [key for key in ("name", "machine", "role") if not agent.get(key)]
        if missing:
            findings.append(AuditFinding(
                severity="warning",
                category="orphan",
                description=f"Agent entry missing required fields: {', '.join(missing)}",
                affected=[agent.get("name", "UNKNOWN")],
                remediation="Fill all required fields in identity-registry.yaml"
            ))
    return findings
# ---------------------------------------------------------------------------
# Git authorship audit
# ---------------------------------------------------------------------------
def audit_git_authors(repo_path: Path = None, days: int = 30) -> "list[AuditFinding]":
    """Check git log for authorship patterns — detect ambiguous or duplicate committers.

    Scans commits from the last ``days`` days across all refs and flags:
      * an agent tag (``[name]`` or ``name agent`` in the subject) appearing
        under more than one author name;
      * one author email used with several author names.

    Never raises: git failures are reported as warning findings instead.
    """
    if repo_path is None:
        repo_path = Path(__file__).resolve().parent.parent
    findings = []
    # BUGFIX: subprocess.run raises OSError (e.g. FileNotFoundError) when git
    # is not installed — report that as a finding instead of crashing.
    try:
        result = subprocess.run(
            ["git", "log", f"--since={days} days ago", "--format=%H|%an|%ae|%s", "--all"],
            capture_output=True, text=True, cwd=repo_path
        )
    except OSError as e:
        findings.append(AuditFinding(
            severity="warning",
            category="authorship",
            description=f"Could not read git log: {e}"
        ))
        return findings
    if result.returncode != 0:
        findings.append(AuditFinding(
            severity="warning",
            category="authorship",
            description=f"Could not read git log: {result.stderr.strip()}"
        ))
        return findings
    # Parse "hash|name|email|subject" lines; the subject may itself contain
    # '|', hence maxsplit=3.
    commits = []
    for line in result.stdout.strip().split("\n"):
        if not line:
            continue
        parts = line.split("|", 3)
        if len(parts) == 4:
            commits.append({
                "hash": parts[0],
                "author_name": parts[1],
                "author_email": parts[2],
                "subject": parts[3]
            })
    # (The original also grouped commits per author here, but never used the
    # grouping — removed as dead work.)
    # Agent tags claimed by more than one git author = ambiguous identity.
    agent_pattern = re.compile(r'\[(\w+)\]|\b(\w+)\s+agent\b', re.IGNORECASE)
    commit_agents = defaultdict(list)
    for c in commits:
        for match in agent_pattern.finditer(c["subject"]):
            agent = match.group(1) or match.group(2)
            commit_agents[agent.lower()].append(c["author_name"])
    for agent, authors in commit_agents.items():
        unique_authors = set(authors)
        if len(unique_authors) > 1:
            findings.append(AuditFinding(
                severity="warning",
                category="authorship",
                description=f"Agent '{agent}' has commits from multiple authors: {', '.join(unique_authors)}",
                affected=list(unique_authors),
                remediation="Ensure each agent identity commits under its own name"
            ))
    # One email under several author names usually means inconsistent git
    # config rather than a rogue identity — info only.
    email_to_name = defaultdict(set)
    for c in commits:
        if c["author_email"]:
            email_to_name[c["author_email"]].add(c["author_name"])
    for email, names in email_to_name.items():
        if len(names) > 1:
            findings.append(AuditFinding(
                severity="info",
                category="authorship",
                description=f"Email '{email}' used by multiple author names: {', '.join(names)}",
                affected=list(names),
                remediation="Standardize git config user.name for this email"
            ))
    return findings
# ---------------------------------------------------------------------------
# Gitea org member audit
# ---------------------------------------------------------------------------
def audit_gitea_members(token: str = None) -> "list[AuditFinding]":
    """Audit Gitea org members for ghost/duplicate accounts.

    A member with no issues and no PRs in the-nexus is flagged as a possible
    ghost account. ``token`` falls back to ~/.config/gitea/token when not
    supplied. Network/API errors degrade to findings — never exceptions.
    """
    import urllib.request

    findings = []
    if not token:
        token_path = Path.home() / ".config" / "gitea" / "token"
        if token_path.exists():
            token = token_path.read_text().strip()
        else:
            findings.append(AuditFinding(
                severity="info",
                category="ghost",
                description="No Gitea token found — skipping org member audit"
            ))
            return findings
    headers = {"Authorization": f"token {token}"}
    api = "https://forge.alexanderwhitestone.com/api/v1"

    def _get_json(url):
        # One authenticated GET returning parsed JSON.
        req = urllib.request.Request(url, headers=headers)
        with urllib.request.urlopen(req) as resp:
            return json.loads(resp.read())

    try:
        members = _get_json(f"{api}/orgs/Timmy_Foundation/members?limit=100")
    except Exception as e:
        findings.append(AuditFinding(
            severity="warning",
            category="ghost",
            description=f"Could not fetch Gitea org members: {e}"
        ))
        return findings
    # BUGFIX: the PR listing is identical for every member — fetch it once
    # instead of once per member (was an N+1 request pattern).
    try:
        prs = _get_json(f"{api}/repos/Timmy_Foundation/the-nexus/pulls?state=all&limit=50")
    except Exception:
        prs = []
    # Flag members with zero visible activity in the-nexus.
    for member in members:
        login = member.get("login", "unknown")
        try:
            issues = _get_json(
                f"{api}/repos/Timmy_Foundation/the-nexus/issues"
                f"?created_by={login}&state=all&limit=1"
            )
            user_prs = [p for p in prs if p.get("user", {}).get("login") == login]
            if not issues and not user_prs:
                findings.append(AuditFinding(
                    severity="info",
                    category="ghost",
                    description=f"Gitea member '{login}' has no issues or PRs in the-nexus",
                    affected=[login],
                    remediation="Consider removing from org if truly unused"
                ))
        except Exception:
            # Best-effort: a failed per-member lookup must not abort the
            # audit; the member simply isn't flagged this run.
            continue
    return findings
# ---------------------------------------------------------------------------
# Fleet inventory from fleet-routing.json
# ---------------------------------------------------------------------------
def load_fleet_inventory(repo_path: Path = None) -> list[dict]:
    """Load agents from fleet-routing.json."""
    # Default to the repo root two levels above this script.
    base = repo_path if repo_path is not None else Path(__file__).resolve().parent.parent
    routing = base / "fleet" / "fleet-routing.json"
    if routing.exists():
        with open(routing) as fh:
            return json.load(fh).get("agents", [])
    # Missing routing file → nothing to cross-reference.
    return []
def cross_reference_registry_agents(registry_agents: list[dict],
                                    fleet_agents: list[dict]) -> "list[AuditFinding]":
    """Cross-reference identity registry with fleet-routing.json.

    Flags: fleet agents missing from the registry (warning), registry agents
    missing from fleet routing (info), and agents whose registry machine does
    not appear in their fleet-routing location string (warning).
    """
    findings = []
    # BUGFIX: tolerate entries without a "name" key (validate_registry flags
    # those separately) instead of raising KeyError here.
    named_registry = [a for a in registry_agents if a.get("name")]
    named_fleet = [a for a in fleet_agents if a.get("name")]
    registry_names = {a["name"].lower() for a in named_registry}
    fleet_names = {a["name"].lower() for a in named_fleet}
    # Fleet agents with no registry entry.
    for name in fleet_names - registry_names:
        findings.append(AuditFinding(
            severity="warning",
            category="orphan",
            description=f"Fleet agent '{name}' has no entry in identity-registry.yaml",
            affected=[name],
            remediation="Add to identity-registry.yaml or remove from fleet-routing.json"
        ))
    # Registry agents with no fleet-routing entry.
    for name in registry_names - fleet_names:
        findings.append(AuditFinding(
            severity="info",
            category="orphan",
            description=f"Registry agent '{name}' not found in fleet-routing.json",
            affected=[name],
            remediation="Add to fleet-routing.json or remove from registry"
        ))
    # Location consistency for agents present in both sources: the registry
    # machine must be a (case-insensitive) substring of the fleet location.
    fleet_by_name = {a["name"].lower(): a for a in named_fleet}
    reg_by_name = {a["name"].lower(): a for a in named_registry}
    for name in registry_names & fleet_names:
        reg_machine = reg_by_name[name].get("machine", "")
        fleet_location = fleet_by_name[name].get("location", "")
        if reg_machine and fleet_location and reg_machine.lower() not in fleet_location.lower():
            findings.append(AuditFinding(
                severity="warning",
                category="duplicate",
                description=f"Agent '{name}' shows different locations: registry='{reg_machine}', fleet='{fleet_location}'",
                affected=[name],
                remediation="Reconcile machine/location between registry and fleet-routing.json"
            ))
    return findings
# ---------------------------------------------------------------------------
# Full audit pipeline
# ---------------------------------------------------------------------------
def run_full_audit(repo_path: Path = None, token: str = None,
                   gitea: bool = True) -> "AuditReport":
    """Run the complete fleet audit pipeline.

    Steps: registry validation, git authorship audit, optional Gitea member
    audit, and registry/fleet-routing cross-reference. Returns an
    AuditReport whose ``findings`` are plain dicts (asdict'd).
    """
    if repo_path is None:
        repo_path = Path(__file__).resolve().parent.parent
    findings = []
    report = AuditReport(timestamp=datetime.now(timezone.utc).isoformat())
    # 1. Identity registry validation.
    # BUGFIX: honor repo_path — load_registry() previously always read the
    # registry relative to this script, silently ignoring --repo-path.
    registry = load_registry(repo_path / "fleet" / "identity-registry.yaml")
    reg_findings = validate_registry(registry)
    findings.extend(reg_findings)
    # 2. Git authorship audit.
    findings.extend(audit_git_authors(repo_path))
    # 3. Gitea org member audit (needs network + token; skippable).
    if gitea:
        findings.extend(audit_gitea_members(token))
    # 4. Cross-reference registry vs fleet-routing.json.
    fleet_agents = load_fleet_inventory(repo_path)
    registry_agents = registry.get("agents", [])
    findings.extend(cross_reference_registry_agents(registry_agents, fleet_agents))
    # Compile the report. Only critical registry findings invalidate it.
    report.findings = [asdict(f) for f in findings]
    report.registry_valid = not any(f.severity == "critical" for f in reg_findings)
    report.duplicate_count = sum(1 for f in findings if f.category == "duplicate")
    report.ghost_count = sum(1 for f in findings if f.category == "ghost")
    # NOTE(review): agents present in both sources are counted twice here —
    # confirm that is the intended meaning of total_agents.
    report.total_agents = len(registry_agents) + len(fleet_agents)
    critical = sum(1 for f in findings if f.severity == "critical")
    warnings = sum(1 for f in findings if f.severity == "warning")
    report.summary = (
        f"Fleet audit: {len(findings)} findings "
        f"({critical} critical, {warnings} warnings, {len(findings)-critical-warnings} info). "
        f"Registry {'VALID' if report.registry_valid else 'INVALID — DUPLICATES FOUND'}. "
        f"{report.total_agents} agent identities across registry + fleet config."
    )
    return report
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main():
    """CLI entry point — run the full audit or a single targeted check.

    Exit status: single-check modes exit 1 when any critical finding exists,
    0 otherwise; the full-audit mode always exits 0 (report-driven).
    """
    parser = argparse.ArgumentParser(description="Fleet Audit — Deduplicate Agents, One Identity Per Machine")
    parser.add_argument("--report", default=None, help="Output JSON report path")
    parser.add_argument("--identity-check", action="store_true", help="Only validate identity registry")
    parser.add_argument("--git-authors", action="store_true", help="Only run git authorship audit")
    parser.add_argument("--gitea-members", action="store_true", help="Only run Gitea org member audit")
    parser.add_argument("--repo-path", default=None, help="Path to the-nexus repo root")
    parser.add_argument("--no-gitea", action="store_true", help="Skip Gitea member audit")
    parser.add_argument("--token", default=None, help="Gitea API token (or read from ~/.config/gitea/token)")
    args = parser.parse_args()
    repo_path = Path(args.repo_path) if args.repo_path else Path(__file__).resolve().parent.parent
    if args.identity_check:
        findings = validate_registry(load_registry())
    elif args.git_authors:
        findings = audit_git_authors(repo_path)
    elif args.gitea_members:
        findings = audit_gitea_members(args.token)
    else:
        # Full-audit mode: emit the whole AuditReport as JSON.
        report = run_full_audit(repo_path, args.token, gitea=not args.no_gitea)
        output = asdict(report)
        if args.report:
            _write_json(Path(args.report), output)
        else:
            print(json.dumps(output, indent=2))
        return
    # Single-check output.
    # BUGFIX: --report was silently ignored in single-check modes; now the
    # findings are also written there as JSON when requested.
    if args.report:
        _write_json(Path(args.report), [asdict(finding) for finding in findings])
    for finding in findings:
        print(f"[{finding.severity.upper()}] {finding.category}: {finding.description}")
        if finding.remediation:
            print(f" -> {finding.remediation}")
    print(f"\n{len(findings)} findings.")
    sys.exit(1 if any(f.severity == "critical" for f in findings) else 0)


def _write_json(report_path: Path, payload) -> None:
    """Write *payload* as indented JSON to report_path, creating parent dirs."""
    report_path.parent.mkdir(parents=True, exist_ok=True)
    with open(report_path, "w") as fh:
        json.dump(payload, fh, indent=2)
    print(f"Report written to {report_path}")


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,137 @@
# identity-registry.yaml — One Identity Per Machine
#
# Each agent in the fleet has ONE entry here. Name must be unique.
# Each agent name is bound to exactly one machine; different agents may share a machine.
# Gitea user must map 1:1 to agent identity.
#
# Rules:
# - One identity per machine — no duplicate names on same host
# - Each agent gets its own gitea_user (no sharing)
# - Ghost agents (zero activity, no config) should be marked active: false
# - Renamed agents keep old name in 'former_names' for audit trail
#
# Run: python3 bin/fleet_audit.py --identity-check
# ---
version: 1
rules:
one_identity_per_machine: true
unique_gitea_user: true
required_fields: [name, machine, role]
agents:
# === CANONICAL WIZARDS ===
- name: timmy
machine: local-mac
role: father-house
gitea_user: timmy
active: true
lane: orchestration
notes: "The father. Runs on Alexander's Mac. Hermes default profile."
- name: allegro
machine: "167.99.126.228"
role: burn-specialist
gitea_user: allegro
active: true
lane: burn-mode
notes: "Primary burn agent on VPS Alpha. Fast execution."
- name: ezra
machine: "143.198.27.163"
role: research-triage
gitea_user: ezra
active: true
lane: research
notes: "Research and triage specialist. VPS Ezra."
- name: bezalel
machine: "104.131.15.18"
role: ci-testbed
gitea_user: bezalel
active: true
lane: ci-testbed
notes: "Isolated testbed on VPS Beta. Build verification and security audits."
# === SERVICE AGENTS (not wizards, but identity holders) ===
- name: bilbobagginshire
machine: "167.99.126.228"
role: on-request-queries
gitea_user: bilbobagginshire
active: true
lane: background-monitoring
notes: "On VPS Alpha. Ollama-backed. Low-priority Q&A only."
- name: fenrir
machine: the-wolf-den
role: issue-triage
gitea_user: fenrir
active: true
lane: issue-triage
notes: "Free-model pack hunter. Backlog triage."
- name: substratum
machine: "167.99.126.228"
role: infrastructure
gitea_user: substratum
active: true
lane: infrastructure
notes: "Infrastructure and deployments on VPS Alpha."
# === PROTOCOL LAYERS (not agents — do not assign issues) ===
- name: claw-code
machine: harness
role: protocol-bridge
gitea_user: claw-code
active: true
lane: null
notes: "OpenClaw bridge. Protocol adapter, not an endpoint. See #836."
# === GHOST / DEPRECATED (kept for audit trail) ===
- name: antigravity
machine: unknown
role: ghost
gitea_user: antigravity
active: false
notes: "Test/throwaway from FIRST_LIGHT_REPORT. Zero activity."
- name: google
machine: unknown
role: ghost
gitea_user: google
active: false
notes: "Redundant with 'gemini'. Use gemini for all Google/Gemini work."
- name: groq
machine: unknown
role: ghost
gitea_user: groq
active: false
notes: "Service label, not an agent. groq_worker.py is infrastructure."
- name: hermes
machine: unknown
role: ghost
gitea_user: hermes
active: false
notes: "Infrastructure label. Real wizards: allegro, ezra."
- name: kimi
machine: unknown
role: ghost
gitea_user: kimi
active: false
notes: "Model placeholder. KimiClaw is the real account if active."
- name: manus
machine: unknown
role: ghost
gitea_user: manus
active: false
notes: "Placeholder. No harness configured."
- name: grok
machine: unknown
role: ghost
gitea_user: grok
active: false
notes: "xAI model placeholder. No active harness."

143
tests/test_fleet_audit.py Normal file
View File

@@ -0,0 +1,143 @@
"""Tests for fleet_audit — Deduplicate Agents, One Identity Per Machine."""
import json
import tempfile
from pathlib import Path
import pytest
import yaml
# Adjust import path
import sys
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "bin"))
from fleet_audit import (
AuditFinding,
validate_registry,
cross_reference_registry_agents,
audit_git_authors,
)
# ---------------------------------------------------------------------------
# Identity registry validation tests
# ---------------------------------------------------------------------------
class TestValidateRegistry:
    """Test identity registry validation rules."""

    def _make_registry(self, agents):
        return {"version": 1, "agents": agents, "rules": {"one_identity_per_machine": True}}

    def test_clean_registry_passes(self):
        reg = self._make_registry([
            {"name": "allegro", "machine": "167.99.126.228", "role": "burn", "gitea_user": "allegro"},
            {"name": "ezra", "machine": "143.198.27.163", "role": "triage", "gitea_user": "ezra"},
        ])
        critical = [f for f in validate_registry(reg) if f.severity == "critical"]
        assert not critical

    def test_same_name_on_different_machines_detected(self):
        reg = self._make_registry([
            {"name": "allegro", "machine": "167.99.126.228", "role": "burn"},
            {"name": "allegro", "machine": "104.131.15.18", "role": "burn"},
        ])
        critical = [
            f for f in validate_registry(reg)
            if f.severity == "critical" and f.category == "duplicate"
        ]
        # Two findings possible: name-on-multiple-machines plus duplicate name.
        assert critical
        machine_findings = [f for f in critical if "registered on" in f.description]
        assert len(machine_findings) == 1
        desc = machine_findings[0].description
        assert "167.99.126.228" in desc
        assert "104.131.15.18" in desc

    def test_multiple_agents_same_machine_ok(self):
        # Several distinct agents sharing one VPS is expected and allowed.
        reg = self._make_registry([
            {"name": "allegro", "machine": "167.99.126.228", "role": "burn"},
            {"name": "bilbo", "machine": "167.99.126.228", "role": "queries"},
        ])
        assert not [f for f in validate_registry(reg) if f.severity == "critical"]

    def test_duplicate_name_detected(self):
        reg = self._make_registry([
            {"name": "bezalel", "machine": "104.131.15.18", "role": "ci"},
            {"name": "bezalel", "machine": "167.99.126.228", "role": "ci"},
        ])
        name_dupes = [
            f for f in validate_registry(reg)
            if f.severity == "critical"
            and "bezalel" in f.description.lower()
            and "registered on" in f.description.lower()
        ]
        assert len(name_dupes) == 1

    def test_duplicate_gitea_user_detected(self):
        reg = self._make_registry([
            {"name": "agent-a", "machine": "host1", "role": "x", "gitea_user": "shared"},
            {"name": "agent-b", "machine": "host2", "role": "x", "gitea_user": "shared"},
        ])
        shared = [f for f in validate_registry(reg) if "Gitea user 'shared'" in f.description]
        assert len(shared) == 1
        assert "agent-a" in shared[0].affected
        assert "agent-b" in shared[0].affected

    def test_missing_required_fields(self):
        findings = validate_registry(self._make_registry([{"name": "incomplete-agent"}]))
        orphans = [f for f in findings if f.category == "orphan"]
        assert orphans
        assert "machine" in orphans[0].description or "role" in orphans[0].description

    def test_empty_registry_passes(self):
        assert validate_registry(self._make_registry([])) == []
# ---------------------------------------------------------------------------
# Cross-reference tests
# ---------------------------------------------------------------------------
class TestCrossReference:
    """Test registry vs fleet-routing.json cross-reference."""

    def test_orphan_in_fleet_not_registry(self):
        in_registry = [{"name": "allegro", "machine": "x", "role": "y"}]
        in_fleet = [{"name": "allegro", "location": "x"}, {"name": "unknown-agent", "location": "y"}]
        results = cross_reference_registry_agents(in_registry, in_fleet)
        hits = [f for f in results if f.category == "orphan" and "unknown-agent" in f.description]
        assert len(hits) == 1

    def test_location_mismatch_detected(self):
        in_registry = [{"name": "allegro", "machine": "167.99.126.228", "role": "y"}]
        in_fleet = [{"name": "allegro", "location": "totally-different-host"}]
        results = cross_reference_registry_agents(in_registry, in_fleet)
        hits = [f for f in results if f.category == "duplicate" and "different locations" in f.description]
        assert len(hits) == 1
# ---------------------------------------------------------------------------
# Integration test against actual registry
# ---------------------------------------------------------------------------
class TestRealRegistry:
    """Test against the actual identity-registry.yaml in the repo."""

    @staticmethod
    def _load_real_registry():
        # Returns None when the registry file is absent; the tests then
        # no-op, matching the original conditional-skip behavior.
        reg_path = Path(__file__).resolve().parent.parent / "fleet" / "identity-registry.yaml"
        if not reg_path.exists():
            return None
        with open(reg_path) as fh:
            return yaml.safe_load(fh)

    def test_registry_loads(self):
        registry = self._load_real_registry()
        if registry is not None:
            assert registry["version"] == 1
            assert len(registry["agents"]) > 0

    def test_registry_no_critical_findings(self):
        registry = self._load_real_registry()
        if registry is not None:
            critical = [f for f in validate_registry(registry) if f.severity == "critical"]
            assert not critical, f"Critical findings: {[f.description for f in critical]}"