172 lines
6.3 KiB
Python
172 lines
6.3 KiB
Python
#!/usr/bin/env python3
|
|
"""Nightly runner for the codebase genome pipeline."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import urllib.request
|
|
from pathlib import Path
|
|
from typing import NamedTuple
|
|
|
|
|
|
class RunPlan(NamedTuple):
    """Immutable description of one pipeline invocation for one repository."""

    # Repository metadata dict the plan was built from.
    repo: dict
    # Directory holding (or about to hold) the repository checkout.
    repo_dir: Path
    # File the pipeline is asked to write via ``--output``.
    output_path: Path
    # Full argv list that is handed to ``subprocess.run``.
    command: list[str]
|
|
|
|
|
|
def load_state(path: Path) -> dict:
    """Load the persisted runner state from *path*.

    Args:
        path: Location of the JSON state file.

    Returns:
        The decoded state mapping, or an empty dict when the file is missing
        or contains only whitespace (for example after an interrupted write).

    Raises:
        ValueError: If the file holds malformed JSON (``json.JSONDecodeError``
            is a ``ValueError`` subclass) or JSON that is not an object.
    """
    # Read directly instead of checking ``exists()`` first: this avoids a race
    # between the check and the read.
    try:
        raw = path.read_text(encoding="utf-8")
    except (FileNotFoundError, NotADirectoryError):
        return {}

    # A blank file carries no state; treat it like a missing one rather than
    # failing every subsequent run with a decode error.
    if not raw.strip():
        return {}

    state = json.loads(raw)
    if not isinstance(state, dict):
        raise ValueError(
            f"state file {path} must contain a JSON object, got {type(state).__name__}"
        )
    return state
|
|
|
|
|
|
def save_state(path: Path, state: dict) -> None:
    """Persist *state* as pretty-printed, key-sorted JSON at *path*.

    The payload is written to a sibling ``<name>.tmp`` file and then renamed
    over the target, so an interruption mid-write cannot leave a truncated
    state file behind.

    Args:
        path: Destination file; missing parent directories are created.
        state: JSON-serialisable mapping to store.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    # Serialise before touching the filesystem so an encoding error leaves
    # any existing state file untouched.
    payload = json.dumps(state, indent=2, sort_keys=True)
    tmp_path = path.with_name(f"{path.name}.tmp")
    tmp_path.write_text(payload, encoding="utf-8")
    # Same-directory rename: replaces the old file in a single step.
    tmp_path.replace(path)
|
|
|
|
|
|
def select_next_repo(repos: list[dict], state: dict) -> dict:
    """Pick the repository that follows the previously processed one.

    Repositories are ordered case-insensitively by ``full_name`` (falling
    back to ``name``). The entry after the one recorded in
    ``state["last_repo"]`` is returned, wrapping around at the end. If that
    repository is no longer present, ``state["last_index"]`` is used as the
    position instead; with no usable state the first repository is returned.

    Args:
        repos: Repository dicts carrying ``name`` and/or ``full_name``.
        state: Persisted runner state; may be empty.

    Returns:
        The repository dict to process next.

    Raises:
        ValueError: If *repos* is empty.
    """
    if not repos:
        raise ValueError("no repositories available for nightly genome run")

    ordered = sorted(repos, key=lambda item: item.get("full_name", item.get("name", "")).lower())

    last_repo = state.get("last_repo")
    # Only search when a previous repo was recorded. Otherwise a repo dict
    # without "name"/"full_name" would compare equal to the missing value
    # (None == None) and the first repository would be skipped.
    if last_repo is not None:
        for index, repo in enumerate(ordered):
            if last_repo in (repo.get("name"), repo.get("full_name")):
                return ordered[(index + 1) % len(ordered)]

    # Fall back to the stored position; tolerate a null or garbage value.
    try:
        last_index = int(state.get("last_index", -1))
    except (TypeError, ValueError):
        last_index = -1
    return ordered[(last_index + 1) % len(ordered)]
|
|
|
|
|
|
def build_run_plan(repo: dict, workspace_root: Path, output_root: Path, pipeline_script: Path) -> RunPlan:
    """Assemble the paths and command line for one repository's pipeline run.

    Args:
        repo: Repository dict; ``name`` is required, ``full_name`` optional.
        workspace_root: Directory under which checkouts live.
        output_root: Directory under which per-repo output folders live.
        pipeline_script: Script executed with the current Python interpreter.

    Returns:
        A ``RunPlan`` whose command passes ``--repo-root``, ``--repo-name``
        and ``--output`` to the pipeline script.
    """
    short_name = repo["name"]
    checkout_dir = workspace_root / short_name
    genome_file = output_root / short_name / "GENOME.md"
    # Prefer the qualified name for display; fall back to the short name.
    display_name = repo.get("full_name", short_name)

    argv = [sys.executable, str(pipeline_script)]
    argv += ["--repo-root", str(checkout_dir)]
    argv += ["--repo-name", display_name]
    argv += ["--output", str(genome_file)]

    return RunPlan(repo, checkout_dir, genome_file, argv)
|
|
|
|
|
|
def fetch_org_repos(org: str, host: str, token_file: Path, include_archived: bool = False) -> list[dict]:
    """List an organisation's repositories via ``/api/v1/orgs/<org>/repos``.

    Pages of up to 100 entries are requested until an empty page comes back.

    Args:
        org: Organisation name inserted into the API path.
        host: Base URL of the server; a trailing slash is tolerated.
        token_file: File whose stripped contents are sent as an
            ``Authorization: token ...`` header.
        include_archived: Keep entries flagged ``archived`` when true.

    Returns:
        One dict per repository with ``name``, ``full_name``, ``clone_url``
        and ``default_branch`` (``"main"`` when the API value is empty).
    """
    token = token_file.read_text(encoding="utf-8").strip()
    endpoint = f"{host.rstrip('/')}/api/v1/orgs/{org}/repos"
    request_headers = {"Authorization": f"token {token}", "Accept": "application/json"}

    collected: list[dict] = []
    page = 1
    while True:
        request = urllib.request.Request(f"{endpoint}?limit=100&page={page}", headers=request_headers)
        with urllib.request.urlopen(request, timeout=30) as response:
            batch = json.loads(response.read().decode("utf-8"))

        # An empty page marks the end of the listing.
        if not batch:
            return collected

        collected.extend(
            {
                "name": entry["name"],
                "full_name": entry["full_name"],
                "clone_url": entry["clone_url"],
                "default_branch": entry.get("default_branch") or "main",
            }
            for entry in batch
            if include_archived or not entry.get("archived")
        )
        page += 1
|
|
|
|
|
|
def _authenticated_clone_url(clone_url: str, token_file: Path) -> str:
|
|
token = token_file.read_text(encoding="utf-8").strip()
|
|
if clone_url.startswith("https://"):
|
|
return f"https://{token}@{clone_url[len('https://') :]}"
|
|
return clone_url
|
|
|
|
|
|
def ensure_checkout(repo: dict, workspace_root: Path, token_file: Path) -> Path:
    """Make sure a shallow checkout of *repo* exists and matches its branch tip.

    An existing checkout (detected by its ``.git`` entry) is fetched, switched
    to the branch and hard-reset to ``origin/<branch>``; otherwise the
    repository is cloned with depth 1. Every git call uses ``check=True``, so
    a failing command raises ``subprocess.CalledProcessError``.

    Args:
        repo: Repository dict with ``name`` and ``clone_url``; an empty or
            missing ``default_branch`` means ``"main"``.
        workspace_root: Parent directory for checkouts; created if missing.
        token_file: Token file used to build the authenticated clone URL.

    Returns:
        Path of the checkout directory.
    """
    workspace_root.mkdir(parents=True, exist_ok=True)
    checkout_dir = workspace_root / repo["name"]
    branch = repo.get("default_branch") or "main"
    remote_url = _authenticated_clone_url(repo["clone_url"], token_file)

    if not (checkout_dir / ".git").exists():
        clone_cmd = ["git", "clone", "--depth", "1", "--single-branch", "--branch", branch, remote_url, str(checkout_dir)]
        subprocess.run(clone_cmd, check=True)
        return checkout_dir

    # Existing checkout: refresh it in three steps, discarding local changes.
    git_in_repo = ["git", "-C", str(checkout_dir)]
    refresh_steps = (
        ["fetch", "origin", branch, "--depth", "1"],
        ["checkout", branch],
        ["reset", "--hard", f"origin/{branch}"],
    )
    for step in refresh_steps:
        subprocess.run(git_in_repo + step, check=True)
    return checkout_dir
|
|
|
|
|
|
def run_plan(plan: RunPlan) -> None:
    """Execute the plan's command after creating its output directory.

    Raises:
        subprocess.CalledProcessError: If the command exits non-zero.
    """
    output_dir = plan.output_path.parent
    output_dir.mkdir(parents=True, exist_ok=True)
    subprocess.run(plan.command, check=True)
|
|
|
|
|
|
def main() -> None:
    """Parse CLI options and run one genome pass for the next repository.

    The organisation's repositories are fetched, the one following the
    repository recorded in the state file is selected, and its run plan is
    built. With ``--dry-run`` the plan is printed as JSON and nothing else
    happens. Otherwise the repository is checked out, the pipeline is run,
    and the state file is rewritten once ``run_plan`` has returned, so the
    following invocation moves on to the next repository.
    """
    default_pipeline = Path(__file__).resolve().parents[1] / "pipelines" / "codebase_genome.py"

    parser = argparse.ArgumentParser(description="Run one nightly codebase genome pass for the next repo in an org")
    parser.add_argument("--org", default="Timmy_Foundation")
    parser.add_argument("--host", default="https://forge.alexanderwhitestone.com")
    parser.add_argument("--token-file", default=os.path.expanduser("~/.config/gitea/token"))
    parser.add_argument("--workspace-root", default=os.path.expanduser("~/timmy-foundation-repos"))
    parser.add_argument("--output-root", default=os.path.expanduser("~/.timmy/codebase-genomes"))
    parser.add_argument("--state-path", default=os.path.expanduser("~/.timmy/codebase_genome_state.json"))
    parser.add_argument("--pipeline-script", default=str(default_pipeline))
    parser.add_argument("--include-archived", action="store_true")
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()

    # Normalise every path-like option, expanding "~" in user-supplied values.
    token_file, workspace_root, output_root, state_path, pipeline_script = (
        Path(raw).expanduser()
        for raw in (
            args.token_file,
            args.workspace_root,
            args.output_root,
            args.state_path,
            args.pipeline_script,
        )
    )

    repos = fetch_org_repos(args.org, args.host, token_file, include_archived=args.include_archived)
    state = load_state(state_path)
    repo = select_next_repo(repos, state)
    plan = build_run_plan(repo, workspace_root=workspace_root, output_root=output_root, pipeline_script=pipeline_script)

    if args.dry_run:
        preview = {
            "repo": repo,
            "repo_dir": str(plan.repo_dir),
            "output_path": str(plan.output_path),
            "command": plan.command,
        }
        print(json.dumps(preview, indent=2))
        return

    ensure_checkout(repo, workspace_root=workspace_root, token_file=token_file)
    run_plan(plan)

    # Record the repo's position in the same case-insensitive ordering that
    # is used when choosing the next repository.
    def _sort_key(item: dict) -> str:
        return item.get("full_name", item.get("name", "")).lower()

    new_state = {
        "last_index": sorted(repos, key=_sort_key).index(repo),
        "last_repo": repo.get("name"),
    }
    save_state(state_path, new_state)
    print(f"Completed genome run for {repo['full_name']} -> {plan.output_path}")
|
|
|
|
|
|
# Run the nightly pass only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|