feat(connectors): add sovereign personal archive connector pack foundation #279

Open
Rockachopa wants to merge 4 commits from step35/233-atlas-connectors-sovereign-p into main
10 changed files with 881 additions and 1 deletions

.gitignore vendored

@@ -1,2 +1,5 @@
__pycache__/
*.pyc
.pytest_cache/
.mypy_cache/

Makefile

@@ -2,3 +2,12 @@
test:
	python3 -m pytest tests/test_ci_config.py scripts/test_*.py -v

# Connector targets
.PHONY: test-connectors
test-connectors:
	python3 -m pytest tests/test_connectors.py -v

.PHONY: connectors-help
connectors-help:
	python3 scripts/run_connector.py --help

README.md

@@ -27,6 +27,11 @@ Before a session starts, queries knowledge store for relevant facts. Assembles c
### Pipeline 3: Measure
Tracks whether compounding is happening. Knowledge velocity, error reduction, hit rate, task completion. Daily report proves the loop works.
### Connector Pack (EPIC #233)
Sovereign personal archive connectors: Twitter/X, Discord, Slack, WhatsApp, Notion, iMessage, Google.
Connectors mirror local exports or explicit API tokens → normalize → redact → index → sync with provenance.
See [`connectors/`](connectors/README.md) for the full connector suite and usage.
## Directory Structure
```
@@ -40,6 +45,12 @@ Tracks whether compounding is happening. Knowledge velocity, error reduction, hi
│   ├── bootstrapper.py      # Pre-session context loader
│   ├── measurer.py          # Compounding metrics
│   └── session_reader.py    # JSONL parser
├── connectors/              # Personal archive connectors (EPIC #233)
│   ├── __init__.py
│   ├── base.py
│   ├── schema.py
│   ├── twitter_archive.py
│   └── README.md
├── metrics/
│   └── dashboard.md         # Human-readable status
└── templates/
@@ -65,4 +76,4 @@ See [all issues](https://forge.alexanderwhitestone.com/Timmy_Foundation/compound
- EPIC 1: Session Harvester (#2)
- EPIC 2: Knowledge Store & Bootstrap (#3)
- EPIC 3: Compounding Measurement (#4)
- EPIC 4: Retroactive Harvest (#5)
- EPIC 4: Retroactive Harvest (#5)

connectors/README.md Normal file

@@ -0,0 +1,66 @@
# Sovereign Personal Archive Connector Pack
This directory contains the connector infrastructure for ingesting personal archives
(Discord, Slack, WhatsApp, Notion, iMessage, X/Twitter, Google) into the compounding-intelligence
knowledge pipeline.
## Quick Start
```bash
# Run the Twitter archive connector
python3 scripts/run_connector.py twitter \
  --source ~/Documents/TwitterArchive \
  --output events.jsonl \
  --limit 100
```
## Connector Output Format
Each connector emits `SourceEvent` objects (one JSON line per event):
```json
{
  "source": "twitter",
  "account": "user_archive",
  "thread_or_channel": "tweet_123456",
  "author": "user_archive",
  "timestamp": "2026-04-26T08:30:00+00:00",
  "content": "Tweet text here",
  "attachments": ["https://..."],
  "raw_ref": "twitter:archive:tweet.js:123456",
  "hash": "sha256...",
  "consent_scope": "memory_only",
  "metadata": { "tweet_id": "123456", "favorite_count": 10 }
}
```
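The `hash` field can be reproduced with nothing but `hashlib`. This sketch mirrors `compute_event_hash` from `connectors/schema.py` (a SHA-256 over a pipe-joined canonical string); the literal field values are taken from the example above:

```python
import hashlib

# Mirrors compute_event_hash in connectors/schema.py: identical content always
# produces the same digest, enabling cross-connector deduplication.
def event_hash(source: str, raw_ref: str, content: str,
               timestamp: str, author: str) -> str:
    canonical = f"{source}|{raw_ref}|{content}|{timestamp}|{author}"
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

h = event_hash("twitter", "twitter:archive:tweet.js:123456",
               "Tweet text here", "2026-04-26T08:30:00+00:00", "user_archive")
assert len(h) == 64  # SHA-256 hex digest
```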
## Connector Registry
| Name             | Source Format                 | Status     |
|------------------|-------------------------------|------------|
| twitter_archive  | Official Twitter data export  | ✅ Working  |
| discord_archive  | Discord data package / JSON   | ⏳ Planned  |
| slack_archive    | Slack export / API            | ⏳ Planned  |
| whatsapp_archive | WhatsApp Desktop export       | ⏳ Planned  |
| notion_archive   | Notion markdown/SQLite        | ⏳ Planned  |
| imessage_archive | macOS local chat storage      | ⏳ Planned  |
| google_archive   | Google Workspace CLI          | ⏳ Planned  |
## Design Principles
1. **Local-first**: Connectors operate on user-owned exports or explicit API credentials.
2. **Incremental**: Checkpoint files (~/.cache/connectors/) allow resumable processing.
3. **Consent-gated**: Default `consent_scope=memory_only` — explicit opt-in for broader use.
4. **Provenance-preserving**: `metadata` retains all raw fields; `hash` enables deduplication.
5. **Sovereign**: No ambient scraping. No cloud dependency unless user explicitly configures tokens.
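Principle 2 in practice: the checkpoint is an append-only JSONL file, one record per processed event. This standalone sketch follows the shape used by `BaseConnector._load_checkpoint` / `_save_checkpoint`, but writes to a temp directory instead of `~/.cache/connectors/` so it runs anywhere:

```python
import json
import tempfile
from pathlib import Path

# Append-only JSONL checkpoint: one {"hash": ...} record per processed event.
ckpt = Path(tempfile.mkdtemp()) / "twitter_archive.checkpoint.jsonl"

def save(event_hash: str) -> None:
    # Appending (not rewriting) keeps checkpointing O(1) per event.
    with open(ckpt, "a") as f:
        f.write(json.dumps({"hash": event_hash}) + "\n")

def load() -> set:
    # On resume, every hash in the file is skipped by the dedup gate.
    if not ckpt.exists():
        return set()
    return {json.loads(line)["hash"] for line in ckpt.open()}

save("a" * 64)
save("b" * 64)
assert len(load()) == 2  # both events would be skipped on the next run
```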
## Writing a New Connector
Subclass `BaseConnector` from `connectors/base.py` and implement:
- `discover_sources(root: Path) -> Iterator[Path|str]` — find source files or IDs
- `parse_source(source) -> Iterator[SourceEvent]` — emit normalized events
Register in `connectors/__init__.py` `_REGISTRY` dict.
See `connectors/twitter_archive.py` for a complete example.
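A minimal connector following that contract might look like the sketch below. `NotesConnector` and its plain-text source format are hypothetical, and a tiny stand-in base class is inlined so the sketch is self-contained; in the repo you would subclass the real `BaseConnector` from `connectors/base.py` and yield `SourceEvent` objects:

```python
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Iterator

# Stand-in for connectors.base.BaseConnector so this sketch runs on its own.
class BaseConnector(ABC):
    name: str = ""

    @abstractmethod
    def discover_sources(self, root: Path) -> Iterator:
        ...

    @abstractmethod
    def parse_source(self, source) -> Iterator:
        ...

# Hypothetical connector: a folder of plain-text notes, one note per file.
class NotesConnector(BaseConnector):
    name = "notes_archive"

    def discover_sources(self, root: Path) -> Iterator[Path]:
        # Find source files: every .txt under the archive root.
        yield from sorted(Path(root).rglob("*.txt"))

    def parse_source(self, source: Path) -> Iterator[dict]:
        # Real connectors yield SourceEvent; a dict keeps the sketch stdlib-only.
        yield {
            "source": "notes",
            "content": source.read_text(),
            "raw_ref": f"notes:{source.name}",
        }
```

Once both methods are implemented, the class would be registered under its name (here `"notes_archive"`) in the `_REGISTRY` dict in `connectors/__init__.py`.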

connectors/__init__.py Normal file

@@ -0,0 +1,50 @@
#!/usr/bin/env python3
"""
connectors/__init__.py — Sovereign personal archive connector pack.
Provides:
- BaseConnector: abstract base class for all connectors
- SourceEvent: unified event schema
- compute_event_hash, validate_event: utilities
- Registry: connector discovery and loading
Connectors:
- TwitterArchiveConnector: parse official Twitter/X archive exports
(Future: Discord, Slack, WhatsApp, Notion, iMessage, Google)
"""
from .base import BaseConnector
from .schema import (
    SourceEvent,
    compute_event_hash,
    validate_event,
    CONSENT_MEMORY_ONLY,
    CONSENT_BOOTSTRAP,
    CONSENT_TRAINING,
)
from .twitter_archive import TwitterArchiveConnector

# Auto-registry: map of connector name → class
_REGISTRY = {
    "twitter_archive": TwitterArchiveConnector,
    # Future connectors:
    # "discord_archive": DiscordArchiveConnector,
    # "slack_archive": SlackArchiveConnector,
    # "whatsapp_archive": WhatsAppArchiveConnector,
    # "notion_archive": NotionArchiveConnector,
    # "imessage_archive": iMessageArchiveConnector,
    # "google_archive": GoogleArchiveConnector,
}


def get_connector(name: str) -> type[BaseConnector]:
    """Get connector class by registry name."""
    cls = _REGISTRY.get(name)
    if cls is None:
        raise ValueError(f"Unknown connector '{name}'. Available: {list(_REGISTRY.keys())}")
    return cls


def list_connectors() -> list[str]:
    """List all registered connector names."""
    return list(_REGISTRY.keys())

connectors/base.py Normal file

@@ -0,0 +1,100 @@
#!/usr/bin/env python3
"""
connectors/base.py — Abstract base class for all personal archive connectors.
Defines the contract every connector must implement. Connectors read local
exports or API data and yield SourceEvent objects.
"""
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Iterator, Optional, Dict, Any
import json
import logging
logger = logging.getLogger(__name__)
class BaseConnector(ABC):
    """Abstract base class for personal archive connectors."""
    name: str = ""
    source_glob: Optional[str] = None
    default_consent_scope: str = "memory_only"

    def __init__(self, checkpoint_path: Optional[Path] = None,
                 consent_scope: Optional[str] = None):
        self.consent_scope = consent_scope or self.default_consent_scope
        if checkpoint_path is None:
            cache_dir = Path.home() / ".cache" / "connectors"
            cache_dir.mkdir(parents=True, exist_ok=True)
            checkpoint_path = cache_dir / f"{self.name}.checkpoint.jsonl"
        self.checkpoint_path = checkpoint_path
        self._processed_hashes = self._load_checkpoint()

    def _load_checkpoint(self) -> set:
        if not self.checkpoint_path.exists():
            return set()
        seen = set()
        with open(self.checkpoint_path, 'r') as f:
            for line in f:
                try:
                    entry = json.loads(line)
                    seen.add(entry.get('hash', ''))
                except Exception:
                    continue
        logger.debug("Loaded %d checkpoint hashes for %s", len(seen), self.name)
        return seen

    def _save_checkpoint(self, event) -> None:
        with open(self.checkpoint_path, 'a') as f:
            f.write(json.dumps({'hash': event.hash, 'source': event.source,
                                'raw_ref': event.raw_ref}) + '\n')

    def mark_processed(self, event_hash: str) -> None:
        self._processed_hashes.add(event_hash)

    def is_processed(self, event_hash: str) -> bool:
        return event_hash in self._processed_hashes

    def run(self, source_root: Path, limit: Optional[int] = None) -> Iterator:
        # Deferred import avoids a circular dependency at module load time.
        from .schema import compute_event_hash, validate_event
        count = 0
        skipped_dedup = 0
        skipped_invalid = 0
        for source in self.discover_sources(source_root):
            for event in self.parse_source(source):
                if not event.hash:
                    event.hash = compute_event_hash(
                        event.source, event.raw_ref, event.content,
                        event.timestamp, event.author
                    )
                if event.hash in self._processed_hashes:
                    skipped_dedup += 1
                    continue
                if not validate_event(event):
                    skipped_invalid += 1
                    logger.warning("Invalid event: raw_ref=%s", event.raw_ref)
                    continue
                event.consent_scope = self.consent_scope
                self._save_checkpoint(event)
                self._processed_hashes.add(event.hash)
                yield event
                count += 1
                if limit and count >= limit:
                    return
        logger.info("Connector %s complete: yielded=%d, skipped_dedup=%d, skipped_invalid=%d",
                    self.name, count, skipped_dedup, skipped_invalid)

    @abstractmethod
    def discover_sources(self, root: Path) -> Iterator:
        """Yield source files or identifiers found under root."""
        ...

    @abstractmethod
    def parse_source(self, source) -> Iterator:
        """Yield SourceEvent objects parsed from a single source."""
        ...

connectors/schema.py Normal file

@@ -0,0 +1,100 @@
#!/usr/bin/env python3
"""
connectors/schema.py — Unified source-event schema for personal archive connectors.
All connectors must produce events conforming to this schema so downstream
pipelines (harvester → knowledge store) can process them uniformly.
"""
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Optional, Any, Dict
import hashlib
import json
@dataclass
class SourceEvent:
    """
    Canonical event schema for any ingested personal archive entry.

    Fields
    ------
    source : str
        Platform identifier: 'twitter', 'discord', 'slack', 'whatsapp',
        'notion', 'imessage', 'google', etc.
    account : str
        User account/channel identifier on the source platform.
    thread_or_channel : str
        Conversation thread, channel, or chat identifier.
    author : str
        Who created this content (may differ from account in group chats).
    timestamp : str
        ISO-8601 timestamp when the event occurred (not when it was ingested).
    content : str
        Primary text content. May be empty for non-text events (images only).
    attachments : list[str]
        List of local file paths or URLs for attached media.
    raw_ref : str
        Pointer to the raw source record (file path, message ID, URL, etc.).
    hash : str
        SHA-256 hash of the raw content for deduplication and provenance.
    consent_scope : str
        Privacy gate: where this content may be used.
        Examples: 'memory_only', 'bootstrap_context', 'training_data'.
        Default: 'memory_only' for ingested personal archives.
    metadata : dict[str, Any]
        Platform-specific fields retained for provenance but not indexed.
    """
    source: str
    account: str
    thread_or_channel: str
    author: str
    timestamp: str
    content: str
    attachments: list[str]
    raw_ref: str
    hash: str
    consent_scope: str = "memory_only"
    metadata: Optional[Dict[str, Any]] = None

    def to_dict(self) -> dict:
        """Convert to plain dict for JSON serialization."""
        d = asdict(self)
        if d['metadata'] is None:
            d['metadata'] = {}
        return d

    def to_json(self) -> str:
        """Serialize to JSON line (one event per line)."""
        return json.dumps(self.to_dict(), ensure_ascii=False)


def compute_event_hash(source: str, raw_ref: str, content: str,
                       timestamp: str, author: str) -> str:
    """
    Compute deterministic SHA-256 hash for an event.

    Hash inputs: source + raw_ref + content + timestamp + author.
    This ensures identical content always produces the same hash,
    enabling cross-connector deduplication.
    """
    canonical = f"{source}|{raw_ref}|{content}|{timestamp}|{author}"
    return hashlib.sha256(canonical.encode('utf-8')).hexdigest()


def validate_event(event: SourceEvent) -> bool:
    """
    Minimal structural validation for a SourceEvent.

    Returns True if required fields are present and well-formed.
    """
    required = [event.source, event.account, event.thread_or_channel,
                event.author, event.timestamp, event.content, event.raw_ref,
                event.hash]
    return all(str(x).strip() for x in required)


# Consent scope definitions
CONSENT_MEMORY_ONLY = "memory_only"      # For retrieval only, not bootstrap
CONSENT_BOOTSTRAP = "bootstrap_context"  # Can seed new sessions
CONSENT_TRAINING = "training_data"       # May be used for model training

connectors/twitter_archive.py Normal file

@@ -0,0 +1,155 @@
#!/usr/bin/env python3
"""
connectors/twitter_archive.py — Twitter/X personal archive connector.
Parses official Twitter data exports (Twitter's "Download your data" archive).
Expects the tweet.js / tweet.json files from the archive's data/ directory.
Format (Twitter's archived tweets JSON):
Each entry has: {"tweet": {"id_str": "...", "full_text": "...", "created_at": "...", ...}}
Output: normalized SourceEvent with source='twitter'.
"""
import json
import re
from datetime import datetime
from pathlib import Path
from typing import Iterator, Optional
import logging
from .base import BaseConnector
from .schema import SourceEvent, compute_event_hash
logger = logging.getLogger(__name__)
class TwitterArchiveConnector(BaseConnector):
    """Connector for Twitter/X official archive exports."""
    name = "twitter_archive"
    source_glob = "**/tweet*.json"
    default_consent_scope = "memory_only"

    # Twitter's date format in archives: "Wed Oct 10 20:19:24 +0000 2018"
    TWITTER_DATE_FMT = "%a %b %d %H:%M:%S %z %Y"

    def discover_sources(self, root: Path) -> Iterator[Path]:
        """
        Find tweet.js / tweet.json files in a Twitter archive.

        The official Twitter export places these under:
            root/
                data/
                    tweet.js        (single-file format, older exports)
                or
                account-XXXX-YYYY/
                    tweets.js       (per-month split format)
        """
        root = Path(root)
        # Search for .js files that start with 'tweet' — these contain the tweet JSON blobs
        candidates = list(root.rglob("tweet*.js")) + list(root.rglob("tweet*.json"))
        logger.info("Discovered %d Twitter archive files under %s", len(candidates), root)
        for path in candidates:
            yield path

    def parse_source(self, source: Path) -> Iterator[SourceEvent]:
        """
        Parse a Twitter archive file and yield SourceEvents.

        Handles both single-file (old) and per-month split formats.
        Twitter wraps the JSON array in a JS variable assignment:
            window.YTD.tweet.part0 = [...]
        """
        try:
            with open(source, 'r', encoding='utf-8') as f:
                raw = f.read()
            # Extract JSON array from the JS wrapper
            match = re.search(r'=\s*(\[.+?\])\s*;?\s*$', raw, re.DOTALL)
            if match:
                records = json.loads(match.group(1))
            else:
                # Plain JSON array (no wrapper)
                records = json.loads(raw)
            logger.debug("Parsing %d tweet records from %s", len(records), source)
            for record in records:
                event = self._record_to_event(record, source)
                if event:
                    yield event
        except Exception as e:
            logger.error("Failed to parse %s: %s", source, e)

    def _record_to_event(self, record: dict, source_path: Path) -> Optional[SourceEvent]:
        """
        Convert a single tweet record into a SourceEvent.

        The record can be either the wrapped format {"tweet": {...}} or the bare tweet object.
        """
        # Unwrap the tweet object
        tweet = record.get('tweet', record)
        # Extract core fields
        id_str = tweet.get('id_str') or tweet.get('id')
        full_text = tweet.get('full_text') or tweet.get('text', '')
        created_at = tweet.get('created_at', '')
        # Parse timestamp
        try:
            dt = datetime.strptime(created_at, self.TWITTER_DATE_FMT)
            iso_ts = dt.astimezone().isoformat()
        except Exception:
            iso_ts = created_at  # fallback: keep as-is
        # Author is always the account owner (Twitter archives don't include others' DMs by default)
        account = "user_archive"  # normalized account identifier
        # Thread/channel: individual tweets have no thread ID; threads aren't preserved in basic exports
        thread_id = f"tweet_{id_str}"
        # Attachments: extract media URLs
        attachments = []
        extended_entities = tweet.get('extended_entities', {})
        for media in extended_entities.get('media', []):
            url = media.get('media_url_https') or media.get('media_url')
            if url:
                attachments.append(url)
        # Build raw_ref
        raw_ref = f"twitter:archive:{source_path.name}:{id_str}"
        # Compute hash
        content_for_hash = full_text or ""
        hash_val = compute_event_hash(
            source="twitter",
            raw_ref=raw_ref,
            content=content_for_hash,
            timestamp=iso_ts,
            author=account
        )
        # Preserve metadata for provenance
        metadata = {
            "tweet_id": id_str,
            "source_file": str(source_path),
            "favorite_count": tweet.get('favorite_count'),
            "retweet_count": tweet.get('retweet_count'),
            "in_reply_to_status_id": tweet.get('in_reply_to_status_id_str'),
            "lang": tweet.get('lang'),
        }
        return SourceEvent(
            source="twitter",
            account=account,
            thread_or_channel=thread_id,
            author=account,
            timestamp=iso_ts,
            content=full_text,
            attachments=attachments,
            raw_ref=raw_ref,
            hash=hash_val,
            consent_scope=self.consent_scope,
            metadata=metadata
        )

scripts/run_connector.py Normal file

@@ -0,0 +1,116 @@
#!/usr/bin/env python3
"""
scripts/run_connector.py — Run a personal archive connector and emit SourceEvents.
Usage:
python3 scripts/run_connector.py twitter --source /path/to/twitter/archive --output events.jsonl [--limit 100]
This is the entry point that ties the connectors pack into the existing compounding-intelligence
pipeline. Output is JSONL (one SourceEvent per line), ready for downstream ingestion by
harvester.py or a future connector-targeted harvester.
"""
import argparse
import json
import logging
import sys
from pathlib import Path
# Add repo root to path so the sibling `connectors` package is importable
SCRIPT_DIR = Path(__file__).parent.absolute()
sys.path.insert(0, str(SCRIPT_DIR.parent))
from connectors import get_connector, list_connectors
from connectors.base import BaseConnector
from connectors.schema import SourceEvent
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
)
logger = logging.getLogger("run_connector")


def main():
    parser = argparse.ArgumentParser(
        description="Run a personal archive connector and emit normalized events."
    )
    parser.add_argument(
        "connector",
        choices=list_connectors(),
        help="Connector name to run"
    )
    parser.add_argument(
        "--source", "-s",
        required=True,
        help="Path to the source archive root (e.g., ~/Documents/TwitterArchive)"
    )
    parser.add_argument(
        "--output", "-o",
        required=True,
        help="Output file path (JSONL, one SourceEvent per line)"
    )
    parser.add_argument(
        "--limit", "-n",
        type=int,
        default=None,
        help="Stop after N events (default: unlimited)"
    )
    parser.add_argument(
        "--consent-scope",
        choices=["memory_only", "bootstrap_context", "training_data"],
        default="memory_only",
        help="Consent scope for emitted events (default: memory_only)"
    )
    parser.add_argument(
        "--checkpoint",
        type=Path,
        default=None,
        help="Checkpoint file path (default: ~/.cache/connectors/{name}.checkpoint.jsonl)"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Parse and count events but do not write output"
    )
    args = parser.parse_args()
    # Resolve connector
    connector_cls = get_connector(args.connector)
    connector: BaseConnector = connector_cls(
        checkpoint_path=args.checkpoint,
        consent_scope=args.consent_scope
    )
    # Resolve source path
    source_path = Path(args.source).expanduser().resolve()
    if not source_path.exists():
        logger.error("Source path does not exist: %s", source_path)
        sys.exit(1)
    # Run connector
    logger.info("Running connector '%s' on source: %s", args.connector, source_path)
    events = connector.run(source_path, limit=args.limit)
    if args.dry_run:
        count = sum(1 for _ in events)
        logger.info("[DRY RUN] Would emit %d events", count)
        return 0
    # Write output
    output_path = Path(args.output).expanduser().resolve()
    output_path.parent.mkdir(parents=True, exist_ok=True)
    count = 0
    with open(output_path, 'w', encoding='utf-8') as out:
        for event in events:
            out.write(event.to_json() + '\n')
            count += 1
    logger.info("Connector complete. Emitted %d events to %s", count, output_path)
    return 0


if __name__ == "__main__":
    sys.exit(main())

tests/test_connectors.py Normal file

@@ -0,0 +1,270 @@
#!/usr/bin/env python3
"""
tests/test_connectors.py — Test suite for the personal archive connector pack.
Tests cover:
- SourceEvent schema validation
- Event hash determinism
- TwitterArchiveConnector parsing of standard Twitter export format
- Deduplication gate
"""
import json
import hashlib
import tempfile
from pathlib import Path
from datetime import datetime
import pytest
# Add repo root to path so the connectors package is importable
import sys
REPO_ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(REPO_ROOT))

from connectors.schema import (
    SourceEvent,
    compute_event_hash,
    validate_event,
    CONSENT_MEMORY_ONLY,
    CONSENT_BOOTSTRAP,
)
from connectors.twitter_archive import TwitterArchiveConnector
class TestSourceEventSchema:
    """Tests for SourceEvent dataclass and helpers."""

    def test_create_minimal_event(self):
        event = SourceEvent(
            source="twitter",
            account="user123",
            thread_or_channel="tweet_456",
            author="user123",
            timestamp="2026-04-26T12:00:00Z",
            content="Hello world",
            attachments=[],
            raw_ref="twitter:test:456",
            hash="",
        )
        assert event.source == "twitter"
        assert event.consent_scope == CONSENT_MEMORY_ONLY  # default

    def test_compute_event_hash_deterministic(self):
        h1 = compute_event_hash(
            source="twitter",
            raw_ref="ref:123",
            content="test content",
            timestamp="2026-04-26T12:00:00Z",
            author="alice"
        )
        h2 = compute_event_hash(
            source="twitter",
            raw_ref="ref:123",
            content="test content",
            timestamp="2026-04-26T12:00:00Z",
            author="alice"
        )
        assert h1 == h2
        assert len(h1) == 64  # SHA-256 hex

    def test_compute_event_hash_different_inputs(self):
        h1 = compute_event_hash("twitter", "ref:1", "content", "ts", "alice")
        h2 = compute_event_hash("twitter", "ref:1", "different", "ts", "alice")
        assert h1 != h2

    def test_validate_event_accepts_valid(self):
        event = SourceEvent(
            source="discord",
            account="user#1234",
            thread_or_channel="channel_abc",
            author="user#1234",
            timestamp="2026-04-26T12:00:00Z",
            content="test",
            attachments=[],
            raw_ref="discord:msg:123",
            hash="a" * 64,
        )
        assert validate_event(event) is True

    def test_validate_event_rejects_empty_content(self):
        event = SourceEvent(
            source="twitter",
            account="user",
            thread_or_channel="thread",
            author="user",
            timestamp="2026-04-26T12:00:00Z",
            content="",  # empty
            attachments=[],
            raw_ref="ref",
            hash="a" * 64,
        )
        assert validate_event(event) is False

    def test_validate_event_rejects_missing_hash(self):
        event = SourceEvent(
            source="twitter",
            account="user",
            thread_or_channel="thread",
            author="user",
            timestamp="2026-04-26T12:00:00Z",
            content="test",
            attachments=[],
            raw_ref="ref",
            hash=" ",  # whitespace only
        )
        assert validate_event(event) is False

    def test_event_to_json_roundtrip(self):
        event = SourceEvent(
            source="twitter",
            account="user",
            thread_or_channel="t1",
            author="user",
            timestamp="2026-04-26T12:00:00Z",
            content="hello",
            attachments=["https://example.com/img.jpg"],
            raw_ref="twitter:123",
            hash="b" * 64,
            metadata={"retweet_count": 5}
        )
        json_str = event.to_json()
        parsed = json.loads(json_str)
        assert parsed["source"] == "twitter"
        assert parsed["metadata"]["retweet_count"] == 5
class TestTwitterArchiveConnector:
    """Tests for the Twitter/X archive connector."""

    def test_connector_name(self):
        assert TwitterArchiveConnector.name == "twitter_archive"

    def test_discover_sources_finds_tweet_js(self, tmp_path: Path):
        # Arrange: create a fake Twitter archive structure
        archive = tmp_path / "twitter_archive"
        archive.mkdir()
        data_dir = archive / "data"
        data_dir.mkdir()
        (data_dir / "tweet.js").write_text("[]")
        (data_dir / "tweets_2024_01.js").write_text("[]")
        connector = TwitterArchiveConnector()
        sources = list(connector.discover_sources(archive))
        assert len(sources) == 2
        assert any("tweet.js" in str(p) for p in sources)

    def test_parse_single_tweet_wrapped_format(self, tmp_path: Path):
        """
        Twitter's official export wraps the JSON array in a JS assignment:
            window.YTD.tweet.part0 = [ {...tweet...}, ... ];
        """
        # Create a minimal tweet record
        tweet = {
            "id_str": "1234567890",
            "full_text": "Hello from Twitter archive!",
            "created_at": "Mon Apr 26 08:30:00 +0000 2026",
            "favorite_count": 10,
            "retweet_count": 2,
            "lang": "en"
        }
        wrapped = "window.YTD.tweet.part0 = " + json.dumps([{'tweet': tweet}]) + ";\n"
        js_file = tmp_path / "tweet.js"
        js_file.write_text(wrapped)
        connector = TwitterArchiveConnector()
        events = list(connector.parse_source(js_file))
        assert len(events) == 1
        ev = events[0]
        assert ev.source == "twitter"
        assert ev.content == "Hello from Twitter archive!"
        assert ev.author == "user_archive"
        assert ev.consent_scope == "memory_only"
        # Hash must be computed
        assert len(ev.hash) == 64
        # Metadata preservation
        assert ev.metadata["tweet_id"] == "1234567890"
        assert ev.metadata["favorite_count"] == 10

    def test_parse_tweet_array_without_wrapper(self, tmp_path: Path):
        """Some Twitter exports are plain JSON arrays (no JS wrapper)."""
        tweet = {
            "id_str": "999",
            "full_text": "Plain JSON tweet",
            "created_at": "Mon Apr 26 08:30:00 +0000 2026",
        }
        json_file = tmp_path / "tweets.json"
        json_file.write_text(json.dumps([{"tweet": tweet}]))
        connector = TwitterArchiveConnector()
        events = list(connector.parse_source(json_file))
        assert len(events) == 1
        assert events[0].content == "Plain JSON tweet"

    def test_parse_with_media_attachments(self, tmp_path: Path):
        tweet = {
            "id_str": "111",
            "full_text": "Check this photo",
            "created_at": "Mon Apr 26 08:30:00 +0000 2026",
            "extended_entities": {
                "media": [
                    {"media_url_https": "https://pbs.twimg.com/media/example1.jpg"},
                    {"media_url_https": "https://pbs.twimg.com/media/example2.jpg"},
                ]
            }
        }
        js_file = tmp_path / "tweet.js"
        js_file.write_text("window.YTD.tweet.part0 = " + json.dumps([{'tweet': tweet}]) + ";\n")
        connector = TwitterArchiveConnector()
        events = list(connector.parse_source(js_file))
        assert len(events) == 1
        atts = events[0].attachments
        assert len(atts) == 2
        assert "example1.jpg" in atts[0]

    def test_integration_run_connector(self, tmp_path: Path):
        """End-to-end: create a mini archive, run connector, write JSONL output."""
        # Arrange: create archive with two tweets
        archive_root = tmp_path / "my_twitter_archive" / "data"
        archive_root.mkdir(parents=True)
        tweet1 = {
            "id_str": "1",
            "full_text": "First tweet",
            "created_at": "Mon Apr 26 08:00:00 +0000 2026",
        }
        tweet2 = {
            "id_str": "2",
            "full_text": "Second tweet",
            "created_at": "Mon Apr 26 09:00:00 +0000 2026",
        }
        (archive_root / "tweet.js").write_text(
            "window.YTD.tweet.part0 = " + json.dumps([{'tweet': tweet1}, {'tweet': tweet2}]) + "\n"
        )
        connector = TwitterArchiveConnector(checkpoint_path=tmp_path / "ckpt.jsonl")
        output_path = tmp_path / "events.jsonl"
        # Act
        count = 0
        with open(output_path, 'w') as out:
            for event in connector.run(archive_root):
                out.write(event.to_json() + '\n')
                count += 1
        # Assert
        assert count == 2
        lines = output_path.read_text().strip().split('\n')
        assert len(lines) == 2
        ev1 = json.loads(lines[0])
        assert ev1["content"] == "First tweet"
        ev2 = json.loads(lines[1])
        assert ev2["content"] == "Second tweet"
        # Check duplicates are filtered on re-run
        count2 = sum(1 for _ in connector.run(archive_root))
        assert count2 == 0  # all deduped via checkpoint