Compare commits

..

4 Commits

Author SHA1 Message Date
Alexander Whitestone
212a5befba test: fix burn velocity alert coverage
Some checks failed
Self-Healing Smoke / self-healing-smoke (pull_request) Failing after 25s
Smoke Test / smoke (pull_request) Failing after 26s
Agent PR Gate / gate (pull_request) Failing after 34s
Agent PR Gate / report (pull_request) Successful in 8s
2026-04-22 14:45:13 -04:00
Alexander Whitestone
1eaf6df7d0 wip: add burn velocity tracker config and docs 2026-04-22 14:42:47 -04:00
Alexander Whitestone
735fafa235 wip: add burn velocity tracker script 2026-04-22 14:42:07 -04:00
Alexander Whitestone
e53d54f873 wip: add burn velocity tracker tests 2026-04-22 14:39:53 -04:00
8 changed files with 670 additions and 501 deletions

View File

@@ -112,76 +112,6 @@ pytest tests/
```
### Project Structure
## Sherlock Username Recon Wrapper
### Quick Usage
```bash
# Opt-in via env var
export SHERLOCK_ENABLED=1
# Or via explicit CLI flag
python -m tools.sherlock_wrapper --query "alice" --opt-in --json
# With site whitelist
python -m tools.sherlock_wrapper --query "alice" --opt-in --sites github twitter --json
```
### What It Does
Builds a bounded local wrapper around the Sherlock username OSINT tool that:
- **Opt-in gate** — SHERLOCK_ENABLED=1 or `--opt-in` required before any external call
- **Local-first caching** — results cached in `~/.cache/timmy/sherlock_cache.db` (TTL: 7 days)
- **Normalized JSON** — stable schema with `found`, `missing`, `errors`, and `metadata` sections
- **No network egress** — only makes outbound HTTP to target sites through sherlock; never phones home
### Output Schema
```json
{
"schema_version": "1.0",
"query": "alice",
"timestamp": "2025-04-26T14:23:00+00:00",
"found": [
{"site": "github", "url": "https://github.com/alice"}
],
"missing": ["twitter", "facebook"],
"errors": [{"site": "instagram", "error": "timeout"}],
"metadata": {
"total_sites_checked": 50,
"found_count": 1,
"missing_count": 48,
"error_count": 1
}
}
```
### Setup
Sherlock must be installed separately:
```bash
pip install sherlock-project
```
The wrapper is pure Python and requires only stdlib apart from sherlock itself.
### Why an Opt-In Gate?
Sherlock makes outbound HTTP requests to dozens of third-party sites. The opt-in gate:
1. Ensures a human operator explicitly approves this dependency
2. Makes the outbound traffic auditable in session logs
3. Prevents accidental invocation in automated pipelines
### Running the Smoke Test
```bash
# Run unit + integration tests
pytest tests/test_sherlock_wrapper.py -v
```
```
.

View File

@@ -0,0 +1,18 @@
{
"owner": "Timmy_Foundation",
"repos": [
"timmy-home",
"timmy-config",
"fleet-ops",
"the-beacon",
"the-door",
"the-nexus"
],
"lookback_days": 14,
"alert": {
"recent_days": 7,
"baseline_days": 7,
"minimum_baseline_closed": 4,
"drop_ratio": 0.6
}
}

View File

@@ -0,0 +1,70 @@
# Burn-down Velocity Tracking
Refs #519.
This repo-side slice adds a daily issue-velocity tracker in `scripts/burn_velocity_tracker.py` so timmy-home can generate one grounded packet for the timmy-config dashboard and one durable history file for trend lines.
## What it emits
Daily run outputs:
- `~/.timmy/burn-velocity/latest.json` — machine-readable payload for the timmy-config dashboard
- `~/.timmy/burn-velocity/latest.md` — operator-facing markdown summary
- `~/.timmy/burn-velocity/history.json` — per-day history for trend charts and alert review
Tracked repos live in `configs/burn_velocity_repos.json`.
## Cron command
```bash
cd ~/timmy-home && \
python3 scripts/burn_velocity_tracker.py \
--config configs/burn_velocity_repos.json \
--output-json ~/.timmy/burn-velocity/latest.json \
--output-md ~/.timmy/burn-velocity/latest.md \
--history-file ~/.timmy/burn-velocity/history.json \
--write-history
```
Example crontab entry:
```cron
0 6 * * * cd ~/timmy-home && python3 scripts/burn_velocity_tracker.py --config configs/burn_velocity_repos.json --output-json ~/.timmy/burn-velocity/latest.json --output-md ~/.timmy/burn-velocity/latest.md --history-file ~/.timmy/burn-velocity/history.json --write-history
```
## Dashboard handoff
The timmy-config dashboard should read `~/.timmy/burn-velocity/latest.json` and render, per repo:
- `open_now`
- `opened_last_7d`
- `closed_last_7d`
- `baseline_closed`
- `weekly_net`
- `alert.status`
- `alert.kind`
- `alert.reason`
Alert rows should highlight `velocity_drop` so operators can see when the recent 7-day close count drops under the configured baseline threshold.
## Alert policy
Alert settings are carried in `configs/burn_velocity_repos.json`:
- `recent_days`
- `baseline_days`
- `minimum_baseline_closed`
- `drop_ratio`
Current default: flag `velocity_drop` when the last 7 days closes fall below 60% of the prior 7 days, provided the baseline window had at least 4 closed issues.
## Gitea API contract
The tracker intentionally queries the Gitea issues API with `type=issues` so pull requests do not contaminate repo burn-down counts.
Live collection shape:
- open backlog uses `/repos/{owner}/{repo}/issues?state=open&type=issues`
- recent event scan uses `/repos/{owner}/{repo}/issues?state=all&type=issues&since=...`
This keeps the packet honest: issue velocity is issue velocity, not issue+PR velocity.
## Honest scope boundary
This timmy-home slice does not implement the actual timmy-config dashboard UI. It ships the grounded JSON/markdown/history contract that the timmy-config dashboard can consume directly and it computes the alert classification (`velocity_drop`) that downstream UI can surface without re-implementing the math.

View File

@@ -0,0 +1,406 @@
#!/usr/bin/env python3
"""Burn-down velocity tracker for Timmy Foundation issue throughput.
Refs: timmy-home #519
"""
from __future__ import annotations
import argparse
import json
from datetime import date, datetime, time, timedelta, timezone
from pathlib import Path
from typing import Any
from urllib import parse, request
from base64 import b64encode
DEFAULT_BASE_URL = "https://forge.alexanderwhitestone.com/api/v1"
DEFAULT_OWNER = "Timmy_Foundation"
DEFAULT_TOKEN_FILE = Path.home() / ".config" / "gitea" / "token"
DEFAULT_CONFIG_FILE = Path(__file__).resolve().parent.parent / "configs" / "burn_velocity_repos.json"
DEFAULT_OUTPUT_DIR = Path.home() / ".timmy" / "burn-velocity"
DEFAULT_OUTPUT_JSON = DEFAULT_OUTPUT_DIR / "latest.json"
DEFAULT_OUTPUT_MD = DEFAULT_OUTPUT_DIR / "latest.md"
DEFAULT_HISTORY_FILE = DEFAULT_OUTPUT_DIR / "history.json"
DEFAULT_CONFIG = {
"owner": DEFAULT_OWNER,
"repos": ["timmy-home", "timmy-config", "fleet-ops", "the-beacon", "the-door", "the-nexus"],
"lookback_days": 14,
"alert": {
"recent_days": 7,
"baseline_days": 7,
"minimum_baseline_closed": 4,
"drop_ratio": 0.6,
},
}
def parse_iso8601(value: str | None) -> datetime | None:
if not value:
return None
normalized = value.replace("Z", "+00:00")
parsed = datetime.fromisoformat(normalized)
if parsed.tzinfo is None:
return parsed.replace(tzinfo=timezone.utc)
return parsed.astimezone(timezone.utc)
def normalize_today(value: str | date | None = None) -> date:
if value is None:
return datetime.now(timezone.utc).date()
if isinstance(value, date):
return value
return date.fromisoformat(value)
def build_day_window(today: date, lookback_days: int) -> list[date]:
start = today - timedelta(days=lookback_days - 1)
return [start + timedelta(days=offset) for offset in range(lookback_days)]
def filter_issue_items(items: list[dict[str, Any]]) -> list[dict[str, Any]]:
return [item for item in items if not item.get("pull_request")]
def build_daily_series(items: list[dict[str, Any]], today: date, lookback_days: int) -> list[dict[str, int | str]]:
days = build_day_window(today, lookback_days)
counts = {day.isoformat(): {"opened": 0, "closed": 0} for day in days}
start_day = days[0]
for item in filter_issue_items(items):
created_at = parse_iso8601(item.get("created_at"))
if created_at is not None:
created_day = created_at.date()
if start_day <= created_day <= today:
counts[created_day.isoformat()]["opened"] += 1
closed_at = parse_iso8601(item.get("closed_at"))
if closed_at is not None:
closed_day = closed_at.date()
if start_day <= closed_day <= today:
counts[closed_day.isoformat()]["closed"] += 1
return [
{
"date": day.isoformat(),
"opened": counts[day.isoformat()]["opened"],
"closed": counts[day.isoformat()]["closed"],
}
for day in days
]
def summarize_velocity_alert(
*, recent_closed: int, baseline_closed: int, open_now: int, config: dict[str, Any]
) -> dict[str, Any]:
minimum_baseline = int(config.get("minimum_baseline_closed", 4))
drop_ratio = float(config.get("drop_ratio", 0.6))
if baseline_closed >= minimum_baseline and recent_closed < baseline_closed * drop_ratio:
return {
"status": "drop",
"kind": "velocity_drop",
"recent_closed": recent_closed,
"baseline_closed": baseline_closed,
"reason": (
f"velocity_drop: closed {recent_closed} in the last {config.get('recent_days', 7)}d "
f"vs {baseline_closed} in the prior {config.get('baseline_days', 7)}d"
),
}
if open_now > 0 and baseline_closed >= minimum_baseline and recent_closed == 0:
return {
"status": "drop",
"kind": "velocity_drop",
"recent_closed": recent_closed,
"baseline_closed": baseline_closed,
"reason": "velocity_drop: no issues closed in the recent window while backlog is still open",
}
return {
"status": "ok",
"kind": "none",
"recent_closed": recent_closed,
"baseline_closed": baseline_closed,
"reason": "velocity stable",
}
def _sum_window(daily: list[dict[str, int | str]], field: str, days: int) -> int:
if days <= 0:
return 0
return sum(int(item[field]) for item in daily[-days:])
def _sum_baseline_window(daily: list[dict[str, int | str]], recent_days: int, baseline_days: int) -> int:
if baseline_days <= 0:
return 0
if recent_days <= 0:
return sum(int(item["closed"]) for item in daily[-baseline_days:])
baseline_slice = daily[-(recent_days + baseline_days) : -recent_days]
return sum(int(item["closed"]) for item in baseline_slice)
def build_velocity_report(config: dict[str, Any], snapshot: dict[str, Any], today: str | date | None = None) -> dict[str, Any]:
report_day = normalize_today(today)
generated_at = snapshot.get("generated_at") or datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
owner = config.get("owner", DEFAULT_OWNER)
repos = list(config.get("repos") or sorted((snapshot.get("repos") or {}).keys()))
lookback_days = int(config.get("lookback_days", 14))
alert_config = dict(DEFAULT_CONFIG["alert"])
alert_config.update(config.get("alert") or {})
recent_days = int(alert_config.get("recent_days", 7))
baseline_days = int(alert_config.get("baseline_days", 7))
repo_reports: list[dict[str, Any]] = []
total_open_now = 0
total_closed_last_7d = 0
repos_with_alerts: list[str] = []
for repo_name in repos:
repo_snapshot = (snapshot.get("repos") or {}).get(repo_name, {})
open_issues = filter_issue_items(list(repo_snapshot.get("open_issues") or []))
recent_issues = filter_issue_items(list(repo_snapshot.get("recent_issues") or []))
daily = build_daily_series(recent_issues, report_day, lookback_days)
open_now = len(open_issues)
opened_last_7d = _sum_window(daily, "opened", recent_days)
closed_last_7d = _sum_window(daily, "closed", recent_days)
baseline_closed = _sum_baseline_window(daily, recent_days, baseline_days)
weekly_net = opened_last_7d - closed_last_7d
alert = summarize_velocity_alert(
recent_closed=closed_last_7d,
baseline_closed=baseline_closed,
open_now=open_now,
config=alert_config,
)
repo_report = {
"repo": repo_name,
"open_now": open_now,
"opened_last_7d": opened_last_7d,
"closed_last_7d": closed_last_7d,
"baseline_closed": baseline_closed,
"weekly_net": weekly_net,
"daily": daily,
"alert": alert,
}
repo_reports.append(repo_report)
total_open_now += open_now
total_closed_last_7d += closed_last_7d
if alert["status"] != "ok":
repos_with_alerts.append(repo_name)
return {
"owner": owner,
"generated_at": generated_at,
"generated_day": report_day.isoformat(),
"lookback_days": lookback_days,
"dashboard_contract_version": 1,
"repos": repo_reports,
"summary": {
"total_open_now": total_open_now,
"total_closed_last_7d": total_closed_last_7d,
"repos_with_alerts": repos_with_alerts,
},
}
def render_markdown(report: dict[str, Any]) -> str:
lines = [
"# Burn-down Velocity Tracking",
"",
f"Generated: {report['generated_at']}",
f"Owner: {report['owner']}",
f"Lookback days: {report['lookback_days']}",
"",
"## Per-repo velocity",
"",
"| Repo | Open now | Opened 7d | Closed 7d | Previous 7d | Alert |",
"| --- | ---: | ---: | ---: | ---: | --- |",
]
for repo in report["repos"]:
alert_label = repo["alert"]["kind"] if repo["alert"]["status"] != "ok" else "ok"
lines.append(
f"| {repo['repo']} | {repo['open_now']} | {repo['opened_last_7d']} | {repo['closed_last_7d']} | {repo['baseline_closed']} | {alert_label} |"
)
lines.extend(
[
"",
"## Dashboard handoff for timmy-config",
"",
"The timmy-config dashboard should consume `~/.timmy/burn-velocity/latest.json` and render, for each repo:",
"- `open_now`",
"- `opened_last_7d`",
"- `closed_last_7d`",
"- `baseline_closed`",
"- `alert.status` / `alert.kind` / `alert.reason`",
"",
"Cron should also persist `~/.timmy/burn-velocity/history.json` so timmy-config can plot the daily trend line instead of only the latest snapshot.",
"",
"## Alerts",
"",
]
)
alerts = [repo for repo in report["repos"] if repo["alert"]["status"] != "ok"]
if not alerts:
lines.append("- none")
else:
for repo in alerts:
lines.append(f"- {repo['repo']}: {repo['alert']['reason']}")
return "\n".join(lines) + "\n"
def update_history(history_path: Path, report: dict[str, Any]) -> dict[str, Any]:
if history_path.exists():
history = json.loads(history_path.read_text(encoding="utf-8"))
else:
history = {"days": []}
entry = {
"date": report["generated_day"],
"generated_at": report["generated_at"],
"summary": report["summary"],
"repos": report["repos"],
}
retained = [item for item in history.get("days", []) if item.get("date") != report["generated_day"]]
retained.append(entry)
retained.sort(key=lambda item: item["date"])
history["days"] = retained
history_path.parent.mkdir(parents=True, exist_ok=True)
history_path.write_text(json.dumps(history, indent=2), encoding="utf-8")
return history
class GiteaClient:
def __init__(self, token: str, owner: str = DEFAULT_OWNER, base_url: str = DEFAULT_BASE_URL):
self.token = token
self.owner = owner
self.base_url = base_url.rstrip("/")
def _headers(self) -> list[dict[str, str]]:
return [
{"Authorization": f"token {self.token}", "Accept": "application/json"},
{
"Authorization": "Basic " + b64encode(f"{self.token}:".encode()).decode(),
"Accept": "application/json",
},
]
def _request_json(self, url: str) -> list[dict[str, Any]]:
last_error: Exception | None = None
for headers in self._headers():
try:
req = request.Request(url, headers=headers)
with request.urlopen(req, timeout=30) as response:
return json.loads(response.read().decode())
except Exception as exc: # pragma: no cover - exercised only on live API failure
last_error = exc
if last_error is None: # pragma: no cover - defensive
raise RuntimeError("request failed without an exception")
raise last_error
def list_issues(self, repo: str, *, state: str, since: str | None = None) -> list[dict[str, Any]]:
issues: list[dict[str, Any]] = []
page = 1
while True:
query = {"state": state, "type": "issues", "limit": 100, "page": page}
if since:
query["since"] = since
url = f"{self.base_url}/repos/{self.owner}/{repo}/issues?{parse.urlencode(query)}"
batch = self._request_json(url)
if not batch:
break
issues.extend(filter_issue_items(batch))
page += 1
return issues
def load_json(path: Path, default: Any) -> Any:
if not path.exists():
return default
return json.loads(path.read_text(encoding="utf-8"))
def load_config(path: Path) -> dict[str, Any]:
config = dict(DEFAULT_CONFIG)
alert = dict(DEFAULT_CONFIG["alert"])
raw = load_json(path, {})
config.update(raw)
alert.update(raw.get("alert") or {})
config["alert"] = alert
return config
def collect_live_snapshot(
config: dict[str, Any], *, today: str | date | None = None, token_file: Path = DEFAULT_TOKEN_FILE, base_url: str = DEFAULT_BASE_URL
) -> dict[str, Any]:
token = token_file.read_text(encoding="utf-8").strip()
report_day = normalize_today(today)
since_day = report_day - timedelta(days=int(config.get("lookback_days", 14)) - 1)
since_timestamp = datetime.combine(since_day, time.min, tzinfo=timezone.utc).isoformat().replace("+00:00", "Z")
client = GiteaClient(token=token, owner=config.get("owner", DEFAULT_OWNER), base_url=base_url)
repos = list(config.get("repos") or [])
repo_payload = {}
for repo in repos:
repo_payload[repo] = {
"open_issues": client.list_issues(repo, state="open"),
"recent_issues": client.list_issues(repo, state="all", since=since_timestamp),
}
return {
"generated_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
"repos": repo_payload,
}
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Track per-repo issue burn-down velocity and emit timmy-config dashboard payloads.")
parser.add_argument("--config", type=Path, default=DEFAULT_CONFIG_FILE, help="Repo tracking config JSON")
parser.add_argument("--snapshot-file", type=Path, help="Use a pre-fetched snapshot JSON instead of calling Gitea")
parser.add_argument("--token-file", type=Path, default=DEFAULT_TOKEN_FILE, help="Gitea token file for live collection")
parser.add_argument("--base-url", default=DEFAULT_BASE_URL, help="Gitea API base URL")
parser.add_argument("--today", help="Override report date (YYYY-MM-DD)")
parser.add_argument("--output-json", type=Path, default=DEFAULT_OUTPUT_JSON, help="Path for latest JSON payload")
parser.add_argument("--output-md", type=Path, default=DEFAULT_OUTPUT_MD, help="Path for latest markdown summary")
parser.add_argument("--history-file", type=Path, default=DEFAULT_HISTORY_FILE, help="Path for persisted daily history JSON")
parser.add_argument("--write-history", action="store_true", help="Update the daily history file after generating the report")
parser.add_argument("--json", action="store_true", help="Print JSON instead of markdown to stdout")
return parser.parse_args()
def main() -> None:
args = parse_args()
config = load_config(args.config)
if args.snapshot_file:
snapshot = load_json(args.snapshot_file, {"repos": {}})
else:
snapshot = collect_live_snapshot(config, today=args.today, token_file=args.token_file, base_url=args.base_url)
report = build_velocity_report(config, snapshot, today=args.today)
args.output_json.parent.mkdir(parents=True, exist_ok=True)
args.output_md.parent.mkdir(parents=True, exist_ok=True)
args.output_json.write_text(json.dumps(report, indent=2), encoding="utf-8")
args.output_md.write_text(render_markdown(report), encoding="utf-8")
if args.write_history:
update_history(args.history_file, report)
if args.json:
print(json.dumps(report, indent=2))
else:
print(render_markdown(report))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,176 @@
from __future__ import annotations
import json
import subprocess
import sys
from datetime import date
from pathlib import Path
from scripts.burn_velocity_tracker import build_velocity_report, render_markdown, update_history
ROOT = Path(__file__).resolve().parent.parent
DOC_PATH = ROOT / "docs" / "BURN_VELOCITY_TRACKING.md"
SNAPSHOT = {
"generated_at": "2026-04-22T12:00:00Z",
"repos": {
"timmy-home": {
"open_issues": [
{"number": 501, "state": "open", "created_at": "2026-04-20T09:00:00Z"},
{"number": 502, "state": "open", "created_at": "2026-04-22T07:00:00Z"},
],
"recent_issues": [
{"number": 401, "state": "closed", "created_at": "2026-04-21T09:00:00Z", "closed_at": "2026-04-22T05:30:00Z"},
{"number": 402, "state": "closed", "created_at": "2026-04-20T09:00:00Z", "closed_at": "2026-04-21T05:30:00Z"},
{"number": 403, "state": "closed", "created_at": "2026-04-19T09:00:00Z", "closed_at": "2026-04-20T05:30:00Z"},
{"number": 404, "state": "closed", "created_at": "2026-04-14T09:00:00Z", "closed_at": "2026-04-15T05:30:00Z"},
{"number": 405, "state": "closed", "created_at": "2026-04-13T09:00:00Z", "closed_at": "2026-04-14T05:30:00Z"},
{"number": 406, "state": "closed", "created_at": "2026-04-12T09:00:00Z", "closed_at": "2026-04-13T05:30:00Z"},
{"number": 407, "state": "closed", "created_at": "2026-04-11T09:00:00Z", "closed_at": "2026-04-12T05:30:00Z"},
{"number": 408, "state": "closed", "created_at": "2026-04-10T09:00:00Z", "closed_at": "2026-04-11T05:30:00Z"},
{"number": 409, "state": "closed", "created_at": "2026-04-09T09:00:00Z", "closed_at": "2026-04-10T05:30:00Z"},
{"number": 410, "state": "closed", "created_at": "2026-04-08T09:00:00Z", "closed_at": "2026-04-09T05:30:00Z"},
{"number": 411, "state": "closed", "created_at": "2026-04-07T09:00:00Z", "closed_at": "2026-04-08T05:30:00Z"},
{"number": 412, "state": "closed", "created_at": "2026-04-06T09:00:00Z", "closed_at": "2026-04-07T05:30:00Z"},
{"number": 413, "state": "closed", "created_at": "2026-04-05T09:00:00Z", "closed_at": "2026-04-06T05:30:00Z"},
{"number": 414, "state": "open", "created_at": "2026-04-22T08:45:00Z", "closed_at": None},
{"number": 415, "state": "open", "created_at": "2026-04-17T08:45:00Z", "closed_at": None},
],
},
"timmy-config": {
"open_issues": [
{"number": 601, "state": "open", "created_at": "2026-04-18T09:00:00Z"},
],
"recent_issues": [
{"number": 602, "state": "closed", "created_at": "2026-04-20T09:00:00Z", "closed_at": "2026-04-21T06:00:00Z"},
{"number": 603, "state": "open", "created_at": "2026-04-22T06:00:00Z", "closed_at": None},
],
},
},
}
CONFIG = {
"owner": "Timmy_Foundation",
"repos": ["timmy-home", "timmy-config"],
"lookback_days": 14,
"alert": {
"recent_days": 7,
"baseline_days": 7,
"minimum_baseline_closed": 4,
"drop_ratio": 0.6,
},
}
def test_build_velocity_report_counts_opened_closed_and_flags_drop_alert() -> None:
report = build_velocity_report(CONFIG, SNAPSHOT, today=date(2026, 4, 22))
assert report["generated_day"] == "2026-04-22"
assert report["summary"]["repos_with_alerts"] == ["timmy-home"]
assert report["summary"]["total_open_now"] == 3
home = report["repos"][0]
assert home["repo"] == "timmy-home"
assert home["open_now"] == 2
assert home["opened_last_7d"] == 5
assert home["closed_last_7d"] == 3
assert home["baseline_closed"] == 7
assert home["weekly_net"] == 2
assert home["alert"]["status"] == "drop"
assert home["alert"]["recent_closed"] == 3
assert home["daily"][-1] == {"date": "2026-04-22", "opened": 1, "closed": 1}
timmy_config = report["repos"][1]
assert timmy_config["repo"] == "timmy-config"
assert timmy_config["open_now"] == 1
assert timmy_config["closed_last_7d"] == 1
assert timmy_config["alert"]["status"] == "ok"
def test_render_markdown_includes_dashboard_handoff_and_alerts() -> None:
report = build_velocity_report(CONFIG, SNAPSHOT, today=date(2026, 4, 22))
rendered = render_markdown(report)
for snippet in (
"# Burn-down Velocity Tracking",
"## Per-repo velocity",
"timmy-home",
"timmy-config",
"## Dashboard handoff for timmy-config",
"velocity_drop",
"## Alerts",
):
assert snippet in rendered
def test_update_history_replaces_same_day_snapshot(tmp_path: Path) -> None:
history_path = tmp_path / "burn-velocity-history.json"
report = build_velocity_report(CONFIG, SNAPSHOT, today=date(2026, 4, 22))
update_history(history_path, report)
updated = json.loads(json.dumps(report))
updated["repos"][0]["open_now"] = 9
updated["summary"]["total_open_now"] = 10
update_history(history_path, updated)
history = json.loads(history_path.read_text(encoding="utf-8"))
assert [item["date"] for item in history["days"]] == ["2026-04-22"]
assert history["days"][0]["summary"]["total_open_now"] == 10
assert history["days"][0]["repos"][0]["open_now"] == 9
def test_cli_writes_json_markdown_and_history_from_snapshot(tmp_path: Path) -> None:
snapshot_path = tmp_path / "snapshot.json"
output_json = tmp_path / "latest.json"
output_md = tmp_path / "latest.md"
history_path = tmp_path / "history.json"
snapshot_path.write_text(json.dumps(SNAPSHOT), encoding="utf-8")
result = subprocess.run(
[
sys.executable,
"-m",
"scripts.burn_velocity_tracker",
"--snapshot-file",
str(snapshot_path),
"--today",
"2026-04-22",
"--output-json",
str(output_json),
"--output-md",
str(output_md),
"--history-file",
str(history_path),
"--write-history",
"--json",
],
check=True,
cwd=ROOT,
capture_output=True,
text=True,
)
payload = json.loads(result.stdout)
assert payload["summary"]["repos_with_alerts"] == ["timmy-home"]
assert output_json.exists()
assert output_md.exists()
assert history_path.exists()
assert "timmy-config" in output_md.read_text(encoding="utf-8")
def test_repo_contains_burn_velocity_tracking_doc() -> None:
text = DOC_PATH.read_text(encoding="utf-8")
required = [
"# Burn-down Velocity Tracking",
"python3 scripts/burn_velocity_tracker.py",
"configs/burn_velocity_repos.json",
"~/.timmy/burn-velocity/latest.json",
"timmy-config dashboard",
"type=issues",
"velocity_drop",
]
for snippet in required:
assert snippet in text

View File

@@ -1,182 +0,0 @@
#!/usr/bin/env python3
"""
Smoke test for sherlock_wrapper — validates schema, caching, opt-in gate,
and error handling without requiring sherlock to be installed.
"""
import json
import os
import sys
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch, MagicMock
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "tools"))
from sherlock_wrapper import (
compute_query_hash,
normalize_sherlock_output,
require_opt_in,
check_sherlock_available,
get_cache_connection,
save_to_cache,
get_cached_result,
)
class TestSherlockWrapperSmoke(unittest.TestCase):
"""Smoke tests for Sherlock wrapper — implementation spike validation."""
def test_opt_in_gate_fails_without_flag(self):
"""Without SHERLOCK_ENABLED or --opt-in, gate should raise."""
with patch("sherlock_wrapper.SHERLOCK_ENABLED", False):
with self.assertRaises(RuntimeError) as ctx:
require_opt_in(opt_in=False)
self.assertIn("opt-in only", str(ctx.exception).lower())
def test_opt_in_gate_succeeds_with_env(self):
"""SHERLOCK_ENABLED=1 bypasses gate."""
with patch("sherlock_wrapper.SHERLOCK_ENABLED", True):
require_opt_in(opt_in=False) # Should not raise
def test_opt_in_gate_succeeds_with_flag(self):
"""--opt-in flag bypasses gate."""
with patch("sherlock_wrapper.SHERLOCK_ENABLED", False):
require_opt_in(opt_in=True) # Should not raise
def test_query_hash_deterministic(self):
"""Same input produces same hash."""
h1 = compute_query_hash("alice")
h2 = compute_query_hash("alice")
self.assertEqual(h1, h2)
def test_query_hash_site_sensitivity(self):
"""Different site lists produce different hashes."""
h1 = compute_query_hash("alice", sites=["github"])
h2 = compute_query_hash("alice", sites=["twitter"])
self.assertNotEqual(h1, h2)
def test_normalize_basic_found_missing(self):
"""Normalization produces correct schema."""
raw = {
"github": {"status": "found", "url": "https://github.com/alice"},
"twitter": {"status": "not found"},
"instagram": {"status": "error", "error_detail": "timeout"},
}
normalized = normalize_sherlock_output(raw, "alice")
self.assertEqual(normalized["query"], "alice")
self.assertEqual(normalized["metadata"]["found_count"], 1)
self.assertEqual(normalized["metadata"]["missing_count"], 1)
self.assertEqual(normalized["metadata"]["error_count"], 1)
self.assertEqual(len(normalized["found"]), 1)
self.assertEqual(normalized["found"][0]["site"], "github")
self.assertIn("twitter", normalized["missing"])
self.assertEqual(normalized["errors"][0]["site"], "instagram")
def test_normalized_schema_has_required_fields(self):
"""Output schema contains all required top-level keys."""
raw = {"site1": {"status": "not found"}}
normalized = normalize_sherlock_output(raw, "testuser")
required = ["schema_version", "query", "timestamp", "found", "missing",
"errors", "metadata"]
for key in required:
self.assertIn(key, normalized)
self.assertIsInstance(normalized["timestamp"], str)
self.assertIsInstance(normalized["found"], list)
self.assertIsInstance(normalized["missing"], list)
self.assertIsInstance(normalized["errors"], list)
self.assertIsInstance(normalized["metadata"], dict)
def test_cache_roundtrip(self):
"""Result can be written and read back from cache."""
with tempfile.TemporaryDirectory() as tmp:
with patch("sherlock_wrapper.CACHE_DB", Path(tmp) / "cache.db"):
test_result = {
"schema_version": "1.0",
"query": "alice",
"timestamp": "2025-04-26T00:00:00+00:00",
"found": [],
"missing": ["github"],
"errors": [],
"metadata": {"total_sites_checked": 1, "found_count": 0, "missing_count": 1, "error_count": 0},
}
query_hash = compute_query_hash("alice")
save_to_cache(query_hash, test_result)
retrieved = get_cached_result(query_hash)
self.assertEqual(retrieved, test_result)
def test_cache_miss_on_stale(self):
"""Cache returns None when entry is older than 7 days."""
with tempfile.TemporaryDirectory() as tmp:
db_path = Path(tmp) / "cache.db"
with patch("sherlock_wrapper.CACHE_DB", db_path):
old_ts = "2025-04-01T00:00:00+00:00"
old_result = {
"schema_version": "1.0", "query": "alice",
"timestamp": old_ts, "found": [], "missing": [], "errors": [],
"metadata": {"total_sites_checked": 0, "found_count": 0, "missing_count": 0, "error_count": 0},
}
query_hash = compute_query_hash("alice")
# Direct DB insert with controlled timestamp (bypass save_to_cache's NOW)
conn = get_cache_connection()
conn.execute(
"INSERT INTO cache (query_hash, result_json, timestamp) VALUES (?, ?, ?)",
(query_hash, json.dumps(old_result), old_ts)
)
conn.commit()
retrieved = get_cached_result(query_hash)
self.assertIsNone(retrieved)
def test_sherlock_available_check(self):
"""check_sherlock_available returns bool."""
available = check_sherlock_available()
self.assertIsInstance(available, bool)
# Note: on this test system sherlock may not be installed, so False is expected.
# The important thing is the function returns a bool.
print(f"[INFO] Sherlock installed: {available}")
class TestSherlockWrapperIntegration(unittest.TestCase):
"""Integration tests with mocked sherlock module."""
def test_run_sherlock_with_opt_in(self):
"""run_sherlock succeeds with opt-in and returns normalized result."""
fake_sherlock = MagicMock()
fake_sherlock.sherlock = MagicMock(return_value={
"github": {"status": "found", "url": "https://github.com/alice"},
"twitter": {"status": "not found"},
})
with patch.dict("sys.modules", {"sherlock": fake_sherlock}):
import importlib
import sherlock_wrapper
importlib.reload(sherlock_wrapper)
with patch.dict(os.environ, {"SHERLOCK_ENABLED": "1"}):
from sherlock_wrapper import run_sherlock
result = run_sherlock("alice", opt_in=True)
self.assertEqual(result["query"], "alice")
self.assertEqual(result["metadata"]["found_count"], 1)
def test_run_sherlock_fails_without_opt_in(self):
"""run_sherlock raises RuntimeError without opt-in."""
from sherlock_wrapper import run_sherlock
with self.assertRaises(RuntimeError) as ctx:
run_sherlock("alice", opt_in=False)
self.assertIn("opt-in only", str(ctx.exception).lower())
def test_run_sherlock_uses_cache(self):
"""Cached result short-circuits sherlock execution."""
cached = {
"schema_version": "1.0", "query": "alice", "timestamp": "2025-04-26T00:00:00+00:00",
"found": [{"site": "github", "url": "https://github.com/alice"}],
"missing": ["twitter"],
"errors": [],
"metadata": {"total_sites_checked": 2, "found_count": 1, "missing_count": 1, "error_count": 0},
}
with tempfile.TemporaryDirectory() as tmp:
with patch("sherlock_wrapper.CACHE_DB", Path(tmp) / "cache.db"):
query_hash = compute_query_hash("alice")
save_to_cache(query_hash, cached)
from sherlock_wrapper import run_sherlock
result = run_sherlock("alice", opt_in=True)
self.assertEqual(result, cached)

View File

View File

@@ -1,249 +0,0 @@
#!/usr/bin/env python3
"""
Sherlock username recon wrapper — opt-in, cached, normalized JSON output.
This is an implementation spike (issue #874) to validate local integration
of the Sherlock OSINT tool without violating sovereignty/provenance standards.
"""
import argparse
import hashlib
import json
import os
import sqlite3
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Dict, Any, List
# Opt-in gate: must have SHERLOCK_ENABLED=1 or --opt-in flag
SHERLOCK_ENABLED = os.environ.get("SHERLOCK_ENABLED", "0") == "1"
# Cache location
CACHE_DIR = Path.home() / ".cache" / "timmy"
CACHE_DB = CACHE_DIR / "sherlock_cache.db"
# Normalized output schema version
SCHEMA_VERSION = "1.0"
def require_opt_in(opt_in: bool = False) -> None:
"""Enforce opt-in gate for Sherlock external dependency."""
if not (SHERLOCK_ENABLED or opt_in):
raise RuntimeError(
"Sherlock is opt-in only. Set SHERLOCK_ENABLED=1 or pass --opt-in."
)
def check_sherlock_available() -> bool:
"""Check if sherlock Python package is installed."""
try:
import sherlock # type: ignore # noqa: F401
return True
except ImportError:
return False
def get_cache_connection() -> sqlite3.Connection:
"""Initialize cache directory and return DB connection."""
CACHE_DIR.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(CACHE_DB))
conn.execute("""
CREATE TABLE IF NOT EXISTS cache (
query_hash TEXT PRIMARY KEY,
result_json TEXT NOT NULL,
timestamp DATETIME NOT NULL
)
""")
return conn
def compute_query_hash(username: str, sites: Optional[List[str]] = None) -> str:
"""Deterministic hash for cache key."""
components = [username.lower().strip()]
if sites:
components.extend(sorted(sites))
raw = "|".join(components)
return hashlib.sha256(raw.encode()).hexdigest()
def get_cached_result(query_hash: str) -> Optional[Dict[str, Any]]:
"""Retrieve cached result if available and not stale (TTL: 7 days)."""
conn = get_cache_connection()
cur = conn.execute(
"SELECT result_json, timestamp FROM cache WHERE query_hash = ?",
(query_hash,)
)
row = cur.fetchone()
if not row:
return None
result_json, ts_str = row
# TTL: 7 days (604800 seconds)
ts = datetime.fromisoformat(ts_str)
age_seconds = (datetime.now(timezone.utc) - ts).total_seconds()
if age_seconds >= 604800:
return None
return json.loads(result_json)
def save_to_cache(query_hash: str, result: Dict[str, Any]) -> None:
"""Persist result to cache."""
conn = get_cache_connection()
conn.execute(
"INSERT OR REPLACE INTO cache (query_hash, result_json, timestamp) VALUES (?, ?, ?)",
(query_hash, json.dumps(result), datetime.now(timezone.utc).isoformat())
)
conn.commit()
conn.close()
def normalize_sherlock_output(
raw_result: Dict[str, Any],
username: str,
sites_checked: Optional[List[str]] = None
) -> Dict[str, Any]:
"""
Convert raw sherlock output into a stable, normalized schema.
Expected sherlock result shape (via Python API):
{
"site_name": {"url": "...", "status": "found"|"not found"|"error", ...},
...
}
"""
found: List[Dict[str, str]] = []
missing: List[str] = []
errors: List[Dict[str, str]] = []
for site_name, site_data in raw_result.items():
status = site_data.get("status", "")
url = site_data.get("url", "")
if status == "found" and url:
found.append({"site": site_name, "url": url})
elif status == "not found":
missing.append(site_name)
else:
errors.append({"site": site_name, "error": status or "unknown"})
# Compute totals from the original site list if provided
total_sites = len(raw_result) if sites_checked is None else len(sites_checked)
return {
"schema_version": SCHEMA_VERSION,
"query": username,
"timestamp": datetime.now(timezone.utc).isoformat(),
"found": found,
"missing": missing,
"errors": errors,
"metadata": {
"total_sites_checked": total_sites,
"found_count": len(found),
"missing_count": len(missing),
"error_count": len(errors),
},
}
def run_sherlock(
username: str,
sites: Optional[List[str]] = None,
timeout: Optional[int] = None,
opt_in: bool = False
) -> Dict[str, Any]:
"""
Execute Sherlock wrapper with opt-in gate, caching, and normalization.
"""
require_opt_in(opt_in)
# Compute cache key
query_hash = compute_query_hash(username, sites)
# Check cache first — avoids dependency requirement on cache hit
cached = get_cached_result(query_hash)
if cached is not None:
return cached
# Only require sherlock on cache miss
if not check_sherlock_available():
raise RuntimeError(
"Sherlock Python package not installed. "
"Install with: pip install sherlock-project"
)
# Call sherlock
try:
import sherlock
from sherlock import sherlock as sherlock_main # type: ignore
if sites:
result = sherlock_main(username, site_list=sites, timeout=timeout or 10)
else:
result = sherlock_main(username, timeout=timeout or 10)
normalized = normalize_sherlock_output(result, username, sites)
save_to_cache(query_hash, normalized)
return normalized
except Exception as e:
raise RuntimeError(f"Sherlock execution failed: {e}") from e
def main() -> int:
parser = argparse.ArgumentParser(
description="Sherlock username OSINT wrapper — opt-in, cached, normalized JSON"
)
parser.add_argument(
"--query", "-q", required=True,
help="Username to search across sites"
)
parser.add_argument(
"--opt-in", action="store_true",
help="Explicit opt-in flag (alternatively set SHERLOCK_ENABLED=1)"
)
parser.add_argument(
"--sites", "-s", nargs="+",
help="Specific sites to check (default: all supported)"
)
parser.add_argument(
"--timeout", "-t", type=int, default=10,
help="Request timeout per site (default: 10)"
)
parser.add_argument(
"--json", action="store_true",
help="Output normalized JSON to stdout"
)
parser.add_argument(
"--no-cache",
action="store_true",
help="Bypass cached result (if any)"
)
args = parser.parse_args()
try:
result = run_sherlock(
username=args.query,
sites=args.sites,
timeout=args.timeout,
opt_in=args.opt_in
)
if args.json:
print(json.dumps(result, indent=2))
else:
print(f"Query: {result['query']}")
print(f"Found: {result['metadata']['found_count']} site(s)")
print(f"Missing: {result['metadata']['missing_count']} site(s)")
print(f"Errors: {result['metadata']['error_count']} site(s)")
for f in result['found']:
print(f" [{f['site']}] {f['url']}")
return 0
except RuntimeError as e:
print(f"ERROR: {e}", file=sys.stderr)
return 1
if __name__ == "__main__":
sys.exit(main())