#!/usr/bin/env python3
"""
tests/test_connectors.py — Test suite for the personal archive connector pack.

Tests cover:
- SourceEvent schema validation
- Event hash determinism
- TwitterArchiveConnector parsing of standard Twitter export format
- Deduplication gate
"""

import json
import hashlib
import tempfile
from pathlib import Path
from datetime import datetime

import pytest

# Add scripts dir to path for sibling imports
import sys

SCRIPT_DIR = Path(__file__).parent.parent / "scripts"
sys.path.insert(0, str(SCRIPT_DIR.parent))

from connectors.schema import (
    SourceEvent,
    compute_event_hash,
    validate_event,
    CONSENT_MEMORY_ONLY,
    CONSENT_BOOTSTRAP,
)
from connectors.twitter_archive import TwitterArchiveConnector


class TestSourceEventSchema:
    """Tests for SourceEvent dataclass and helpers."""

    def test_create_minimal_event(self):
        event = SourceEvent(
            source="twitter",
            account="user123",
            thread_or_channel="tweet_456",
            author="user123",
            timestamp="2026-04-26T12:00:00Z",
            content="Hello world",
            attachments=[],
            raw_ref="twitter:test:456",
            hash="",
        )
        assert event.source == "twitter"
        assert event.consent_scope == CONSENT_MEMORY_ONLY  # default

    def test_compute_event_hash_deterministic(self):
        h1 = compute_event_hash(
            source="twitter",
            raw_ref="ref:123",
            content="test content",
            timestamp="2026-04-26T12:00:00Z",
            author="alice",
        )
        h2 = compute_event_hash(
            source="twitter",
            raw_ref="ref:123",
            content="test content",
            timestamp="2026-04-26T12:00:00Z",
            author="alice",
        )
        assert h1 == h2
        assert len(h1) == 64  # SHA-256 hex

    def test_compute_event_hash_different_inputs(self):
        h1 = compute_event_hash("twitter", "ref:1", "content", "ts", "alice")
        h2 = compute_event_hash("twitter", "ref:1", "different", "ts", "alice")
        assert h1 != h2

    def test_validate_event_accepts_valid(self):
        event = SourceEvent(
            source="discord",
            account="user#1234",
            thread_or_channel="channel_abc",
            author="user#1234",
            timestamp="2026-04-26T12:00:00Z",
            content="test",
            attachments=[],
            raw_ref="discord:msg:123",
            hash="a" * 64,
        )
        assert validate_event(event) is True

    def test_validate_event_rejects_empty_content(self):
        event = SourceEvent(
            source="twitter",
            account="user",
            thread_or_channel="thread",
            author="user",
            timestamp="2026-04-26T12:00:00Z",
            content="",  # empty
            attachments=[],
            raw_ref="ref",
            hash="a" * 64,
        )
        assert validate_event(event) is False

    def test_validate_event_rejects_missing_hash(self):
        event = SourceEvent(
            source="twitter",
            account="user",
            thread_or_channel="thread",
            author="user",
            timestamp="2026-04-26T12:00:00Z",
            content="test",
            attachments=[],
            raw_ref="ref",
            hash=" ",  # whitespace only
        )
        assert validate_event(event) is False

    def test_event_to_json_roundtrip(self):
        event = SourceEvent(
            source="twitter",
            account="user",
            thread_or_channel="t1",
            author="user",
            timestamp="2026-04-26T12:00:00Z",
            content="hello",
            attachments=["https://example.com/img.jpg"],
            raw_ref="twitter:123",
            hash="b" * 64,
            metadata={"retweet_count": 5},
        )
        json_str = event.to_json()
        parsed = json.loads(json_str)
        assert parsed["source"] == "twitter"
        assert parsed["metadata"]["retweet_count"] == 5


class TestTwitterArchiveConnector:
    """Tests for the Twitter/X archive connector."""

    def test_connector_name(self):
        assert TwitterArchiveConnector.name == "twitter_archive"

    def test_discover_sources_finds_tweet_js(self, tmp_path: Path):
        # Arrange: create a fake Twitter archive structure
        archive = tmp_path / "twitter_archive"
        archive.mkdir()
        data_dir = archive / "data"
        data_dir.mkdir()
        (data_dir / "tweet.js").write_text("[]")
        (data_dir / "tweets_2024_01.js").write_text("[]")
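
        # Act / Assert: both fixture files created above (the single tweet.js
        # and the per-month tweets_2024_01.js part) should be reported by
        # discover_sources.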
        connector = TwitterArchiveConnector()
        sources = list(connector.discover_sources(archive))

        assert len(sources) == 2
        assert any("tweet.js" in str(p) for p in sources)

    def test_parse_single_tweet_wrapped_format(self, tmp_path: Path):
        """
        Twitter's official export wraps the JSON array in a JS assignment:

            window.YTD.tweet.part0 = [ {...tweet...}, ... ];
        """
        # Create a minimal tweet record
        tweet = {
            "id_str": "1234567890",
            "full_text": "Hello from Twitter archive!",
            "created_at": "Mon Apr 26 08:30:00 +0000 2026",
            "favorite_count": 10,
            "retweet_count": 2,
            "lang": "en",
        }
        wrapped = "window.YTD.tweet.part0 = " + json.dumps([{'tweet': tweet}]) + ";\n"
        js_file = tmp_path / "tweet.js"
        js_file.write_text(wrapped)

        connector = TwitterArchiveConnector()
        events = list(connector.parse_source(js_file))

        assert len(events) == 1
        ev = events[0]
        assert ev.source == "twitter"
        assert ev.content == "Hello from Twitter archive!"
        assert ev.author == "user_archive"
        assert ev.consent_scope == "memory_only"
        # Hash must be computed
        assert len(ev.hash) == 64
        # Metadata preservation
        assert ev.metadata["tweet_id"] == "1234567890"
        assert ev.metadata["favorite_count"] == 10

    def test_parse_tweet_array_without_wrapper(self, tmp_path: Path):
        """Some Twitter exports are plain JSON arrays (no JS wrapper)."""
        tweet = {
            "id_str": "999",
            "full_text": "Plain JSON tweet",
            "created_at": "Mon Apr 26 08:30:00 +0000 2026",
        }
        json_file = tmp_path / "tweets.json"
        json_file.write_text(json.dumps([{"tweet": tweet}]))

        connector = TwitterArchiveConnector()
        events = list(connector.parse_source(json_file))

        assert len(events) == 1
        assert events[0].content == "Plain JSON tweet"

    def test_parse_with_media_attachments(self, tmp_path: Path):
        tweet = {
            "id_str": "111",
            "full_text": "Check this photo",
            "created_at": "Mon Apr 26 08:30:00 +0000 2026",
            "extended_entities": {
                "media": [
                    {"media_url_https": "https://pbs.twimg.com/media/example1.jpg"},
                    {"media_url_https": "https://pbs.twimg.com/media/example2.jpg"},
                ]
            },
        }
        js_file = tmp_path / "tweet.js"
        js_file.write_text(
            "window.YTD.tweet.part0 = " + json.dumps([{'tweet': tweet}]) + ";\n"
        )

        connector = TwitterArchiveConnector()
        events = list(connector.parse_source(js_file))

        assert len(events) == 1
        atts = events[0].attachments
        assert len(atts) == 2
        assert "example1.jpg" in atts[0]

    def test_integration_run_connector(self, tmp_path: Path):
        """End-to-end: create a mini archive, run connector, write JSONL output."""
        # Arrange: create archive with two tweets
        archive_root = tmp_path / "my_twitter_archive" / "data"
        archive_root.mkdir(parents=True)
        tweet1 = {
            "id_str": "1",
            "full_text": "First tweet",
            "created_at": "Mon Apr 26 08:00:00 +0000 2026",
        }
        tweet2 = {
            "id_str": "2",
            "full_text": "Second tweet",
            "created_at": "Mon Apr 26 09:00:00 +0000 2026",
        }
        (archive_root / "tweet.js").write_text(
            "window.YTD.tweet.part0 = "
            + json.dumps([{'tweet': tweet1}, {'tweet': tweet2}])
            + "\n"
        )

        connector = TwitterArchiveConnector(checkpoint_path=tmp_path / "ckpt.jsonl")
        output_path = tmp_path / "events.jsonl"

        # Act
        count = 0
        with open(output_path, 'w') as out:
            for event in connector.run(archive_root):
                out.write(event.to_json() + '\n')
                count += 1

        # Assert
        assert count == 2
        lines = output_path.read_text().strip().split('\n')
        assert len(lines) == 2
        ev1 = json.loads(lines[0])
        assert ev1["content"] == "First tweet"
        ev2 = json.loads(lines[1])
        assert ev2["content"] == "Second tweet"

        # Check duplicates are filtered on re-run
        count2 = sum(1 for _ in connector.run(archive_root))
        assert count2 == 0  # all deduped via checkpoint
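

# A minimal additional sketch, assuming compute_event_hash folds all five of its
# arguments (source, raw_ref, content, timestamp, author) into the digest; the
# determinism test above only varies content, so the author/raw_ref sensitivity
# checked here is an assumption about the schema helper, not documented behaviour.
class TestEventHashSensitivity:
    """Hash should change whenever an identity-bearing field changes."""

    def test_hash_changes_with_author(self):
        h1 = compute_event_hash("twitter", "ref:1", "content", "ts", "alice")
        h2 = compute_event_hash("twitter", "ref:1", "content", "ts", "bob")
        assert h1 != h2

    def test_hash_changes_with_raw_ref(self):
        h1 = compute_event_hash("twitter", "ref:1", "content", "ts", "alice")
        h2 = compute_event_hash("twitter", "ref:2", "content", "ts", "alice")
        assert h1 != h2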