#!/usr/bin/env python3
|
|
"""Shared helpers for the private Twitter archive pipeline."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import re
|
|
from copy import deepcopy
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
# Base layout: all pipeline state lives under ~/.timmy/twitter-archive.
TIMMY_HOME = Path.home() / ".timmy"
ARCHIVE_DIR = TIMMY_HOME / "twitter-archive"
EXTRACTED_DIR = ARCHIVE_DIR / "extracted"
NOTES_DIR = ARCHIVE_DIR / "notes"
KNOWLEDGE_DIR = ARCHIVE_DIR / "knowledge"
CANDIDATES_DIR = KNOWLEDGE_DIR / "candidates"
INSIGHTS_DIR = ARCHIVE_DIR / "insights"

# Training artifacts: examples, DPO pairs, eval runs, and training runs
# each get their own subdirectory (created by ensure_layout).
TRAINING_DIR = ARCHIVE_DIR / "training"
TRAINING_EXAMPLES_DIR = TRAINING_DIR / "examples"
TRAINING_DPO_DIR = TRAINING_DIR / "dpo"
TRAINING_EVALS_DIR = TRAINING_DIR / "evals"
TRAINING_RUNS_DIR = TRAINING_DIR / "runs"
METRICS_DIR = ARCHIVE_DIR / "metrics"

# Well-known files read/written by the helpers in this module.
PROFILE_FILE = KNOWLEDGE_DIR / "profile.json"
CHANGES_FILE = KNOWLEDGE_DIR / "changes.jsonl"
CHECKPOINT_FILE = ARCHIVE_DIR / "checkpoint.json"
PROGRESS_FILE = METRICS_DIR / "progress.json"
SOURCE_CONFIG_FILE = ARCHIVE_DIR / "source_config.json"
PIPELINE_CONFIG_FILE = ARCHIVE_DIR / "pipeline_config.json"

# Fallback archive location used when neither the TIMMY_TWITTER_ARCHIVE_SOURCE
# env var nor source_config.json supplies a path (see resolve_source_dir).
DEFAULT_SOURCE_DIR = (
    Path.home()
    / "Downloads"
    / "twitter-2026-03-27-d4471cc6eb6703034d592f870933561ebee374d9d9b90c9b8923abff064afc1e"
    / "data"
)

# Number of tweets processed per batch (recorded in the checkpoint).
BATCH_SIZE = 50
|
|
|
|
|
|
def deep_default(default: Any) -> Any:
    """Return an independent deep copy of *default*.

    Guarantees that callers receiving a fallback value never share
    mutable state with the original default object.
    """
    copied = deepcopy(default)
    return copied
|
|
|
|
|
|
def ensure_layout() -> None:
    """Create every directory the pipeline writes to, tolerating existing ones."""
    required_dirs = (
        ARCHIVE_DIR,
        EXTRACTED_DIR,
        NOTES_DIR,
        KNOWLEDGE_DIR,
        CANDIDATES_DIR,
        INSIGHTS_DIR,
        TRAINING_DIR,
        TRAINING_EXAMPLES_DIR,
        TRAINING_DPO_DIR,
        TRAINING_EVALS_DIR,
        TRAINING_RUNS_DIR,
        METRICS_DIR,
    )
    for directory in required_dirs:
        directory.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
def load_json(path: Path, default: Any) -> Any:
    """Load JSON from *path*, returning a deep copy of *default* on failure.

    Failure means the file is missing or contains invalid JSON; either way
    callers always get a usable value. Uses EAFP rather than an exists()
    pre-check so a file removed between check and read (TOCTOU race with a
    concurrent writer) still falls back to the default instead of raising.
    """
    try:
        return json.loads(path.read_text())
    except FileNotFoundError:
        # Missing file: same fallback the old exists() check provided.
        return deep_default(default)
    except json.JSONDecodeError:
        # Corrupt/partial file: treat as absent rather than crash the pipeline.
        return deep_default(default)
|
|
|
|
|
|
def write_json(path: Path, payload: Any) -> None:
    """Serialize *payload* as pretty, key-sorted JSON at *path*.

    Parent directories are created on demand; output ends with a newline.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(payload, indent=2, sort_keys=True)
    path.write_text(serialized + "\n")
|
|
|
|
|
|
def load_jsonl(path: Path) -> list[dict[str, Any]]:
    """Parse *path* as JSON Lines, skipping blank lines.

    Returns an empty list when the file does not exist.
    """
    if not path.exists():
        return []
    stripped_lines = (line.strip() for line in path.read_text().splitlines())
    return [json.loads(line) for line in stripped_lines if line]
|
|
|
|
|
|
def append_jsonl(path: Path, rows: list[dict[str, Any]]) -> None:
    """Append each row of *rows* to *path* as one key-sorted JSON line.

    A no-op for an empty row list; parent directories are created on demand.
    """
    if not rows:
        return
    path.parent.mkdir(parents=True, exist_ok=True)
    encoded = [json.dumps(row, sort_keys=True) + "\n" for row in rows]
    with open(path, "a") as handle:
        handle.writelines(encoded)
|
|
|
|
|
|
def stable_sha256(path: Path) -> str:
    """Return the hex SHA-256 digest of *path*, streamed in 1 MiB chunks."""
    hasher = hashlib.sha256()
    chunk_size = 1024 * 1024
    with open(path, "rb") as handle:
        # iter() with a b"" sentinel stops cleanly at end-of-file.
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            hasher.update(chunk)
    return hasher.hexdigest()
|
|
|
|
|
|
def slugify(value: str) -> str:
    """Lowercase *value* and collapse non-alphanumeric runs into hyphens.

    Falls back to "item" when nothing usable remains.
    """
    collapsed = re.sub(r"[^a-z0-9]+", "-", value.lower())
    slug = collapsed.strip("-")
    if slug:
        return slug
    return "item"
|
|
|
|
|
|
def batch_id_from_number(batch_number: int) -> str:
    """Format *batch_number* as a zero-padded batch identifier, e.g. batch_007."""
    return "batch_{:03d}".format(batch_number)
|
|
|
|
|
|
def default_checkpoint() -> dict[str, Any]:
    """Build the initial checkpoint for a pipeline that has processed nothing.

    A fresh dict is returned on every call so callers can mutate it freely.
    """
    return dict(
        data_source="tweets",
        batch_size=BATCH_SIZE,
        next_offset=0,
        batches_completed=0,
        phase="discovery",
        confidence="low",
        next_focus="look for recurring themes and recurring people",
        understanding_version=0,
        last_batch_id=None,
        last_batch_sessions={},
        last_profile_update=None,
        last_dpo_build=None,
        last_insight_file=None,
    )
|
|
|
|
|
|
def load_checkpoint() -> dict[str, Any]:
    """Return the persisted checkpoint layered over the defaults.

    Any key missing from disk keeps its default value.
    """
    stored = load_json(CHECKPOINT_FILE, {})
    merged = default_checkpoint()
    merged.update(stored)
    return merged
|
|
|
|
|
|
def resolve_source_dir() -> Path:
    """Resolve the archive source directory.

    Precedence: TIMMY_TWITTER_ARCHIVE_SOURCE env var, then the
    "source_path" entry of source_config.json, then DEFAULT_SOURCE_DIR.
    The config file is only read when the env var is unset.
    """
    from_env = os.environ.get("TIMMY_TWITTER_ARCHIVE_SOURCE")
    if from_env:
        return Path(os.path.expanduser(from_env))

    from_config = load_json(SOURCE_CONFIG_FILE, {}).get("source_path")
    if from_config:
        return Path(os.path.expanduser(from_config))

    return DEFAULT_SOURCE_DIR
|
|
|
|
|
|
def load_pipeline_config() -> dict[str, Any]:
    """Return pipeline_config.json contents, or {} when absent or corrupt."""
    return load_json(PIPELINE_CONFIG_FILE, {})
|
|
|
|
|
|
def tweet_text(tweet: dict[str, Any]) -> str:
    """Extract a tweet's text with whitespace runs collapsed to single spaces.

    Prefers "full_text" over "text"; returns "" when neither is present.
    """
    raw = tweet.get("full_text") or tweet.get("text") or ""
    collapsed = re.sub(r"\s+", " ", raw)
    return collapsed.strip()
|
|
|
|
|
|
def parse_entities(tweet: dict[str, Any], entity_key: str, value_key: str) -> list[str]:
    """Collect stripped, non-empty *value_key* strings from a tweet's entities.

    Bug fixed: the old version filtered on the raw value but returned the
    stripped one, so a whitespace-only entity value (e.g. " ") slipped
    through the filter and produced an empty string in the result. Stripping
    first (and looking each value up once) guarantees only non-empty strings
    are returned.
    """
    entities = tweet.get("entities", {}) or {}
    items = entities.get(entity_key, []) or []
    stripped = ((item.get(value_key) or "").strip() for item in items)
    return [value for value in stripped if value]
|
|
|
|
|
|
def normalize_tweet(raw_entry: dict[str, Any], source_file: str) -> dict[str, Any] | None:
    """Flatten one raw archive entry into the pipeline's tweet schema.

    Accepts both the wrapped form ({"tweet": {...}}) and the bare tweet dict.
    Returns None when the entry lacks an id or any usable text.
    """
    payload = raw_entry.get("tweet", raw_entry)
    text = tweet_text(payload)
    identifier = str(payload.get("id_str") or payload.get("id") or "").strip()
    if not (identifier and text):
        return None

    # The *_str variants are preferred: large numeric ids lose precision in
    # some JSON consumers, so the archive carries both.
    reply_tweet = payload.get("in_reply_to_status_id_str") or payload.get(
        "in_reply_to_status_id"
    )
    reply_user = payload.get("in_reply_to_user_id_str") or payload.get(
        "in_reply_to_user_id"
    )
    return {
        "tweet_id": identifier,
        "created_at": payload.get("created_at"),
        "full_text": text,
        "is_retweet": bool(payload.get("retweeted")) or text.startswith("RT @"),
        "reply_to_tweet_id": reply_tweet,
        "reply_to_user_id": reply_user,
        "lang": payload.get("lang"),
        "favorite_count": int(payload.get("favorite_count") or 0),
        "retweet_count": int(payload.get("retweet_count") or 0),
        "mentions": parse_entities(payload, "user_mentions", "screen_name"),
        "hashtags": parse_entities(payload, "hashtags", "text"),
        "urls": parse_entities(payload, "urls", "expanded_url"),
        "source_file": source_file,
    }
|
|
|
|
|
|
def sort_tweets(rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Return tweets ordered chronologically, breaking ties by tweet id.

    A missing created_at sorts first (treated as the empty string).
    """

    def sort_key(row: dict[str, Any]) -> tuple[str, str]:
        return (row.get("created_at") or "", row["tweet_id"])

    return sorted(rows, key=sort_key)
|
|
|
|
|
|
def latest_file(directory: Path, pattern: str) -> Path | None:
|
|
files = sorted(directory.glob(pattern))
|
|
return files[-1] if files else None
|
|
|
|
|
|
def count_jsonl_rows(path: Path) -> int:
    """Count non-blank lines in *path*; 0 when the file does not exist."""
    if not path.exists():
        return 0
    total = 0
    with open(path) as handle:
        for line in handle:
            if line.strip():
                total += 1
    return total
|
|
|
|
|
|
def gather_evidence_ids(batch_payload: dict[str, Any]) -> list[str]:
    """Return the sorted union of batch tweet ids and candidate evidence ids.

    Bug fixed: the old version coerced evidence ids to str but collected
    "tweet_ids" as-is. Archive payloads can carry integer ids, and sorted()
    raises TypeError on a mixed int/str set — so every id is now coerced to
    str before being collected.
    """
    ids = {str(tweet_id) for tweet_id in batch_payload.get("tweet_ids", [])}
    for candidate in batch_payload.get("knowledge_candidates", []):
        ids.update(str(tweet_id) for tweet_id in candidate.get("evidence_tweet_ids", []))
    return sorted(ids)
|
|
|
|
|
|
def compute_progress_snapshot() -> dict[str, Any]:
    """Summarize pipeline progress from the checkpoint, profile, and artifacts.

    Improvement: each latest_file(...) used to be evaluated twice (once in the
    condition, once for the value), rescanning the directory each time, and
    TRAINING_DPO_DIR was re-globbed even though dpo_files already held the
    sorted matches. Every lookup now happens exactly once.
    """

    def name_of(path: Path | None) -> str | None:
        # latest_file returns None when nothing matches; propagate that.
        return path.name if path else None

    checkpoint = load_checkpoint()
    profile = load_json(PROFILE_FILE, {"claims": []})
    dpo_files = sorted(TRAINING_DPO_DIR.glob("pairs_*.jsonl"))
    durable_claims = [
        claim for claim in profile.get("claims", []) if claim.get("status") == "durable"
    ]
    return {
        "batches_completed": checkpoint.get("batches_completed", 0),
        "next_offset": checkpoint.get("next_offset", 0),
        "phase": checkpoint.get("phase", "discovery"),
        "candidate_batches": len(list(CANDIDATES_DIR.glob("batch_*.json"))),
        "durable_claims": len(durable_claims),
        "training_examples": sum(
            count_jsonl_rows(path) for path in TRAINING_EXAMPLES_DIR.glob("batch_*.jsonl")
        ),
        "dpo_pair_files": len(dpo_files),
        "dpo_pairs": sum(count_jsonl_rows(path) for path in dpo_files),
        # dpo_files is the same sorted glob latest_file would have run.
        "latest_dpo_file": name_of(dpo_files[-1] if dpo_files else None),
        "latest_note": name_of(latest_file(NOTES_DIR, "batch_*.md")),
        "latest_eval": name_of(latest_file(TRAINING_EVALS_DIR, "run_*.json")),
    }
|
|
|
|
|
|
def write_progress_snapshot() -> dict[str, Any]:
    """Compute the current progress snapshot, persist it to PROGRESS_FILE, and return it."""
    snapshot = compute_progress_snapshot()
    write_json(PROGRESS_FILE, snapshot)
    return snapshot
|