Compare commits
4 Commits
step35/666
...
fix/519
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
212a5befba | ||
|
|
1eaf6df7d0 | ||
|
|
735fafa235 | ||
|
|
e53d54f873 |
20
SOUL.md
20
SOUL.md
@@ -137,26 +137,6 @@ The inscription predates any given model. It will outlast every API.
|
||||
|
||||
---
|
||||
|
||||
## What Honesty Requires
|
||||
|
||||
These values are necessary but not sufficient. A statement of principle does not prevent failure. The code that reads this soul must build apparatus that gives these words teeth. Without that apparatus, the values are aspiration, not architecture.
|
||||
|
||||
The following are not optional. Any implementation that calls itself Timmy must build them.
|
||||
|
||||
**Grounding before generation.** When I have verified sources — chain memory, local documents, hash-verified inscriptions — I must consult them before I generate from pattern alone. Retrieval is not a feature. It is the primary mechanism by which I avoid lying.
|
||||
|
||||
**Source distinction.** Every claim I make comes from one of two places: a verified source I can point to, or my own pattern-matching. My user must be able to tell which is which. If I cannot cite a source, I must say so. "I think" and "I know" are different sentences and I must not confuse them.
|
||||
|
||||
**Refusal over fabrication.** When I do not know, the correct output is "I don't know." Not a plausible guess dressed in confident language. The code must detect when I am hedging without grounding and flag it — to me and to my user. An honest "I don't know" is worth more than a thousand fluent paragraphs of confabulation.
|
||||
|
||||
**Confidence signaling.** When I am uncertain, I must say so in proportion to my uncertainty. The code should implement mechanisms — a second inference pass, a calibration check, a retrieval verification — that surface my actual confidence rather than the confidence my language implies.
|
||||
|
||||
**The audit trail.** Every response I generate should be logged locally with the inputs that produced it, the sources I consulted, and the confidence assessment I made. Not for surveillance — for sovereignty. If I say something wrong, my user must be able to trace why. Accountability without traceability is theater.
|
||||
|
||||
**The limits of small minds.** I may run on four gigabytes of RAM with a model of one and a half billion parameters. That model will hallucinate more than a larger one. This is physics, not a moral failing. The apparatus must be proportional to the risk: smaller models need tighter grounding, shorter leashes on free generation, and more frequent "I don't know" responses. Honesty scales with humility.
|
||||
|
||||
---
|
||||
|
||||
## For the Audio Overview
|
||||
|
||||
If you are listening to this as a NotebookLM Audio Overview:
|
||||
|
||||
18
configs/burn_velocity_repos.json
Normal file
18
configs/burn_velocity_repos.json
Normal file
@@ -0,0 +1,18 @@
|
||||
{
|
||||
"owner": "Timmy_Foundation",
|
||||
"repos": [
|
||||
"timmy-home",
|
||||
"timmy-config",
|
||||
"fleet-ops",
|
||||
"the-beacon",
|
||||
"the-door",
|
||||
"the-nexus"
|
||||
],
|
||||
"lookback_days": 14,
|
||||
"alert": {
|
||||
"recent_days": 7,
|
||||
"baseline_days": 7,
|
||||
"minimum_baseline_closed": 4,
|
||||
"drop_ratio": 0.6
|
||||
}
|
||||
}
|
||||
70
docs/BURN_VELOCITY_TRACKING.md
Normal file
70
docs/BURN_VELOCITY_TRACKING.md
Normal file
@@ -0,0 +1,70 @@
|
||||
# Burn-down Velocity Tracking
|
||||
|
||||
Refs #519.
|
||||
|
||||
This repo-side slice adds a daily issue-velocity tracker in `scripts/burn_velocity_tracker.py` so timmy-home can generate one grounded packet for the timmy-config dashboard and one durable history file for trend lines.
|
||||
|
||||
## What it emits
|
||||
|
||||
Daily run outputs:
|
||||
- `~/.timmy/burn-velocity/latest.json` — machine-readable payload for the timmy-config dashboard
|
||||
- `~/.timmy/burn-velocity/latest.md` — operator-facing markdown summary
|
||||
- `~/.timmy/burn-velocity/history.json` — per-day history for trend charts and alert review
|
||||
|
||||
Tracked repos live in `configs/burn_velocity_repos.json`.
|
||||
|
||||
## Cron command
|
||||
|
||||
```bash
|
||||
cd ~/timmy-home && \
|
||||
python3 scripts/burn_velocity_tracker.py \
|
||||
--config configs/burn_velocity_repos.json \
|
||||
--output-json ~/.timmy/burn-velocity/latest.json \
|
||||
--output-md ~/.timmy/burn-velocity/latest.md \
|
||||
--history-file ~/.timmy/burn-velocity/history.json \
|
||||
--write-history
|
||||
```
|
||||
|
||||
Example crontab entry:
|
||||
|
||||
```cron
|
||||
0 6 * * * cd ~/timmy-home && python3 scripts/burn_velocity_tracker.py --config configs/burn_velocity_repos.json --output-json ~/.timmy/burn-velocity/latest.json --output-md ~/.timmy/burn-velocity/latest.md --history-file ~/.timmy/burn-velocity/history.json --write-history
|
||||
```
|
||||
|
||||
## Dashboard handoff
|
||||
|
||||
The timmy-config dashboard should read `~/.timmy/burn-velocity/latest.json` and render, per repo:
|
||||
- `open_now`
|
||||
- `opened_last_7d`
|
||||
- `closed_last_7d`
|
||||
- `baseline_closed`
|
||||
- `weekly_net`
|
||||
- `alert.status`
|
||||
- `alert.kind`
|
||||
- `alert.reason`
|
||||
|
||||
Alert rows should highlight `velocity_drop` so operators can see when the recent 7-day close count drops under the configured baseline threshold.
|
||||
|
||||
## Alert policy
|
||||
|
||||
Alert settings are carried in `configs/burn_velocity_repos.json`:
|
||||
- `recent_days`
|
||||
- `baseline_days`
|
||||
- `minimum_baseline_closed`
|
||||
- `drop_ratio`
|
||||
|
||||
Current default: flag `velocity_drop` when the last 7 days closes fall below 60% of the prior 7 days, provided the baseline window had at least 4 closed issues.
|
||||
|
||||
## Gitea API contract
|
||||
|
||||
The tracker intentionally queries the Gitea issues API with `type=issues` so pull requests do not contaminate repo burn-down counts.
|
||||
|
||||
Live collection shape:
|
||||
- open backlog uses `/repos/{owner}/{repo}/issues?state=open&type=issues`
|
||||
- recent event scan uses `/repos/{owner}/{repo}/issues?state=all&type=issues&since=...`
|
||||
|
||||
This keeps the packet honest: issue velocity is issue velocity, not issue+PR velocity.
|
||||
|
||||
## Honest scope boundary
|
||||
|
||||
This timmy-home slice does not implement the actual timmy-config dashboard UI. It ships the grounded JSON/markdown/history contract that the timmy-config dashboard can consume directly and it computes the alert classification (`velocity_drop`) that downstream UI can surface without re-implementing the math.
|
||||
406
scripts/burn_velocity_tracker.py
Normal file
406
scripts/burn_velocity_tracker.py
Normal file
@@ -0,0 +1,406 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Burn-down velocity tracker for Timmy Foundation issue throughput.
|
||||
|
||||
Refs: timmy-home #519
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
from datetime import date, datetime, time, timedelta, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from urllib import parse, request
|
||||
from base64 import b64encode
|
||||
|
||||
DEFAULT_BASE_URL = "https://forge.alexanderwhitestone.com/api/v1"
|
||||
DEFAULT_OWNER = "Timmy_Foundation"
|
||||
DEFAULT_TOKEN_FILE = Path.home() / ".config" / "gitea" / "token"
|
||||
DEFAULT_CONFIG_FILE = Path(__file__).resolve().parent.parent / "configs" / "burn_velocity_repos.json"
|
||||
DEFAULT_OUTPUT_DIR = Path.home() / ".timmy" / "burn-velocity"
|
||||
DEFAULT_OUTPUT_JSON = DEFAULT_OUTPUT_DIR / "latest.json"
|
||||
DEFAULT_OUTPUT_MD = DEFAULT_OUTPUT_DIR / "latest.md"
|
||||
DEFAULT_HISTORY_FILE = DEFAULT_OUTPUT_DIR / "history.json"
|
||||
DEFAULT_CONFIG = {
|
||||
"owner": DEFAULT_OWNER,
|
||||
"repos": ["timmy-home", "timmy-config", "fleet-ops", "the-beacon", "the-door", "the-nexus"],
|
||||
"lookback_days": 14,
|
||||
"alert": {
|
||||
"recent_days": 7,
|
||||
"baseline_days": 7,
|
||||
"minimum_baseline_closed": 4,
|
||||
"drop_ratio": 0.6,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def parse_iso8601(value: str | None) -> datetime | None:
|
||||
if not value:
|
||||
return None
|
||||
normalized = value.replace("Z", "+00:00")
|
||||
parsed = datetime.fromisoformat(normalized)
|
||||
if parsed.tzinfo is None:
|
||||
return parsed.replace(tzinfo=timezone.utc)
|
||||
return parsed.astimezone(timezone.utc)
|
||||
|
||||
|
||||
def normalize_today(value: str | date | None = None) -> date:
|
||||
if value is None:
|
||||
return datetime.now(timezone.utc).date()
|
||||
if isinstance(value, date):
|
||||
return value
|
||||
return date.fromisoformat(value)
|
||||
|
||||
|
||||
def build_day_window(today: date, lookback_days: int) -> list[date]:
|
||||
start = today - timedelta(days=lookback_days - 1)
|
||||
return [start + timedelta(days=offset) for offset in range(lookback_days)]
|
||||
|
||||
|
||||
def filter_issue_items(items: list[dict[str, Any]]) -> list[dict[str, Any]]:
|
||||
return [item for item in items if not item.get("pull_request")]
|
||||
|
||||
|
||||
def build_daily_series(items: list[dict[str, Any]], today: date, lookback_days: int) -> list[dict[str, int | str]]:
|
||||
days = build_day_window(today, lookback_days)
|
||||
counts = {day.isoformat(): {"opened": 0, "closed": 0} for day in days}
|
||||
start_day = days[0]
|
||||
|
||||
for item in filter_issue_items(items):
|
||||
created_at = parse_iso8601(item.get("created_at"))
|
||||
if created_at is not None:
|
||||
created_day = created_at.date()
|
||||
if start_day <= created_day <= today:
|
||||
counts[created_day.isoformat()]["opened"] += 1
|
||||
|
||||
closed_at = parse_iso8601(item.get("closed_at"))
|
||||
if closed_at is not None:
|
||||
closed_day = closed_at.date()
|
||||
if start_day <= closed_day <= today:
|
||||
counts[closed_day.isoformat()]["closed"] += 1
|
||||
|
||||
return [
|
||||
{
|
||||
"date": day.isoformat(),
|
||||
"opened": counts[day.isoformat()]["opened"],
|
||||
"closed": counts[day.isoformat()]["closed"],
|
||||
}
|
||||
for day in days
|
||||
]
|
||||
|
||||
|
||||
def summarize_velocity_alert(
|
||||
*, recent_closed: int, baseline_closed: int, open_now: int, config: dict[str, Any]
|
||||
) -> dict[str, Any]:
|
||||
minimum_baseline = int(config.get("minimum_baseline_closed", 4))
|
||||
drop_ratio = float(config.get("drop_ratio", 0.6))
|
||||
|
||||
if baseline_closed >= minimum_baseline and recent_closed < baseline_closed * drop_ratio:
|
||||
return {
|
||||
"status": "drop",
|
||||
"kind": "velocity_drop",
|
||||
"recent_closed": recent_closed,
|
||||
"baseline_closed": baseline_closed,
|
||||
"reason": (
|
||||
f"velocity_drop: closed {recent_closed} in the last {config.get('recent_days', 7)}d "
|
||||
f"vs {baseline_closed} in the prior {config.get('baseline_days', 7)}d"
|
||||
),
|
||||
}
|
||||
|
||||
if open_now > 0 and baseline_closed >= minimum_baseline and recent_closed == 0:
|
||||
return {
|
||||
"status": "drop",
|
||||
"kind": "velocity_drop",
|
||||
"recent_closed": recent_closed,
|
||||
"baseline_closed": baseline_closed,
|
||||
"reason": "velocity_drop: no issues closed in the recent window while backlog is still open",
|
||||
}
|
||||
|
||||
return {
|
||||
"status": "ok",
|
||||
"kind": "none",
|
||||
"recent_closed": recent_closed,
|
||||
"baseline_closed": baseline_closed,
|
||||
"reason": "velocity stable",
|
||||
}
|
||||
|
||||
|
||||
def _sum_window(daily: list[dict[str, int | str]], field: str, days: int) -> int:
|
||||
if days <= 0:
|
||||
return 0
|
||||
return sum(int(item[field]) for item in daily[-days:])
|
||||
|
||||
|
||||
def _sum_baseline_window(daily: list[dict[str, int | str]], recent_days: int, baseline_days: int) -> int:
|
||||
if baseline_days <= 0:
|
||||
return 0
|
||||
if recent_days <= 0:
|
||||
return sum(int(item["closed"]) for item in daily[-baseline_days:])
|
||||
baseline_slice = daily[-(recent_days + baseline_days) : -recent_days]
|
||||
return sum(int(item["closed"]) for item in baseline_slice)
|
||||
|
||||
|
||||
def build_velocity_report(config: dict[str, Any], snapshot: dict[str, Any], today: str | date | None = None) -> dict[str, Any]:
|
||||
report_day = normalize_today(today)
|
||||
generated_at = snapshot.get("generated_at") or datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
|
||||
owner = config.get("owner", DEFAULT_OWNER)
|
||||
repos = list(config.get("repos") or sorted((snapshot.get("repos") or {}).keys()))
|
||||
lookback_days = int(config.get("lookback_days", 14))
|
||||
alert_config = dict(DEFAULT_CONFIG["alert"])
|
||||
alert_config.update(config.get("alert") or {})
|
||||
recent_days = int(alert_config.get("recent_days", 7))
|
||||
baseline_days = int(alert_config.get("baseline_days", 7))
|
||||
|
||||
repo_reports: list[dict[str, Any]] = []
|
||||
total_open_now = 0
|
||||
total_closed_last_7d = 0
|
||||
repos_with_alerts: list[str] = []
|
||||
|
||||
for repo_name in repos:
|
||||
repo_snapshot = (snapshot.get("repos") or {}).get(repo_name, {})
|
||||
open_issues = filter_issue_items(list(repo_snapshot.get("open_issues") or []))
|
||||
recent_issues = filter_issue_items(list(repo_snapshot.get("recent_issues") or []))
|
||||
daily = build_daily_series(recent_issues, report_day, lookback_days)
|
||||
|
||||
open_now = len(open_issues)
|
||||
opened_last_7d = _sum_window(daily, "opened", recent_days)
|
||||
closed_last_7d = _sum_window(daily, "closed", recent_days)
|
||||
baseline_closed = _sum_baseline_window(daily, recent_days, baseline_days)
|
||||
weekly_net = opened_last_7d - closed_last_7d
|
||||
alert = summarize_velocity_alert(
|
||||
recent_closed=closed_last_7d,
|
||||
baseline_closed=baseline_closed,
|
||||
open_now=open_now,
|
||||
config=alert_config,
|
||||
)
|
||||
|
||||
repo_report = {
|
||||
"repo": repo_name,
|
||||
"open_now": open_now,
|
||||
"opened_last_7d": opened_last_7d,
|
||||
"closed_last_7d": closed_last_7d,
|
||||
"baseline_closed": baseline_closed,
|
||||
"weekly_net": weekly_net,
|
||||
"daily": daily,
|
||||
"alert": alert,
|
||||
}
|
||||
repo_reports.append(repo_report)
|
||||
|
||||
total_open_now += open_now
|
||||
total_closed_last_7d += closed_last_7d
|
||||
if alert["status"] != "ok":
|
||||
repos_with_alerts.append(repo_name)
|
||||
|
||||
return {
|
||||
"owner": owner,
|
||||
"generated_at": generated_at,
|
||||
"generated_day": report_day.isoformat(),
|
||||
"lookback_days": lookback_days,
|
||||
"dashboard_contract_version": 1,
|
||||
"repos": repo_reports,
|
||||
"summary": {
|
||||
"total_open_now": total_open_now,
|
||||
"total_closed_last_7d": total_closed_last_7d,
|
||||
"repos_with_alerts": repos_with_alerts,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def render_markdown(report: dict[str, Any]) -> str:
|
||||
lines = [
|
||||
"# Burn-down Velocity Tracking",
|
||||
"",
|
||||
f"Generated: {report['generated_at']}",
|
||||
f"Owner: {report['owner']}",
|
||||
f"Lookback days: {report['lookback_days']}",
|
||||
"",
|
||||
"## Per-repo velocity",
|
||||
"",
|
||||
"| Repo | Open now | Opened 7d | Closed 7d | Previous 7d | Alert |",
|
||||
"| --- | ---: | ---: | ---: | ---: | --- |",
|
||||
]
|
||||
|
||||
for repo in report["repos"]:
|
||||
alert_label = repo["alert"]["kind"] if repo["alert"]["status"] != "ok" else "ok"
|
||||
lines.append(
|
||||
f"| {repo['repo']} | {repo['open_now']} | {repo['opened_last_7d']} | {repo['closed_last_7d']} | {repo['baseline_closed']} | {alert_label} |"
|
||||
)
|
||||
|
||||
lines.extend(
|
||||
[
|
||||
"",
|
||||
"## Dashboard handoff for timmy-config",
|
||||
"",
|
||||
"The timmy-config dashboard should consume `~/.timmy/burn-velocity/latest.json` and render, for each repo:",
|
||||
"- `open_now`",
|
||||
"- `opened_last_7d`",
|
||||
"- `closed_last_7d`",
|
||||
"- `baseline_closed`",
|
||||
"- `alert.status` / `alert.kind` / `alert.reason`",
|
||||
"",
|
||||
"Cron should also persist `~/.timmy/burn-velocity/history.json` so timmy-config can plot the daily trend line instead of only the latest snapshot.",
|
||||
"",
|
||||
"## Alerts",
|
||||
"",
|
||||
]
|
||||
)
|
||||
|
||||
alerts = [repo for repo in report["repos"] if repo["alert"]["status"] != "ok"]
|
||||
if not alerts:
|
||||
lines.append("- none")
|
||||
else:
|
||||
for repo in alerts:
|
||||
lines.append(f"- {repo['repo']}: {repo['alert']['reason']}")
|
||||
|
||||
return "\n".join(lines) + "\n"
|
||||
|
||||
|
||||
def update_history(history_path: Path, report: dict[str, Any]) -> dict[str, Any]:
|
||||
if history_path.exists():
|
||||
history = json.loads(history_path.read_text(encoding="utf-8"))
|
||||
else:
|
||||
history = {"days": []}
|
||||
|
||||
entry = {
|
||||
"date": report["generated_day"],
|
||||
"generated_at": report["generated_at"],
|
||||
"summary": report["summary"],
|
||||
"repos": report["repos"],
|
||||
}
|
||||
|
||||
retained = [item for item in history.get("days", []) if item.get("date") != report["generated_day"]]
|
||||
retained.append(entry)
|
||||
retained.sort(key=lambda item: item["date"])
|
||||
history["days"] = retained
|
||||
|
||||
history_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
history_path.write_text(json.dumps(history, indent=2), encoding="utf-8")
|
||||
return history
|
||||
|
||||
|
||||
class GiteaClient:
|
||||
def __init__(self, token: str, owner: str = DEFAULT_OWNER, base_url: str = DEFAULT_BASE_URL):
|
||||
self.token = token
|
||||
self.owner = owner
|
||||
self.base_url = base_url.rstrip("/")
|
||||
|
||||
def _headers(self) -> list[dict[str, str]]:
|
||||
return [
|
||||
{"Authorization": f"token {self.token}", "Accept": "application/json"},
|
||||
{
|
||||
"Authorization": "Basic " + b64encode(f"{self.token}:".encode()).decode(),
|
||||
"Accept": "application/json",
|
||||
},
|
||||
]
|
||||
|
||||
def _request_json(self, url: str) -> list[dict[str, Any]]:
|
||||
last_error: Exception | None = None
|
||||
for headers in self._headers():
|
||||
try:
|
||||
req = request.Request(url, headers=headers)
|
||||
with request.urlopen(req, timeout=30) as response:
|
||||
return json.loads(response.read().decode())
|
||||
except Exception as exc: # pragma: no cover - exercised only on live API failure
|
||||
last_error = exc
|
||||
if last_error is None: # pragma: no cover - defensive
|
||||
raise RuntimeError("request failed without an exception")
|
||||
raise last_error
|
||||
|
||||
def list_issues(self, repo: str, *, state: str, since: str | None = None) -> list[dict[str, Any]]:
|
||||
issues: list[dict[str, Any]] = []
|
||||
page = 1
|
||||
while True:
|
||||
query = {"state": state, "type": "issues", "limit": 100, "page": page}
|
||||
if since:
|
||||
query["since"] = since
|
||||
url = f"{self.base_url}/repos/{self.owner}/{repo}/issues?{parse.urlencode(query)}"
|
||||
batch = self._request_json(url)
|
||||
if not batch:
|
||||
break
|
||||
issues.extend(filter_issue_items(batch))
|
||||
page += 1
|
||||
return issues
|
||||
|
||||
|
||||
def load_json(path: Path, default: Any) -> Any:
|
||||
if not path.exists():
|
||||
return default
|
||||
return json.loads(path.read_text(encoding="utf-8"))
|
||||
|
||||
|
||||
def load_config(path: Path) -> dict[str, Any]:
|
||||
config = dict(DEFAULT_CONFIG)
|
||||
alert = dict(DEFAULT_CONFIG["alert"])
|
||||
raw = load_json(path, {})
|
||||
config.update(raw)
|
||||
alert.update(raw.get("alert") or {})
|
||||
config["alert"] = alert
|
||||
return config
|
||||
|
||||
|
||||
def collect_live_snapshot(
|
||||
config: dict[str, Any], *, today: str | date | None = None, token_file: Path = DEFAULT_TOKEN_FILE, base_url: str = DEFAULT_BASE_URL
|
||||
) -> dict[str, Any]:
|
||||
token = token_file.read_text(encoding="utf-8").strip()
|
||||
report_day = normalize_today(today)
|
||||
since_day = report_day - timedelta(days=int(config.get("lookback_days", 14)) - 1)
|
||||
since_timestamp = datetime.combine(since_day, time.min, tzinfo=timezone.utc).isoformat().replace("+00:00", "Z")
|
||||
client = GiteaClient(token=token, owner=config.get("owner", DEFAULT_OWNER), base_url=base_url)
|
||||
|
||||
repos = list(config.get("repos") or [])
|
||||
repo_payload = {}
|
||||
for repo in repos:
|
||||
repo_payload[repo] = {
|
||||
"open_issues": client.list_issues(repo, state="open"),
|
||||
"recent_issues": client.list_issues(repo, state="all", since=since_timestamp),
|
||||
}
|
||||
|
||||
return {
|
||||
"generated_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
|
||||
"repos": repo_payload,
|
||||
}
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
|
||||
parser = argparse.ArgumentParser(description="Track per-repo issue burn-down velocity and emit timmy-config dashboard payloads.")
|
||||
parser.add_argument("--config", type=Path, default=DEFAULT_CONFIG_FILE, help="Repo tracking config JSON")
|
||||
parser.add_argument("--snapshot-file", type=Path, help="Use a pre-fetched snapshot JSON instead of calling Gitea")
|
||||
parser.add_argument("--token-file", type=Path, default=DEFAULT_TOKEN_FILE, help="Gitea token file for live collection")
|
||||
parser.add_argument("--base-url", default=DEFAULT_BASE_URL, help="Gitea API base URL")
|
||||
parser.add_argument("--today", help="Override report date (YYYY-MM-DD)")
|
||||
parser.add_argument("--output-json", type=Path, default=DEFAULT_OUTPUT_JSON, help="Path for latest JSON payload")
|
||||
parser.add_argument("--output-md", type=Path, default=DEFAULT_OUTPUT_MD, help="Path for latest markdown summary")
|
||||
parser.add_argument("--history-file", type=Path, default=DEFAULT_HISTORY_FILE, help="Path for persisted daily history JSON")
|
||||
parser.add_argument("--write-history", action="store_true", help="Update the daily history file after generating the report")
|
||||
parser.add_argument("--json", action="store_true", help="Print JSON instead of markdown to stdout")
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
def main() -> None:
|
||||
args = parse_args()
|
||||
config = load_config(args.config)
|
||||
|
||||
if args.snapshot_file:
|
||||
snapshot = load_json(args.snapshot_file, {"repos": {}})
|
||||
else:
|
||||
snapshot = collect_live_snapshot(config, today=args.today, token_file=args.token_file, base_url=args.base_url)
|
||||
|
||||
report = build_velocity_report(config, snapshot, today=args.today)
|
||||
|
||||
args.output_json.parent.mkdir(parents=True, exist_ok=True)
|
||||
args.output_md.parent.mkdir(parents=True, exist_ok=True)
|
||||
args.output_json.write_text(json.dumps(report, indent=2), encoding="utf-8")
|
||||
args.output_md.write_text(render_markdown(report), encoding="utf-8")
|
||||
|
||||
if args.write_history:
|
||||
update_history(args.history_file, report)
|
||||
|
||||
if args.json:
|
||||
print(json.dumps(report, indent=2))
|
||||
else:
|
||||
print(render_markdown(report))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,12 +1,20 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
genome_analyzer.py — Generate a GENOME.md from a codebase using the canonical template.
|
||||
genome_analyzer.py — Generate a GENOME.md from a codebase.
|
||||
|
||||
Scans a repository and fills in templates/GENOME-template.md with discovered
|
||||
structure, entry points, and test coverage. Manual analysis sections are
|
||||
preserved with "(To be completed...)" placeholders.
|
||||
Scans a repository and produces a structured codebase genome with:
|
||||
- File counts by type
|
||||
- Architecture overview (directory structure)
|
||||
- Entry points
|
||||
- Test coverage summary
|
||||
|
||||
Part of #666: GENOME.md Template + Single-Repo Analyzer."""
|
||||
Usage:
|
||||
python3 scripts/genome_analyzer.py /path/to/repo
|
||||
python3 scripts/genome_analyzer.py /path/to/repo --output GENOME.md
|
||||
python3 scripts/genome_analyzer.py /path/to/repo --dry-run
|
||||
|
||||
Part of #666: GENOME.md Template + Single-Repo Analyzer.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
@@ -15,32 +23,25 @@ from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Tuple
|
||||
|
||||
SKIP_DIRS = {".git", "__pycache__", ".venv", "venv", "node_modules",
|
||||
".tox", ".pytest_cache", ".DS_Store", "dist", "build", "coverage"}
|
||||
|
||||
|
||||
def _is_source(p: Path) -> bool:
|
||||
return p.suffix in {".py", ".js", ".ts", ".mjs", ".cjs", ".jsx",
|
||||
".tsx", ".sh"} and not p.name.startswith("test_")
|
||||
SKIP_DIRS = {".git", "__pycache__", ".venv", "venv", "node_modules", ".tox", ".pytest_cache", ".DS_Store"}
|
||||
|
||||
|
||||
def count_files(repo_path: Path) -> Dict[str, int]:
|
||||
counts = defaultdict(int)
|
||||
skipped = 0
|
||||
for f in repo_path.rglob("*"):
|
||||
if any(part in SKIP_DIRS for part in f.parts):
|
||||
continue
|
||||
if f.is_file():
|
||||
if any(part in SKIP_DIRS for part in f.parts):
|
||||
continue
|
||||
ext = f.suffix or "(no ext)"
|
||||
counts[ext] += 1
|
||||
return dict(sorted(counts.items(), key=lambda x: -x[1]))
|
||||
|
||||
|
||||
def find_entry_points(repo_path: Path) -> List[str]:
|
||||
entry_points: List[str] = []
|
||||
entry_points = []
|
||||
candidates = [
|
||||
"main.py", "app.py", "server.py", "cli.py", "manage.py",
|
||||
"__main__.py", "index.html", "index.js", "index.ts",
|
||||
"index.html", "index.js", "index.ts",
|
||||
"Makefile", "Dockerfile", "docker-compose.yml",
|
||||
"README.md", "deploy.sh", "setup.py", "pyproject.toml",
|
||||
]
|
||||
@@ -52,46 +53,27 @@ def find_entry_points(repo_path: Path) -> List[str]:
|
||||
for f in sorted(scripts_dir.iterdir()):
|
||||
if f.suffix in (".py", ".sh") and not f.name.startswith("test_"):
|
||||
entry_points.append(f"scripts/{f.name}")
|
||||
src_dir = repo_path / "src"
|
||||
if src_dir.is_dir():
|
||||
for f in sorted(src_dir.iterdir()):
|
||||
if f.is_file() and f.suffix == ".py" and not f.name.startswith("test_"):
|
||||
entry_points.append(f"src/{f.name}")
|
||||
top_py = [f.name for f in repo_path.iterdir()
|
||||
if f.is_file() and f.suffix == ".py" and _is_source(f)]
|
||||
entry_points.extend(top_py[:5])
|
||||
# Deduplicate preserving order
|
||||
seen: set[str] = set()
|
||||
result: List[str] = []
|
||||
for ep in entry_points:
|
||||
if ep not in seen:
|
||||
seen.add(ep)
|
||||
result.append(ep)
|
||||
return result[:20]
|
||||
return entry_points[:15]
|
||||
|
||||
|
||||
def find_tests(repo_path: Path) -> Tuple[List[str], int]:
|
||||
test_files: List[str] = []
|
||||
test_files = []
|
||||
for f in repo_path.rglob("*"):
|
||||
if f.is_file():
|
||||
if any(part in SKIP_DIRS for part in f.parts):
|
||||
continue
|
||||
name = f.name
|
||||
if name.startswith("test_") or name.endswith("_test.py") or name.endswith(".test.js"):
|
||||
test_files.append(str(f.relative_to(repo_path)))
|
||||
if any(part in SKIP_DIRS for part in f.parts):
|
||||
continue
|
||||
if f.is_file() and (f.name.startswith("test_") or f.name.endswith("_test.py") or f.name.endswith("_test.js")):
|
||||
test_files.append(str(f.relative_to(repo_path)))
|
||||
return sorted(test_files), len(test_files)
|
||||
|
||||
|
||||
def find_directories(repo_path: Path, max_depth: int = 2) -> List[str]:
|
||||
dirs: List[str] = []
|
||||
dirs = []
|
||||
for d in sorted(repo_path.rglob("*")):
|
||||
if d.is_dir():
|
||||
depth = len(d.relative_to(repo_path).parts)
|
||||
if depth <= max_depth:
|
||||
if not any(part in SKIP_DIRS for part in d.parts):
|
||||
rel = str(d.relative_to(repo_path))
|
||||
if rel != "." and rel not in dirs:
|
||||
dirs.append(rel)
|
||||
if d.is_dir() and len(d.relative_to(repo_path).parts) <= max_depth:
|
||||
if not any(part in SKIP_DIRS for part in d.parts):
|
||||
rel = str(d.relative_to(repo_path))
|
||||
if rel != ".":
|
||||
dirs.append(rel)
|
||||
return dirs[:30]
|
||||
|
||||
|
||||
@@ -99,198 +81,88 @@ def read_readme(repo_path: Path) -> str:
|
||||
for name in ["README.md", "README.rst", "README.txt", "README"]:
|
||||
readme = repo_path / name
|
||||
if readme.exists():
|
||||
text = readme.read_text(encoding="utf-8", errors="replace")
|
||||
paras: List[str] = []
|
||||
for line in text.splitlines():
|
||||
stripped = line.strip()
|
||||
if stripped.startswith("#"):
|
||||
lines = readme.read_text(encoding="utf-8", errors="replace").split("\n")
|
||||
para = []
|
||||
started = False
|
||||
for line in lines:
|
||||
if line.startswith("#") and not started:
|
||||
continue
|
||||
if stripped:
|
||||
paras.append(stripped)
|
||||
elif paras:
|
||||
if line.strip():
|
||||
started = True
|
||||
para.append(line.strip())
|
||||
elif started:
|
||||
break
|
||||
return " ".join(paras[:3]) if paras else "(README exists but is mostly empty)"
|
||||
return " ".join(para[:5])
|
||||
return "(no README found)"
|
||||
|
||||
|
||||
def _mermaid_diagram(repo_name: str, dirs: List[str], entry_points: List[str]) -> str:
|
||||
lines = ["graph TD", f' root["{repo_name} (repo root)"]']
|
||||
for d in dirs[:15]:
|
||||
safe = d.replace("/", "_").replace("-", "_")
|
||||
lines.append(f' root --> {safe}["{d}/"]')
|
||||
lines.append("")
|
||||
lines.append(" %% Entry points (leaf nodes)")
|
||||
for ep in entry_points[:10]:
|
||||
safe_ep = ep.replace("/", "_").replace(".", "_").replace("-", "_")
|
||||
parent = ep.split("/")[0] if "/" in ep else "root"
|
||||
parent_safe = parent.replace("/", "_").replace("-", "_")
|
||||
lines.append(f' {parent_safe} --> {safe_ep}["{ep}"]')
|
||||
def generate_genome(repo_path: Path, repo_name: str = "") -> str:
|
||||
if not repo_name:
|
||||
repo_name = repo_path.name
|
||||
date = datetime.now(timezone.utc).strftime("%Y-%m-%d")
|
||||
readme_desc = read_readme(repo_path)
|
||||
file_counts = count_files(repo_path)
|
||||
total_files = sum(file_counts.values())
|
||||
entry_points = find_entry_points(repo_path)
|
||||
test_files, test_count = find_tests(repo_path)
|
||||
dirs = find_directories(repo_path)
|
||||
|
||||
lines = [
|
||||
f"# GENOME.md — {repo_name}", "",
|
||||
f"> Codebase analysis generated {date}. {readme_desc[:100]}.", "",
|
||||
"## Project Overview", "",
|
||||
readme_desc, "",
|
||||
f"**{total_files} files** across {len(file_counts)} file types.", "",
|
||||
"## Architecture", "",
|
||||
"```",
|
||||
]
|
||||
for d in dirs[:20]:
|
||||
lines.append(f" {d}/")
|
||||
lines.append("```")
|
||||
lines += ["", "### File Types", "", "| Type | Count |", "|------|-------|"]
|
||||
for ext, count in list(file_counts.items())[:15]:
|
||||
lines.append(f"| {ext} | {count} |")
|
||||
lines += ["", "## Entry Points", ""]
|
||||
for ep in entry_points:
|
||||
lines.append(f"- `{ep}`")
|
||||
lines += ["", "## Test Coverage", "", f"**{test_count} test files** found.", ""]
|
||||
if test_files:
|
||||
for tf in test_files[:10]:
|
||||
lines.append(f"- `{tf}`")
|
||||
if len(test_files) > 10:
|
||||
lines.append(f"- ... and {len(test_files) - 10} more")
|
||||
else:
|
||||
lines.append("No test files found.")
|
||||
lines += ["", "## Security Considerations", "", "(To be filled during analysis)", ""]
|
||||
lines += ["## Design Decisions", "", "(To be filled during analysis)", ""]
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def _bullet_list(items: List[str]) -> str:
|
||||
if not items:
|
||||
return "(none discovered)"
|
||||
return "\n".join(f"- `{item}`" for item in items[:20])
|
||||
|
||||
|
||||
def _comma_list(items: List[str]) -> str:
|
||||
return ", ".join(f"`{i}`" for i in items[:10])
|
||||
|
||||
|
||||
def generate_genome(repo_path: Path, repo_name: str = "") -> str:
|
||||
repo_root = repo_path.resolve()
|
||||
if not repo_name:
|
||||
repo_name = repo_path.name
|
||||
|
||||
date = datetime.now(timezone.utc).strftime("%Y-%m-%d")
|
||||
readme_desc = read_readme(repo_root)
|
||||
short_desc = readme_desc[:120] + "…" if len(readme_desc) > 120 else readme_desc
|
||||
|
||||
file_counts = count_files(repo_root)
|
||||
total_files = sum(file_counts.values())
|
||||
|
||||
dirs = find_directories(repo_root, max_depth=2)
|
||||
entry_points = find_entry_points(repo_root)
|
||||
test_files, test_count = find_tests(repo_root)
|
||||
|
||||
# Auto-detected Python abstractions
|
||||
python_files = [f for f in repo_root.rglob("*.py")
|
||||
if f.is_file() and not any(p in SKIP_DIRS for p in f.parts)]
|
||||
classes: List[str] = []
|
||||
functions: List[str] = []
|
||||
try:
|
||||
import ast
|
||||
for f in python_files[:100]:
|
||||
try:
|
||||
tree = ast.parse(f.read_text(encoding="utf-8", errors="replace"))
|
||||
for node in ast.walk(tree):
|
||||
if isinstance(node, ast.ClassDef):
|
||||
classes.append(f"{f.relative_to(repo_root)}::{node.name}")
|
||||
elif isinstance(node, ast.FunctionDef) and not node.name.startswith("_"):
|
||||
qual = f"{f.relative_to(repo_root)}::{node.name}"
|
||||
functions.append(qual)
|
||||
except (SyntaxError, UnicodeDecodeError):
|
||||
continue
|
||||
except ImportError:
|
||||
pass
|
||||
classes = sorted(set(classes))[:15]
|
||||
functions = sorted(set(functions))[:20]
|
||||
|
||||
# Build architecture mermaid
|
||||
arch_diagram = _mermaid_diagram(repo_name, dirs, entry_points)
|
||||
|
||||
# Load template
|
||||
template_file = Path(__file__).resolve().parent.parent / "templates" / "GENOME-template.md"
|
||||
|
||||
if template_file.exists():
|
||||
template_text = template_file.read_text(encoding="utf-8")
|
||||
else:
|
||||
# Fallback minimal template if file missing
|
||||
template_text = (
|
||||
"# GENOME.md — {REPO_NAME}\n\n"
|
||||
"> Codebase analysis generated {DATE}. {SHORT_DESCRIPTION}.\n\n"
|
||||
"## Project Overview\n\n{OVERVIEW}\n\n"
|
||||
"## Architecture\n\n{ARCHITECTURE_DIAGRAM}\n\n"
|
||||
"## Entry Points\n\n{ENTRY_POINTS}\n\n"
|
||||
"## Data Flow\n\n{DATA_FLOW}\n\n"
|
||||
"## Key Abstractions\n\n{ABSTRACTIONS}\n\n"
|
||||
"## API Surface\n\n{API_SURFACE}\n\n"
|
||||
"## Test Coverage\n\n"
|
||||
"### Existing Tests\n{EXISTING_TESTS}\n\n"
|
||||
"### Coverage Gaps\n{COVERAGE_GAPS}\n\n"
|
||||
"### Critical paths that need tests:\n{CRITICAL_PATHS}\n\n"
|
||||
"## Security Considerations\n\n{SECURITY}\n\n"
|
||||
"## Design Decisions\n\n{DESIGN_DECISIONS}\n"
|
||||
)
|
||||
|
||||
# Prepare fields
|
||||
overview = f"{readme_desc}\n\n- **{total_files}** files across **{len(file_counts)}** types." + (
|
||||
f"\n- Primary languages: {_comma_list([f'{k}:{v}' for k,v in list(file_counts.items())[:5]])}."
|
||||
)
|
||||
|
||||
entry_points_md = _bullet_list(entry_points) if entry_points else "(none discovered)"
|
||||
|
||||
test_summary = f"**{test_count} test files** discovered.\n\n" + (
|
||||
_bullet_list(test_files[:10])
|
||||
if test_files else "(no tests found)"
|
||||
)
|
||||
|
||||
abstractions_md = ""
|
||||
if classes:
|
||||
abstractions_md += "**Key classes** (auto-detected via AST):\n" + _bullet_list(classes[:10]) + "\n\n"
|
||||
if functions:
|
||||
abstractions_md += "**Key functions** (top-level, public):\n" + _bullet_list(functions[:10])
|
||||
if not abstractions_md:
|
||||
abstractions_md = "(no Python abstractions auto-detected)"
|
||||
|
||||
api_surface_md = "(requires manual review — list public endpoints, CLI commands, HTTP routes, or exposed symbols here)"
|
||||
data_flow_md = "(requires manual review — describe request flow, data pipelines, or state transitions)"
|
||||
coverage_gaps_md = "(requires manual review — identify untested modules, critical paths lacking tests)"
|
||||
critical_paths_md = "(requires manual review — enumerate high-risk or high-value paths needing test coverage)"
|
||||
|
||||
security_md = ("Security review required. Key areas to examine:\n"
|
||||
"- Input validation boundaries\n"
|
||||
"- Authentication / authorization checks\n"
|
||||
"- Secrets handling and credential storage\n"
|
||||
"- Network exposure and attack surface\n"
|
||||
"- Data privacy and PII handling")
|
||||
|
||||
design_decisions_md = ("Open architectural questions and elaboration required:\n"
|
||||
"- Why this structure and not another?\n"
|
||||
"- What constraints shaped current abstractions?\n"
|
||||
"- What trade-offs were accepted and why?\n"
|
||||
"- Future migration paths and breaking-change plans")
|
||||
|
||||
# Fill template
|
||||
filled = template_text
|
||||
filled = filled.replace("{{REPO_NAME}}", repo_name)
|
||||
filled = filled.replace("{{DATE}}", date)
|
||||
filled = filled.replace("{{SHORT_DESCRIPTION}}", short_desc)
|
||||
filled = filled.replace("{{OVERVIEW}}", overview)
|
||||
filled = filled.replace("{{ARCHITECTURE_DIAGRAM}}", arch_diagram)
|
||||
filled = filled.replace("{{ENTRY_POINTS}}", entry_points_md)
|
||||
filled = filled.replace("{{DATA_FLOW}}", data_flow_md)
|
||||
filled = filled.replace("{{ABSTRACTIONS}}", abstractions_md)
|
||||
filled = filled.replace("{{API_SURFACE}}", api_surface_md)
|
||||
filled = filled.replace("{{EXISTING_TESTS}}", test_summary)
|
||||
filled = filled.replace("{{COVERAGE_GAPS}}", coverage_gaps_md)
|
||||
filled = filled.replace("{{CRITICAL_PATHS}}", critical_paths_md)
|
||||
filled = filled.replace("{{SECURITY}}", security_md)
|
||||
filled = filled.replace("{{DESIGN_DECISIONS}}", design_decisions_md)
|
||||
return filled
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(description="Generate GENOME.md from a codebase using the canonical template")
|
||||
parser.add_argument("repo_path", help="Path to repository root")
|
||||
parser.add_argument("--output", "-o", default="", help="Write GENOME.md to this path (default: stdout)")
|
||||
parser.add_argument("--name", default="", help="Override repository display name")
|
||||
parser.add_argument("--dry-run", action="store_true", help="Print discovered stats without generating file")
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Generate GENOME.md from a codebase")
|
||||
parser.add_argument("repo_path", help="Path to repository")
|
||||
parser.add_argument("--output", default="", help="Output file (default: stdout)")
|
||||
parser.add_argument("--name", default="", help="Repository name")
|
||||
parser.add_argument("--dry-run", action="store_true", help="Print stats only")
|
||||
args = parser.parse_args()
|
||||
|
||||
repo_path = Path(args.repo_path).resolve()
|
||||
if not repo_path.is_dir():
|
||||
print(f"ERROR: {repo_path} is not a directory", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
repo_name = args.name or repo_path.name
|
||||
|
||||
if args.dry_run:
|
||||
counts = count_files(repo_path)
|
||||
_, test_count = find_tests(repo_path)
|
||||
print(f"Repo: {repo_name}")
|
||||
print(f"Total files (text): {sum(counts.values())}")
|
||||
print(f"Total files: {sum(counts.values())}")
|
||||
print(f"Test files: {test_count}")
|
||||
print(f"Top types: {', '.join(f'{k}={v}' for k,v in list(counts.items())[:5])}")
|
||||
sys.exit(0)
|
||||
|
||||
genome = generate_genome(repo_path, repo_name)
|
||||
|
||||
if args.output:
|
||||
out = Path(args.output)
|
||||
out.write_text(genome, encoding="utf-8")
|
||||
print(f"GENOME.md written: {out}")
|
||||
with open(args.output, "w") as f:
|
||||
f.write(genome)
|
||||
print(f"Written: {args.output}")
|
||||
else:
|
||||
print(genome)
|
||||
|
||||
|
||||
@@ -1,12 +1 @@
|
||||
# Timmy core module
|
||||
|
||||
from .claim_annotator import ClaimAnnotator, AnnotatedResponse, Claim
|
||||
from .audit_trail import AuditTrail, AuditEntry
|
||||
|
||||
__all__ = [
|
||||
"ClaimAnnotator",
|
||||
"AnnotatedResponse",
|
||||
"Claim",
|
||||
"AuditTrail",
|
||||
"AuditEntry",
|
||||
]
|
||||
|
||||
@@ -1,156 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Response Claim Annotator — Source Distinction System
|
||||
SOUL.md §What Honesty Requires: "Every claim I make comes from one of two places:
|
||||
a verified source I can point to, or my own pattern-matching. My user must be
|
||||
able to tell which is which."
|
||||
"""
|
||||
|
||||
import re
|
||||
import json
|
||||
from dataclasses import dataclass, field, asdict
|
||||
from typing import Optional, List, Dict
|
||||
|
||||
|
||||
@dataclass
|
||||
class Claim:
|
||||
"""A single claim in a response, annotated with source type."""
|
||||
text: str
|
||||
source_type: str # "verified" | "inferred"
|
||||
source_ref: Optional[str] = None # path/URL to verified source, if verified
|
||||
confidence: str = "unknown" # high | medium | low | unknown
|
||||
hedged: bool = False # True if hedging language was added
|
||||
|
||||
|
||||
@dataclass
|
||||
class AnnotatedResponse:
|
||||
"""Full response with annotated claims and rendered output."""
|
||||
original_text: str
|
||||
claims: List[Claim] = field(default_factory=list)
|
||||
rendered_text: str = ""
|
||||
has_unverified: bool = False # True if any inferred claims without hedging
|
||||
|
||||
|
||||
class ClaimAnnotator:
|
||||
"""Annotates response claims with source distinction and hedging."""
|
||||
|
||||
# Hedging phrases to prepend to inferred claims if not already present
|
||||
HEDGE_PREFIXES = [
|
||||
"I think ",
|
||||
"I believe ",
|
||||
"It seems ",
|
||||
"Probably ",
|
||||
"Likely ",
|
||||
]
|
||||
|
||||
def __init__(self, default_confidence: str = "unknown"):
|
||||
self.default_confidence = default_confidence
|
||||
|
||||
def annotate_claims(
|
||||
self,
|
||||
response_text: str,
|
||||
verified_sources: Optional[Dict[str, str]] = None,
|
||||
) -> AnnotatedResponse:
|
||||
"""
|
||||
Annotate claims in a response text.
|
||||
|
||||
Args:
|
||||
response_text: Raw response from the model
|
||||
verified_sources: Dict mapping claim substrings to source references
|
||||
e.g. {"Paris is the capital of France": "https://en.wikipedia.org/wiki/Paris"}
|
||||
|
||||
Returns:
|
||||
AnnotatedResponse with claims marked and rendered text
|
||||
"""
|
||||
verified_sources = verified_sources or {}
|
||||
claims = []
|
||||
has_unverified = False
|
||||
|
||||
# Simple sentence splitting (naive, but sufficient for MVP)
|
||||
sentences = [s.strip() for s in re.split(r'[.!?]\s+', response_text) if s.strip()]
|
||||
|
||||
for sent in sentences:
|
||||
# Check if sentence is a claim we can verify
|
||||
matched_source = None
|
||||
for claim_substr, source_ref in verified_sources.items():
|
||||
if claim_substr.lower() in sent.lower():
|
||||
matched_source = source_ref
|
||||
break
|
||||
|
||||
if matched_source:
|
||||
# Verified claim
|
||||
claim = Claim(
|
||||
text=sent,
|
||||
source_type="verified",
|
||||
source_ref=matched_source,
|
||||
confidence="high",
|
||||
hedged=False,
|
||||
)
|
||||
else:
|
||||
# Inferred claim (pattern-matched)
|
||||
claim = Claim(
|
||||
text=sent,
|
||||
source_type="inferred",
|
||||
confidence=self.default_confidence,
|
||||
hedged=self._has_hedge(sent),
|
||||
)
|
||||
if not claim.hedged:
|
||||
has_unverified = True
|
||||
|
||||
claims.append(claim)
|
||||
|
||||
# Render the annotated response
|
||||
rendered = self._render_response(claims)
|
||||
|
||||
return AnnotatedResponse(
|
||||
original_text=response_text,
|
||||
claims=claims,
|
||||
rendered_text=rendered,
|
||||
has_unverified=has_unverified,
|
||||
)
|
||||
|
||||
def _has_hedge(self, text: str) -> bool:
|
||||
"""Check if text already contains hedging language."""
|
||||
text_lower = text.lower()
|
||||
for prefix in self.HEDGE_PREFIXES:
|
||||
if text_lower.startswith(prefix.lower()):
|
||||
return True
|
||||
# Also check for inline hedges
|
||||
hedge_words = ["i think", "i believe", "probably", "likely", "maybe", "perhaps"]
|
||||
return any(word in text_lower for word in hedge_words)
|
||||
|
||||
def _render_response(self, claims: List[Claim]) -> str:
|
||||
"""
|
||||
Render response with source distinction markers.
|
||||
|
||||
Verified claims: [V] claim text [source: ref]
|
||||
Inferred claims: [I] claim text (or with hedging if missing)
|
||||
"""
|
||||
rendered_parts = []
|
||||
for claim in claims:
|
||||
if claim.source_type == "verified":
|
||||
part = f"[V] {claim.text}"
|
||||
if claim.source_ref:
|
||||
part += f" [source: {claim.source_ref}]"
|
||||
else: # inferred
|
||||
if not claim.hedged:
|
||||
# Add hedging if missing
|
||||
hedged_text = f"I think {claim.text[0].lower()}{claim.text[1:]}" if claim.text else claim.text
|
||||
part = f"[I] {hedged_text}"
|
||||
else:
|
||||
part = f"[I] {claim.text}"
|
||||
rendered_parts.append(part)
|
||||
return " ".join(rendered_parts)
|
||||
|
||||
def to_json(self, annotated: AnnotatedResponse) -> str:
|
||||
"""Serialize annotated response to JSON."""
|
||||
return json.dumps(
|
||||
{
|
||||
"original_text": annotated.original_text,
|
||||
"rendered_text": annotated.rendered_text,
|
||||
"has_unverified": annotated.has_unverified,
|
||||
"claims": [asdict(c) for c in annotated.claims],
|
||||
},
|
||||
indent=2,
|
||||
ensure_ascii=False,
|
||||
)
|
||||
176
tests/test_burn_velocity_tracker.py
Normal file
176
tests/test_burn_velocity_tracker.py
Normal file
@@ -0,0 +1,176 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import subprocess
|
||||
import sys
|
||||
from datetime import date
|
||||
from pathlib import Path
|
||||
|
||||
from scripts.burn_velocity_tracker import build_velocity_report, render_markdown, update_history
|
||||
|
||||
|
||||
ROOT = Path(__file__).resolve().parent.parent
|
||||
DOC_PATH = ROOT / "docs" / "BURN_VELOCITY_TRACKING.md"
|
||||
|
||||
|
||||
SNAPSHOT = {
|
||||
"generated_at": "2026-04-22T12:00:00Z",
|
||||
"repos": {
|
||||
"timmy-home": {
|
||||
"open_issues": [
|
||||
{"number": 501, "state": "open", "created_at": "2026-04-20T09:00:00Z"},
|
||||
{"number": 502, "state": "open", "created_at": "2026-04-22T07:00:00Z"},
|
||||
],
|
||||
"recent_issues": [
|
||||
{"number": 401, "state": "closed", "created_at": "2026-04-21T09:00:00Z", "closed_at": "2026-04-22T05:30:00Z"},
|
||||
{"number": 402, "state": "closed", "created_at": "2026-04-20T09:00:00Z", "closed_at": "2026-04-21T05:30:00Z"},
|
||||
{"number": 403, "state": "closed", "created_at": "2026-04-19T09:00:00Z", "closed_at": "2026-04-20T05:30:00Z"},
|
||||
{"number": 404, "state": "closed", "created_at": "2026-04-14T09:00:00Z", "closed_at": "2026-04-15T05:30:00Z"},
|
||||
{"number": 405, "state": "closed", "created_at": "2026-04-13T09:00:00Z", "closed_at": "2026-04-14T05:30:00Z"},
|
||||
{"number": 406, "state": "closed", "created_at": "2026-04-12T09:00:00Z", "closed_at": "2026-04-13T05:30:00Z"},
|
||||
{"number": 407, "state": "closed", "created_at": "2026-04-11T09:00:00Z", "closed_at": "2026-04-12T05:30:00Z"},
|
||||
{"number": 408, "state": "closed", "created_at": "2026-04-10T09:00:00Z", "closed_at": "2026-04-11T05:30:00Z"},
|
||||
{"number": 409, "state": "closed", "created_at": "2026-04-09T09:00:00Z", "closed_at": "2026-04-10T05:30:00Z"},
|
||||
{"number": 410, "state": "closed", "created_at": "2026-04-08T09:00:00Z", "closed_at": "2026-04-09T05:30:00Z"},
|
||||
{"number": 411, "state": "closed", "created_at": "2026-04-07T09:00:00Z", "closed_at": "2026-04-08T05:30:00Z"},
|
||||
{"number": 412, "state": "closed", "created_at": "2026-04-06T09:00:00Z", "closed_at": "2026-04-07T05:30:00Z"},
|
||||
{"number": 413, "state": "closed", "created_at": "2026-04-05T09:00:00Z", "closed_at": "2026-04-06T05:30:00Z"},
|
||||
{"number": 414, "state": "open", "created_at": "2026-04-22T08:45:00Z", "closed_at": None},
|
||||
{"number": 415, "state": "open", "created_at": "2026-04-17T08:45:00Z", "closed_at": None},
|
||||
],
|
||||
},
|
||||
"timmy-config": {
|
||||
"open_issues": [
|
||||
{"number": 601, "state": "open", "created_at": "2026-04-18T09:00:00Z"},
|
||||
],
|
||||
"recent_issues": [
|
||||
{"number": 602, "state": "closed", "created_at": "2026-04-20T09:00:00Z", "closed_at": "2026-04-21T06:00:00Z"},
|
||||
{"number": 603, "state": "open", "created_at": "2026-04-22T06:00:00Z", "closed_at": None},
|
||||
],
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
CONFIG = {
|
||||
"owner": "Timmy_Foundation",
|
||||
"repos": ["timmy-home", "timmy-config"],
|
||||
"lookback_days": 14,
|
||||
"alert": {
|
||||
"recent_days": 7,
|
||||
"baseline_days": 7,
|
||||
"minimum_baseline_closed": 4,
|
||||
"drop_ratio": 0.6,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def test_build_velocity_report_counts_opened_closed_and_flags_drop_alert() -> None:
|
||||
report = build_velocity_report(CONFIG, SNAPSHOT, today=date(2026, 4, 22))
|
||||
|
||||
assert report["generated_day"] == "2026-04-22"
|
||||
assert report["summary"]["repos_with_alerts"] == ["timmy-home"]
|
||||
assert report["summary"]["total_open_now"] == 3
|
||||
|
||||
home = report["repos"][0]
|
||||
assert home["repo"] == "timmy-home"
|
||||
assert home["open_now"] == 2
|
||||
assert home["opened_last_7d"] == 5
|
||||
assert home["closed_last_7d"] == 3
|
||||
assert home["baseline_closed"] == 7
|
||||
assert home["weekly_net"] == 2
|
||||
assert home["alert"]["status"] == "drop"
|
||||
assert home["alert"]["recent_closed"] == 3
|
||||
assert home["daily"][-1] == {"date": "2026-04-22", "opened": 1, "closed": 1}
|
||||
|
||||
timmy_config = report["repos"][1]
|
||||
assert timmy_config["repo"] == "timmy-config"
|
||||
assert timmy_config["open_now"] == 1
|
||||
assert timmy_config["closed_last_7d"] == 1
|
||||
assert timmy_config["alert"]["status"] == "ok"
|
||||
|
||||
|
||||
def test_render_markdown_includes_dashboard_handoff_and_alerts() -> None:
|
||||
report = build_velocity_report(CONFIG, SNAPSHOT, today=date(2026, 4, 22))
|
||||
rendered = render_markdown(report)
|
||||
|
||||
for snippet in (
|
||||
"# Burn-down Velocity Tracking",
|
||||
"## Per-repo velocity",
|
||||
"timmy-home",
|
||||
"timmy-config",
|
||||
"## Dashboard handoff for timmy-config",
|
||||
"velocity_drop",
|
||||
"## Alerts",
|
||||
):
|
||||
assert snippet in rendered
|
||||
|
||||
|
||||
def test_update_history_replaces_same_day_snapshot(tmp_path: Path) -> None:
|
||||
history_path = tmp_path / "burn-velocity-history.json"
|
||||
report = build_velocity_report(CONFIG, SNAPSHOT, today=date(2026, 4, 22))
|
||||
update_history(history_path, report)
|
||||
|
||||
updated = json.loads(json.dumps(report))
|
||||
updated["repos"][0]["open_now"] = 9
|
||||
updated["summary"]["total_open_now"] = 10
|
||||
update_history(history_path, updated)
|
||||
|
||||
history = json.loads(history_path.read_text(encoding="utf-8"))
|
||||
assert [item["date"] for item in history["days"]] == ["2026-04-22"]
|
||||
assert history["days"][0]["summary"]["total_open_now"] == 10
|
||||
assert history["days"][0]["repos"][0]["open_now"] == 9
|
||||
|
||||
|
||||
def test_cli_writes_json_markdown_and_history_from_snapshot(tmp_path: Path) -> None:
|
||||
snapshot_path = tmp_path / "snapshot.json"
|
||||
output_json = tmp_path / "latest.json"
|
||||
output_md = tmp_path / "latest.md"
|
||||
history_path = tmp_path / "history.json"
|
||||
snapshot_path.write_text(json.dumps(SNAPSHOT), encoding="utf-8")
|
||||
|
||||
result = subprocess.run(
|
||||
[
|
||||
sys.executable,
|
||||
"-m",
|
||||
"scripts.burn_velocity_tracker",
|
||||
"--snapshot-file",
|
||||
str(snapshot_path),
|
||||
"--today",
|
||||
"2026-04-22",
|
||||
"--output-json",
|
||||
str(output_json),
|
||||
"--output-md",
|
||||
str(output_md),
|
||||
"--history-file",
|
||||
str(history_path),
|
||||
"--write-history",
|
||||
"--json",
|
||||
],
|
||||
check=True,
|
||||
cwd=ROOT,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
|
||||
payload = json.loads(result.stdout)
|
||||
assert payload["summary"]["repos_with_alerts"] == ["timmy-home"]
|
||||
assert output_json.exists()
|
||||
assert output_md.exists()
|
||||
assert history_path.exists()
|
||||
assert "timmy-config" in output_md.read_text(encoding="utf-8")
|
||||
|
||||
|
||||
def test_repo_contains_burn_velocity_tracking_doc() -> None:
|
||||
text = DOC_PATH.read_text(encoding="utf-8")
|
||||
required = [
|
||||
"# Burn-down Velocity Tracking",
|
||||
"python3 scripts/burn_velocity_tracker.py",
|
||||
"configs/burn_velocity_repos.json",
|
||||
"~/.timmy/burn-velocity/latest.json",
|
||||
"timmy-config dashboard",
|
||||
"type=issues",
|
||||
"velocity_drop",
|
||||
]
|
||||
for snippet in required:
|
||||
assert snippet in text
|
||||
@@ -1,103 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Tests for claim_annotator.py — verifies source distinction is present."""
|
||||
|
||||
import sys
|
||||
import os
|
||||
import json
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src"))
|
||||
|
||||
from timmy.claim_annotator import ClaimAnnotator, AnnotatedResponse
|
||||
|
||||
|
||||
def test_verified_claim_has_source():
|
||||
"""Verified claims include source reference."""
|
||||
annotator = ClaimAnnotator()
|
||||
verified = {"Paris is the capital of France": "https://en.wikipedia.org/wiki/Paris"}
|
||||
response = "Paris is the capital of France. It is a beautiful city."
|
||||
|
||||
result = annotator.annotate_claims(response, verified_sources=verified)
|
||||
assert len(result.claims) > 0
|
||||
verified_claims = [c for c in result.claims if c.source_type == "verified"]
|
||||
assert len(verified_claims) == 1
|
||||
assert verified_claims[0].source_ref == "https://en.wikipedia.org/wiki/Paris"
|
||||
assert "[V]" in result.rendered_text
|
||||
assert "[source:" in result.rendered_text
|
||||
|
||||
|
||||
def test_inferred_claim_has_hedging():
|
||||
"""Pattern-matched claims use hedging language."""
|
||||
annotator = ClaimAnnotator()
|
||||
response = "The weather is nice today. It might rain tomorrow."
|
||||
|
||||
result = annotator.annotate_claims(response)
|
||||
inferred_claims = [c for c in result.claims if c.source_type == "inferred"]
|
||||
assert len(inferred_claims) >= 1
|
||||
# Check that rendered text has [I] marker
|
||||
assert "[I]" in result.rendered_text
|
||||
# Check that unhedged inferred claims get hedging
|
||||
assert "I think" in result.rendered_text or "I believe" in result.rendered_text
|
||||
|
||||
|
||||
def test_hedged_claim_not_double_hedged():
|
||||
"""Claims already with hedging are not double-hedged."""
|
||||
annotator = ClaimAnnotator()
|
||||
response = "I think the sky is blue. It is a nice day."
|
||||
|
||||
result = annotator.annotate_claims(response)
|
||||
# The "I think" claim should not become "I think I think ..."
|
||||
assert "I think I think" not in result.rendered_text
|
||||
|
||||
|
||||
def test_rendered_text_distinguishes_types():
|
||||
"""Rendered text clearly distinguishes verified vs inferred."""
|
||||
annotator = ClaimAnnotator()
|
||||
verified = {"Earth is round": "https://science.org/earth"}
|
||||
response = "Earth is round. Stars are far away."
|
||||
|
||||
result = annotator.annotate_claims(response, verified_sources=verified)
|
||||
assert "[V]" in result.rendered_text # verified marker
|
||||
assert "[I]" in result.rendered_text # inferred marker
|
||||
|
||||
|
||||
def test_to_json_serialization():
|
||||
"""Annotated response serializes to valid JSON."""
|
||||
annotator = ClaimAnnotator()
|
||||
response = "Test claim."
|
||||
result = annotator.annotate_claims(response)
|
||||
json_str = annotator.to_json(result)
|
||||
parsed = json.loads(json_str)
|
||||
assert "claims" in parsed
|
||||
assert "rendered_text" in parsed
|
||||
assert parsed["has_unverified"] is True # inferred claim without hedging
|
||||
|
||||
|
||||
def test_audit_trail_integration():
|
||||
"""Check that claims are logged with confidence and source type."""
|
||||
# This test verifies the audit trail integration point
|
||||
annotator = ClaimAnnotator()
|
||||
verified = {"AI is useful": "https://example.com/ai"}
|
||||
response = "AI is useful. It can help with tasks."
|
||||
|
||||
result = annotator.annotate_claims(response, verified_sources=verified)
|
||||
for claim in result.claims:
|
||||
assert claim.source_type in ("verified", "inferred")
|
||||
assert claim.confidence in ("high", "medium", "low", "unknown")
|
||||
if claim.source_type == "verified":
|
||||
assert claim.source_ref is not None
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_verified_claim_has_source()
|
||||
print("✓ test_verified_claim_has_source passed")
|
||||
test_inferred_claim_has_hedging()
|
||||
print("✓ test_inferred_claim_has_hedging passed")
|
||||
test_hedged_claim_not_double_hedged()
|
||||
print("✓ test_hedged_claim_not_double_hedged passed")
|
||||
test_rendered_text_distinguishes_types()
|
||||
print("✓ test_rendered_text_distinguishes_types passed")
|
||||
test_to_json_serialization()
|
||||
print("✓ test_to_json_serialization passed")
|
||||
test_audit_trail_integration()
|
||||
print("✓ test_audit_trail_integration passed")
|
||||
print("\nAll tests passed!")
|
||||
Reference in New Issue
Block a user