Compare commits

1 commit: fix/544...step35/874 (SHA1: 41ac45e49b)

README.md (70 lines changed)

@@ -112,6 +112,76 @@ pytest tests/
```

### Project Structure

## Sherlock Username Recon Wrapper

### Quick Usage

```bash
# Opt-in via env var
export SHERLOCK_ENABLED=1

# Or via explicit CLI flag
python -m tools.sherlock_wrapper --query "alice" --opt-in --json

# With site whitelist
python -m tools.sherlock_wrapper --query "alice" --opt-in --sites github twitter --json
```
### What It Does

A bounded, local wrapper around the Sherlock username OSINT tool:

- **Opt-in gate**: `SHERLOCK_ENABLED=1` or `--opt-in` is required before any external call
- **Local-first caching**: results are cached in `~/.cache/timmy/sherlock_cache.db` (TTL: 7 days)
- **Normalized JSON**: stable schema with `found`, `missing`, `errors`, and `metadata` sections
- **No hidden egress**: the only outbound HTTP goes to the target sites through sherlock; the wrapper never phones home
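The same gate applies to programmatic use. A minimal sketch, assuming the repo root is on `sys.path`, using `run_sherlock` as defined in `tools/sherlock_wrapper.py` below:

```python
# Sketch: calling the wrapper from Python instead of the CLI.
# run_sherlock raises RuntimeError unless SHERLOCK_ENABLED=1 is set
# or opt_in=True is passed explicitly.
from tools.sherlock_wrapper import run_sherlock

result = run_sherlock("alice", sites=["github", "twitter"], opt_in=True)
print(f"{result['metadata']['found_count']} profile(s) found")
for hit in result["found"]:
    print(f"  [{hit['site']}] {hit['url']}")
```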
### Output Schema

```json
{
  "schema_version": "1.0",
  "query": "alice",
  "timestamp": "2025-04-26T14:23:00+00:00",
  "found": [
    {"site": "github", "url": "https://github.com/alice"}
  ],
  "missing": ["twitter", "facebook"],
  "errors": [{"site": "instagram", "error": "timeout"}],
  "metadata": {
    "total_sites_checked": 50,
    "found_count": 1,
    "missing_count": 48,
    "error_count": 1
  }
}
```
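Downstream consumers can treat this schema as a contract. A small sanity check, mirroring the required-key list in `tests/test_sherlock_wrapper.py` (the helper name here is illustrative, not part of the wrapper):

```python
# Sketch: verify a result dict carries the documented v1.0 schema keys.
REQUIRED_KEYS = ("schema_version", "query", "timestamp",
                 "found", "missing", "errors", "metadata")

def looks_like_schema_v1(result: dict) -> bool:
    """Return True if every documented top-level key is present."""
    return all(key in result for key in REQUIRED_KEYS)
```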
### Setup

Sherlock must be installed separately:

```bash
pip install sherlock-project
```

The wrapper itself is pure Python and uses only the standard library; sherlock is its sole external dependency.
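Because the wrapper only imports sherlock on a cache miss, availability can be checked up front. A sketch equivalent to the wrapper's `check_sherlock_available()` probe:

```python
# Sketch: probe for the sherlock package without importing it fully.
import importlib.util

sherlock_installed = importlib.util.find_spec("sherlock") is not None
print(f"sherlock installed: {sherlock_installed}")
```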
### Why an Opt-In Gate?

Sherlock makes outbound HTTP requests to dozens of third-party sites. The opt-in gate:

1. Ensures a human operator explicitly approves this dependency
2. Makes the outbound traffic auditable in session logs
3. Prevents accidental invocation in automated pipelines
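The gate fails closed: with `SHERLOCK_ENABLED` unset, any call without the explicit flag raises before network code is reached, a behavior the smoke tests pin down:

```python
# Sketch: the fail-closed path of the opt-in gate.
# Assumes SHERLOCK_ENABLED is unset in the environment.
from tools.sherlock_wrapper import require_opt_in

try:
    require_opt_in(opt_in=False)
except RuntimeError as exc:
    print(exc)  # Sherlock is opt-in only. Set SHERLOCK_ENABLED=1 or pass --opt-in.
```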
### Running the Smoke Test

```bash
# Run unit + integration tests
pytest tests/test_sherlock_wrapper.py -v
```

```
.
```
scripts/README_bezalel_gemma4_vps.md (deleted, 51 lines)

@@ -1,51 +0,0 @@
# Bezalel Gemma 4 VPS Wiring

Issue: timmy-home #544

This helper is the repo-side operator bundle for wiring a live Gemma 4 endpoint into Bezalel's VPS config without hardcoding one dead pod forever.

What `scripts/bezalel_gemma4_vps.py` now does:

- normalizes any explicit endpoint to an OpenAI-compatible `/v1` base URL (see the sketch after this list)
- prefers `--vertex-base-url` over `--base-url` over `--pod-id`
- targets the issue's real config path by default: `/root/wizards/bezalel/home/config.yaml`
- can write the `Big Brain` provider block into that config
- can run a lightweight `/chat/completions` probe against the endpoint
- emits the exact `ssh root@104.131.15.18 ... curl ...` command needed to prove the endpoint is reachable from the Bezalel VPS
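A minimal sketch of that normalization, mirroring the assertions in the test suite; it assumes `scripts.bezalel_gemma4_vps` is importable from the repo root and uses `normalize_openai_base_url` as it exists on the fix/544 side of this diff:

```python
# Sketch: deterministic /v1 normalization, per normalize_openai_base_url.
from scripts.bezalel_gemma4_vps import normalize_openai_base_url

# A bare endpoint gains the /v1 suffix.
assert normalize_openai_base_url(
    "https://pod-11434.proxy.runpod.net"
) == "https://pod-11434.proxy.runpod.net/v1"

# A full chat-completions URL is trimmed back to the /v1 base.
assert normalize_openai_base_url(
    "https://pod-11434.proxy.runpod.net/v1/chat/completions"
) == "https://pod-11434.proxy.runpod.net/v1"
```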
Example dry-run:

```bash
python3 scripts/bezalel_gemma4_vps.py \
  --base-url https://<pod-id>-11434.proxy.runpod.net \
  --json
```

Example live wiring once a real endpoint exists:

```bash
python3 scripts/bezalel_gemma4_vps.py \
  --base-url https://<pod-id>-11434.proxy.runpod.net \
  --config-path /root/wizards/bezalel/home/config.yaml \
  --write-config \
  --verify-chat
```

If Vertex AI is fronted by an OpenAI-compatible bridge, prefer that explicit URL:

```bash
python3 scripts/bezalel_gemma4_vps.py \
  --vertex-base-url https://<bridge-host>/v1 \
  --json
```

What this repo change proves:

- Bezalel's config target is explicit and correct for the VPS lane
- the helper no longer silently writes to the local operator's home directory
- endpoint normalization is deterministic
- the remote proof command is generated from the same normalized URL the config writer uses

What still requires live infrastructure outside the repo:

- a valid paid RunPod or Vertex credential
- a real GPU endpoint serving Gemma 4
- successful execution of the emitted SSH proof command on `104.131.15.18`
- successful Bezalel Hermes chat against that live endpoint
scripts/bezalel_gemma4_vps.py (modified)

@@ -8,14 +8,12 @@ Safe by default:
 - can call the RunPod GraphQL API if a key is provided and --apply-runpod is used
 - can update a Hermes config file in-place when --write-config is used
 - can verify an OpenAI-compatible endpoint with a lightweight chat probe
-- emits the exact Bezalel VPS curl proof command for remote verification
 """

 from __future__ import annotations

 import argparse
 import json
-import shlex
 from pathlib import Path
 from typing import Any
 from urllib import request
@@ -29,9 +27,7 @@ DEFAULT_IMAGE = "ollama/ollama:latest"
 DEFAULT_MODEL = "gemma4:latest"
 DEFAULT_PROVIDER_NAME = "Big Brain"
 DEFAULT_TOKEN_FILE = Path.home() / ".config" / "runpod" / "access_key"
-DEFAULT_CONFIG_PATH = Path("/root/wizards/bezalel/home/config.yaml")
-DEFAULT_BEZALEL_VPS_HOST = "104.131.15.18"
-DEFAULT_VERIFY_PROMPT = "Say READY"
+DEFAULT_CONFIG_PATH = Path.home() / "wizards" / "bezalel" / "home" / "config.yaml"


 def build_deploy_mutation(
@@ -67,31 +63,8 @@ mutation {{
 '''.strip()


-def normalize_openai_base_url(base_url: str) -> str:
-    normalized = (base_url or "").strip().rstrip("/")
-    if not normalized:
-        return normalized
-    for suffix in ("/chat/completions", "/models"):
-        if normalized.endswith(suffix):
-            normalized = normalized[: -len(suffix)]
-            break
-    if not normalized.endswith("/v1"):
-        normalized = f"{normalized}/v1"
-    return normalized
-
-
 def build_runpod_endpoint(pod_id: str, port: int = 11434) -> str:
-    return normalize_openai_base_url(f"https://{pod_id}-{port}.proxy.runpod.net")
-
-
-def resolve_base_url(*, vertex_base_url: str | None = None, base_url: str | None = None, pod_id: str | None = None) -> tuple[str | None, str | None]:
-    if vertex_base_url:
-        return normalize_openai_base_url(vertex_base_url), "vertex_base_url"
-    if base_url:
-        return normalize_openai_base_url(base_url), "base_url"
-    if pod_id:
-        return build_runpod_endpoint(pod_id), "pod_id"
-    return None, None
+    return f"https://{pod_id}-{port}.proxy.runpod.net/v1"


 def parse_deploy_response(payload: dict[str, Any]) -> dict[str, str]:
@@ -129,7 +102,7 @@ def update_config_text(config_text: str, *, base_url: str, model: str = DEFAULT_

     replacement = {
         "name": provider_name,
-        "base_url": normalize_openai_base_url(base_url),
+        "base_url": base_url,
         "api_key": "",
         "model": model,
     }
@@ -156,8 +129,7 @@ def write_config_file(config_path: Path, *, base_url: str, model: str = DEFAULT_
     return updated


-def verify_openai_chat(base_url: str, *, model: str = DEFAULT_MODEL, prompt: str = DEFAULT_VERIFY_PROMPT) -> str:
-    base_url = normalize_openai_base_url(base_url)
+def verify_openai_chat(base_url: str, *, model: str = DEFAULT_MODEL, prompt: str = "Say READY") -> str:
     payload = json.dumps(
         {
             "model": model,
@@ -167,7 +139,7 @@ def verify_openai_chat(base_url: str, *, model: str = DEFAULT_MODEL, prompt: str
         }
     ).encode()
     req = request.Request(
-        f"{base_url}/chat/completions",
+        f"{base_url.rstrip('/')}/chat/completions",
         data=payload,
         headers={"Content-Type": "application/json"},
         method="POST",
@@ -177,30 +149,6 @@ def verify_openai_chat(base_url: str, *, model: str = DEFAULT_MODEL, prompt: str
     return data["choices"][0]["message"]["content"]


-def build_vps_verify_command(
-    *,
-    base_url: str,
-    model: str = DEFAULT_MODEL,
-    prompt: str = DEFAULT_VERIFY_PROMPT,
-    vps_host: str = DEFAULT_BEZALEL_VPS_HOST,
-) -> str:
-    payload = json.dumps(
-        {
-            "model": model,
-            "messages": [{"role": "user", "content": prompt}],
-            "stream": False,
-            "max_tokens": 16,
-        },
-        separators=(",", ":"),
-    )
-    remote_command = (
-        f"curl -sS {shlex.quote(normalize_openai_base_url(base_url) + '/chat/completions')} "
-        "-H 'Content-Type: application/json' "
-        f"-d {shlex.quote(payload)}"
-    )
-    return f"ssh root@{vps_host} {shlex.quote(remote_command)}"
-
-
 def parse_args() -> argparse.Namespace:
     parser = argparse.ArgumentParser(description="Provision a RunPod Gemma 4 endpoint and wire a Hermes config for Bezalel.")
     parser.add_argument("--pod-name", default="bezalel-gemma4")
@@ -212,8 +160,6 @@ def parse_args() -> argparse.Namespace:
     parser.add_argument("--config-path", type=Path, default=DEFAULT_CONFIG_PATH)
     parser.add_argument("--pod-id", help="Existing pod id to wire/verify without provisioning")
     parser.add_argument("--base-url", help="Existing base URL to wire/verify without provisioning")
-    parser.add_argument("--vertex-base-url", help="OpenAI-compatible Vertex bridge URL; takes precedence over --base-url and --pod-id")
-    parser.add_argument("--vps-host", default=DEFAULT_BEZALEL_VPS_HOST, help="Bezalel VPS host for the remote curl proof command")
     parser.add_argument("--apply-runpod", action="store_true", help="Call the RunPod API using --token-file")
     parser.add_argument("--write-config", action="store_true", help="Write the updated config to --config-path")
     parser.add_argument("--verify-chat", action="store_true", help="Call the OpenAI-compatible chat endpoint")
@@ -229,18 +175,13 @@ def main() -> None:
         "cloud_type": args.cloud_type,
         "model": args.model,
         "provider_name": args.provider_name,
         "config_path": str(args.config_path),
-        "vps_host": args.vps_host,
         "actions": [],
     }

-    base_url, base_url_source = resolve_base_url(
-        vertex_base_url=args.vertex_base_url,
-        base_url=args.base_url,
-        pod_id=args.pod_id,
-    )
-    if base_url_source:
-        summary["actions"].append(f"resolved_base_url_from_{base_url_source}")
+    base_url = args.base_url
+    if not base_url and args.pod_id:
+        base_url = build_runpod_endpoint(args.pod_id)
+        summary["actions"].append("computed_base_url_from_pod_id")

     if args.apply_runpod:
         if not args.token_file.exists():
@@ -255,17 +196,12 @@ def main() -> None:
         base_url = build_runpod_endpoint("<pod-id>")
         summary["actions"].append("using_placeholder_base_url")

-    summary["base_url"] = normalize_openai_base_url(base_url)
+    summary["base_url"] = base_url
     summary["config_preview"] = update_config_text("", base_url=base_url, model=args.model, provider_name=args.provider_name)
-    summary["vps_verify_command"] = build_vps_verify_command(
-        base_url=base_url,
-        model=args.model,
-        prompt=DEFAULT_VERIFY_PROMPT,
-        vps_host=args.vps_host,
-    )

     if args.write_config:
         write_config_file(args.config_path, base_url=base_url, model=args.model, provider_name=args.provider_name)
         summary["config_path"] = str(args.config_path)
         summary["actions"].append("wrote_config")

     if args.verify_chat:
@@ -278,10 +214,8 @@ def main() -> None:

     print("--- Bezalel Gemma4 RunPod Wiring ---")
     print(f"Pod name: {args.pod_name}")
-    print(f"Base URL: {summary['base_url']}")
+    print(f"Base URL: {base_url}")
     print(f"Model: {args.model}")
     print(f"Config target: {args.config_path}")
-    print(f"Bezalel VPS proof: {summary['vps_verify_command']}")
     if args.write_config:
         print(f"Config written: {args.config_path}")
     if "verify_response" in summary:
@@ -1,20 +1,14 @@
 from __future__ import annotations

 import json
-from pathlib import Path
 from unittest.mock import patch

 import yaml

 from scripts.bezalel_gemma4_vps import (
-    DEFAULT_CONFIG_PATH,
-    DEFAULT_BEZALEL_VPS_HOST,
     build_deploy_mutation,
     build_runpod_endpoint,
-    build_vps_verify_command,
-    normalize_openai_base_url,
     parse_deploy_response,
-    resolve_base_url,
     update_config_text,
     verify_openai_chat,
 )
@@ -34,10 +28,6 @@ class _FakeResponse:
         return False


-def test_default_config_path_targets_bezalel_vps_root_config() -> None:
-    assert DEFAULT_CONFIG_PATH == Path("/root/wizards/bezalel/home/config.yaml")
-
-
 def test_build_deploy_mutation_uses_ollama_image_and_openai_port() -> None:
     query = build_deploy_mutation(name="bezalel-gemma4", gpu_type="NVIDIA L40S", model_tag="gemma4:latest")

@@ -47,30 +37,6 @@ def test_build_deploy_mutation_uses_ollama_image_and_openai_port() -> None:
     assert 'volumeMountPath: "/root/.ollama"' in query


-def test_normalize_openai_base_url_adds_v1_suffix() -> None:
-    assert normalize_openai_base_url("https://pod-11434.proxy.runpod.net") == "https://pod-11434.proxy.runpod.net/v1"
-
-
-def test_normalize_openai_base_url_trims_chat_completions_suffix() -> None:
-    assert normalize_openai_base_url("https://pod-11434.proxy.runpod.net/v1/chat/completions") == "https://pod-11434.proxy.runpod.net/v1"
-
-
-def test_resolve_base_url_prefers_vertex_over_base_and_pod_id() -> None:
-    base_url, source = resolve_base_url(
-        vertex_base_url="https://vertex.example.com/openai",
-        base_url="https://plain.example.com",
-        pod_id="abc123",
-    )
-    assert source == "vertex_base_url"
-    assert base_url == "https://vertex.example.com/openai/v1"
-
-
-def test_resolve_base_url_falls_back_to_base_url_before_pod_id() -> None:
-    base_url, source = resolve_base_url(base_url="https://plain.example.com", pod_id="abc123")
-    assert source == "base_url"
-    assert base_url == "https://plain.example.com/v1"
-
-
 def test_build_runpod_endpoint_appends_v1_suffix() -> None:
     assert build_runpod_endpoint("abc123") == "https://abc123-11434.proxy.runpod.net/v1"

@@ -94,7 +60,7 @@ def test_parse_deploy_response_extracts_pod_id_and_endpoint() -> None:
     }


-def test_update_config_text_upserts_big_brain_provider_and_normalizes_base_url() -> None:
+def test_update_config_text_upserts_big_brain_provider() -> None:
     original = """
 model:
   default: kimi-k2.5
@@ -106,7 +72,7 @@ custom_providers:
     model: gemma3:27b
 """

-    updated = update_config_text(original, base_url="https://new-pod-11434.proxy.runpod.net", model="gemma4:latest")
+    updated = update_config_text(original, base_url="https://new-pod-11434.proxy.runpod.net/v1", model="gemma4:latest")
     parsed = yaml.safe_load(updated)

     assert parsed["model"] == {"default": "kimi-k2.5", "provider": "kimi-coding"}
@@ -120,14 +86,7 @@ custom_providers:
     ]


-def test_build_vps_verify_command_targets_bezalel_host_and_chat_completions() -> None:
-    command = build_vps_verify_command(base_url="https://pod-11434.proxy.runpod.net", model="gemma4:latest")
-    assert command.startswith(f"ssh root@{DEFAULT_BEZALEL_VPS_HOST} ")
-    assert "/v1/chat/completions" in command
-    assert "gemma4:latest" in command
-
-
-def test_verify_openai_chat_calls_chat_completions_with_normalized_base_url() -> None:
+def test_verify_openai_chat_calls_chat_completions() -> None:
     response_payload = {
         "choices": [
             {
@@ -142,7 +101,7 @@ def test_verify_openai_chat_calls_chat_completions_with_normalized_base_url() ->
         "scripts.bezalel_gemma4_vps.request.urlopen",
         return_value=_FakeResponse(response_payload),
     ) as mocked:
-        result = verify_openai_chat("https://pod-11434.proxy.runpod.net", model="gemma4:latest", prompt="say READY")
+        result = verify_openai_chat("https://pod-11434.proxy.runpod.net/v1", model="gemma4:latest", prompt="say READY")

     assert result == "READY"
     req = mocked.call_args.args[0]
@@ -150,10 +109,3 @@ def test_verify_openai_chat_calls_chat_completions_with_normalized_base_url() ->
     payload = json.loads(req.data.decode())
     assert payload["model"] == "gemma4:latest"
     assert payload["messages"][0]["content"] == "say READY"
-
-
-def test_readme_documents_root_config_path_and_vps_proof_command() -> None:
-    readme = Path("scripts/README_bezalel_gemma4_vps.md").read_text()
-    assert "/root/wizards/bezalel/home/config.yaml" in readme
-    assert "ssh root@104.131.15.18" in readme
-    assert "--vertex-base-url" in readme
tests/test_sherlock_wrapper.py (new file, 182 lines)

@@ -0,0 +1,182 @@
#!/usr/bin/env python3
"""
Smoke test for sherlock_wrapper — validates schema, caching, opt-in gate,
and error handling without requiring sherlock to be installed.
"""

import json
import os
import sys
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch, MagicMock

sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "tools"))

from sherlock_wrapper import (
    compute_query_hash,
    normalize_sherlock_output,
    require_opt_in,
    check_sherlock_available,
    get_cache_connection,
    save_to_cache,
    get_cached_result,
)


class TestSherlockWrapperSmoke(unittest.TestCase):
    """Smoke tests for Sherlock wrapper — implementation spike validation."""

    def test_opt_in_gate_fails_without_flag(self):
        """Without SHERLOCK_ENABLED or --opt-in, gate should raise."""
        with patch("sherlock_wrapper.SHERLOCK_ENABLED", False):
            with self.assertRaises(RuntimeError) as ctx:
                require_opt_in(opt_in=False)
            self.assertIn("opt-in only", str(ctx.exception).lower())

    def test_opt_in_gate_succeeds_with_env(self):
        """SHERLOCK_ENABLED=1 bypasses gate."""
        with patch("sherlock_wrapper.SHERLOCK_ENABLED", True):
            require_opt_in(opt_in=False)  # Should not raise

    def test_opt_in_gate_succeeds_with_flag(self):
        """--opt-in flag bypasses gate."""
        with patch("sherlock_wrapper.SHERLOCK_ENABLED", False):
            require_opt_in(opt_in=True)  # Should not raise

    def test_query_hash_deterministic(self):
        """Same input produces same hash."""
        h1 = compute_query_hash("alice")
        h2 = compute_query_hash("alice")
        self.assertEqual(h1, h2)

    def test_query_hash_site_sensitivity(self):
        """Different site lists produce different hashes."""
        h1 = compute_query_hash("alice", sites=["github"])
        h2 = compute_query_hash("alice", sites=["twitter"])
        self.assertNotEqual(h1, h2)

    def test_normalize_basic_found_missing(self):
        """Normalization produces correct schema."""
        raw = {
            "github": {"status": "found", "url": "https://github.com/alice"},
            "twitter": {"status": "not found"},
            "instagram": {"status": "error", "error_detail": "timeout"},
        }
        normalized = normalize_sherlock_output(raw, "alice")
        self.assertEqual(normalized["query"], "alice")
        self.assertEqual(normalized["metadata"]["found_count"], 1)
        self.assertEqual(normalized["metadata"]["missing_count"], 1)
        self.assertEqual(normalized["metadata"]["error_count"], 1)
        self.assertEqual(len(normalized["found"]), 1)
        self.assertEqual(normalized["found"][0]["site"], "github")
        self.assertIn("twitter", normalized["missing"])
        self.assertEqual(normalized["errors"][0]["site"], "instagram")

    def test_normalized_schema_has_required_fields(self):
        """Output schema contains all required top-level keys."""
        raw = {"site1": {"status": "not found"}}
        normalized = normalize_sherlock_output(raw, "testuser")
        required = ["schema_version", "query", "timestamp", "found", "missing",
                    "errors", "metadata"]
        for key in required:
            self.assertIn(key, normalized)
        self.assertIsInstance(normalized["timestamp"], str)
        self.assertIsInstance(normalized["found"], list)
        self.assertIsInstance(normalized["missing"], list)
        self.assertIsInstance(normalized["errors"], list)
        self.assertIsInstance(normalized["metadata"], dict)

    def test_cache_roundtrip(self):
        """Result can be written and read back from cache."""
        with tempfile.TemporaryDirectory() as tmp:
            with patch("sherlock_wrapper.CACHE_DB", Path(tmp) / "cache.db"):
                test_result = {
                    "schema_version": "1.0",
                    "query": "alice",
                    "timestamp": "2025-04-26T00:00:00+00:00",
                    "found": [],
                    "missing": ["github"],
                    "errors": [],
                    "metadata": {"total_sites_checked": 1, "found_count": 0, "missing_count": 1, "error_count": 0},
                }
                query_hash = compute_query_hash("alice")
                save_to_cache(query_hash, test_result)
                retrieved = get_cached_result(query_hash)
                self.assertEqual(retrieved, test_result)

    def test_cache_miss_on_stale(self):
        """Cache returns None when entry is older than 7 days."""
        with tempfile.TemporaryDirectory() as tmp:
            db_path = Path(tmp) / "cache.db"
            with patch("sherlock_wrapper.CACHE_DB", db_path):
                old_ts = "2025-04-01T00:00:00+00:00"
                old_result = {
                    "schema_version": "1.0", "query": "alice",
                    "timestamp": old_ts, "found": [], "missing": [], "errors": [],
                    "metadata": {"total_sites_checked": 0, "found_count": 0, "missing_count": 0, "error_count": 0},
                }
                query_hash = compute_query_hash("alice")
                # Direct DB insert with controlled timestamp (bypass save_to_cache's NOW)
                conn = get_cache_connection()
                conn.execute(
                    "INSERT INTO cache (query_hash, result_json, timestamp) VALUES (?, ?, ?)",
                    (query_hash, json.dumps(old_result), old_ts)
                )
                conn.commit()
                retrieved = get_cached_result(query_hash)
                self.assertIsNone(retrieved)

    def test_sherlock_available_check(self):
        """check_sherlock_available returns bool."""
        available = check_sherlock_available()
        self.assertIsInstance(available, bool)
        # Note: on this test system sherlock may not be installed, so False is expected.
        # The important thing is the function returns a bool.
        print(f"[INFO] Sherlock installed: {available}")


class TestSherlockWrapperIntegration(unittest.TestCase):
    """Integration tests with mocked sherlock module."""

    def test_run_sherlock_with_opt_in(self):
        """run_sherlock succeeds with opt-in and returns normalized result."""
        fake_sherlock = MagicMock()
        fake_sherlock.sherlock = MagicMock(return_value={
            "github": {"status": "found", "url": "https://github.com/alice"},
            "twitter": {"status": "not found"},
        })
        with patch.dict("sys.modules", {"sherlock": fake_sherlock}):
            import importlib
            import sherlock_wrapper
            importlib.reload(sherlock_wrapper)
            with patch.dict(os.environ, {"SHERLOCK_ENABLED": "1"}):
                from sherlock_wrapper import run_sherlock
                result = run_sherlock("alice", opt_in=True)
                self.assertEqual(result["query"], "alice")
                self.assertEqual(result["metadata"]["found_count"], 1)

    def test_run_sherlock_fails_without_opt_in(self):
        """run_sherlock raises RuntimeError without opt-in."""
        from sherlock_wrapper import run_sherlock
        with self.assertRaises(RuntimeError) as ctx:
            run_sherlock("alice", opt_in=False)
        self.assertIn("opt-in only", str(ctx.exception).lower())

    def test_run_sherlock_uses_cache(self):
        """Cached result short-circuits sherlock execution."""
        cached = {
            "schema_version": "1.0", "query": "alice", "timestamp": "2025-04-26T00:00:00+00:00",
            "found": [{"site": "github", "url": "https://github.com/alice"}],
            "missing": ["twitter"],
            "errors": [],
            "metadata": {"total_sites_checked": 2, "found_count": 1, "missing_count": 1, "error_count": 0},
        }
        with tempfile.TemporaryDirectory() as tmp:
            with patch("sherlock_wrapper.CACHE_DB", Path(tmp) / "cache.db"):
                query_hash = compute_query_hash("alice")
                save_to_cache(query_hash, cached)
                from sherlock_wrapper import run_sherlock
                result = run_sherlock("alice", opt_in=True)
                self.assertEqual(result, cached)
tools/__init__.py (new file, 0 lines)

tools/sherlock_wrapper.py (new file, 249 lines)

@@ -0,0 +1,249 @@
#!/usr/bin/env python3
"""
Sherlock username recon wrapper — opt-in, cached, normalized JSON output.

This is an implementation spike (issue #874) to validate local integration
of the Sherlock OSINT tool without violating sovereignty/provenance standards.
"""

import argparse
import hashlib
import json
import os
import sqlite3
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Dict, Any, List

# Opt-in gate: must have SHERLOCK_ENABLED=1 or --opt-in flag
SHERLOCK_ENABLED = os.environ.get("SHERLOCK_ENABLED", "0") == "1"

# Cache location
CACHE_DIR = Path.home() / ".cache" / "timmy"
CACHE_DB = CACHE_DIR / "sherlock_cache.db"

# Normalized output schema version
SCHEMA_VERSION = "1.0"


def require_opt_in(opt_in: bool = False) -> None:
    """Enforce opt-in gate for Sherlock external dependency."""
    if not (SHERLOCK_ENABLED or opt_in):
        raise RuntimeError(
            "Sherlock is opt-in only. Set SHERLOCK_ENABLED=1 or pass --opt-in."
        )


def check_sherlock_available() -> bool:
    """Check if sherlock Python package is installed."""
    try:
        import sherlock  # type: ignore # noqa: F401
        return True
    except ImportError:
        return False


def get_cache_connection() -> sqlite3.Connection:
    """Initialize cache directory and return DB connection."""
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(str(CACHE_DB))
    conn.execute("""
        CREATE TABLE IF NOT EXISTS cache (
            query_hash TEXT PRIMARY KEY,
            result_json TEXT NOT NULL,
            timestamp DATETIME NOT NULL
        )
    """)
    return conn


def compute_query_hash(username: str, sites: Optional[List[str]] = None) -> str:
    """Deterministic hash for cache key."""
    components = [username.lower().strip()]
    if sites:
        components.extend(sorted(sites))
    raw = "|".join(components)
    return hashlib.sha256(raw.encode()).hexdigest()


def get_cached_result(query_hash: str) -> Optional[Dict[str, Any]]:
    """Retrieve cached result if available and not stale (TTL: 7 days)."""
    conn = get_cache_connection()
    cur = conn.execute(
        "SELECT result_json, timestamp FROM cache WHERE query_hash = ?",
        (query_hash,)
    )
    row = cur.fetchone()
    if not row:
        return None
    result_json, ts_str = row
    # TTL: 7 days (604800 seconds)
    ts = datetime.fromisoformat(ts_str)
    age_seconds = (datetime.now(timezone.utc) - ts).total_seconds()
    if age_seconds >= 604800:
        return None
    return json.loads(result_json)


def save_to_cache(query_hash: str, result: Dict[str, Any]) -> None:
    """Persist result to cache."""
    conn = get_cache_connection()
    conn.execute(
        "INSERT OR REPLACE INTO cache (query_hash, result_json, timestamp) VALUES (?, ?, ?)",
        (query_hash, json.dumps(result), datetime.now(timezone.utc).isoformat())
    )
    conn.commit()
    conn.close()


def normalize_sherlock_output(
    raw_result: Dict[str, Any],
    username: str,
    sites_checked: Optional[List[str]] = None
) -> Dict[str, Any]:
    """
    Convert raw sherlock output into a stable, normalized schema.

    Expected sherlock result shape (via Python API):
    {
        "site_name": {"url": "...", "status": "found"|"not found"|"error", ...},
        ...
    }
    """
    found: List[Dict[str, str]] = []
    missing: List[str] = []
    errors: List[Dict[str, str]] = []

    for site_name, site_data in raw_result.items():
        status = site_data.get("status", "")
        url = site_data.get("url", "")
        if status == "found" and url:
            found.append({"site": site_name, "url": url})
        elif status == "not found":
            missing.append(site_name)
        else:
            errors.append({"site": site_name, "error": status or "unknown"})

    # Compute totals from the original site list if provided
    total_sites = len(raw_result) if sites_checked is None else len(sites_checked)

    return {
        "schema_version": SCHEMA_VERSION,
        "query": username,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "found": found,
        "missing": missing,
        "errors": errors,
        "metadata": {
            "total_sites_checked": total_sites,
            "found_count": len(found),
            "missing_count": len(missing),
            "error_count": len(errors),
        },
    }


def run_sherlock(
    username: str,
    sites: Optional[List[str]] = None,
    timeout: Optional[int] = None,
    opt_in: bool = False
) -> Dict[str, Any]:
    """
    Execute Sherlock wrapper with opt-in gate, caching, and normalization.
    """
    require_opt_in(opt_in)

    # Compute cache key
    query_hash = compute_query_hash(username, sites)

    # Check cache first — avoids dependency requirement on cache hit
    cached = get_cached_result(query_hash)
    if cached is not None:
        return cached

    # Only require sherlock on cache miss
    if not check_sherlock_available():
        raise RuntimeError(
            "Sherlock Python package not installed. "
            "Install with: pip install sherlock-project"
        )

    # Call sherlock
    try:
        import sherlock
        from sherlock import sherlock as sherlock_main  # type: ignore

        if sites:
            result = sherlock_main(username, site_list=sites, timeout=timeout or 10)
        else:
            result = sherlock_main(username, timeout=timeout or 10)

        normalized = normalize_sherlock_output(result, username, sites)
        save_to_cache(query_hash, normalized)
        return normalized

    except Exception as e:
        raise RuntimeError(f"Sherlock execution failed: {e}") from e


def main() -> int:
    parser = argparse.ArgumentParser(
        description="Sherlock username OSINT wrapper — opt-in, cached, normalized JSON"
    )
    parser.add_argument(
        "--query", "-q", required=True,
        help="Username to search across sites"
    )
    parser.add_argument(
        "--opt-in", action="store_true",
        help="Explicit opt-in flag (alternatively set SHERLOCK_ENABLED=1)"
    )
    parser.add_argument(
        "--sites", "-s", nargs="+",
        help="Specific sites to check (default: all supported)"
    )
    parser.add_argument(
        "--timeout", "-t", type=int, default=10,
        help="Request timeout per site (default: 10)"
    )
    parser.add_argument(
        "--json", action="store_true",
        help="Output normalized JSON to stdout"
    )
    parser.add_argument(
        "--no-cache",
        action="store_true",
        help="Bypass cached result (if any)"
    )

    args = parser.parse_args()

    try:
        result = run_sherlock(
            username=args.query,
            sites=args.sites,
            timeout=args.timeout,
            opt_in=args.opt_in
        )
        if args.json:
            print(json.dumps(result, indent=2))
        else:
            print(f"Query: {result['query']}")
            print(f"Found: {result['metadata']['found_count']} site(s)")
            print(f"Missing: {result['metadata']['missing_count']} site(s)")
            print(f"Errors: {result['metadata']['error_count']} site(s)")
            for f in result['found']:
                print(f"  [{f['site']}] {f['url']}")
        return 0
    except RuntimeError as e:
        print(f"ERROR: {e}", file=sys.stderr)
        return 1


if __name__ == "__main__":
    sys.exit(main())