Files
timmy-home/scripts/twitter_archive/pipeline_health.py
2026-03-27 18:09:28 -04:00

67 lines
2.0 KiB
Python

#!/usr/bin/env python3
"""Check the private archive pipeline for missing inputs or stalled stages."""
from __future__ import annotations
import json
from .common import (
ARCHIVE_DIR,
CANDIDATES_DIR,
CHECKPOINT_FILE,
EXTRACTED_DIR,
NOTES_DIR,
PROGRESS_FILE,
TRAINING_DPO_DIR,
ensure_layout,
latest_file,
load_json,
resolve_source_dir,
write_progress_snapshot,
)
def _latest_name(directory, pattern: str) -> str | None:
    """Return the filename of the newest file in *directory* matching *pattern*, or None.

    Hoists the "name if found else None" dance so each directory is globbed
    once instead of twice (the original called latest_file twice per key).
    """
    found = latest_file(directory, pattern)
    return found.name if found is not None else None


def main() -> None:
    """Inspect the archive pipeline and print a JSON status report to stdout.

    Checks that each pipeline stage's expected artifacts exist (raw source,
    extraction outputs, batch notes/candidates, DPO pairs), collects any
    detected problems into an ``issues`` list, and emits the whole status
    dict as a single JSON line. ``status["ok"]`` is True iff no issues.
    """
    ensure_layout()
    source_dir = resolve_source_dir()
    source_file = source_dir / "tweets.js"
    # Empty dict default => checkpoint_valid is False when file is absent/empty.
    checkpoint = load_json(CHECKPOINT_FILE, {})
    status = {
        "workspace": str(ARCHIVE_DIR),
        "source_dir": str(source_dir),
        "source_present": source_file.exists(),
        "checkpoint_present": CHECKPOINT_FILE.exists(),
        "checkpoint_valid": bool(checkpoint),
        "extracted_manifest_present": (EXTRACTED_DIR / "manifest.json").exists(),
        "tweets_present": (EXTRACTED_DIR / "tweets.jsonl").exists(),
        "retweets_present": (EXTRACTED_DIR / "retweets.jsonl").exists(),
        "latest_note": _latest_name(NOTES_DIR, "batch_*.md"),
        "latest_candidate": _latest_name(CANDIDATES_DIR, "batch_*.json"),
        "latest_dpo_file": _latest_name(TRAINING_DPO_DIR, "pairs_*.jsonl"),
    }
    issues = []
    if not status["source_present"]:
        issues.append("missing raw archive source")
    if not status["extracted_manifest_present"]:
        issues.append("archive not extracted")
    # Candidates without DPO pairs means the final build stage has stalled.
    if status["latest_candidate"] and not status["latest_dpo_file"]:
        issues.append("batch artifacts exist but DPO pairs have not been built")
    snapshot = write_progress_snapshot()
    status["progress"] = snapshot
    status["issues"] = issues
    status["ok"] = not issues
    # sort_keys gives a stable field order for diffing / log scraping.
    print(json.dumps(status, sort_keys=True))
# Allow running directly (python -m ... or as a script) without side effects on import.
if __name__ == "__main__":
    main()