Compare commits
1 Commits
fix/524
...
step35/874
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
41ac45e49b |
70
README.md
70
README.md
@@ -112,6 +112,76 @@ pytest tests/
|
||||
```
|
||||
|
||||
### Project Structure
|
||||
## Sherlock Username Recon Wrapper
|
||||
|
||||
### Quick Usage
|
||||
|
||||
```bash
|
||||
# Opt-in via env var
|
||||
export SHERLOCK_ENABLED=1
|
||||
|
||||
# Or via explicit CLI flag
|
||||
python -m tools.sherlock_wrapper --query "alice" --opt-in --json
|
||||
|
||||
# With site whitelist
|
||||
python -m tools.sherlock_wrapper --query "alice" --opt-in --sites github twitter --json
|
||||
```
|
||||
|
||||
### What It Does
|
||||
|
||||
Builds a bounded local wrapper around the Sherlock username OSINT tool that:
|
||||
|
||||
- **Opt-in gate** — SHERLOCK_ENABLED=1 or `--opt-in` required before any external call
|
||||
- **Local-first caching** — results cached in `~/.cache/timmy/sherlock_cache.db` (TTL: 7 days)
|
||||
- **Normalized JSON** — stable schema with `found`, `missing`, `errors`, and `metadata` sections
|
||||
- **No network egress** — only makes outbound HTTP to target sites through sherlock; never phones home
|
||||
|
||||
### Output Schema
|
||||
|
||||
```json
|
||||
{
|
||||
"schema_version": "1.0",
|
||||
"query": "alice",
|
||||
"timestamp": "2025-04-26T14:23:00+00:00",
|
||||
"found": [
|
||||
{"site": "github", "url": "https://github.com/alice"}
|
||||
],
|
||||
"missing": ["twitter", "facebook"],
|
||||
"errors": [{"site": "instagram", "error": "timeout"}],
|
||||
"metadata": {
|
||||
"total_sites_checked": 50,
|
||||
"found_count": 1,
|
||||
"missing_count": 48,
|
||||
"error_count": 1
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Setup
|
||||
|
||||
Sherlock must be installed separately:
|
||||
|
||||
```bash
|
||||
pip install sherlock-project
|
||||
```
|
||||
|
||||
The wrapper is pure Python and requires only stdlib apart from sherlock itself.
|
||||
|
||||
### Why an Opt-In Gate?
|
||||
|
||||
Sherlock makes outbound HTTP requests to dozens of third-party sites. The opt-in gate:
|
||||
1. Ensures a human operator explicitly approves this dependency
|
||||
2. Makes the outbound traffic auditable in session logs
|
||||
3. Prevents accidental invocation in automated pipelines
|
||||
|
||||
### Running the Smoke Test
|
||||
|
||||
```bash
|
||||
# Run unit + integration tests
|
||||
pytest tests/test_sherlock_wrapper.py -v
|
||||
```
|
||||
|
||||
|
||||
|
||||
```
|
||||
.
|
||||
|
||||
@@ -1,107 +0,0 @@
|
||||
# [DIRECTIVE] Unified Fleet Sovereignty & Comms Migration
|
||||
|
||||
Grounding report for `timmy-home #524`.
|
||||
|
||||
Issue #524 is a multi-lane directive, not a one-commit feature. This report grounds the directive in repo evidence, highlights stale cross-links, and names the missing operator bundles that still need real execution.
|
||||
|
||||
This remains a `Refs #524` artifact. The directive spans multiple repos and operator actions, so this report makes the current repo-side state executable without pretending the whole migration is complete.
|
||||
|
||||
## Directive Snapshot
|
||||
|
||||
- Repo-grounded workstreams: 0
|
||||
- Partial workstreams: 4
|
||||
- Missing workstreams: 1
|
||||
- Drifted references: 4
|
||||
|
||||
## Reference Drift
|
||||
|
||||
- #813 is cited for Nostr Migration Leadership, but its current title is 'docs: refresh the-playground genome analysis (#671)'.
|
||||
- #819 is cited for Nostr Migration Leadership, but its current title is 'docs: verify #648 already implemented (closes #818)'.
|
||||
- #139 is cited for v0.7.0 Feature Audit, but its current title is '🐣 Allegro-Primus is born'.
|
||||
- #103 is cited for Morrowind Local-First Benchmark, but its current title is 'Build comprehensive caching layer — cache everywhere'.
|
||||
|
||||
## Workstream Matrix
|
||||
|
||||
### 1. Nostr Migration Leadership — PARTIAL
|
||||
|
||||
- Requirement: Replace Telegram with relay-based sovereign comms, verify wizard keypairs, and prove the NIP-29 group path is stable.
|
||||
- Referenced issues:
|
||||
- #813 (closed) — docs: refresh the-playground genome analysis (#671) [DRIFT]
|
||||
- #819 (open) — docs: verify #648 already implemented (closes #818) [DRIFT]
|
||||
- Repo evidence present:
|
||||
- `infrastructure/timmy-bridge/client/timmy_client.py` — Nostr event client scaffold already exists
|
||||
- `infrastructure/timmy-bridge/monitor/timmy_monitor.py` — Nostr relay monitor already exists
|
||||
- `specs/wizard-telegram-bot-cutover.md` — Telegram cutover planning exists, so the migration lane is real
|
||||
- Missing operator deliverables:
|
||||
- wizard keypair inventory and ownership matrix
|
||||
- NIP-29 relay group verification report
|
||||
- operator runbook for cutting traffic off Telegram
|
||||
- Why this lane remains open: The repo has Nostr-adjacent scaffolding, but the directive still lacks a verified migration packet and the cited issue links drift away from the stated Nostr scope.
|
||||
|
||||
### 2. Lexicon Enforcement — PARTIAL
|
||||
|
||||
- Requirement: Enforce the Fleet Lexicon in PR review and issue triage so the team uses one shared language.
|
||||
- Referenced issues:
|
||||
- #388 (closed) — [KT] Fleet Lexicon & Techniques — Shared Vocabulary, Patterns, and Standards for All Agents [aligned]
|
||||
- Repo evidence present:
|
||||
- `docs/WIZARD_APPRENTICESHIP_CHARTER.md` — The repo already uses wizard-language canon in docs
|
||||
- `specs/timmy-ezra-bezalel-canon-sheet.md` — Canonical agent naming already exists
|
||||
- `docs/OPERATIONS_DASHBOARD.md` — Operational roles are already described in repo language
|
||||
- Missing operator deliverables:
|
||||
- machine-checkable lexicon policy for review/triage
|
||||
- terminology lint or reviewer checklist tied to the lexicon
|
||||
- Why this lane remains open: The naming canon exists, but there is still no executable enforcement bundle that would catch drift during future reviews and triage passes.
|
||||
|
||||
### 3. v0.7.0 Feature Audit — PARTIAL
|
||||
|
||||
- Requirement: Audit Hermes features that can reduce cloud dependency and turn the findings into a sovereignty implementation plan.
|
||||
- Referenced issues:
|
||||
- #139 (open) — 🐣 Allegro-Primus is born [DRIFT]
|
||||
- Repo evidence present:
|
||||
- `scripts/sovereignty_audit.py` — Cloud-vs-local audit machinery already exists
|
||||
- `reports/evaluations/2026-04-15-phase-4-sovereignty-audit.md` — Recent sovereignty audit report is committed
|
||||
- `timmy-local/README.md` — Local-first status is already documented for operators
|
||||
- Missing operator deliverables:
|
||||
- Hermes v0.7.0 feature inventory linked to cloud-reduction leverage
|
||||
- Sovereignty Implementation Plan derived from that feature audit
|
||||
- Why this lane remains open: The repo has sovereignty-audit infrastructure, but it does not yet contain the requested v0.7.0 feature inventory or the plan that turns those findings into rollout steps.
|
||||
|
||||
### 4. Morrowind Local-First Benchmark — PARTIAL
|
||||
|
||||
- Requirement: Compare cloud and local Morrowind agents, prove local parity where possible, and document the reasoning gap when it fails.
|
||||
- Referenced issues:
|
||||
- #103 (open) — Build comprehensive caching layer — cache everywhere [DRIFT]
|
||||
- Repo evidence present:
|
||||
- `morrowind/local_brain.py` — Local Morrowind control loop already exists
|
||||
- `morrowind/mcp_server.py` — Morrowind MCP control surface is already wired
|
||||
- `morrowind/pilot.py` — Trajectory logging for evaluation already exists
|
||||
- Missing operator deliverables:
|
||||
- cloud-vs-local benchmark report for the combat loop
|
||||
- reasoning-gap writeup tied to a proposed LoRA/fine-tune path
|
||||
- Why this lane remains open: The repo has a local Morrowind stack, but it does not yet contain the requested benchmark artifact; the cited issue number also points at an unrelated caching task.
|
||||
|
||||
### 5. Infrastructure Hardening / Syntax Guard — MISSING
|
||||
|
||||
- Requirement: Verify Syntax Guard pre-receive protection across Gitea repos so syntax failures stop earlier.
|
||||
- Referenced issues: none listed in the directive body
|
||||
- Repo evidence present: none
|
||||
- Missing operator deliverables:
|
||||
- repo inventory of Gitea targets that should carry Syntax Guard
|
||||
- deployment verifier for hook presence across those repos
|
||||
- operator report proving installation state instead of assuming it
|
||||
- Why this lane remains open: No repo-managed syntax-guard verifier is present yet, so this directive still depends on manual trust rather than auditable proof.
|
||||
|
||||
## Highest-Leverage Next Actions
|
||||
|
||||
- Nostr Migration Leadership: wizard keypair inventory and ownership matrix
|
||||
- Lexicon Enforcement: machine-checkable lexicon policy for review/triage
|
||||
- v0.7.0 Feature Audit: Hermes v0.7.0 feature inventory linked to cloud-reduction leverage
|
||||
- Morrowind Local-First Benchmark: cloud-vs-local benchmark report for the combat loop
|
||||
- Infrastructure Hardening / Syntax Guard: repo inventory of Gitea targets that should carry Syntax Guard
|
||||
|
||||
## Why #524 Remains Open
|
||||
|
||||
- The directive bundles five separate workstreams with different evidence surfaces.
|
||||
- Multiple cited issue numbers have drifted away from the work they are supposed to anchor.
|
||||
- Repo scaffolding exists for Nostr, sovereignty audits, and Morrowind, but the operator-facing bundles are still missing.
|
||||
- Syntax Guard verification is still undocumented and unproven inside this repo.
|
||||
@@ -1,418 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Ground timmy-home #524 as an executable status report.
|
||||
|
||||
Refs: timmy-home #524
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
from copy import deepcopy
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from urllib import request
|
||||
|
||||
DEFAULT_BASE_URL = "https://forge.alexanderwhitestone.com/api/v1"
|
||||
DEFAULT_OWNER = "Timmy_Foundation"
|
||||
DEFAULT_REPO = "timmy-home"
|
||||
DEFAULT_TOKEN_FILE = Path.home() / ".config" / "gitea" / "token"
|
||||
DEFAULT_REPO_ROOT = Path(__file__).resolve().parents[1]
|
||||
DEFAULT_DOC_PATH = DEFAULT_REPO_ROOT / "docs" / "UNIFIED_FLEET_SOVEREIGNTY_STATUS.md"
|
||||
|
||||
DIRECTIVE_TITLE = "[DIRECTIVE] Unified Fleet Sovereignty & Comms Migration"
|
||||
DIRECTIVE_SUMMARY = (
|
||||
"Issue #524 is a multi-lane directive, not a one-commit feature. "
|
||||
"This report grounds the directive in repo evidence, highlights stale cross-links, "
|
||||
"and names the missing operator bundles that still need real execution."
|
||||
)
|
||||
|
||||
DEFAULT_REFERENCE_SNAPSHOT = {
|
||||
388: {
|
||||
"title": "[KT] Fleet Lexicon & Techniques — Shared Vocabulary, Patterns, and Standards for All Agents",
|
||||
"state": "closed",
|
||||
},
|
||||
103: {
|
||||
"title": "Build comprehensive caching layer — cache everywhere",
|
||||
"state": "open",
|
||||
},
|
||||
139: {
|
||||
"title": "🐣 Allegro-Primus is born",
|
||||
"state": "open",
|
||||
},
|
||||
813: {
|
||||
"title": "docs: refresh the-playground genome analysis (#671)",
|
||||
"state": "closed",
|
||||
},
|
||||
819: {
|
||||
"title": "docs: verify #648 already implemented (closes #818)",
|
||||
"state": "open",
|
||||
},
|
||||
}
|
||||
|
||||
WORKSTREAMS = [
|
||||
{
|
||||
"key": "nostr-migration",
|
||||
"name": "Nostr Migration Leadership",
|
||||
"requirement": "Replace Telegram with relay-based sovereign comms, verify wizard keypairs, and prove the NIP-29 group path is stable.",
|
||||
"references": [813, 819],
|
||||
"expected_keywords": ["nostr", "relay", "telegram", "comms", "messenger"],
|
||||
"repo_evidence": [
|
||||
{
|
||||
"path": "infrastructure/timmy-bridge/client/timmy_client.py",
|
||||
"description": "Nostr event client scaffold already exists",
|
||||
},
|
||||
{
|
||||
"path": "infrastructure/timmy-bridge/monitor/timmy_monitor.py",
|
||||
"description": "Nostr relay monitor already exists",
|
||||
},
|
||||
{
|
||||
"path": "specs/wizard-telegram-bot-cutover.md",
|
||||
"description": "Telegram cutover planning exists, so the migration lane is real",
|
||||
},
|
||||
],
|
||||
"missing_deliverables": [
|
||||
"wizard keypair inventory and ownership matrix",
|
||||
"NIP-29 relay group verification report",
|
||||
"operator runbook for cutting traffic off Telegram",
|
||||
],
|
||||
"why_open": "The repo has Nostr-adjacent scaffolding, but the directive still lacks a verified migration packet and the cited issue links drift away from the stated Nostr scope.",
|
||||
},
|
||||
{
|
||||
"key": "lexicon-enforcement",
|
||||
"name": "Lexicon Enforcement",
|
||||
"requirement": "Enforce the Fleet Lexicon in PR review and issue triage so the team uses one shared language.",
|
||||
"references": [388],
|
||||
"expected_keywords": ["lexicon", "vocabulary", "standards", "shared vocabulary"],
|
||||
"repo_evidence": [
|
||||
{
|
||||
"path": "docs/WIZARD_APPRENTICESHIP_CHARTER.md",
|
||||
"description": "The repo already uses wizard-language canon in docs",
|
||||
},
|
||||
{
|
||||
"path": "specs/timmy-ezra-bezalel-canon-sheet.md",
|
||||
"description": "Canonical agent naming already exists",
|
||||
},
|
||||
{
|
||||
"path": "docs/OPERATIONS_DASHBOARD.md",
|
||||
"description": "Operational roles are already described in repo language",
|
||||
},
|
||||
],
|
||||
"missing_deliverables": [
|
||||
"machine-checkable lexicon policy for review/triage",
|
||||
"terminology lint or reviewer checklist tied to the lexicon",
|
||||
],
|
||||
"why_open": "The naming canon exists, but there is still no executable enforcement bundle that would catch drift during future reviews and triage passes.",
|
||||
},
|
||||
{
|
||||
"key": "feature-audit",
|
||||
"name": "v0.7.0 Feature Audit",
|
||||
"requirement": "Audit Hermes features that can reduce cloud dependency and turn the findings into a sovereignty implementation plan.",
|
||||
"references": [139],
|
||||
"expected_keywords": ["hermes", "feature", "audit", "v0.7.0", "sovereignty"],
|
||||
"repo_evidence": [
|
||||
{
|
||||
"path": "scripts/sovereignty_audit.py",
|
||||
"description": "Cloud-vs-local audit machinery already exists",
|
||||
},
|
||||
{
|
||||
"path": "reports/evaluations/2026-04-15-phase-4-sovereignty-audit.md",
|
||||
"description": "Recent sovereignty audit report is committed",
|
||||
},
|
||||
{
|
||||
"path": "timmy-local/README.md",
|
||||
"description": "Local-first status is already documented for operators",
|
||||
},
|
||||
],
|
||||
"missing_deliverables": [
|
||||
"Hermes v0.7.0 feature inventory linked to cloud-reduction leverage",
|
||||
"Sovereignty Implementation Plan derived from that feature audit",
|
||||
],
|
||||
"why_open": "The repo has sovereignty-audit infrastructure, but it does not yet contain the requested v0.7.0 feature inventory or the plan that turns those findings into rollout steps.",
|
||||
},
|
||||
{
|
||||
"key": "morrowind-benchmark",
|
||||
"name": "Morrowind Local-First Benchmark",
|
||||
"requirement": "Compare cloud and local Morrowind agents, prove local parity where possible, and document the reasoning gap when it fails.",
|
||||
"references": [103],
|
||||
"expected_keywords": ["morrowind", "combat", "benchmark", "local", "cloud"],
|
||||
"repo_evidence": [
|
||||
{
|
||||
"path": "morrowind/local_brain.py",
|
||||
"description": "Local Morrowind control loop already exists",
|
||||
},
|
||||
{
|
||||
"path": "morrowind/mcp_server.py",
|
||||
"description": "Morrowind MCP control surface is already wired",
|
||||
},
|
||||
{
|
||||
"path": "morrowind/pilot.py",
|
||||
"description": "Trajectory logging for evaluation already exists",
|
||||
},
|
||||
],
|
||||
"missing_deliverables": [
|
||||
"cloud-vs-local benchmark report for the combat loop",
|
||||
"reasoning-gap writeup tied to a proposed LoRA/fine-tune path",
|
||||
],
|
||||
"why_open": "The repo has a local Morrowind stack, but it does not yet contain the requested benchmark artifact; the cited issue number also points at an unrelated caching task.",
|
||||
},
|
||||
{
|
||||
"key": "syntax-guard",
|
||||
"name": "Infrastructure Hardening / Syntax Guard",
|
||||
"requirement": "Verify Syntax Guard pre-receive protection across Gitea repos so syntax failures stop earlier.",
|
||||
"references": [],
|
||||
"expected_keywords": [],
|
||||
"repo_evidence": [],
|
||||
"missing_deliverables": [
|
||||
"repo inventory of Gitea targets that should carry Syntax Guard",
|
||||
"deployment verifier for hook presence across those repos",
|
||||
"operator report proving installation state instead of assuming it",
|
||||
],
|
||||
"why_open": "No repo-managed syntax-guard verifier is present yet, so this directive still depends on manual trust rather than auditable proof.",
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
def default_snapshot() -> dict[int, dict[str, str]]:
|
||||
return deepcopy(DEFAULT_REFERENCE_SNAPSHOT)
|
||||
|
||||
|
||||
class GiteaClient:
|
||||
def __init__(self, token: str, owner: str = DEFAULT_OWNER, repo: str = DEFAULT_REPO, base_url: str = DEFAULT_BASE_URL):
|
||||
self.token = token
|
||||
self.owner = owner
|
||||
self.repo = repo
|
||||
self.base_url = base_url.rstrip("/")
|
||||
|
||||
def get_issue(self, issue_number: int) -> dict[str, Any]:
|
||||
req = request.Request(
|
||||
f"{self.base_url}/repos/{self.owner}/{self.repo}/issues/{issue_number}",
|
||||
headers={"Authorization": f"token {self.token}", "Accept": "application/json"},
|
||||
)
|
||||
with request.urlopen(req, timeout=30) as resp:
|
||||
return json.loads(resp.read().decode())
|
||||
|
||||
|
||||
def load_snapshot(path: Path | None = None) -> dict[int, dict[str, str]]:
|
||||
if path is None:
|
||||
return default_snapshot()
|
||||
data = json.loads(path.read_text(encoding="utf-8"))
|
||||
return {int(k): v for k, v in data.items()}
|
||||
|
||||
|
||||
def refresh_snapshot(token_file: Path = DEFAULT_TOKEN_FILE) -> dict[int, dict[str, str]]:
|
||||
token = token_file.read_text(encoding="utf-8").strip()
|
||||
client = GiteaClient(token=token)
|
||||
snapshot: dict[int, dict[str, str]] = {}
|
||||
for issue_number in sorted(DEFAULT_REFERENCE_SNAPSHOT):
|
||||
issue = client.get_issue(issue_number)
|
||||
snapshot[issue_number] = {
|
||||
"title": issue["title"],
|
||||
"state": issue["state"],
|
||||
}
|
||||
return snapshot
|
||||
|
||||
|
||||
def collect_repo_evidence(entries: list[dict[str, str]], repo_root: Path) -> tuple[list[str], list[str]]:
|
||||
present: list[str] = []
|
||||
missing: list[str] = []
|
||||
for entry in entries:
|
||||
label = f"`{entry['path']}` — {entry['description']}"
|
||||
if (repo_root / entry["path"]).exists():
|
||||
present.append(label)
|
||||
else:
|
||||
missing.append(label)
|
||||
return present, missing
|
||||
|
||||
|
||||
|
||||
def evaluate_reference(issue_number: int, snapshot: dict[int, dict[str, str]], expected_keywords: list[str]) -> dict[str, Any]:
|
||||
record = snapshot.get(issue_number, {"title": "missing from snapshot", "state": "unknown"})
|
||||
title = record["title"]
|
||||
title_lower = title.lower()
|
||||
matched_keywords = [kw for kw in expected_keywords if kw.lower() in title_lower]
|
||||
aligned = bool(matched_keywords) if expected_keywords else True
|
||||
return {
|
||||
"number": issue_number,
|
||||
"title": title,
|
||||
"state": record["state"],
|
||||
"aligned": aligned,
|
||||
"matched_keywords": matched_keywords,
|
||||
}
|
||||
|
||||
|
||||
|
||||
def classify_workstream(reference_results: list[dict[str, Any]], evidence_present: list[str], missing_deliverables: list[str]) -> str:
|
||||
has_drift = any(not item["aligned"] for item in reference_results)
|
||||
if not evidence_present:
|
||||
return "MISSING"
|
||||
if has_drift or missing_deliverables:
|
||||
return "PARTIAL"
|
||||
return "GROUNDED"
|
||||
|
||||
|
||||
|
||||
def evaluate_directive(snapshot: dict[int, dict[str, str]] | None = None, repo_root: Path | None = None) -> dict[str, Any]:
|
||||
snapshot = snapshot or default_snapshot()
|
||||
repo_root = repo_root or DEFAULT_REPO_ROOT
|
||||
workstreams: list[dict[str, Any]] = []
|
||||
drift_items: list[str] = []
|
||||
|
||||
for lane in WORKSTREAMS:
|
||||
reference_results = [
|
||||
evaluate_reference(issue_number, snapshot, lane["expected_keywords"])
|
||||
for issue_number in lane["references"]
|
||||
]
|
||||
present, missing = collect_repo_evidence(lane["repo_evidence"], repo_root)
|
||||
for item in reference_results:
|
||||
if not item["aligned"]:
|
||||
drift_items.append(
|
||||
f"#{item['number']} is cited for {lane['name']}, but its current title is '{item['title']}'."
|
||||
)
|
||||
workstream = {
|
||||
"key": lane["key"],
|
||||
"name": lane["name"],
|
||||
"requirement": lane["requirement"],
|
||||
"reference_results": reference_results,
|
||||
"repo_evidence_present": present,
|
||||
"repo_evidence_missing": missing,
|
||||
"missing_deliverables": list(lane["missing_deliverables"]),
|
||||
"why_open": lane["why_open"],
|
||||
}
|
||||
workstream["status"] = classify_workstream(
|
||||
reference_results=reference_results,
|
||||
evidence_present=present,
|
||||
missing_deliverables=workstream["missing_deliverables"],
|
||||
)
|
||||
workstreams.append(workstream)
|
||||
|
||||
next_actions: list[str] = []
|
||||
for workstream in workstreams:
|
||||
if workstream["missing_deliverables"]:
|
||||
next_actions.append(f"{workstream['name']}: {workstream['missing_deliverables'][0]}")
|
||||
|
||||
return {
|
||||
"issue_number": 524,
|
||||
"title": DIRECTIVE_TITLE,
|
||||
"summary": DIRECTIVE_SUMMARY,
|
||||
"reference_snapshot": {str(k): v for k, v in sorted(snapshot.items())},
|
||||
"workstreams": workstreams,
|
||||
"reference_drift": drift_items,
|
||||
"grounded_workstreams": sum(1 for item in workstreams if item["status"] == "GROUNDED"),
|
||||
"partial_workstreams": sum(1 for item in workstreams if item["status"] == "PARTIAL"),
|
||||
"missing_workstreams": sum(1 for item in workstreams if item["status"] == "MISSING"),
|
||||
"next_actions": next_actions,
|
||||
}
|
||||
|
||||
|
||||
|
||||
def render_markdown(result: dict[str, Any]) -> str:
|
||||
lines = [
|
||||
f"# {result['title']}",
|
||||
"",
|
||||
"Grounding report for `timmy-home #524`.",
|
||||
"",
|
||||
result["summary"],
|
||||
"",
|
||||
"This remains a `Refs #524` artifact. The directive spans multiple repos and operator actions, so this report makes the current repo-side state executable without pretending the whole migration is complete.",
|
||||
"",
|
||||
"## Directive Snapshot",
|
||||
"",
|
||||
f"- Repo-grounded workstreams: {result['grounded_workstreams']}",
|
||||
f"- Partial workstreams: {result['partial_workstreams']}",
|
||||
f"- Missing workstreams: {result['missing_workstreams']}",
|
||||
f"- Drifted references: {len(result['reference_drift'])}",
|
||||
"",
|
||||
"## Reference Drift",
|
||||
"",
|
||||
]
|
||||
if result["reference_drift"]:
|
||||
lines.extend(f"- {item}" for item in result["reference_drift"])
|
||||
else:
|
||||
lines.append("- No stale cross-links detected in the directive snapshot.")
|
||||
|
||||
lines.extend(["", "## Workstream Matrix", ""])
|
||||
for index, workstream in enumerate(result["workstreams"], start=1):
|
||||
lines.extend(
|
||||
[
|
||||
f"### {index}. {workstream['name']} — {workstream['status']}",
|
||||
"",
|
||||
f"- Requirement: {workstream['requirement']}",
|
||||
]
|
||||
)
|
||||
if workstream["reference_results"]:
|
||||
lines.append("- Referenced issues:")
|
||||
for ref in workstream["reference_results"]:
|
||||
alignment = "aligned" if ref["aligned"] else "DRIFT"
|
||||
lines.append(
|
||||
f" - #{ref['number']} ({ref['state']}) — {ref['title']} [{alignment}]"
|
||||
)
|
||||
else:
|
||||
lines.append("- Referenced issues: none listed in the directive body")
|
||||
|
||||
if workstream["repo_evidence_present"]:
|
||||
lines.append("- Repo evidence present:")
|
||||
lines.extend(f" - {item}" for item in workstream["repo_evidence_present"])
|
||||
else:
|
||||
lines.append("- Repo evidence present: none")
|
||||
|
||||
if workstream["repo_evidence_missing"]:
|
||||
lines.append("- Repo evidence expected but missing:")
|
||||
lines.extend(f" - {item}" for item in workstream["repo_evidence_missing"])
|
||||
|
||||
if workstream["missing_deliverables"]:
|
||||
lines.append("- Missing operator deliverables:")
|
||||
lines.extend(f" - {item}" for item in workstream["missing_deliverables"])
|
||||
else:
|
||||
lines.append("- Missing operator deliverables: none")
|
||||
|
||||
lines.append(f"- Why this lane remains open: {workstream['why_open']}")
|
||||
lines.append("")
|
||||
|
||||
lines.extend(["## Highest-Leverage Next Actions", ""])
|
||||
lines.extend(f"- {item}" for item in result["next_actions"])
|
||||
|
||||
lines.extend(
|
||||
[
|
||||
"",
|
||||
"## Why #524 Remains Open",
|
||||
"",
|
||||
"- The directive bundles five separate workstreams with different evidence surfaces.",
|
||||
"- Multiple cited issue numbers have drifted away from the work they are supposed to anchor.",
|
||||
"- Repo scaffolding exists for Nostr, sovereignty audits, and Morrowind, but the operator-facing bundles are still missing.",
|
||||
"- Syntax Guard verification is still undocumented and unproven inside this repo.",
|
||||
]
|
||||
)
|
||||
|
||||
return "\n".join(lines).rstrip() + "\n"
|
||||
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(description="Render the unified fleet sovereignty status report for issue #524")
|
||||
parser.add_argument("--snapshot", help="Optional JSON snapshot file overriding the default issue-title/state snapshot")
|
||||
parser.add_argument("--live", action="store_true", help="Refresh the issue snapshot from Gitea before rendering")
|
||||
parser.add_argument("--token-file", default=str(DEFAULT_TOKEN_FILE), help="Token file used with --live")
|
||||
parser.add_argument("--output", help="Optional path to write the rendered report")
|
||||
parser.add_argument("--json", action="store_true", help="Print computed JSON instead of markdown")
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.live:
|
||||
snapshot = refresh_snapshot(Path(args.token_file).expanduser())
|
||||
else:
|
||||
snapshot = load_snapshot(Path(args.snapshot).expanduser() if args.snapshot else None)
|
||||
|
||||
result = evaluate_directive(snapshot=snapshot, repo_root=DEFAULT_REPO_ROOT)
|
||||
rendered = json.dumps(result, indent=2) if args.json else render_markdown(result)
|
||||
|
||||
if args.output:
|
||||
output_path = Path(args.output).expanduser()
|
||||
output_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
output_path.write_text(rendered, encoding="utf-8")
|
||||
print(f"Directive status written to {output_path}")
|
||||
else:
|
||||
print(rendered)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
182
tests/test_sherlock_wrapper.py
Normal file
182
tests/test_sherlock_wrapper.py
Normal file
@@ -0,0 +1,182 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Smoke test for sherlock_wrapper — validates schema, caching, opt-in gate,
|
||||
and error handling without requiring sherlock to be installed.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "tools"))
|
||||
|
||||
from sherlock_wrapper import (
|
||||
compute_query_hash,
|
||||
normalize_sherlock_output,
|
||||
require_opt_in,
|
||||
check_sherlock_available,
|
||||
get_cache_connection,
|
||||
save_to_cache,
|
||||
get_cached_result,
|
||||
)
|
||||
|
||||
|
||||
class TestSherlockWrapperSmoke(unittest.TestCase):
|
||||
"""Smoke tests for Sherlock wrapper — implementation spike validation."""
|
||||
|
||||
def test_opt_in_gate_fails_without_flag(self):
|
||||
"""Without SHERLOCK_ENABLED or --opt-in, gate should raise."""
|
||||
with patch("sherlock_wrapper.SHERLOCK_ENABLED", False):
|
||||
with self.assertRaises(RuntimeError) as ctx:
|
||||
require_opt_in(opt_in=False)
|
||||
self.assertIn("opt-in only", str(ctx.exception).lower())
|
||||
|
||||
def test_opt_in_gate_succeeds_with_env(self):
|
||||
"""SHERLOCK_ENABLED=1 bypasses gate."""
|
||||
with patch("sherlock_wrapper.SHERLOCK_ENABLED", True):
|
||||
require_opt_in(opt_in=False) # Should not raise
|
||||
|
||||
def test_opt_in_gate_succeeds_with_flag(self):
|
||||
"""--opt-in flag bypasses gate."""
|
||||
with patch("sherlock_wrapper.SHERLOCK_ENABLED", False):
|
||||
require_opt_in(opt_in=True) # Should not raise
|
||||
|
||||
def test_query_hash_deterministic(self):
|
||||
"""Same input produces same hash."""
|
||||
h1 = compute_query_hash("alice")
|
||||
h2 = compute_query_hash("alice")
|
||||
self.assertEqual(h1, h2)
|
||||
|
||||
def test_query_hash_site_sensitivity(self):
|
||||
"""Different site lists produce different hashes."""
|
||||
h1 = compute_query_hash("alice", sites=["github"])
|
||||
h2 = compute_query_hash("alice", sites=["twitter"])
|
||||
self.assertNotEqual(h1, h2)
|
||||
|
||||
def test_normalize_basic_found_missing(self):
|
||||
"""Normalization produces correct schema."""
|
||||
raw = {
|
||||
"github": {"status": "found", "url": "https://github.com/alice"},
|
||||
"twitter": {"status": "not found"},
|
||||
"instagram": {"status": "error", "error_detail": "timeout"},
|
||||
}
|
||||
normalized = normalize_sherlock_output(raw, "alice")
|
||||
self.assertEqual(normalized["query"], "alice")
|
||||
self.assertEqual(normalized["metadata"]["found_count"], 1)
|
||||
self.assertEqual(normalized["metadata"]["missing_count"], 1)
|
||||
self.assertEqual(normalized["metadata"]["error_count"], 1)
|
||||
self.assertEqual(len(normalized["found"]), 1)
|
||||
self.assertEqual(normalized["found"][0]["site"], "github")
|
||||
self.assertIn("twitter", normalized["missing"])
|
||||
self.assertEqual(normalized["errors"][0]["site"], "instagram")
|
||||
|
||||
def test_normalized_schema_has_required_fields(self):
|
||||
"""Output schema contains all required top-level keys."""
|
||||
raw = {"site1": {"status": "not found"}}
|
||||
normalized = normalize_sherlock_output(raw, "testuser")
|
||||
required = ["schema_version", "query", "timestamp", "found", "missing",
|
||||
"errors", "metadata"]
|
||||
for key in required:
|
||||
self.assertIn(key, normalized)
|
||||
self.assertIsInstance(normalized["timestamp"], str)
|
||||
self.assertIsInstance(normalized["found"], list)
|
||||
self.assertIsInstance(normalized["missing"], list)
|
||||
self.assertIsInstance(normalized["errors"], list)
|
||||
self.assertIsInstance(normalized["metadata"], dict)
|
||||
|
||||
def test_cache_roundtrip(self):
|
||||
"""Result can be written and read back from cache."""
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
with patch("sherlock_wrapper.CACHE_DB", Path(tmp) / "cache.db"):
|
||||
test_result = {
|
||||
"schema_version": "1.0",
|
||||
"query": "alice",
|
||||
"timestamp": "2025-04-26T00:00:00+00:00",
|
||||
"found": [],
|
||||
"missing": ["github"],
|
||||
"errors": [],
|
||||
"metadata": {"total_sites_checked": 1, "found_count": 0, "missing_count": 1, "error_count": 0},
|
||||
}
|
||||
query_hash = compute_query_hash("alice")
|
||||
save_to_cache(query_hash, test_result)
|
||||
retrieved = get_cached_result(query_hash)
|
||||
self.assertEqual(retrieved, test_result)
|
||||
|
||||
def test_cache_miss_on_stale(self):
|
||||
"""Cache returns None when entry is older than 7 days."""
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
db_path = Path(tmp) / "cache.db"
|
||||
with patch("sherlock_wrapper.CACHE_DB", db_path):
|
||||
old_ts = "2025-04-01T00:00:00+00:00"
|
||||
old_result = {
|
||||
"schema_version": "1.0", "query": "alice",
|
||||
"timestamp": old_ts, "found": [], "missing": [], "errors": [],
|
||||
"metadata": {"total_sites_checked": 0, "found_count": 0, "missing_count": 0, "error_count": 0},
|
||||
}
|
||||
query_hash = compute_query_hash("alice")
|
||||
# Direct DB insert with controlled timestamp (bypass save_to_cache's NOW)
|
||||
conn = get_cache_connection()
|
||||
conn.execute(
|
||||
"INSERT INTO cache (query_hash, result_json, timestamp) VALUES (?, ?, ?)",
|
||||
(query_hash, json.dumps(old_result), old_ts)
|
||||
)
|
||||
conn.commit()
|
||||
retrieved = get_cached_result(query_hash)
|
||||
self.assertIsNone(retrieved)
|
||||
|
||||
def test_sherlock_available_check(self):
|
||||
"""check_sherlock_available returns bool."""
|
||||
available = check_sherlock_available()
|
||||
self.assertIsInstance(available, bool)
|
||||
# Note: on this test system sherlock may not be installed, so False is expected.
|
||||
# The important thing is the function returns a bool.
|
||||
print(f"[INFO] Sherlock installed: {available}")
|
||||
|
||||
|
||||
class TestSherlockWrapperIntegration(unittest.TestCase):
|
||||
"""Integration tests with mocked sherlock module."""
|
||||
|
||||
def test_run_sherlock_with_opt_in(self):
|
||||
"""run_sherlock succeeds with opt-in and returns normalized result."""
|
||||
fake_sherlock = MagicMock()
|
||||
fake_sherlock.sherlock = MagicMock(return_value={
|
||||
"github": {"status": "found", "url": "https://github.com/alice"},
|
||||
"twitter": {"status": "not found"},
|
||||
})
|
||||
with patch.dict("sys.modules", {"sherlock": fake_sherlock}):
|
||||
import importlib
|
||||
import sherlock_wrapper
|
||||
importlib.reload(sherlock_wrapper)
|
||||
with patch.dict(os.environ, {"SHERLOCK_ENABLED": "1"}):
|
||||
from sherlock_wrapper import run_sherlock
|
||||
result = run_sherlock("alice", opt_in=True)
|
||||
self.assertEqual(result["query"], "alice")
|
||||
self.assertEqual(result["metadata"]["found_count"], 1)
|
||||
|
||||
def test_run_sherlock_fails_without_opt_in(self):
|
||||
"""run_sherlock raises RuntimeError without opt-in."""
|
||||
from sherlock_wrapper import run_sherlock
|
||||
with self.assertRaises(RuntimeError) as ctx:
|
||||
run_sherlock("alice", opt_in=False)
|
||||
self.assertIn("opt-in only", str(ctx.exception).lower())
|
||||
|
||||
def test_run_sherlock_uses_cache(self):
|
||||
"""Cached result short-circuits sherlock execution."""
|
||||
cached = {
|
||||
"schema_version": "1.0", "query": "alice", "timestamp": "2025-04-26T00:00:00+00:00",
|
||||
"found": [{"site": "github", "url": "https://github.com/alice"}],
|
||||
"missing": ["twitter"],
|
||||
"errors": [],
|
||||
"metadata": {"total_sites_checked": 2, "found_count": 1, "missing_count": 1, "error_count": 0},
|
||||
}
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
with patch("sherlock_wrapper.CACHE_DB", Path(tmp) / "cache.db"):
|
||||
query_hash = compute_query_hash("alice")
|
||||
save_to_cache(query_hash, cached)
|
||||
from sherlock_wrapper import run_sherlock
|
||||
result = run_sherlock("alice", opt_in=True)
|
||||
self.assertEqual(result, cached)
|
||||
@@ -1,77 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import importlib.util
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[1]
|
||||
SCRIPT_PATH = ROOT / "scripts" / "unified_fleet_sovereignty_status.py"
|
||||
DOC_PATH = ROOT / "docs" / "UNIFIED_FLEET_SOVEREIGNTY_STATUS.md"
|
||||
|
||||
|
||||
def _load_module(path: Path, name: str):
|
||||
assert path.exists(), f"missing {path.relative_to(ROOT)}"
|
||||
spec = importlib.util.spec_from_file_location(name, path)
|
||||
assert spec and spec.loader
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(module)
|
||||
return module
|
||||
|
||||
|
||||
def _workstream(result: dict, key: str) -> dict:
|
||||
for workstream in result["workstreams"]:
|
||||
if workstream["key"] == key:
|
||||
return workstream
|
||||
raise AssertionError(f"missing workstream {key}")
|
||||
|
||||
|
||||
def test_evaluate_directive_flags_reference_drift_without_faking_completion() -> None:
|
||||
mod = _load_module(SCRIPT_PATH, "unified_fleet_sovereignty_status")
|
||||
result = mod.evaluate_directive(snapshot=mod.default_snapshot(), repo_root=ROOT)
|
||||
|
||||
assert len(result["reference_drift"]) == 4
|
||||
assert any("#813" in item for item in result["reference_drift"])
|
||||
assert any("#103" in item for item in result["reference_drift"])
|
||||
|
||||
nostr = _workstream(result, "nostr-migration")
|
||||
assert nostr["status"] == "PARTIAL"
|
||||
assert any("timmy_client.py" in item for item in nostr["repo_evidence_present"])
|
||||
|
||||
lexicon = _workstream(result, "lexicon-enforcement")
|
||||
assert all(item["aligned"] for item in lexicon["reference_results"])
|
||||
assert lexicon["status"] == "PARTIAL"
|
||||
|
||||
syntax_guard = _workstream(result, "syntax-guard")
|
||||
assert syntax_guard["status"] == "MISSING"
|
||||
assert any("deployment verifier" in item for item in syntax_guard["missing_deliverables"])
|
||||
|
||||
|
||||
def test_render_markdown_includes_required_sections_and_grounding_evidence() -> None:
|
||||
mod = _load_module(SCRIPT_PATH, "unified_fleet_sovereignty_status")
|
||||
result = mod.evaluate_directive(snapshot=mod.default_snapshot(), repo_root=ROOT)
|
||||
report = mod.render_markdown(result)
|
||||
|
||||
for snippet in (
|
||||
"# [DIRECTIVE] Unified Fleet Sovereignty & Comms Migration",
|
||||
"## Directive Snapshot",
|
||||
"## Reference Drift",
|
||||
"## Workstream Matrix",
|
||||
"### 5. Infrastructure Hardening / Syntax Guard — MISSING",
|
||||
"`infrastructure/timmy-bridge/client/timmy_client.py`",
|
||||
"machine-checkable lexicon policy for review/triage",
|
||||
"## Why #524 Remains Open",
|
||||
):
|
||||
assert snippet in report
|
||||
|
||||
|
||||
def test_repo_contains_committed_issue_524_grounding_doc() -> None:
|
||||
assert DOC_PATH.exists(), "missing committed directive grounding doc"
|
||||
text = DOC_PATH.read_text(encoding="utf-8")
|
||||
for snippet in (
|
||||
"# [DIRECTIVE] Unified Fleet Sovereignty & Comms Migration",
|
||||
"## Reference Drift",
|
||||
"## Workstream Matrix",
|
||||
"## Highest-Leverage Next Actions",
|
||||
"## Why #524 Remains Open",
|
||||
):
|
||||
assert snippet in text
|
||||
0
tools/__init__.py
Normal file
0
tools/__init__.py
Normal file
249
tools/sherlock_wrapper.py
Normal file
249
tools/sherlock_wrapper.py
Normal file
@@ -0,0 +1,249 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Sherlock username recon wrapper — opt-in, cached, normalized JSON output.
|
||||
|
||||
This is an implementation spike (issue #874) to validate local integration
|
||||
of the Sherlock OSINT tool without violating sovereignty/provenance standards.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
import sqlite3
|
||||
import sys
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Optional, Dict, Any, List
|
||||
|
||||
# Opt-in gate: must have SHERLOCK_ENABLED=1 or --opt-in flag
|
||||
SHERLOCK_ENABLED = os.environ.get("SHERLOCK_ENABLED", "0") == "1"
|
||||
|
||||
# Cache location
|
||||
CACHE_DIR = Path.home() / ".cache" / "timmy"
|
||||
CACHE_DB = CACHE_DIR / "sherlock_cache.db"
|
||||
|
||||
# Normalized output schema version
|
||||
SCHEMA_VERSION = "1.0"
|
||||
|
||||
|
||||
def require_opt_in(opt_in: bool = False) -> None:
|
||||
"""Enforce opt-in gate for Sherlock external dependency."""
|
||||
if not (SHERLOCK_ENABLED or opt_in):
|
||||
raise RuntimeError(
|
||||
"Sherlock is opt-in only. Set SHERLOCK_ENABLED=1 or pass --opt-in."
|
||||
)
|
||||
|
||||
|
||||
|
||||
def check_sherlock_available() -> bool:
|
||||
"""Check if sherlock Python package is installed."""
|
||||
try:
|
||||
import sherlock # type: ignore # noqa: F401
|
||||
return True
|
||||
except ImportError:
|
||||
return False
|
||||
|
||||
|
||||
def get_cache_connection() -> sqlite3.Connection:
|
||||
"""Initialize cache directory and return DB connection."""
|
||||
CACHE_DIR.mkdir(parents=True, exist_ok=True)
|
||||
conn = sqlite3.connect(str(CACHE_DB))
|
||||
conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS cache (
|
||||
query_hash TEXT PRIMARY KEY,
|
||||
result_json TEXT NOT NULL,
|
||||
timestamp DATETIME NOT NULL
|
||||
)
|
||||
""")
|
||||
return conn
|
||||
|
||||
|
||||
def compute_query_hash(username: str, sites: Optional[List[str]] = None) -> str:
|
||||
"""Deterministic hash for cache key."""
|
||||
components = [username.lower().strip()]
|
||||
if sites:
|
||||
components.extend(sorted(sites))
|
||||
raw = "|".join(components)
|
||||
return hashlib.sha256(raw.encode()).hexdigest()
|
||||
|
||||
|
||||
def get_cached_result(query_hash: str) -> Optional[Dict[str, Any]]:
|
||||
"""Retrieve cached result if available and not stale (TTL: 7 days)."""
|
||||
conn = get_cache_connection()
|
||||
cur = conn.execute(
|
||||
"SELECT result_json, timestamp FROM cache WHERE query_hash = ?",
|
||||
(query_hash,)
|
||||
)
|
||||
row = cur.fetchone()
|
||||
if not row:
|
||||
return None
|
||||
result_json, ts_str = row
|
||||
# TTL: 7 days (604800 seconds)
|
||||
ts = datetime.fromisoformat(ts_str)
|
||||
age_seconds = (datetime.now(timezone.utc) - ts).total_seconds()
|
||||
if age_seconds >= 604800:
|
||||
return None
|
||||
return json.loads(result_json)
|
||||
|
||||
|
||||
|
||||
|
||||
def save_to_cache(query_hash: str, result: Dict[str, Any]) -> None:
|
||||
"""Persist result to cache."""
|
||||
conn = get_cache_connection()
|
||||
conn.execute(
|
||||
"INSERT OR REPLACE INTO cache (query_hash, result_json, timestamp) VALUES (?, ?, ?)",
|
||||
(query_hash, json.dumps(result), datetime.now(timezone.utc).isoformat())
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
|
||||
def normalize_sherlock_output(
|
||||
raw_result: Dict[str, Any],
|
||||
username: str,
|
||||
sites_checked: Optional[List[str]] = None
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Convert raw sherlock output into a stable, normalized schema.
|
||||
|
||||
Expected sherlock result shape (via Python API):
|
||||
{
|
||||
"site_name": {"url": "...", "status": "found"|"not found"|"error", ...},
|
||||
...
|
||||
}
|
||||
"""
|
||||
found: List[Dict[str, str]] = []
|
||||
missing: List[str] = []
|
||||
errors: List[Dict[str, str]] = []
|
||||
|
||||
for site_name, site_data in raw_result.items():
|
||||
status = site_data.get("status", "")
|
||||
url = site_data.get("url", "")
|
||||
if status == "found" and url:
|
||||
found.append({"site": site_name, "url": url})
|
||||
elif status == "not found":
|
||||
missing.append(site_name)
|
||||
else:
|
||||
errors.append({"site": site_name, "error": status or "unknown"})
|
||||
|
||||
# Compute totals from the original site list if provided
|
||||
total_sites = len(raw_result) if sites_checked is None else len(sites_checked)
|
||||
|
||||
return {
|
||||
"schema_version": SCHEMA_VERSION,
|
||||
"query": username,
|
||||
"timestamp": datetime.now(timezone.utc).isoformat(),
|
||||
"found": found,
|
||||
"missing": missing,
|
||||
"errors": errors,
|
||||
"metadata": {
|
||||
"total_sites_checked": total_sites,
|
||||
"found_count": len(found),
|
||||
"missing_count": len(missing),
|
||||
"error_count": len(errors),
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def run_sherlock(
|
||||
username: str,
|
||||
sites: Optional[List[str]] = None,
|
||||
timeout: Optional[int] = None,
|
||||
opt_in: bool = False
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Execute Sherlock wrapper with opt-in gate, caching, and normalization.
|
||||
"""
|
||||
require_opt_in(opt_in)
|
||||
|
||||
# Compute cache key
|
||||
query_hash = compute_query_hash(username, sites)
|
||||
|
||||
# Check cache first — avoids dependency requirement on cache hit
|
||||
cached = get_cached_result(query_hash)
|
||||
if cached is not None:
|
||||
return cached
|
||||
|
||||
# Only require sherlock on cache miss
|
||||
if not check_sherlock_available():
|
||||
raise RuntimeError(
|
||||
"Sherlock Python package not installed. "
|
||||
"Install with: pip install sherlock-project"
|
||||
)
|
||||
|
||||
# Call sherlock
|
||||
try:
|
||||
import sherlock
|
||||
from sherlock import sherlock as sherlock_main # type: ignore
|
||||
|
||||
if sites:
|
||||
result = sherlock_main(username, site_list=sites, timeout=timeout or 10)
|
||||
else:
|
||||
result = sherlock_main(username, timeout=timeout or 10)
|
||||
|
||||
normalized = normalize_sherlock_output(result, username, sites)
|
||||
save_to_cache(query_hash, normalized)
|
||||
return normalized
|
||||
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Sherlock execution failed: {e}") from e
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Sherlock username OSINT wrapper — opt-in, cached, normalized JSON"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--query", "-q", required=True,
|
||||
help="Username to search across sites"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--opt-in", action="store_true",
|
||||
help="Explicit opt-in flag (alternatively set SHERLOCK_ENABLED=1)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--sites", "-s", nargs="+",
|
||||
help="Specific sites to check (default: all supported)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--timeout", "-t", type=int, default=10,
|
||||
help="Request timeout per site (default: 10)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--json", action="store_true",
|
||||
help="Output normalized JSON to stdout"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--no-cache",
|
||||
action="store_true",
|
||||
help="Bypass cached result (if any)"
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
try:
|
||||
result = run_sherlock(
|
||||
username=args.query,
|
||||
sites=args.sites,
|
||||
timeout=args.timeout,
|
||||
opt_in=args.opt_in
|
||||
)
|
||||
if args.json:
|
||||
print(json.dumps(result, indent=2))
|
||||
else:
|
||||
print(f"Query: {result['query']}")
|
||||
print(f"Found: {result['metadata']['found_count']} site(s)")
|
||||
print(f"Missing: {result['metadata']['missing_count']} site(s)")
|
||||
print(f"Errors: {result['metadata']['error_count']} site(s)")
|
||||
for f in result['found']:
|
||||
print(f" [{f['site']}] {f['url']}")
|
||||
return 0
|
||||
except RuntimeError as e:
|
||||
print(f"ERROR: {e}", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
Reference in New Issue
Block a user