#!/usr/bin/env python3
"""
scripts/run_connector.py — Run a personal archive connector and emit SourceEvents.
Usage:
python3 scripts/run_connector.py twitter --source /path/to/twitter/archive --output events.jsonl [--limit 100]
This is the entry point that ties the connectors pack into the existing compounding-intelligence
pipeline. Output is JSONL (one SourceEvent per line), ready for downstream ingestion by
harvester.py or a future connector-targeted harvester.
"""
import argparse
import json
import logging
import sys
from pathlib import Path
# Add parent dir to path for sibling imports
SCRIPT_DIR = Path(__file__).parent.absolute()
sys.path.insert(0, str(SCRIPT_DIR))
from connectors import get_connector, list_connectors
from connectors.base import BaseConnector
from connectors.schema import SourceEvent
# Configure root logging once at import time so connector modules inherit it;
# format includes timestamp + level + logger name for traceable pipeline runs.
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
)
logger = logging.getLogger("run_connector")
def main():
    """CLI entry point.

    Parses arguments, instantiates the requested connector, and streams its
    normalized SourceEvents to a JSONL file (or merely counts them in dry-run
    mode). Returns 0 on success; exits with status 1 if the source path is
    missing.
    """
    ap = argparse.ArgumentParser(
        description="Run a personal archive connector and emit normalized events."
    )
    ap.add_argument(
        "connector",
        choices=list_connectors(),
        help="Connector name to run",
    )
    ap.add_argument(
        "--source", "-s",
        required=True,
        help="Path to the source archive root (e.g., ~/Documents/TwitterArchive)",
    )
    ap.add_argument(
        "--output", "-o",
        required=True,
        help="Output file path (JSONL, one SourceEvent per line)",
    )
    ap.add_argument(
        "--limit", "-n",
        type=int,
        default=None,
        help="Stop after N events (default: unlimited)",
    )
    ap.add_argument(
        "--consent-scope",
        choices=["memory_only", "bootstrap_context", "training_data"],
        default="memory_only",
        help="Consent scope for emitted events (default: memory_only)",
    )
    ap.add_argument(
        "--checkpoint",
        type=Path,
        default=None,
        help="Checkpoint file path (default: ~/.cache/connectors/{name}.checkpoint.jsonl)",
    )
    ap.add_argument(
        "--dry-run",
        action="store_true",
        help="Parse and count events but do not write output",
    )
    opts = ap.parse_args()

    # Look up the connector class by name and build an instance with the
    # requested checkpoint/consent configuration.
    connector_factory = get_connector(opts.connector)
    conn: BaseConnector = connector_factory(
        checkpoint_path=opts.checkpoint,
        consent_scope=opts.consent_scope
    )

    # Guard: refuse to run against a nonexistent archive root.
    src_root = Path(opts.source).expanduser().resolve()
    if not src_root.exists():
        logger.error("Source path does not exist: %s", src_root)
        sys.exit(1)

    logger.info("Running connector '%s' on source: %s", opts.connector, src_root)
    event_stream = conn.run(src_root, limit=opts.limit)

    # Dry-run: drain the stream just to count it; nothing is written.
    if opts.dry_run:
        emitted = sum(1 for _ in event_stream)
        logger.info("[DRY RUN] Would emit %d events", emitted)
        return 0

    # Stream events to disk as JSONL, creating parent directories as needed.
    dest = Path(opts.output).expanduser().resolve()
    dest.parent.mkdir(parents=True, exist_ok=True)
    emitted = 0
    with dest.open("w", encoding="utf-8") as sink:
        for event in event_stream:
            sink.write(event.to_json() + "\n")
            emitted += 1
    logger.info("Connector complete. Emitted %d events to %s", emitted, dest)
    return 0
# Script entry point: propagate main()'s return code as the process exit status.
if __name__ == "__main__":
    sys.exit(main())