#!/usr/bin/env python3
"""Shared helpers for the private Twitter archive pipeline."""

from __future__ import annotations

import hashlib
import json
import os
import re
from copy import deepcopy
from pathlib import Path
from typing import Any

# On-disk layout: every directory and file below hangs off ARCHIVE_DIR.
TIMMY_HOME = Path.home() / ".timmy"
ARCHIVE_DIR = TIMMY_HOME / "twitter-archive"
EXTRACTED_DIR = ARCHIVE_DIR / "extracted"
NOTES_DIR = ARCHIVE_DIR / "notes"
KNOWLEDGE_DIR = ARCHIVE_DIR / "knowledge"
CANDIDATES_DIR = KNOWLEDGE_DIR / "candidates"
INSIGHTS_DIR = ARCHIVE_DIR / "insights"
TRAINING_DIR = ARCHIVE_DIR / "training"
TRAINING_EXAMPLES_DIR = TRAINING_DIR / "examples"
TRAINING_DPO_DIR = TRAINING_DIR / "dpo"
TRAINING_EVALS_DIR = TRAINING_DIR / "evals"
TRAINING_RUNS_DIR = TRAINING_DIR / "runs"
METRICS_DIR = ARCHIVE_DIR / "metrics"

PROFILE_FILE = KNOWLEDGE_DIR / "profile.json"
CHANGES_FILE = KNOWLEDGE_DIR / "changes.jsonl"
CHECKPOINT_FILE = ARCHIVE_DIR / "checkpoint.json"
PROGRESS_FILE = METRICS_DIR / "progress.json"
SOURCE_CONFIG_FILE = ARCHIVE_DIR / "source_config.json"
PIPELINE_CONFIG_FILE = ARCHIVE_DIR / "pipeline_config.json"

# Fallback used by resolve_source_dir() when neither the environment variable
# nor the source config file names a directory.
DEFAULT_SOURCE_DIR = (
    Path.home()
    / "Downloads"
    / "twitter-2026-03-27-d4471cc6eb6703034d592f870933561ebee374d9d9b90c9b8923abff064afc1e"
    / "data"
)

# Default "batch_size" recorded in a fresh checkpoint.
BATCH_SIZE = 50


def deep_default(default: Any) -> Any:
    """Return a deep copy of *default* so callers never share mutable state."""
    return deepcopy(default)


def ensure_layout() -> None:
    """Create every pipeline directory (and missing parents) if absent."""
    for path in (
        ARCHIVE_DIR,
        EXTRACTED_DIR,
        NOTES_DIR,
        KNOWLEDGE_DIR,
        CANDIDATES_DIR,
        INSIGHTS_DIR,
        TRAINING_DIR,
        TRAINING_EXAMPLES_DIR,
        TRAINING_DPO_DIR,
        TRAINING_EVALS_DIR,
        TRAINING_RUNS_DIR,
        METRICS_DIR,
    ):
        path.mkdir(parents=True, exist_ok=True)


def load_json(path: Path, default: Any) -> Any:
    """Load JSON from *path*, falling back to a copy of *default*.

    The fallback is returned when the file does not exist or does not contain
    valid JSON. Files are always decoded as UTF-8 rather than with the
    platform default encoding.
    """
    try:
        text = path.read_text(encoding="utf-8")
    except FileNotFoundError:
        return deep_default(default)
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return deep_default(default)


def write_json(path: Path, payload: Any) -> None:
    """Write *payload* to *path* as indented, key-sorted, UTF-8 JSON.

    Missing parent directories are created first. The output ends with a
    trailing newline.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(
        json.dumps(payload, indent=2, sort_keys=True) + "\n",
        encoding="utf-8",
    )


def load_jsonl(path: Path) -> list[dict[str, Any]]:
    """Return the parsed rows of a JSON Lines file, or ``[]`` if it is missing.

    Blank lines are skipped. A malformed line raises ``json.JSONDecodeError``.
    """
    try:
        # Iterate real file lines instead of str.splitlines(): splitlines()
        # also breaks on characters such as U+2028, which may legally appear
        # unescaped inside a JSON string and would tear a row in two.
        with open(path, encoding="utf-8") as handle:
            return [json.loads(line) for line in handle if line.strip()]
    except FileNotFoundError:
        return []


def append_jsonl(path: Path, rows: list[dict[str, Any]]) -> None:
    """Append *rows* to *path*, one key-sorted JSON object per line.

    Does nothing (and creates nothing) when *rows* is empty.
    """
    if not rows:
        return
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "a", encoding="utf-8") as handle:
        handle.writelines(json.dumps(row, sort_keys=True) + "\n" for row in rows)


def stable_sha256(path: Path) -> str:
    """Return the hex SHA-256 digest of the file at *path*.

    The file is read in 1 MiB chunks so it is never fully held in memory.
    """
    digest = hashlib.sha256()
    with open(path, "rb") as handle:
        while True:
            chunk = handle.read(1024 * 1024)
            if not chunk:
                break
            digest.update(chunk)
    return digest.hexdigest()


def slugify(value: str) -> str:
    """Lower-case *value* and collapse non ``[a-z0-9]`` runs into hyphens.

    Leading/trailing hyphens are removed; an empty result becomes ``"item"``.
    """
    cleaned = re.sub(r"[^a-z0-9]+", "-", value.lower()).strip("-")
    return cleaned or "item"


def batch_id_from_number(batch_number: int) -> str:
    """Format *batch_number* as ``batch_NNN`` (zero-padded to three digits)."""
    return f"batch_{batch_number:03d}"


def default_checkpoint() -> dict[str, Any]:
    """Return a fresh checkpoint dict populated with initial values."""
    return {
        "data_source": "tweets",
        "batch_size": BATCH_SIZE,
        "next_offset": 0,
        "batches_completed": 0,
        "phase": "discovery",
        "confidence": "low",
        "next_focus": "look for recurring themes and recurring people",
        "understanding_version": 0,
        "last_batch_id": None,
        "last_batch_sessions": {},
        "last_profile_update": None,
        "last_dpo_build": None,
        "last_insight_file": None,
    }


def load_checkpoint() -> dict[str, Any]:
    """Return the default checkpoint overlaid with values stored on disk."""
    checkpoint = default_checkpoint()
    stored = load_json(CHECKPOINT_FILE, {})
    # dict.update() raises on a non-mapping payload (e.g. a JSON list or
    # number); treat such a file as carrying no overrides.
    if isinstance(stored, dict):
        checkpoint.update(stored)
    return checkpoint


def resolve_source_dir() -> Path:
    """Resolve the directory holding the raw archive data.

    Precedence: the ``TIMMY_TWITTER_ARCHIVE_SOURCE`` environment variable,
    then ``source_path`` in SOURCE_CONFIG_FILE, then DEFAULT_SOURCE_DIR.
    A leading ``~`` in either configured value is expanded.
    """
    env_value = os.environ.get("TIMMY_TWITTER_ARCHIVE_SOURCE")
    if env_value:
        return Path(os.path.expanduser(env_value))

    config = load_json(SOURCE_CONFIG_FILE, {})
    configured = config.get("source_path")
    if configured:
        return Path(os.path.expanduser(configured))

    return DEFAULT_SOURCE_DIR


def load_pipeline_config() -> dict[str, Any]:
    """Return the contents of PIPELINE_CONFIG_FILE, or ``{}`` if unavailable."""
    return load_json(PIPELINE_CONFIG_FILE, {})


def tweet_text(tweet: dict[str, Any]) -> str:
    """Return the tweet's text with whitespace runs collapsed to one space.

    Prefers ``full_text`` over ``text``; returns ``""`` when both are missing
    or empty.
    """
    raw = tweet.get("full_text") or tweet.get("text") or ""
    return re.sub(r"\s+", " ", raw).strip()


def parse_entities(tweet: dict[str, Any], entity_key: str, value_key: str) -> list[str]:
    """Collect the stripped *value_key* of every ``entities[entity_key]`` item.

    Missing or null ``entities`` / entity lists yield ``[]``. Items whose
    value is missing, falsy, or blank after stripping are skipped, so the
    result never contains empty strings.
    """
    entities = tweet.get("entities") or {}
    values: list[str] = []
    for item in entities.get(entity_key) or []:
        value = item.get(value_key)
        if not value:
            continue
        # str() keeps a truthy non-string value from crashing on .strip().
        text = str(value).strip()
        if text:
            values.append(text)
    return values


def normalize_tweet(raw_entry: dict[str, Any], source_file: str) -> dict[str, Any] | None:
    """Flatten one raw archive entry into the pipeline's tweet record.

    The tweet payload is taken from ``raw_entry["tweet"]`` when that key is
    present, otherwise *raw_entry* itself is used. Returns ``None`` when the
    entry has no id or no text.
    """
    tweet = raw_entry.get("tweet", raw_entry)
    text = tweet_text(tweet)
    tweet_id = str(tweet.get("id_str") or tweet.get("id") or "").strip()
    if not tweet_id or not text:
        return None

    return {
        "tweet_id": tweet_id,
        "created_at": tweet.get("created_at"),
        "full_text": text,
        "is_retweet": bool(tweet.get("retweeted")) or text.startswith("RT @"),
        "reply_to_tweet_id": tweet.get("in_reply_to_status_id_str")
        or tweet.get("in_reply_to_status_id"),
        "reply_to_user_id": tweet.get("in_reply_to_user_id_str")
        or tweet.get("in_reply_to_user_id"),
        "lang": tweet.get("lang"),
        "favorite_count": int(tweet.get("favorite_count") or 0),
        "retweet_count": int(tweet.get("retweet_count") or 0),
        "mentions": parse_entities(tweet, "user_mentions", "screen_name"),
        "hashtags": parse_entities(tweet, "hashtags", "text"),
        "urls": parse_entities(tweet, "urls", "expanded_url"),
        "source_file": source_file,
    }


def sort_tweets(rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Return *rows* sorted by ``(created_at, tweet_id)`` as plain strings.

    A missing or null ``created_at`` sorts as ``""``.
    """
    # NOTE(review): this is a lexicographic sort. If created_at is stored in a
    # "Wed Oct 10 20:19:24 +0000 2018"-style format the result is not
    # chronological -- confirm against the extractor before relying on it.
    # Left unchanged because persisted offsets may depend on this order.
    return sorted(rows, key=lambda row: (row.get("created_at") or "", row["tweet_id"]))


def latest_file(directory: Path, pattern: str) -> Path | None:
    """Return the last path matching *pattern* in *directory* by sort order.

    Returns ``None`` when nothing matches.
    """
    files = sorted(directory.glob(pattern))
    return files[-1] if files else None


def count_jsonl_rows(path: Path) -> int:
    """Count the non-blank lines of *path*; a missing file counts as 0."""
    try:
        with open(path, encoding="utf-8") as handle:
            return sum(1 for line in handle if line.strip())
    except FileNotFoundError:
        return 0


def gather_evidence_ids(batch_payload: dict[str, Any]) -> list[str]:
    """Return the sorted, de-duplicated tweet ids referenced by a batch.

    Combines the payload's ``tweet_ids`` with each knowledge candidate's
    ``evidence_tweet_ids``. Every id is converted to ``str`` so that integer
    and string forms of the same id merge and the sort cannot hit a mixed
    int/str comparison.
    """
    ids = {str(tweet_id) for tweet_id in batch_payload.get("tweet_ids", [])}
    for candidate in batch_payload.get("knowledge_candidates", []):
        ids.update(str(tweet_id) for tweet_id in candidate.get("evidence_tweet_ids", []))
    return sorted(ids)


def _latest_name(directory: Path, pattern: str) -> str | None:
    """Return the file name of latest_file(directory, pattern), or ``None``."""
    newest = latest_file(directory, pattern)
    return newest.name if newest else None


def compute_progress_snapshot() -> dict[str, Any]:
    """Summarise pipeline progress from the checkpoint and on-disk artifacts."""
    checkpoint = load_checkpoint()
    profile = load_json(PROFILE_FILE, {"claims": []})
    dpo_files = sorted(TRAINING_DPO_DIR.glob("pairs_*.jsonl"))
    durable_claims = [
        claim for claim in profile.get("claims", []) if claim.get("status") == "durable"
    ]

    return {
        "batches_completed": checkpoint.get("batches_completed", 0),
        "next_offset": checkpoint.get("next_offset", 0),
        "phase": checkpoint.get("phase", "discovery"),
        "candidate_batches": len(list(CANDIDATES_DIR.glob("batch_*.json"))),
        "durable_claims": len(durable_claims),
        "training_examples": sum(
            count_jsonl_rows(path) for path in TRAINING_EXAMPLES_DIR.glob("batch_*.jsonl")
        ),
        "dpo_pair_files": len(dpo_files),
        "dpo_pairs": sum(count_jsonl_rows(path) for path in dpo_files),
        # dpo_files is already the sorted match list, so its last element is
        # the same file latest_file() would return.
        "latest_dpo_file": dpo_files[-1].name if dpo_files else None,
        "latest_note": _latest_name(NOTES_DIR, "batch_*.md"),
        "latest_eval": _latest_name(TRAINING_EVALS_DIR, "run_*.json"),
    }


def write_progress_snapshot() -> dict[str, Any]:
    """Compute the progress snapshot, write it to PROGRESS_FILE, and return it."""
    snapshot = compute_progress_snapshot()
    write_json(PROGRESS_FILE, snapshot)
    return snapshot