# File: compounding-intelligence/connectors/base.py

#!/usr/bin/env python3
"""
connectors/base.py Abstract base class for all personal archive connectors.
Defines the contract every connector must implement. Connectors read local
exports or API data and yield SourceEvent objects.
"""
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Iterator, Optional, Dict, Any
import json
import logging
logger = logging.getLogger(__name__)
class BaseConnector(ABC):
    """Abstract base for personal-archive connectors.

    Subclasses implement :meth:`discover_sources` and :meth:`parse_source`;
    this base class supplies hash-based deduplication backed by an
    append-only JSONL checkpoint file, plus the :meth:`run` driver that
    validates, stamps, checkpoints and yields events.
    """

    # Connector identity; subclasses must override with a non-empty string
    # (it names the default checkpoint file).
    name: Optional[str] = None
    # Optional glob used by subclasses to locate export files under a root.
    source_glob: Optional[str] = None
    # Consent scope stamped onto every yielded event unless overridden.
    default_consent_scope: str = "memory_only"

    def __init__(self, checkpoint_path: Optional[Path] = None,
                 consent_scope: Optional[str] = None):
        """Initialize the connector and load any existing checkpoint.

        Args:
            checkpoint_path: JSONL file recording processed event hashes.
                Defaults to ``~/.cache/connectors/<name>.checkpoint.jsonl``.
            consent_scope: Overrides ``default_consent_scope`` when given.
        """
        self.consent_scope = consent_scope or self.default_consent_scope
        if checkpoint_path is None:
            cache_dir = Path.home() / ".cache" / "connectors"
            cache_dir.mkdir(parents=True, exist_ok=True)
            checkpoint_path = cache_dir / f"{self.name}.checkpoint.jsonl"
        self.checkpoint_path = checkpoint_path
        self._processed_hashes = self._load_checkpoint()

    def _load_checkpoint(self) -> set:
        """Return the set of event hashes recorded in the checkpoint file.

        Malformed JSON lines (e.g. a torn trailing write) are skipped, and
        entries that are not dicts or lack a truthy ``'hash'`` are ignored
        rather than polluting the set with empty strings.
        """
        if not self.checkpoint_path.exists():
            return set()
        seen = set()
        with open(self.checkpoint_path, 'r', encoding='utf-8') as f:
            for line in f:
                try:
                    entry = json.loads(line)
                except json.JSONDecodeError:
                    continue  # tolerate corrupt/partial lines
                if isinstance(entry, dict):
                    event_hash = entry.get('hash')
                    if event_hash:
                        seen.add(event_hash)
        logger.debug("Loaded %d checkpoint hashes for %s", len(seen), self.name)
        return seen

    def _save_checkpoint(self, event) -> None:
        """Append one processed event's hash record to the checkpoint file."""
        record = {'hash': event.hash, 'source': event.source,
                  'raw_ref': event.raw_ref}
        with open(self.checkpoint_path, 'a', encoding='utf-8') as f:
            f.write(json.dumps(record) + '\n')

    def mark_processed(self, event_hash: str) -> None:
        """Record *event_hash* as processed for this session (in memory only)."""
        self._processed_hashes.add(event_hash)

    def is_processed(self, event_hash: str) -> bool:
        """Return True if *event_hash* was already processed."""
        return event_hash in self._processed_hashes

    def run(self, source_root: Path, limit: Optional[int] = None) -> Iterator:
        """Discover, parse, validate, dedup and yield events.

        Args:
            source_root: Root directory to discover sources under.
            limit: Stop after yielding this many events (falsy = unlimited).

        Yields:
            Validated SourceEvent objects, each stamped with this
            connector's consent scope and appended to the checkpoint file.
        """
        # Local import avoids a circular dependency at module load time;
        # hoisted out of the per-event loop so it runs once per call.
        from .schema import compute_event_hash, validate_event

        count = 0
        skipped_dedup = 0
        skipped_invalid = 0
        for source in self.discover_sources(source_root):
            for event in self.parse_source(source):
                if not event.hash:
                    event.hash = compute_event_hash(
                        event.source, event.raw_ref, event.content,
                        event.timestamp, event.author
                    )
                if event.hash in self._processed_hashes:
                    skipped_dedup += 1
                    continue
                if not validate_event(event):
                    skipped_invalid += 1
                    logger.warning("Invalid event: raw_ref=%s", event.raw_ref)
                    continue
                event.consent_scope = self.consent_scope
                self._save_checkpoint(event)
                self._processed_hashes.add(event.hash)
                yield event
                count += 1
                if limit and count >= limit:
                    # Emit the summary even on early exit so limited runs
                    # are still accounted for in the logs.
                    logger.info(
                        "Connector %s complete: yielded=%d, skipped_dedup=%d, skipped_invalid=%d",
                        self.name, count, skipped_dedup, skipped_invalid)
                    return
        logger.info("Connector %s complete: yielded=%d, skipped_dedup=%d, skipped_invalid=%d",
                    self.name, count, skipped_dedup, skipped_invalid)

    @abstractmethod
    def discover_sources(self, root: Path) -> Iterator:
        """Yield source handles (e.g. file paths) found under *root*."""

    @abstractmethod
    def parse_source(self, source) -> Iterator:
        """Yield SourceEvent objects parsed from one source handle."""