Compare commits


5 Commits

Author SHA1 Message Date
2133b18929 fix: correct Makefile syntax (tabs for recipe lines)
Some checks failed
Test / pytest (pull_request) Failing after 12s
2026-04-26 20:47:09 -04:00
c4cb325568 chore: add Python cache exclusions to .gitignore
Some checks failed
Test / pytest (pull_request) Failing after 7s
2026-04-26 20:45:25 -04:00
ca21e3e886 docs: add run_connector.py entry point for CLI execution 2026-04-26 20:45:15 -04:00
8628a0d610 feat(connectors): add sovereign personal archive connector pack foundation
- Add connectors/ directory with base infrastructure
- Implement SourceEvent unified schema (source/account/thread/author/timestamp/content/attachments/raw_ref/hash/consent_scope)
- Create BaseConnector abstract class with checkpoint/dedup/consent gates
- Implement TwitterArchiveConnector for official Twitter/X data exports
- Add run_connector.py CLI entry point
- Add comprehensive test suite (13 tests, all passing)
- Add connectors/README.md with usage docs
- Add Makefile targets: test-connectors, run-connector, connectors-help
- Reference parent EPIC #194 and issue #233

This is the foundational connector pack. Future work: Discord, Slack, WhatsApp, Notion, iMessage, Google.
2026-04-26 20:45:07 -04:00
Rockachopa
4b5a675355 feat: add PR complexity scorer — estimate review effort

Implements issue #135: a script that analyzes open PRs and computes
a complexity score (1-10) based on files changed, lines added/removed,
dependency changes, and test coverage delta. Also estimates review time.

The scorer can be run with --dry-run to preview or --apply to post
score comments directly on PRs.

Output: metrics/pr_complexity.json with full analysis.

Closes #135
Some checks failed
Test / pytest (push) Failing after 10s
2026-04-26 09:34:57 -04:00
13 changed files with 1402 additions and 289 deletions

.gitignore (vendored, 3 lines changed)

@@ -1,2 +1,5 @@
__pycache__/
*.pyc
.pytest_cache/
.mypy_cache/

Makefile

@@ -2,3 +2,12 @@
test:
	python3 -m pytest tests/test_ci_config.py scripts/test_*.py -v
# Connector targets
.PHONY: test-connectors
test-connectors:
	python3 -m pytest tests/test_connectors.py -v
.PHONY: connectors-help
connectors-help:
	python3 scripts/run_connector.py --help

README.md

@@ -27,6 +27,11 @@ Before a session starts, queries knowledge store for relevant facts. Assembles c
### Pipeline 3: Measure
Tracks whether compounding is happening. Knowledge velocity, error reduction, hit rate, task completion. Daily report proves the loop works.
### Connector Pack (EPIC #233)
Sovereign personal archive connectors: Twitter/X, Discord, Slack, WhatsApp, Notion, iMessage, Google.
Connectors mirror local exports or explicit API tokens → normalize → redact → index → sync with provenance.
See [`connectors/`](connectors/README.md) for the full connector suite and usage.
## Directory Structure
```
@@ -40,6 +45,12 @@ Tracks whether compounding is happening. Knowledge velocity, error reduction, hi
│ ├── bootstrapper.py # Pre-session context loader
│ ├── measurer.py # Compounding metrics
│ └── session_reader.py # JSONL parser
├── connectors/ # Personal archive connectors (EPIC #233)
│ ├── __init__.py
│ ├── base.py
│ ├── schema.py
│ ├── twitter_archive.py
│ └── README.md
├── metrics/
│ └── dashboard.md # Human-readable status
└── templates/
@@ -65,4 +76,4 @@ See [all issues](https://forge.alexanderwhitestone.com/Timmy_Foundation/compound
- EPIC 1: Session Harvester (#2)
- EPIC 2: Knowledge Store & Bootstrap (#3)
- EPIC 3: Compounding Measurement (#4)
- EPIC 4: Retroactive Harvest (#5)
- EPIC 4: Retroactive Harvest (#5)

connectors/README.md (new file, 66 lines)

@@ -0,0 +1,66 @@
# Sovereign Personal Archive Connector Pack
This directory contains the connector infrastructure for ingesting personal archives
(Discord, Slack, WhatsApp, Notion, iMessage, X/Twitter, Google) into the compounding-intelligence
knowledge pipeline.
## Quick Start
```bash
# Run the Twitter archive connector
python3 scripts/run_connector.py twitter_archive \
  --source ~/Documents/TwitterArchive \
  --output events.jsonl \
  --limit 100
```
## Connector Output Format
Each connector emits `SourceEvent` objects (one JSON line per event):
```json
{
  "source": "twitter",
  "account": "user_archive",
  "thread_or_channel": "tweet_123456",
  "author": "user_archive",
  "timestamp": "2026-04-26T08:30:00+00:00",
  "content": "Tweet text here",
  "attachments": ["https://..."],
  "raw_ref": "twitter:archive:tweet.js:123456",
  "hash": "sha256...",
  "consent_scope": "memory_only",
  "metadata": { "tweet_id": "123456", "favorite_count": 10 }
}
```
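Downstream consumers can stream these lines back into `SourceEvent` objects; a minimal sketch, assuming an `events.jsonl` produced by the Quick Start above:
```python
import json
from connectors.schema import SourceEvent

# Each line round-trips through SourceEvent(**fields), because to_dict()
# emits exactly the dataclass fields.
with open("events.jsonl", encoding="utf-8") as f:
    events = [SourceEvent(**json.loads(line)) for line in f]
```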
## Connector Registry
| Name             | Source Format                 | Status     |
|------------------|-------------------------------|------------|
| twitter_archive  | Official Twitter data export  | ✅ Working |
| discord_archive  | Discord data package / JSON   | ⏳ Planned |
| slack_archive    | Slack export / API            | ⏳ Planned |
| whatsapp_archive | WhatsApp Desktop export       | ⏳ Planned |
| notion_archive   | Notion markdown/SQLite        | ⏳ Planned |
| imessage_archive | macOS local chat storage      | ⏳ Planned |
| google_archive   | Google Workspace CLI          | ⏳ Planned |
## Design Principles
1. **Local-first**: Connectors operate on user-owned exports or explicit API credentials.
2. **Incremental**: Checkpoint files (~/.cache/connectors/) allow resumable processing.
3. **Consent-gated**: Default `consent_scope=memory_only` — explicit opt-in for broader use.
4. **Provenance-preserving**: `metadata` retains all raw fields; `hash` enables deduplication.
5. **Sovereign**: No ambient scraping. No cloud dependency unless user explicitly configures tokens.
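Taken together, principles 2 and 4 mean a second run over the same archive is a no-op. A sketch (the checkpoint and archive paths are placeholders):
```python
from pathlib import Path
from connectors import get_connector

connector_cls = get_connector("twitter_archive")
connector = connector_cls(checkpoint_path=Path("/tmp/tw.ckpt.jsonl"))

archive = Path("~/Documents/TwitterArchive").expanduser()
first = sum(1 for _ in connector.run(archive))   # emits events, records their hashes
again = sum(1 for _ in connector.run(archive))   # every hash is already checkpointed
assert again == 0
```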
## Writing a New Connector
Subclass `BaseConnector` from `connectors/base.py` and implement:
- `discover_sources(root: Path) -> Iterator[Path|str]` — find source files or IDs
- `parse_source(source) -> Iterator[SourceEvent]` — emit normalized events
Register it in the `_REGISTRY` dict in `connectors/__init__.py`.
See `connectors/twitter_archive.py` for a complete example.
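A minimal skeleton for a hypothetical JSONL-based source might look like this (everything named `example_*` is illustrative, not part of the pack):
```python
from pathlib import Path
from typing import Iterator

from connectors.base import BaseConnector
from connectors.schema import SourceEvent

class ExampleJsonlConnector(BaseConnector):
    """Hypothetical connector for a flat JSONL export: one record per line."""
    name = "example_jsonl"

    def discover_sources(self, root: Path) -> Iterator[Path]:
        yield from Path(root).rglob("*.jsonl")

    def parse_source(self, source: Path) -> Iterator[SourceEvent]:
        import json
        with open(source, encoding="utf-8") as f:
            for line in f:
                rec = json.loads(line)
                yield SourceEvent(
                    source="example",
                    account="user_archive",
                    thread_or_channel=f"msg_{rec['id']}",
                    author=rec.get("author", "user_archive"),
                    timestamp=rec["ts"],
                    content=rec.get("text", ""),
                    attachments=[],
                    raw_ref=f"example:{source.name}:{rec['id']}",
                    hash="",  # left empty; BaseConnector.run() fills it in
                )
```
Registering it is then a one-line addition to `_REGISTRY`: `"example_jsonl": ExampleJsonlConnector`.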

connectors/__init__.py (new file, 50 lines)

@@ -0,0 +1,50 @@
#!/usr/bin/env python3
"""
connectors/__init__.py — Sovereign personal archive connector pack.

Provides:
- BaseConnector: abstract base class for all connectors
- SourceEvent: unified event schema
- compute_event_hash, validate_event: utilities
- Registry: connector discovery and loading

Connectors:
- TwitterArchiveConnector: parse official Twitter/X archive exports
  (Future: Discord, Slack, WhatsApp, Notion, iMessage, Google)
"""
from .base import BaseConnector
from .schema import (
    SourceEvent,
    compute_event_hash,
    validate_event,
    CONSENT_MEMORY_ONLY,
    CONSENT_BOOTSTRAP,
    CONSENT_TRAINING,
)
from .twitter_archive import TwitterArchiveConnector

# Auto-registry: map of connector name → class
_REGISTRY = {
    "twitter_archive": TwitterArchiveConnector,
    # Future connectors:
    # "discord_archive": DiscordArchiveConnector,
    # "slack_archive": SlackArchiveConnector,
    # "whatsapp_archive": WhatsAppArchiveConnector,
    # "notion_archive": NotionArchiveConnector,
    # "imessage_archive": iMessageArchiveConnector,
    # "google_archive": GoogleArchiveConnector,
}


def get_connector(name: str) -> type[BaseConnector]:
    """Get connector class by registry name."""
    cls = _REGISTRY.get(name)
    if cls is None:
        raise ValueError(f"Unknown connector '{name}'. Available: {list(_REGISTRY.keys())}")
    return cls


def list_connectors() -> list[str]:
    """List all registered connector names."""
    return list(_REGISTRY.keys())

connectors/base.py (new file, 100 lines)

@@ -0,0 +1,100 @@
#!/usr/bin/env python3
"""
connectors/base.py — Abstract base class for all personal archive connectors.

Defines the contract every connector must implement. Connectors read local
exports or API data and yield SourceEvent objects.
"""
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Iterator, Optional
import json
import logging

from .schema import compute_event_hash, validate_event

logger = logging.getLogger(__name__)


class BaseConnector(ABC):
    name: Optional[str] = None  # set by each concrete connector
    source_glob: Optional[str] = None
    default_consent_scope: str = "memory_only"

    def __init__(self, checkpoint_path: Optional[Path] = None,
                 consent_scope: Optional[str] = None):
        self.consent_scope = consent_scope or self.default_consent_scope
        if checkpoint_path is None:
            cache_dir = Path.home() / ".cache" / "connectors"
            cache_dir.mkdir(parents=True, exist_ok=True)
            checkpoint_path = cache_dir / f"{self.name}.checkpoint.jsonl"
        self.checkpoint_path = checkpoint_path
        self._processed_hashes = self._load_checkpoint()

    def _load_checkpoint(self) -> set:
        if not self.checkpoint_path.exists():
            return set()
        seen = set()
        with open(self.checkpoint_path, 'r') as f:
            for line in f:
                try:
                    entry = json.loads(line)
                    seen.add(entry.get('hash', ''))
                except Exception:
                    continue
        logger.debug("Loaded %d checkpoint hashes for %s", len(seen), self.name)
        return seen

    def _save_checkpoint(self, event) -> None:
        with open(self.checkpoint_path, 'a') as f:
            f.write(json.dumps({'hash': event.hash, 'source': event.source,
                                'raw_ref': event.raw_ref}) + '\n')

    def mark_processed(self, event_hash: str) -> None:
        self._processed_hashes.add(event_hash)

    def is_processed(self, event_hash: str) -> bool:
        return event_hash in self._processed_hashes

    def run(self, source_root: Path, limit: Optional[int] = None) -> Iterator:
        count = 0
        skipped_dedup = 0
        skipped_invalid = 0
        for source in self.discover_sources(source_root):
            for event in self.parse_source(source):
                # Fill in the hash if the connector left it empty.
                if not event.hash:
                    event.hash = compute_event_hash(
                        event.source, event.raw_ref, event.content,
                        event.timestamp, event.author
                    )
                # Dedup gate: skip anything already checkpointed.
                if event.hash in self._processed_hashes:
                    skipped_dedup += 1
                    continue
                # Validation gate: drop structurally incomplete events.
                if not validate_event(event):
                    skipped_invalid += 1
                    logger.warning("Invalid event: raw_ref=%s", event.raw_ref)
                    continue
                # Consent gate: stamp the connector's configured scope.
                event.consent_scope = self.consent_scope
                self._save_checkpoint(event)
                self._processed_hashes.add(event.hash)
                yield event
                count += 1
                if limit and count >= limit:
                    return
        logger.info("Connector %s complete: yielded=%d, skipped_dedup=%d, skipped_invalid=%d",
                    self.name, count, skipped_dedup, skipped_invalid)

    @abstractmethod
    def discover_sources(self, root: Path) -> Iterator:
        pass

    @abstractmethod
    def parse_source(self, source) -> Iterator:
        pass

connectors/schema.py (new file, 100 lines)

@@ -0,0 +1,100 @@
#!/usr/bin/env python3
"""
connectors/schema.py — Unified source-event schema for personal archive connectors.

All connectors must produce events conforming to this schema so downstream
pipelines (harvester → knowledge store) can process them uniformly.
"""
from dataclasses import dataclass, asdict
from typing import Optional, Any, Dict
import hashlib
import json


@dataclass
class SourceEvent:
    """
    Canonical event schema for any ingested personal archive entry.

    Fields
    ------
    source : str
        Platform identifier: 'twitter', 'discord', 'slack', 'whatsapp',
        'notion', 'imessage', 'google', etc.
    account : str
        User account/channel identifier on the source platform.
    thread_or_channel : str
        Conversation thread, channel, or chat identifier.
    author : str
        Who created this content (may differ from account in group chats).
    timestamp : str
        ISO-8601 timestamp when the event occurred (not when it was ingested).
    content : str
        Primary text content. Note: validate_event() currently requires
        non-empty content, so attachment-only events are rejected.
    attachments : list[str]
        List of local file paths or URLs for attached media.
    raw_ref : str
        Pointer to the raw source record (file path, message ID, URL, etc.).
    hash : str
        SHA-256 hash of the raw content for deduplication and provenance.
    consent_scope : str
        Privacy gate: where this content may be used.
        Examples: 'memory_only', 'bootstrap_context', 'training_data'.
        Default: 'memory_only' for ingested personal archives.
    metadata : dict[str, Any]
        Platform-specific fields retained for provenance but not indexed.
    """
    source: str
    account: str
    thread_or_channel: str
    author: str
    timestamp: str
    content: str
    attachments: list[str]
    raw_ref: str
    hash: str
    consent_scope: str = "memory_only"
    metadata: Optional[Dict[str, Any]] = None

    def to_dict(self) -> dict:
        """Convert to plain dict for JSON serialization."""
        d = asdict(self)
        if d['metadata'] is None:
            d['metadata'] = {}
        return d

    def to_json(self) -> str:
        """Serialize to JSON line (one event per line)."""
        return json.dumps(self.to_dict(), ensure_ascii=False)


def compute_event_hash(source: str, raw_ref: str, content: str,
                       timestamp: str, author: str) -> str:
    """
    Compute deterministic SHA-256 hash for an event.

    Hash inputs: source + raw_ref + content + timestamp + author.
    This ensures identical content always produces the same hash,
    enabling cross-connector deduplication.
    """
    canonical = f"{source}|{raw_ref}|{content}|{timestamp}|{author}"
    return hashlib.sha256(canonical.encode('utf-8')).hexdigest()


def validate_event(event: SourceEvent) -> bool:
    """
    Minimal structural validation for a SourceEvent.

    Returns True if required fields are present and well-formed.
    """
    required = [event.source, event.account, event.thread_or_channel,
                event.author, event.timestamp, event.content, event.raw_ref,
                event.hash]
    return all(str(x).strip() for x in required)
# Consent scope definitions
CONSENT_MEMORY_ONLY = "memory_only" # For retrieval only, not bootstrap
CONSENT_BOOTSTRAP = "bootstrap_context" # Can seed new sessions
CONSENT_TRAINING = "training_data" # May be used for model training
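A quick usage sketch of these helpers (the event values are illustrative):
```python
from connectors.schema import SourceEvent, compute_event_hash, validate_event

ev = SourceEvent(
    source="twitter",
    account="user_archive",
    thread_or_channel="tweet_42",
    author="user_archive",
    timestamp="2026-04-26T08:30:00+00:00",
    content="Hello world",
    attachments=[],
    raw_ref="twitter:archive:tweet.js:42",
    hash="",
)
ev.hash = compute_event_hash(ev.source, ev.raw_ref, ev.content, ev.timestamp, ev.author)
assert validate_event(ev)   # all required fields are non-empty
print(ev.to_json())         # one JSONL line, ready for downstream ingestion
```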

connectors/twitter_archive.py (new file, 155 lines)

@@ -0,0 +1,155 @@
#!/usr/bin/env python3
"""
connectors/twitter_archive.py — Twitter/X personal archive connector.

Parses official Twitter data exports (Twitter's "Download your data" archive).
Expects the tweet.js / tweet.json files from the archive's data/ directory.

Format (Twitter's archived tweets JSON):
Each entry has: {"tweet": {"id_str": "...", "full_text": "...", "created_at": "...", ...}}

Output: normalized SourceEvent with source='twitter'.
"""
import json
import re
from datetime import datetime
from pathlib import Path
from typing import Iterator, Optional
import logging

from .base import BaseConnector
from .schema import SourceEvent, compute_event_hash

logger = logging.getLogger(__name__)


class TwitterArchiveConnector(BaseConnector):
    """Connector for Twitter/X official archive exports."""
    name = "twitter_archive"
    source_glob = "**/tweet*.json"
    default_consent_scope = "memory_only"

    # Twitter's date format in archives: "Wed Oct 10 20:19:24 +0000 2018"
    TWITTER_DATE_FMT = "%a %b %d %H:%M:%S %z %Y"

    def discover_sources(self, root: Path) -> Iterator[Path]:
        """
        Find tweet.js / tweet.json files in a Twitter archive.

        The official Twitter export places these under:
            root/
                data/
                    tweet.js    (single-file format, older exports)
                or
                account-XXXX-YYYY/
                    tweets.js   (per-month split format)
        """
        root = Path(root)
        # Search for .js files that start with 'tweet' — these contain the tweet JSON blobs
        candidates = list(root.rglob("tweet*.js")) + list(root.rglob("tweet*.json"))
        logger.info("Discovered %d Twitter archive files under %s", len(candidates), root)
        for path in candidates:
            yield path

    def parse_source(self, source: Path) -> Iterator[SourceEvent]:
        """
        Parse a Twitter archive file and yield SourceEvents.

        Handles both single-file (old) and per-month split formats.
        Twitter wraps the JSON array in a JS variable assignment: `window.YTD.tweet.part0 = [...]`
        """
        try:
            with open(source, 'r', encoding='utf-8') as f:
                raw = f.read()
            # Extract JSON array from the JS wrapper
            match = re.search(r'=\s*(\[.+?\])\s*;?\s*$', raw, re.DOTALL)
            if match:
                json_str = match.group(1)
                records = json.loads(json_str)
            else:
                # Plain JSON array (no wrapper)
                records = json.loads(raw)
            logger.debug("Parsing %d tweet records from %s", len(records), source)
            for record in records:
                event = self._record_to_event(record, source)
                if event:
                    yield event
        except Exception as e:
            logger.error("Failed to parse %s: %s", source, e)

    def _record_to_event(self, record: dict, source_path: Path) -> Optional[SourceEvent]:
        """
        Convert a single tweet record into a SourceEvent.

        The record can be either the wrapped format {"tweet": {...}} or the bare tweet object.
        """
        # Unwrap the tweet object
        tweet = record.get('tweet', record)
        # Extract core fields
        id_str = tweet.get('id_str') or tweet.get('id')
        full_text = tweet.get('full_text') or tweet.get('text', '')
        created_at = tweet.get('created_at', '')
        # Parse timestamp
        try:
            dt = datetime.strptime(created_at, self.TWITTER_DATE_FMT)
            iso_ts = dt.astimezone().isoformat()
        except Exception:
            iso_ts = created_at  # fallback: keep as-is
        # Author is always the account owner (Twitter archives don't include others' DMs by default)
        account = "user_archive"  # normalized account identifier
        # Thread/channel: individual tweets have no thread ID; threads aren't preserved in basic export
        thread_id = f"tweet_{id_str}"
        # Attachments: extract media URLs
        attachments = []
        extended_entities = tweet.get('extended_entities', {})
        for media in extended_entities.get('media', []):
            url = media.get('media_url_https') or media.get('media_url')
            if url:
                attachments.append(url)
        # Build raw_ref
        raw_ref = f"twitter:archive:{source_path.name}:{id_str}"
        # Compute hash
        content_for_hash = full_text or ""
        hash_val = compute_event_hash(
            source="twitter",
            raw_ref=raw_ref,
            content=content_for_hash,
            timestamp=iso_ts,
            author=account
        )
        # Preserve metadata for provenance
        metadata = {
            "tweet_id": id_str,
            "source_file": str(source_path),
            "favorite_count": tweet.get('favorite_count'),
            "retweet_count": tweet.get('retweet_count'),
            "in_reply_to_status_id": tweet.get('in_reply_to_status_id_str'),
            "lang": tweet.get('lang'),
        }
        return SourceEvent(
            source="twitter",
            account=account,
            thread_or_channel=thread_id,
            author=account,
            timestamp=iso_ts,
            content=full_text,
            attachments=attachments,
            raw_ref=raw_ref,
            hash=hash_val,
            consent_scope=self.consent_scope,
            metadata=metadata
        )

scripts/genome_diff.py (deleted, 288 lines)

@@ -1,288 +0,0 @@
#!/usr/bin/env python3
"""
Codebase Genome Diff — Detect structural changes between two versions.

Compares two git refs (commits, branches, tags) and produces a human-readable
report of structural changes:
• Added/removed/renamed files
• Changed functions/classes (signature modifications)
• New dependencies (imports, requirements, etc.)

Usage:
    python3 scripts/genome_diff.py --ref1 <commit1> --ref2 <commit2>
    python3 scripts/genome_diff.py --ref1 main --ref2 feature-branch
    python3 scripts/genome_diff.py --ref1 v1.0 --ref2 v2.0 --output report.txt
"""
import argparse
import json
import os
import re
import subprocess
import sys
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional

SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, SCRIPT_DIR)
from diff_analyzer import DiffAnalyzer, ChangeCategory


@dataclass
class FunctionChange:
    file: str
    name: str
    kind: str          # 'function' or 'class'
    change_type: str   # 'added' or 'removed' (simplified)
    old_line: Optional[int] = None
    new_line: Optional[int] = None


@dataclass
class DependencyChange:
    file: str
    module: str
    change_type: str   # 'added' or 'removed' or 'modified'
    line: int = 0


@dataclass
class GenomeDiffReport:
    ref1: str
    ref2: str
    file_changes: List[Dict[str, Any]] = field(default_factory=list)
    function_changes: List[FunctionChange] = field(default_factory=list)
    dependency_changes: List[DependencyChange] = field(default_factory=list)
    total_files_changed: int = 0
    total_functions_changed: int = 0
    total_dependencies_changed: int = 0

    def to_dict(self) -> Dict[str, Any]:
        return {
            "ref1": self.ref1,
            "ref2": self.ref2,
            "summary": {
                "files": self.total_files_changed,
                "functions": self.total_functions_changed,
                "dependencies": self.total_dependencies_changed,
            },
            "file_changes": self.file_changes,
            "function_changes": [fc.__dict__ for fc in self.function_changes],
            "dependency_changes": [dc.__dict__ for dc in self.dependency_changes],
        }

    def human_report(self) -> str:
        lines = []
        lines.append(f"Codebase Genome Diff: {self.ref1} → {self.ref2}")
        lines.append("=" * 60)
        lines.append(f"  Files changed:        {self.total_files_changed}")
        lines.append(f"  Functions changed:    {self.total_functions_changed}")
        lines.append(f"  Dependencies changed: {self.total_dependencies_changed}")
        lines.append("")
        for fc in self.file_changes:
            kind = []
            if fc.get('is_new'):
                kind.append("NEW")
            if fc.get('is_deleted'):
                kind.append("DELETED")
            if fc.get('is_renamed'):
                kind.append("RENAMED")
            if fc.get('is_binary'):
                kind.append("BINARY")
            kind_str = f" [{', '.join(kind)}]" if kind else ""
            lines.append(f"  {fc['path']}{kind_str} (+{fc['added_lines']}/-{fc['deleted_lines']})")
        lines.append("")
        for fc in self.function_changes:
            op = {'added': '+', 'removed': '-', 'modified': '~'}.get(fc.change_type, '?')
            lines.append(f"  [{op}] {fc.file}: {fc.kind} '{fc.name}'")
        lines.append("")
        for dc in self.dependency_changes:
            op = '+' if dc.change_type == 'added' else '-'
            lines.append(f"  [{op}] {dc.file}: {dc.module}")
        lines.append("")
        return "\n".join(lines)


def run_git_diff(ref1: str, ref2: str) -> str:
    result = subprocess.run(
        ['git', 'diff', '--unified=0', f'{ref1}...{ref2}'],
        capture_output=True, text=True, cwd=SCRIPT_DIR
    )
    if result.returncode not in (0, 1):
        print(f"git diff failed: {result.stderr}", file=sys.stderr)
        sys.exit(1)
    return result.stdout


def extract_function_changes(diff_text: str) -> List[FunctionChange]:
    changes: List[FunctionChange] = []
    pattern = re.compile(r'^([+\-])\s*(def|class)\s+(\w+)', re.MULTILINE)
    hunk_header_re = re.compile(r'^@@\s+-(\d+)(?:,(\d+))?\s+\+(\d+)(?:,(\d+))?\s+@@')
    current_old_line: Optional[int] = None
    current_new_line: Optional[int] = None
    for line in diff_text.split('\n'):
        hdr = hunk_header_re.match(line)
        if hdr:
            current_old_line = int(hdr.group(1))
            current_new_line = int(hdr.group(3))
            continue
        m = pattern.match(line)
        if m:
            op = m.group(1)
            kind = m.group(2)
            name = m.group(3)
            change_type = "added" if op == '+' else "removed"
            line_num = current_new_line if change_type == "added" else current_old_line
            changes.append(FunctionChange(
                file="<unknown>",
                name=name,
                kind=kind,
                change_type=change_type,
                new_line=line_num if change_type == "added" else None,
                old_line=line_num if change_type == "removed" else None,
            ))
        # Advance line counters heuristically
        if line.startswith('-'):
            if current_old_line is not None:
                current_old_line += 1
        elif line.startswith('+'):
            if current_new_line is not None:
                current_new_line += 1
        elif line.startswith(' '):
            if current_old_line is not None:
                current_old_line += 1
            if current_new_line is not None:
                current_new_line += 1
        # lines starting with other prefixes (like \ No newline) ignored
    return changes


def extract_dependency_changes(diff_text: str, analyzer: DiffAnalyzer) -> List[DependencyChange]:
    changes: List[DependencyChange] = []
    import_pattern = re.compile(
        r'^([+\-])\s*(?:import\s+([\w\.]+)|from\s+([\w\.]+)\s+import)',
        re.MULTILINE
    )
    file_diffs = analyzer._split_files(diff_text)
    for file_diff in file_diffs:
        file_match = re.search(r'^diff --git a/.*? b/(.*?)$', file_diff, re.MULTILINE)
        if not file_match:
            continue
        filepath = file_match.group(1)
        # Scan each line for import changes
        for line in file_diff.split('\n'):
            m = import_pattern.match(line)
            if m:
                change_type = "added" if m.group(1) == '+' else "removed"
                module = m.group(2) or m.group(3)
                changes.append(DependencyChange(
                    file=filepath,
                    module=module,
                    change_type=change_type,
                    line=0
                ))
        # Detect if this file is a dependency manifest
        req_file_pattern = re.compile(
            r'^[\+\-].*?(requirements(.*?)\.txt|pyproject\.toml|setup\.py|Pipfile)'
        )
        if any(req_file_pattern.match(line) for line in file_diff.split('\n')):
            if not any(c.file == filepath and c.module == "<file>" for c in changes):
                changes.append(DependencyChange(
                    file=filepath,
                    module="<file>",
                    change_type="modified",
                    line=0
                ))
    return changes


def correlate_function_changes_with_files(diff_text: str, functions: List[FunctionChange]) -> List[FunctionChange]:
    result: List[FunctionChange] = []
    # Split diff into per-file sections
    file_sections: List[tuple[str, str]] = []
    current_file: Optional[str] = None
    current_lines: List[str] = []
    for line in diff_text.split('\n'):
        if line.startswith('diff --git'):
            if current_file is not None:
                file_sections.append((current_file, '\n'.join(current_lines)))
            m = re.match(r'^diff --git a/.*? b/(.*?)$', line)
            current_file = m.group(1) if m else "unknown"
            current_lines = [line]
        else:
            current_lines.append(line)
    if current_file is not None:
        file_sections.append((current_file, '\n'.join(current_lines)))
    pattern = re.compile(r'^([+\-])\s*(def|class)\s+(\w+)', re.MULTILINE)
    for filepath, section in file_sections:
        for m in pattern.finditer(section):
            op = m.group(1)
            kind = m.group(2)
            name = m.group(3)
            change_type = "added" if op == '+' else "removed"
            result.append(FunctionChange(
                file=filepath,
                name=name,
                kind=kind,
                change_type=change_type
            ))
    return result


def main():
    parser = argparse.ArgumentParser(description="Codebase Genome Diff — structural changes between versions")
    parser.add_argument("--ref1", required=True, help="First git ref (commit, branch, tag)")
    parser.add_argument("--ref2", required=True, help="Second git ref")
    parser.add_argument("--output", help="Write report to file")
    parser.add_argument("--json", action="store_true", help="Output JSON instead of human report")
    args = parser.parse_args()
    try:
        diff_text = run_git_diff(args.ref1, args.ref2)
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)
    if not diff_text.strip():
        print(f"No differences between {args.ref1} and {args.ref2}.")
        sys.exit(0)
    analyzer = DiffAnalyzer()
    summary = analyzer.analyze(diff_text)
    file_changes = [fc.to_dict() for fc in summary.files]
    func_changes = extract_function_changes(diff_text)
    func_changes = correlate_function_changes_with_files(diff_text, func_changes)
    dep_changes = extract_dependency_changes(diff_text, analyzer)
    report = GenomeDiffReport(
        ref1=args.ref1,
        ref2=args.ref2,
        file_changes=file_changes,
        function_changes=func_changes,
        dependency_changes=dep_changes,
        total_files_changed=len(file_changes),
        total_functions_changed=len(func_changes),
        total_dependencies_changed=len(dep_changes),
    )
    output = json.dumps(report.to_dict(), indent=2) if args.json else report.human_report()
    if args.output:
        with open(args.output, 'w') as f:
            f.write(output + '\n')
        print(f"Report written to {args.output}")
    else:
        print(output)


if __name__ == '__main__':
    main()

scripts/pr_complexity_scorer.py (new file, 351 lines)

@@ -0,0 +1,351 @@
#!/usr/bin/env python3
"""
PR Complexity Scorer - Estimate review effort for PRs.
"""
import argparse
import json
import os
import re
import sys
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
import urllib.request
import urllib.error
GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"
DEPENDENCY_FILES = {
    "requirements.txt", "pyproject.toml", "setup.py", "setup.cfg",
    "Pipfile", "poetry.lock", "package.json", "yarn.lock", "Gemfile",
    "go.mod", "Cargo.toml", "pom.xml", "build.gradle"
}
TEST_PATTERNS = [
    r"tests?/.*\.py$", r".*_test\.py$", r"test_.*\.py$",
    r"spec/.*\.rb$", r".*_spec\.rb$",
    r"__tests__/", r".*\.test\.(js|ts|jsx|tsx)$"
]
WEIGHT_FILES = 0.25
WEIGHT_LINES = 0.25
WEIGHT_DEPS = 0.30
WEIGHT_TEST_COV = 0.20
SMALL_FILES = 5
MEDIUM_FILES = 20
LARGE_FILES = 50
SMALL_LINES = 100
MEDIUM_LINES = 500
LARGE_LINES = 2000
TIME_PER_POINT = {1: 5, 2: 10, 3: 15, 4: 20, 5: 25, 6: 30, 7: 45, 8: 60, 9: 90, 10: 120}
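# Worked example: a PR touching 10 files with +200/-50 lines and a dependency
# file change (no test delta) scores as follows:
#   fscore = 2.0 (moderate files), lscore = 2.0 (moderate lines),
#   dscore = 2.5, tscore = 0.0
#   bonus  = 2.0*0.25 + 2.0*0.25 + 2.5*0.30 + 0.0*0.20 = 1.75
#   score  = 1 + 1.75*3 = 6.25 → rounds to 6 → TIME_PER_POINT[6] = 30 minutes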
@dataclass
class PRComplexity:
    pr_number: int
    title: str
    files_changed: int
    additions: int
    deletions: int
    has_dependency_changes: bool
    test_coverage_delta: Optional[int]
    score: int
    estimated_minutes: int
    reasons: List[str]

    def to_dict(self) -> dict:
        return asdict(self)


class GiteaClient:
    def __init__(self, token: str):
        self.token = token
        self.base_url = GITEA_BASE.rstrip("/")

    def _request(self, path: str, params: Dict = None) -> Any:
        url = f"{self.base_url}{path}"
        if params:
            qs = "&".join(f"{k}={v}" for k, v in params.items() if v is not None)
            url += f"?{qs}"
        req = urllib.request.Request(url)
        req.add_header("Authorization", f"token {self.token}")
        req.add_header("Content-Type", "application/json")
        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                return json.loads(resp.read().decode())
        except urllib.error.HTTPError as e:
            print(f"API error {e.code}: {e.read().decode()[:200]}", file=sys.stderr)
            return None
        except urllib.error.URLError as e:
            print(f"Network error: {e}", file=sys.stderr)
            return None

    def get_open_prs(self, org: str, repo: str) -> List[Dict]:
        prs = []
        page = 1
        while True:
            batch = self._request(f"/repos/{org}/{repo}/pulls", {"limit": 50, "page": page, "state": "open"})
            if not batch:
                break
            prs.extend(batch)
            if len(batch) < 50:
                break
            page += 1
        return prs

    def get_pr_files(self, org: str, repo: str, pr_number: int) -> List[Dict]:
        files = []
        page = 1
        while True:
            batch = self._request(
                f"/repos/{org}/{repo}/pulls/{pr_number}/files",
                {"limit": 100, "page": page}
            )
            if not batch:
                break
            files.extend(batch)
            if len(batch) < 100:
                break
            page += 1
        return files

    def post_comment(self, org: str, repo: str, pr_number: int, body: str) -> bool:
        data = json.dumps({"body": body}).encode("utf-8")
        req = urllib.request.Request(
            f"{self.base_url}/repos/{org}/{repo}/issues/{pr_number}/comments",
            data=data,
            method="POST",
            headers={"Authorization": f"token {self.token}", "Content-Type": "application/json"}
        )
        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                return resp.status in (200, 201)
        except urllib.error.HTTPError:
            return False


def is_dependency_file(filename: str) -> bool:
    return any(filename.endswith(dep) for dep in DEPENDENCY_FILES)


def is_test_file(filename: str) -> bool:
    return any(re.search(pattern, filename) for pattern in TEST_PATTERNS)


def score_pr(
    files_changed: int,
    additions: int,
    deletions: int,
    has_dependency_changes: bool,
    test_coverage_delta: Optional[int] = None
) -> tuple[int, int, List[str]]:
    reasons = []
    # Files changed
    if files_changed <= SMALL_FILES:
        fscore = 1.0
        reasons.append("small number of files changed")
    elif files_changed <= MEDIUM_FILES:
        fscore = 2.0
        reasons.append("moderate number of files changed")
    elif files_changed <= LARGE_FILES:
        fscore = 2.5
        reasons.append("large number of files changed")
    else:
        fscore = 3.0
        reasons.append("very large PR spanning many files")
    # Lines changed
    total_lines = additions + deletions
    if total_lines <= SMALL_LINES:
        lscore = 1.0
        reasons.append("small change size")
    elif total_lines <= MEDIUM_LINES:
        lscore = 2.0
        reasons.append("moderate change size")
    elif total_lines <= LARGE_LINES:
        lscore = 3.0
        reasons.append("large change size")
    else:
        lscore = 4.0
        reasons.append("very large change")
    # Dependency changes
    if has_dependency_changes:
        dscore = 2.5
        reasons.append("dependency changes (architectural impact)")
    else:
        dscore = 0.0
    # Test coverage delta
    tscore = 0.0
    if test_coverage_delta is not None:
        if test_coverage_delta > 0:
            reasons.append(f"test additions (+{test_coverage_delta} test files)")
            tscore = -min(2.0, test_coverage_delta / 2.0)
        elif test_coverage_delta < 0:
            reasons.append(f"test removals ({abs(test_coverage_delta)} test files)")
            tscore = min(2.0, abs(test_coverage_delta) * 0.5)
    else:
        reasons.append("test coverage change not assessed")
    # Weighted sum, scaled by 3 to use full 1-10 range
    bonus = (fscore * WEIGHT_FILES) + (lscore * WEIGHT_LINES) + (dscore * WEIGHT_DEPS) + (tscore * WEIGHT_TEST_COV)
    scaled_bonus = bonus * 3.0
    score = 1.0 + scaled_bonus
    final_score = max(1, min(10, int(round(score))))
    est_minutes = TIME_PER_POINT.get(final_score, 30)
    return final_score, est_minutes, reasons


def analyze_pr(client: GiteaClient, org: str, repo: str, pr_data: Dict) -> PRComplexity:
    pr_num = pr_data["number"]
    title = pr_data.get("title", "")
    files = client.get_pr_files(org, repo, pr_num)
    additions = sum(f.get("additions", 0) for f in files)
    deletions = sum(f.get("deletions", 0) for f in files)
    filenames = [f.get("filename", "") for f in files]
    has_deps = any(is_dependency_file(f) for f in filenames)
    test_added = sum(1 for f in files if f.get("status") == "added" and is_test_file(f.get("filename", "")))
    test_removed = sum(1 for f in files if f.get("status") == "removed" and is_test_file(f.get("filename", "")))
    test_delta = test_added - test_removed if (test_added or test_removed) else None
    score, est_min, reasons = score_pr(
        files_changed=len(files),
        additions=additions,
        deletions=deletions,
        has_dependency_changes=has_deps,
        test_coverage_delta=test_delta
    )
    return PRComplexity(
        pr_number=pr_num,
        title=title,
        files_changed=len(files),
        additions=additions,
        deletions=deletions,
        has_dependency_changes=has_deps,
        test_coverage_delta=test_delta,
        score=score,
        estimated_minutes=est_min,
        reasons=reasons
    )


def build_comment(complexity: PRComplexity) -> str:
    change_desc = f"{complexity.files_changed} files, +{complexity.additions}/-{complexity.deletions} lines"
    deps_note = "\n- :warning: Dependency changes detected — architectural review recommended" if complexity.has_dependency_changes else ""
    test_note = ""
    if complexity.test_coverage_delta is not None:
        if complexity.test_coverage_delta > 0:
            test_note = f"\n- :+1: {complexity.test_coverage_delta} test file(s) added"
        elif complexity.test_coverage_delta < 0:
            test_note = f"\n- :warning: {abs(complexity.test_coverage_delta)} test file(s) removed"
    comment = "## 📊 PR Complexity Analysis\n\n"
    comment += f"**PR #{complexity.pr_number}: {complexity.title}**\n\n"
    comment += "| Metric | Value |\n|--------|-------|\n"
    comment += f"| Changes | {change_desc} |\n"
    comment += f"| Complexity Score | **{complexity.score}/10** |\n"
    comment += f"| Estimated Review Time | ~{complexity.estimated_minutes} minutes |\n\n"
    comment += "### Scoring rationale:"
    for r in complexity.reasons:
        comment += f"\n- {r}"
    if deps_note:
        comment += deps_note
    if test_note:
        comment += test_note
    comment += "\n\n---\n"
    comment += "*Generated by PR Complexity Scorer — [issue #135](https://forge.alexanderwhitestone.com/Timmy_Foundation/compounding-intelligence/issues/135)*"
    return comment


def main():
    parser = argparse.ArgumentParser(description="PR Complexity Scorer")
    parser.add_argument("--org", default="Timmy_Foundation")
    parser.add_argument("--repo", default="compounding-intelligence")
    parser.add_argument("--token", default=os.environ.get("GITEA_TOKEN") or os.path.expanduser("~/.config/gitea/token"))
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--apply", action="store_true")
    parser.add_argument("--output", default="metrics/pr_complexity.json")
    args = parser.parse_args()
    # --token may be a literal token or a path to a token file
    token_path = args.token
    if os.path.exists(token_path):
        with open(token_path) as f:
            token = f.read().strip()
    else:
        token = args.token
    if not token:
        print("ERROR: No Gitea token provided", file=sys.stderr)
        sys.exit(1)
    client = GiteaClient(token)
    print(f"Fetching open PRs for {args.org}/{args.repo}...")
    prs = client.get_open_prs(args.org, args.repo)
    if not prs:
        print("No open PRs found.")
        sys.exit(0)
    print(f"Found {len(prs)} open PR(s). Analyzing...")
    results = []
    Path(args.output).parent.mkdir(parents=True, exist_ok=True)
    for pr in prs:
        pr_num = pr["number"]
        title = pr.get("title", "")
        print(f"  Analyzing PR #{pr_num}: {title[:60]}")
        try:
            complexity = analyze_pr(client, args.org, args.repo, pr)
            results.append(complexity.to_dict())
            comment = build_comment(complexity)
            if args.dry_run:
                print(f"    → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [DRY-RUN]")
            elif args.apply:
                success = client.post_comment(args.org, args.repo, pr_num, comment)
                status = "[commented]" if success else "[FAILED]"
                print(f"    → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min {status}")
            else:
                print(f"    → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [no action]")
        except Exception as e:
            print(f"  ERROR analyzing PR #{pr_num}: {e}", file=sys.stderr)
    with open(args.output, "w") as f:
        json.dump({
            "org": args.org,
            "repo": args.repo,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "pr_count": len(results),
            "results": results
        }, f, indent=2)
    if results:
        scores = [r["score"] for r in results]
        print(f"\nResults saved to {args.output}")
        print(f"Summary: {len(results)} PRs, scores range {min(scores)}-{max(scores)}")
    else:
        print("\nNo results to save.")


if __name__ == "__main__":
    main()

scripts/run_connector.py (new file, 116 lines)

@@ -0,0 +1,116 @@
#!/usr/bin/env python3
"""
scripts/run_connector.py — Run a personal archive connector and emit SourceEvents.

Usage:
    python3 scripts/run_connector.py twitter_archive --source /path/to/twitter/archive --output events.jsonl [--limit 100]

This is the entry point that ties the connectors pack into the existing compounding-intelligence
pipeline. Output is JSONL (one SourceEvent per line), ready for downstream ingestion by
harvester.py or a future connector-targeted harvester.
"""
import argparse
import json
import logging
import sys
from pathlib import Path

# Add the repo root (parent of scripts/) to the path so the `connectors` package resolves
SCRIPT_DIR = Path(__file__).parent.absolute()
sys.path.insert(0, str(SCRIPT_DIR.parent))

from connectors import get_connector, list_connectors
from connectors.base import BaseConnector
from connectors.schema import SourceEvent

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
)
logger = logging.getLogger("run_connector")


def main():
    parser = argparse.ArgumentParser(
        description="Run a personal archive connector and emit normalized events."
    )
    parser.add_argument(
        "connector",
        choices=list_connectors(),
        help="Connector name to run"
    )
    parser.add_argument(
        "--source", "-s",
        required=True,
        help="Path to the source archive root (e.g., ~/Documents/TwitterArchive)"
    )
    parser.add_argument(
        "--output", "-o",
        required=True,
        help="Output file path (JSONL, one SourceEvent per line)"
    )
    parser.add_argument(
        "--limit", "-n",
        type=int,
        default=None,
        help="Stop after N events (default: unlimited)"
    )
    parser.add_argument(
        "--consent-scope",
        choices=["memory_only", "bootstrap_context", "training_data"],
        default="memory_only",
        help="Consent scope for emitted events (default: memory_only)"
    )
    parser.add_argument(
        "--checkpoint",
        type=Path,
        default=None,
        help="Checkpoint file path (default: ~/.cache/connectors/{name}.checkpoint.jsonl)"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Parse and count events but do not write output"
    )
    args = parser.parse_args()
    # Resolve connector
    connector_cls = get_connector(args.connector)
    connector: BaseConnector = connector_cls(
        checkpoint_path=args.checkpoint,
        consent_scope=args.consent_scope
    )
    # Resolve source path
    source_path = Path(args.source).expanduser().resolve()
    if not source_path.exists():
        logger.error("Source path does not exist: %s", source_path)
        sys.exit(1)
    # Run connector
    logger.info("Running connector '%s' on source: %s", args.connector, source_path)
    events = connector.run(source_path, limit=args.limit)
    if args.dry_run:
        # Note: BaseConnector.run() still records checkpoints while counting
        count = sum(1 for _ in events)
        logger.info("[DRY RUN] Would emit %d events", count)
        return 0
    # Write output
    output_path = Path(args.output).expanduser().resolve()
    output_path.parent.mkdir(parents=True, exist_ok=True)
    count = 0
    with open(output_path, 'w', encoding='utf-8') as out:
        for event in events:
            out.write(event.to_json() + '\n')
            count += 1
    logger.info("Connector complete. Emitted %d events to %s", count, output_path)
    return 0


if __name__ == "__main__":
    sys.exit(main())


@@ -0,0 +1,170 @@
#!/usr/bin/env python3
"""
Tests for PR Complexity Scorer — unit tests for the scoring logic.
"""
import sys
from pathlib import Path

sys.path.insert(0, str(Path(__file__).parent))
from pr_complexity_scorer import (
    score_pr,
    is_dependency_file,
    is_test_file,
    TIME_PER_POINT,
    SMALL_FILES,
    MEDIUM_FILES,
    LARGE_FILES,
    SMALL_LINES,
    MEDIUM_LINES,
    LARGE_LINES,
)

PASS = 0
FAIL = 0


def test(name):
    def decorator(fn):
        global PASS, FAIL
        try:
            fn()
            PASS += 1
            print(f"  [PASS] {name}")
        except AssertionError as e:
            FAIL += 1
            print(f"  [FAIL] {name}: {e}")
        except Exception as e:
            FAIL += 1
            print(f"  [FAIL] {name}: Unexpected error: {e}")
    return decorator


def assert_eq(a, b, msg=""):
    if a != b:
        raise AssertionError(f"{msg} expected {b!r}, got {a!r}")


def assert_true(v, msg=""):
    if not v:
        raise AssertionError(msg or "Expected True")


def assert_false(v, msg=""):
    if v:
        raise AssertionError(msg or "Expected False")


print("=== PR Complexity Scorer Tests ===\n")
print("-- File Classification --")


@test("dependency file detection — requirements.txt")
def _():
    assert_true(is_dependency_file("requirements.txt"))
    assert_true(is_dependency_file("src/requirements.txt"))
    assert_false(is_dependency_file("requirements_test.txt"))


@test("dependency file detection — pyproject.toml")
def _():
    assert_true(is_dependency_file("pyproject.toml"))
    assert_false(is_dependency_file("myproject.py"))


@test("test file detection — pytest style")
def _():
    assert_true(is_test_file("tests/test_api.py"))
    assert_true(is_test_file("test_module.py"))
    assert_true(is_test_file("src/module_test.py"))


@test("test file detection — other frameworks")
def _():
    assert_true(is_test_file("spec/feature_spec.rb"))
    assert_true(is_test_file("__tests__/component.test.js"))
    assert_false(is_test_file("testfixtures/helper.py"))


print("\n-- Scoring Logic --")


@test("small PR gets low score (1-3)")
def _():
    score, minutes, _ = score_pr(
        files_changed=3,
        additions=50,
        deletions=10,
        has_dependency_changes=False,
        test_coverage_delta=None
    )
    assert_true(1 <= score <= 3, f"Score should be low, got {score}")
    assert_true(minutes < 20)


@test("medium PR gets medium score (4-6)")
def _():
    score, minutes, _ = score_pr(
        files_changed=15,
        additions=400,
        deletions=100,
        has_dependency_changes=False,
        test_coverage_delta=None
    )
    assert_true(4 <= score <= 6, f"Score should be medium, got {score}")
    assert_true(20 <= minutes <= 45)


@test("large PR gets high score (7-9)")
def _():
    score, minutes, _ = score_pr(
        files_changed=60,
        additions=3000,
        deletions=1500,
        has_dependency_changes=True,
        test_coverage_delta=None
    )
    assert_true(7 <= score <= 9, f"Score should be high, got {score}")
    assert_true(minutes >= 45)


@test("dependency changes boost score")
def _():
    base_score, _, _ = score_pr(
        files_changed=10, additions=200, deletions=50,
        has_dependency_changes=False, test_coverage_delta=None
    )
    dep_score, _, _ = score_pr(
        files_changed=10, additions=200, deletions=50,
        has_dependency_changes=True, test_coverage_delta=None
    )
    assert_true(dep_score > base_score, f"Deps: {base_score} -> {dep_score}")


@test("adding tests lowers complexity")
def _():
    base_score, _, _ = score_pr(
        files_changed=8, additions=150, deletions=20,
        has_dependency_changes=False, test_coverage_delta=None
    )
    better_score, _, _ = score_pr(
        files_changed=8, additions=180, deletions=20,
        has_dependency_changes=False, test_coverage_delta=3
    )
    assert_true(better_score < base_score, f"Tests: {base_score} -> {better_score}")


@test("removing tests increases complexity")
def _():
    base_score, _, _ = score_pr(
        files_changed=8, additions=150, deletions=20,
        has_dependency_changes=False, test_coverage_delta=None
    )
    worse_score, _, _ = score_pr(
        files_changed=8, additions=150, deletions=20,
        has_dependency_changes=False, test_coverage_delta=-2
    )
    assert_true(worse_score > base_score, f"Remove tests: {base_score} -> {worse_score}")


@test("score bounded 1-10")
def _():
    for files, adds, dels in [(1, 10, 5), (100, 10000, 5000)]:
        score, _, _ = score_pr(files, adds, dels, False, None)
        assert_true(1 <= score <= 10, f"Score {score} out of range")


@test("estimated minutes exist for all scores")
def _():
    for s in range(1, 11):
        assert_true(s in TIME_PER_POINT, f"Missing time for score {s}")


print(f"\n=== Results: {PASS} passed, {FAIL} failed ===")
sys.exit(0 if FAIL == 0 else 1)

tests/test_connectors.py (new file, 270 lines)

@@ -0,0 +1,270 @@
#!/usr/bin/env python3
"""
tests/test_connectors.py — Test suite for the personal archive connector pack.

Tests cover:
- SourceEvent schema validation
- Event hash determinism
- TwitterArchiveConnector parsing of standard Twitter export format
- Deduplication gate
"""
import json
import hashlib
import tempfile
from pathlib import Path
from datetime import datetime

import pytest

# Add the repo root (parent of scripts/) to the path so the `connectors` package resolves
import sys
SCRIPT_DIR = Path(__file__).parent.parent / "scripts"
sys.path.insert(0, str(SCRIPT_DIR.parent))

from connectors.schema import (
    SourceEvent,
    compute_event_hash,
    validate_event,
    CONSENT_MEMORY_ONLY,
    CONSENT_BOOTSTRAP,
)
from connectors.twitter_archive import TwitterArchiveConnector


class TestSourceEventSchema:
    """Tests for SourceEvent dataclass and helpers."""

    def test_create_minimal_event(self):
        event = SourceEvent(
            source="twitter",
            account="user123",
            thread_or_channel="tweet_456",
            author="user123",
            timestamp="2026-04-26T12:00:00Z",
            content="Hello world",
            attachments=[],
            raw_ref="twitter:test:456",
            hash="",
        )
        assert event.source == "twitter"
        assert event.consent_scope == CONSENT_MEMORY_ONLY  # default

    def test_compute_event_hash_deterministic(self):
        h1 = compute_event_hash(
            source="twitter",
            raw_ref="ref:123",
            content="test content",
            timestamp="2026-04-26T12:00:00Z",
            author="alice"
        )
        h2 = compute_event_hash(
            source="twitter",
            raw_ref="ref:123",
            content="test content",
            timestamp="2026-04-26T12:00:00Z",
            author="alice"
        )
        assert h1 == h2
        assert len(h1) == 64  # SHA-256 hex

    def test_compute_event_hash_different_inputs(self):
        h1 = compute_event_hash("twitter", "ref:1", "content", "ts", "alice")
        h2 = compute_event_hash("twitter", "ref:1", "different", "ts", "alice")
        assert h1 != h2

    def test_validate_event_accepts_valid(self):
        event = SourceEvent(
            source="discord",
            account="user#1234",
            thread_or_channel="channel_abc",
            author="user#1234",
            timestamp="2026-04-26T12:00:00Z",
            content="test",
            attachments=[],
            raw_ref="discord:msg:123",
            hash="a" * 64,
        )
        assert validate_event(event) is True

    def test_validate_event_rejects_empty_content(self):
        event = SourceEvent(
            source="twitter",
            account="user",
            thread_or_channel="thread",
            author="user",
            timestamp="2026-04-26T12:00:00Z",
            content="",  # empty
            attachments=[],
            raw_ref="ref",
            hash="a" * 64,
        )
        assert validate_event(event) is False

    def test_validate_event_rejects_missing_hash(self):
        event = SourceEvent(
            source="twitter",
            account="user",
            thread_or_channel="thread",
            author="user",
            timestamp="2026-04-26T12:00:00Z",
            content="test",
            attachments=[],
            raw_ref="ref",
            hash=" ",  # whitespace only
        )
        assert validate_event(event) is False

    def test_event_to_json_roundtrip(self):
        event = SourceEvent(
            source="twitter",
            account="user",
            thread_or_channel="t1",
            author="user",
            timestamp="2026-04-26T12:00:00Z",
            content="hello",
            attachments=["https://example.com/img.jpg"],
            raw_ref="twitter:123",
            hash="b" * 64,
            metadata={"retweet_count": 5}
        )
        json_str = event.to_json()
        parsed = json.loads(json_str)
        assert parsed["source"] == "twitter"
        assert parsed["metadata"]["retweet_count"] == 5


class TestTwitterArchiveConnector:
    """Tests for the Twitter/X archive connector."""

    def test_connector_name(self):
        assert TwitterArchiveConnector.name == "twitter_archive"

    def test_discover_sources_finds_tweet_js(self, tmp_path: Path):
        # Arrange: create a fake Twitter archive structure
        archive = tmp_path / "twitter_archive"
        archive.mkdir()
        data_dir = archive / "data"
        data_dir.mkdir()
        (data_dir / "tweet.js").write_text("[]")
        (data_dir / "tweets_2024_01.js").write_text("[]")
        connector = TwitterArchiveConnector()
        sources = list(connector.discover_sources(archive))
        assert len(sources) == 2
        assert any("tweet.js" in str(p) for p in sources)

    def test_parse_single_tweet_wrapped_format(self, tmp_path: Path):
        """
        Twitter's official export wraps the JSON array in a JS assignment:
        window.YTD.tweet.part0 = [ {...tweet...}, ... ];
        """
        # Create a minimal tweet record
        tweet = {
            "id_str": "1234567890",
            "full_text": "Hello from Twitter archive!",
            "created_at": "Mon Apr 26 08:30:00 +0000 2026",
            "favorite_count": 10,
            "retweet_count": 2,
            "lang": "en"
        }
        wrapped = "window.YTD.tweet.part0 = " + json.dumps([{'tweet': tweet}]) + ";\n"
        js_file = tmp_path / "tweet.js"
        js_file.write_text(wrapped)
        connector = TwitterArchiveConnector()
        events = list(connector.parse_source(js_file))
        assert len(events) == 1
        ev = events[0]
        assert ev.source == "twitter"
        assert ev.content == "Hello from Twitter archive!"
        assert ev.author == "user_archive"
        assert ev.consent_scope == "memory_only"
        # Hash must be computed
        assert len(ev.hash) == 64
        # Metadata preservation
        assert ev.metadata["tweet_id"] == "1234567890"
        assert ev.metadata["favorite_count"] == 10

    def test_parse_tweet_array_without_wrapper(self, tmp_path: Path):
        """Some Twitter exports are plain JSON arrays (no JS wrapper)."""
        tweet = {
            "id_str": "999",
            "full_text": "Plain JSON tweet",
            "created_at": "Mon Apr 26 08:30:00 +0000 2026",
        }
        json_file = tmp_path / "tweets.json"
        json_file.write_text(json.dumps([{"tweet": tweet}]))
        connector = TwitterArchiveConnector()
        events = list(connector.parse_source(json_file))
        assert len(events) == 1
        assert events[0].content == "Plain JSON tweet"

    def test_parse_with_media_attachments(self, tmp_path: Path):
        tweet = {
            "id_str": "111",
            "full_text": "Check this photo",
            "created_at": "Mon Apr 26 08:30:00 +0000 2026",
            "extended_entities": {
                "media": [
                    {"media_url_https": "https://pbs.twimg.com/media/example1.jpg"},
                    {"media_url_https": "https://pbs.twimg.com/media/example2.jpg"},
                ]
            }
        }
        js_file = tmp_path / "tweet.js"
        js_file.write_text("window.YTD.tweet.part0 = " + json.dumps([{'tweet': tweet}]) + ";\n")
        connector = TwitterArchiveConnector()
        events = list(connector.parse_source(js_file))
        assert len(events) == 1
        atts = events[0].attachments
        assert len(atts) == 2
        assert "example1.jpg" in atts[0]

    def test_integration_run_connector(self, tmp_path: Path):
        """End-to-end: create a mini archive, run connector, write JSONL output."""
        # Arrange: create archive with two tweets
        archive_root = tmp_path / "my_twitter_archive" / "data"
        archive_root.mkdir(parents=True)
        tweet1 = {
            "id_str": "1",
            "full_text": "First tweet",
            "created_at": "Mon Apr 26 08:00:00 +0000 2026",
        }
        tweet2 = {
            "id_str": "2",
            "full_text": "Second tweet",
            "created_at": "Mon Apr 26 09:00:00 +0000 2026",
        }
        (archive_root / "tweet.js").write_text(
            "window.YTD.tweet.part0 = " + json.dumps([{'tweet': tweet1}, {'tweet': tweet2}]) + "\n"
        )
        connector = TwitterArchiveConnector(checkpoint_path=tmp_path / "ckpt.jsonl")
        output_path = tmp_path / "events.jsonl"
        # Act
        count = 0
        with open(output_path, 'w') as out:
            for event in connector.run(archive_root):
                out.write(event.to_json() + '\n')
                count += 1
        # Assert
        assert count == 2
        lines = output_path.read_text().strip().split('\n')
        assert len(lines) == 2
        ev1 = json.loads(lines[0])
        assert ev1["content"] == "First tweet"
        ev2 = json.loads(lines[1])
        assert ev2["content"] == "Second tweet"
        # Check duplicates are filtered on re-run
        count2 = sum(1 for _ in connector.run(archive_root))
        assert count2 == 0  # all deduped via checkpoint