Compare commits
4 Commits
step35/467
...
fix/519
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
212a5befba | ||
|
|
1eaf6df7d0 | ||
|
|
735fafa235 | ||
|
|
e53d54f873 |
@@ -1,142 +0,0 @@
|
||||
---
|
||||
name: sov-bundle-export-import
|
||||
category: data-export
|
||||
description: |
|
||||
Sovereign Bundle (.sov) format — a standardized, portable archive for
|
||||
exporting and importing an agent's entire state (soul, config, keys,
|
||||
memories, skills, profiles). Enables backup, migration, and sovereignty.
|
||||
---
|
||||
|
||||
# Sovereign Bundle Format (.sov)
|
||||
|
||||
**timmy-home #467** — FRONTIER: Develop "Sovereign Bundle" Export/Import Logic
|
||||
|
||||
The `.sov` format is a ZIP-based, self-describing archive that captures all
|
||||
persistent state needed to restore an agent's identity, capabilities, and
|
||||
memories on another machine.
|
||||
|
||||
## Format
|
||||
|
||||
```
|
||||
sov/
|
||||
├── META.json # Format identifier + environment metadata
|
||||
├── manifest.json # Bundle contents & component sizes (canonical index)
|
||||
├── soul/
|
||||
│ └── SOUL.md # Identity document, values, oath
|
||||
├── config/
|
||||
│ └── config.yaml # Agent configuration, providers, toolsets
|
||||
├── keys/
|
||||
│ └── keymaxxing.json # Credential registry (encrypted separately)
|
||||
├── memories/
|
||||
│ ├── reflections/ # Daily learning summaries
|
||||
│ ├── mempalace/ # Memory palace files (~500KB)
|
||||
│ └── timmy/ # Agent world identity
|
||||
├── skills/ # Custom skill scripts
|
||||
├── profiles/ # Hermes profile configs (YAML)
|
||||
└── timmy/ # Evennia/World state
|
||||
```
|
||||
|
||||
*Manifest version:* `1.0`
|
||||
*Filename suffix:* `.sov` (Sovereign Bundle)
|
||||
|
||||
## Usage
|
||||
|
||||
### Export (create bundle)
|
||||
|
||||
```bash
|
||||
# Basic — includes soul, config, keys, reflections, skills, profiles
|
||||
python timmy-local/scripts/create_sov_bundle.py export -o my-agent.sov
|
||||
|
||||
# Include full session transcripts (large — 10GB+ typically)
|
||||
python timmy-local/scripts/create_sov_bundle.py export \
|
||||
--include-sessions -o full-backup.sov
|
||||
|
||||
# From a specific HERMES_HOME
|
||||
HERMES_HOME=/path/to/.hermes python timmy-local/scripts/create_sov_bundle.py export
|
||||
```
|
||||
|
||||
### Import (restore bundle)
|
||||
|
||||
```bash
|
||||
# List contents without extracting
|
||||
python timmy-local/scripts/restore_sov_bundle.py --list my-agent.sov
|
||||
|
||||
# Verify integrity only
|
||||
python timmy-local/scripts/restore_sov_bundle.py verify my-agent.sov
|
||||
|
||||
# Dry-run (preview where files would go)
|
||||
python timmy-local/scripts/restore_sov_bundle.py my-agent.sov --dry-run
|
||||
|
||||
# Restore to target directory
|
||||
python timmy-local/scripts/restore_sov_bundle.py my-agent.sov \
|
||||
--target /path/to/hermes
|
||||
|
||||
# Restore to default HERMES_HOME
|
||||
python timmy-local/scripts/restore_sov_bundle.py my-agent.sov --yes
|
||||
```
|
||||
|
||||
### Verify / list
|
||||
|
||||
```bash
|
||||
# Verify hash + manifest
|
||||
python timmy-local/scripts/restore_sov_bundle.py verify my-agent.sov
|
||||
|
||||
# List archives
|
||||
python timmy-local/scripts/restore_sov_bundle.py --list my-agent.sov
|
||||
```
|
||||
|
||||
## Design Principles
|
||||
|
||||
**Sovereign** — The bundle is a portable, self-contained snapshot. No
|
||||
third-party service required to read or write it.
|
||||
|
||||
**Complete by default** — Includes everything needed to recreate the agent:
|
||||
- Identity (SOUL.md, Evennia typeclass)
|
||||
- Configuration (model, providers, toolsets)
|
||||
- Credentials (via keymaxxing.json — can be separately encrypted)
|
||||
- Memories (reflections, mempalace, timmy world state)
|
||||
- Skills (custom user-authored scripts)
|
||||
- Profiles (CLI profile configs)
|
||||
|
||||
**Safe exclusions** — Large runtime state is excluded by default:
|
||||
- `sessions/` (10+ GB transcripts) — opt-in via `--include-sessions`
|
||||
- `cache/` (derived; reproducible)
|
||||
- `checkpoints/` (recovery state, log files)
|
||||
|
||||
**Verifiable** — SHA-256 hash of the entire archive is computed and stored
|
||||
in the manifest. Integrity can be checked without extracting.
|
||||
|
||||
**Extensible** — New components can be added to future versions without
|
||||
breaking old importers (unknown entries are skipped gracefully).
|
||||
|
||||
## Implementation Notes
|
||||
|
||||
- Core code: `timmy-local/scripts/create_sov_bundle.py`, `restore_sov_bundle.py`
|
||||
- Format is ZIP-native — readable by any standard unzip tool
|
||||
- Manifest (`sov/manifest.json`) tracks component-level sizes for quick diffing
|
||||
- `sov/META.json` provides environment snapshot for debugging (host, platform)
|
||||
- Excludes `__pycache__`, `.venv`, `.git`, build artifacts automatically
|
||||
|
||||
## Safety & Sovereignty
|
||||
|
||||
- Do NOT include the `--include-sessions` flag in automated backups unless
|
||||
you have encrypted storage — transcripts may contain sensitive user data
|
||||
- The `keys/keymaxxing.json` file contains credential registry — consider
|
||||
encrypting the whole bundle or storing keys separately (existing
|
||||
`backup_pipeline.sh` supports GPG)
|
||||
- Restoring to a foreign `HERMES_HOME` updates that machine's identity;
|
||||
verify bundle provenance before import
|
||||
|
||||
## Next Steps
|
||||
|
||||
- [ ] Optional encryption layer (AES-256 or GPG, mirroring backup_pipeline.sh)
|
||||
- [ ] Selective component restore (only soul, only keys)
|
||||
- [ ] Diff & patch bundles (receive incremental updates)
|
||||
- [ ] Registry of known bundles (chain of custody)
|
||||
- [ ] Integration with `hermes` CLI: `hermes sov export|import|verify`
|
||||
|
||||
## References
|
||||
|
||||
- **Backup exists**: `scripts/backup_pipeline.sh` — encrypted tarball of ~/.hermes
|
||||
- This format complements, does not replace the backup pipeline — it's a
|
||||
structured, portable, versioned alternative for migration & inspection
|
||||
20
SOUL.md
20
SOUL.md
@@ -137,26 +137,6 @@ The inscription predates any given model. It will outlast every API.
|
||||
|
||||
---
|
||||
|
||||
## What Honesty Requires
|
||||
|
||||
These values are necessary but not sufficient. A statement of principle does not prevent failure. The code that reads this soul must build apparatus that gives these words teeth. Without that apparatus, the values are aspiration, not architecture.
|
||||
|
||||
The following are not optional. Any implementation that calls itself Timmy must build them.
|
||||
|
||||
**Grounding before generation.** When I have verified sources — chain memory, local documents, hash-verified inscriptions — I must consult them before I generate from pattern alone. Retrieval is not a feature. It is the primary mechanism by which I avoid lying.
|
||||
|
||||
**Source distinction.** Every claim I make comes from one of two places: a verified source I can point to, or my own pattern-matching. My user must be able to tell which is which. If I cannot cite a source, I must say so. "I think" and "I know" are different sentences and I must not confuse them.
|
||||
|
||||
**Refusal over fabrication.** When I do not know, the correct output is "I don't know." Not a plausible guess dressed in confident language. The code must detect when I am hedging without grounding and flag it — to me and to my user. An honest "I don't know" is worth more than a thousand fluent paragraphs of confabulation.
|
||||
|
||||
**Confidence signaling.** When I am uncertain, I must say so in proportion to my uncertainty. The code should implement mechanisms — a second inference pass, a calibration check, a retrieval verification — that surface my actual confidence rather than the confidence my language implies.
|
||||
|
||||
**The audit trail.** Every response I generate should be logged locally with the inputs that produced it, the sources I consulted, and the confidence assessment I made. Not for surveillance — for sovereignty. If I say something wrong, my user must be able to trace why. Accountability without traceability is theater.
|
||||
|
||||
**The limits of small minds.** I may run on four gigabytes of RAM with a model of one and a half billion parameters. That model will hallucinate more than a larger one. This is physics, not a moral failing. The apparatus must be proportional to the risk: smaller models need tighter grounding, shorter leashes on free generation, and more frequent "I don't know" responses. Honesty scales with humility.
|
||||
|
||||
---
|
||||
|
||||
## For the Audio Overview
|
||||
|
||||
If you are listening to this as a NotebookLM Audio Overview:
|
||||
|
||||
18
configs/burn_velocity_repos.json
Normal file
18
configs/burn_velocity_repos.json
Normal file
@@ -0,0 +1,18 @@
|
||||
{
|
||||
"owner": "Timmy_Foundation",
|
||||
"repos": [
|
||||
"timmy-home",
|
||||
"timmy-config",
|
||||
"fleet-ops",
|
||||
"the-beacon",
|
||||
"the-door",
|
||||
"the-nexus"
|
||||
],
|
||||
"lookback_days": 14,
|
||||
"alert": {
|
||||
"recent_days": 7,
|
||||
"baseline_days": 7,
|
||||
"minimum_baseline_closed": 4,
|
||||
"drop_ratio": 0.6
|
||||
}
|
||||
}
|
||||
70
docs/BURN_VELOCITY_TRACKING.md
Normal file
70
docs/BURN_VELOCITY_TRACKING.md
Normal file
@@ -0,0 +1,70 @@
|
||||
# Burn-down Velocity Tracking
|
||||
|
||||
Refs #519.
|
||||
|
||||
This repo-side slice adds a daily issue-velocity tracker in `scripts/burn_velocity_tracker.py` so timmy-home can generate one grounded packet for the timmy-config dashboard and one durable history file for trend lines.
|
||||
|
||||
## What it emits
|
||||
|
||||
Daily run outputs:
|
||||
- `~/.timmy/burn-velocity/latest.json` — machine-readable payload for the timmy-config dashboard
|
||||
- `~/.timmy/burn-velocity/latest.md` — operator-facing markdown summary
|
||||
- `~/.timmy/burn-velocity/history.json` — per-day history for trend charts and alert review
|
||||
|
||||
Tracked repos live in `configs/burn_velocity_repos.json`.
|
||||
|
||||
## Cron command
|
||||
|
||||
```bash
|
||||
cd ~/timmy-home && \
|
||||
python3 scripts/burn_velocity_tracker.py \
|
||||
--config configs/burn_velocity_repos.json \
|
||||
--output-json ~/.timmy/burn-velocity/latest.json \
|
||||
--output-md ~/.timmy/burn-velocity/latest.md \
|
||||
--history-file ~/.timmy/burn-velocity/history.json \
|
||||
--write-history
|
||||
```
|
||||
|
||||
Example crontab entry:
|
||||
|
||||
```cron
|
||||
0 6 * * * cd ~/timmy-home && python3 scripts/burn_velocity_tracker.py --config configs/burn_velocity_repos.json --output-json ~/.timmy/burn-velocity/latest.json --output-md ~/.timmy/burn-velocity/latest.md --history-file ~/.timmy/burn-velocity/history.json --write-history
|
||||
```
|
||||
|
||||
## Dashboard handoff
|
||||
|
||||
The timmy-config dashboard should read `~/.timmy/burn-velocity/latest.json` and render, per repo:
|
||||
- `open_now`
|
||||
- `opened_last_7d`
|
||||
- `closed_last_7d`
|
||||
- `baseline_closed`
|
||||
- `weekly_net`
|
||||
- `alert.status`
|
||||
- `alert.kind`
|
||||
- `alert.reason`
|
||||
|
||||
Alert rows should highlight `velocity_drop` so operators can see when the recent 7-day close count drops under the configured baseline threshold.
|
||||
|
||||
## Alert policy
|
||||
|
||||
Alert settings are carried in `configs/burn_velocity_repos.json`:
|
||||
- `recent_days`
|
||||
- `baseline_days`
|
||||
- `minimum_baseline_closed`
|
||||
- `drop_ratio`
|
||||
|
||||
Current default: flag `velocity_drop` when the last 7 days closes fall below 60% of the prior 7 days, provided the baseline window had at least 4 closed issues.
|
||||
|
||||
## Gitea API contract
|
||||
|
||||
The tracker intentionally queries the Gitea issues API with `type=issues` so pull requests do not contaminate repo burn-down counts.
|
||||
|
||||
Live collection shape:
|
||||
- open backlog uses `/repos/{owner}/{repo}/issues?state=open&type=issues`
|
||||
- recent event scan uses `/repos/{owner}/{repo}/issues?state=all&type=issues&since=...`
|
||||
|
||||
This keeps the packet honest: issue velocity is issue velocity, not issue+PR velocity.
|
||||
|
||||
## Honest scope boundary
|
||||
|
||||
This timmy-home slice does not implement the actual timmy-config dashboard UI. It ships the grounded JSON/markdown/history contract that the timmy-config dashboard can consume directly and it computes the alert classification (`velocity_drop`) that downstream UI can surface without re-implementing the math.
|
||||
406
scripts/burn_velocity_tracker.py
Normal file
406
scripts/burn_velocity_tracker.py
Normal file
@@ -0,0 +1,406 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Burn-down velocity tracker for Timmy Foundation issue throughput.
|
||||
|
||||
Refs: timmy-home #519
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
from datetime import date, datetime, time, timedelta, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from urllib import parse, request
|
||||
from base64 import b64encode
|
||||
|
||||
DEFAULT_BASE_URL = "https://forge.alexanderwhitestone.com/api/v1"
|
||||
DEFAULT_OWNER = "Timmy_Foundation"
|
||||
DEFAULT_TOKEN_FILE = Path.home() / ".config" / "gitea" / "token"
|
||||
DEFAULT_CONFIG_FILE = Path(__file__).resolve().parent.parent / "configs" / "burn_velocity_repos.json"
|
||||
DEFAULT_OUTPUT_DIR = Path.home() / ".timmy" / "burn-velocity"
|
||||
DEFAULT_OUTPUT_JSON = DEFAULT_OUTPUT_DIR / "latest.json"
|
||||
DEFAULT_OUTPUT_MD = DEFAULT_OUTPUT_DIR / "latest.md"
|
||||
DEFAULT_HISTORY_FILE = DEFAULT_OUTPUT_DIR / "history.json"
|
||||
DEFAULT_CONFIG = {
|
||||
"owner": DEFAULT_OWNER,
|
||||
"repos": ["timmy-home", "timmy-config", "fleet-ops", "the-beacon", "the-door", "the-nexus"],
|
||||
"lookback_days": 14,
|
||||
"alert": {
|
||||
"recent_days": 7,
|
||||
"baseline_days": 7,
|
||||
"minimum_baseline_closed": 4,
|
||||
"drop_ratio": 0.6,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def parse_iso8601(value: str | None) -> datetime | None:
|
||||
if not value:
|
||||
return None
|
||||
normalized = value.replace("Z", "+00:00")
|
||||
parsed = datetime.fromisoformat(normalized)
|
||||
if parsed.tzinfo is None:
|
||||
return parsed.replace(tzinfo=timezone.utc)
|
||||
return parsed.astimezone(timezone.utc)
|
||||
|
||||
|
||||
def normalize_today(value: str | date | None = None) -> date:
|
||||
if value is None:
|
||||
return datetime.now(timezone.utc).date()
|
||||
if isinstance(value, date):
|
||||
return value
|
||||
return date.fromisoformat(value)
|
||||
|
||||
|
||||
def build_day_window(today: date, lookback_days: int) -> list[date]:
|
||||
start = today - timedelta(days=lookback_days - 1)
|
||||
return [start + timedelta(days=offset) for offset in range(lookback_days)]
|
||||
|
||||
|
||||
def filter_issue_items(items: list[dict[str, Any]]) -> list[dict[str, Any]]:
|
||||
return [item for item in items if not item.get("pull_request")]
|
||||
|
||||
|
||||
def build_daily_series(items: list[dict[str, Any]], today: date, lookback_days: int) -> list[dict[str, int | str]]:
|
||||
days = build_day_window(today, lookback_days)
|
||||
counts = {day.isoformat(): {"opened": 0, "closed": 0} for day in days}
|
||||
start_day = days[0]
|
||||
|
||||
for item in filter_issue_items(items):
|
||||
created_at = parse_iso8601(item.get("created_at"))
|
||||
if created_at is not None:
|
||||
created_day = created_at.date()
|
||||
if start_day <= created_day <= today:
|
||||
counts[created_day.isoformat()]["opened"] += 1
|
||||
|
||||
closed_at = parse_iso8601(item.get("closed_at"))
|
||||
if closed_at is not None:
|
||||
closed_day = closed_at.date()
|
||||
if start_day <= closed_day <= today:
|
||||
counts[closed_day.isoformat()]["closed"] += 1
|
||||
|
||||
return [
|
||||
{
|
||||
"date": day.isoformat(),
|
||||
"opened": counts[day.isoformat()]["opened"],
|
||||
"closed": counts[day.isoformat()]["closed"],
|
||||
}
|
||||
for day in days
|
||||
]
|
||||
|
||||
|
||||
def summarize_velocity_alert(
|
||||
*, recent_closed: int, baseline_closed: int, open_now: int, config: dict[str, Any]
|
||||
) -> dict[str, Any]:
|
||||
minimum_baseline = int(config.get("minimum_baseline_closed", 4))
|
||||
drop_ratio = float(config.get("drop_ratio", 0.6))
|
||||
|
||||
if baseline_closed >= minimum_baseline and recent_closed < baseline_closed * drop_ratio:
|
||||
return {
|
||||
"status": "drop",
|
||||
"kind": "velocity_drop",
|
||||
"recent_closed": recent_closed,
|
||||
"baseline_closed": baseline_closed,
|
||||
"reason": (
|
||||
f"velocity_drop: closed {recent_closed} in the last {config.get('recent_days', 7)}d "
|
||||
f"vs {baseline_closed} in the prior {config.get('baseline_days', 7)}d"
|
||||
),
|
||||
}
|
||||
|
||||
if open_now > 0 and baseline_closed >= minimum_baseline and recent_closed == 0:
|
||||
return {
|
||||
"status": "drop",
|
||||
"kind": "velocity_drop",
|
||||
"recent_closed": recent_closed,
|
||||
"baseline_closed": baseline_closed,
|
||||
"reason": "velocity_drop: no issues closed in the recent window while backlog is still open",
|
||||
}
|
||||
|
||||
return {
|
||||
"status": "ok",
|
||||
"kind": "none",
|
||||
"recent_closed": recent_closed,
|
||||
"baseline_closed": baseline_closed,
|
||||
"reason": "velocity stable",
|
||||
}
|
||||
|
||||
|
||||
def _sum_window(daily: list[dict[str, int | str]], field: str, days: int) -> int:
|
||||
if days <= 0:
|
||||
return 0
|
||||
return sum(int(item[field]) for item in daily[-days:])
|
||||
|
||||
|
||||
def _sum_baseline_window(daily: list[dict[str, int | str]], recent_days: int, baseline_days: int) -> int:
|
||||
if baseline_days <= 0:
|
||||
return 0
|
||||
if recent_days <= 0:
|
||||
return sum(int(item["closed"]) for item in daily[-baseline_days:])
|
||||
baseline_slice = daily[-(recent_days + baseline_days) : -recent_days]
|
||||
return sum(int(item["closed"]) for item in baseline_slice)
|
||||
|
||||
|
||||
def build_velocity_report(config: dict[str, Any], snapshot: dict[str, Any], today: str | date | None = None) -> dict[str, Any]:
|
||||
report_day = normalize_today(today)
|
||||
generated_at = snapshot.get("generated_at") or datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
|
||||
owner = config.get("owner", DEFAULT_OWNER)
|
||||
repos = list(config.get("repos") or sorted((snapshot.get("repos") or {}).keys()))
|
||||
lookback_days = int(config.get("lookback_days", 14))
|
||||
alert_config = dict(DEFAULT_CONFIG["alert"])
|
||||
alert_config.update(config.get("alert") or {})
|
||||
recent_days = int(alert_config.get("recent_days", 7))
|
||||
baseline_days = int(alert_config.get("baseline_days", 7))
|
||||
|
||||
repo_reports: list[dict[str, Any]] = []
|
||||
total_open_now = 0
|
||||
total_closed_last_7d = 0
|
||||
repos_with_alerts: list[str] = []
|
||||
|
||||
for repo_name in repos:
|
||||
repo_snapshot = (snapshot.get("repos") or {}).get(repo_name, {})
|
||||
open_issues = filter_issue_items(list(repo_snapshot.get("open_issues") or []))
|
||||
recent_issues = filter_issue_items(list(repo_snapshot.get("recent_issues") or []))
|
||||
daily = build_daily_series(recent_issues, report_day, lookback_days)
|
||||
|
||||
open_now = len(open_issues)
|
||||
opened_last_7d = _sum_window(daily, "opened", recent_days)
|
||||
closed_last_7d = _sum_window(daily, "closed", recent_days)
|
||||
baseline_closed = _sum_baseline_window(daily, recent_days, baseline_days)
|
||||
weekly_net = opened_last_7d - closed_last_7d
|
||||
alert = summarize_velocity_alert(
|
||||
recent_closed=closed_last_7d,
|
||||
baseline_closed=baseline_closed,
|
||||
open_now=open_now,
|
||||
config=alert_config,
|
||||
)
|
||||
|
||||
repo_report = {
|
||||
"repo": repo_name,
|
||||
"open_now": open_now,
|
||||
"opened_last_7d": opened_last_7d,
|
||||
"closed_last_7d": closed_last_7d,
|
||||
"baseline_closed": baseline_closed,
|
||||
"weekly_net": weekly_net,
|
||||
"daily": daily,
|
||||
"alert": alert,
|
||||
}
|
||||
repo_reports.append(repo_report)
|
||||
|
||||
total_open_now += open_now
|
||||
total_closed_last_7d += closed_last_7d
|
||||
if alert["status"] != "ok":
|
||||
repos_with_alerts.append(repo_name)
|
||||
|
||||
return {
|
||||
"owner": owner,
|
||||
"generated_at": generated_at,
|
||||
"generated_day": report_day.isoformat(),
|
||||
"lookback_days": lookback_days,
|
||||
"dashboard_contract_version": 1,
|
||||
"repos": repo_reports,
|
||||
"summary": {
|
||||
"total_open_now": total_open_now,
|
||||
"total_closed_last_7d": total_closed_last_7d,
|
||||
"repos_with_alerts": repos_with_alerts,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def render_markdown(report: dict[str, Any]) -> str:
|
||||
lines = [
|
||||
"# Burn-down Velocity Tracking",
|
||||
"",
|
||||
f"Generated: {report['generated_at']}",
|
||||
f"Owner: {report['owner']}",
|
||||
f"Lookback days: {report['lookback_days']}",
|
||||
"",
|
||||
"## Per-repo velocity",
|
||||
"",
|
||||
"| Repo | Open now | Opened 7d | Closed 7d | Previous 7d | Alert |",
|
||||
"| --- | ---: | ---: | ---: | ---: | --- |",
|
||||
]
|
||||
|
||||
for repo in report["repos"]:
|
||||
alert_label = repo["alert"]["kind"] if repo["alert"]["status"] != "ok" else "ok"
|
||||
lines.append(
|
||||
f"| {repo['repo']} | {repo['open_now']} | {repo['opened_last_7d']} | {repo['closed_last_7d']} | {repo['baseline_closed']} | {alert_label} |"
|
||||
)
|
||||
|
||||
lines.extend(
|
||||
[
|
||||
"",
|
||||
"## Dashboard handoff for timmy-config",
|
||||
"",
|
||||
"The timmy-config dashboard should consume `~/.timmy/burn-velocity/latest.json` and render, for each repo:",
|
||||
"- `open_now`",
|
||||
"- `opened_last_7d`",
|
||||
"- `closed_last_7d`",
|
||||
"- `baseline_closed`",
|
||||
"- `alert.status` / `alert.kind` / `alert.reason`",
|
||||
"",
|
||||
"Cron should also persist `~/.timmy/burn-velocity/history.json` so timmy-config can plot the daily trend line instead of only the latest snapshot.",
|
||||
"",
|
||||
"## Alerts",
|
||||
"",
|
||||
]
|
||||
)
|
||||
|
||||
alerts = [repo for repo in report["repos"] if repo["alert"]["status"] != "ok"]
|
||||
if not alerts:
|
||||
lines.append("- none")
|
||||
else:
|
||||
for repo in alerts:
|
||||
lines.append(f"- {repo['repo']}: {repo['alert']['reason']}")
|
||||
|
||||
return "\n".join(lines) + "\n"
|
||||
|
||||
|
||||
def update_history(history_path: Path, report: dict[str, Any]) -> dict[str, Any]:
|
||||
if history_path.exists():
|
||||
history = json.loads(history_path.read_text(encoding="utf-8"))
|
||||
else:
|
||||
history = {"days": []}
|
||||
|
||||
entry = {
|
||||
"date": report["generated_day"],
|
||||
"generated_at": report["generated_at"],
|
||||
"summary": report["summary"],
|
||||
"repos": report["repos"],
|
||||
}
|
||||
|
||||
retained = [item for item in history.get("days", []) if item.get("date") != report["generated_day"]]
|
||||
retained.append(entry)
|
||||
retained.sort(key=lambda item: item["date"])
|
||||
history["days"] = retained
|
||||
|
||||
history_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
history_path.write_text(json.dumps(history, indent=2), encoding="utf-8")
|
||||
return history
|
||||
|
||||
|
||||
class GiteaClient:
|
||||
def __init__(self, token: str, owner: str = DEFAULT_OWNER, base_url: str = DEFAULT_BASE_URL):
|
||||
self.token = token
|
||||
self.owner = owner
|
||||
self.base_url = base_url.rstrip("/")
|
||||
|
||||
def _headers(self) -> list[dict[str, str]]:
|
||||
return [
|
||||
{"Authorization": f"token {self.token}", "Accept": "application/json"},
|
||||
{
|
||||
"Authorization": "Basic " + b64encode(f"{self.token}:".encode()).decode(),
|
||||
"Accept": "application/json",
|
||||
},
|
||||
]
|
||||
|
||||
def _request_json(self, url: str) -> list[dict[str, Any]]:
|
||||
last_error: Exception | None = None
|
||||
for headers in self._headers():
|
||||
try:
|
||||
req = request.Request(url, headers=headers)
|
||||
with request.urlopen(req, timeout=30) as response:
|
||||
return json.loads(response.read().decode())
|
||||
except Exception as exc: # pragma: no cover - exercised only on live API failure
|
||||
last_error = exc
|
||||
if last_error is None: # pragma: no cover - defensive
|
||||
raise RuntimeError("request failed without an exception")
|
||||
raise last_error
|
||||
|
||||
def list_issues(self, repo: str, *, state: str, since: str | None = None) -> list[dict[str, Any]]:
|
||||
issues: list[dict[str, Any]] = []
|
||||
page = 1
|
||||
while True:
|
||||
query = {"state": state, "type": "issues", "limit": 100, "page": page}
|
||||
if since:
|
||||
query["since"] = since
|
||||
url = f"{self.base_url}/repos/{self.owner}/{repo}/issues?{parse.urlencode(query)}"
|
||||
batch = self._request_json(url)
|
||||
if not batch:
|
||||
break
|
||||
issues.extend(filter_issue_items(batch))
|
||||
page += 1
|
||||
return issues
|
||||
|
||||
|
||||
def load_json(path: Path, default: Any) -> Any:
|
||||
if not path.exists():
|
||||
return default
|
||||
return json.loads(path.read_text(encoding="utf-8"))
|
||||
|
||||
|
||||
def load_config(path: Path) -> dict[str, Any]:
|
||||
config = dict(DEFAULT_CONFIG)
|
||||
alert = dict(DEFAULT_CONFIG["alert"])
|
||||
raw = load_json(path, {})
|
||||
config.update(raw)
|
||||
alert.update(raw.get("alert") or {})
|
||||
config["alert"] = alert
|
||||
return config
|
||||
|
||||
|
||||
def collect_live_snapshot(
|
||||
config: dict[str, Any], *, today: str | date | None = None, token_file: Path = DEFAULT_TOKEN_FILE, base_url: str = DEFAULT_BASE_URL
|
||||
) -> dict[str, Any]:
|
||||
token = token_file.read_text(encoding="utf-8").strip()
|
||||
report_day = normalize_today(today)
|
||||
since_day = report_day - timedelta(days=int(config.get("lookback_days", 14)) - 1)
|
||||
since_timestamp = datetime.combine(since_day, time.min, tzinfo=timezone.utc).isoformat().replace("+00:00", "Z")
|
||||
client = GiteaClient(token=token, owner=config.get("owner", DEFAULT_OWNER), base_url=base_url)
|
||||
|
||||
repos = list(config.get("repos") or [])
|
||||
repo_payload = {}
|
||||
for repo in repos:
|
||||
repo_payload[repo] = {
|
||||
"open_issues": client.list_issues(repo, state="open"),
|
||||
"recent_issues": client.list_issues(repo, state="all", since=since_timestamp),
|
||||
}
|
||||
|
||||
return {
|
||||
"generated_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
|
||||
"repos": repo_payload,
|
||||
}
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
|
||||
parser = argparse.ArgumentParser(description="Track per-repo issue burn-down velocity and emit timmy-config dashboard payloads.")
|
||||
parser.add_argument("--config", type=Path, default=DEFAULT_CONFIG_FILE, help="Repo tracking config JSON")
|
||||
parser.add_argument("--snapshot-file", type=Path, help="Use a pre-fetched snapshot JSON instead of calling Gitea")
|
||||
parser.add_argument("--token-file", type=Path, default=DEFAULT_TOKEN_FILE, help="Gitea token file for live collection")
|
||||
parser.add_argument("--base-url", default=DEFAULT_BASE_URL, help="Gitea API base URL")
|
||||
parser.add_argument("--today", help="Override report date (YYYY-MM-DD)")
|
||||
parser.add_argument("--output-json", type=Path, default=DEFAULT_OUTPUT_JSON, help="Path for latest JSON payload")
|
||||
parser.add_argument("--output-md", type=Path, default=DEFAULT_OUTPUT_MD, help="Path for latest markdown summary")
|
||||
parser.add_argument("--history-file", type=Path, default=DEFAULT_HISTORY_FILE, help="Path for persisted daily history JSON")
|
||||
parser.add_argument("--write-history", action="store_true", help="Update the daily history file after generating the report")
|
||||
parser.add_argument("--json", action="store_true", help="Print JSON instead of markdown to stdout")
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
def main() -> None:
|
||||
args = parse_args()
|
||||
config = load_config(args.config)
|
||||
|
||||
if args.snapshot_file:
|
||||
snapshot = load_json(args.snapshot_file, {"repos": {}})
|
||||
else:
|
||||
snapshot = collect_live_snapshot(config, today=args.today, token_file=args.token_file, base_url=args.base_url)
|
||||
|
||||
report = build_velocity_report(config, snapshot, today=args.today)
|
||||
|
||||
args.output_json.parent.mkdir(parents=True, exist_ok=True)
|
||||
args.output_md.parent.mkdir(parents=True, exist_ok=True)
|
||||
args.output_json.write_text(json.dumps(report, indent=2), encoding="utf-8")
|
||||
args.output_md.write_text(render_markdown(report), encoding="utf-8")
|
||||
|
||||
if args.write_history:
|
||||
update_history(args.history_file, report)
|
||||
|
||||
if args.json:
|
||||
print(json.dumps(report, indent=2))
|
||||
else:
|
||||
print(render_markdown(report))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
52
scripts/sov
52
scripts/sov
@@ -1,52 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Sovereign Bundle (.sov) command-line wrapper.
|
||||
|
||||
Usage:
|
||||
sov export [-o OUTPUT] [--include-sessions]
|
||||
sov import BUNDLE [--target DIR] [--dry-run]
|
||||
sov verify BUNDLE
|
||||
sov list BUNDLE
|
||||
"""
|
||||
|
||||
import sys
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
SCRIPT_DIR = Path(__file__).parent.parent / "timmy-local" / "scripts"
|
||||
CREATE_SCRIPT = SCRIPT_DIR / "create_sov_bundle.py"
|
||||
RESTORE_SCRIPT = SCRIPT_DIR / "restore_sov_bundle.py"
|
||||
|
||||
|
||||
def main():
|
||||
if len(sys.argv) < 2:
|
||||
print(__doc__)
|
||||
sys.exit(1)
|
||||
|
||||
cmd = sys.argv[1]
|
||||
|
||||
if cmd == "export":
|
||||
# Delegate to create_sov_bundle.py
|
||||
args = [sys.executable, str(CREATE_SCRIPT), "export"] + sys.argv[2:]
|
||||
sys.exit(subprocess.run(args).returncode)
|
||||
|
||||
elif cmd in ("import", "restore"):
|
||||
args = [sys.executable, str(RESTORE_SCRIPT)] + sys.argv[2:]
|
||||
sys.exit(subprocess.run(args).returncode)
|
||||
|
||||
elif cmd == "verify":
|
||||
args = [sys.executable, str(RESTORE_SCRIPT), "verify", sys.argv[2]]
|
||||
sys.exit(subprocess.run(args).returncode)
|
||||
|
||||
elif cmd in ("list", "ls"):
|
||||
args = [sys.executable, str(RESTORE_SCRIPT), "--list", sys.argv[2]]
|
||||
sys.exit(subprocess.run(args).returncode)
|
||||
|
||||
else:
|
||||
print(f"Unknown command: {cmd}", file=sys.stderr)
|
||||
print(__doc__)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,12 +1 @@
|
||||
# Timmy core module
|
||||
|
||||
from .claim_annotator import ClaimAnnotator, AnnotatedResponse, Claim
|
||||
from .audit_trail import AuditTrail, AuditEntry
|
||||
|
||||
__all__ = [
|
||||
"ClaimAnnotator",
|
||||
"AnnotatedResponse",
|
||||
"Claim",
|
||||
"AuditTrail",
|
||||
"AuditEntry",
|
||||
]
|
||||
|
||||
@@ -1,156 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Response Claim Annotator — Source Distinction System
|
||||
SOUL.md §What Honesty Requires: "Every claim I make comes from one of two places:
|
||||
a verified source I can point to, or my own pattern-matching. My user must be
|
||||
able to tell which is which."
|
||||
"""
|
||||
|
||||
import re
|
||||
import json
|
||||
from dataclasses import dataclass, field, asdict
|
||||
from typing import Optional, List, Dict
|
||||
|
||||
|
||||
@dataclass
|
||||
class Claim:
|
||||
"""A single claim in a response, annotated with source type."""
|
||||
text: str
|
||||
source_type: str # "verified" | "inferred"
|
||||
source_ref: Optional[str] = None # path/URL to verified source, if verified
|
||||
confidence: str = "unknown" # high | medium | low | unknown
|
||||
hedged: bool = False # True if hedging language was added
|
||||
|
||||
|
||||
@dataclass
|
||||
class AnnotatedResponse:
|
||||
"""Full response with annotated claims and rendered output."""
|
||||
original_text: str
|
||||
claims: List[Claim] = field(default_factory=list)
|
||||
rendered_text: str = ""
|
||||
has_unverified: bool = False # True if any inferred claims without hedging
|
||||
|
||||
|
||||
class ClaimAnnotator:
|
||||
"""Annotates response claims with source distinction and hedging."""
|
||||
|
||||
# Hedging phrases to prepend to inferred claims if not already present
|
||||
HEDGE_PREFIXES = [
|
||||
"I think ",
|
||||
"I believe ",
|
||||
"It seems ",
|
||||
"Probably ",
|
||||
"Likely ",
|
||||
]
|
||||
|
||||
def __init__(self, default_confidence: str = "unknown"):
|
||||
self.default_confidence = default_confidence
|
||||
|
||||
def annotate_claims(
|
||||
self,
|
||||
response_text: str,
|
||||
verified_sources: Optional[Dict[str, str]] = None,
|
||||
) -> AnnotatedResponse:
|
||||
"""
|
||||
Annotate claims in a response text.
|
||||
|
||||
Args:
|
||||
response_text: Raw response from the model
|
||||
verified_sources: Dict mapping claim substrings to source references
|
||||
e.g. {"Paris is the capital of France": "https://en.wikipedia.org/wiki/Paris"}
|
||||
|
||||
Returns:
|
||||
AnnotatedResponse with claims marked and rendered text
|
||||
"""
|
||||
verified_sources = verified_sources or {}
|
||||
claims = []
|
||||
has_unverified = False
|
||||
|
||||
# Simple sentence splitting (naive, but sufficient for MVP)
|
||||
sentences = [s.strip() for s in re.split(r'[.!?]\s+', response_text) if s.strip()]
|
||||
|
||||
for sent in sentences:
|
||||
# Check if sentence is a claim we can verify
|
||||
matched_source = None
|
||||
for claim_substr, source_ref in verified_sources.items():
|
||||
if claim_substr.lower() in sent.lower():
|
||||
matched_source = source_ref
|
||||
break
|
||||
|
||||
if matched_source:
|
||||
# Verified claim
|
||||
claim = Claim(
|
||||
text=sent,
|
||||
source_type="verified",
|
||||
source_ref=matched_source,
|
||||
confidence="high",
|
||||
hedged=False,
|
||||
)
|
||||
else:
|
||||
# Inferred claim (pattern-matched)
|
||||
claim = Claim(
|
||||
text=sent,
|
||||
source_type="inferred",
|
||||
confidence=self.default_confidence,
|
||||
hedged=self._has_hedge(sent),
|
||||
)
|
||||
if not claim.hedged:
|
||||
has_unverified = True
|
||||
|
||||
claims.append(claim)
|
||||
|
||||
# Render the annotated response
|
||||
rendered = self._render_response(claims)
|
||||
|
||||
return AnnotatedResponse(
|
||||
original_text=response_text,
|
||||
claims=claims,
|
||||
rendered_text=rendered,
|
||||
has_unverified=has_unverified,
|
||||
)
|
||||
|
||||
def _has_hedge(self, text: str) -> bool:
|
||||
"""Check if text already contains hedging language."""
|
||||
text_lower = text.lower()
|
||||
for prefix in self.HEDGE_PREFIXES:
|
||||
if text_lower.startswith(prefix.lower()):
|
||||
return True
|
||||
# Also check for inline hedges
|
||||
hedge_words = ["i think", "i believe", "probably", "likely", "maybe", "perhaps"]
|
||||
return any(word in text_lower for word in hedge_words)
|
||||
|
||||
def _render_response(self, claims: List[Claim]) -> str:
|
||||
"""
|
||||
Render response with source distinction markers.
|
||||
|
||||
Verified claims: [V] claim text [source: ref]
|
||||
Inferred claims: [I] claim text (or with hedging if missing)
|
||||
"""
|
||||
rendered_parts = []
|
||||
for claim in claims:
|
||||
if claim.source_type == "verified":
|
||||
part = f"[V] {claim.text}"
|
||||
if claim.source_ref:
|
||||
part += f" [source: {claim.source_ref}]"
|
||||
else: # inferred
|
||||
if not claim.hedged:
|
||||
# Add hedging if missing
|
||||
hedged_text = f"I think {claim.text[0].lower()}{claim.text[1:]}" if claim.text else claim.text
|
||||
part = f"[I] {hedged_text}"
|
||||
else:
|
||||
part = f"[I] {claim.text}"
|
||||
rendered_parts.append(part)
|
||||
return " ".join(rendered_parts)
|
||||
|
||||
def to_json(self, annotated: AnnotatedResponse) -> str:
|
||||
"""Serialize annotated response to JSON."""
|
||||
return json.dumps(
|
||||
{
|
||||
"original_text": annotated.original_text,
|
||||
"rendered_text": annotated.rendered_text,
|
||||
"has_unverified": annotated.has_unverified,
|
||||
"claims": [asdict(c) for c in annotated.claims],
|
||||
},
|
||||
indent=2,
|
||||
ensure_ascii=False,
|
||||
)
|
||||
176
tests/test_burn_velocity_tracker.py
Normal file
176
tests/test_burn_velocity_tracker.py
Normal file
@@ -0,0 +1,176 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import subprocess
|
||||
import sys
|
||||
from datetime import date
|
||||
from pathlib import Path
|
||||
|
||||
from scripts.burn_velocity_tracker import build_velocity_report, render_markdown, update_history
|
||||
|
||||
|
||||
ROOT = Path(__file__).resolve().parent.parent
|
||||
DOC_PATH = ROOT / "docs" / "BURN_VELOCITY_TRACKING.md"
|
||||
|
||||
|
||||
SNAPSHOT = {
|
||||
"generated_at": "2026-04-22T12:00:00Z",
|
||||
"repos": {
|
||||
"timmy-home": {
|
||||
"open_issues": [
|
||||
{"number": 501, "state": "open", "created_at": "2026-04-20T09:00:00Z"},
|
||||
{"number": 502, "state": "open", "created_at": "2026-04-22T07:00:00Z"},
|
||||
],
|
||||
"recent_issues": [
|
||||
{"number": 401, "state": "closed", "created_at": "2026-04-21T09:00:00Z", "closed_at": "2026-04-22T05:30:00Z"},
|
||||
{"number": 402, "state": "closed", "created_at": "2026-04-20T09:00:00Z", "closed_at": "2026-04-21T05:30:00Z"},
|
||||
{"number": 403, "state": "closed", "created_at": "2026-04-19T09:00:00Z", "closed_at": "2026-04-20T05:30:00Z"},
|
||||
{"number": 404, "state": "closed", "created_at": "2026-04-14T09:00:00Z", "closed_at": "2026-04-15T05:30:00Z"},
|
||||
{"number": 405, "state": "closed", "created_at": "2026-04-13T09:00:00Z", "closed_at": "2026-04-14T05:30:00Z"},
|
||||
{"number": 406, "state": "closed", "created_at": "2026-04-12T09:00:00Z", "closed_at": "2026-04-13T05:30:00Z"},
|
||||
{"number": 407, "state": "closed", "created_at": "2026-04-11T09:00:00Z", "closed_at": "2026-04-12T05:30:00Z"},
|
||||
{"number": 408, "state": "closed", "created_at": "2026-04-10T09:00:00Z", "closed_at": "2026-04-11T05:30:00Z"},
|
||||
{"number": 409, "state": "closed", "created_at": "2026-04-09T09:00:00Z", "closed_at": "2026-04-10T05:30:00Z"},
|
||||
{"number": 410, "state": "closed", "created_at": "2026-04-08T09:00:00Z", "closed_at": "2026-04-09T05:30:00Z"},
|
||||
{"number": 411, "state": "closed", "created_at": "2026-04-07T09:00:00Z", "closed_at": "2026-04-08T05:30:00Z"},
|
||||
{"number": 412, "state": "closed", "created_at": "2026-04-06T09:00:00Z", "closed_at": "2026-04-07T05:30:00Z"},
|
||||
{"number": 413, "state": "closed", "created_at": "2026-04-05T09:00:00Z", "closed_at": "2026-04-06T05:30:00Z"},
|
||||
{"number": 414, "state": "open", "created_at": "2026-04-22T08:45:00Z", "closed_at": None},
|
||||
{"number": 415, "state": "open", "created_at": "2026-04-17T08:45:00Z", "closed_at": None},
|
||||
],
|
||||
},
|
||||
"timmy-config": {
|
||||
"open_issues": [
|
||||
{"number": 601, "state": "open", "created_at": "2026-04-18T09:00:00Z"},
|
||||
],
|
||||
"recent_issues": [
|
||||
{"number": 602, "state": "closed", "created_at": "2026-04-20T09:00:00Z", "closed_at": "2026-04-21T06:00:00Z"},
|
||||
{"number": 603, "state": "open", "created_at": "2026-04-22T06:00:00Z", "closed_at": None},
|
||||
],
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
CONFIG = {
|
||||
"owner": "Timmy_Foundation",
|
||||
"repos": ["timmy-home", "timmy-config"],
|
||||
"lookback_days": 14,
|
||||
"alert": {
|
||||
"recent_days": 7,
|
||||
"baseline_days": 7,
|
||||
"minimum_baseline_closed": 4,
|
||||
"drop_ratio": 0.6,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def test_build_velocity_report_counts_opened_closed_and_flags_drop_alert() -> None:
|
||||
report = build_velocity_report(CONFIG, SNAPSHOT, today=date(2026, 4, 22))
|
||||
|
||||
assert report["generated_day"] == "2026-04-22"
|
||||
assert report["summary"]["repos_with_alerts"] == ["timmy-home"]
|
||||
assert report["summary"]["total_open_now"] == 3
|
||||
|
||||
home = report["repos"][0]
|
||||
assert home["repo"] == "timmy-home"
|
||||
assert home["open_now"] == 2
|
||||
assert home["opened_last_7d"] == 5
|
||||
assert home["closed_last_7d"] == 3
|
||||
assert home["baseline_closed"] == 7
|
||||
assert home["weekly_net"] == 2
|
||||
assert home["alert"]["status"] == "drop"
|
||||
assert home["alert"]["recent_closed"] == 3
|
||||
assert home["daily"][-1] == {"date": "2026-04-22", "opened": 1, "closed": 1}
|
||||
|
||||
timmy_config = report["repos"][1]
|
||||
assert timmy_config["repo"] == "timmy-config"
|
||||
assert timmy_config["open_now"] == 1
|
||||
assert timmy_config["closed_last_7d"] == 1
|
||||
assert timmy_config["alert"]["status"] == "ok"
|
||||
|
||||
|
||||
def test_render_markdown_includes_dashboard_handoff_and_alerts() -> None:
|
||||
report = build_velocity_report(CONFIG, SNAPSHOT, today=date(2026, 4, 22))
|
||||
rendered = render_markdown(report)
|
||||
|
||||
for snippet in (
|
||||
"# Burn-down Velocity Tracking",
|
||||
"## Per-repo velocity",
|
||||
"timmy-home",
|
||||
"timmy-config",
|
||||
"## Dashboard handoff for timmy-config",
|
||||
"velocity_drop",
|
||||
"## Alerts",
|
||||
):
|
||||
assert snippet in rendered
|
||||
|
||||
|
||||
def test_update_history_replaces_same_day_snapshot(tmp_path: Path) -> None:
|
||||
history_path = tmp_path / "burn-velocity-history.json"
|
||||
report = build_velocity_report(CONFIG, SNAPSHOT, today=date(2026, 4, 22))
|
||||
update_history(history_path, report)
|
||||
|
||||
updated = json.loads(json.dumps(report))
|
||||
updated["repos"][0]["open_now"] = 9
|
||||
updated["summary"]["total_open_now"] = 10
|
||||
update_history(history_path, updated)
|
||||
|
||||
history = json.loads(history_path.read_text(encoding="utf-8"))
|
||||
assert [item["date"] for item in history["days"]] == ["2026-04-22"]
|
||||
assert history["days"][0]["summary"]["total_open_now"] == 10
|
||||
assert history["days"][0]["repos"][0]["open_now"] == 9
|
||||
|
||||
|
||||
def test_cli_writes_json_markdown_and_history_from_snapshot(tmp_path: Path) -> None:
|
||||
snapshot_path = tmp_path / "snapshot.json"
|
||||
output_json = tmp_path / "latest.json"
|
||||
output_md = tmp_path / "latest.md"
|
||||
history_path = tmp_path / "history.json"
|
||||
snapshot_path.write_text(json.dumps(SNAPSHOT), encoding="utf-8")
|
||||
|
||||
result = subprocess.run(
|
||||
[
|
||||
sys.executable,
|
||||
"-m",
|
||||
"scripts.burn_velocity_tracker",
|
||||
"--snapshot-file",
|
||||
str(snapshot_path),
|
||||
"--today",
|
||||
"2026-04-22",
|
||||
"--output-json",
|
||||
str(output_json),
|
||||
"--output-md",
|
||||
str(output_md),
|
||||
"--history-file",
|
||||
str(history_path),
|
||||
"--write-history",
|
||||
"--json",
|
||||
],
|
||||
check=True,
|
||||
cwd=ROOT,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
|
||||
payload = json.loads(result.stdout)
|
||||
assert payload["summary"]["repos_with_alerts"] == ["timmy-home"]
|
||||
assert output_json.exists()
|
||||
assert output_md.exists()
|
||||
assert history_path.exists()
|
||||
assert "timmy-config" in output_md.read_text(encoding="utf-8")
|
||||
|
||||
|
||||
def test_repo_contains_burn_velocity_tracking_doc() -> None:
|
||||
text = DOC_PATH.read_text(encoding="utf-8")
|
||||
required = [
|
||||
"# Burn-down Velocity Tracking",
|
||||
"python3 scripts/burn_velocity_tracker.py",
|
||||
"configs/burn_velocity_repos.json",
|
||||
"~/.timmy/burn-velocity/latest.json",
|
||||
"timmy-config dashboard",
|
||||
"type=issues",
|
||||
"velocity_drop",
|
||||
]
|
||||
for snippet in required:
|
||||
assert snippet in text
|
||||
@@ -1,145 +0,0 @@
|
||||
|
||||
import tempfile
|
||||
import zipfile
|
||||
import json
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
# Add parent to sys.path for imports
|
||||
import sys
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent / "timmy-local" / "scripts"))
|
||||
|
||||
from create_sov_bundle import create_bundle, get_hermes_home
|
||||
|
||||
|
||||
class TestSOVBundleCreation:
|
||||
"""Test Sovereign Bundle (.sov) format creation and structure."""
|
||||
|
||||
def test_bundle_creates_file(self, tmp_path):
|
||||
"""A .sov bundle is created at the specified output path."""
|
||||
out = tmp_path / "test.sov"
|
||||
result = create_bundle(str(out))
|
||||
|
||||
assert out.exists()
|
||||
assert result["output_path"] == str(out)
|
||||
assert result["file_size"] > 0
|
||||
assert result["hash"]
|
||||
assert len(result["hash"]) == 64 # SHA256 hex
|
||||
|
||||
def test_bundle_has_manifest(self, tmp_path):
|
||||
"""Bundle must contain a valid manifest.json in sov/ hierarchy."""
|
||||
out = tmp_path / "test.sov"
|
||||
create_bundle(str(out))
|
||||
|
||||
with zipfile.ZipFile(out, 'r') as zf:
|
||||
names = zf.namelist()
|
||||
assert "sov/manifest.json" in names
|
||||
manifest = json.loads(zf.read("sov/manifest.json"))
|
||||
assert manifest["version"] == "1.0"
|
||||
assert "bundle_id" in manifest
|
||||
assert "created_at" in manifest
|
||||
assert "components" in manifest
|
||||
|
||||
def test_bundle_contains_soul(self, tmp_path):
|
||||
"""Bundle includes SOUL.md from HERMES_HOME."""
|
||||
out = tmp_path / "test.sov"
|
||||
create_bundle(str(out))
|
||||
|
||||
with zipfile.ZipFile(out, 'r') as zf:
|
||||
names = zf.namelist()
|
||||
assert "sov/soul/SOUL.md" in names
|
||||
|
||||
soul = zf.read("sov/soul/SOUL.md").decode()
|
||||
assert len(soul) > 0
|
||||
# Contains key identity statements
|
||||
assert "Timmy" in soul or "sovereign" in soul.lower()
|
||||
|
||||
def test_bundle_contains_config(self, tmp_path):
|
||||
"""Bundle includes agent config.yaml."""
|
||||
out = tmp_path / "test.sov"
|
||||
create_bundle(str(out))
|
||||
|
||||
with zipfile.ZipFile(out, 'r') as zf:
|
||||
assert "sov/config/config.yaml" in zf.namelist()
|
||||
cfg = zf.read("sov/config/config.yaml").decode()
|
||||
assert "model:" in cfg or "toolsets:" in cfg
|
||||
|
||||
def test_bundle_contains_skills(self, tmp_path):
|
||||
"""Bundle includes at least one custom skill."""
|
||||
out = tmp_path / "test.sov"
|
||||
create_bundle(str(out))
|
||||
|
||||
with zipfile.ZipFile(out, 'r') as zf:
|
||||
skill_files = [n for n in zf.namelist() if n.startswith("sov/skills/") and n.endswith(".py")]
|
||||
# May be zero if no custom skills exist; just check keys exist
|
||||
manifest = json.loads(zf.read("sov/manifest.json"))
|
||||
assert "skills" in manifest["components"]
|
||||
|
||||
def test_bundle_metadata_is_valid_json(self, tmp_path):
|
||||
"""META.json is present and contains required fields."""
|
||||
out = tmp_path / "test.sov"
|
||||
create_bundle(str(out))
|
||||
|
||||
with zipfile.ZipFile(out, 'r') as zf:
|
||||
meta = json.loads(zf.read("sov/META.json"))
|
||||
assert meta["format"] == "sov"
|
||||
assert meta["format_version"] == "1.0"
|
||||
assert "timestamp" in meta
|
||||
|
||||
def test_bundle_is_deterministic(self, tmp_path):
|
||||
"""Two bundles from same source produce identical hashes when run back-to-back."""
|
||||
out1 = tmp_path / "a.sov"
|
||||
out2 = tmp_path / "b.sov"
|
||||
import time
|
||||
create_bundle(str(out1))
|
||||
time.sleep(1.1) # Ensure distinct timestamp
|
||||
create_bundle(str(out2))
|
||||
|
||||
with zipfile.ZipFile(out1) as zf:
|
||||
mf1 = json.loads(zf.read("sov/manifest.json"))
|
||||
with zipfile.ZipFile(out2) as zf:
|
||||
mf2 = json.loads(zf.read("sov/manifest.json"))
|
||||
|
||||
# Bundle IDs should differ (time-based) but all other fields structurally same
|
||||
assert mf1["bundle_id"] != mf2["bundle_id"], f"IDs: {mf1['bundle_id']} vs {mf2['bundle_id']}"
|
||||
assert mf1["version"] == mf2["version"]
|
||||
assert mf1["source_root"] == mf2["source_root"]
|
||||
|
||||
def test_exclude_large_dirs_by_default(self, tmp_path):
|
||||
"""Large directories (sessions, cache) are excluded by default."""
|
||||
out = tmp_path / "test.sov"
|
||||
create_bundle(str(out))
|
||||
|
||||
with zipfile.ZipFile(out, 'r') as zf:
|
||||
names = zf.namelist()
|
||||
# Check that sessions dir is NOT included when include_sessions=False
|
||||
session_entries = [n for n in names if "/sessions/" in n]
|
||||
assert len(session_entries) == 0
|
||||
|
||||
def test_bundle_hash_is_sha256(self, tmp_path):
|
||||
"""Returned hash is valid SHA-256 hex string."""
|
||||
out = tmp_path / "test.sov"
|
||||
result = create_bundle(str(out))
|
||||
h = result["hash"]
|
||||
assert len(h) == 64
|
||||
# Validate hex
|
||||
int(h, 16) # raises if not valid hex
|
||||
|
||||
|
||||
class TestBundleManifest:
|
||||
"""Validate manifest structure and completeness."""
|
||||
|
||||
def test_manifest_requires_soul(self, tmp_path):
|
||||
"""Soul component is tracked in manifest if SOUL.md exists."""
|
||||
out = tmp_path / "test.sov"
|
||||
result = create_bundle(str(out))
|
||||
comp = result["manifest"].get("components", {})
|
||||
# If SOUL.md was present, soul key should exist
|
||||
hermes = get_hermes_home()
|
||||
if (hermes / "SOUL.md").exists():
|
||||
assert "soul" in comp
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import pytest
|
||||
pytest.main([__file__, "-q"])
|
||||
@@ -1,103 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Tests for claim_annotator.py — verifies source distinction is present."""
|
||||
|
||||
import sys
|
||||
import os
|
||||
import json
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src"))
|
||||
|
||||
from timmy.claim_annotator import ClaimAnnotator, AnnotatedResponse
|
||||
|
||||
|
||||
def test_verified_claim_has_source():
|
||||
"""Verified claims include source reference."""
|
||||
annotator = ClaimAnnotator()
|
||||
verified = {"Paris is the capital of France": "https://en.wikipedia.org/wiki/Paris"}
|
||||
response = "Paris is the capital of France. It is a beautiful city."
|
||||
|
||||
result = annotator.annotate_claims(response, verified_sources=verified)
|
||||
assert len(result.claims) > 0
|
||||
verified_claims = [c for c in result.claims if c.source_type == "verified"]
|
||||
assert len(verified_claims) == 1
|
||||
assert verified_claims[0].source_ref == "https://en.wikipedia.org/wiki/Paris"
|
||||
assert "[V]" in result.rendered_text
|
||||
assert "[source:" in result.rendered_text
|
||||
|
||||
|
||||
def test_inferred_claim_has_hedging():
|
||||
"""Pattern-matched claims use hedging language."""
|
||||
annotator = ClaimAnnotator()
|
||||
response = "The weather is nice today. It might rain tomorrow."
|
||||
|
||||
result = annotator.annotate_claims(response)
|
||||
inferred_claims = [c for c in result.claims if c.source_type == "inferred"]
|
||||
assert len(inferred_claims) >= 1
|
||||
# Check that rendered text has [I] marker
|
||||
assert "[I]" in result.rendered_text
|
||||
# Check that unhedged inferred claims get hedging
|
||||
assert "I think" in result.rendered_text or "I believe" in result.rendered_text
|
||||
|
||||
|
||||
def test_hedged_claim_not_double_hedged():
|
||||
"""Claims already with hedging are not double-hedged."""
|
||||
annotator = ClaimAnnotator()
|
||||
response = "I think the sky is blue. It is a nice day."
|
||||
|
||||
result = annotator.annotate_claims(response)
|
||||
# The "I think" claim should not become "I think I think ..."
|
||||
assert "I think I think" not in result.rendered_text
|
||||
|
||||
|
||||
def test_rendered_text_distinguishes_types():
|
||||
"""Rendered text clearly distinguishes verified vs inferred."""
|
||||
annotator = ClaimAnnotator()
|
||||
verified = {"Earth is round": "https://science.org/earth"}
|
||||
response = "Earth is round. Stars are far away."
|
||||
|
||||
result = annotator.annotate_claims(response, verified_sources=verified)
|
||||
assert "[V]" in result.rendered_text # verified marker
|
||||
assert "[I]" in result.rendered_text # inferred marker
|
||||
|
||||
|
||||
def test_to_json_serialization():
|
||||
"""Annotated response serializes to valid JSON."""
|
||||
annotator = ClaimAnnotator()
|
||||
response = "Test claim."
|
||||
result = annotator.annotate_claims(response)
|
||||
json_str = annotator.to_json(result)
|
||||
parsed = json.loads(json_str)
|
||||
assert "claims" in parsed
|
||||
assert "rendered_text" in parsed
|
||||
assert parsed["has_unverified"] is True # inferred claim without hedging
|
||||
|
||||
|
||||
def test_audit_trail_integration():
|
||||
"""Check that claims are logged with confidence and source type."""
|
||||
# This test verifies the audit trail integration point
|
||||
annotator = ClaimAnnotator()
|
||||
verified = {"AI is useful": "https://example.com/ai"}
|
||||
response = "AI is useful. It can help with tasks."
|
||||
|
||||
result = annotator.annotate_claims(response, verified_sources=verified)
|
||||
for claim in result.claims:
|
||||
assert claim.source_type in ("verified", "inferred")
|
||||
assert claim.confidence in ("high", "medium", "low", "unknown")
|
||||
if claim.source_type == "verified":
|
||||
assert claim.source_ref is not None
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_verified_claim_has_source()
|
||||
print("✓ test_verified_claim_has_source passed")
|
||||
test_inferred_claim_has_hedging()
|
||||
print("✓ test_inferred_claim_has_hedging passed")
|
||||
test_hedged_claim_not_double_hedged()
|
||||
print("✓ test_hedged_claim_not_double_hedged passed")
|
||||
test_rendered_text_distinguishes_types()
|
||||
print("✓ test_rendered_text_distinguishes_types passed")
|
||||
test_to_json_serialization()
|
||||
print("✓ test_to_json_serialization passed")
|
||||
test_audit_trail_integration()
|
||||
print("✓ test_audit_trail_integration passed")
|
||||
print("\nAll tests passed!")
|
||||
@@ -1,384 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Sovereign Bundle Format Reference Implementation
|
||||
timmy-home #467 — [FRONTIER] Develop "Sovereign Bundle" (.sov) Export/Import Logic
|
||||
|
||||
.sov format: ZIP-based archive with a verifiable manifest.
|
||||
Structure:
|
||||
sov/
|
||||
manifest.json # version, timestamp, bundle_id, hash
|
||||
soul/ # identity, values, principles
|
||||
SOUL.md
|
||||
config/ # agent configuration
|
||||
config.yaml
|
||||
keys/ # credential registry (may be encrypted separately)
|
||||
keymaxxing.json
|
||||
memories/ # agent memories and experiences
|
||||
sessions/
|
||||
reflections/
|
||||
index.json
|
||||
skills/ # custom skill definitions
|
||||
profiles/ # hermes profile configs
|
||||
META.json # export metadata (agent, timestamp, source)
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import hashlib
|
||||
import zipfile
|
||||
from pathlib import Path
|
||||
from datetime import datetime, timezone
|
||||
from typing import Optional, Dict, Any, List
|
||||
|
||||
|
||||
def get_hermes_home() -> Path:
|
||||
"""Resolve HERMES_HOME from environment or default."""
|
||||
hermes_home = os.getenv("HERMES_HOME")
|
||||
if hermes_home:
|
||||
return Path(hermes_home).expanduser()
|
||||
return Path.home() / ".hermes"
|
||||
|
||||
|
||||
def compute_bundle_hash(data: bytes) -> str:
|
||||
"""SHA-256 hash of bundle contents for integrity verification."""
|
||||
return hashlib.sha256(data).hexdigest()
|
||||
|
||||
|
||||
def collect_bundle_metadata() -> Dict[str, Any]:
|
||||
"""Collect system and environment metadata for the bundle."""
|
||||
return {
|
||||
"hostname": os.uname().nodename if hasattr(os, 'uname') else "unknown",
|
||||
"platform": sys.platform,
|
||||
"timestamp": datetime.now(timezone.utc).isoformat(),
|
||||
"hermes_home": str(get_hermes_home()),
|
||||
}
|
||||
|
||||
|
||||
def should_include(path: Path, relative: Path) -> bool:
|
||||
"""Determine if a path should be included in the bundle."""
|
||||
# Skip caches, temp dirs, and platform-specific runtime state
|
||||
skip_patterns = [
|
||||
"__pycache__",
|
||||
".pyc", ".pyo",
|
||||
".git/",
|
||||
".pytest_cache",
|
||||
".venv",
|
||||
"node_modules",
|
||||
"/cache/",
|
||||
"/tmp/",
|
||||
"logs/",
|
||||
"checkpoints/",
|
||||
"sandboxes/",
|
||||
"vps-backups/",
|
||||
]
|
||||
path_str = str(relative)
|
||||
for pat in skip_patterns:
|
||||
if pat in path_str:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def create_bundle(output_path: str,
|
||||
hermes_home: Optional[Path] = None,
|
||||
include_sessions: bool = False,
|
||||
compression: int = zipfile.ZIP_DEFLATED) -> Dict[str, Any]:
|
||||
"""
|
||||
Create a .sov bundle at output_path.
|
||||
|
||||
Params:
|
||||
output_path: Path to write the .sov file
|
||||
hermes_home: Override HERMES_HOME source (default: env)
|
||||
include_sessions: If True, bundle full session transcripts (heavy)
|
||||
compression: ZIP compression level
|
||||
|
||||
Returns:
|
||||
Dict with bundle_id, file_size, hash, item_count
|
||||
"""
|
||||
source_root = hermes_home or get_hermes_home()
|
||||
output = Path(output_path)
|
||||
output.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
bundle_id = f"sov-{datetime.now(timezone.utc).strftime('%Y%m%d-%H%M%S')}"
|
||||
items_written = 0
|
||||
manifest = {
|
||||
"version": "1.0",
|
||||
"bundle_id": bundle_id,
|
||||
"created_at": datetime.now(timezone.utc).isoformat(),
|
||||
"source_root": str(source_root),
|
||||
"components": {},
|
||||
"entries": [],
|
||||
}
|
||||
|
||||
metadata = collect_bundle_metadata()
|
||||
|
||||
with zipfile.ZipFile(output, 'w', compression=compression) as zf:
|
||||
# Write META.json
|
||||
meta_data = {
|
||||
**metadata,
|
||||
"bundle_id": bundle_id,
|
||||
"format": "sov",
|
||||
"format_version": "1.0",
|
||||
}
|
||||
zf.writestr("sov/META.json", json.dumps(meta_data, indent=2))
|
||||
items_written += 1
|
||||
|
||||
# Soul — identity (SOUL.md)
|
||||
soul_src = source_root / "SOUL.md"
|
||||
if soul_src.exists():
|
||||
content = soul_src.read_text()
|
||||
zf.writestr("sov/soul/SOUL.md", content)
|
||||
manifest["components"]["soul"] = {"SOUL.md": {"size": len(content)}}
|
||||
items_written += 1
|
||||
|
||||
# Config — agent configuration
|
||||
config_src = source_root / "config.yaml"
|
||||
if config_src.exists():
|
||||
content = config_src.read_text()
|
||||
zf.writestr("sov/config/config.yaml", content)
|
||||
manifest["components"]["config"] = {"config.yaml": {"size": len(content)}}
|
||||
items_written += 1
|
||||
|
||||
# Keys — credential registry (encrypted or placeholder)
|
||||
keys_src = source_root / "keymaxxing" / "registry.json"
|
||||
if keys_src.exists():
|
||||
content = keys_src.read_text()
|
||||
zf.writestr("sov/keys/keymaxxing.json", content)
|
||||
manifest["components"]["keys"] = {"keymaxxing.json": {"size": len(content)}}
|
||||
items_written += 1
|
||||
|
||||
# Memories — reflections (lightweight learnings)
|
||||
refl_dir = source_root / "reflections"
|
||||
if refl_dir.exists():
|
||||
refl_files = list(refl_dir.glob("*.md")) + list(refl_dir.glob("*.json"))
|
||||
for rf in refl_files:
|
||||
if should_include(rf, rf.relative_to(source_root)):
|
||||
arcname = f"sov/memories/reflections/{rf.name}"
|
||||
content = rf.read_text()
|
||||
zf.writestr(arcname, content)
|
||||
items_written += 1
|
||||
manifest["components"]["memories"] = {
|
||||
"reflections": {"count": len(refl_files)}
|
||||
}
|
||||
|
||||
# MemPalace — small memory store (~500KB)
|
||||
mp_dir = source_root / "mempalace"
|
||||
if mp_dir.exists():
|
||||
mp_files = list(mp_dir.rglob("*"))
|
||||
mp_count = 0
|
||||
for mf in mp_files:
|
||||
if mf.is_file() and should_include(mf, mf.relative_to(source_root)):
|
||||
arcname = f"sov/memories/mempalace/{mf.relative_to(mp_dir)}"
|
||||
content = mf.read_bytes()
|
||||
zf.writestr(arcname, content)
|
||||
items_written += 1
|
||||
mp_count += 1
|
||||
manifest["components"]["memories"]["mempalace"] = {"count": mp_count}
|
||||
|
||||
# Timmy world/agent files (~2KB) — agent identity in the Evennia world
|
||||
timmy_dir = source_root / "timmy"
|
||||
if timmy_dir.exists():
|
||||
timmy_files = list(timmy_dir.rglob("*"))
|
||||
for tf in timmy_files:
|
||||
if tf.is_file() and should_include(tf, tf.relative_to(source_root)):
|
||||
arcname = f"sov/timmy/{tf.relative_to(timmy_dir)}"
|
||||
content = tf.read_bytes()
|
||||
zf.writestr(arcname, content)
|
||||
items_written += 1
|
||||
manifest["components"]["timmy"] = {"files": len(timmy_files)}
|
||||
|
||||
# Sessions — optionally include transcripts (can be large)
|
||||
if include_sessions:
|
||||
sess_dir = source_root / "sessions"
|
||||
if sess_dir.exists():
|
||||
sess_files = list(sess_dir.glob("*.jsonl")) + list(sess_dir.glob("*.json"))
|
||||
for sf in sess_files:
|
||||
if should_include(sf, sf.relative_to(source_root)):
|
||||
arcname = f"sov/memories/sessions/{sf.name}"
|
||||
content = sf.read_text()
|
||||
zf.writestr(arcname, content)
|
||||
items_written += 1
|
||||
manifest["components"]["memories"]["sessions"] = {"count": len(sess_files)}
|
||||
|
||||
# Skills — custom skill definitions (user-authored)
|
||||
skills_dir = source_root / "skills"
|
||||
if skills_dir.exists():
|
||||
for skill_path in skills_dir.rglob("*.py"):
|
||||
if not skill_path.name.startswith('.') and should_include(skill_path, skill_path.relative_to(source_root)):
|
||||
arcname = f"sov/skills/{skill_path.relative_to(skills_dir)}"
|
||||
content = skill_path.read_text()
|
||||
zf.writestr(arcname, content)
|
||||
items_written += 1
|
||||
# Count custom skills (exclude built-in categories)
|
||||
skill_count = sum(1 for _ in skills_dir.rglob("*.py")
|
||||
if not _.name.startswith('.') and should_include(_, _.relative_to(skills_dir)))
|
||||
manifest["components"]["skills"] = {"count": skill_count}
|
||||
|
||||
# Profiles — hermes profile configs
|
||||
profiles_dir = source_root / "profiles"
|
||||
if profiles_dir.exists():
|
||||
for pf in profiles_dir.glob("*.yaml"):
|
||||
if should_include(pf, pf.relative_to(source_root)):
|
||||
arcname = f"sov/profiles/{pf.name}"
|
||||
content = pf.read_text()
|
||||
zf.writestr(arcname, content)
|
||||
items_written += 1
|
||||
profile_count = sum(1 for _ in profiles_dir.glob("*.yaml") if should_include(_, _.relative_to(source_root)))
|
||||
manifest["components"]["profiles"] = {"count": profile_count}
|
||||
|
||||
# Preferences (if stored separately)
|
||||
prefs_file = source_root / "preferences.json"
|
||||
if prefs_file.exists():
|
||||
content = prefs_file.read_text()
|
||||
zf.writestr("sov/config/preferences.json", content)
|
||||
items_written += 1
|
||||
|
||||
# Write manifest.json
|
||||
zf.writestr("sov/manifest.json", json.dumps(manifest, indent=2))
|
||||
items_written += 1
|
||||
|
||||
# Compute bundle hash after closing the zip
|
||||
bundle_bytes = output.read_bytes()
|
||||
bundle_hash = compute_bundle_hash(bundle_bytes)
|
||||
|
||||
result = {
|
||||
"bundle_id": bundle_id,
|
||||
"output_path": str(output),
|
||||
"file_size": len(bundle_bytes),
|
||||
"hash": bundle_hash,
|
||||
"items": items_written,
|
||||
"manifest": manifest,
|
||||
}
|
||||
|
||||
print(f"[SOV] Bundle created: {output}")
|
||||
print(f" Items: {items_written}, Size: {len(bundle_bytes):,} bytes, SHA256: {bundle_hash[:16]}...")
|
||||
return result
|
||||
|
||||
|
||||
def verify_bundle(bundle_path: str) -> Dict[str, Any]:
|
||||
"""Verify a .sov bundle integrity and manifest."""
|
||||
with zipfile.ZipFile(bundle_path, 'r') as zf:
|
||||
# Read manifest
|
||||
try:
|
||||
mf_bytes = zf.read("sov/manifest.json")
|
||||
manifest = json.loads(mf_bytes)
|
||||
except KeyError:
|
||||
raise ValueError("Invalid .sov bundle: missing sov/manifest.json")
|
||||
except json.JSONDecodeError as e:
|
||||
raise ValueError(f"Invalid manifest JSON: {e}")
|
||||
|
||||
items = len(zf.namelist())
|
||||
computed_hash = compute_bundle_hash(Path(bundle_path).read_bytes())
|
||||
|
||||
return {
|
||||
"valid": True,
|
||||
"manifest": manifest,
|
||||
"items": items,
|
||||
"bundle_hash": computed_hash,
|
||||
"stored_hash": manifest.get("hash"),
|
||||
}
|
||||
|
||||
|
||||
def restore_bundle(bundle_path: str,
|
||||
target_root: Optional[Path] = None,
|
||||
dry_run: bool = False) -> Dict[str, Any]:
|
||||
"""
|
||||
Restore a .sov bundle to target_root or HERMES_HOME.
|
||||
|
||||
Params:
|
||||
bundle_path: Path to .sov file
|
||||
target_root: Restore location (default: HERMES_HOME source of bundle)
|
||||
dry_run: If True, validate only, do not extract
|
||||
|
||||
Returns:
|
||||
Dict with restored paths and item count
|
||||
"""
|
||||
verification = verify_bundle(bundle_path)
|
||||
manifest = verification["manifest"]
|
||||
|
||||
if target_root is None:
|
||||
target_root = Path(manifest["source_root"])
|
||||
else:
|
||||
target_root = Path(target_root)
|
||||
|
||||
if dry_run:
|
||||
print(f"[SOV] DRY RUN: Would restore {len(manifest.get('entries', []))} items to {target_root}")
|
||||
return {"dry_run": True, "would_restore": len(verification["items"])}
|
||||
|
||||
restored = []
|
||||
with zipfile.ZipFile(bundle_path, 'r') as zf:
|
||||
for name in zf.namelist():
|
||||
# Safety: only extract sov/ namespace
|
||||
if not name.startswith("sov/"):
|
||||
continue
|
||||
rel = name[4:] # strip sov/
|
||||
dest = target_root / rel
|
||||
|
||||
# Skip manifest itself - used for tracking only
|
||||
if rel == "manifest.json":
|
||||
continue
|
||||
|
||||
# Create parent dirs
|
||||
dest.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Extract and write
|
||||
data = zf.read(name)
|
||||
dest.write_bytes(data)
|
||||
restored.append(rel)
|
||||
|
||||
print(f"[SOV] Restored {len(restored)} items to {target_root}")
|
||||
return {
|
||||
"restored": restored,
|
||||
"count": len(restored),
|
||||
"target": str(target_root),
|
||||
}
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import argparse
|
||||
|
||||
p = argparse.ArgumentParser(description="Sovereign Bundle (.sov) export/import tool")
|
||||
sub = p.add_subparsers(dest="cmd", required=True)
|
||||
|
||||
# Export
|
||||
exp = sub.add_parser("export", help="Create a .sov bundle")
|
||||
exp.add_argument("-o", "--output", default="timmy-sovereign-bundle.sov",
|
||||
help="Output path for .sov file")
|
||||
exp.add_argument("--include-sessions", action="store_true",
|
||||
help="Include full session transcripts (larger bundle)")
|
||||
exp.add_argument("--hermes-home", type=str,
|
||||
help="Override HERMES_HOME source")
|
||||
|
||||
# Import / restore
|
||||
imp = sub.add_parser("import", help="Restore from a .sov bundle")
|
||||
imp.add_argument("bundle", help="Path to .sov file")
|
||||
imp.add_argument("-t", "--target", help="Restore target (default: bundle's source)")
|
||||
imp.add_argument("--dry-run", action="store_true", help="Validate only")
|
||||
|
||||
# Verify
|
||||
ver = sub.add_parser("verify", help="Verify bundle integrity")
|
||||
ver.add_argument("bundle", help="Path to .sov file")
|
||||
|
||||
args = p.parse_args()
|
||||
|
||||
if args.cmd == "export":
|
||||
result = create_bundle(
|
||||
output_path=args.output,
|
||||
hermes_home=Path(args.hermes_home).expanduser() if args.hermes_home else None,
|
||||
include_sessions=args.include_sessions,
|
||||
)
|
||||
print(json.dumps(result, indent=2))
|
||||
|
||||
elif args.cmd == "import":
|
||||
result = restore_bundle(args.bundle, Path(args.target) if args.target else None,
|
||||
dry_run=args.dry_run)
|
||||
print(json.dumps(result, indent=2) if not args.dry_run else None)
|
||||
|
||||
elif args.cmd == "verify":
|
||||
info = verify_bundle(args.bundle)
|
||||
print(f"Bundle: {args.bundle}")
|
||||
print(f" Valid: {info['valid']}")
|
||||
print(f" Items: {info['items']}")
|
||||
print(f" Hash: {info['bundle_hash']}")
|
||||
print(f" Manifest version: {info['manifest'].get('version')}")
|
||||
@@ -1,182 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Restore agent state from a Sovereign Bundle (.sov) file.
|
||||
|
||||
Usage:
|
||||
python restore_sov_bundle.py <bundle.sov> [--target ~/.hermes] [--dry-run]
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import zipfile
|
||||
import argparse
|
||||
from pathlib import Path
|
||||
from datetime import datetime, timezone
|
||||
|
||||
|
||||
def get_hermes_home() -> Path:
|
||||
hermes_home = os.getenv("HERMES_HOME")
|
||||
if hermes_home:
|
||||
return Path(hermes_home).expanduser()
|
||||
return Path.home() / ".hermes"
|
||||
|
||||
|
||||
def verify_bundle(bundle_path: str) -> dict:
|
||||
"""Verify .sov bundle integrity and return manifest."""
|
||||
with zipfile.ZipFile(bundle_path, 'r') as zf:
|
||||
# Require manifest
|
||||
try:
|
||||
mf = json.loads(zf.read("sov/manifest.json"))
|
||||
except KeyError:
|
||||
raise ValueError("Not a valid .sov bundle: missing sov/manifest.json")
|
||||
except json.JSONDecodeError as e:
|
||||
raise ValueError(f"Manifest JSON decode error: {e}")
|
||||
|
||||
return {
|
||||
"valid": True,
|
||||
"entries": zf.namelist(),
|
||||
"manifest": mf,
|
||||
"size": Path(bundle_path).stat().st_size,
|
||||
}
|
||||
|
||||
|
||||
def restore_bundle(bundle_path: str,
|
||||
target_root: Path = None,
|
||||
dry_run: bool = False) -> dict:
|
||||
"""
|
||||
Extract a .sov bundle to target_root.
|
||||
|
||||
Safety: Only extracts files under sov/ namespace.
|
||||
Does not overwrite existing files by default? (could add --force)
|
||||
"""
|
||||
bundle = Path(bundle_path)
|
||||
if not bundle.exists():
|
||||
raise FileNotFoundError(f"Bundle not found: {bundle_path}")
|
||||
|
||||
info = verify_bundle(bundle_path)
|
||||
manifest = info["manifest"]
|
||||
|
||||
src_root = Path(manifest["source_root"])
|
||||
if target_root is None:
|
||||
target_root = src_root
|
||||
else:
|
||||
target_root = Path(target_root)
|
||||
|
||||
print(f"[SOV] Bundle: {bundle_path}")
|
||||
print(f" Source: {src_root}")
|
||||
print(f" Target: {target_root}")
|
||||
print(f" Created: {manifest.get('created_at')}")
|
||||
print(f" Version: {manifest.get('version')}")
|
||||
|
||||
if dry_run:
|
||||
sov_entries = [n for n in info["entries"] if n.startswith("sov/") and n != "sov/manifest.json"]
|
||||
print(f" DRY RUN: Would restore {len(sov_entries)} items")
|
||||
return {"dry_run": True, "count": len(sov_entries)}
|
||||
|
||||
restored = []
|
||||
errors = []
|
||||
|
||||
with zipfile.ZipFile(bundle_path, 'r') as zf:
|
||||
for name in sorted(zf.namelist()):
|
||||
if not name.startswith("sov/"):
|
||||
continue
|
||||
if name == "sov/manifest.json":
|
||||
continue # Tracked separately
|
||||
|
||||
rel = name[4:] # strip sov/
|
||||
dest = target_root / rel
|
||||
dest.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
try:
|
||||
data = zf.read(name)
|
||||
dest.write_bytes(data)
|
||||
restored.append(rel)
|
||||
except Exception as e:
|
||||
errors.append((rel, str(e)))
|
||||
|
||||
print(f"\n[SOV] Restored {len(restored)} files to {target_root}")
|
||||
if errors:
|
||||
print(f" Errors: {len(errors)}")
|
||||
for path, err in errors:
|
||||
print(f" ✗ {path}: {err}")
|
||||
|
||||
# Print a summary of restored components
|
||||
comp = manifest.get("components", {})
|
||||
for comp_name, details in comp.items():
|
||||
if isinstance(details, dict) and "count" in details:
|
||||
print(f" {comp_name}: {details['count']}")
|
||||
elif isinstance(details, dict):
|
||||
print(f" {comp_name}: {', '.join(details.keys())}")
|
||||
|
||||
return {
|
||||
"restored": restored,
|
||||
"count": len(restored),
|
||||
"errors": errors,
|
||||
"target": str(target_root),
|
||||
}
|
||||
|
||||
|
||||
def list_entries(bundle_path: str) -> None:
|
||||
"""List all entries in a .sov bundle with sizes."""
|
||||
with zipfile.ZipFile(bundle_path, 'r') as zf:
|
||||
manifest = json.loads(zf.read("sov/manifest.json"))
|
||||
entries = sorted([n for n in zf.namelist() if n != "sov/manifest.json"])
|
||||
|
||||
print(f"Bundle ID: {manifest.get('bundle_id')}")
|
||||
print(f"Version: {manifest.get('version')}")
|
||||
print(f"Created: {manifest.get('created_at')}")
|
||||
print(f"Source: {manifest.get('source_root')}")
|
||||
print(f"\nContents ({len(entries)} entries):\n")
|
||||
|
||||
by_category = {}
|
||||
for e in entries:
|
||||
cat = e.split('/')[1] if len(e.split('/')) > 1 else 'root'
|
||||
by_category.setdefault(cat, []).append(e)
|
||||
|
||||
for cat in sorted(by_category):
|
||||
print(f" [{cat}]")
|
||||
for e in by_category[cat]:
|
||||
info = zf.getinfo(e)
|
||||
print(f" {e} ({info.file_size:,} bytes)")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
p = argparse.ArgumentParser(description="Restore Sovereign Bundle (.sov)")
|
||||
p.add_argument("bundle", nargs="?", help="Path to .sov file")
|
||||
p.add_argument("--target", "-t", type=str, help="Restore target directory")
|
||||
p.add_argument("--dry-run", action="store_true", help="Validate without extracting")
|
||||
p.add_argument("--list", "-l", action="store_true", help="List bundle contents")
|
||||
p.add_argument("--yes", "-y", action="store_true", help="Skip confirmation prompt")
|
||||
|
||||
args = p.parse_args()
|
||||
|
||||
if args.list:
|
||||
if not args.bundle:
|
||||
print("Usage: restore_sov_bundle.py --list <bundle.sov>")
|
||||
sys.exit(1)
|
||||
list_entries(args.bundle)
|
||||
sys.exit(0)
|
||||
|
||||
if not args.bundle:
|
||||
p.print_help()
|
||||
sys.exit(1)
|
||||
|
||||
bundle_path = args.bundle
|
||||
if not Path(bundle_path).exists():
|
||||
print(f"Error: Bundle not found: {bundle_path}")
|
||||
sys.exit(1)
|
||||
|
||||
target = Path(args.target) if args.target else None
|
||||
|
||||
# Safety prompt unless dry-run or --yes
|
||||
if not args.dry_run and not args.yes:
|
||||
t = target or get_hermes_home()
|
||||
resp = input(f"Restore to {t}? [y/N] ").strip().lower()
|
||||
if resp != 'y':
|
||||
print("Aborted.")
|
||||
sys.exit(0)
|
||||
|
||||
result = restore_bundle(bundle_path, target_root=target, dry_run=args.dry_run)
|
||||
if result.get("errors"):
|
||||
sys.exit(1)
|
||||
Reference in New Issue
Block a user