diff --git a/Makefile b/Makefile
index c3d6fa2..96b39d5 100644
--- a/Makefile
+++ b/Makefile
@@ -1,4 +1,14 @@
 .PHONY: test
 
 test:
	python3 -m pytest tests/test_ci_config.py scripts/test_*.py -v
+
+# Connector targets (LIMIT and DRY_RUN are optional for run-connector)
+test-connectors:
+	python3 -m pytest tests/test_connectors.py -v
+
+run-connector:
+	python3 scripts/run_connector.py $(CONNECTOR) --source $(SOURCE) --output $(OUTPUT) $(if $(LIMIT),--limit $(LIMIT)) $(if $(DRY_RUN),--dry-run)
+
+connectors-help:
+	python3 scripts/run_connector.py --help
diff --git a/README.md b/README.md
index 20fd178..3a55f77 100644
--- a/README.md
+++ b/README.md
@@ -27,6 +27,11 @@ Before a session starts, queries knowledge store for relevant facts. Assembles c
 ### Pipeline 3: Measure
 Tracks whether compounding is happening. Knowledge velocity, error reduction, hit rate, task completion. Daily report proves the loop works.
 
+### Connector Pack (EPIC #233)
+Sovereign personal archive connectors: Twitter/X, Discord, Slack, WhatsApp, Notion, iMessage, Google.
+Connectors mirror local exports or explicit API tokens → normalize → redact → index → sync with provenance.
+See [`connectors/`](connectors/README.md) for the full connector suite and usage.
+
 ## Directory Structure
 
 ```
@@ -40,6 +45,12 @@ Tracks whether compounding is happening. Knowledge velocity, error reduction, hi
 │   ├── bootstrapper.py      # Pre-session context loader
 │   ├── measurer.py          # Compounding metrics
 │   └── session_reader.py    # JSONL parser
+├── connectors/              # Personal archive connectors (EPIC #233)
+│   ├── __init__.py
+│   ├── base.py
+│   ├── schema.py
+│   ├── twitter_archive.py
+│   └── README.md
 ├── metrics/
 │   └── dashboard.md         # Human-readable status
 └── templates/
@@ -65,4 +76,4 @@ See [all issues](https://forge.alexanderwhitestone.com/Timmy_Foundation/compound
 - EPIC 1: Session Harvester (#2)
 - EPIC 2: Knowledge Store & Bootstrap (#3)
 - EPIC 3: Compounding Measurement (#4)
-- EPIC 4: Retroactive Harvest (#5)
+- EPIC 4: Retroactive Harvest (#5)
\ No newline at end of file
diff --git a/connectors/README.md b/connectors/README.md
new file mode 100644
index 0000000..ed9cea8
--- /dev/null
+++ b/connectors/README.md
@@ -0,0 +1,66 @@
+# Sovereign Personal Archive Connector Pack
+
+This directory contains the connector infrastructure for ingesting personal archives
+(Discord, Slack, WhatsApp, Notion, iMessage, X/Twitter, Google) into the compounding-intelligence
+knowledge pipeline.
+
+## Quick Start
+
+```bash
+# Run the Twitter archive connector
+python3 scripts/run_connector.py twitter \
+    --source ~/Documents/TwitterArchive \
+    --output events.jsonl \
+    --limit 100
+```
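+
+The same run can be driven through the Makefile targets at the repository root; they are
+thin wrappers around `scripts/run_connector.py`, so the variables map directly onto the
+CLI flags above (the connector names accepted on the command line are defined by that
+script, not by this README):
+
+```bash
+# Equivalent invocation via make (LIMIT is optional)
+make run-connector CONNECTOR=twitter SOURCE=~/Documents/TwitterArchive OUTPUT=events.jsonl LIMIT=100
+
+# Run the connector test suite
+make test-connectors
+```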
+
+## Connector Output Format
+
+Each connector emits `SourceEvent` objects (one JSON line per event):
+
+```json
+{
+  "source": "twitter",
+  "account": "user_archive",
+  "thread_or_channel": "tweet_123456",
+  "author": "user_archive",
+  "timestamp": "2026-04-26T08:30:00+00:00",
+  "content": "Tweet text here",
+  "attachments": ["https://..."],
+  "raw_ref": "twitter:archive:tweet.js:123456",
+  "hash": "sha256...",
+  "consent_scope": "memory_only",
+  "metadata": { "tweet_id": "123456", "favorite_count": 10 }
+}
+```
+
+## Connector Registry
+
+| Name             | Source Format                 | Status     |
+|------------------|-------------------------------|------------|
+| twitter_archive  | Official Twitter data export  | ✅ Working |
+| discord_archive  | Discord data package / JSON   | ⏳ Planned |
+| slack_archive    | Slack export / API            | ⏳ Planned |
+| whatsapp_archive | WhatsApp Desktop export       | ⏳ Planned |
+| notion_archive   | Notion markdown/SQLite        | ⏳ Planned |
+| imessage_archive | macOS local chat storage      | ⏳ Planned |
+| google_archive   | Google Workspace CLI          | ⏳ Planned |
+
+## Design Principles
+
+1. **Local-first**: Connectors operate on user-owned exports or explicit API credentials.
+2. **Incremental**: Checkpoint files (`~/.cache/connectors/`) allow resumable processing.
+3. **Consent-gated**: Default `consent_scope=memory_only` — explicit opt-in for broader use.
+4. **Provenance-preserving**: `metadata` retains all raw fields; `hash` enables deduplication.
+5. **Sovereign**: No ambient scraping. No cloud dependency unless user explicitly configures tokens.
+
+## Writing a New Connector
+
+Subclass `BaseConnector` from `connectors/base.py` and implement:
+
+- `discover_sources(root: Path) -> Iterator[Path|str]` — find source files or IDs
+- `parse_source(source) -> Iterator[SourceEvent]` — emit normalized events
+
+Register the new class in the `_REGISTRY` dict in `connectors/__init__.py`.
+
+See `connectors/twitter_archive.py` for a complete example.
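+
+As a sketch, a connector for a generic, pre-normalized `*.jsonl` dump could look like the
+following. The `jsonl_dump` name, the record fields (`text`, `author`, `thread`, `timestamp`),
+and the file layout are illustrative assumptions, not part of the shipped pack:
+
+```python
+# connectors/jsonl_dump.py (illustrative)
+from pathlib import Path
+from typing import Iterator
+import json
+
+from .base import BaseConnector
+from .schema import SourceEvent, compute_event_hash
+
+
+class JsonlDumpConnector(BaseConnector):
+    """Minimal example connector for a pre-normalized JSONL export."""
+    name = "jsonl_dump"
+
+    def discover_sources(self, root: Path) -> Iterator[Path]:
+        # One source per .jsonl file under the export root
+        yield from Path(root).rglob("*.jsonl")
+
+    def parse_source(self, source: Path) -> Iterator[SourceEvent]:
+        with open(source, "r", encoding="utf-8") as f:
+            for lineno, line in enumerate(f, 1):
+                record = json.loads(line)
+                raw_ref = f"jsonl:{source.name}:{lineno}"
+                yield SourceEvent(
+                    source="jsonl_dump",
+                    account=record.get("account", "unknown"),
+                    thread_or_channel=record.get("thread", "default"),
+                    author=record.get("author", "unknown"),
+                    timestamp=record.get("timestamp", ""),
+                    content=record.get("text", ""),
+                    attachments=[],
+                    raw_ref=raw_ref,
+                    hash=compute_event_hash("jsonl_dump", raw_ref, record.get("text", ""),
+                                            record.get("timestamp", ""),
+                                            record.get("author", "unknown")),
+                )
+```
+
+Registering it is a one-line change: add `"jsonl_dump": JsonlDumpConnector` to `_REGISTRY`
+in `connectors/__init__.py`. `BaseConnector.run()` then handles checkpointing, deduplication,
+validation, and consent tagging for free.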
diff --git a/connectors/__init__.py b/connectors/__init__.py
new file mode 100644
index 0000000..873098c
--- /dev/null
+++ b/connectors/__init__.py
@@ -0,0 +1,50 @@
+#!/usr/bin/env python3
+"""
+connectors/__init__.py — Sovereign personal archive connector pack.
+
+Provides:
+    - BaseConnector: abstract base class for all connectors
+    - SourceEvent: unified event schema
+    - compute_event_hash, validate_event: utilities
+    - Registry: connector discovery and loading
+
+Connectors:
+    - TwitterArchiveConnector: parse official Twitter/X archive exports
+      (Future: Discord, Slack, WhatsApp, Notion, iMessage, Google)
+"""
+
+from .base import BaseConnector
+from .schema import (
+    SourceEvent,
+    compute_event_hash,
+    validate_event,
+    CONSENT_MEMORY_ONLY,
+    CONSENT_BOOTSTRAP,
+    CONSENT_TRAINING,
+)
+from .twitter_archive import TwitterArchiveConnector
+
+# Auto-registry: map of connector name → class
+_REGISTRY = {
+    "twitter_archive": TwitterArchiveConnector,
+    # Future connectors:
+    # "discord_archive": DiscordArchiveConnector,
+    # "slack_archive": SlackArchiveConnector,
+    # "whatsapp_archive": WhatsAppArchiveConnector,
+    # "notion_archive": iMessageArchiveConnector is not yet implemented; see README registry
+    # "imessage_archive": iMessageArchiveConnector,
+    # "google_archive": GoogleArchiveConnector,
+}
+
+
+def get_connector(name: str) -> type[BaseConnector]:
+    """Get connector class by registry name."""
+    cls = _REGISTRY.get(name)
+    if cls is None:
+        raise ValueError(f"Unknown connector '{name}'. Available: {list(_REGISTRY.keys())}")
+    return cls
+
+
+def list_connectors() -> list[str]:
+    """List all registered connector names."""
+    return list(_REGISTRY.keys())
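+
+
+# Illustrative usage (not executed here): look up a connector class by its registry
+# name, opt in to a broader consent scope, and stream normalized events.
+# The archive path below is an assumption, not a shipped default.
+#
+#     from pathlib import Path
+#     from connectors import get_connector, CONSENT_BOOTSTRAP
+#
+#     connector_cls = get_connector("twitter_archive")
+#     connector = connector_cls(consent_scope=CONSENT_BOOTSTRAP)
+#     for event in connector.run(Path.home() / "TwitterArchive", limit=10):
+#         print(event.to_json())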
diff --git a/connectors/base.py b/connectors/base.py
new file mode 100644
index 0000000..3cc11e8
--- /dev/null
+++ b/connectors/base.py
@@ -0,0 +1,100 @@
+#!/usr/bin/env python3
+"""
+connectors/base.py — Abstract base class for all personal archive connectors.
+
+Defines the contract every connector must implement. Connectors read local
+exports or API data and yield SourceEvent objects.
+"""
+
+from abc import ABC, abstractmethod
+from pathlib import Path
+from typing import Iterator, Optional, Dict, Any
+import json
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class BaseConnector(ABC):
+    name: Optional[str] = None
+    source_glob: Optional[str] = None
+    default_consent_scope: str = "memory_only"
+
+    def __init__(self, checkpoint_path: Optional[Path] = None,
+                 consent_scope: Optional[str] = None):
+        self.consent_scope = consent_scope or self.default_consent_scope
+        if checkpoint_path is None:
+            cache_dir = Path.home() / ".cache" / "connectors"
+            cache_dir.mkdir(parents=True, exist_ok=True)
+            checkpoint_path = cache_dir / f"{self.name}.checkpoint.jsonl"
+        self.checkpoint_path = checkpoint_path
+        self._processed_hashes = self._load_checkpoint()
+
+    def _load_checkpoint(self) -> set:
+        if not self.checkpoint_path.exists():
+            return set()
+        seen = set()
+        with open(self.checkpoint_path, 'r') as f:
+            for line in f:
+                try:
+                    entry = json.loads(line)
+                    seen.add(entry.get('hash', ''))
+                except Exception:
+                    continue
+        logger.debug("Loaded %d checkpoint hashes for %s", len(seen), self.name)
+        return seen
+
+    def _save_checkpoint(self, event) -> None:
+        with open(self.checkpoint_path, 'a') as f:
+            f.write(json.dumps({'hash': event.hash, 'source': event.source,
+                                'raw_ref': event.raw_ref}) + '\n')
+
+    def mark_processed(self, event_hash: str) -> None:
+        self._processed_hashes.add(event_hash)
+
+    def is_processed(self, event_hash: str) -> bool:
+        return event_hash in self._processed_hashes
+
+    def run(self, source_root: Path, limit: Optional[int] = None) -> Iterator:
+        count = 0
+        skipped_dedup = 0
+        skipped_invalid = 0
+
+        for source in self.discover_sources(source_root):
+            for event in self.parse_source(source):
+                if not event.hash:
+                    from .schema import compute_event_hash
+                    event.hash = compute_event_hash(
+                        event.source, event.raw_ref, event.content,
+                        event.timestamp, event.author
+                    )
+
+                if event.hash in self._processed_hashes:
+                    skipped_dedup += 1
+                    continue
+
+                from .schema import validate_event
+                if not validate_event(event):
+                    skipped_invalid += 1
+                    logger.warning("Invalid event: raw_ref=%s", event.raw_ref)
+                    continue
+
+                event.consent_scope = self.consent_scope
+                self._save_checkpoint(event)
+                self._processed_hashes.add(event.hash)
+
+                yield event
+                count += 1
+                if limit and count >= limit:
+                    return
+
+        logger.info("Connector %s complete: yielded=%d, skipped_dedup=%d, skipped_invalid=%d",
+                    self.name, count, skipped_dedup, skipped_invalid)
+
+    @abstractmethod
+    def discover_sources(self, root: Path) -> Iterator:
+        pass
+
+    @abstractmethod
+    def parse_source(self, source) -> Iterator:
+        pass
diff --git a/connectors/schema.py b/connectors/schema.py
new file mode 100644
index 0000000..8950499
--- /dev/null
+++ b/connectors/schema.py
@@ -0,0 +1,100 @@
+#!/usr/bin/env python3
+"""
+connectors/schema.py — Unified source-event schema for personal archive connectors.
+
+All connectors must produce events conforming to this schema so downstream
+pipelines (harvester → knowledge store) can process them uniformly.
+"""
+
+from dataclasses import dataclass, asdict
+from datetime import datetime
+from typing import Optional, Any, Dict
+import hashlib
+import json
+
+
+@dataclass
+class SourceEvent:
+    """
+    Canonical event schema for any ingested personal archive entry.
+
+    Fields
+    ------
+    source : str
+        Platform identifier: 'twitter', 'discord', 'slack', 'whatsapp',
+        'notion', 'imessage', 'google', etc.
+    account : str
+        User account/channel identifier on the source platform.
+    thread_or_channel : str
+        Conversation thread, channel, or chat identifier.
+    author : str
+        Who created this content (may differ from account in group chats).
+    timestamp : str
+        ISO-8601 timestamp when the event occurred (not when it was ingested).
+    content : str
+        Primary text content. Note: validate_event currently rejects events with
+        empty content, so text-less entries (e.g. image-only posts) are skipped.
+    attachments : list[str]
+        List of local file paths or URLs for attached media.
+    raw_ref : str
+        Pointer to the raw source record (file path, message ID, URL, etc.).
+    hash : str
+        SHA-256 hash of the raw content for deduplication and provenance.
+    consent_scope : str
+        Privacy gate: where this content may be used.
+        Examples: 'memory_only', 'bootstrap_context', 'training_data'.
+        Default: 'memory_only' for ingested personal archives.
+    metadata : dict[str, Any]
+        Platform-specific fields retained for provenance but not indexed.
+    """
+    source: str
+    account: str
+    thread_or_channel: str
+    author: str
+    timestamp: str
+    content: str
+    attachments: list[str]
+    raw_ref: str
+    hash: str
+    consent_scope: str = "memory_only"
+    metadata: Optional[Dict[str, Any]] = None
+
+    def to_dict(self) -> dict:
+        """Convert to plain dict for JSON serialization."""
+        d = asdict(self)
+        if d['metadata'] is None:
+            d['metadata'] = {}
+        return d
+
+    def to_json(self) -> str:
+        """Serialize to JSON line (one event per line)."""
+        return json.dumps(self.to_dict(), ensure_ascii=False)
+
+
+def compute_event_hash(source: str, raw_ref: str, content: str,
+                       timestamp: str, author: str) -> str:
+    """
+    Compute a deterministic SHA-256 hash for an event.
+
+    Hash inputs: source + raw_ref + content + timestamp + author.
+    The same record always produces the same hash, enabling stable
+    deduplication across repeated runs over the same archive.
+    """
+    canonical = f"{source}|{raw_ref}|{content}|{timestamp}|{author}"
+    return hashlib.sha256(canonical.encode('utf-8')).hexdigest()
+
+
+def validate_event(event: SourceEvent) -> bool:
+    """
+    Minimal structural validation for a SourceEvent.
+    Returns True if required fields are present and well-formed.
+    """
+    required = [event.source, event.account, event.thread_or_channel,
+                event.author, event.timestamp, event.content, event.raw_ref,
+                event.hash]
+    return all(str(x).strip() for x in required)
+
+
+# Consent scope definitions
+CONSENT_MEMORY_ONLY = "memory_only"      # For retrieval only, not bootstrap
+CONSENT_BOOTSTRAP = "bootstrap_context"  # Can seed new sessions
+CONSENT_TRAINING = "training_data"       # May be used for model training
diff --git a/connectors/twitter_archive.py b/connectors/twitter_archive.py
new file mode 100644
index 0000000..0bd5bc0
--- /dev/null
+++ b/connectors/twitter_archive.py
@@ -0,0 +1,155 @@
+#!/usr/bin/env python3
+"""
+connectors/twitter_archive.py — Twitter/X personal archive connector.
+
+Parses official Twitter data exports (Twitter's "Download your data" archive).
+Expects the tweet.js / tweet.json files from the archive's data/ directory.
+
+Format (Twitter's archived tweets JSON):
+    Each entry has: {"tweet": {"id_str": "...", "full_text": "...", "created_at": "...", ...}}
+
+Output: normalized SourceEvent with source='twitter'.
+"""
+
+import json
+import re
+from datetime import datetime
+from pathlib import Path
+from typing import Iterator, Optional
+import logging
+
+from .base import BaseConnector
+from .schema import SourceEvent, compute_event_hash
+
+logger = logging.getLogger(__name__)
+
+
+class TwitterArchiveConnector(BaseConnector):
+    """Connector for Twitter/X official archive exports."""
+    name = "twitter_archive"
+    source_glob = "**/tweet*.json"
+    default_consent_scope = "memory_only"
+
+    # Twitter's date format in archives: "Wed Oct 10 20:19:24 +0000 2018"
+    TWITTER_DATE_FMT = "%a %b %d %H:%M:%S %z %Y"
+
+    def discover_sources(self, root: Path) -> Iterator[Path]:
+        """
+        Find tweet.js / tweet.json files in a Twitter archive.
+
+        The official Twitter export places these under:
+            root/
+                data/
+                    tweet.js      (single-file format, older exports)
+                or
+                account-XXXX-YYYY/
+                    tweets.js     (per-month split format)
+        """
+        root = Path(root)
+        # Search for .js files that start with 'tweet' — these contain the tweet JSON blobs
+        candidates = list(root.rglob("tweet*.js")) + list(root.rglob("tweet*.json"))
+        logger.info("Discovered %d Twitter archive files under %s", len(candidates), root)
+        for path in candidates:
+            yield path
+
+    def parse_source(self, source: Path) -> Iterator[SourceEvent]:
+        """
+        Parse a Twitter archive file and yield SourceEvents.
+
+        Handles both single-file (old) and per-month split formats.
+        Twitter wraps the JSON array in a JS variable assignment: `window.YTD.tweet.part0 = [...]`
+        """
+        try:
+            with open(source, 'r', encoding='utf-8') as f:
+                raw = f.read()
+
+            # Extract JSON array from the JS wrapper
+            match = re.search(r'=\s*(\[.+?\])\s*;?\s*$', raw, re.DOTALL)
+            if match:
+                json_str = match.group(1)
+                records = json.loads(json_str)
+            else:
+                # Plain JSON array (no wrapper)
+                records = json.loads(raw)
+
+            logger.debug("Parsing %d tweet records from %s", len(records), source)
+
+            for record in records:
+                event = self._record_to_event(record, source)
+                if event:
+                    yield event
+
+        except Exception as e:
+            logger.error("Failed to parse %s: %s", source, e)
+
+    def _record_to_event(self, record: dict, source_path: Path) -> Optional[SourceEvent]:
+        """
+        Convert a single tweet record into a SourceEvent.
+
+        The record can be either the wrapped format {"tweet": {...}} or the bare tweet object.
+        """
+        # Unwrap the tweet object
+        tweet = record.get('tweet', record)
+
+        # Extract core fields
+        id_str = tweet.get('id_str') or tweet.get('id')
+        full_text = tweet.get('full_text') or tweet.get('text', '')
+        created_at = tweet.get('created_at', '')
+
+        # Parse timestamp
+        try:
+            dt = datetime.strptime(created_at, self.TWITTER_DATE_FMT)
+            iso_ts = dt.astimezone().isoformat()
+        except Exception:
+            iso_ts = created_at  # fallback: keep as-is
+
+        # Author is always the account owner (the tweets export contains only the owner's posts)
+        account = "user_archive"  # normalized account identifier
+
+        # Thread/channel: individual tweets have no thread ID; threads aren't preserved in basic export
+        thread_id = f"tweet_{id_str}"
+
+        # Attachments: extract media URLs
+        attachments = []
+        extended_entities = tweet.get('extended_entities', {})
+        for media in extended_entities.get('media', []):
+            url = media.get('media_url_https') or media.get('media_url')
+            if url:
+                attachments.append(url)
+
+        # Build raw_ref
+        raw_ref = f"twitter:archive:{source_path.name}:{id_str}"
+
+        # Compute hash
+        content_for_hash = full_text or ""
+        hash_val = compute_event_hash(
+            source="twitter",
+            raw_ref=raw_ref,
+            content=content_for_hash,
+            timestamp=iso_ts,
+            author=account
+        )
+
+        # Preserve metadata for provenance
+        metadata = {
+            "tweet_id": id_str,
+            "source_file": str(source_path),
+            "favorite_count": tweet.get('favorite_count'),
+            "retweet_count": tweet.get('retweet_count'),
+            "in_reply_to_status_id": tweet.get('in_reply_to_status_id_str'),
+            "lang": tweet.get('lang'),
+        }
+
+        return SourceEvent(
+            source="twitter",
+            account=account,
+            thread_or_channel=thread_id,
+            author=account,
+            timestamp=iso_ts,
+            content=full_text,
+            attachments=attachments,
+            raw_ref=raw_ref,
+            hash=hash_val,
+            consent_scope=self.consent_scope,
+            metadata=metadata
+        )
diff --git a/tests/test_connectors.py b/tests/test_connectors.py
new file mode 100644
index 0000000..30c9ccb
--- /dev/null
+++ b/tests/test_connectors.py
@@ -0,0 +1,270 @@
+#!/usr/bin/env python3
+"""
+tests/test_connectors.py — Test suite for the personal archive connector pack.
+ +Tests cover: + - SourceEvent schema validation + - Event hash determinism + - TwitterArchiveConnector parsing of standard Twitter export format + - Deduplication gate +""" + +import json +import hashlib +import tempfile +from pathlib import Path +from datetime import datetime +import pytest + +# Add scripts dir to path for sibling imports +import sys +SCRIPT_DIR = Path(__file__).parent.parent / "scripts" +sys.path.insert(0, str(SCRIPT_DIR.parent)) + +from connectors.schema import ( + SourceEvent, + compute_event_hash, + validate_event, + CONSENT_MEMORY_ONLY, + CONSENT_BOOTSTRAP, +) +from connectors.twitter_archive import TwitterArchiveConnector + + +class TestSourceEventSchema: + """Tests for SourceEvent dataclass and helpers.""" + + def test_create_minimal_event(self): + event = SourceEvent( + source="twitter", + account="user123", + thread_or_channel="tweet_456", + author="user123", + timestamp="2026-04-26T12:00:00Z", + content="Hello world", + attachments=[], + raw_ref="twitter:test:456", + hash="", + ) + assert event.source == "twitter" + assert event.consent_scope == CONSENT_MEMORY_ONLY # default + + def test_compute_event_hash_deterministic(self): + h1 = compute_event_hash( + source="twitter", + raw_ref="ref:123", + content="test content", + timestamp="2026-04-26T12:00:00Z", + author="alice" + ) + h2 = compute_event_hash( + source="twitter", + raw_ref="ref:123", + content="test content", + timestamp="2026-04-26T12:00:00Z", + author="alice" + ) + assert h1 == h2 + assert len(h1) == 64 # SHA-256 hex + + def test_compute_event_hash_different_inputs(self): + h1 = compute_event_hash("twitter", "ref:1", "content", "ts", "alice") + h2 = compute_event_hash("twitter", "ref:1", "different", "ts", "alice") + assert h1 != h2 + + def test_validate_event_accepts_valid(self): + event = SourceEvent( + source="discord", + account="user#1234", + thread_or_channel="channel_abc", + author="user#1234", + timestamp="2026-04-26T12:00:00Z", + content="test", + attachments=[], + raw_ref="discord:msg:123", + hash="a" * 64, + ) + assert validate_event(event) is True + + def test_validate_event_rejects_empty_content(self): + event = SourceEvent( + source="twitter", + account="user", + thread_or_channel="thread", + author="user", + timestamp="2026-04-26T12:00:00Z", + content="", # empty + attachments=[], + raw_ref="ref", + hash="a" * 64, + ) + assert validate_event(event) is False + + def test_validate_event_rejects_missing_hash(self): + event = SourceEvent( + source="twitter", + account="user", + thread_or_channel="thread", + author="user", + timestamp="2026-04-26T12:00:00Z", + content="test", + attachments=[], + raw_ref="ref", + hash=" ", # whitespace only + ) + assert validate_event(event) is False + + def test_event_to_json_roundtrip(self): + event = SourceEvent( + source="twitter", + account="user", + thread_or_channel="t1", + author="user", + timestamp="2026-04-26T12:00:00Z", + content="hello", + attachments=["https://example.com/img.jpg"], + raw_ref="twitter:123", + hash="b" * 64, + metadata={"retweet_count": 5} + ) + json_str = event.to_json() + parsed = json.loads(json_str) + assert parsed["source"] == "twitter" + assert parsed["metadata"]["retweet_count"] == 5 + + +class TestTwitterArchiveConnector: + """Tests for the Twitter/X archive connector.""" + + def test_connector_name(self): + assert TwitterArchiveConnector.name == "twitter_archive" + + def test_discover_sources_finds_tweet_js(self, tmp_path: Path): + # Arrange: create a fake Twitter archive structure + archive = tmp_path / 
"twitter_archive" + archive.mkdir() + data_dir = archive / "data" + data_dir.mkdir() + (data_dir / "tweet.js").write_text("[]") + (data_dir / "tweets_2024_01.js").write_text("[]") + + connector = TwitterArchiveConnector() + sources = list(connector.discover_sources(archive)) + + assert len(sources) == 2 + assert any("tweet.js" in str(p) for p in sources) + + def test_parse_single_tweet_wrapped_format(self, tmp_path: Path): + """ + Twitter's official export wraps the JSON array in a JS assignment: + window.YTD.tweet.part0 = [ {...tweet...}, ... ]; + """ + # Create a minimal tweet record + tweet = { + "id_str": "1234567890", + "full_text": "Hello from Twitter archive!", + "created_at": "Mon Apr 26 08:30:00 +0000 2026", + "favorite_count": 10, + "retweet_count": 2, + "lang": "en" + } + wrapped = "window.YTD.tweet.part0 = " + json.dumps([{'tweet': tweet}]) + ";\n" + + js_file = tmp_path / "tweet.js" + js_file.write_text(wrapped) + + connector = TwitterArchiveConnector() + events = list(connector.parse_source(js_file)) + + assert len(events) == 1 + ev = events[0] + assert ev.source == "twitter" + assert ev.content == "Hello from Twitter archive!" + assert ev.author == "user_archive" + assert ev.consent_scope == "memory_only" + # Hash must be computed + assert len(ev.hash) == 64 + # Metadata preservation + assert ev.metadata["tweet_id"] == "1234567890" + assert ev.metadata["favorite_count"] == 10 + + def test_parse_tweet_array_without_wrapper(self, tmp_path: Path): + """Some Twitter exports are plain JSON arrays (no JS wrapper).""" + tweet = { + "id_str": "999", + "full_text": "Plain JSON tweet", + "created_at": "Mon Apr 26 08:30:00 +0000 2026", + } + json_file = tmp_path / "tweets.json" + json_file.write_text(json.dumps([{"tweet": tweet}])) + + connector = TwitterArchiveConnector() + events = list(connector.parse_source(json_file)) + + assert len(events) == 1 + assert events[0].content == "Plain JSON tweet" + + def test_parse_with_media_attachments(self, tmp_path: Path): + tweet = { + "id_str": "111", + "full_text": "Check this photo", + "created_at": "Mon Apr 26 08:30:00 +0000 2026", + "extended_entities": { + "media": [ + {"media_url_https": "https://pbs.twimg.com/media/example1.jpg"}, + {"media_url_https": "https://pbs.twimg.com/media/example2.jpg"}, + ] + } + } + js_file = tmp_path / "tweet.js" + js_file.write_text("window.YTD.tweet.part0 = " + json.dumps([{'tweet': tweet}]) + ";\n") + + connector = TwitterArchiveConnector() + events = list(connector.parse_source(js_file)) + + assert len(events) == 1 + atts = events[0].attachments + assert len(atts) == 2 + assert "example1.jpg" in atts[0] + + def test_integration_run_connector(self, tmp_path: Path): + """End-to-end: create a mini archive, run connector, write JSONL output.""" + # Arrange: create archive with two tweets + archive_root = tmp_path / "my_twitter_archive" / "data" + archive_root.mkdir(parents=True) + + tweet1 = { + "id_str": "1", + "full_text": "First tweet", + "created_at": "Mon Apr 26 08:00:00 +0000 2026", + } + tweet2 = { + "id_str": "2", + "full_text": "Second tweet", + "created_at": "Mon Apr 26 09:00:00 +0000 2026", + } + (archive_root / "tweet.js").write_text( + "window.YTD.tweet.part0 = " + json.dumps([{'tweet': tweet1}, {'tweet': tweet2}]) + "\n" + ) + + connector = TwitterArchiveConnector(checkpoint_path=tmp_path / "ckpt.jsonl") + output_path = tmp_path / "events.jsonl" + + # Act + count = 0 + with open(output_path, 'w') as out: + for event in connector.run(archive_root): + out.write(event.to_json() + '\n') + count += 1 
+ + # Assert + assert count == 2 + lines = output_path.read_text().strip().split('\n') + assert len(lines) == 2 + ev1 = json.loads(lines[0]) + assert ev1["content"] == "First tweet" + ev2 = json.loads(lines[1]) + assert ev2["content"] == "Second tweet" + # Check duplicates are filtered on re-run + count2 = sum(1 for _ in connector.run(archive_root)) + assert count2 == 0 # all deduped via checkpoint