Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 2133b18929 | |||
| c4cb325568 | |||
| ca21e3e886 | |||
| 8628a0d610 |
3
.gitignore
vendored
3
.gitignore
vendored
@@ -1,2 +1,5 @@
|
||||
__pycache__/
|
||||
*.pyc
|
||||
|
||||
.pytest_cache/
|
||||
.mypy_cache/
|
||||
|
||||
9
Makefile
9
Makefile
@@ -2,3 +2,12 @@
|
||||
|
||||
test:
|
||||
python3 -m pytest tests/test_ci_config.py scripts/test_*.py -v
|
||||
|
||||
# Connector targets
|
||||
.PHONY: test-connectors
|
||||
test-connectors:
|
||||
python3 -m pytest tests/test_connectors.py -v
|
||||
|
||||
.PHONY: connectors-help
|
||||
connectors-help:
|
||||
python3 scripts/run_connector.py --help
|
||||
|
||||
13
README.md
13
README.md
@@ -27,6 +27,11 @@ Before a session starts, queries knowledge store for relevant facts. Assembles c
|
||||
### Pipeline 3: Measure
|
||||
Tracks whether compounding is happening. Knowledge velocity, error reduction, hit rate, task completion. Daily report proves the loop works.
|
||||
|
||||
### Connector Pack (EPIC #233)
|
||||
Sovereign personal archive connectors: Twitter/X, Discord, Slack, WhatsApp, Notion, iMessage, Google.
|
||||
Connectors mirror local exports or explicit API tokens → normalize → redact → index → sync with provenance.
|
||||
See [`connectors/`](connectors/README.md) for the full connector suite and usage.
|
||||
|
||||
## Directory Structure
|
||||
|
||||
```
|
||||
@@ -40,6 +45,12 @@ Tracks whether compounding is happening. Knowledge velocity, error reduction, hi
|
||||
│ ├── bootstrapper.py # Pre-session context loader
|
||||
│ ├── measurer.py # Compounding metrics
|
||||
│ └── session_reader.py # JSONL parser
|
||||
├── connectors/ # Personal archive connectors (EPIC #233)
|
||||
│ ├── __init__.py
|
||||
│ ├── base.py
|
||||
│ ├── schema.py
|
||||
│ ├── twitter_archive.py
|
||||
│ └── README.md
|
||||
├── metrics/
|
||||
│ └── dashboard.md # Human-readable status
|
||||
└── templates/
|
||||
@@ -65,4 +76,4 @@ See [all issues](https://forge.alexanderwhitestone.com/Timmy_Foundation/compound
|
||||
- EPIC 1: Session Harvester (#2)
|
||||
- EPIC 2: Knowledge Store & Bootstrap (#3)
|
||||
- EPIC 3: Compounding Measurement (#4)
|
||||
- EPIC 4: Retroactive Harvest (#5)
|
||||
- EPIC 4: Retroactive Harvest (#5)
|
||||
66
connectors/README.md
Normal file
66
connectors/README.md
Normal file
@@ -0,0 +1,66 @@
|
||||
# Sovereign Personal Archive Connector Pack
|
||||
|
||||
This directory contains the connector infrastructure for ingesting personal archives
|
||||
(Discord, Slack, WhatsApp, Notion, iMessage, X/Twitter, Google) into the compounding-intelligence
|
||||
knowledge pipeline.
|
||||
|
||||
## Quick Start
|
||||
|
||||
```bash
|
||||
# Run the Twitter archive connector
|
||||
python3 scripts/run_connector.py twitter_archive \
|
||||
--source ~/Documents/TwitterArchive \
|
||||
--output events.jsonl \
|
||||
--limit 100
|
||||
```
|
||||
|
||||
## Connector Output Format
|
||||
|
||||
Each connector emits `SourceEvent` objects (one JSON line per event):
|
||||
|
||||
```json
|
||||
{
|
||||
"source": "twitter",
|
||||
"account": "user_archive",
|
||||
"thread_or_channel": "tweet_123456",
|
||||
"author": "user_archive",
|
||||
"timestamp": "2026-04-26T08:30:00+00:00",
|
||||
"content": "Tweet text here",
|
||||
"attachments": ["https://..."],
|
||||
"raw_ref": "twitter:archive:tweet.js:123456",
|
||||
"hash": "sha256...",
|
||||
"consent_scope": "memory_only",
|
||||
"metadata": { "tweet_id": "123456", "favorite_count": 10 }
|
||||
}
|
||||
```
|
||||
|
||||
## Connector Registry
|
||||
|
||||
| Name | Source Format | Status |
|
||||
|----------------|-----------------------------|----------|
|
||||
| twitter_archive| Official Twitter data export| ✅ Working |
|
||||
| discord_archive| Discord data package / JSON | ⏳ Planned |
|
||||
| slack_archive | Slack export / API | ⏳ Planned |
|
||||
| whatsapp_archive| WhatsApp Desktop export | ⏳ Planned |
|
||||
| notion_archive | Notion markdown/SQLite | ⏳ Planned |
|
||||
| imessage_archive| macOS local chat storage | ⏳ Planned |
|
||||
| google_archive | Google Workspace CLI | ⏳ Planned |
|
||||
|
||||
## Design Principles
|
||||
|
||||
1. **Local-first**: Connectors operate on user-owned exports or explicit API credentials.
|
||||
2. **Incremental**: Checkpoint files (~/.cache/connectors/) allow resumable processing.
|
||||
3. **Consent-gated**: Default `consent_scope=memory_only` — explicit opt-in for broader use.
|
||||
4. **Provenance-preserving**: `metadata` retains all raw fields; `hash` enables deduplication.
|
||||
5. **Sovereign**: No ambient scraping. No cloud dependency unless user explicitly configures tokens.
|
||||
|
||||
## Writing a New Connector
|
||||
|
||||
Subclass `BaseConnector` from `connectors/base.py` and implement:
|
||||
|
||||
- `discover_sources(root: Path) -> Iterator[Path|str]` — find source files or IDs
|
||||
- `parse_source(source) -> Iterator[SourceEvent]` — emit normalized events
|
||||
|
||||
Register in `connectors/__init__.py` `_REGISTRY` dict.
|
||||
|
||||
See `connectors/twitter_archive.py` for a complete example.
|
||||
50
connectors/__init__.py
Normal file
50
connectors/__init__.py
Normal file
@@ -0,0 +1,50 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
connectors/__init__.py — Sovereign personal archive connector pack.
|
||||
|
||||
Provides:
|
||||
- BaseConnector: abstract base class for all connectors
|
||||
- SourceEvent: unified event schema
|
||||
- compute_event_hash, validate_event: utilities
|
||||
- Registry: connector discovery and loading
|
||||
|
||||
Connectors:
|
||||
- TwitterArchiveConnector: parse official Twitter/X archive exports
|
||||
(Future: Discord, Slack, WhatsApp, Notion, iMessage, Google)
|
||||
"""
|
||||
|
||||
from .base import BaseConnector
|
||||
from .schema import (
|
||||
SourceEvent,
|
||||
compute_event_hash,
|
||||
validate_event,
|
||||
CONSENT_MEMORY_ONLY,
|
||||
CONSENT_BOOTSTRAP,
|
||||
CONSENT_TRAINING,
|
||||
)
|
||||
from .twitter_archive import TwitterArchiveConnector
|
||||
|
||||
# Auto-registry: map of connector name → class.
# Keys are the registry/CLI names accepted by get_connector() and listed by
# list_connectors(); values are BaseConnector subclasses.
_REGISTRY = {
    "twitter_archive": TwitterArchiveConnector,
    # Future connectors:
    # "discord_archive": DiscordArchiveConnector,
    # "slack_archive": SlackArchiveConnector,
    # "whatsapp_archive": WhatsAppArchiveConnector,
    # "notion_archive": NotionArchiveConnector,
    # "imessage_archive": iMessageArchiveConnector,
    # "google_archive": GoogleArchiveConnector,
}
|
||||
|
||||
|
||||
def get_connector(name: str) -> type[BaseConnector]:
    """Resolve a registry name to its connector class.

    Raises:
        ValueError: if `name` is not registered; the message lists the
            available connector names.
    """
    if name not in _REGISTRY:
        raise ValueError(f"Unknown connector '{name}'. Available: {list(_REGISTRY.keys())}")
    return _REGISTRY[name]
|
||||
|
||||
|
||||
def list_connectors() -> list[str]:
    """Return the names of every registered connector, in registry order."""
    return [connector_name for connector_name in _REGISTRY]
|
||||
100
connectors/base.py
Normal file
100
connectors/base.py
Normal file
@@ -0,0 +1,100 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
connectors/base.py — Abstract base class for all personal archive connectors.
|
||||
|
||||
Defines the contract every connector must implement. Connectors read local
|
||||
exports or API data and yield SourceEvent objects.
|
||||
"""
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from pathlib import Path
|
||||
from typing import Iterator, Optional, Dict, Any
|
||||
import json
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class BaseConnector(ABC):
    """Abstract base for personal archive connectors.

    Subclasses implement `discover_sources` (locate archive files or IDs
    under a root) and `parse_source` (turn one source into normalized
    SourceEvents). This base provides the shared run loop: lazy hash
    computation, checkpoint persistence, deduplication, validation, and
    consent tagging.
    """

    # Registry name; each concrete subclass must set this.
    name: Optional[str] = None
    # Optional glob hint describing the files this connector consumes.
    source_glob: Optional[str] = None
    # Consent scope stamped onto events unless overridden at construction.
    default_consent_scope: str = "memory_only"

    def __init__(self, checkpoint_path: Optional[Path] = None,
                 consent_scope: Optional[str] = None):
        """Create a connector.

        Args:
            checkpoint_path: Where processed-event hashes are persisted.
                Defaults to ~/.cache/connectors/{name}.checkpoint.jsonl
                (directory created if missing).
            consent_scope: Overrides `default_consent_scope` for all
                emitted events.
        """
        self.consent_scope = consent_scope or self.default_consent_scope
        if checkpoint_path is None:
            cache_dir = Path.home() / ".cache" / "connectors"
            cache_dir.mkdir(parents=True, exist_ok=True)
            checkpoint_path = cache_dir / f"{self.name}.checkpoint.jsonl"
        self.checkpoint_path = checkpoint_path
        self._processed_hashes = self._load_checkpoint()

    def _load_checkpoint(self) -> set:
        """Load previously-processed event hashes from the checkpoint file.

        Malformed lines are skipped so a partially-written checkpoint never
        blocks a run.
        """
        if not self.checkpoint_path.exists():
            return set()
        seen = set()
        with open(self.checkpoint_path, 'r', encoding='utf-8') as f:
            for line in f:
                try:
                    entry = json.loads(line)
                    seen.add(entry.get('hash', ''))
                except Exception:
                    continue
        logger.debug("Loaded %d checkpoint hashes for %s", len(seen), self.name)
        return seen

    def _save_checkpoint(self, event) -> None:
        """Append one processed event's hash (plus provenance) to the checkpoint."""
        with open(self.checkpoint_path, 'a', encoding='utf-8') as f:
            f.write(json.dumps({'hash': event.hash, 'source': event.source,
                                'raw_ref': event.raw_ref}) + '\n')

    def mark_processed(self, event_hash: str) -> None:
        """Record a hash as processed for the lifetime of this instance."""
        self._processed_hashes.add(event_hash)

    def is_processed(self, event_hash: str) -> bool:
        """Return True if this hash was already processed (this run or a prior one)."""
        return event_hash in self._processed_hashes

    def run(self, source_root: Path, limit: Optional[int] = None) -> Iterator:
        """Discover sources, parse them, and yield deduplicated valid events.

        Each yielded event has its hash filled in, its consent_scope set,
        and is recorded in the checkpoint before being yielded.

        Args:
            source_root: Root directory (or equivalent) to search.
            limit: Stop after this many yielded events. NOTE: a falsy limit
                (None or 0) means "no limit", matching the CLI default.
        """
        # Imported at call time (not module import) so base.py carries no
        # hard top-level dependency on schema; hoisted out of the per-event
        # loop, where the original re-imported on every iteration.
        from .schema import compute_event_hash, validate_event

        count = 0
        skipped_dedup = 0
        skipped_invalid = 0

        for source in self.discover_sources(source_root):
            for event in self.parse_source(source):
                # Fill in the hash lazily for connectors that leave it empty.
                if not event.hash:
                    event.hash = compute_event_hash(
                        event.source, event.raw_ref, event.content,
                        event.timestamp, event.author
                    )

                if self.is_processed(event.hash):
                    skipped_dedup += 1
                    continue

                if not validate_event(event):
                    skipped_invalid += 1
                    logger.warning("Invalid event: raw_ref=%s", event.raw_ref)
                    continue

                event.consent_scope = self.consent_scope
                self._save_checkpoint(event)
                self.mark_processed(event.hash)

                yield event
                count += 1
                if limit and count >= limit:
                    return

        logger.info("Connector %s complete: yielded=%d, skipped_dedup=%d, skipped_invalid=%d",
                    self.name, count, skipped_dedup, skipped_invalid)

    @abstractmethod
    def discover_sources(self, root: Path) -> Iterator:
        """Yield source files or identifiers found under `root`."""

    @abstractmethod
    def parse_source(self, source) -> Iterator:
        """Yield SourceEvents parsed from one discovered source."""
|
||||
100
connectors/schema.py
Normal file
100
connectors/schema.py
Normal file
@@ -0,0 +1,100 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
connectors/schema.py — Unified source-event schema for personal archive connectors.
|
||||
|
||||
All connectors must produce events conforming to this schema so downstream
|
||||
pipelines (harvester → knowledge store) can process them uniformly.
|
||||
"""
|
||||
|
||||
from dataclasses import dataclass, asdict
|
||||
from datetime import datetime
|
||||
from typing import Optional, Any, Dict
|
||||
import hashlib
|
||||
import json
|
||||
|
||||
|
||||
@dataclass
class SourceEvent:
    """One normalized record ingested from a personal archive.

    Every connector emits these, so downstream pipelines (harvester →
    knowledge store) can treat all platforms uniformly.

    Attributes:
        source: Platform identifier ('twitter', 'discord', 'slack',
            'whatsapp', 'notion', 'imessage', 'google', ...).
        account: User account/channel identifier on the source platform.
        thread_or_channel: Conversation thread, channel, or chat identifier.
        author: Who created this content (may differ from `account` in
            group chats).
        timestamp: ISO-8601 time the event occurred (not ingestion time).
        content: Primary text content. May be empty for non-text events
            (images only).
        attachments: Local file paths or URLs of attached media.
        raw_ref: Pointer to the raw source record (file path, message ID,
            URL, etc.).
        hash: SHA-256 hash of the raw content, for deduplication and
            provenance.
        consent_scope: Privacy gate controlling where this content may be
            used ('memory_only', 'bootstrap_context', 'training_data').
            Defaults to 'memory_only' for ingested personal archives.
        metadata: Platform-specific fields retained for provenance but not
            indexed.
    """
    source: str
    account: str
    thread_or_channel: str
    author: str
    timestamp: str
    content: str
    attachments: list[str]
    raw_ref: str
    hash: str
    consent_scope: str = "memory_only"
    metadata: Optional[Dict[str, Any]] = None

    def to_dict(self) -> dict:
        """Return a JSON-serializable dict; `metadata` is never None."""
        payload = asdict(self)
        if payload.get('metadata') is None:
            payload['metadata'] = {}
        return payload

    def to_json(self) -> str:
        """Render this event as a single JSONL line."""
        return json.dumps(self.to_dict(), ensure_ascii=False)
|
||||
|
||||
|
||||
def compute_event_hash(source: str, raw_ref: str, content: str,
                       timestamp: str, author: str) -> str:
    """Return the deterministic SHA-256 hex digest identifying an event.

    The digest covers source + raw_ref + content + timestamp + author, so
    identical content always produces the same hash — this is what enables
    cross-connector deduplication.
    """
    parts = (source, raw_ref, content, timestamp, author)
    return hashlib.sha256("|".join(parts).encode('utf-8')).hexdigest()
|
||||
|
||||
|
||||
def validate_event(event: "SourceEvent") -> bool:
    """
    Minimal structural validation for a SourceEvent.

    Returns True only when every required field is present (not None) and
    contains non-whitespace text. `attachments`, `consent_scope`, and
    `metadata` are not checked here.
    """
    required = [event.source, event.account, event.thread_or_channel,
                event.author, event.timestamp, event.content, event.raw_ref,
                event.hash]
    # The `is not None` guard matters: str(None) is the truthy string "None",
    # so a missing field would otherwise slip through as "present".
    return all(x is not None and bool(str(x).strip()) for x in required)
|
||||
|
||||
|
||||
# Consent scope definitions — the allowed values for SourceEvent.consent_scope.
# Connectors default to the most restrictive scope; broader scopes require
# explicit opt-in (see --consent-scope in scripts/run_connector.py).
CONSENT_MEMORY_ONLY = "memory_only"  # For retrieval only, not bootstrap
CONSENT_BOOTSTRAP = "bootstrap_context"  # Can seed new sessions
CONSENT_TRAINING = "training_data"  # May be used for model training
|
||||
155
connectors/twitter_archive.py
Normal file
155
connectors/twitter_archive.py
Normal file
@@ -0,0 +1,155 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
connectors/twitter_archive.py — Twitter/X personal archive connector.
|
||||
|
||||
Parses official Twitter data exports (Twitter's "Download your data" archive).
|
||||
Expects the tweet.js / tweet.json files from the archive's data/ directory.
|
||||
|
||||
Format (Twitter's archived tweets JSON):
|
||||
Each entry has: {"tweet": {"id_str": "...", "full_text": "...", "created_at": "...", ...}}
|
||||
|
||||
Output: normalized SourceEvent with source='twitter'.
|
||||
"""
|
||||
|
||||
import json
|
||||
import re
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Iterator, Optional
|
||||
import logging
|
||||
|
||||
from .base import BaseConnector
|
||||
from .schema import SourceEvent, compute_event_hash
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TwitterArchiveConnector(BaseConnector):
    """Connector for Twitter/X official archive exports ("Download your data").

    Finds the tweet.js / tweets.js / tweet*.json files in an archive and
    normalizes each tweet into a SourceEvent with source='twitter'.
    """
    name = "twitter_archive"
    source_glob = "**/tweet*.json"
    default_consent_scope = "memory_only"

    # Twitter's date format in archives: "Wed Oct 10 20:19:24 +0000 2018"
    TWITTER_DATE_FMT = "%a %b %d %H:%M:%S %z %Y"

    def discover_sources(self, root: Path) -> Iterator[Path]:
        """
        Find tweet.js / tweet.json files in a Twitter archive.

        The official Twitter export places these under:
            root/
              data/
                tweet.js          (single-file format, older exports)
              or
              account-XXXX-YYYY/
                tweets.js         (per-month split format)
        """
        root = Path(root)
        # Both extensions occur in the wild; the two globs are disjoint, so
        # no file can be yielded twice.
        candidates = list(root.rglob("tweet*.js")) + list(root.rglob("tweet*.json"))
        logger.info("Discovered %d Twitter archive files under %s", len(candidates), root)
        yield from candidates

    def parse_source(self, source: Path) -> Iterator[SourceEvent]:
        """
        Parse a Twitter archive file and yield SourceEvents.

        Handles both single-file (old) and per-month split formats. Twitter
        wraps the JSON array in a JS variable assignment:
            window.YTD.tweet.part0 = [...]
        A plain JSON array (no wrapper) is also accepted. Parse failures are
        logged and the file skipped, so one corrupt file cannot abort a run.
        """
        try:
            with open(source, 'r', encoding='utf-8') as f:
                raw = f.read()

            # Extract the JSON array from the JS wrapper, if present.
            match = re.search(r'=\s*(\[.+?\])\s*;?\s*$', raw, re.DOTALL)
            if match:
                records = json.loads(match.group(1))
            else:
                # Plain JSON array (no wrapper)
                records = json.loads(raw)

            logger.debug("Parsing %d tweet records from %s", len(records), source)

            for record in records:
                event = self._record_to_event(record, source)
                if event:
                    yield event

        except Exception as e:
            logger.error("Failed to parse %s: %s", source, e)

    def _record_to_event(self, record: dict, source_path: Path) -> Optional[SourceEvent]:
        """
        Convert a single tweet record into a SourceEvent.

        Accepts either the wrapped format {"tweet": {...}} or the bare tweet
        object. Returns None for records that carry no tweet id, since no
        stable raw_ref/hash can be built for them.
        """
        # Unwrap the tweet object
        tweet = record.get('tweet', record)

        # Core fields; older exports use 'id'/'text', newer 'id_str'/'full_text'.
        id_str = tweet.get('id_str') or tweet.get('id')
        if not id_str:
            logger.warning("Skipping tweet record without id in %s", source_path)
            return None
        full_text = tweet.get('full_text') or tweet.get('text', '')
        created_at = tweet.get('created_at', '')

        # Normalize the timestamp to ISO-8601 while preserving the archive's
        # own UTC offset. (isoformat() rather than astimezone().isoformat(),
        # so output does not vary with the machine's local timezone.)
        try:
            dt = datetime.strptime(created_at, self.TWITTER_DATE_FMT)
            iso_ts = dt.isoformat()
        except Exception:
            iso_ts = created_at  # fallback: keep as-is

        # Author is always the account owner (Twitter archives don't include
        # others' DMs by default).
        account = "user_archive"  # normalized account identifier

        # Threads aren't preserved in the basic export, so each tweet is its
        # own "thread".
        thread_id = f"tweet_{id_str}"

        # Attachments: media URLs from extended_entities.
        attachments = []
        for media in tweet.get('extended_entities', {}).get('media', []):
            url = media.get('media_url_https') or media.get('media_url')
            if url:
                attachments.append(url)

        raw_ref = f"twitter:archive:{source_path.name}:{id_str}"

        hash_val = compute_event_hash(
            source="twitter",
            raw_ref=raw_ref,
            content=full_text or "",
            timestamp=iso_ts,
            author=account
        )

        # Platform-specific fields retained for provenance.
        metadata = {
            "tweet_id": id_str,
            "source_file": str(source_path),
            "favorite_count": tweet.get('favorite_count'),
            "retweet_count": tweet.get('retweet_count'),
            "in_reply_to_status_id": tweet.get('in_reply_to_status_id_str'),
            "lang": tweet.get('lang'),
        }

        return SourceEvent(
            source="twitter",
            account=account,
            thread_or_channel=thread_id,
            author=account,
            timestamp=iso_ts,
            content=full_text,
            attachments=attachments,
            raw_ref=raw_ref,
            hash=hash_val,
            consent_scope=self.consent_scope,
            metadata=metadata
        )
|
||||
116
scripts/run_connector.py
Normal file
116
scripts/run_connector.py
Normal file
@@ -0,0 +1,116 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
scripts/run_connector.py — Run a personal archive connector and emit SourceEvents.
|
||||
|
||||
Usage:
|
||||
python3 scripts/run_connector.py twitter_archive --source /path/to/twitter/archive --output events.jsonl [--limit 100]
|
||||
|
||||
This is the entry point that ties the connectors pack into the existing compounding-intelligence
|
||||
pipeline. Output is JSONL (one SourceEvent per line), ready for downstream ingestion by
|
||||
harvester.py or a future connector-targeted harvester.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
# Add the repository root (the parent of scripts/) to sys.path so the
# top-level `connectors` package is importable when this file is run as a
# script. Inserting SCRIPT_DIR itself would not work: connectors/ is a
# sibling of scripts/, not inside it.
SCRIPT_DIR = Path(__file__).parent.absolute()
sys.path.insert(0, str(SCRIPT_DIR.parent))
|
||||
|
||||
from connectors import get_connector, list_connectors
|
||||
from connectors.base import BaseConnector
|
||||
from connectors.schema import SourceEvent
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
|
||||
)
|
||||
logger = logging.getLogger("run_connector")
|
||||
|
||||
|
||||
def main() -> int:
    """CLI entry point: parse arguments, run the selected connector, and
    write its events as JSONL (one SourceEvent per line).

    Returns 0 on success; exits with status 1 if the source path is missing.
    """
    parser = argparse.ArgumentParser(
        description="Run a personal archive connector and emit normalized events."
    )
    # Positional connector name, constrained to the registered connectors
    # (e.g. "twitter_archive").
    parser.add_argument(
        "connector",
        choices=list_connectors(),
        help="Connector name to run"
    )
    parser.add_argument(
        "--source", "-s",
        required=True,
        help="Path to the source archive root (e.g., ~/Documents/TwitterArchive)"
    )
    parser.add_argument(
        "--output", "-o",
        required=True,
        help="Output file path (JSONL, one SourceEvent per line)"
    )
    parser.add_argument(
        "--limit", "-n",
        type=int,
        default=None,
        help="Stop after N events (default: unlimited)"
    )
    parser.add_argument(
        "--consent-scope",
        choices=["memory_only", "bootstrap_context", "training_data"],
        default="memory_only",
        help="Consent scope for emitted events (default: memory_only)"
    )
    parser.add_argument(
        "--checkpoint",
        type=Path,
        default=None,
        help="Checkpoint file path (default: ~/.cache/connectors/{name}.checkpoint.jsonl)"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Parse and count events but do not write output"
    )

    args = parser.parse_args()

    # Resolve connector class from the registry and instantiate it.
    connector_cls = get_connector(args.connector)
    connector: BaseConnector = connector_cls(
        checkpoint_path=args.checkpoint,
        consent_scope=args.consent_scope
    )

    # Resolve source path (tilde-expanded and absolute); fail fast if absent.
    source_path = Path(args.source).expanduser().resolve()
    if not source_path.exists():
        logger.error("Source path does not exist: %s", source_path)
        sys.exit(1)

    # Run connector — this returns a lazy generator of SourceEvents.
    logger.info("Running connector '%s' on source: %s", args.connector, source_path)
    events = connector.run(source_path, limit=args.limit)

    if args.dry_run:
        # NOTE(review): consuming the generator here still drives
        # BaseConnector.run(), which persists checkpoint entries as it
        # yields — so a dry run advances the dedup checkpoint even though
        # no output file is written. Confirm whether that is intended.
        count = sum(1 for _ in events)
        logger.info("[DRY RUN] Would emit %d events", count)
        return 0

    # Write output: one JSON line per event.
    output_path = Path(args.output).expanduser().resolve()
    output_path.parent.mkdir(parents=True, exist_ok=True)

    count = 0
    with open(output_path, 'w', encoding='utf-8') as out:
        for event in events:
            out.write(event.to_json() + '\n')
            count += 1

    logger.info("Connector complete. Emitted %d events to %s", count, output_path)
    return 0


if __name__ == "__main__":
    sys.exit(main())
|
||||
270
tests/test_connectors.py
Normal file
270
tests/test_connectors.py
Normal file
@@ -0,0 +1,270 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
tests/test_connectors.py — Test suite for the personal archive connector pack.
|
||||
|
||||
Tests cover:
|
||||
- SourceEvent schema validation
|
||||
- Event hash determinism
|
||||
- TwitterArchiveConnector parsing of standard Twitter export format
|
||||
- Deduplication gate
|
||||
"""
|
||||
|
||||
import json
|
||||
import hashlib
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
import pytest
|
||||
|
||||
# Add scripts dir to path for sibling imports
|
||||
import sys
|
||||
SCRIPT_DIR = Path(__file__).parent.parent / "scripts"
|
||||
sys.path.insert(0, str(SCRIPT_DIR.parent))
|
||||
|
||||
from connectors.schema import (
|
||||
SourceEvent,
|
||||
compute_event_hash,
|
||||
validate_event,
|
||||
CONSENT_MEMORY_ONLY,
|
||||
CONSENT_BOOTSTRAP,
|
||||
)
|
||||
from connectors.twitter_archive import TwitterArchiveConnector
|
||||
|
||||
|
||||
class TestSourceEventSchema:
    """Tests for SourceEvent dataclass and helpers.

    Covers: default consent scope, hash determinism and sensitivity,
    structural validation accept/reject cases, and JSON round-tripping.
    """

    def test_create_minimal_event(self):
        # An event built with only the required fields gets the restrictive
        # default consent scope.
        event = SourceEvent(
            source="twitter",
            account="user123",
            thread_or_channel="tweet_456",
            author="user123",
            timestamp="2026-04-26T12:00:00Z",
            content="Hello world",
            attachments=[],
            raw_ref="twitter:test:456",
            hash="",
        )
        assert event.source == "twitter"
        assert event.consent_scope == CONSENT_MEMORY_ONLY # default

    def test_compute_event_hash_deterministic(self):
        # Same five inputs must always produce the same SHA-256 hex digest.
        h1 = compute_event_hash(
            source="twitter",
            raw_ref="ref:123",
            content="test content",
            timestamp="2026-04-26T12:00:00Z",
            author="alice"
        )
        h2 = compute_event_hash(
            source="twitter",
            raw_ref="ref:123",
            content="test content",
            timestamp="2026-04-26T12:00:00Z",
            author="alice"
        )
        assert h1 == h2
        assert len(h1) == 64 # SHA-256 hex

    def test_compute_event_hash_different_inputs(self):
        # Changing any hashed component must change the digest.
        h1 = compute_event_hash("twitter", "ref:1", "content", "ts", "alice")
        h2 = compute_event_hash("twitter", "ref:1", "different", "ts", "alice")
        assert h1 != h2

    def test_validate_event_accepts_valid(self):
        event = SourceEvent(
            source="discord",
            account="user#1234",
            thread_or_channel="channel_abc",
            author="user#1234",
            timestamp="2026-04-26T12:00:00Z",
            content="test",
            attachments=[],
            raw_ref="discord:msg:123",
            hash="a" * 64,
        )
        assert validate_event(event) is True

    def test_validate_event_rejects_empty_content(self):
        event = SourceEvent(
            source="twitter",
            account="user",
            thread_or_channel="thread",
            author="user",
            timestamp="2026-04-26T12:00:00Z",
            content="", # empty
            attachments=[],
            raw_ref="ref",
            hash="a" * 64,
        )
        assert validate_event(event) is False

    def test_validate_event_rejects_missing_hash(self):
        # Whitespace-only fields count as missing.
        event = SourceEvent(
            source="twitter",
            account="user",
            thread_or_channel="thread",
            author="user",
            timestamp="2026-04-26T12:00:00Z",
            content="test",
            attachments=[],
            raw_ref="ref",
            hash=" ", # whitespace only
        )
        assert validate_event(event) is False

    def test_event_to_json_roundtrip(self):
        # to_json must emit one parseable JSON object preserving metadata.
        event = SourceEvent(
            source="twitter",
            account="user",
            thread_or_channel="t1",
            author="user",
            timestamp="2026-04-26T12:00:00Z",
            content="hello",
            attachments=["https://example.com/img.jpg"],
            raw_ref="twitter:123",
            hash="b" * 64,
            metadata={"retweet_count": 5}
        )
        json_str = event.to_json()
        parsed = json.loads(json_str)
        assert parsed["source"] == "twitter"
        assert parsed["metadata"]["retweet_count"] == 5
|
||||
|
||||
|
||||
class TestTwitterArchiveConnector:
    """Tests for the Twitter/X archive connector.

    Every connector instantiation passes an explicit checkpoint_path under
    tmp_path so tests never read or write the developer's real
    ~/.cache/connectors/ checkpoints.
    """

    def test_connector_name(self):
        assert TwitterArchiveConnector.name == "twitter_archive"

    def test_discover_sources_finds_tweet_js(self, tmp_path: Path):
        # Arrange: create a fake Twitter archive structure
        archive = tmp_path / "twitter_archive"
        archive.mkdir()
        data_dir = archive / "data"
        data_dir.mkdir()
        (data_dir / "tweet.js").write_text("[]")
        (data_dir / "tweets_2024_01.js").write_text("[]")

        connector = TwitterArchiveConnector(checkpoint_path=tmp_path / "ckpt.jsonl")
        sources = list(connector.discover_sources(archive))

        assert len(sources) == 2
        assert any("tweet.js" in str(p) for p in sources)

    def test_parse_single_tweet_wrapped_format(self, tmp_path: Path):
        """
        Twitter's official export wraps the JSON array in a JS assignment:
        window.YTD.tweet.part0 = [ {...tweet...}, ... ];
        """
        # Create a minimal tweet record
        tweet = {
            "id_str": "1234567890",
            "full_text": "Hello from Twitter archive!",
            "created_at": "Mon Apr 26 08:30:00 +0000 2026",
            "favorite_count": 10,
            "retweet_count": 2,
            "lang": "en"
        }
        wrapped = "window.YTD.tweet.part0 = " + json.dumps([{'tweet': tweet}]) + ";\n"

        js_file = tmp_path / "tweet.js"
        js_file.write_text(wrapped)

        connector = TwitterArchiveConnector(checkpoint_path=tmp_path / "ckpt.jsonl")
        events = list(connector.parse_source(js_file))

        assert len(events) == 1
        ev = events[0]
        assert ev.source == "twitter"
        assert ev.content == "Hello from Twitter archive!"
        assert ev.author == "user_archive"
        assert ev.consent_scope == "memory_only"
        # Hash must be computed
        assert len(ev.hash) == 64
        # Metadata preservation
        assert ev.metadata["tweet_id"] == "1234567890"
        assert ev.metadata["favorite_count"] == 10

    def test_parse_tweet_array_without_wrapper(self, tmp_path: Path):
        """Some Twitter exports are plain JSON arrays (no JS wrapper)."""
        tweet = {
            "id_str": "999",
            "full_text": "Plain JSON tweet",
            "created_at": "Mon Apr 26 08:30:00 +0000 2026",
        }
        json_file = tmp_path / "tweets.json"
        json_file.write_text(json.dumps([{"tweet": tweet}]))

        connector = TwitterArchiveConnector(checkpoint_path=tmp_path / "ckpt.jsonl")
        events = list(connector.parse_source(json_file))

        assert len(events) == 1
        assert events[0].content == "Plain JSON tweet"

    def test_parse_with_media_attachments(self, tmp_path: Path):
        # Media URLs from extended_entities become the event's attachments.
        tweet = {
            "id_str": "111",
            "full_text": "Check this photo",
            "created_at": "Mon Apr 26 08:30:00 +0000 2026",
            "extended_entities": {
                "media": [
                    {"media_url_https": "https://pbs.twimg.com/media/example1.jpg"},
                    {"media_url_https": "https://pbs.twimg.com/media/example2.jpg"},
                ]
            }
        }
        js_file = tmp_path / "tweet.js"
        js_file.write_text("window.YTD.tweet.part0 = " + json.dumps([{'tweet': tweet}]) + ";\n")

        connector = TwitterArchiveConnector(checkpoint_path=tmp_path / "ckpt.jsonl")
        events = list(connector.parse_source(js_file))

        assert len(events) == 1
        atts = events[0].attachments
        assert len(atts) == 2
        assert "example1.jpg" in atts[0]

    def test_integration_run_connector(self, tmp_path: Path):
        """End-to-end: create a mini archive, run connector, write JSONL output."""
        # Arrange: create archive with two tweets
        archive_root = tmp_path / "my_twitter_archive" / "data"
        archive_root.mkdir(parents=True)

        tweet1 = {
            "id_str": "1",
            "full_text": "First tweet",
            "created_at": "Mon Apr 26 08:00:00 +0000 2026",
        }
        tweet2 = {
            "id_str": "2",
            "full_text": "Second tweet",
            "created_at": "Mon Apr 26 09:00:00 +0000 2026",
        }
        (archive_root / "tweet.js").write_text(
            "window.YTD.tweet.part0 = " + json.dumps([{'tweet': tweet1}, {'tweet': tweet2}]) + "\n"
        )

        connector = TwitterArchiveConnector(checkpoint_path=tmp_path / "ckpt.jsonl")
        output_path = tmp_path / "events.jsonl"

        # Act
        count = 0
        with open(output_path, 'w') as out:
            for event in connector.run(archive_root):
                out.write(event.to_json() + '\n')
                count += 1

        # Assert
        assert count == 2
        lines = output_path.read_text().strip().split('\n')
        assert len(lines) == 2
        ev1 = json.loads(lines[0])
        assert ev1["content"] == "First tweet"
        ev2 = json.loads(lines[1])
        assert ev2["content"] == "Second tweet"
        # Check duplicates are filtered on re-run
        count2 = sum(1 for _ in connector.run(archive_root))
        assert count2 == 0 # all deduped via checkpoint
|
||||
Reference in New Issue
Block a user