#!/usr/bin/env python3
"""Status/reporting helper for the codebase genome pipeline.
|
||
|
|
|
||
|
|
This lands a parent-epic slice for timmy-home #665 by making the current genome
|
||
|
|
coverage across repos inspectable: which repos have artifacts, which have tests,
|
||
|
|
what duplicates exist, and which repo is still uncovered next.
|
||
|
|
"""
|
||
|
|
|
||
|
|
from __future__ import annotations

# Standard-library only; no third-party dependencies.
import argparse
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Iterable
import urllib.request
def artifact_repo_name(path: Path, host_repo_name: str = 'timmy-home') -> str | None:
    """Map a repo-relative markdown path to the repo it documents.

    Recognized layouts (checked in this order):
      * ``GENOME.md`` at the repo root -> *host_repo_name*
      * ``genomes/<repo>/GENOME.md`` -> ``<repo>``
      * ``genomes/.../<repo>-GENOME.md`` -> ``<repo>``
      * root-level ``GENOME-<repo>.md`` or ``<repo>-GENOME.md`` -> ``<repo>``

    Returns None for any path that does not match a genome-artifact layout.
    """
    filename = path.name

    # The host repo's own genome lives at the top of the tree.
    if path.as_posix() == 'GENOME.md':
        return host_repo_name

    if path.parts[:1] == ('genomes',):
        # genomes/<repo>/GENOME.md — exactly one directory level deep.
        if len(path.parts) == 3 and filename == 'GENOME.md':
            return path.parts[1]
        # genomes/.../<repo>-GENOME.md — flat (or nested) suffix naming.
        if filename.endswith('-GENOME.md'):
            return filename[:-len('-GENOME.md')]
        return None

    at_root = path.parent == Path('.')
    if at_root and filename.startswith('GENOME-') and filename.endswith('.md'):
        return filename[len('GENOME-'):-len('.md')]
    if at_root and filename.endswith('-GENOME.md'):
        return filename[:-len('-GENOME.md')]
    return None
def test_repo_name(path: Path, host_repo_name: str = 'timmy-home') -> str | None:
|
||
|
|
if path.name == 'test_codebase_genome_pipeline.py':
|
||
|
|
return host_repo_name
|
||
|
|
stem = path.stem
|
||
|
|
if not stem.startswith('test_') or not stem.endswith('_genome'):
|
||
|
|
return None
|
||
|
|
middle = stem[len('test_'):-len('_genome')]
|
||
|
|
return middle.replace('_', '-') if middle else None
|
||
|
|
|
||
|
|
|
||
|
|
def scan_artifacts(repo_root: Path, host_repo_name: str = 'timmy-home') -> dict[str, list[str]]:
    """Collect genome markdown artifacts under *repo_root*, keyed by repo.

    Walks every ``*.md`` file, classifies each via ``artifact_repo_name``,
    and returns repo name -> list of repo-relative POSIX paths (a repo maps
    to several paths when duplicate artifacts exist).
    """
    found: dict[str, list[str]] = {}
    for md_path in sorted(repo_root.rglob('*.md')):
        relative = md_path.relative_to(repo_root)
        owner = artifact_repo_name(relative, host_repo_name=host_repo_name)
        # Non-genome markdown (README, docs, ...) is ignored.
        if owner is not None:
            found.setdefault(owner, []).append(relative.as_posix())
    return found
def scan_tests(repo_root: Path, host_repo_name: str = 'timmy-home') -> set[str]:
    """Return the set of repo names with a genome test under ``tests/``."""
    covered: set[str] = set()
    tests_dir = repo_root / 'tests'
    # No tests directory at all means no repo has test coverage.
    if not tests_dir.exists():
        return covered
    for test_file in sorted(tests_dir.rglob('test_*.py')):
        owner = test_repo_name(test_file.relative_to(repo_root), host_repo_name=host_repo_name)
        if owner:
            covered.add(owner)
    return covered
def build_status_summary(
    *,
    repo_root: str | Path,
    expected_repos: Iterable[str],
    state: dict | None = None,
    host_repo_name: str = 'timmy-home',
) -> dict:
    """Assemble the JSON-serializable genome-coverage summary.

    Args:
        repo_root: Root of the host repo whose artifacts and tests are scanned.
        expected_repos: All repo names that should eventually carry a genome;
            order determines which missing repo is reported as "next".
        state: Optional nightly-rotation state; only its 'last_repo' key is read.
        host_repo_name: Name the host repo's own artifacts map to.

    Returns:
        Dict with generated_at, counts, missing/duplicate detail, and a
        per-repo coverage map under 'artifacts'.
    """
    root = Path(repo_root)
    expected = list(expected_repos)
    artifacts = scan_artifacts(root, host_repo_name=host_repo_name)
    tested = scan_tests(root, host_repo_name=host_repo_name)

    coverage: dict = {}
    duplicates: dict = {}
    for repo_name in sorted(artifacts):
        repo_paths = artifacts[repo_name]
        coverage[repo_name] = {
            'artifact_paths': repo_paths,
            'has_test': repo_name in tested,
        }
        # More than one artifact path for the same repo is a duplicate genome.
        if len(repo_paths) > 1:
            duplicates[repo_name] = repo_paths

    missing = [name for name in expected if name not in artifacts]

    return {
        'generated_at': datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z'),
        'total_expected_repos': len(expected),
        'artifact_count': len(artifacts),
        'tested_artifact_count': sum(1 for name in artifacts if name in tested),
        'last_repo': (state or {}).get('last_repo'),
        'next_uncovered_repo': missing[0] if missing else None,
        'missing_repos': missing,
        'duplicates': duplicates,
        'artifacts': coverage,
    }
def render_markdown(summary: dict) -> str:
    """Render a ``build_status_summary`` dict as a markdown report.

    Sections: header, summary bullets, coverage matrix, missing repos,
    duplicate artifact paths. Always ends with a trailing newline.
    """
    out: list[str] = []
    out.append('# Codebase Genome Status')
    out.append('')
    out.append(f"Generated: {summary['generated_at']}")
    out.append('')
    out.append('## Summary')
    out.append('')
    out.append(f"- expected repos: {summary['total_expected_repos']}")
    out.append(f"- repos with genome artifacts: {summary['artifact_count']}")
    out.append(f"- repos with genome tests: {summary['tested_artifact_count']}")

    # Optional bullets: omitted entirely when the value is falsy.
    last_repo = summary.get('last_repo')
    if last_repo:
        out.append(f"- last repo processed by nightly rotation: {last_repo}")
    next_repo = summary.get('next_uncovered_repo')
    if next_repo:
        out.append(f"- next uncovered repo: {next_repo}")

    out.extend([
        '',
        '## Coverage Matrix',
        '',
        '| Repo | Artifact Paths | Test? |',
        '|------|----------------|-------|',
    ])
    for repo_name, info in summary['artifacts'].items():
        joined_paths = '<br>'.join(info['artifact_paths'])
        test_flag = 'yes' if info['has_test'] else 'no'
        out.append(f'| `{repo_name}` | `{joined_paths}` | {test_flag} |')

    out.extend(['', '## Missing Repo Artifacts', ''])
    if summary['missing_repos']:
        out.extend(f'- `{repo_name}`' for repo_name in summary['missing_repos'])
    else:
        out.append('- none')

    out.extend(['', '## Duplicate Artifact Paths', ''])
    if summary['duplicates']:
        for repo_name, dup_paths in summary['duplicates'].items():
            out.append(f'- `{repo_name}`')
            out.extend(f'  - `{dup}`' for dup in dup_paths)
    else:
        out.append('- none')

    return '\n'.join(out) + '\n'
def load_state(path: str | Path | None) -> dict:
    """Load the nightly-rotation state JSON; {} when unset or absent.

    Raises json.JSONDecodeError if the file exists but is not valid JSON.
    """
    # A falsy path (None or '') means "no state file configured".
    if not path:
        return {}
    state_file = Path(path).expanduser()
    if state_file.exists():
        return json.loads(state_file.read_text(encoding='utf-8'))
    return {}
def fetch_org_repo_names(org: str, host: str, token_file: str | Path, *, include_archived: bool = False) -> list[str]:
    """List repo names in a Gitea org via its ``/api/v1/orgs/.../repos`` API.

    Reads the API token from *token_file*. Archived repos are skipped unless
    *include_archived* is set; dot-prefixed repos (e.g. '.github') are always
    skipped. Returns a sorted, de-duplicated list of names. Network and file
    errors propagate to the caller.
    """
    token = Path(token_file).expanduser().read_text(encoding='utf-8').strip()
    request_headers = {'Authorization': f'token {token}', 'Accept': 'application/json'}
    base_url = host.rstrip('/')
    names: list[str] = []
    page = 1
    while True:
        request = urllib.request.Request(
            f"{base_url}/api/v1/orgs/{org}/repos?limit=100&page={page}",
            headers=request_headers,
        )
        with urllib.request.urlopen(request, timeout=30) as response:
            page_repos = json.loads(response.read().decode('utf-8'))
        if not page_repos:
            break
        for entry in page_repos:
            if entry.get('archived') and not include_archived:
                continue
            repo_name = entry['name']
            if repo_name.startswith('.'):
                continue
            names.append(repo_name)
        # A short page means there is no further page to fetch.
        if len(page_repos) < 100:
            break
        page += 1
    return sorted(set(names))
def main(argv: list[str] | None = None) -> int:
    """CLI entry point: print (and optionally write) the coverage report.

    Expected repos come from --expected-repos-json when given, otherwise from
    a live Gitea org scan. Returns 0 on success; argparse, I/O, and network
    errors propagate.
    """
    parser = argparse.ArgumentParser(description='Summarize codebase genome coverage across repos')
    parser.add_argument('--repo-root', default='.', help='timmy-home repo root')
    parser.add_argument('--expected-repos-json', help='JSON array of expected repo names')
    parser.add_argument('--org', default='Timmy_Foundation', help='Gitea org to inspect when expected repos are not provided')
    parser.add_argument('--host', default='https://forge.alexanderwhitestone.com', help='Gitea host')
    parser.add_argument('--token-file', default='~/.config/gitea/token', help='Gitea token file for org scan fallback')
    parser.add_argument('--include-archived', action='store_true', help='Include archived repos in org scan fallback')
    parser.add_argument('--state-path', default='~/.timmy/codebase_genome_state.json')
    parser.add_argument('--output', help='Optional markdown output path')
    args = parser.parse_args(argv)

    if args.expected_repos_json:
        expected = json.loads(args.expected_repos_json)
    else:
        # Fall back to scanning the Gitea org when no explicit list is given.
        expected = fetch_org_repo_names(args.org, args.host, args.token_file, include_archived=args.include_archived)

    summary = build_status_summary(repo_root=args.repo_root, expected_repos=expected, state=load_state(args.state_path))
    rendered = render_markdown(summary)
    # render_markdown already terminates with '\n'; end='' avoids printing a
    # spurious extra blank line (the original used bare print()).
    print(rendered, end='')
    if args.output:
        out_path = Path(args.output)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        out_path.write_text(rendered, encoding='utf-8')
    return 0
if __name__ == '__main__':
    # Propagate main()'s return code as the process exit status.
    raise SystemExit(main())