- creates tools/sherlock_wrapper.py with run_sherlock() library + CLI - opt-in gate: SHERLOCK_ENABLED=1 or --opt-in required - local SQLite cache at ~/.cache/timmy/sherlock_cache.db (TTL: 7 days) - normalized JSON output schema: found/missing/errors/metadata - minimal smoke test suite: 13 tests covering schema, cache, TTL, opt-in - adds README section with usage, schema, setup, and smoke-test instructions Closes #874
250 lines · 7.3 KiB · Python
#!/usr/bin/env python3
|
|
"""
|
|
Sherlock username recon wrapper — opt-in, cached, normalized JSON output.
|
|
|
|
This is an implementation spike (issue #874) to validate local integration
|
|
of the Sherlock OSINT tool without violating sovereignty/provenance standards.
|
|
"""
|
|
|
|
import argparse
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import sqlite3
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Optional, Dict, Any, List
|
|
|
|
# Opt-in gate: Sherlock may only run when SHERLOCK_ENABLED=1 is exported
# (snapshotted once at import time) or the caller passes opt_in=True / --opt-in.
SHERLOCK_ENABLED = os.environ.get("SHERLOCK_ENABLED", "0") == "1"

# Local SQLite result cache under the user's home cache directory
# (TTL enforcement happens at read time in get_cached_result).
CACHE_DIR = Path.home() / ".cache" / "timmy"
CACHE_DB = CACHE_DIR / "sherlock_cache.db"

# Version tag embedded in every normalized result payload so consumers
# can detect schema changes.
SCHEMA_VERSION = "1.0"
|
|
|
|
|
|
def require_opt_in(opt_in: bool = False) -> None:
    """Enforce the opt-in gate for the Sherlock external dependency.

    Passes silently when either the SHERLOCK_ENABLED env snapshot or the
    explicit opt_in argument grants permission; raises otherwise.
    """
    permitted = SHERLOCK_ENABLED or opt_in
    if permitted:
        return
    raise RuntimeError(
        "Sherlock is opt-in only. Set SHERLOCK_ENABLED=1 or pass --opt-in."
    )
|
|
|
|
|
|
|
|
def check_sherlock_available() -> bool:
    """Report whether the sherlock Python package can be imported."""
    try:
        import sherlock  # type: ignore # noqa: F401
    except ImportError:
        return False
    return True
|
|
|
|
|
|
def get_cache_connection() -> sqlite3.Connection:
    """Initialize the cache directory/schema and return an open DB connection.

    The caller owns the returned connection and is responsible for closing it.
    """
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(str(CACHE_DB))
    conn.execute("""
        CREATE TABLE IF NOT EXISTS cache (
            query_hash TEXT PRIMARY KEY,
            result_json TEXT NOT NULL,
            timestamp DATETIME NOT NULL
        )
    """)
    # Fix: sqlite3 wraps DDL in an implicit transaction (Python 3.6+), so
    # without this commit a read-only caller could leave the CREATE TABLE
    # uncommitted and holding a write lock.
    conn.commit()
    return conn
|
|
|
|
|
|
def compute_query_hash(username: str, sites: Optional[List[str]] = None) -> str:
    """Return a deterministic SHA-256 cache key for (username, sites).

    The username is case-folded and stripped; the site list (when given)
    is sorted so ordering does not affect the key.
    """
    parts: List[str] = [username.lower().strip()]
    if sites:
        parts += sorted(sites)
    joined = "|".join(parts)
    return hashlib.sha256(joined.encode()).hexdigest()
|
|
|
|
|
|
def get_cached_result(query_hash: str) -> Optional[Dict[str, Any]]:
    """Retrieve a cached result if available and not stale (TTL: 7 days).

    Returns the cached normalized-result dict, or None on a cache miss or
    when the entry is older than the TTL.
    """
    conn = get_cache_connection()
    try:
        cur = conn.execute(
            "SELECT result_json, timestamp FROM cache WHERE query_hash = ?",
            (query_hash,)
        )
        row = cur.fetchone()
    finally:
        # Fix: the original leaked this connection on every lookup.
        conn.close()
    if not row:
        return None
    result_json, ts_str = row
    # TTL: 7 days (604800 seconds). save_to_cache writes timezone-aware UTC
    # ISO strings, so the aware-minus-aware subtraction below is valid.
    ts = datetime.fromisoformat(ts_str)
    age_seconds = (datetime.now(timezone.utc) - ts).total_seconds()
    if age_seconds >= 604800:
        return None
    return json.loads(result_json)
|
|
|
|
|
|
|
|
|
|
def save_to_cache(query_hash: str, result: Dict[str, Any]) -> None:
    """Persist a normalized result under its query hash (insert or replace)."""
    payload = json.dumps(result)
    stamp = datetime.now(timezone.utc).isoformat()
    conn = get_cache_connection()
    # The connection context manager commits the transaction on success.
    with conn:
        conn.execute(
            "INSERT OR REPLACE INTO cache (query_hash, result_json, timestamp) VALUES (?, ?, ?)",
            (query_hash, payload, stamp)
        )
    conn.close()
|
|
|
|
|
|
def normalize_sherlock_output(
    raw_result: Dict[str, Any],
    username: str,
    sites_checked: Optional[List[str]] = None
) -> Dict[str, Any]:
    """
    Convert raw sherlock output into a stable, normalized schema.

    Expected sherlock result shape (via Python API):
        {
            "site_name": {"url": "...", "status": "found"|"not found"|"error", ...},
            ...
        }

    Sites with status "found" and a non-empty URL land in "found"; a
    "not found" status lands in "missing"; anything else (including a
    "found" entry with no URL) is recorded as an error.
    """
    found: List[Dict[str, str]] = []
    missing: List[str] = []
    errors: List[Dict[str, str]] = []

    for name, info in raw_result.items():
        state = info.get("status", "")
        link = info.get("url", "")
        if state == "found" and link:
            found.append({"site": name, "url": link})
        elif state == "not found":
            missing.append(name)
        else:
            errors.append({"site": name, "error": state or "unknown"})

    # Prefer the caller-supplied site list for the total; otherwise count
    # whatever sherlock actually reported on.
    if sites_checked is None:
        total_sites = len(raw_result)
    else:
        total_sites = len(sites_checked)

    metadata = {
        "total_sites_checked": total_sites,
        "found_count": len(found),
        "missing_count": len(missing),
        "error_count": len(errors),
    }
    return {
        "schema_version": SCHEMA_VERSION,
        "query": username,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "found": found,
        "missing": missing,
        "errors": errors,
        "metadata": metadata,
    }
|
|
|
|
|
|
def run_sherlock(
    username: str,
    sites: Optional[List[str]] = None,
    timeout: Optional[int] = None,
    opt_in: bool = False,
    no_cache: bool = False
) -> Dict[str, Any]:
    """
    Execute Sherlock wrapper with opt-in gate, caching, and normalization.

    Args:
        username: Username to search for.
        sites: Optional list of specific sites to check (default: all).
        timeout: Per-site request timeout in seconds (default: 10).
        opt_in: Explicit opt-in (alternative to SHERLOCK_ENABLED=1).
        no_cache: When True, skip the cache lookup and force a fresh query.
            The fresh result is still written back to the cache.

    Returns:
        A normalized result dict (see normalize_sherlock_output).

    Raises:
        RuntimeError: If not opted in, sherlock is not installed, or the
            underlying sherlock run fails.
    """
    require_opt_in(opt_in)

    # Compute cache key
    query_hash = compute_query_hash(username, sites)

    # Check cache first — avoids dependency requirement on cache hit.
    # Fix: honor no_cache so callers (e.g. the CLI's --no-cache flag) can
    # bypass a stale-but-valid entry; default keeps old behavior.
    if not no_cache:
        cached = get_cached_result(query_hash)
        if cached is not None:
            return cached

    # Only require sherlock on cache miss
    if not check_sherlock_available():
        raise RuntimeError(
            "Sherlock Python package not installed. "
            "Install with: pip install sherlock-project"
        )

    # Call sherlock (redundant bare `import sherlock` removed; only the
    # callable is needed here)
    try:
        from sherlock import sherlock as sherlock_main  # type: ignore

        if sites:
            result = sherlock_main(username, site_list=sites, timeout=timeout or 10)
        else:
            result = sherlock_main(username, timeout=timeout or 10)

        normalized = normalize_sherlock_output(result, username, sites)
        save_to_cache(query_hash, normalized)
        return normalized

    except Exception as e:
        raise RuntimeError(f"Sherlock execution failed: {e}") from e
|
|
|
|
|
|
def main() -> int:
    """CLI entry point. Returns a process exit code (0 on success, 1 on error)."""
    parser = argparse.ArgumentParser(
        description="Sherlock username OSINT wrapper — opt-in, cached, normalized JSON"
    )
    parser.add_argument(
        "--query", "-q", required=True,
        help="Username to search across sites"
    )
    parser.add_argument(
        "--opt-in", action="store_true",
        help="Explicit opt-in flag (alternatively set SHERLOCK_ENABLED=1)"
    )
    parser.add_argument(
        "--sites", "-s", nargs="+",
        help="Specific sites to check (default: all supported)"
    )
    parser.add_argument(
        "--timeout", "-t", type=int, default=10,
        help="Request timeout per site (default: 10)"
    )
    parser.add_argument(
        "--json", action="store_true",
        help="Output normalized JSON to stdout"
    )
    parser.add_argument(
        "--no-cache",
        action="store_true",
        help="Bypass cached result (if any)"
    )

    args = parser.parse_args()

    # Fix: --no-cache was parsed but never acted upon. Purge any cached
    # entry for this exact query so run_sherlock performs a fresh lookup.
    if args.no_cache:
        conn = get_cache_connection()
        conn.execute(
            "DELETE FROM cache WHERE query_hash = ?",
            (compute_query_hash(args.query, args.sites),)
        )
        conn.commit()
        conn.close()

    try:
        result = run_sherlock(
            username=args.query,
            sites=args.sites,
            timeout=args.timeout,
            opt_in=args.opt_in
        )
        if args.json:
            print(json.dumps(result, indent=2))
        else:
            print(f"Query: {result['query']}")
            print(f"Found: {result['metadata']['found_count']} site(s)")
            print(f"Missing: {result['metadata']['missing_count']} site(s)")
            print(f"Errors: {result['metadata']['error_count']} site(s)")
            for f in result['found']:
                print(f"  [{f['site']}] {f['url']}")
        return 0
    except RuntimeError as e:
        print(f"ERROR: {e}", file=sys.stderr)
        return 1
|
|
|
|
|
|
if __name__ == "__main__":
    # Propagate main()'s exit code to the shell.
    raise SystemExit(main())
|