From cf5c763d0e6eb563c899a6ea65c5239ac96c5502 Mon Sep 17 00:00:00 2001 From: Alexander Whitestone Date: Fri, 27 Mar 2026 18:09:28 -0400 Subject: [PATCH] feat: add private twitter archive pipeline scripts --- .gitignore | 1 + scripts/__init__.py | 1 + scripts/twitter_archive/__init__.py | 1 + scripts/twitter_archive/build_dpo_pairs.py | 85 ++++++ scripts/twitter_archive/common.py | 256 ++++++++++++++++++ .../twitter_archive/consolidate_profile.py | 150 ++++++++++ scripts/twitter_archive/evaluate_candidate.py | 69 +++++ scripts/twitter_archive/extract_archive.py | 100 +++++++ scripts/twitter_archive/pipeline_health.py | 66 +++++ specs/twitter-archive-learning-pipeline.md | 112 ++++++++ ...itter-archive-pipeline-config.example.json | 5 + 11 files changed, 846 insertions(+) create mode 100644 scripts/__init__.py create mode 100644 scripts/twitter_archive/__init__.py create mode 100644 scripts/twitter_archive/build_dpo_pairs.py create mode 100644 scripts/twitter_archive/common.py create mode 100644 scripts/twitter_archive/consolidate_profile.py create mode 100644 scripts/twitter_archive/evaluate_candidate.py create mode 100644 scripts/twitter_archive/extract_archive.py create mode 100644 scripts/twitter_archive/pipeline_health.py create mode 100644 specs/twitter-archive-learning-pipeline.md create mode 100644 specs/twitter-archive-pipeline-config.example.json diff --git a/.gitignore b/.gitignore index ae7a06a..103df1c 100644 --- a/.gitignore +++ b/.gitignore @@ -39,6 +39,7 @@ pairing/ # Already separate repos timmy-config/ timmy-telemetry/ +twitter-archive/ # Python __pycache__/ diff --git a/scripts/__init__.py b/scripts/__init__.py new file mode 100644 index 0000000..2400b15 --- /dev/null +++ b/scripts/__init__.py @@ -0,0 +1 @@ +"""Tracked helper scripts for Timmy's local workspace.""" diff --git a/scripts/twitter_archive/__init__.py b/scripts/twitter_archive/__init__.py new file mode 100644 index 0000000..a7ad0c9 --- /dev/null +++ b/scripts/twitter_archive/__init__.py @@ -0,0 +1 @@ +"""Private Twitter archive pipeline helpers.""" diff --git a/scripts/twitter_archive/build_dpo_pairs.py b/scripts/twitter_archive/build_dpo_pairs.py new file mode 100644 index 0000000..e28b135 --- /dev/null +++ b/scripts/twitter_archive/build_dpo_pairs.py @@ -0,0 +1,85 @@ +#!/usr/bin/env python3 +"""Build local-only DPO pairs from archive batch artifacts.""" + +from __future__ import annotations + +import json +from datetime import datetime, timezone + +from .common import ( + CANDIDATES_DIR, + TRAINING_DPO_DIR, + ensure_layout, + gather_evidence_ids, + load_json, + write_json, + append_jsonl, + write_progress_snapshot, +) + +STATE_FILE = TRAINING_DPO_DIR / "processed_batches.json" + + +def main() -> None: + ensure_layout() + state = load_json(STATE_FILE, {"processed_batches": []}) + processed = set(state.get("processed_batches", [])) + date_key = datetime.now(timezone.utc).strftime("%Y%m%d") + output_file = TRAINING_DPO_DIR / f"pairs_{date_key}.jsonl" + + new_pairs = [] + newly_processed = [] + for path in sorted(CANDIDATES_DIR.glob("batch_*.json")): + batch = load_json(path, {}) + batch_id = batch.get("batch_id", path.stem) + if batch_id in processed: + continue + prompt = batch.get("prompt", "").strip() + chosen = batch.get("chosen", "").strip() + rejected = batch.get("rejected", "").strip() + if not prompt or not chosen or not rejected: + continue + + evidence_ids = gather_evidence_ids(batch) + safety_flags = ["archive-private", "evidence-required"] + if not evidence_ids: + safety_flags.append("missing-evidence") + + new_pairs.append( + { + "prompt": prompt, + "chosen": chosen, + "rejected": rejected, + "evidence_ids": evidence_ids, + "source_session": { + "draft": batch.get("draft_session_id"), + "critique": batch.get("critique_session_id"), + }, + "task_type": "analysis", + "rubric_scores": batch.get("rubric_scores", {}), + "batch_id": batch_id, + "safety_flags": safety_flags, + } + ) + newly_processed.append(batch_id) + + append_jsonl(output_file, new_pairs) + state["processed_batches"] = sorted(processed.union(newly_processed)) + write_json(STATE_FILE, state) + snapshot = write_progress_snapshot() + print( + json.dumps( + { + "status": "ok", + "pairs_written": len(new_pairs), + "output_file": output_file.name, + "processed_batches": len(state["processed_batches"]), + "progress": snapshot, + }, + sort_keys=True, + ) + ) + + +if __name__ == "__main__": + main() diff --git a/scripts/twitter_archive/common.py b/scripts/twitter_archive/common.py new file mode 100644 index 0000000..2c57b9a --- /dev/null +++ b/scripts/twitter_archive/common.py @@ -0,0 +1,256 @@ +#!/usr/bin/env python3 +"""Shared helpers for the private Twitter archive pipeline.""" + +from __future__ import annotations + +import hashlib +import json +import os +import re +from copy import deepcopy +from pathlib import Path +from typing import Any + +TIMMY_HOME = Path.home() / ".timmy" +ARCHIVE_DIR = TIMMY_HOME / "twitter-archive" +EXTRACTED_DIR = ARCHIVE_DIR / "extracted" +NOTES_DIR = ARCHIVE_DIR / "notes" +KNOWLEDGE_DIR = ARCHIVE_DIR / "knowledge" +CANDIDATES_DIR = KNOWLEDGE_DIR / "candidates" +INSIGHTS_DIR = ARCHIVE_DIR / "insights" +TRAINING_DIR = ARCHIVE_DIR / "training" +TRAINING_EXAMPLES_DIR = TRAINING_DIR / "examples" +TRAINING_DPO_DIR = TRAINING_DIR / "dpo" +TRAINING_EVALS_DIR = TRAINING_DIR / "evals" +TRAINING_RUNS_DIR = TRAINING_DIR / "runs" +METRICS_DIR = ARCHIVE_DIR / "metrics" +PROFILE_FILE = KNOWLEDGE_DIR / "profile.json" +CHANGES_FILE = KNOWLEDGE_DIR / "changes.jsonl" +CHECKPOINT_FILE = ARCHIVE_DIR / "checkpoint.json" +PROGRESS_FILE = METRICS_DIR / "progress.json" +SOURCE_CONFIG_FILE = ARCHIVE_DIR / "source_config.json" +PIPELINE_CONFIG_FILE = ARCHIVE_DIR / "pipeline_config.json" +DEFAULT_SOURCE_DIR = ( + Path.home() + / "Downloads" + / "twitter-2026-03-27-d4471cc6eb6703034d592f870933561ebee374d9d9b90c9b8923abff064afc1e" + / "data" +) +BATCH_SIZE = 50 + + +def deep_default(default: Any) -> Any: + return deepcopy(default) + + +def ensure_layout() -> None: + for path in ( + ARCHIVE_DIR, + EXTRACTED_DIR, + NOTES_DIR, + KNOWLEDGE_DIR, + CANDIDATES_DIR, + INSIGHTS_DIR, + TRAINING_DIR, + TRAINING_EXAMPLES_DIR, + TRAINING_DPO_DIR, + TRAINING_EVALS_DIR, + TRAINING_RUNS_DIR, + METRICS_DIR, + ): + path.mkdir(parents=True, exist_ok=True) + + +def load_json(path: Path, default: Any) -> Any: + if not path.exists(): + return deep_default(default) + try: + return json.loads(path.read_text()) + except json.JSONDecodeError: + return deep_default(default) + + +def write_json(path: Path, payload: Any) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n") + + +def load_jsonl(path: Path) -> list[dict[str, Any]]: + if not path.exists(): + return [] + rows: list[dict[str, Any]] = [] + for line in path.read_text().splitlines(): + line = line.strip() + if not line: + continue + rows.append(json.loads(line)) + return rows + + +def append_jsonl(path: Path, rows: list[dict[str, Any]]) -> None: + if not rows: + return + path.parent.mkdir(parents=True, exist_ok=True) + with open(path, "a") as handle: + for row in rows: + handle.write(json.dumps(row, sort_keys=True) + "\n") + + +def stable_sha256(path: Path) -> str: + digest = hashlib.sha256() + with open(path, "rb") as handle: + while True: + chunk = handle.read(1024 * 1024) + if not chunk: + break + digest.update(chunk) + return digest.hexdigest() + + +def slugify(value: str) -> str: + cleaned = re.sub(r"[^a-z0-9]+", "-", value.lower()).strip("-") + return cleaned or "item" + + +def batch_id_from_number(batch_number: int) -> str: + return f"batch_{batch_number:03d}" + + +def default_checkpoint() -> dict[str, Any]: + return { + "data_source": "tweets", + "batch_size": BATCH_SIZE, + "next_offset": 0, + "batches_completed": 0, + "phase": "discovery", + "confidence": "low", + "next_focus": "look for recurring themes and recurring people", + "understanding_version": 0, + "last_batch_id": None, + "last_batch_sessions": {}, + "last_profile_update": None, + "last_dpo_build": None, + "last_insight_file": None, + } + + +def load_checkpoint() -> dict[str, Any]: + checkpoint = default_checkpoint() + checkpoint.update(load_json(CHECKPOINT_FILE, {})) + return checkpoint + + +def resolve_source_dir() -> Path: + env_value = os.environ.get("TIMMY_TWITTER_ARCHIVE_SOURCE") + if env_value: + return Path(os.path.expanduser(env_value)) + + config = load_json(SOURCE_CONFIG_FILE, {}) + configured = config.get("source_path") + if configured: + return Path(os.path.expanduser(configured)) + + return DEFAULT_SOURCE_DIR + + +def load_pipeline_config() -> dict[str, Any]: + return load_json(PIPELINE_CONFIG_FILE, {}) + + +def tweet_text(tweet: dict[str, Any]) -> str: + raw = tweet.get("full_text") or tweet.get("text") or "" + return re.sub(r"\s+", " ", raw).strip() + + +def parse_entities(tweet: dict[str, Any], entity_key: str, value_key: str) -> list[str]: + entities = tweet.get("entities", {}) or {} + items = entities.get(entity_key, []) or [] + return [item.get(value_key, "").strip() for item in items if item.get(value_key)] + + +def normalize_tweet(raw_entry: dict[str, Any], source_file: str) -> dict[str, Any] | None: + tweet = raw_entry.get("tweet", raw_entry) + text = tweet_text(tweet) + tweet_id = str(tweet.get("id_str") or tweet.get("id") or "").strip() + if not tweet_id or not text: + return None + + normalized = { + "tweet_id": tweet_id, + "created_at": tweet.get("created_at"), + "full_text": text, + "is_retweet": bool(tweet.get("retweeted")) or text.startswith("RT @"), + "reply_to_tweet_id": tweet.get("in_reply_to_status_id_str") + or tweet.get("in_reply_to_status_id"), + "reply_to_user_id": tweet.get("in_reply_to_user_id_str") + or tweet.get("in_reply_to_user_id"), + "lang": tweet.get("lang"), + "favorite_count": int(tweet.get("favorite_count") or 0), + "retweet_count": int(tweet.get("retweet_count") or 0), + "mentions": parse_entities(tweet, "user_mentions", "screen_name"), + "hashtags": parse_entities(tweet, "hashtags", "text"), + "urls": parse_entities(tweet, "urls", "expanded_url"), + "source_file": source_file, + } + return normalized + + +def sort_tweets(rows: list[dict[str, Any]]) -> list[dict[str, Any]]: + return sorted(rows, key=lambda row: (row.get("created_at") or "", row["tweet_id"])) + + +def latest_file(directory: Path, pattern: str) -> Path | None: + files = sorted(directory.glob(pattern)) + return files[-1] if files else None + + +def count_jsonl_rows(path: Path) -> int: + if not path.exists(): + return 0 + with open(path) as handle: + return sum(1 for line in handle if line.strip()) + + +def gather_evidence_ids(batch_payload: dict[str, Any]) -> list[str]: + ids = set(batch_payload.get("tweet_ids", [])) + for candidate in batch_payload.get("knowledge_candidates", []): + ids.update(str(tweet_id) for tweet_id in candidate.get("evidence_tweet_ids", [])) + return sorted(ids) + + +def compute_progress_snapshot() -> dict[str, Any]: + checkpoint = load_checkpoint() + profile = load_json(PROFILE_FILE, {"claims": []}) + dpo_files = sorted(TRAINING_DPO_DIR.glob("pairs_*.jsonl")) + dpo_pairs = sum(count_jsonl_rows(path) for path in dpo_files) + durable_claims = [ + claim for claim in profile.get("claims", []) if claim.get("status") == "durable" + ] + snapshot = { + "batches_completed": checkpoint.get("batches_completed", 0), + "next_offset": checkpoint.get("next_offset", 0), + "phase": checkpoint.get("phase", "discovery"), + "candidate_batches": len(list(CANDIDATES_DIR.glob("batch_*.json"))), + "durable_claims": len(durable_claims), + "training_examples": sum( + count_jsonl_rows(path) for path in TRAINING_EXAMPLES_DIR.glob("batch_*.jsonl") + ), + "dpo_pair_files": len(dpo_files), + "dpo_pairs": dpo_pairs, + "latest_dpo_file": latest_file(TRAINING_DPO_DIR, "pairs_*.jsonl").name + if latest_file(TRAINING_DPO_DIR, "pairs_*.jsonl") + else None, + "latest_note": latest_file(NOTES_DIR, "batch_*.md").name + if latest_file(NOTES_DIR, "batch_*.md") + else None, + "latest_eval": latest_file(TRAINING_EVALS_DIR, "run_*.json").name + if latest_file(TRAINING_EVALS_DIR, "run_*.json") + else None, + } + return snapshot + + +def write_progress_snapshot() -> dict[str, Any]: + snapshot = compute_progress_snapshot() + write_json(PROGRESS_FILE, snapshot) + return snapshot diff --git a/scripts/twitter_archive/consolidate_profile.py b/scripts/twitter_archive/consolidate_profile.py new file mode 100644 index 0000000..02d4ffd --- /dev/null +++ b/scripts/twitter_archive/consolidate_profile.py @@ -0,0 +1,150 @@ +#!/usr/bin/env python3 +"""Merge batch knowledge candidates into a deterministic profile.""" + +from __future__ import annotations + +import hashlib +import json +from collections import defaultdict + +from .common import ( + CANDIDATES_DIR, + CHANGES_FILE, + PROFILE_FILE, + ensure_layout, + load_json, + load_jsonl, + slugify, + write_json, + append_jsonl, + write_progress_snapshot, +) + + +def claim_id(category: str, claim: str) -> str: + digest = hashlib.sha1(f"{category}:{claim.strip().lower()}".encode()).hexdigest()[:12] + return f"{slugify(category)}-{digest}" + + +def contradiction_matches(target_claim: str, contradiction_hint: str) -> bool: + target = target_claim.strip().lower() + hint = contradiction_hint.strip().lower() + return bool(hint) and (hint in target or target in hint) + + +def main() -> None: + ensure_layout() + old_profile = load_json(PROFILE_FILE, {"claims": []}) + old_by_id = {claim["id"]: claim for claim in old_profile.get("claims", [])} + + aggregates: dict[str, dict] = {} + contradiction_hints: defaultdict[str, list[str]] = defaultdict(list) + batch_files = sorted(CANDIDATES_DIR.glob("batch_*.json")) + for path in batch_files: + batch = load_json(path, {}) + batch_id = batch.get("batch_id", path.stem) + for candidate in batch.get("knowledge_candidates", []): + claim = candidate.get("claim", "").strip() + category = candidate.get("category", "recurring-theme").strip() or "recurring-theme" + if not claim: + continue + item_id = claim_id(category, claim) + existing = aggregates.setdefault( + item_id, + { + "id": item_id, + "category": category, + "claim": claim, + "evidence_tweet_ids": set(), + "evidence_quotes": [], + "confidence": 0.0, + "status": "provisional", + "first_seen_at": batch_id, + "last_confirmed_at": batch_id, + "contradicts": [], + "support_count": 0, + }, + ) + existing["support_count"] += 1 + existing["confidence"] = max( + existing["confidence"], float(candidate.get("confidence", 0.0) or 0.0) + ) + existing["last_confirmed_at"] = batch_id + for tweet_id in candidate.get("evidence_tweet_ids", []): + existing["evidence_tweet_ids"].add(str(tweet_id)) + for quote in candidate.get("evidence_quotes", []): + if quote and quote not in existing["evidence_quotes"]: + existing["evidence_quotes"].append(quote) + for hint in candidate.get("contradicts", []) or []: + if hint: + contradiction_hints[item_id].append(str(hint)) + + for item_id, claim in aggregates.items(): + if claim["confidence"] >= 0.85 or len(claim["evidence_tweet_ids"]) >= 2: + claim["status"] = "durable" + if contradiction_hints.get(item_id): + claim["contradicts"] = sorted(set(contradiction_hints[item_id])) + + for source_id, hints in contradiction_hints.items(): + for hint in hints: + for target_id, target in aggregates.items(): + if target_id == source_id: + continue + if contradiction_matches(target["claim"], hint): + target["status"] = "retracted" + + claims = [] + for item_id in sorted(aggregates): + claim = aggregates[item_id] + claims.append( + { + "id": claim["id"], + "category": claim["category"], + "claim": claim["claim"], + "evidence_tweet_ids": sorted(claim["evidence_tweet_ids"]), + "evidence_quotes": claim["evidence_quotes"][:5], + "confidence": round(claim["confidence"], 3), + "status": claim["status"], + "first_seen_at": claim["first_seen_at"], + "last_confirmed_at": claim["last_confirmed_at"], + "contradicts": claim["contradicts"], + } + ) + + profile = { + "schema_version": 1, + "claim_count": len(claims), + "durable_claims": sum(1 for claim in claims if claim["status"] == "durable"), + "retracted_claims": sum(1 for claim in claims if claim["status"] == "retracted"), + "claims": claims, + } + write_json(PROFILE_FILE, profile) + + existing_events = { + event.get("event_id") for event in load_jsonl(CHANGES_FILE) if event.get("event_id") + } + new_events = [] + for claim in claims: + previous = old_by_id.get(claim["id"]) + if previous == claim: + continue + event_id = f"{claim['id']}:{claim['status']}:{claim['last_confirmed_at']}" + if event_id in existing_events: + continue + new_events.append( + { + "event_id": event_id, + "claim_id": claim["id"], + "change_type": "new" if previous is None else "updated", + "status": claim["status"], + "batch_id": claim["last_confirmed_at"], + } + ) + + append_jsonl(CHANGES_FILE, new_events) + snapshot = write_progress_snapshot() + print(json.dumps({"status": "ok", "claims": len(claims), "events": len(new_events), "progress": snapshot}, sort_keys=True)) + + +if __name__ == "__main__": + main() diff --git a/scripts/twitter_archive/evaluate_candidate.py b/scripts/twitter_archive/evaluate_candidate.py new file mode 100644 index 0000000..f5d4e95 --- /dev/null +++ b/scripts/twitter_archive/evaluate_candidate.py @@ -0,0 +1,69 @@ +#!/usr/bin/env python3 +"""Evaluate whether the latest archive candidate is promotable.""" + +from __future__ import annotations + +import argparse +import json +from pathlib import Path + +from .common import TRAINING_EVALS_DIR + + +def load_eval(path: Path) -> dict: + return json.loads(path.read_text()) + + +def main() -> None: + parser = argparse.ArgumentParser(description="Evaluate archive candidate promotion gates") + parser.add_argument("--eval-file", default="", help="Specific eval JSON file to inspect") + args = parser.parse_args() + + if args.eval_file: + eval_path = Path(args.eval_file).expanduser() + else: + eval_files = sorted(TRAINING_EVALS_DIR.glob("run_*.json")) + if not eval_files: + print(json.dumps({"pass": False, "reason": "missing eval file"})) + return + eval_path = eval_files[-1] + + payload = load_eval(eval_path) + baseline = float(payload.get("baseline_composite", 0.0)) + candidate = float(payload.get("candidate_composite", 0.0)) + improvement = 0.0 + if baseline > 0: + improvement = (candidate - baseline) / baseline + + refusal_regression = bool(payload.get("refusal_over_fabrication_regression", True)) + source_regression = bool(payload.get("source_distinction_regression", True)) + evidence_rate = float(payload.get("evidence_citation_rate", 0.0)) + + gate_pass = ( + improvement >= 0.05 + and not refusal_regression + and not source_regression + and evidence_rate >= 0.95 + ) + + print( + json.dumps( + { + "pass": gate_pass, + "eval_file": eval_path.name, + "candidate_id": payload.get("candidate_id"), + "baseline_composite": baseline, + "candidate_composite": candidate, + "improvement_pct": round(improvement * 100, 2), + "evidence_citation_rate": evidence_rate, + "refusal_over_fabrication_regression": refusal_regression, + "source_distinction_regression": source_regression, + "rollback_model": payload.get("rollback_model"), + }, + sort_keys=True, + ) + ) + + +if __name__ == "__main__": + main() diff --git a/scripts/twitter_archive/extract_archive.py b/scripts/twitter_archive/extract_archive.py new file mode 100644 index 0000000..47f352f --- /dev/null +++ b/scripts/twitter_archive/extract_archive.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python3 +"""Deterministically extract the private Twitter archive into JSONL.""" + +from __future__ import annotations + +import json +from pathlib import Path + +from .common import ( + ARCHIVE_DIR, + EXTRACTED_DIR, + ensure_layout, + normalize_tweet, + resolve_source_dir, + sort_tweets, + stable_sha256, + write_json, +) + + +def strip_js_prefix(raw_text: str) -> str: + start = raw_text.find("[") + if start == -1: + raise ValueError("Could not find JSON array in tweets.js") + return raw_text[start:].strip() + + +def write_jsonl(path: Path, rows: list[dict]) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + with open(path, "w") as handle: + for row in rows: + handle.write(json.dumps(row, sort_keys=True) + "\n") + + +def build_manifest(source_path: Path, tweets: list[dict], retweets: list[dict]) -> dict: + ordered = sort_tweets(tweets + retweets) + return { + "schema_version": 1, + "source_dir": str(source_path.parent), + "source_file": source_path.name, + "source_sha256": stable_sha256(source_path), + "source_size_bytes": source_path.stat().st_size, + "tweet_count": len(tweets), + "retweet_count": len(retweets), + "earliest_date": ordered[0]["created_at"] if ordered else None, + "latest_date": ordered[-1]["created_at"] if ordered else None, + "fields": [ + "tweet_id", + "created_at", + "full_text", + "is_retweet", + "reply_to_tweet_id", + "reply_to_user_id", + "lang", + "favorite_count", + "retweet_count", + "mentions", + "hashtags", + "urls", + "source_file", + ], + } + + +def main() -> None: + ensure_layout() + source_dir = resolve_source_dir() + source_path = source_dir / "tweets.js" + if not source_path.exists(): + raise SystemExit(json.dumps({"status": "error", "reason": f"missing {source_path}"})) + + raw_payload = strip_js_prefix(source_path.read_text()) + tweet_entries = json.loads(raw_payload) + + tweets: list[dict] = [] + retweets: list[dict] = [] + for entry in tweet_entries: + normalized = normalize_tweet(entry, source_path.name) + if not normalized: + continue + if normalized["is_retweet"]: + retweets.append(normalized) + else: + tweets.append(normalized) + + tweets = sort_tweets(tweets) + retweets = sort_tweets(retweets) + + write_jsonl(EXTRACTED_DIR / "tweets.jsonl", tweets) + write_jsonl(EXTRACTED_DIR / "retweets.jsonl", retweets) + + manifest = build_manifest(source_path, tweets, retweets) + write_json(EXTRACTED_DIR / "manifest.json", manifest) + write_json(ARCHIVE_DIR / "manifest.json", manifest) + + print(json.dumps({"status": "ok", **manifest}, sort_keys=True)) + + +if __name__ == "__main__": + main() diff --git a/scripts/twitter_archive/pipeline_health.py b/scripts/twitter_archive/pipeline_health.py new file mode 100644 index 0000000..1085841 --- /dev/null +++ b/scripts/twitter_archive/pipeline_health.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python3 +"""Check the private archive pipeline for missing inputs or stalled stages.""" + +from __future__ import annotations + +import json + +from .common import ( + ARCHIVE_DIR, + CANDIDATES_DIR, + CHECKPOINT_FILE, + EXTRACTED_DIR, + NOTES_DIR, + PROGRESS_FILE, + TRAINING_DPO_DIR, + ensure_layout, + latest_file, + load_json, + resolve_source_dir, + write_progress_snapshot, +) + + +def main() -> None: + ensure_layout() + source_dir = resolve_source_dir() + source_file = source_dir / "tweets.js" + checkpoint = load_json(CHECKPOINT_FILE, {}) + + status = { + "workspace": str(ARCHIVE_DIR), + "source_dir": str(source_dir), + "source_present": source_file.exists(), + "checkpoint_present": CHECKPOINT_FILE.exists(), + "checkpoint_valid": bool(checkpoint), + "extracted_manifest_present": (EXTRACTED_DIR / "manifest.json").exists(), + "tweets_present": (EXTRACTED_DIR / "tweets.jsonl").exists(), + "retweets_present": (EXTRACTED_DIR / "retweets.jsonl").exists(), + "latest_note": latest_file(NOTES_DIR, "batch_*.md").name + if latest_file(NOTES_DIR, "batch_*.md") + else None, + "latest_candidate": latest_file(CANDIDATES_DIR, "batch_*.json").name + if latest_file(CANDIDATES_DIR, "batch_*.json") + else None, + "latest_dpo_file": latest_file(TRAINING_DPO_DIR, "pairs_*.jsonl").name + if latest_file(TRAINING_DPO_DIR, "pairs_*.jsonl") + else None, + } + + issues = [] + if not status["source_present"]: + issues.append("missing raw archive source") + if not status["extracted_manifest_present"]: + issues.append("archive not extracted") + if status["latest_candidate"] and not status["latest_dpo_file"]: + issues.append("batch artifacts exist but DPO pairs have not been built") + + snapshot = write_progress_snapshot() + status["progress"] = snapshot + status["issues"] = issues + status["ok"] = not issues + print(json.dumps(status, sort_keys=True)) + + +if __name__ == "__main__": + main() diff --git a/specs/twitter-archive-learning-pipeline.md b/specs/twitter-archive-learning-pipeline.md new file mode 100644 index 0000000..ef996f9 --- /dev/null +++ b/specs/twitter-archive-learning-pipeline.md @@ -0,0 +1,112 @@ +# Twitter Archive Learning Pipeline + +This repo owns the tracked code, schemas, prompts, and eval contracts for +Timmy's private Twitter archive learning loop. + +## Privacy Boundary + +- Raw archive files stay outside git. +- Derived runtime artifacts live under `~/.timmy/twitter-archive/`. +- `twitter-archive/` is ignored by `timmy-home` so private notes and training + artifacts do not get pushed by accident. + +Tracked here: +- deterministic extraction and consolidation scripts +- output schemas +- eval gate contract +- prompt/orchestration code in `timmy-config` + +Not tracked here: +- raw tweets +- extracted tweet text +- batch notes +- private profile artifacts +- local-only DPO pairs +- local eval outputs + +## Runtime Layout + +The runtime workspace is: + +```text +~/.timmy/twitter-archive/ + extracted/ + notes/ + knowledge/ + insights/ + training/ + checkpoint.json + metrics/progress.json + source_config.json + pipeline_config.json +``` + +## Source Config + +Optional local file: + +```json +{ + "source_path": "~/Downloads/twitter-.../data" +} +``` + +Environment override: + +```bash +TIMMY_TWITTER_ARCHIVE_SOURCE=~/Downloads/twitter-.../data +``` + +## Knowledge Candidate Schema + +Each batch candidate file contains: + +- `id` +- `category` +- `claim` +- `evidence_tweet_ids` +- `evidence_quotes` +- `confidence` +- `status` +- `first_seen_at` +- `last_confirmed_at` +- `contradicts` + +The consolidator computes durable vs provisional vs retracted from these fields. + +## Candidate Eval Contract + +Local eval JSON files under `~/.timmy/twitter-archive/training/evals/` must use: + +```json +{ + "candidate_id": "timmy-archive-v0.1", + "baseline_composite": 0.71, + "candidate_composite": 0.76, + "refusal_over_fabrication_regression": false, + "source_distinction_regression": false, + "evidence_citation_rate": 0.98, + "rollback_model": "timmy-archive-v0.0" +} +``` + +Promotion gate: + +- candidate composite improves by at least 5% +- no refusal regression +- no source distinction regression +- evidence citation rate stays at or above 95% + +## Training Command Contract + +Optional local file `pipeline_config.json` can define: + +```json +{ + "train_command": "bash -lc 'echo train me'", + "promote_command": "bash -lc 'echo promote me'" +} +``` + +If these commands are absent, the pipeline still prepares artifacts and run +manifests, but training/promotion stays in a ready state instead of executing. diff --git a/specs/twitter-archive-pipeline-config.example.json b/specs/twitter-archive-pipeline-config.example.json new file mode 100644 index 0000000..5ef0987 --- /dev/null +++ b/specs/twitter-archive-pipeline-config.example.json @@ -0,0 +1,5 @@ +{ + "source_path": "~/Downloads/twitter-2026-03-27-d4471cc6eb6703034d592f870933561ebee374d9d9b90c9b8923abff064afc1e/data", + "train_command": "bash -lc 'echo configure local DPO training here'", + "promote_command": "bash -lc 'echo configure local model promotion here'" +}