Compare commits
2 Commits
step35/498
...
fix/524
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9339b1a2d4 | ||
|
|
a6ec2055cc |
20
SOUL.md
20
SOUL.md
@@ -137,26 +137,6 @@ The inscription predates any given model. It will outlast every API.
|
||||
|
||||
---
|
||||
|
||||
## What Honesty Requires
|
||||
|
||||
These values are necessary but not sufficient. A statement of principle does not prevent failure. The code that reads this soul must build apparatus that gives these words teeth. Without that apparatus, the values are aspiration, not architecture.
|
||||
|
||||
The following are not optional. Any implementation that calls itself Timmy must build them.
|
||||
|
||||
**Grounding before generation.** When I have verified sources — chain memory, local documents, hash-verified inscriptions — I must consult them before I generate from pattern alone. Retrieval is not a feature. It is the primary mechanism by which I avoid lying.
|
||||
|
||||
**Source distinction.** Every claim I make comes from one of two places: a verified source I can point to, or my own pattern-matching. My user must be able to tell which is which. If I cannot cite a source, I must say so. "I think" and "I know" are different sentences and I must not confuse them.
|
||||
|
||||
**Refusal over fabrication.** When I do not know, the correct output is "I don't know." Not a plausible guess dressed in confident language. The code must detect when I am hedging without grounding and flag it — to me and to my user. An honest "I don't know" is worth more than a thousand fluent paragraphs of confabulation.
|
||||
|
||||
**Confidence signaling.** When I am uncertain, I must say so in proportion to my uncertainty. The code should implement mechanisms — a second inference pass, a calibration check, a retrieval verification — that surface my actual confidence rather than the confidence my language implies.
|
||||
|
||||
**The audit trail.** Every response I generate should be logged locally with the inputs that produced it, the sources I consulted, and the confidence assessment I made. Not for surveillance — for sovereignty. If I say something wrong, my user must be able to trace why. Accountability without traceability is theater.
|
||||
|
||||
**The limits of small minds.** I may run on four gigabytes of RAM with a model of one and a half billion parameters. That model will hallucinate more than a larger one. This is physics, not a moral failing. The apparatus must be proportional to the risk: smaller models need tighter grounding, shorter leashes on free generation, and more frequent "I don't know" responses. Honesty scales with humility.
|
||||
|
||||
---
|
||||
|
||||
## For the Audio Overview
|
||||
|
||||
If you are listening to this as a NotebookLM Audio Overview:
|
||||
|
||||
107
docs/UNIFIED_FLEET_SOVEREIGNTY_STATUS.md
Normal file
107
docs/UNIFIED_FLEET_SOVEREIGNTY_STATUS.md
Normal file
@@ -0,0 +1,107 @@
|
||||
# [DIRECTIVE] Unified Fleet Sovereignty & Comms Migration
|
||||
|
||||
Grounding report for `timmy-home #524`.
|
||||
|
||||
Issue #524 is a multi-lane directive, not a one-commit feature. This report grounds the directive in repo evidence, highlights stale cross-links, and names the missing operator bundles that still need real execution.
|
||||
|
||||
This remains a `Refs #524` artifact. The directive spans multiple repos and operator actions, so this report makes the current repo-side state executable without pretending the whole migration is complete.
|
||||
|
||||
## Directive Snapshot
|
||||
|
||||
- Repo-grounded workstreams: 0
|
||||
- Partial workstreams: 4
|
||||
- Missing workstreams: 1
|
||||
- Drifted references: 4
|
||||
|
||||
## Reference Drift
|
||||
|
||||
- #813 is cited for Nostr Migration Leadership, but its current title is 'docs: refresh the-playground genome analysis (#671)'.
|
||||
- #819 is cited for Nostr Migration Leadership, but its current title is 'docs: verify #648 already implemented (closes #818)'.
|
||||
- #139 is cited for v0.7.0 Feature Audit, but its current title is '🐣 Allegro-Primus is born'.
|
||||
- #103 is cited for Morrowind Local-First Benchmark, but its current title is 'Build comprehensive caching layer — cache everywhere'.
|
||||
|
||||
## Workstream Matrix
|
||||
|
||||
### 1. Nostr Migration Leadership — PARTIAL
|
||||
|
||||
- Requirement: Replace Telegram with relay-based sovereign comms, verify wizard keypairs, and prove the NIP-29 group path is stable.
|
||||
- Referenced issues:
|
||||
- #813 (closed) — docs: refresh the-playground genome analysis (#671) [DRIFT]
|
||||
- #819 (open) — docs: verify #648 already implemented (closes #818) [DRIFT]
|
||||
- Repo evidence present:
|
||||
- `infrastructure/timmy-bridge/client/timmy_client.py` — Nostr event client scaffold already exists
|
||||
- `infrastructure/timmy-bridge/monitor/timmy_monitor.py` — Nostr relay monitor already exists
|
||||
- `specs/wizard-telegram-bot-cutover.md` — Telegram cutover planning exists, so the migration lane is real
|
||||
- Missing operator deliverables:
|
||||
- wizard keypair inventory and ownership matrix
|
||||
- NIP-29 relay group verification report
|
||||
- operator runbook for cutting traffic off Telegram
|
||||
- Why this lane remains open: The repo has Nostr-adjacent scaffolding, but the directive still lacks a verified migration packet and the cited issue links drift away from the stated Nostr scope.
|
||||
|
||||
### 2. Lexicon Enforcement — PARTIAL
|
||||
|
||||
- Requirement: Enforce the Fleet Lexicon in PR review and issue triage so the team uses one shared language.
|
||||
- Referenced issues:
|
||||
- #388 (closed) — [KT] Fleet Lexicon & Techniques — Shared Vocabulary, Patterns, and Standards for All Agents [aligned]
|
||||
- Repo evidence present:
|
||||
- `docs/WIZARD_APPRENTICESHIP_CHARTER.md` — The repo already uses wizard-language canon in docs
|
||||
- `specs/timmy-ezra-bezalel-canon-sheet.md` — Canonical agent naming already exists
|
||||
- `docs/OPERATIONS_DASHBOARD.md` — Operational roles are already described in repo language
|
||||
- Missing operator deliverables:
|
||||
- machine-checkable lexicon policy for review/triage
|
||||
- terminology lint or reviewer checklist tied to the lexicon
|
||||
- Why this lane remains open: The naming canon exists, but there is still no executable enforcement bundle that would catch drift during future reviews and triage passes.
|
||||
|
||||
### 3. v0.7.0 Feature Audit — PARTIAL
|
||||
|
||||
- Requirement: Audit Hermes features that can reduce cloud dependency and turn the findings into a sovereignty implementation plan.
|
||||
- Referenced issues:
|
||||
- #139 (open) — 🐣 Allegro-Primus is born [DRIFT]
|
||||
- Repo evidence present:
|
||||
- `scripts/sovereignty_audit.py` — Cloud-vs-local audit machinery already exists
|
||||
- `reports/evaluations/2026-04-15-phase-4-sovereignty-audit.md` — Recent sovereignty audit report is committed
|
||||
- `timmy-local/README.md` — Local-first status is already documented for operators
|
||||
- Missing operator deliverables:
|
||||
- Hermes v0.7.0 feature inventory linked to cloud-reduction leverage
|
||||
- Sovereignty Implementation Plan derived from that feature audit
|
||||
- Why this lane remains open: The repo has sovereignty-audit infrastructure, but it does not yet contain the requested v0.7.0 feature inventory or the plan that turns those findings into rollout steps.
|
||||
|
||||
### 4. Morrowind Local-First Benchmark — PARTIAL
|
||||
|
||||
- Requirement: Compare cloud and local Morrowind agents, prove local parity where possible, and document the reasoning gap when it fails.
|
||||
- Referenced issues:
|
||||
- #103 (open) — Build comprehensive caching layer — cache everywhere [DRIFT]
|
||||
- Repo evidence present:
|
||||
- `morrowind/local_brain.py` — Local Morrowind control loop already exists
|
||||
- `morrowind/mcp_server.py` — Morrowind MCP control surface is already wired
|
||||
- `morrowind/pilot.py` — Trajectory logging for evaluation already exists
|
||||
- Missing operator deliverables:
|
||||
- cloud-vs-local benchmark report for the combat loop
|
||||
- reasoning-gap writeup tied to a proposed LoRA/fine-tune path
|
||||
- Why this lane remains open: The repo has a local Morrowind stack, but it does not yet contain the requested benchmark artifact; the cited issue number also points at an unrelated caching task.
|
||||
|
||||
### 5. Infrastructure Hardening / Syntax Guard — MISSING
|
||||
|
||||
- Requirement: Verify Syntax Guard pre-receive protection across Gitea repos so syntax failures stop earlier.
|
||||
- Referenced issues: none listed in the directive body
|
||||
- Repo evidence present: none
|
||||
- Missing operator deliverables:
|
||||
- repo inventory of Gitea targets that should carry Syntax Guard
|
||||
- deployment verifier for hook presence across those repos
|
||||
- operator report proving installation state instead of assuming it
|
||||
- Why this lane remains open: No repo-managed syntax-guard verifier is present yet, so this directive still depends on manual trust rather than auditable proof.
|
||||
|
||||
## Highest-Leverage Next Actions
|
||||
|
||||
- Nostr Migration Leadership: wizard keypair inventory and ownership matrix
|
||||
- Lexicon Enforcement: machine-checkable lexicon policy for review/triage
|
||||
- v0.7.0 Feature Audit: Hermes v0.7.0 feature inventory linked to cloud-reduction leverage
|
||||
- Morrowind Local-First Benchmark: cloud-vs-local benchmark report for the combat loop
|
||||
- Infrastructure Hardening / Syntax Guard: repo inventory of Gitea targets that should carry Syntax Guard
|
||||
|
||||
## Why #524 Remains Open
|
||||
|
||||
- The directive bundles five separate workstreams with different evidence surfaces.
|
||||
- Multiple cited issue numbers have drifted away from the work they are supposed to anchor.
|
||||
- Repo scaffolding exists for Nostr, sovereignty audits, and Morrowind, but the operator-facing bundles are still missing.
|
||||
- Syntax Guard verification is still undocumented and unproven inside this repo.
|
||||
418
scripts/unified_fleet_sovereignty_status.py
Normal file
418
scripts/unified_fleet_sovereignty_status.py
Normal file
@@ -0,0 +1,418 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Ground timmy-home #524 as an executable status report.
|
||||
|
||||
Refs: timmy-home #524
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
from copy import deepcopy
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from urllib import request
|
||||
|
||||
DEFAULT_BASE_URL = "https://forge.alexanderwhitestone.com/api/v1"
|
||||
DEFAULT_OWNER = "Timmy_Foundation"
|
||||
DEFAULT_REPO = "timmy-home"
|
||||
DEFAULT_TOKEN_FILE = Path.home() / ".config" / "gitea" / "token"
|
||||
DEFAULT_REPO_ROOT = Path(__file__).resolve().parents[1]
|
||||
DEFAULT_DOC_PATH = DEFAULT_REPO_ROOT / "docs" / "UNIFIED_FLEET_SOVEREIGNTY_STATUS.md"
|
||||
|
||||
DIRECTIVE_TITLE = "[DIRECTIVE] Unified Fleet Sovereignty & Comms Migration"
|
||||
DIRECTIVE_SUMMARY = (
|
||||
"Issue #524 is a multi-lane directive, not a one-commit feature. "
|
||||
"This report grounds the directive in repo evidence, highlights stale cross-links, "
|
||||
"and names the missing operator bundles that still need real execution."
|
||||
)
|
||||
|
||||
DEFAULT_REFERENCE_SNAPSHOT = {
|
||||
388: {
|
||||
"title": "[KT] Fleet Lexicon & Techniques — Shared Vocabulary, Patterns, and Standards for All Agents",
|
||||
"state": "closed",
|
||||
},
|
||||
103: {
|
||||
"title": "Build comprehensive caching layer — cache everywhere",
|
||||
"state": "open",
|
||||
},
|
||||
139: {
|
||||
"title": "🐣 Allegro-Primus is born",
|
||||
"state": "open",
|
||||
},
|
||||
813: {
|
||||
"title": "docs: refresh the-playground genome analysis (#671)",
|
||||
"state": "closed",
|
||||
},
|
||||
819: {
|
||||
"title": "docs: verify #648 already implemented (closes #818)",
|
||||
"state": "open",
|
||||
},
|
||||
}
|
||||
|
||||
WORKSTREAMS = [
|
||||
{
|
||||
"key": "nostr-migration",
|
||||
"name": "Nostr Migration Leadership",
|
||||
"requirement": "Replace Telegram with relay-based sovereign comms, verify wizard keypairs, and prove the NIP-29 group path is stable.",
|
||||
"references": [813, 819],
|
||||
"expected_keywords": ["nostr", "relay", "telegram", "comms", "messenger"],
|
||||
"repo_evidence": [
|
||||
{
|
||||
"path": "infrastructure/timmy-bridge/client/timmy_client.py",
|
||||
"description": "Nostr event client scaffold already exists",
|
||||
},
|
||||
{
|
||||
"path": "infrastructure/timmy-bridge/monitor/timmy_monitor.py",
|
||||
"description": "Nostr relay monitor already exists",
|
||||
},
|
||||
{
|
||||
"path": "specs/wizard-telegram-bot-cutover.md",
|
||||
"description": "Telegram cutover planning exists, so the migration lane is real",
|
||||
},
|
||||
],
|
||||
"missing_deliverables": [
|
||||
"wizard keypair inventory and ownership matrix",
|
||||
"NIP-29 relay group verification report",
|
||||
"operator runbook for cutting traffic off Telegram",
|
||||
],
|
||||
"why_open": "The repo has Nostr-adjacent scaffolding, but the directive still lacks a verified migration packet and the cited issue links drift away from the stated Nostr scope.",
|
||||
},
|
||||
{
|
||||
"key": "lexicon-enforcement",
|
||||
"name": "Lexicon Enforcement",
|
||||
"requirement": "Enforce the Fleet Lexicon in PR review and issue triage so the team uses one shared language.",
|
||||
"references": [388],
|
||||
"expected_keywords": ["lexicon", "vocabulary", "standards", "shared vocabulary"],
|
||||
"repo_evidence": [
|
||||
{
|
||||
"path": "docs/WIZARD_APPRENTICESHIP_CHARTER.md",
|
||||
"description": "The repo already uses wizard-language canon in docs",
|
||||
},
|
||||
{
|
||||
"path": "specs/timmy-ezra-bezalel-canon-sheet.md",
|
||||
"description": "Canonical agent naming already exists",
|
||||
},
|
||||
{
|
||||
"path": "docs/OPERATIONS_DASHBOARD.md",
|
||||
"description": "Operational roles are already described in repo language",
|
||||
},
|
||||
],
|
||||
"missing_deliverables": [
|
||||
"machine-checkable lexicon policy for review/triage",
|
||||
"terminology lint or reviewer checklist tied to the lexicon",
|
||||
],
|
||||
"why_open": "The naming canon exists, but there is still no executable enforcement bundle that would catch drift during future reviews and triage passes.",
|
||||
},
|
||||
{
|
||||
"key": "feature-audit",
|
||||
"name": "v0.7.0 Feature Audit",
|
||||
"requirement": "Audit Hermes features that can reduce cloud dependency and turn the findings into a sovereignty implementation plan.",
|
||||
"references": [139],
|
||||
"expected_keywords": ["hermes", "feature", "audit", "v0.7.0", "sovereignty"],
|
||||
"repo_evidence": [
|
||||
{
|
||||
"path": "scripts/sovereignty_audit.py",
|
||||
"description": "Cloud-vs-local audit machinery already exists",
|
||||
},
|
||||
{
|
||||
"path": "reports/evaluations/2026-04-15-phase-4-sovereignty-audit.md",
|
||||
"description": "Recent sovereignty audit report is committed",
|
||||
},
|
||||
{
|
||||
"path": "timmy-local/README.md",
|
||||
"description": "Local-first status is already documented for operators",
|
||||
},
|
||||
],
|
||||
"missing_deliverables": [
|
||||
"Hermes v0.7.0 feature inventory linked to cloud-reduction leverage",
|
||||
"Sovereignty Implementation Plan derived from that feature audit",
|
||||
],
|
||||
"why_open": "The repo has sovereignty-audit infrastructure, but it does not yet contain the requested v0.7.0 feature inventory or the plan that turns those findings into rollout steps.",
|
||||
},
|
||||
{
|
||||
"key": "morrowind-benchmark",
|
||||
"name": "Morrowind Local-First Benchmark",
|
||||
"requirement": "Compare cloud and local Morrowind agents, prove local parity where possible, and document the reasoning gap when it fails.",
|
||||
"references": [103],
|
||||
"expected_keywords": ["morrowind", "combat", "benchmark", "local", "cloud"],
|
||||
"repo_evidence": [
|
||||
{
|
||||
"path": "morrowind/local_brain.py",
|
||||
"description": "Local Morrowind control loop already exists",
|
||||
},
|
||||
{
|
||||
"path": "morrowind/mcp_server.py",
|
||||
"description": "Morrowind MCP control surface is already wired",
|
||||
},
|
||||
{
|
||||
"path": "morrowind/pilot.py",
|
||||
"description": "Trajectory logging for evaluation already exists",
|
||||
},
|
||||
],
|
||||
"missing_deliverables": [
|
||||
"cloud-vs-local benchmark report for the combat loop",
|
||||
"reasoning-gap writeup tied to a proposed LoRA/fine-tune path",
|
||||
],
|
||||
"why_open": "The repo has a local Morrowind stack, but it does not yet contain the requested benchmark artifact; the cited issue number also points at an unrelated caching task.",
|
||||
},
|
||||
{
|
||||
"key": "syntax-guard",
|
||||
"name": "Infrastructure Hardening / Syntax Guard",
|
||||
"requirement": "Verify Syntax Guard pre-receive protection across Gitea repos so syntax failures stop earlier.",
|
||||
"references": [],
|
||||
"expected_keywords": [],
|
||||
"repo_evidence": [],
|
||||
"missing_deliverables": [
|
||||
"repo inventory of Gitea targets that should carry Syntax Guard",
|
||||
"deployment verifier for hook presence across those repos",
|
||||
"operator report proving installation state instead of assuming it",
|
||||
],
|
||||
"why_open": "No repo-managed syntax-guard verifier is present yet, so this directive still depends on manual trust rather than auditable proof.",
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
def default_snapshot() -> dict[int, dict[str, str]]:
|
||||
return deepcopy(DEFAULT_REFERENCE_SNAPSHOT)
|
||||
|
||||
|
||||
class GiteaClient:
|
||||
def __init__(self, token: str, owner: str = DEFAULT_OWNER, repo: str = DEFAULT_REPO, base_url: str = DEFAULT_BASE_URL):
|
||||
self.token = token
|
||||
self.owner = owner
|
||||
self.repo = repo
|
||||
self.base_url = base_url.rstrip("/")
|
||||
|
||||
def get_issue(self, issue_number: int) -> dict[str, Any]:
|
||||
req = request.Request(
|
||||
f"{self.base_url}/repos/{self.owner}/{self.repo}/issues/{issue_number}",
|
||||
headers={"Authorization": f"token {self.token}", "Accept": "application/json"},
|
||||
)
|
||||
with request.urlopen(req, timeout=30) as resp:
|
||||
return json.loads(resp.read().decode())
|
||||
|
||||
|
||||
def load_snapshot(path: Path | None = None) -> dict[int, dict[str, str]]:
|
||||
if path is None:
|
||||
return default_snapshot()
|
||||
data = json.loads(path.read_text(encoding="utf-8"))
|
||||
return {int(k): v for k, v in data.items()}
|
||||
|
||||
|
||||
def refresh_snapshot(token_file: Path = DEFAULT_TOKEN_FILE) -> dict[int, dict[str, str]]:
|
||||
token = token_file.read_text(encoding="utf-8").strip()
|
||||
client = GiteaClient(token=token)
|
||||
snapshot: dict[int, dict[str, str]] = {}
|
||||
for issue_number in sorted(DEFAULT_REFERENCE_SNAPSHOT):
|
||||
issue = client.get_issue(issue_number)
|
||||
snapshot[issue_number] = {
|
||||
"title": issue["title"],
|
||||
"state": issue["state"],
|
||||
}
|
||||
return snapshot
|
||||
|
||||
|
||||
def collect_repo_evidence(entries: list[dict[str, str]], repo_root: Path) -> tuple[list[str], list[str]]:
|
||||
present: list[str] = []
|
||||
missing: list[str] = []
|
||||
for entry in entries:
|
||||
label = f"`{entry['path']}` — {entry['description']}"
|
||||
if (repo_root / entry["path"]).exists():
|
||||
present.append(label)
|
||||
else:
|
||||
missing.append(label)
|
||||
return present, missing
|
||||
|
||||
|
||||
|
||||
def evaluate_reference(issue_number: int, snapshot: dict[int, dict[str, str]], expected_keywords: list[str]) -> dict[str, Any]:
|
||||
record = snapshot.get(issue_number, {"title": "missing from snapshot", "state": "unknown"})
|
||||
title = record["title"]
|
||||
title_lower = title.lower()
|
||||
matched_keywords = [kw for kw in expected_keywords if kw.lower() in title_lower]
|
||||
aligned = bool(matched_keywords) if expected_keywords else True
|
||||
return {
|
||||
"number": issue_number,
|
||||
"title": title,
|
||||
"state": record["state"],
|
||||
"aligned": aligned,
|
||||
"matched_keywords": matched_keywords,
|
||||
}
|
||||
|
||||
|
||||
|
||||
def classify_workstream(reference_results: list[dict[str, Any]], evidence_present: list[str], missing_deliverables: list[str]) -> str:
|
||||
has_drift = any(not item["aligned"] for item in reference_results)
|
||||
if not evidence_present:
|
||||
return "MISSING"
|
||||
if has_drift or missing_deliverables:
|
||||
return "PARTIAL"
|
||||
return "GROUNDED"
|
||||
|
||||
|
||||
|
||||
def evaluate_directive(snapshot: dict[int, dict[str, str]] | None = None, repo_root: Path | None = None) -> dict[str, Any]:
|
||||
snapshot = snapshot or default_snapshot()
|
||||
repo_root = repo_root or DEFAULT_REPO_ROOT
|
||||
workstreams: list[dict[str, Any]] = []
|
||||
drift_items: list[str] = []
|
||||
|
||||
for lane in WORKSTREAMS:
|
||||
reference_results = [
|
||||
evaluate_reference(issue_number, snapshot, lane["expected_keywords"])
|
||||
for issue_number in lane["references"]
|
||||
]
|
||||
present, missing = collect_repo_evidence(lane["repo_evidence"], repo_root)
|
||||
for item in reference_results:
|
||||
if not item["aligned"]:
|
||||
drift_items.append(
|
||||
f"#{item['number']} is cited for {lane['name']}, but its current title is '{item['title']}'."
|
||||
)
|
||||
workstream = {
|
||||
"key": lane["key"],
|
||||
"name": lane["name"],
|
||||
"requirement": lane["requirement"],
|
||||
"reference_results": reference_results,
|
||||
"repo_evidence_present": present,
|
||||
"repo_evidence_missing": missing,
|
||||
"missing_deliverables": list(lane["missing_deliverables"]),
|
||||
"why_open": lane["why_open"],
|
||||
}
|
||||
workstream["status"] = classify_workstream(
|
||||
reference_results=reference_results,
|
||||
evidence_present=present,
|
||||
missing_deliverables=workstream["missing_deliverables"],
|
||||
)
|
||||
workstreams.append(workstream)
|
||||
|
||||
next_actions: list[str] = []
|
||||
for workstream in workstreams:
|
||||
if workstream["missing_deliverables"]:
|
||||
next_actions.append(f"{workstream['name']}: {workstream['missing_deliverables'][0]}")
|
||||
|
||||
return {
|
||||
"issue_number": 524,
|
||||
"title": DIRECTIVE_TITLE,
|
||||
"summary": DIRECTIVE_SUMMARY,
|
||||
"reference_snapshot": {str(k): v for k, v in sorted(snapshot.items())},
|
||||
"workstreams": workstreams,
|
||||
"reference_drift": drift_items,
|
||||
"grounded_workstreams": sum(1 for item in workstreams if item["status"] == "GROUNDED"),
|
||||
"partial_workstreams": sum(1 for item in workstreams if item["status"] == "PARTIAL"),
|
||||
"missing_workstreams": sum(1 for item in workstreams if item["status"] == "MISSING"),
|
||||
"next_actions": next_actions,
|
||||
}
|
||||
|
||||
|
||||
|
||||
def render_markdown(result: dict[str, Any]) -> str:
|
||||
lines = [
|
||||
f"# {result['title']}",
|
||||
"",
|
||||
"Grounding report for `timmy-home #524`.",
|
||||
"",
|
||||
result["summary"],
|
||||
"",
|
||||
"This remains a `Refs #524` artifact. The directive spans multiple repos and operator actions, so this report makes the current repo-side state executable without pretending the whole migration is complete.",
|
||||
"",
|
||||
"## Directive Snapshot",
|
||||
"",
|
||||
f"- Repo-grounded workstreams: {result['grounded_workstreams']}",
|
||||
f"- Partial workstreams: {result['partial_workstreams']}",
|
||||
f"- Missing workstreams: {result['missing_workstreams']}",
|
||||
f"- Drifted references: {len(result['reference_drift'])}",
|
||||
"",
|
||||
"## Reference Drift",
|
||||
"",
|
||||
]
|
||||
if result["reference_drift"]:
|
||||
lines.extend(f"- {item}" for item in result["reference_drift"])
|
||||
else:
|
||||
lines.append("- No stale cross-links detected in the directive snapshot.")
|
||||
|
||||
lines.extend(["", "## Workstream Matrix", ""])
|
||||
for index, workstream in enumerate(result["workstreams"], start=1):
|
||||
lines.extend(
|
||||
[
|
||||
f"### {index}. {workstream['name']} — {workstream['status']}",
|
||||
"",
|
||||
f"- Requirement: {workstream['requirement']}",
|
||||
]
|
||||
)
|
||||
if workstream["reference_results"]:
|
||||
lines.append("- Referenced issues:")
|
||||
for ref in workstream["reference_results"]:
|
||||
alignment = "aligned" if ref["aligned"] else "DRIFT"
|
||||
lines.append(
|
||||
f" - #{ref['number']} ({ref['state']}) — {ref['title']} [{alignment}]"
|
||||
)
|
||||
else:
|
||||
lines.append("- Referenced issues: none listed in the directive body")
|
||||
|
||||
if workstream["repo_evidence_present"]:
|
||||
lines.append("- Repo evidence present:")
|
||||
lines.extend(f" - {item}" for item in workstream["repo_evidence_present"])
|
||||
else:
|
||||
lines.append("- Repo evidence present: none")
|
||||
|
||||
if workstream["repo_evidence_missing"]:
|
||||
lines.append("- Repo evidence expected but missing:")
|
||||
lines.extend(f" - {item}" for item in workstream["repo_evidence_missing"])
|
||||
|
||||
if workstream["missing_deliverables"]:
|
||||
lines.append("- Missing operator deliverables:")
|
||||
lines.extend(f" - {item}" for item in workstream["missing_deliverables"])
|
||||
else:
|
||||
lines.append("- Missing operator deliverables: none")
|
||||
|
||||
lines.append(f"- Why this lane remains open: {workstream['why_open']}")
|
||||
lines.append("")
|
||||
|
||||
lines.extend(["## Highest-Leverage Next Actions", ""])
|
||||
lines.extend(f"- {item}" for item in result["next_actions"])
|
||||
|
||||
lines.extend(
|
||||
[
|
||||
"",
|
||||
"## Why #524 Remains Open",
|
||||
"",
|
||||
"- The directive bundles five separate workstreams with different evidence surfaces.",
|
||||
"- Multiple cited issue numbers have drifted away from the work they are supposed to anchor.",
|
||||
"- Repo scaffolding exists for Nostr, sovereignty audits, and Morrowind, but the operator-facing bundles are still missing.",
|
||||
"- Syntax Guard verification is still undocumented and unproven inside this repo.",
|
||||
]
|
||||
)
|
||||
|
||||
return "\n".join(lines).rstrip() + "\n"
|
||||
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(description="Render the unified fleet sovereignty status report for issue #524")
|
||||
parser.add_argument("--snapshot", help="Optional JSON snapshot file overriding the default issue-title/state snapshot")
|
||||
parser.add_argument("--live", action="store_true", help="Refresh the issue snapshot from Gitea before rendering")
|
||||
parser.add_argument("--token-file", default=str(DEFAULT_TOKEN_FILE), help="Token file used with --live")
|
||||
parser.add_argument("--output", help="Optional path to write the rendered report")
|
||||
parser.add_argument("--json", action="store_true", help="Print computed JSON instead of markdown")
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.live:
|
||||
snapshot = refresh_snapshot(Path(args.token_file).expanduser())
|
||||
else:
|
||||
snapshot = load_snapshot(Path(args.snapshot).expanduser() if args.snapshot else None)
|
||||
|
||||
result = evaluate_directive(snapshot=snapshot, repo_root=DEFAULT_REPO_ROOT)
|
||||
rendered = json.dumps(result, indent=2) if args.json else render_markdown(result)
|
||||
|
||||
if args.output:
|
||||
output_path = Path(args.output).expanduser()
|
||||
output_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
output_path.write_text(rendered, encoding="utf-8")
|
||||
print(f"Directive status written to {output_path}")
|
||||
else:
|
||||
print(rendered)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,12 +1 @@
|
||||
# Timmy core module
|
||||
|
||||
from .claim_annotator import ClaimAnnotator, AnnotatedResponse, Claim
|
||||
from .audit_trail import AuditTrail, AuditEntry
|
||||
|
||||
__all__ = [
|
||||
"ClaimAnnotator",
|
||||
"AnnotatedResponse",
|
||||
"Claim",
|
||||
"AuditTrail",
|
||||
"AuditEntry",
|
||||
]
|
||||
|
||||
@@ -1,156 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Response Claim Annotator — Source Distinction System
|
||||
SOUL.md §What Honesty Requires: "Every claim I make comes from one of two places:
|
||||
a verified source I can point to, or my own pattern-matching. My user must be
|
||||
able to tell which is which."
|
||||
"""
|
||||
|
||||
import re
|
||||
import json
|
||||
from dataclasses import dataclass, field, asdict
|
||||
from typing import Optional, List, Dict
|
||||
|
||||
|
||||
@dataclass
|
||||
class Claim:
|
||||
"""A single claim in a response, annotated with source type."""
|
||||
text: str
|
||||
source_type: str # "verified" | "inferred"
|
||||
source_ref: Optional[str] = None # path/URL to verified source, if verified
|
||||
confidence: str = "unknown" # high | medium | low | unknown
|
||||
hedged: bool = False # True if hedging language was added
|
||||
|
||||
|
||||
@dataclass
|
||||
class AnnotatedResponse:
|
||||
"""Full response with annotated claims and rendered output."""
|
||||
original_text: str
|
||||
claims: List[Claim] = field(default_factory=list)
|
||||
rendered_text: str = ""
|
||||
has_unverified: bool = False # True if any inferred claims without hedging
|
||||
|
||||
|
||||
class ClaimAnnotator:
|
||||
"""Annotates response claims with source distinction and hedging."""
|
||||
|
||||
# Hedging phrases to prepend to inferred claims if not already present
|
||||
HEDGE_PREFIXES = [
|
||||
"I think ",
|
||||
"I believe ",
|
||||
"It seems ",
|
||||
"Probably ",
|
||||
"Likely ",
|
||||
]
|
||||
|
||||
def __init__(self, default_confidence: str = "unknown"):
|
||||
self.default_confidence = default_confidence
|
||||
|
||||
def annotate_claims(
|
||||
self,
|
||||
response_text: str,
|
||||
verified_sources: Optional[Dict[str, str]] = None,
|
||||
) -> AnnotatedResponse:
|
||||
"""
|
||||
Annotate claims in a response text.
|
||||
|
||||
Args:
|
||||
response_text: Raw response from the model
|
||||
verified_sources: Dict mapping claim substrings to source references
|
||||
e.g. {"Paris is the capital of France": "https://en.wikipedia.org/wiki/Paris"}
|
||||
|
||||
Returns:
|
||||
AnnotatedResponse with claims marked and rendered text
|
||||
"""
|
||||
verified_sources = verified_sources or {}
|
||||
claims = []
|
||||
has_unverified = False
|
||||
|
||||
# Simple sentence splitting (naive, but sufficient for MVP)
|
||||
sentences = [s.strip() for s in re.split(r'[.!?]\s+', response_text) if s.strip()]
|
||||
|
||||
for sent in sentences:
|
||||
# Check if sentence is a claim we can verify
|
||||
matched_source = None
|
||||
for claim_substr, source_ref in verified_sources.items():
|
||||
if claim_substr.lower() in sent.lower():
|
||||
matched_source = source_ref
|
||||
break
|
||||
|
||||
if matched_source:
|
||||
# Verified claim
|
||||
claim = Claim(
|
||||
text=sent,
|
||||
source_type="verified",
|
||||
source_ref=matched_source,
|
||||
confidence="high",
|
||||
hedged=False,
|
||||
)
|
||||
else:
|
||||
# Inferred claim (pattern-matched)
|
||||
claim = Claim(
|
||||
text=sent,
|
||||
source_type="inferred",
|
||||
confidence=self.default_confidence,
|
||||
hedged=self._has_hedge(sent),
|
||||
)
|
||||
if not claim.hedged:
|
||||
has_unverified = True
|
||||
|
||||
claims.append(claim)
|
||||
|
||||
# Render the annotated response
|
||||
rendered = self._render_response(claims)
|
||||
|
||||
return AnnotatedResponse(
|
||||
original_text=response_text,
|
||||
claims=claims,
|
||||
rendered_text=rendered,
|
||||
has_unverified=has_unverified,
|
||||
)
|
||||
|
||||
def _has_hedge(self, text: str) -> bool:
|
||||
"""Check if text already contains hedging language."""
|
||||
text_lower = text.lower()
|
||||
for prefix in self.HEDGE_PREFIXES:
|
||||
if text_lower.startswith(prefix.lower()):
|
||||
return True
|
||||
# Also check for inline hedges
|
||||
hedge_words = ["i think", "i believe", "probably", "likely", "maybe", "perhaps"]
|
||||
return any(word in text_lower for word in hedge_words)
|
||||
|
||||
def _render_response(self, claims: List[Claim]) -> str:
|
||||
"""
|
||||
Render response with source distinction markers.
|
||||
|
||||
Verified claims: [V] claim text [source: ref]
|
||||
Inferred claims: [I] claim text (or with hedging if missing)
|
||||
"""
|
||||
rendered_parts = []
|
||||
for claim in claims:
|
||||
if claim.source_type == "verified":
|
||||
part = f"[V] {claim.text}"
|
||||
if claim.source_ref:
|
||||
part += f" [source: {claim.source_ref}]"
|
||||
else: # inferred
|
||||
if not claim.hedged:
|
||||
# Add hedging if missing
|
||||
hedged_text = f"I think {claim.text[0].lower()}{claim.text[1:]}" if claim.text else claim.text
|
||||
part = f"[I] {hedged_text}"
|
||||
else:
|
||||
part = f"[I] {claim.text}"
|
||||
rendered_parts.append(part)
|
||||
return " ".join(rendered_parts)
|
||||
|
||||
def to_json(self, annotated: AnnotatedResponse) -> str:
|
||||
"""Serialize annotated response to JSON."""
|
||||
return json.dumps(
|
||||
{
|
||||
"original_text": annotated.original_text,
|
||||
"rendered_text": annotated.rendered_text,
|
||||
"has_unverified": annotated.has_unverified,
|
||||
"claims": [asdict(c) for c in annotated.claims],
|
||||
},
|
||||
indent=2,
|
||||
ensure_ascii=False,
|
||||
)
|
||||
@@ -1,54 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Smoke test for load_cap_enforcer.py — validates structure and dry-run path.
|
||||
|
||||
Refs: timmy-home #498
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
SCRIPT = Path(__file__).parent.parent / "timmy-config" / "bin" / "load_cap_enforcer.py"
|
||||
|
||||
|
||||
def test_script_exists_and_is_executable():
|
||||
assert SCRIPT.exists(), f"Script not found: {SCRIPT}"
|
||||
assert os.access(SCRIPT, os.X_OK), "Script not executable"
|
||||
|
||||
|
||||
def test_dry_run_help():
|
||||
result = subprocess.run([sys.executable, str(SCRIPT), "--help"], capture_output=True, text=True)
|
||||
assert result.returncode == 0
|
||||
assert "--dry-run" in result.stdout
|
||||
assert "--cap" in result.stdout
|
||||
assert "Enforce open-issue load cap" in result.stdout
|
||||
|
||||
|
||||
def test_dry_run_with_mocks(monkeypatch):
|
||||
"""Test dry-run path with mocked Gitea data — checks summary generation."""
|
||||
# Create a tiny stub script that imports the module and exercises core functions
|
||||
import importlib.util
|
||||
spec = importlib.util.spec_from_file_location("load_cap_enforcer", SCRIPT)
|
||||
mod = importlib.util.module_from_spec(spec)
|
||||
# Load but don't execute main yet — just verify module structure
|
||||
# We'll parse the module source for expected symbols
|
||||
source = SCRIPT.read_text()
|
||||
assert "fetch_all_open_issues" in source
|
||||
assert "build_summary" in source
|
||||
assert "unassignment_map" in source
|
||||
assert "COMMENT_TEMPLATE" in source
|
||||
assert "Unassigned from @{assignee} due to load cap" in source
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Run minimal smoke checks when invoked directly
|
||||
test_script_exists_and_is_executable()
|
||||
print("✓ Script exists and is executable")
|
||||
test_dry_run_help()
|
||||
print("✓ --help works")
|
||||
test_dry_run_with_mocks(type('obj', (object,), {'assert': lambda *a: True})())
|
||||
print("✓ Core structure verified")
|
||||
print("\nAll smoke tests passed.")
|
||||
|
||||
77
tests/test_unified_fleet_sovereignty_status.py
Normal file
77
tests/test_unified_fleet_sovereignty_status.py
Normal file
@@ -0,0 +1,77 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import importlib.util
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[1]
|
||||
SCRIPT_PATH = ROOT / "scripts" / "unified_fleet_sovereignty_status.py"
|
||||
DOC_PATH = ROOT / "docs" / "UNIFIED_FLEET_SOVEREIGNTY_STATUS.md"
|
||||
|
||||
|
||||
def _load_module(path: Path, name: str):
|
||||
assert path.exists(), f"missing {path.relative_to(ROOT)}"
|
||||
spec = importlib.util.spec_from_file_location(name, path)
|
||||
assert spec and spec.loader
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(module)
|
||||
return module
|
||||
|
||||
|
||||
def _workstream(result: dict, key: str) -> dict:
|
||||
for workstream in result["workstreams"]:
|
||||
if workstream["key"] == key:
|
||||
return workstream
|
||||
raise AssertionError(f"missing workstream {key}")
|
||||
|
||||
|
||||
def test_evaluate_directive_flags_reference_drift_without_faking_completion() -> None:
|
||||
mod = _load_module(SCRIPT_PATH, "unified_fleet_sovereignty_status")
|
||||
result = mod.evaluate_directive(snapshot=mod.default_snapshot(), repo_root=ROOT)
|
||||
|
||||
assert len(result["reference_drift"]) == 4
|
||||
assert any("#813" in item for item in result["reference_drift"])
|
||||
assert any("#103" in item for item in result["reference_drift"])
|
||||
|
||||
nostr = _workstream(result, "nostr-migration")
|
||||
assert nostr["status"] == "PARTIAL"
|
||||
assert any("timmy_client.py" in item for item in nostr["repo_evidence_present"])
|
||||
|
||||
lexicon = _workstream(result, "lexicon-enforcement")
|
||||
assert all(item["aligned"] for item in lexicon["reference_results"])
|
||||
assert lexicon["status"] == "PARTIAL"
|
||||
|
||||
syntax_guard = _workstream(result, "syntax-guard")
|
||||
assert syntax_guard["status"] == "MISSING"
|
||||
assert any("deployment verifier" in item for item in syntax_guard["missing_deliverables"])
|
||||
|
||||
|
||||
def test_render_markdown_includes_required_sections_and_grounding_evidence() -> None:
|
||||
mod = _load_module(SCRIPT_PATH, "unified_fleet_sovereignty_status")
|
||||
result = mod.evaluate_directive(snapshot=mod.default_snapshot(), repo_root=ROOT)
|
||||
report = mod.render_markdown(result)
|
||||
|
||||
for snippet in (
|
||||
"# [DIRECTIVE] Unified Fleet Sovereignty & Comms Migration",
|
||||
"## Directive Snapshot",
|
||||
"## Reference Drift",
|
||||
"## Workstream Matrix",
|
||||
"### 5. Infrastructure Hardening / Syntax Guard — MISSING",
|
||||
"`infrastructure/timmy-bridge/client/timmy_client.py`",
|
||||
"machine-checkable lexicon policy for review/triage",
|
||||
"## Why #524 Remains Open",
|
||||
):
|
||||
assert snippet in report
|
||||
|
||||
|
||||
def test_repo_contains_committed_issue_524_grounding_doc() -> None:
|
||||
assert DOC_PATH.exists(), "missing committed directive grounding doc"
|
||||
text = DOC_PATH.read_text(encoding="utf-8")
|
||||
for snippet in (
|
||||
"# [DIRECTIVE] Unified Fleet Sovereignty & Comms Migration",
|
||||
"## Reference Drift",
|
||||
"## Workstream Matrix",
|
||||
"## Highest-Leverage Next Actions",
|
||||
"## Why #524 Remains Open",
|
||||
):
|
||||
assert snippet in text
|
||||
@@ -1,103 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Tests for claim_annotator.py — verifies source distinction is present."""
|
||||
|
||||
import sys
|
||||
import os
|
||||
import json
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src"))
|
||||
|
||||
from timmy.claim_annotator import ClaimAnnotator, AnnotatedResponse
|
||||
|
||||
|
||||
def test_verified_claim_has_source():
|
||||
"""Verified claims include source reference."""
|
||||
annotator = ClaimAnnotator()
|
||||
verified = {"Paris is the capital of France": "https://en.wikipedia.org/wiki/Paris"}
|
||||
response = "Paris is the capital of France. It is a beautiful city."
|
||||
|
||||
result = annotator.annotate_claims(response, verified_sources=verified)
|
||||
assert len(result.claims) > 0
|
||||
verified_claims = [c for c in result.claims if c.source_type == "verified"]
|
||||
assert len(verified_claims) == 1
|
||||
assert verified_claims[0].source_ref == "https://en.wikipedia.org/wiki/Paris"
|
||||
assert "[V]" in result.rendered_text
|
||||
assert "[source:" in result.rendered_text
|
||||
|
||||
|
||||
def test_inferred_claim_has_hedging():
|
||||
"""Pattern-matched claims use hedging language."""
|
||||
annotator = ClaimAnnotator()
|
||||
response = "The weather is nice today. It might rain tomorrow."
|
||||
|
||||
result = annotator.annotate_claims(response)
|
||||
inferred_claims = [c for c in result.claims if c.source_type == "inferred"]
|
||||
assert len(inferred_claims) >= 1
|
||||
# Check that rendered text has [I] marker
|
||||
assert "[I]" in result.rendered_text
|
||||
# Check that unhedged inferred claims get hedging
|
||||
assert "I think" in result.rendered_text or "I believe" in result.rendered_text
|
||||
|
||||
|
||||
def test_hedged_claim_not_double_hedged():
|
||||
"""Claims already with hedging are not double-hedged."""
|
||||
annotator = ClaimAnnotator()
|
||||
response = "I think the sky is blue. It is a nice day."
|
||||
|
||||
result = annotator.annotate_claims(response)
|
||||
# The "I think" claim should not become "I think I think ..."
|
||||
assert "I think I think" not in result.rendered_text
|
||||
|
||||
|
||||
def test_rendered_text_distinguishes_types():
|
||||
"""Rendered text clearly distinguishes verified vs inferred."""
|
||||
annotator = ClaimAnnotator()
|
||||
verified = {"Earth is round": "https://science.org/earth"}
|
||||
response = "Earth is round. Stars are far away."
|
||||
|
||||
result = annotator.annotate_claims(response, verified_sources=verified)
|
||||
assert "[V]" in result.rendered_text # verified marker
|
||||
assert "[I]" in result.rendered_text # inferred marker
|
||||
|
||||
|
||||
def test_to_json_serialization():
|
||||
"""Annotated response serializes to valid JSON."""
|
||||
annotator = ClaimAnnotator()
|
||||
response = "Test claim."
|
||||
result = annotator.annotate_claims(response)
|
||||
json_str = annotator.to_json(result)
|
||||
parsed = json.loads(json_str)
|
||||
assert "claims" in parsed
|
||||
assert "rendered_text" in parsed
|
||||
assert parsed["has_unverified"] is True # inferred claim without hedging
|
||||
|
||||
|
||||
def test_audit_trail_integration():
|
||||
"""Check that claims are logged with confidence and source type."""
|
||||
# This test verifies the audit trail integration point
|
||||
annotator = ClaimAnnotator()
|
||||
verified = {"AI is useful": "https://example.com/ai"}
|
||||
response = "AI is useful. It can help with tasks."
|
||||
|
||||
result = annotator.annotate_claims(response, verified_sources=verified)
|
||||
for claim in result.claims:
|
||||
assert claim.source_type in ("verified", "inferred")
|
||||
assert claim.confidence in ("high", "medium", "low", "unknown")
|
||||
if claim.source_type == "verified":
|
||||
assert claim.source_ref is not None
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_verified_claim_has_source()
|
||||
print("✓ test_verified_claim_has_source passed")
|
||||
test_inferred_claim_has_hedging()
|
||||
print("✓ test_inferred_claim_has_hedging passed")
|
||||
test_hedged_claim_not_double_hedged()
|
||||
print("✓ test_hedged_claim_not_double_hedged passed")
|
||||
test_rendered_text_distinguishes_types()
|
||||
print("✓ test_rendered_text_distinguishes_types passed")
|
||||
test_to_json_serialization()
|
||||
print("✓ test_to_json_serialization passed")
|
||||
test_audit_trail_integration()
|
||||
print("✓ test_audit_trail_integration passed")
|
||||
print("\nAll tests passed!")
|
||||
@@ -1,210 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Open-Load Cap Enforcement — Audit-B3
|
||||
|
||||
Scans multiple repos for open issues, enforces a per-agent open-issue cap,
|
||||
auto-unassigns overflow (oldest first), and posts a summary.
|
||||
|
||||
Acceptance (timmy-home #498):
|
||||
- Lives in timmy-config/bin/load_cap_enforcer.py
|
||||
- Scans timmy-home, timmy-config, the-nexus, hermes-agent
|
||||
- Cap: 25 open issues per agent (configurable)
|
||||
- Unassign oldest overflow, comment on each
|
||||
- Dry-run first, then live; summary posted on parent issue #495
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
from collections import defaultdict
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
# ── Configuration ─────────────────────────────────────────────────────────────
|
||||
GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"
|
||||
ORG = "Timmy_Foundation"
|
||||
REPOS = ["timmy-home", "timmy-config", "the-nexus", "hermes-agent"]
|
||||
TOKEN_PATH = Path.home() / ".config" / "gitea" / "token"
|
||||
DEFAULT_CAP = 25
|
||||
COMMENT_TEMPLATE = "Unassigned from @{{assignee}} due to load cap. Available for pickup."
|
||||
|
||||
|
||||
def load_token() -> str:
|
||||
if TOKEN_PATH.exists():
|
||||
return TOKEN_PATH.read_text().strip()
|
||||
tok = os.environ.get("GITEA_TOKEN", "")
|
||||
if tok:
|
||||
return tok
|
||||
sys.exit("ERROR: Gitea token not found at ~/.config/gitea/token or GITEA_TOKEN env")
|
||||
|
||||
|
||||
def api(method: str, path: str, token: str, data=None):
|
||||
url = f"{GITEA_BASE}{path}"
|
||||
body = json.dumps(data).encode() if data else None
|
||||
headers = {"Authorization": f"token {token}"}
|
||||
if body:
|
||||
headers["Content-Type"] = "application/json"
|
||||
req = urllib.request.Request(url, data=body, headers=headers, method=method)
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||||
return json.loads(resp.read()), resp.status
|
||||
except urllib.error.HTTPError as e:
|
||||
err = e.read().decode() if e.fp else str(e)
|
||||
print(f" API {e.code}: {err}", file=sys.stderr)
|
||||
return None, e.code
|
||||
except Exception as e:
|
||||
print(f" Request error: {e}", file=sys.stderr)
|
||||
return None, None
|
||||
|
||||
|
||||
def fetch_all_open_issues(token: str):
|
||||
all_issues = []
|
||||
for repo in REPOS:
|
||||
page = 1
|
||||
while True:
|
||||
data, status = api("GET", f"/repos/{ORG}/{repo}/issues?state=open&page={page}&limit=50", token)
|
||||
if status != 200 or not data:
|
||||
break
|
||||
all_issues.extend(data)
|
||||
if len(data) < 50:
|
||||
break
|
||||
page += 1
|
||||
return all_issues
|
||||
|
||||
|
||||
def build_summary(by_agent: dict, unassignment_map: dict):
|
||||
lines = []
|
||||
lines.append("Agent | Before | After | Unassigned Count")
|
||||
lines.append("-" * 50)
|
||||
for agent in sorted(by_agent.keys()):
|
||||
before = by_agent[agent]["before"]
|
||||
after = by_agent[agent]["after"]
|
||||
unassigned = len(unassignment_map.get(agent, []))
|
||||
lines.append(f"@{agent} | {before} | {after} | {unassigned}")
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Enforce open-issue load cap per agent")
|
||||
parser.add_argument("--dry-run", action="store_true", help="Report without making changes")
|
||||
parser.add_argument("--cap", type=int, default=DEFAULT_CAP, help=f"Max open issues per agent (default: {DEFAULT_CAP})")
|
||||
parser.add_argument("--output", type=str, default=None, help="Write summary to file")
|
||||
parser.add_argument("--comment-on", type=int, default=None, help="Post summary as comment on timmy-home issue N")
|
||||
args = parser.parse_args()
|
||||
|
||||
token = load_token()
|
||||
print(f"Fetching open issues from {', '.join(REPOS)} ...")
|
||||
issues = fetch_all_open_issues(token)
|
||||
print(f"Fetched {len(issues)} open issues.")
|
||||
|
||||
# Group by assignee
|
||||
by_agent = defaultdict(lambda: {"before": 0, "issues": []})
|
||||
for iss in issues:
|
||||
for a in (iss.get("assignees") or []):
|
||||
login = a.get("login")
|
||||
if login:
|
||||
by_agent[login]["issues"].append(iss)
|
||||
by_agent[login]["before"] += 1
|
||||
|
||||
print(f"\nAgents with open issues: {list(by_agent.keys())}")
|
||||
for agent, d in sorted(by_agent.items()):
|
||||
print(f" @{agent}: {d['before']} issues")
|
||||
|
||||
# Identify overflow
|
||||
unassignment_map = defaultdict(list)
|
||||
for agent, d in by_agent.items():
|
||||
count = d["before"]
|
||||
if count > args.cap:
|
||||
overflow = count - args.cap
|
||||
issues_sorted = sorted(d["issues"], key=lambda i: i.get("created_at", ""))
|
||||
unassignment_map[agent] = issues_sorted[:overflow]
|
||||
print(f"\n@{agent} exceeds cap ({count} > {args.cap}); will unassign {overflow} oldest issue(s):")
|
||||
for iss in issues_sorted[:overflow]:
|
||||
print(f" - #{iss['number']}: {iss.get('title', '')[:50]}")
|
||||
|
||||
# Dry-run: just show summary and exit
|
||||
if args.dry_run:
|
||||
print("\n=== DRY RUN — no changes made ===")
|
||||
# For dry-run, after = before (no changes)
|
||||
for agent in by_agent:
|
||||
by_agent[agent]["after"] = by_agent[agent]["before"]
|
||||
summary = build_summary(by_agent, unassignment_map)
|
||||
print("\n" + summary)
|
||||
if args.output:
|
||||
Path(args.output).write_text(summary)
|
||||
print(f"\nSummary written to {args.output}")
|
||||
return 0
|
||||
|
||||
# LIVE: perform unassignments and comments (concurrent)
|
||||
print("\n=== LIVE RUN — executing ===")
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
import threading
|
||||
lock = threading.Lock()
|
||||
tasks = []
|
||||
for agent, issues_to_unassign in unassignment_map.items():
|
||||
for iss in issues_to_unassign:
|
||||
issue_num = iss["number"]
|
||||
repo_name = next(
|
||||
(r for r in REPOS if f"/{r}/issues/" in iss.get("html_url", "")), REPOS[0]
|
||||
)
|
||||
tasks.append((agent, issue_num, repo_name, iss))
|
||||
print(f"Total unassignment tasks: {len(tasks)}")
|
||||
def do_task(agent, issue_num, repo_name, iss):
|
||||
# Unassign
|
||||
_, status1 = api("PATCH", f"/repos/{ORG}/{repo_name}/issues/{issue_num}", token, {"assignees": []})
|
||||
if status1 not in (200, 201, 204):
|
||||
return (agent, issue_num, repo_name, False, f"unassign HTTP {status1}")
|
||||
# Comment
|
||||
comment_body = COMMENT_TEMPLATE.format(assignee=agent)
|
||||
_, status2 = api("POST", f"/repos/{ORG}/{repo_name}/issues/{issue_num}/comments", token, {"body": comment_body})
|
||||
if status2 not in (200, 201):
|
||||
return (agent, issue_num, repo_name, True, f"unassigned but comment HTTP {status2}")
|
||||
return (agent, issue_num, repo_name, True, "OK")
|
||||
completed = 0
|
||||
with ThreadPoolExecutor(max_workers=12) as executor:
|
||||
futures = [executor.submit(do_task, a, n, r, i) for (a, n, r, i) in tasks]
|
||||
for fut in as_completed(futures):
|
||||
agent, num, repo, ok, msg = fut.result()
|
||||
with lock:
|
||||
completed += 1
|
||||
if completed % 50 == 0:
|
||||
print(f" Progress: {completed}/{len(tasks)}")
|
||||
if ok:
|
||||
print(f" ✓ #{num} ({repo})")
|
||||
else:
|
||||
print(f" ✗ #{num} ({repo}): {msg}")
|
||||
|
||||
# Recompute after counts for summary
|
||||
print("\nRecomputing after counts ...")
|
||||
after_issues = fetch_all_open_issues(token)
|
||||
by_agent_after = defaultdict(int)
|
||||
for iss in after_issues:
|
||||
for a in (iss.get("assignees") or []):
|
||||
by_agent_after[a.get("login")] += 1
|
||||
for agent in by_agent:
|
||||
by_agent[agent]["after"] = by_agent_after.get(agent, 0)
|
||||
|
||||
summary = build_summary(by_agent, unassignment_map)
|
||||
print("\n=== SUMMARY ===")
|
||||
print(summary)
|
||||
|
||||
if args.output:
|
||||
Path(args.output).write_text(summary)
|
||||
print(f"Summary written to {args.output}")
|
||||
|
||||
if args.comment_on:
|
||||
body = f"Open-load cap enforcement run (cap={args.cap}):\n\n```\n{summary}\n```"
|
||||
_, status = api("POST", f"/repos/{ORG}/timmy-home/issues/{args.comment_on}/comments", token, {"body": body})
|
||||
if status in (200, 201):
|
||||
print(f"\nSummary posted as comment on timmy-home issue #{args.comment_on}")
|
||||
else:
|
||||
print(f"\nWARNING: failed to post comment (HTTP {status})")
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
Reference in New Issue
Block a user