
#!/usr/bin/env python3
"""Shared helpers for the private Twitter archive pipeline."""
from __future__ import annotations
import hashlib
import json
import os
import re
from copy import deepcopy
from pathlib import Path
from typing import Any
TIMMY_HOME = Path.home() / ".timmy"
ARCHIVE_DIR = TIMMY_HOME / "twitter-archive"
EXTRACTED_DIR = ARCHIVE_DIR / "extracted"
NOTES_DIR = ARCHIVE_DIR / "notes"
KNOWLEDGE_DIR = ARCHIVE_DIR / "knowledge"
CANDIDATES_DIR = KNOWLEDGE_DIR / "candidates"
INSIGHTS_DIR = ARCHIVE_DIR / "insights"
TRAINING_DIR = ARCHIVE_DIR / "training"
TRAINING_EXAMPLES_DIR = TRAINING_DIR / "examples"
TRAINING_DPO_DIR = TRAINING_DIR / "dpo"
TRAINING_EVALS_DIR = TRAINING_DIR / "evals"
TRAINING_RUNS_DIR = TRAINING_DIR / "runs"
METRICS_DIR = ARCHIVE_DIR / "metrics"
PROFILE_FILE = KNOWLEDGE_DIR / "profile.json"
CHANGES_FILE = KNOWLEDGE_DIR / "changes.jsonl"
CHECKPOINT_FILE = ARCHIVE_DIR / "checkpoint.json"
PROGRESS_FILE = METRICS_DIR / "progress.json"
SOURCE_CONFIG_FILE = ARCHIVE_DIR / "source_config.json"
PIPELINE_CONFIG_FILE = ARCHIVE_DIR / "pipeline_config.json"
DEFAULT_SOURCE_DIR = (
Path.home()
/ "Downloads"
/ "twitter-2026-03-27-d4471cc6eb6703034d592f870933561ebee374d9d9b90c9b8923abff064afc1e"
/ "data"
)
BATCH_SIZE = 50
def deep_default(default: Any) -> Any:
    """Return an independent deep copy of *default* so callers may mutate it freely."""
    return deepcopy(default)
def ensure_layout() -> None:
    """Create every directory the archive pipeline writes to (idempotent)."""
    required_dirs = [
        ARCHIVE_DIR,
        EXTRACTED_DIR,
        NOTES_DIR,
        KNOWLEDGE_DIR,
        CANDIDATES_DIR,
        INSIGHTS_DIR,
        TRAINING_DIR,
        TRAINING_EXAMPLES_DIR,
        TRAINING_DPO_DIR,
        TRAINING_EVALS_DIR,
        TRAINING_RUNS_DIR,
        METRICS_DIR,
    ]
    for directory in required_dirs:
        directory.mkdir(parents=True, exist_ok=True)
def load_json(path: Path, default: Any) -> Any:
    """Read JSON from *path*, falling back to a deep copy of *default*.

    The fallback covers both a missing file and unparseable contents, so
    callers always receive a usable value they may mutate without affecting
    the shared default.
    """
    try:
        raw = path.read_text()
    except FileNotFoundError:
        return deepcopy(default)
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        return deepcopy(default)
def write_json(path: Path, payload: Any) -> None:
    """Serialize *payload* to *path* as pretty-printed, key-sorted JSON.

    Parent directories are created on demand; output ends with a newline.
    """
    serialized = json.dumps(payload, indent=2, sort_keys=True)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(serialized + "\n")
def load_jsonl(path: Path) -> list[dict[str, Any]]:
    """Parse a JSON-Lines file into a list of records; a missing file yields []."""
    if not path.exists():
        return []
    # Blank lines are skipped so trailing newlines never break parsing.
    return [
        json.loads(stripped)
        for raw_line in path.read_text().splitlines()
        if (stripped := raw_line.strip())
    ]
def append_jsonl(path: Path, rows: list[dict[str, Any]]) -> None:
    """Append each row in *rows* to *path* as one compact, key-sorted JSON line.

    No-op when *rows* is empty (the file is not even created).
    """
    if not rows:
        return
    path.parent.mkdir(parents=True, exist_ok=True)
    lines = [json.dumps(row, sort_keys=True) + "\n" for row in rows]
    with open(path, "a") as handle:
        handle.writelines(lines)
def stable_sha256(path: Path) -> str:
    """Return the hex SHA-256 digest of the file at *path*, read in 1 MiB chunks."""
    hasher = hashlib.sha256()
    with open(path, "rb") as stream:
        while chunk := stream.read(1024 * 1024):
            hasher.update(chunk)
    return hasher.hexdigest()
def slugify(value: str) -> str:
    """Lowercase *value*, collapsing runs of non-alphanumerics into single hyphens.

    Falls back to "item" when nothing usable remains.
    """
    slug = re.sub(r"[^a-z0-9]+", "-", value.lower())
    slug = slug.strip("-")
    return slug if slug else "item"
def batch_id_from_number(batch_number: int) -> str:
    """Format *batch_number* as a zero-padded batch identifier, e.g. ``batch_007``."""
    return "batch_" + format(batch_number, "03d")
def default_checkpoint() -> dict[str, Any]:
    """Return the initial checkpoint state for a fresh pipeline run."""
    return dict(
        data_source="tweets",
        batch_size=BATCH_SIZE,
        next_offset=0,
        batches_completed=0,
        phase="discovery",
        confidence="low",
        next_focus="look for recurring themes and recurring people",
        understanding_version=0,
        last_batch_id=None,
        last_batch_sessions={},
        last_profile_update=None,
        last_dpo_build=None,
        last_insight_file=None,
    )
def load_checkpoint() -> dict[str, Any]:
    """Load the persisted checkpoint, layering saved values over the defaults."""
    merged = default_checkpoint()
    merged.update(load_json(CHECKPOINT_FILE, {}))
    return merged
def resolve_source_dir() -> Path:
    """Locate the raw archive ``data`` directory.

    Resolution order: the TIMMY_TWITTER_ARCHIVE_SOURCE environment variable,
    then the "source_path" key of the source config file, then the built-in
    default download location.
    """
    if env_override := os.environ.get("TIMMY_TWITTER_ARCHIVE_SOURCE"):
        return Path(os.path.expanduser(env_override))
    configured = load_json(SOURCE_CONFIG_FILE, {}).get("source_path")
    if configured:
        return Path(os.path.expanduser(configured))
    return DEFAULT_SOURCE_DIR
def load_pipeline_config() -> dict[str, Any]:
    """Return the optional pipeline configuration, or {} when absent or invalid."""
    return load_json(PIPELINE_CONFIG_FILE, default={})
def tweet_text(tweet: dict[str, Any]) -> str:
    """Extract the tweet body, preferring full_text, with whitespace collapsed."""
    body = tweet.get("full_text") or tweet.get("text") or ""
    collapsed = re.sub(r"\s+", " ", body)
    return collapsed.strip()
def parse_entities(tweet: dict[str, Any], entity_key: str, value_key: str) -> list[str]:
    """Pull stripped, non-empty *value_key* strings from one entity list on *tweet*.

    Both the "entities" mapping and the per-key list may be absent or None;
    either case yields [].
    """
    entity_map = tweet.get("entities", {}) or {}
    values: list[str] = []
    for entry in entity_map.get(entity_key, []) or []:
        value = entry.get(value_key)
        if value:
            values.append(value.strip())
    return values
def normalize_tweet(raw_entry: dict[str, Any], source_file: str) -> dict[str, Any] | None:
    """Flatten one raw archive entry into the pipeline's canonical tweet record.

    Returns None for entries missing an id or any text, since such rows are
    unusable downstream. Accepts both wrapped ({"tweet": {...}}) and bare
    tweet payloads.
    """
    tweet = raw_entry.get("tweet", raw_entry)
    text = tweet_text(tweet)
    tweet_id = str(tweet.get("id_str") or tweet.get("id") or "").strip()
    if not (tweet_id and text):
        return None
    reply_tweet = tweet.get("in_reply_to_status_id_str") or tweet.get(
        "in_reply_to_status_id"
    )
    reply_user = tweet.get("in_reply_to_user_id_str") or tweet.get(
        "in_reply_to_user_id"
    )
    return {
        "tweet_id": tweet_id,
        "created_at": tweet.get("created_at"),
        "full_text": text,
        # Archive exports sometimes lack the retweeted flag, so fall back to
        # the conventional "RT @" prefix.
        "is_retweet": bool(tweet.get("retweeted")) or text.startswith("RT @"),
        "reply_to_tweet_id": reply_tweet,
        "reply_to_user_id": reply_user,
        "lang": tweet.get("lang"),
        "favorite_count": int(tweet.get("favorite_count") or 0),
        "retweet_count": int(tweet.get("retweet_count") or 0),
        "mentions": parse_entities(tweet, "user_mentions", "screen_name"),
        "hashtags": parse_entities(tweet, "hashtags", "text"),
        "urls": parse_entities(tweet, "urls", "expanded_url"),
        "source_file": source_file,
    }
def sort_tweets(rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Order tweets chronologically, using tweet_id as a deterministic tiebreaker."""

    def chronological_key(row: dict[str, Any]) -> tuple[str, str]:
        # Missing/None created_at sorts first via the empty string.
        return (row.get("created_at") or "", row["tweet_id"])

    return sorted(rows, key=chronological_key)
def latest_file(directory: Path, pattern: str) -> Path | None:
files = sorted(directory.glob(pattern))
return files[-1] if files else None
def count_jsonl_rows(path: Path) -> int:
    """Count non-blank lines in *path*; a missing file counts as zero rows."""
    if not path.exists():
        return 0
    with open(path) as handle:
        return sum(bool(line.strip()) for line in handle)
def gather_evidence_ids(batch_payload: dict[str, Any]) -> list[str]:
    """Collect the sorted union of batch tweet ids and candidate evidence ids.

    Evidence ids are coerced to str; the top-level tweet_ids list is taken
    as-is (matching the original behavior).
    """
    collected: set[str] = set(batch_payload.get("tweet_ids", []))
    for candidate in batch_payload.get("knowledge_candidates", []):
        collected.update(map(str, candidate.get("evidence_tweet_ids", [])))
    return sorted(collected)
def _latest_name(directory: Path, pattern: str) -> str | None:
    """Return the name of the last-sorted file matching *pattern*, or None."""
    newest = latest_file(directory, pattern)
    return newest.name if newest is not None else None


def compute_progress_snapshot() -> dict[str, Any]:
    """Summarize pipeline progress from the checkpoint, profile, and output dirs.

    Returns a plain dict suitable for JSON serialization. Fix: the original
    evaluated ``latest_file(...)`` twice for each "latest_*" field (once for
    the truthiness check and once for ``.name``), globbing every directory
    twice per snapshot; each lookup now runs exactly once via _latest_name.
    """
    checkpoint = load_checkpoint()
    profile = load_json(PROFILE_FILE, {"claims": []})
    dpo_files = sorted(TRAINING_DPO_DIR.glob("pairs_*.jsonl"))
    dpo_pairs = sum(count_jsonl_rows(path) for path in dpo_files)
    durable_claims = [
        claim for claim in profile.get("claims", []) if claim.get("status") == "durable"
    ]
    return {
        "batches_completed": checkpoint.get("batches_completed", 0),
        "next_offset": checkpoint.get("next_offset", 0),
        "phase": checkpoint.get("phase", "discovery"),
        "candidate_batches": len(list(CANDIDATES_DIR.glob("batch_*.json"))),
        "durable_claims": len(durable_claims),
        "training_examples": sum(
            count_jsonl_rows(path)
            for path in TRAINING_EXAMPLES_DIR.glob("batch_*.jsonl")
        ),
        "dpo_pair_files": len(dpo_files),
        "dpo_pairs": dpo_pairs,
        "latest_dpo_file": _latest_name(TRAINING_DPO_DIR, "pairs_*.jsonl"),
        "latest_note": _latest_name(NOTES_DIR, "batch_*.md"),
        "latest_eval": _latest_name(TRAINING_EVALS_DIR, "run_*.json"),
    }
def write_progress_snapshot() -> dict[str, Any]:
    """Recompute the progress snapshot, persist it to disk, and return it."""
    write_json(PROGRESS_FILE, snapshot := compute_progress_snapshot())
    return snapshot