Compare commits

..

2 Commits

Author SHA1 Message Date
Alexander Whitestone
9339b1a2d4 docs: ground unified fleet sovereignty directive
Some checks failed
Self-Healing Smoke / self-healing-smoke (pull_request) Failing after 12s
Agent PR Gate / gate (pull_request) Failing after 31s
Smoke Test / smoke (pull_request) Failing after 16s
Agent PR Gate / report (pull_request) Successful in 9s
Refs #524
2026-04-22 10:29:16 -04:00
Alexander Whitestone
a6ec2055cc wip: add issue 524 directive grounding script 2026-04-22 10:27:30 -04:00
7 changed files with 602 additions and 670 deletions

View File

@@ -1,18 +0,0 @@
{
"owner": "Timmy_Foundation",
"repos": [
"timmy-home",
"timmy-config",
"fleet-ops",
"the-beacon",
"the-door",
"the-nexus"
],
"lookback_days": 14,
"alert": {
"recent_days": 7,
"baseline_days": 7,
"minimum_baseline_closed": 4,
"drop_ratio": 0.6
}
}

View File

@@ -1,70 +0,0 @@
# Burn-down Velocity Tracking
Refs #519.
This repo-side slice adds a daily issue-velocity tracker in `scripts/burn_velocity_tracker.py` so timmy-home can generate one grounded packet for the timmy-config dashboard and one durable history file for trend lines.
## What it emits
Daily run outputs:
- `~/.timmy/burn-velocity/latest.json` — machine-readable payload for the timmy-config dashboard
- `~/.timmy/burn-velocity/latest.md` — operator-facing markdown summary
- `~/.timmy/burn-velocity/history.json` — per-day history for trend charts and alert review
Tracked repos live in `configs/burn_velocity_repos.json`.
## Cron command
```bash
cd ~/timmy-home && \
python3 scripts/burn_velocity_tracker.py \
--config configs/burn_velocity_repos.json \
--output-json ~/.timmy/burn-velocity/latest.json \
--output-md ~/.timmy/burn-velocity/latest.md \
--history-file ~/.timmy/burn-velocity/history.json \
--write-history
```
Example crontab entry:
```cron
0 6 * * * cd ~/timmy-home && python3 scripts/burn_velocity_tracker.py --config configs/burn_velocity_repos.json --output-json ~/.timmy/burn-velocity/latest.json --output-md ~/.timmy/burn-velocity/latest.md --history-file ~/.timmy/burn-velocity/history.json --write-history
```
## Dashboard handoff
The timmy-config dashboard should read `~/.timmy/burn-velocity/latest.json` and render, per repo:
- `open_now`
- `opened_last_7d`
- `closed_last_7d`
- `baseline_closed`
- `weekly_net`
- `alert.status`
- `alert.kind`
- `alert.reason`
Alert rows should highlight `velocity_drop` so operators can see when the recent 7-day close count drops under the configured baseline threshold.
## Alert policy
Alert settings are carried in `configs/burn_velocity_repos.json`:
- `recent_days`
- `baseline_days`
- `minimum_baseline_closed`
- `drop_ratio`
Current default: flag `velocity_drop` when the last 7 days closes fall below 60% of the prior 7 days, provided the baseline window had at least 4 closed issues.
## Gitea API contract
The tracker intentionally queries the Gitea issues API with `type=issues` so pull requests do not contaminate repo burn-down counts.
Live collection shape:
- open backlog uses `/repos/{owner}/{repo}/issues?state=open&type=issues`
- recent event scan uses `/repos/{owner}/{repo}/issues?state=all&type=issues&since=...`
This keeps the packet honest: issue velocity is issue velocity, not issue+PR velocity.
## Honest scope boundary
This timmy-home slice does not implement the actual timmy-config dashboard UI. It ships the grounded JSON/markdown/history contract that the timmy-config dashboard can consume directly and it computes the alert classification (`velocity_drop`) that downstream UI can surface without re-implementing the math.

View File

@@ -0,0 +1,107 @@
# [DIRECTIVE] Unified Fleet Sovereignty & Comms Migration
Grounding report for `timmy-home #524`.
Issue #524 is a multi-lane directive, not a one-commit feature. This report grounds the directive in repo evidence, highlights stale cross-links, and names the missing operator bundles that still need real execution.
This remains a `Refs #524` artifact. The directive spans multiple repos and operator actions, so this report makes the current repo-side state executable without pretending the whole migration is complete.
## Directive Snapshot
- Repo-grounded workstreams: 0
- Partial workstreams: 4
- Missing workstreams: 1
- Drifted references: 4
## Reference Drift
- #813 is cited for Nostr Migration Leadership, but its current title is 'docs: refresh the-playground genome analysis (#671)'.
- #819 is cited for Nostr Migration Leadership, but its current title is 'docs: verify #648 already implemented (closes #818)'.
- #139 is cited for v0.7.0 Feature Audit, but its current title is '🐣 Allegro-Primus is born'.
- #103 is cited for Morrowind Local-First Benchmark, but its current title is 'Build comprehensive caching layer — cache everywhere'.
## Workstream Matrix
### 1. Nostr Migration Leadership — PARTIAL
- Requirement: Replace Telegram with relay-based sovereign comms, verify wizard keypairs, and prove the NIP-29 group path is stable.
- Referenced issues:
- #813 (closed) — docs: refresh the-playground genome analysis (#671) [DRIFT]
- #819 (open) — docs: verify #648 already implemented (closes #818) [DRIFT]
- Repo evidence present:
- `infrastructure/timmy-bridge/client/timmy_client.py` — Nostr event client scaffold already exists
- `infrastructure/timmy-bridge/monitor/timmy_monitor.py` — Nostr relay monitor already exists
- `specs/wizard-telegram-bot-cutover.md` — Telegram cutover planning exists, so the migration lane is real
- Missing operator deliverables:
- wizard keypair inventory and ownership matrix
- NIP-29 relay group verification report
- operator runbook for cutting traffic off Telegram
- Why this lane remains open: The repo has Nostr-adjacent scaffolding, but the directive still lacks a verified migration packet and the cited issue links drift away from the stated Nostr scope.
### 2. Lexicon Enforcement — PARTIAL
- Requirement: Enforce the Fleet Lexicon in PR review and issue triage so the team uses one shared language.
- Referenced issues:
- #388 (closed) — [KT] Fleet Lexicon & Techniques — Shared Vocabulary, Patterns, and Standards for All Agents [aligned]
- Repo evidence present:
- `docs/WIZARD_APPRENTICESHIP_CHARTER.md` — The repo already uses wizard-language canon in docs
- `specs/timmy-ezra-bezalel-canon-sheet.md` — Canonical agent naming already exists
- `docs/OPERATIONS_DASHBOARD.md` — Operational roles are already described in repo language
- Missing operator deliverables:
- machine-checkable lexicon policy for review/triage
- terminology lint or reviewer checklist tied to the lexicon
- Why this lane remains open: The naming canon exists, but there is still no executable enforcement bundle that would catch drift during future reviews and triage passes.
### 3. v0.7.0 Feature Audit — PARTIAL
- Requirement: Audit Hermes features that can reduce cloud dependency and turn the findings into a sovereignty implementation plan.
- Referenced issues:
- #139 (open) — 🐣 Allegro-Primus is born [DRIFT]
- Repo evidence present:
- `scripts/sovereignty_audit.py` — Cloud-vs-local audit machinery already exists
- `reports/evaluations/2026-04-15-phase-4-sovereignty-audit.md` — Recent sovereignty audit report is committed
- `timmy-local/README.md` — Local-first status is already documented for operators
- Missing operator deliverables:
- Hermes v0.7.0 feature inventory linked to cloud-reduction leverage
- Sovereignty Implementation Plan derived from that feature audit
- Why this lane remains open: The repo has sovereignty-audit infrastructure, but it does not yet contain the requested v0.7.0 feature inventory or the plan that turns those findings into rollout steps.
### 4. Morrowind Local-First Benchmark — PARTIAL
- Requirement: Compare cloud and local Morrowind agents, prove local parity where possible, and document the reasoning gap when it fails.
- Referenced issues:
- #103 (open) — Build comprehensive caching layer — cache everywhere [DRIFT]
- Repo evidence present:
- `morrowind/local_brain.py` — Local Morrowind control loop already exists
- `morrowind/mcp_server.py` — Morrowind MCP control surface is already wired
- `morrowind/pilot.py` — Trajectory logging for evaluation already exists
- Missing operator deliverables:
- cloud-vs-local benchmark report for the combat loop
- reasoning-gap writeup tied to a proposed LoRA/fine-tune path
- Why this lane remains open: The repo has a local Morrowind stack, but it does not yet contain the requested benchmark artifact; the cited issue number also points at an unrelated caching task.
### 5. Infrastructure Hardening / Syntax Guard — MISSING
- Requirement: Verify Syntax Guard pre-receive protection across Gitea repos so syntax failures stop earlier.
- Referenced issues: none listed in the directive body
- Repo evidence present: none
- Missing operator deliverables:
- repo inventory of Gitea targets that should carry Syntax Guard
- deployment verifier for hook presence across those repos
- operator report proving installation state instead of assuming it
- Why this lane remains open: No repo-managed syntax-guard verifier is present yet, so this directive still depends on manual trust rather than auditable proof.
## Highest-Leverage Next Actions
- Nostr Migration Leadership: wizard keypair inventory and ownership matrix
- Lexicon Enforcement: machine-checkable lexicon policy for review/triage
- v0.7.0 Feature Audit: Hermes v0.7.0 feature inventory linked to cloud-reduction leverage
- Morrowind Local-First Benchmark: cloud-vs-local benchmark report for the combat loop
- Infrastructure Hardening / Syntax Guard: repo inventory of Gitea targets that should carry Syntax Guard
## Why #524 Remains Open
- The directive bundles five separate workstreams with different evidence surfaces.
- Multiple cited issue numbers have drifted away from the work they are supposed to anchor.
- Repo scaffolding exists for Nostr, sovereignty audits, and Morrowind, but the operator-facing bundles are still missing.
- Syntax Guard verification is still undocumented and unproven inside this repo.

View File

@@ -1,406 +0,0 @@
#!/usr/bin/env python3
"""Burn-down velocity tracker for Timmy Foundation issue throughput.
Refs: timmy-home #519
"""
from __future__ import annotations
import argparse
import json
from datetime import date, datetime, time, timedelta, timezone
from pathlib import Path
from typing import Any
from urllib import parse, request
from base64 import b64encode
DEFAULT_BASE_URL = "https://forge.alexanderwhitestone.com/api/v1"
DEFAULT_OWNER = "Timmy_Foundation"
DEFAULT_TOKEN_FILE = Path.home() / ".config" / "gitea" / "token"
DEFAULT_CONFIG_FILE = Path(__file__).resolve().parent.parent / "configs" / "burn_velocity_repos.json"
DEFAULT_OUTPUT_DIR = Path.home() / ".timmy" / "burn-velocity"
DEFAULT_OUTPUT_JSON = DEFAULT_OUTPUT_DIR / "latest.json"
DEFAULT_OUTPUT_MD = DEFAULT_OUTPUT_DIR / "latest.md"
DEFAULT_HISTORY_FILE = DEFAULT_OUTPUT_DIR / "history.json"
DEFAULT_CONFIG = {
"owner": DEFAULT_OWNER,
"repos": ["timmy-home", "timmy-config", "fleet-ops", "the-beacon", "the-door", "the-nexus"],
"lookback_days": 14,
"alert": {
"recent_days": 7,
"baseline_days": 7,
"minimum_baseline_closed": 4,
"drop_ratio": 0.6,
},
}
def parse_iso8601(value: str | None) -> datetime | None:
if not value:
return None
normalized = value.replace("Z", "+00:00")
parsed = datetime.fromisoformat(normalized)
if parsed.tzinfo is None:
return parsed.replace(tzinfo=timezone.utc)
return parsed.astimezone(timezone.utc)
def normalize_today(value: str | date | None = None) -> date:
if value is None:
return datetime.now(timezone.utc).date()
if isinstance(value, date):
return value
return date.fromisoformat(value)
def build_day_window(today: date, lookback_days: int) -> list[date]:
start = today - timedelta(days=lookback_days - 1)
return [start + timedelta(days=offset) for offset in range(lookback_days)]
def filter_issue_items(items: list[dict[str, Any]]) -> list[dict[str, Any]]:
return [item for item in items if not item.get("pull_request")]
def build_daily_series(items: list[dict[str, Any]], today: date, lookback_days: int) -> list[dict[str, int | str]]:
days = build_day_window(today, lookback_days)
counts = {day.isoformat(): {"opened": 0, "closed": 0} for day in days}
start_day = days[0]
for item in filter_issue_items(items):
created_at = parse_iso8601(item.get("created_at"))
if created_at is not None:
created_day = created_at.date()
if start_day <= created_day <= today:
counts[created_day.isoformat()]["opened"] += 1
closed_at = parse_iso8601(item.get("closed_at"))
if closed_at is not None:
closed_day = closed_at.date()
if start_day <= closed_day <= today:
counts[closed_day.isoformat()]["closed"] += 1
return [
{
"date": day.isoformat(),
"opened": counts[day.isoformat()]["opened"],
"closed": counts[day.isoformat()]["closed"],
}
for day in days
]
def summarize_velocity_alert(
*, recent_closed: int, baseline_closed: int, open_now: int, config: dict[str, Any]
) -> dict[str, Any]:
minimum_baseline = int(config.get("minimum_baseline_closed", 4))
drop_ratio = float(config.get("drop_ratio", 0.6))
if baseline_closed >= minimum_baseline and recent_closed < baseline_closed * drop_ratio:
return {
"status": "drop",
"kind": "velocity_drop",
"recent_closed": recent_closed,
"baseline_closed": baseline_closed,
"reason": (
f"velocity_drop: closed {recent_closed} in the last {config.get('recent_days', 7)}d "
f"vs {baseline_closed} in the prior {config.get('baseline_days', 7)}d"
),
}
if open_now > 0 and baseline_closed >= minimum_baseline and recent_closed == 0:
return {
"status": "drop",
"kind": "velocity_drop",
"recent_closed": recent_closed,
"baseline_closed": baseline_closed,
"reason": "velocity_drop: no issues closed in the recent window while backlog is still open",
}
return {
"status": "ok",
"kind": "none",
"recent_closed": recent_closed,
"baseline_closed": baseline_closed,
"reason": "velocity stable",
}
def _sum_window(daily: list[dict[str, int | str]], field: str, days: int) -> int:
if days <= 0:
return 0
return sum(int(item[field]) for item in daily[-days:])
def _sum_baseline_window(daily: list[dict[str, int | str]], recent_days: int, baseline_days: int) -> int:
if baseline_days <= 0:
return 0
if recent_days <= 0:
return sum(int(item["closed"]) for item in daily[-baseline_days:])
baseline_slice = daily[-(recent_days + baseline_days) : -recent_days]
return sum(int(item["closed"]) for item in baseline_slice)
def build_velocity_report(config: dict[str, Any], snapshot: dict[str, Any], today: str | date | None = None) -> dict[str, Any]:
report_day = normalize_today(today)
generated_at = snapshot.get("generated_at") or datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
owner = config.get("owner", DEFAULT_OWNER)
repos = list(config.get("repos") or sorted((snapshot.get("repos") or {}).keys()))
lookback_days = int(config.get("lookback_days", 14))
alert_config = dict(DEFAULT_CONFIG["alert"])
alert_config.update(config.get("alert") or {})
recent_days = int(alert_config.get("recent_days", 7))
baseline_days = int(alert_config.get("baseline_days", 7))
repo_reports: list[dict[str, Any]] = []
total_open_now = 0
total_closed_last_7d = 0
repos_with_alerts: list[str] = []
for repo_name in repos:
repo_snapshot = (snapshot.get("repos") or {}).get(repo_name, {})
open_issues = filter_issue_items(list(repo_snapshot.get("open_issues") or []))
recent_issues = filter_issue_items(list(repo_snapshot.get("recent_issues") or []))
daily = build_daily_series(recent_issues, report_day, lookback_days)
open_now = len(open_issues)
opened_last_7d = _sum_window(daily, "opened", recent_days)
closed_last_7d = _sum_window(daily, "closed", recent_days)
baseline_closed = _sum_baseline_window(daily, recent_days, baseline_days)
weekly_net = opened_last_7d - closed_last_7d
alert = summarize_velocity_alert(
recent_closed=closed_last_7d,
baseline_closed=baseline_closed,
open_now=open_now,
config=alert_config,
)
repo_report = {
"repo": repo_name,
"open_now": open_now,
"opened_last_7d": opened_last_7d,
"closed_last_7d": closed_last_7d,
"baseline_closed": baseline_closed,
"weekly_net": weekly_net,
"daily": daily,
"alert": alert,
}
repo_reports.append(repo_report)
total_open_now += open_now
total_closed_last_7d += closed_last_7d
if alert["status"] != "ok":
repos_with_alerts.append(repo_name)
return {
"owner": owner,
"generated_at": generated_at,
"generated_day": report_day.isoformat(),
"lookback_days": lookback_days,
"dashboard_contract_version": 1,
"repos": repo_reports,
"summary": {
"total_open_now": total_open_now,
"total_closed_last_7d": total_closed_last_7d,
"repos_with_alerts": repos_with_alerts,
},
}
def render_markdown(report: dict[str, Any]) -> str:
lines = [
"# Burn-down Velocity Tracking",
"",
f"Generated: {report['generated_at']}",
f"Owner: {report['owner']}",
f"Lookback days: {report['lookback_days']}",
"",
"## Per-repo velocity",
"",
"| Repo | Open now | Opened 7d | Closed 7d | Previous 7d | Alert |",
"| --- | ---: | ---: | ---: | ---: | --- |",
]
for repo in report["repos"]:
alert_label = repo["alert"]["kind"] if repo["alert"]["status"] != "ok" else "ok"
lines.append(
f"| {repo['repo']} | {repo['open_now']} | {repo['opened_last_7d']} | {repo['closed_last_7d']} | {repo['baseline_closed']} | {alert_label} |"
)
lines.extend(
[
"",
"## Dashboard handoff for timmy-config",
"",
"The timmy-config dashboard should consume `~/.timmy/burn-velocity/latest.json` and render, for each repo:",
"- `open_now`",
"- `opened_last_7d`",
"- `closed_last_7d`",
"- `baseline_closed`",
"- `alert.status` / `alert.kind` / `alert.reason`",
"",
"Cron should also persist `~/.timmy/burn-velocity/history.json` so timmy-config can plot the daily trend line instead of only the latest snapshot.",
"",
"## Alerts",
"",
]
)
alerts = [repo for repo in report["repos"] if repo["alert"]["status"] != "ok"]
if not alerts:
lines.append("- none")
else:
for repo in alerts:
lines.append(f"- {repo['repo']}: {repo['alert']['reason']}")
return "\n".join(lines) + "\n"
def update_history(history_path: Path, report: dict[str, Any]) -> dict[str, Any]:
if history_path.exists():
history = json.loads(history_path.read_text(encoding="utf-8"))
else:
history = {"days": []}
entry = {
"date": report["generated_day"],
"generated_at": report["generated_at"],
"summary": report["summary"],
"repos": report["repos"],
}
retained = [item for item in history.get("days", []) if item.get("date") != report["generated_day"]]
retained.append(entry)
retained.sort(key=lambda item: item["date"])
history["days"] = retained
history_path.parent.mkdir(parents=True, exist_ok=True)
history_path.write_text(json.dumps(history, indent=2), encoding="utf-8")
return history
class GiteaClient:
def __init__(self, token: str, owner: str = DEFAULT_OWNER, base_url: str = DEFAULT_BASE_URL):
self.token = token
self.owner = owner
self.base_url = base_url.rstrip("/")
def _headers(self) -> list[dict[str, str]]:
return [
{"Authorization": f"token {self.token}", "Accept": "application/json"},
{
"Authorization": "Basic " + b64encode(f"{self.token}:".encode()).decode(),
"Accept": "application/json",
},
]
def _request_json(self, url: str) -> list[dict[str, Any]]:
last_error: Exception | None = None
for headers in self._headers():
try:
req = request.Request(url, headers=headers)
with request.urlopen(req, timeout=30) as response:
return json.loads(response.read().decode())
except Exception as exc: # pragma: no cover - exercised only on live API failure
last_error = exc
if last_error is None: # pragma: no cover - defensive
raise RuntimeError("request failed without an exception")
raise last_error
def list_issues(self, repo: str, *, state: str, since: str | None = None) -> list[dict[str, Any]]:
issues: list[dict[str, Any]] = []
page = 1
while True:
query = {"state": state, "type": "issues", "limit": 100, "page": page}
if since:
query["since"] = since
url = f"{self.base_url}/repos/{self.owner}/{repo}/issues?{parse.urlencode(query)}"
batch = self._request_json(url)
if not batch:
break
issues.extend(filter_issue_items(batch))
page += 1
return issues
def load_json(path: Path, default: Any) -> Any:
if not path.exists():
return default
return json.loads(path.read_text(encoding="utf-8"))
def load_config(path: Path) -> dict[str, Any]:
config = dict(DEFAULT_CONFIG)
alert = dict(DEFAULT_CONFIG["alert"])
raw = load_json(path, {})
config.update(raw)
alert.update(raw.get("alert") or {})
config["alert"] = alert
return config
def collect_live_snapshot(
config: dict[str, Any], *, today: str | date | None = None, token_file: Path = DEFAULT_TOKEN_FILE, base_url: str = DEFAULT_BASE_URL
) -> dict[str, Any]:
token = token_file.read_text(encoding="utf-8").strip()
report_day = normalize_today(today)
since_day = report_day - timedelta(days=int(config.get("lookback_days", 14)) - 1)
since_timestamp = datetime.combine(since_day, time.min, tzinfo=timezone.utc).isoformat().replace("+00:00", "Z")
client = GiteaClient(token=token, owner=config.get("owner", DEFAULT_OWNER), base_url=base_url)
repos = list(config.get("repos") or [])
repo_payload = {}
for repo in repos:
repo_payload[repo] = {
"open_issues": client.list_issues(repo, state="open"),
"recent_issues": client.list_issues(repo, state="all", since=since_timestamp),
}
return {
"generated_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
"repos": repo_payload,
}
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Track per-repo issue burn-down velocity and emit timmy-config dashboard payloads.")
parser.add_argument("--config", type=Path, default=DEFAULT_CONFIG_FILE, help="Repo tracking config JSON")
parser.add_argument("--snapshot-file", type=Path, help="Use a pre-fetched snapshot JSON instead of calling Gitea")
parser.add_argument("--token-file", type=Path, default=DEFAULT_TOKEN_FILE, help="Gitea token file for live collection")
parser.add_argument("--base-url", default=DEFAULT_BASE_URL, help="Gitea API base URL")
parser.add_argument("--today", help="Override report date (YYYY-MM-DD)")
parser.add_argument("--output-json", type=Path, default=DEFAULT_OUTPUT_JSON, help="Path for latest JSON payload")
parser.add_argument("--output-md", type=Path, default=DEFAULT_OUTPUT_MD, help="Path for latest markdown summary")
parser.add_argument("--history-file", type=Path, default=DEFAULT_HISTORY_FILE, help="Path for persisted daily history JSON")
parser.add_argument("--write-history", action="store_true", help="Update the daily history file after generating the report")
parser.add_argument("--json", action="store_true", help="Print JSON instead of markdown to stdout")
return parser.parse_args()
def main() -> None:
args = parse_args()
config = load_config(args.config)
if args.snapshot_file:
snapshot = load_json(args.snapshot_file, {"repos": {}})
else:
snapshot = collect_live_snapshot(config, today=args.today, token_file=args.token_file, base_url=args.base_url)
report = build_velocity_report(config, snapshot, today=args.today)
args.output_json.parent.mkdir(parents=True, exist_ok=True)
args.output_md.parent.mkdir(parents=True, exist_ok=True)
args.output_json.write_text(json.dumps(report, indent=2), encoding="utf-8")
args.output_md.write_text(render_markdown(report), encoding="utf-8")
if args.write_history:
update_history(args.history_file, report)
if args.json:
print(json.dumps(report, indent=2))
else:
print(render_markdown(report))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,418 @@
#!/usr/bin/env python3
"""Ground timmy-home #524 as an executable status report.
Refs: timmy-home #524
"""
from __future__ import annotations
import argparse
import json
from copy import deepcopy
from pathlib import Path
from typing import Any
from urllib import request
DEFAULT_BASE_URL = "https://forge.alexanderwhitestone.com/api/v1"
DEFAULT_OWNER = "Timmy_Foundation"
DEFAULT_REPO = "timmy-home"
DEFAULT_TOKEN_FILE = Path.home() / ".config" / "gitea" / "token"
DEFAULT_REPO_ROOT = Path(__file__).resolve().parents[1]
DEFAULT_DOC_PATH = DEFAULT_REPO_ROOT / "docs" / "UNIFIED_FLEET_SOVEREIGNTY_STATUS.md"
DIRECTIVE_TITLE = "[DIRECTIVE] Unified Fleet Sovereignty & Comms Migration"
DIRECTIVE_SUMMARY = (
"Issue #524 is a multi-lane directive, not a one-commit feature. "
"This report grounds the directive in repo evidence, highlights stale cross-links, "
"and names the missing operator bundles that still need real execution."
)
DEFAULT_REFERENCE_SNAPSHOT = {
388: {
"title": "[KT] Fleet Lexicon & Techniques — Shared Vocabulary, Patterns, and Standards for All Agents",
"state": "closed",
},
103: {
"title": "Build comprehensive caching layer — cache everywhere",
"state": "open",
},
139: {
"title": "🐣 Allegro-Primus is born",
"state": "open",
},
813: {
"title": "docs: refresh the-playground genome analysis (#671)",
"state": "closed",
},
819: {
"title": "docs: verify #648 already implemented (closes #818)",
"state": "open",
},
}
WORKSTREAMS = [
{
"key": "nostr-migration",
"name": "Nostr Migration Leadership",
"requirement": "Replace Telegram with relay-based sovereign comms, verify wizard keypairs, and prove the NIP-29 group path is stable.",
"references": [813, 819],
"expected_keywords": ["nostr", "relay", "telegram", "comms", "messenger"],
"repo_evidence": [
{
"path": "infrastructure/timmy-bridge/client/timmy_client.py",
"description": "Nostr event client scaffold already exists",
},
{
"path": "infrastructure/timmy-bridge/monitor/timmy_monitor.py",
"description": "Nostr relay monitor already exists",
},
{
"path": "specs/wizard-telegram-bot-cutover.md",
"description": "Telegram cutover planning exists, so the migration lane is real",
},
],
"missing_deliverables": [
"wizard keypair inventory and ownership matrix",
"NIP-29 relay group verification report",
"operator runbook for cutting traffic off Telegram",
],
"why_open": "The repo has Nostr-adjacent scaffolding, but the directive still lacks a verified migration packet and the cited issue links drift away from the stated Nostr scope.",
},
{
"key": "lexicon-enforcement",
"name": "Lexicon Enforcement",
"requirement": "Enforce the Fleet Lexicon in PR review and issue triage so the team uses one shared language.",
"references": [388],
"expected_keywords": ["lexicon", "vocabulary", "standards", "shared vocabulary"],
"repo_evidence": [
{
"path": "docs/WIZARD_APPRENTICESHIP_CHARTER.md",
"description": "The repo already uses wizard-language canon in docs",
},
{
"path": "specs/timmy-ezra-bezalel-canon-sheet.md",
"description": "Canonical agent naming already exists",
},
{
"path": "docs/OPERATIONS_DASHBOARD.md",
"description": "Operational roles are already described in repo language",
},
],
"missing_deliverables": [
"machine-checkable lexicon policy for review/triage",
"terminology lint or reviewer checklist tied to the lexicon",
],
"why_open": "The naming canon exists, but there is still no executable enforcement bundle that would catch drift during future reviews and triage passes.",
},
{
"key": "feature-audit",
"name": "v0.7.0 Feature Audit",
"requirement": "Audit Hermes features that can reduce cloud dependency and turn the findings into a sovereignty implementation plan.",
"references": [139],
"expected_keywords": ["hermes", "feature", "audit", "v0.7.0", "sovereignty"],
"repo_evidence": [
{
"path": "scripts/sovereignty_audit.py",
"description": "Cloud-vs-local audit machinery already exists",
},
{
"path": "reports/evaluations/2026-04-15-phase-4-sovereignty-audit.md",
"description": "Recent sovereignty audit report is committed",
},
{
"path": "timmy-local/README.md",
"description": "Local-first status is already documented for operators",
},
],
"missing_deliverables": [
"Hermes v0.7.0 feature inventory linked to cloud-reduction leverage",
"Sovereignty Implementation Plan derived from that feature audit",
],
"why_open": "The repo has sovereignty-audit infrastructure, but it does not yet contain the requested v0.7.0 feature inventory or the plan that turns those findings into rollout steps.",
},
{
"key": "morrowind-benchmark",
"name": "Morrowind Local-First Benchmark",
"requirement": "Compare cloud and local Morrowind agents, prove local parity where possible, and document the reasoning gap when it fails.",
"references": [103],
"expected_keywords": ["morrowind", "combat", "benchmark", "local", "cloud"],
"repo_evidence": [
{
"path": "morrowind/local_brain.py",
"description": "Local Morrowind control loop already exists",
},
{
"path": "morrowind/mcp_server.py",
"description": "Morrowind MCP control surface is already wired",
},
{
"path": "morrowind/pilot.py",
"description": "Trajectory logging for evaluation already exists",
},
],
"missing_deliverables": [
"cloud-vs-local benchmark report for the combat loop",
"reasoning-gap writeup tied to a proposed LoRA/fine-tune path",
],
"why_open": "The repo has a local Morrowind stack, but it does not yet contain the requested benchmark artifact; the cited issue number also points at an unrelated caching task.",
},
{
"key": "syntax-guard",
"name": "Infrastructure Hardening / Syntax Guard",
"requirement": "Verify Syntax Guard pre-receive protection across Gitea repos so syntax failures stop earlier.",
"references": [],
"expected_keywords": [],
"repo_evidence": [],
"missing_deliverables": [
"repo inventory of Gitea targets that should carry Syntax Guard",
"deployment verifier for hook presence across those repos",
"operator report proving installation state instead of assuming it",
],
"why_open": "No repo-managed syntax-guard verifier is present yet, so this directive still depends on manual trust rather than auditable proof.",
},
]
def default_snapshot() -> dict[int, dict[str, str]]:
return deepcopy(DEFAULT_REFERENCE_SNAPSHOT)
class GiteaClient:
def __init__(self, token: str, owner: str = DEFAULT_OWNER, repo: str = DEFAULT_REPO, base_url: str = DEFAULT_BASE_URL):
self.token = token
self.owner = owner
self.repo = repo
self.base_url = base_url.rstrip("/")
def get_issue(self, issue_number: int) -> dict[str, Any]:
req = request.Request(
f"{self.base_url}/repos/{self.owner}/{self.repo}/issues/{issue_number}",
headers={"Authorization": f"token {self.token}", "Accept": "application/json"},
)
with request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
def load_snapshot(path: Path | None = None) -> dict[int, dict[str, str]]:
if path is None:
return default_snapshot()
data = json.loads(path.read_text(encoding="utf-8"))
return {int(k): v for k, v in data.items()}
def refresh_snapshot(token_file: Path = DEFAULT_TOKEN_FILE) -> dict[int, dict[str, str]]:
token = token_file.read_text(encoding="utf-8").strip()
client = GiteaClient(token=token)
snapshot: dict[int, dict[str, str]] = {}
for issue_number in sorted(DEFAULT_REFERENCE_SNAPSHOT):
issue = client.get_issue(issue_number)
snapshot[issue_number] = {
"title": issue["title"],
"state": issue["state"],
}
return snapshot
def collect_repo_evidence(entries: list[dict[str, str]], repo_root: Path) -> tuple[list[str], list[str]]:
present: list[str] = []
missing: list[str] = []
for entry in entries:
label = f"`{entry['path']}` — {entry['description']}"
if (repo_root / entry["path"]).exists():
present.append(label)
else:
missing.append(label)
return present, missing
def evaluate_reference(issue_number: int, snapshot: dict[int, dict[str, str]], expected_keywords: list[str]) -> dict[str, Any]:
record = snapshot.get(issue_number, {"title": "missing from snapshot", "state": "unknown"})
title = record["title"]
title_lower = title.lower()
matched_keywords = [kw for kw in expected_keywords if kw.lower() in title_lower]
aligned = bool(matched_keywords) if expected_keywords else True
return {
"number": issue_number,
"title": title,
"state": record["state"],
"aligned": aligned,
"matched_keywords": matched_keywords,
}
def classify_workstream(reference_results: list[dict[str, Any]], evidence_present: list[str], missing_deliverables: list[str]) -> str:
has_drift = any(not item["aligned"] for item in reference_results)
if not evidence_present:
return "MISSING"
if has_drift or missing_deliverables:
return "PARTIAL"
return "GROUNDED"
def evaluate_directive(snapshot: dict[int, dict[str, str]] | None = None, repo_root: Path | None = None) -> dict[str, Any]:
snapshot = snapshot or default_snapshot()
repo_root = repo_root or DEFAULT_REPO_ROOT
workstreams: list[dict[str, Any]] = []
drift_items: list[str] = []
for lane in WORKSTREAMS:
reference_results = [
evaluate_reference(issue_number, snapshot, lane["expected_keywords"])
for issue_number in lane["references"]
]
present, missing = collect_repo_evidence(lane["repo_evidence"], repo_root)
for item in reference_results:
if not item["aligned"]:
drift_items.append(
f"#{item['number']} is cited for {lane['name']}, but its current title is '{item['title']}'."
)
workstream = {
"key": lane["key"],
"name": lane["name"],
"requirement": lane["requirement"],
"reference_results": reference_results,
"repo_evidence_present": present,
"repo_evidence_missing": missing,
"missing_deliverables": list(lane["missing_deliverables"]),
"why_open": lane["why_open"],
}
workstream["status"] = classify_workstream(
reference_results=reference_results,
evidence_present=present,
missing_deliverables=workstream["missing_deliverables"],
)
workstreams.append(workstream)
next_actions: list[str] = []
for workstream in workstreams:
if workstream["missing_deliverables"]:
next_actions.append(f"{workstream['name']}: {workstream['missing_deliverables'][0]}")
return {
"issue_number": 524,
"title": DIRECTIVE_TITLE,
"summary": DIRECTIVE_SUMMARY,
"reference_snapshot": {str(k): v for k, v in sorted(snapshot.items())},
"workstreams": workstreams,
"reference_drift": drift_items,
"grounded_workstreams": sum(1 for item in workstreams if item["status"] == "GROUNDED"),
"partial_workstreams": sum(1 for item in workstreams if item["status"] == "PARTIAL"),
"missing_workstreams": sum(1 for item in workstreams if item["status"] == "MISSING"),
"next_actions": next_actions,
}
def render_markdown(result: dict[str, Any]) -> str:
lines = [
f"# {result['title']}",
"",
"Grounding report for `timmy-home #524`.",
"",
result["summary"],
"",
"This remains a `Refs #524` artifact. The directive spans multiple repos and operator actions, so this report makes the current repo-side state executable without pretending the whole migration is complete.",
"",
"## Directive Snapshot",
"",
f"- Repo-grounded workstreams: {result['grounded_workstreams']}",
f"- Partial workstreams: {result['partial_workstreams']}",
f"- Missing workstreams: {result['missing_workstreams']}",
f"- Drifted references: {len(result['reference_drift'])}",
"",
"## Reference Drift",
"",
]
if result["reference_drift"]:
lines.extend(f"- {item}" for item in result["reference_drift"])
else:
lines.append("- No stale cross-links detected in the directive snapshot.")
lines.extend(["", "## Workstream Matrix", ""])
for index, workstream in enumerate(result["workstreams"], start=1):
lines.extend(
[
f"### {index}. {workstream['name']}{workstream['status']}",
"",
f"- Requirement: {workstream['requirement']}",
]
)
if workstream["reference_results"]:
lines.append("- Referenced issues:")
for ref in workstream["reference_results"]:
alignment = "aligned" if ref["aligned"] else "DRIFT"
lines.append(
f" - #{ref['number']} ({ref['state']}) — {ref['title']} [{alignment}]"
)
else:
lines.append("- Referenced issues: none listed in the directive body")
if workstream["repo_evidence_present"]:
lines.append("- Repo evidence present:")
lines.extend(f" - {item}" for item in workstream["repo_evidence_present"])
else:
lines.append("- Repo evidence present: none")
if workstream["repo_evidence_missing"]:
lines.append("- Repo evidence expected but missing:")
lines.extend(f" - {item}" for item in workstream["repo_evidence_missing"])
if workstream["missing_deliverables"]:
lines.append("- Missing operator deliverables:")
lines.extend(f" - {item}" for item in workstream["missing_deliverables"])
else:
lines.append("- Missing operator deliverables: none")
lines.append(f"- Why this lane remains open: {workstream['why_open']}")
lines.append("")
lines.extend(["## Highest-Leverage Next Actions", ""])
lines.extend(f"- {item}" for item in result["next_actions"])
lines.extend(
[
"",
"## Why #524 Remains Open",
"",
"- The directive bundles five separate workstreams with different evidence surfaces.",
"- Multiple cited issue numbers have drifted away from the work they are supposed to anchor.",
"- Repo scaffolding exists for Nostr, sovereignty audits, and Morrowind, but the operator-facing bundles are still missing.",
"- Syntax Guard verification is still undocumented and unproven inside this repo.",
]
)
return "\n".join(lines).rstrip() + "\n"
def main() -> None:
parser = argparse.ArgumentParser(description="Render the unified fleet sovereignty status report for issue #524")
parser.add_argument("--snapshot", help="Optional JSON snapshot file overriding the default issue-title/state snapshot")
parser.add_argument("--live", action="store_true", help="Refresh the issue snapshot from Gitea before rendering")
parser.add_argument("--token-file", default=str(DEFAULT_TOKEN_FILE), help="Token file used with --live")
parser.add_argument("--output", help="Optional path to write the rendered report")
parser.add_argument("--json", action="store_true", help="Print computed JSON instead of markdown")
args = parser.parse_args()
if args.live:
snapshot = refresh_snapshot(Path(args.token_file).expanduser())
else:
snapshot = load_snapshot(Path(args.snapshot).expanduser() if args.snapshot else None)
result = evaluate_directive(snapshot=snapshot, repo_root=DEFAULT_REPO_ROOT)
rendered = json.dumps(result, indent=2) if args.json else render_markdown(result)
if args.output:
output_path = Path(args.output).expanduser()
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(rendered, encoding="utf-8")
print(f"Directive status written to {output_path}")
else:
print(rendered)
if __name__ == "__main__":
main()

View File

@@ -1,176 +0,0 @@
from __future__ import annotations
import json
import subprocess
import sys
from datetime import date
from pathlib import Path
from scripts.burn_velocity_tracker import build_velocity_report, render_markdown, update_history
ROOT = Path(__file__).resolve().parent.parent
DOC_PATH = ROOT / "docs" / "BURN_VELOCITY_TRACKING.md"
SNAPSHOT = {
"generated_at": "2026-04-22T12:00:00Z",
"repos": {
"timmy-home": {
"open_issues": [
{"number": 501, "state": "open", "created_at": "2026-04-20T09:00:00Z"},
{"number": 502, "state": "open", "created_at": "2026-04-22T07:00:00Z"},
],
"recent_issues": [
{"number": 401, "state": "closed", "created_at": "2026-04-21T09:00:00Z", "closed_at": "2026-04-22T05:30:00Z"},
{"number": 402, "state": "closed", "created_at": "2026-04-20T09:00:00Z", "closed_at": "2026-04-21T05:30:00Z"},
{"number": 403, "state": "closed", "created_at": "2026-04-19T09:00:00Z", "closed_at": "2026-04-20T05:30:00Z"},
{"number": 404, "state": "closed", "created_at": "2026-04-14T09:00:00Z", "closed_at": "2026-04-15T05:30:00Z"},
{"number": 405, "state": "closed", "created_at": "2026-04-13T09:00:00Z", "closed_at": "2026-04-14T05:30:00Z"},
{"number": 406, "state": "closed", "created_at": "2026-04-12T09:00:00Z", "closed_at": "2026-04-13T05:30:00Z"},
{"number": 407, "state": "closed", "created_at": "2026-04-11T09:00:00Z", "closed_at": "2026-04-12T05:30:00Z"},
{"number": 408, "state": "closed", "created_at": "2026-04-10T09:00:00Z", "closed_at": "2026-04-11T05:30:00Z"},
{"number": 409, "state": "closed", "created_at": "2026-04-09T09:00:00Z", "closed_at": "2026-04-10T05:30:00Z"},
{"number": 410, "state": "closed", "created_at": "2026-04-08T09:00:00Z", "closed_at": "2026-04-09T05:30:00Z"},
{"number": 411, "state": "closed", "created_at": "2026-04-07T09:00:00Z", "closed_at": "2026-04-08T05:30:00Z"},
{"number": 412, "state": "closed", "created_at": "2026-04-06T09:00:00Z", "closed_at": "2026-04-07T05:30:00Z"},
{"number": 413, "state": "closed", "created_at": "2026-04-05T09:00:00Z", "closed_at": "2026-04-06T05:30:00Z"},
{"number": 414, "state": "open", "created_at": "2026-04-22T08:45:00Z", "closed_at": None},
{"number": 415, "state": "open", "created_at": "2026-04-17T08:45:00Z", "closed_at": None},
],
},
"timmy-config": {
"open_issues": [
{"number": 601, "state": "open", "created_at": "2026-04-18T09:00:00Z"},
],
"recent_issues": [
{"number": 602, "state": "closed", "created_at": "2026-04-20T09:00:00Z", "closed_at": "2026-04-21T06:00:00Z"},
{"number": 603, "state": "open", "created_at": "2026-04-22T06:00:00Z", "closed_at": None},
],
},
},
}
CONFIG = {
"owner": "Timmy_Foundation",
"repos": ["timmy-home", "timmy-config"],
"lookback_days": 14,
"alert": {
"recent_days": 7,
"baseline_days": 7,
"minimum_baseline_closed": 4,
"drop_ratio": 0.6,
},
}
def test_build_velocity_report_counts_opened_closed_and_flags_drop_alert() -> None:
report = build_velocity_report(CONFIG, SNAPSHOT, today=date(2026, 4, 22))
assert report["generated_day"] == "2026-04-22"
assert report["summary"]["repos_with_alerts"] == ["timmy-home"]
assert report["summary"]["total_open_now"] == 3
home = report["repos"][0]
assert home["repo"] == "timmy-home"
assert home["open_now"] == 2
assert home["opened_last_7d"] == 5
assert home["closed_last_7d"] == 3
assert home["baseline_closed"] == 7
assert home["weekly_net"] == 2
assert home["alert"]["status"] == "drop"
assert home["alert"]["recent_closed"] == 3
assert home["daily"][-1] == {"date": "2026-04-22", "opened": 1, "closed": 1}
timmy_config = report["repos"][1]
assert timmy_config["repo"] == "timmy-config"
assert timmy_config["open_now"] == 1
assert timmy_config["closed_last_7d"] == 1
assert timmy_config["alert"]["status"] == "ok"
def test_render_markdown_includes_dashboard_handoff_and_alerts() -> None:
report = build_velocity_report(CONFIG, SNAPSHOT, today=date(2026, 4, 22))
rendered = render_markdown(report)
for snippet in (
"# Burn-down Velocity Tracking",
"## Per-repo velocity",
"timmy-home",
"timmy-config",
"## Dashboard handoff for timmy-config",
"velocity_drop",
"## Alerts",
):
assert snippet in rendered
def test_update_history_replaces_same_day_snapshot(tmp_path: Path) -> None:
history_path = tmp_path / "burn-velocity-history.json"
report = build_velocity_report(CONFIG, SNAPSHOT, today=date(2026, 4, 22))
update_history(history_path, report)
updated = json.loads(json.dumps(report))
updated["repos"][0]["open_now"] = 9
updated["summary"]["total_open_now"] = 10
update_history(history_path, updated)
history = json.loads(history_path.read_text(encoding="utf-8"))
assert [item["date"] for item in history["days"]] == ["2026-04-22"]
assert history["days"][0]["summary"]["total_open_now"] == 10
assert history["days"][0]["repos"][0]["open_now"] == 9
def test_cli_writes_json_markdown_and_history_from_snapshot(tmp_path: Path) -> None:
snapshot_path = tmp_path / "snapshot.json"
output_json = tmp_path / "latest.json"
output_md = tmp_path / "latest.md"
history_path = tmp_path / "history.json"
snapshot_path.write_text(json.dumps(SNAPSHOT), encoding="utf-8")
result = subprocess.run(
[
sys.executable,
"-m",
"scripts.burn_velocity_tracker",
"--snapshot-file",
str(snapshot_path),
"--today",
"2026-04-22",
"--output-json",
str(output_json),
"--output-md",
str(output_md),
"--history-file",
str(history_path),
"--write-history",
"--json",
],
check=True,
cwd=ROOT,
capture_output=True,
text=True,
)
payload = json.loads(result.stdout)
assert payload["summary"]["repos_with_alerts"] == ["timmy-home"]
assert output_json.exists()
assert output_md.exists()
assert history_path.exists()
assert "timmy-config" in output_md.read_text(encoding="utf-8")
def test_repo_contains_burn_velocity_tracking_doc() -> None:
text = DOC_PATH.read_text(encoding="utf-8")
required = [
"# Burn-down Velocity Tracking",
"python3 scripts/burn_velocity_tracker.py",
"configs/burn_velocity_repos.json",
"~/.timmy/burn-velocity/latest.json",
"timmy-config dashboard",
"type=issues",
"velocity_drop",
]
for snippet in required:
assert snippet in text

View File

@@ -0,0 +1,77 @@
from __future__ import annotations
import importlib.util
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
SCRIPT_PATH = ROOT / "scripts" / "unified_fleet_sovereignty_status.py"
DOC_PATH = ROOT / "docs" / "UNIFIED_FLEET_SOVEREIGNTY_STATUS.md"
def _load_module(path: Path, name: str):
assert path.exists(), f"missing {path.relative_to(ROOT)}"
spec = importlib.util.spec_from_file_location(name, path)
assert spec and spec.loader
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
def _workstream(result: dict, key: str) -> dict:
for workstream in result["workstreams"]:
if workstream["key"] == key:
return workstream
raise AssertionError(f"missing workstream {key}")
def test_evaluate_directive_flags_reference_drift_without_faking_completion() -> None:
mod = _load_module(SCRIPT_PATH, "unified_fleet_sovereignty_status")
result = mod.evaluate_directive(snapshot=mod.default_snapshot(), repo_root=ROOT)
assert len(result["reference_drift"]) == 4
assert any("#813" in item for item in result["reference_drift"])
assert any("#103" in item for item in result["reference_drift"])
nostr = _workstream(result, "nostr-migration")
assert nostr["status"] == "PARTIAL"
assert any("timmy_client.py" in item for item in nostr["repo_evidence_present"])
lexicon = _workstream(result, "lexicon-enforcement")
assert all(item["aligned"] for item in lexicon["reference_results"])
assert lexicon["status"] == "PARTIAL"
syntax_guard = _workstream(result, "syntax-guard")
assert syntax_guard["status"] == "MISSING"
assert any("deployment verifier" in item for item in syntax_guard["missing_deliverables"])
def test_render_markdown_includes_required_sections_and_grounding_evidence() -> None:
mod = _load_module(SCRIPT_PATH, "unified_fleet_sovereignty_status")
result = mod.evaluate_directive(snapshot=mod.default_snapshot(), repo_root=ROOT)
report = mod.render_markdown(result)
for snippet in (
"# [DIRECTIVE] Unified Fleet Sovereignty & Comms Migration",
"## Directive Snapshot",
"## Reference Drift",
"## Workstream Matrix",
"### 5. Infrastructure Hardening / Syntax Guard — MISSING",
"`infrastructure/timmy-bridge/client/timmy_client.py`",
"machine-checkable lexicon policy for review/triage",
"## Why #524 Remains Open",
):
assert snippet in report
def test_repo_contains_committed_issue_524_grounding_doc() -> None:
assert DOC_PATH.exists(), "missing committed directive grounding doc"
text = DOC_PATH.read_text(encoding="utf-8")
for snippet in (
"# [DIRECTIVE] Unified Fleet Sovereignty & Comms Migration",
"## Reference Drift",
"## Workstream Matrix",
"## Highest-Leverage Next Actions",
"## Why #524 Remains Open",
):
assert snippet in text