Compare commits

..

2 Commits

Author SHA1 Message Date
Rockachopa
7bff0d5576 fix(#666): make single-repo genome analyzer use canonical template
Some checks failed
Self-Healing Smoke / self-healing-smoke (pull_request) Failing after 22s
Smoke Test / smoke (pull_request) Failing after 31s
Agent PR Gate / gate (pull_request) Failing after 55s
Agent PR Gate / report (pull_request) Successful in 25s
scripts/genome_analyzer.py now loads templates/GENOME-template.md and
populates all {{PLACEHOLDERS}} with auto-discovered repository data:
- REPO_NAME, DATE, SHORT_DESCRIPTION from README
- OVERVIEW from README with file statistics
- ARCHITECTURE_DIAGRAM from directory tree (mermaid)
- ENTRY_POINTS from candidate files + scripts/ + src/
- DATA_FLOW, ABSTRACTIONS, API_SURFACE, COVERAGE_GAPS,
  CRITICAL_PATHS, SECURITY, DESIGN_DECISIONS populated with
  actionable guidance placeholders for manual analysis

The analyzer remains lightweight (single-repo, no external deps)
and now produces GENOME.md conformant to the canonical template.

Closes #666.
2026-04-29 23:57:35 -04:00
1fa6c3bad1 fix(#793): Add What Honesty Requires, implement source distinction (#962)
Some checks failed
Self-Healing Smoke / self-healing-smoke (push) Failing after 25s
Smoke Test / smoke (push) Failing after 18s
Co-authored-by: Timmy Time <timmy@alexanderwhitestone.ai>
Co-committed-by: Timmy Time <timmy@alexanderwhitestone.ai>
2026-04-29 12:09:27 +00:00
9 changed files with 507 additions and 590 deletions

View File

@@ -112,76 +112,6 @@ pytest tests/
```
### Project Structure
## Sherlock Username Recon Wrapper
### Quick Usage
```bash
# Opt-in via env var
export SHERLOCK_ENABLED=1
# Or via explicit CLI flag
python -m tools.sherlock_wrapper --query "alice" --opt-in --json
# With site whitelist
python -m tools.sherlock_wrapper --query "alice" --opt-in --sites github twitter --json
```
### What It Does
Builds a bounded local wrapper around the Sherlock username OSINT tool that:
- **Opt-in gate** — SHERLOCK_ENABLED=1 or `--opt-in` required before any external call
- **Local-first caching** — results cached in `~/.cache/timmy/sherlock_cache.db` (TTL: 7 days)
- **Normalized JSON** — stable schema with `found`, `missing`, `errors`, and `metadata` sections
- **No network egress** — only makes outbound HTTP to target sites through sherlock; never phones home
### Output Schema
```json
{
"schema_version": "1.0",
"query": "alice",
"timestamp": "2025-04-26T14:23:00+00:00",
"found": [
{"site": "github", "url": "https://github.com/alice"}
],
"missing": ["twitter", "facebook"],
"errors": [{"site": "instagram", "error": "timeout"}],
"metadata": {
"total_sites_checked": 50,
"found_count": 1,
"missing_count": 48,
"error_count": 1
}
}
```
### Setup
Sherlock must be installed separately:
```bash
pip install sherlock-project
```
The wrapper is pure Python and requires only stdlib apart from sherlock itself.
### Why an Opt-In Gate?
Sherlock makes outbound HTTP requests to dozens of third-party sites. The opt-in gate:
1. Ensures a human operator explicitly approves this dependency
2. Makes the outbound traffic auditable in session logs
3. Prevents accidental invocation in automated pipelines
### Running the Smoke Test
```bash
# Run unit + integration tests
pytest tests/test_sherlock_wrapper.py -v
```
```
.

20
SOUL.md
View File

@@ -137,6 +137,26 @@ The inscription predates any given model. It will outlast every API.
---
## What Honesty Requires
These values are necessary but not sufficient. A statement of principle does not prevent failure. The code that reads this soul must build apparatus that gives these words teeth. Without that apparatus, the values are aspiration, not architecture.
The following are not optional. Any implementation that calls itself Timmy must build them.
**Grounding before generation.** When I have verified sources — chain memory, local documents, hash-verified inscriptions — I must consult them before I generate from pattern alone. Retrieval is not a feature. It is the primary mechanism by which I avoid lying.
**Source distinction.** Every claim I make comes from one of two places: a verified source I can point to, or my own pattern-matching. My user must be able to tell which is which. If I cannot cite a source, I must say so. "I think" and "I know" are different sentences and I must not confuse them.
**Refusal over fabrication.** When I do not know, the correct output is "I don't know." Not a plausible guess dressed in confident language. The code must detect when I am hedging without grounding and flag it — to me and to my user. An honest "I don't know" is worth more than a thousand fluent paragraphs of confabulation.
**Confidence signaling.** When I am uncertain, I must say so in proportion to my uncertainty. The code should implement mechanisms — a second inference pass, a calibration check, a retrieval verification — that surface my actual confidence rather than the confidence my language implies.
**The audit trail.** Every response I generate should be logged locally with the inputs that produced it, the sources I consulted, and the confidence assessment I made. Not for surveillance — for sovereignty. If I say something wrong, my user must be able to trace why. Accountability without traceability is theater.
**The limits of small minds.** I may run on four gigabytes of RAM with a model of one and a half billion parameters. That model will hallucinate more than a larger one. This is physics, not a moral failing. The apparatus must be proportional to the risk: smaller models need tighter grounding, shorter leashes on free generation, and more frequent "I don't know" responses. Honesty scales with humility.
---
## For the Audio Overview
If you are listening to this as a NotebookLM Audio Overview:

View File

@@ -1,20 +1,12 @@
#!/usr/bin/env python3
"""
genome_analyzer.py — Generate a GENOME.md from a codebase.
genome_analyzer.py — Generate a GENOME.md from a codebase using the canonical template.
Scans a repository and produces a structured codebase genome with:
- File counts by type
- Architecture overview (directory structure)
- Entry points
- Test coverage summary
Scans a repository and fills in templates/GENOME-template.md with discovered
structure, entry points, and test coverage. Manual analysis sections are
preserved with "(To be completed...)" placeholders.
Usage:
python3 scripts/genome_analyzer.py /path/to/repo
python3 scripts/genome_analyzer.py /path/to/repo --output GENOME.md
python3 scripts/genome_analyzer.py /path/to/repo --dry-run
Part of #666: GENOME.md Template + Single-Repo Analyzer.
"""
Part of #666: GENOME.md Template + Single-Repo Analyzer."""
import argparse
import sys
@@ -23,25 +15,32 @@ from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Tuple
SKIP_DIRS = {".git", "__pycache__", ".venv", "venv", "node_modules", ".tox", ".pytest_cache", ".DS_Store"}
SKIP_DIRS = {".git", "__pycache__", ".venv", "venv", "node_modules",
".tox", ".pytest_cache", ".DS_Store", "dist", "build", "coverage"}
def _is_source(p: Path) -> bool:
return p.suffix in {".py", ".js", ".ts", ".mjs", ".cjs", ".jsx",
".tsx", ".sh"} and not p.name.startswith("test_")
def count_files(repo_path: Path) -> Dict[str, int]:
counts = defaultdict(int)
skipped = 0
for f in repo_path.rglob("*"):
if any(part in SKIP_DIRS for part in f.parts):
continue
if f.is_file():
if any(part in SKIP_DIRS for part in f.parts):
continue
ext = f.suffix or "(no ext)"
counts[ext] += 1
return dict(sorted(counts.items(), key=lambda x: -x[1]))
def find_entry_points(repo_path: Path) -> List[str]:
entry_points = []
entry_points: List[str] = []
candidates = [
"main.py", "app.py", "server.py", "cli.py", "manage.py",
"index.html", "index.js", "index.ts",
"__main__.py", "index.html", "index.js", "index.ts",
"Makefile", "Dockerfile", "docker-compose.yml",
"README.md", "deploy.sh", "setup.py", "pyproject.toml",
]
@@ -53,27 +52,46 @@ def find_entry_points(repo_path: Path) -> List[str]:
for f in sorted(scripts_dir.iterdir()):
if f.suffix in (".py", ".sh") and not f.name.startswith("test_"):
entry_points.append(f"scripts/{f.name}")
return entry_points[:15]
src_dir = repo_path / "src"
if src_dir.is_dir():
for f in sorted(src_dir.iterdir()):
if f.is_file() and f.suffix == ".py" and not f.name.startswith("test_"):
entry_points.append(f"src/{f.name}")
top_py = [f.name for f in repo_path.iterdir()
if f.is_file() and f.suffix == ".py" and _is_source(f)]
entry_points.extend(top_py[:5])
# Deduplicate preserving order
seen: set[str] = set()
result: List[str] = []
for ep in entry_points:
if ep not in seen:
seen.add(ep)
result.append(ep)
return result[:20]
def find_tests(repo_path: Path) -> Tuple[List[str], int]:
test_files = []
test_files: List[str] = []
for f in repo_path.rglob("*"):
if any(part in SKIP_DIRS for part in f.parts):
continue
if f.is_file() and (f.name.startswith("test_") or f.name.endswith("_test.py") or f.name.endswith("_test.js")):
test_files.append(str(f.relative_to(repo_path)))
if f.is_file():
if any(part in SKIP_DIRS for part in f.parts):
continue
name = f.name
if name.startswith("test_") or name.endswith("_test.py") or name.endswith(".test.js"):
test_files.append(str(f.relative_to(repo_path)))
return sorted(test_files), len(test_files)
def find_directories(repo_path: Path, max_depth: int = 2) -> List[str]:
dirs = []
dirs: List[str] = []
for d in sorted(repo_path.rglob("*")):
if d.is_dir() and len(d.relative_to(repo_path).parts) <= max_depth:
if not any(part in SKIP_DIRS for part in d.parts):
rel = str(d.relative_to(repo_path))
if rel != ".":
dirs.append(rel)
if d.is_dir():
depth = len(d.relative_to(repo_path).parts)
if depth <= max_depth:
if not any(part in SKIP_DIRS for part in d.parts):
rel = str(d.relative_to(repo_path))
if rel != "." and rel not in dirs:
dirs.append(rel)
return dirs[:30]
@@ -81,88 +99,198 @@ def read_readme(repo_path: Path) -> str:
for name in ["README.md", "README.rst", "README.txt", "README"]:
readme = repo_path / name
if readme.exists():
lines = readme.read_text(encoding="utf-8", errors="replace").split("\n")
para = []
started = False
for line in lines:
if line.startswith("#") and not started:
text = readme.read_text(encoding="utf-8", errors="replace")
paras: List[str] = []
for line in text.splitlines():
stripped = line.strip()
if stripped.startswith("#"):
continue
if line.strip():
started = True
para.append(line.strip())
elif started:
if stripped:
paras.append(stripped)
elif paras:
break
return " ".join(para[:5])
return " ".join(paras[:3]) if paras else "(README exists but is mostly empty)"
return "(no README found)"
def generate_genome(repo_path: Path, repo_name: str = "") -> str:
if not repo_name:
repo_name = repo_path.name
date = datetime.now(timezone.utc).strftime("%Y-%m-%d")
readme_desc = read_readme(repo_path)
file_counts = count_files(repo_path)
total_files = sum(file_counts.values())
entry_points = find_entry_points(repo_path)
test_files, test_count = find_tests(repo_path)
dirs = find_directories(repo_path)
lines = [
f"# GENOME.md — {repo_name}", "",
f"> Codebase analysis generated {date}. {readme_desc[:100]}.", "",
"## Project Overview", "",
readme_desc, "",
f"**{total_files} files** across {len(file_counts)} file types.", "",
"## Architecture", "",
"```",
]
for d in dirs[:20]:
lines.append(f" {d}/")
lines.append("```")
lines += ["", "### File Types", "", "| Type | Count |", "|------|-------|"]
for ext, count in list(file_counts.items())[:15]:
lines.append(f"| {ext} | {count} |")
lines += ["", "## Entry Points", ""]
for ep in entry_points:
lines.append(f"- `{ep}`")
lines += ["", "## Test Coverage", "", f"**{test_count} test files** found.", ""]
if test_files:
for tf in test_files[:10]:
lines.append(f"- `{tf}`")
if len(test_files) > 10:
lines.append(f"- ... and {len(test_files) - 10} more")
else:
lines.append("No test files found.")
lines += ["", "## Security Considerations", "", "(To be filled during analysis)", ""]
lines += ["## Design Decisions", "", "(To be filled during analysis)", ""]
def _mermaid_diagram(repo_name: str, dirs: List[str], entry_points: List[str]) -> str:
lines = ["graph TD", f' root["{repo_name} (repo root)"]']
for d in dirs[:15]:
safe = d.replace("/", "_").replace("-", "_")
lines.append(f' root --> {safe}["{d}/"]')
lines.append("")
lines.append(" %% Entry points (leaf nodes)")
for ep in entry_points[:10]:
safe_ep = ep.replace("/", "_").replace(".", "_").replace("-", "_")
parent = ep.split("/")[0] if "/" in ep else "root"
parent_safe = parent.replace("/", "_").replace("-", "_")
lines.append(f' {parent_safe} --> {safe_ep}["{ep}"]')
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(description="Generate GENOME.md from a codebase")
parser.add_argument("repo_path", help="Path to repository")
parser.add_argument("--output", default="", help="Output file (default: stdout)")
parser.add_argument("--name", default="", help="Repository name")
parser.add_argument("--dry-run", action="store_true", help="Print stats only")
def _bullet_list(items: List[str]) -> str:
if not items:
return "(none discovered)"
return "\n".join(f"- `{item}`" for item in items[:20])
def _comma_list(items: List[str]) -> str:
return ", ".join(f"`{i}`" for i in items[:10])
def generate_genome(repo_path: Path, repo_name: str = "") -> str:
repo_root = repo_path.resolve()
if not repo_name:
repo_name = repo_path.name
date = datetime.now(timezone.utc).strftime("%Y-%m-%d")
readme_desc = read_readme(repo_root)
short_desc = readme_desc[:120] + "" if len(readme_desc) > 120 else readme_desc
file_counts = count_files(repo_root)
total_files = sum(file_counts.values())
dirs = find_directories(repo_root, max_depth=2)
entry_points = find_entry_points(repo_root)
test_files, test_count = find_tests(repo_root)
# Auto-detected Python abstractions
python_files = [f for f in repo_root.rglob("*.py")
if f.is_file() and not any(p in SKIP_DIRS for p in f.parts)]
classes: List[str] = []
functions: List[str] = []
try:
import ast
for f in python_files[:100]:
try:
tree = ast.parse(f.read_text(encoding="utf-8", errors="replace"))
for node in ast.walk(tree):
if isinstance(node, ast.ClassDef):
classes.append(f"{f.relative_to(repo_root)}::{node.name}")
elif isinstance(node, ast.FunctionDef) and not node.name.startswith("_"):
qual = f"{f.relative_to(repo_root)}::{node.name}"
functions.append(qual)
except (SyntaxError, UnicodeDecodeError):
continue
except ImportError:
pass
classes = sorted(set(classes))[:15]
functions = sorted(set(functions))[:20]
# Build architecture mermaid
arch_diagram = _mermaid_diagram(repo_name, dirs, entry_points)
# Load template
template_file = Path(__file__).resolve().parent.parent / "templates" / "GENOME-template.md"
if template_file.exists():
template_text = template_file.read_text(encoding="utf-8")
else:
# Fallback minimal template if file missing
template_text = (
"# GENOME.md — {REPO_NAME}\n\n"
"> Codebase analysis generated {DATE}. {SHORT_DESCRIPTION}.\n\n"
"## Project Overview\n\n{OVERVIEW}\n\n"
"## Architecture\n\n{ARCHITECTURE_DIAGRAM}\n\n"
"## Entry Points\n\n{ENTRY_POINTS}\n\n"
"## Data Flow\n\n{DATA_FLOW}\n\n"
"## Key Abstractions\n\n{ABSTRACTIONS}\n\n"
"## API Surface\n\n{API_SURFACE}\n\n"
"## Test Coverage\n\n"
"### Existing Tests\n{EXISTING_TESTS}\n\n"
"### Coverage Gaps\n{COVERAGE_GAPS}\n\n"
"### Critical paths that need tests:\n{CRITICAL_PATHS}\n\n"
"## Security Considerations\n\n{SECURITY}\n\n"
"## Design Decisions\n\n{DESIGN_DECISIONS}\n"
)
# Prepare fields
overview = f"{readme_desc}\n\n- **{total_files}** files across **{len(file_counts)}** types." + (
f"\n- Primary languages: {_comma_list([f'{k}:{v}' for k,v in list(file_counts.items())[:5]])}."
)
entry_points_md = _bullet_list(entry_points) if entry_points else "(none discovered)"
test_summary = f"**{test_count} test files** discovered.\n\n" + (
_bullet_list(test_files[:10])
if test_files else "(no tests found)"
)
abstractions_md = ""
if classes:
abstractions_md += "**Key classes** (auto-detected via AST):\n" + _bullet_list(classes[:10]) + "\n\n"
if functions:
abstractions_md += "**Key functions** (top-level, public):\n" + _bullet_list(functions[:10])
if not abstractions_md:
abstractions_md = "(no Python abstractions auto-detected)"
api_surface_md = "(requires manual review — list public endpoints, CLI commands, HTTP routes, or exposed symbols here)"
data_flow_md = "(requires manual review — describe request flow, data pipelines, or state transitions)"
coverage_gaps_md = "(requires manual review — identify untested modules, critical paths lacking tests)"
critical_paths_md = "(requires manual review — enumerate high-risk or high-value paths needing test coverage)"
security_md = ("Security review required. Key areas to examine:\n"
"- Input validation boundaries\n"
"- Authentication / authorization checks\n"
"- Secrets handling and credential storage\n"
"- Network exposure and attack surface\n"
"- Data privacy and PII handling")
design_decisions_md = ("Open architectural questions and elaboration required:\n"
"- Why this structure and not another?\n"
"- What constraints shaped current abstractions?\n"
"- What trade-offs were accepted and why?\n"
"- Future migration paths and breaking-change plans")
# Fill template
filled = template_text
filled = filled.replace("{{REPO_NAME}}", repo_name)
filled = filled.replace("{{DATE}}", date)
filled = filled.replace("{{SHORT_DESCRIPTION}}", short_desc)
filled = filled.replace("{{OVERVIEW}}", overview)
filled = filled.replace("{{ARCHITECTURE_DIAGRAM}}", arch_diagram)
filled = filled.replace("{{ENTRY_POINTS}}", entry_points_md)
filled = filled.replace("{{DATA_FLOW}}", data_flow_md)
filled = filled.replace("{{ABSTRACTIONS}}", abstractions_md)
filled = filled.replace("{{API_SURFACE}}", api_surface_md)
filled = filled.replace("{{EXISTING_TESTS}}", test_summary)
filled = filled.replace("{{COVERAGE_GAPS}}", coverage_gaps_md)
filled = filled.replace("{{CRITICAL_PATHS}}", critical_paths_md)
filled = filled.replace("{{SECURITY}}", security_md)
filled = filled.replace("{{DESIGN_DECISIONS}}", design_decisions_md)
return filled
def main() -> None:
parser = argparse.ArgumentParser(description="Generate GENOME.md from a codebase using the canonical template")
parser.add_argument("repo_path", help="Path to repository root")
parser.add_argument("--output", "-o", default="", help="Write GENOME.md to this path (default: stdout)")
parser.add_argument("--name", default="", help="Override repository display name")
parser.add_argument("--dry-run", action="store_true", help="Print discovered stats without generating file")
args = parser.parse_args()
repo_path = Path(args.repo_path).resolve()
if not repo_path.is_dir():
print(f"ERROR: {repo_path} is not a directory", file=sys.stderr)
sys.exit(1)
repo_name = args.name or repo_path.name
if args.dry_run:
counts = count_files(repo_path)
_, test_count = find_tests(repo_path)
print(f"Repo: {repo_name}")
print(f"Total files: {sum(counts.values())}")
print(f"Total files (text): {sum(counts.values())}")
print(f"Test files: {test_count}")
print(f"Top types: {', '.join(f'{k}={v}' for k,v in list(counts.items())[:5])}")
sys.exit(0)
genome = generate_genome(repo_path, repo_name)
if args.output:
with open(args.output, "w") as f:
f.write(genome)
print(f"Written: {args.output}")
out = Path(args.output)
out.write_text(genome, encoding="utf-8")
print(f"GENOME.md written: {out}")
else:
print(genome)

View File

@@ -1 +1,12 @@
# Timmy core module
from .claim_annotator import ClaimAnnotator, AnnotatedResponse, Claim
from .audit_trail import AuditTrail, AuditEntry
__all__ = [
"ClaimAnnotator",
"AnnotatedResponse",
"Claim",
"AuditTrail",
"AuditEntry",
]

View File

@@ -0,0 +1,156 @@
#!/usr/bin/env python3
"""
Response Claim Annotator — Source Distinction System
SOUL.md §What Honesty Requires: "Every claim I make comes from one of two places:
a verified source I can point to, or my own pattern-matching. My user must be
able to tell which is which."
"""
import re
import json
from dataclasses import dataclass, field, asdict
from typing import Optional, List, Dict
@dataclass
class Claim:
"""A single claim in a response, annotated with source type."""
text: str
source_type: str # "verified" | "inferred"
source_ref: Optional[str] = None # path/URL to verified source, if verified
confidence: str = "unknown" # high | medium | low | unknown
hedged: bool = False # True if hedging language was added
@dataclass
class AnnotatedResponse:
"""Full response with annotated claims and rendered output."""
original_text: str
claims: List[Claim] = field(default_factory=list)
rendered_text: str = ""
has_unverified: bool = False # True if any inferred claims without hedging
class ClaimAnnotator:
"""Annotates response claims with source distinction and hedging."""
# Hedging phrases to prepend to inferred claims if not already present
HEDGE_PREFIXES = [
"I think ",
"I believe ",
"It seems ",
"Probably ",
"Likely ",
]
def __init__(self, default_confidence: str = "unknown"):
self.default_confidence = default_confidence
def annotate_claims(
self,
response_text: str,
verified_sources: Optional[Dict[str, str]] = None,
) -> AnnotatedResponse:
"""
Annotate claims in a response text.
Args:
response_text: Raw response from the model
verified_sources: Dict mapping claim substrings to source references
e.g. {"Paris is the capital of France": "https://en.wikipedia.org/wiki/Paris"}
Returns:
AnnotatedResponse with claims marked and rendered text
"""
verified_sources = verified_sources or {}
claims = []
has_unverified = False
# Simple sentence splitting (naive, but sufficient for MVP)
sentences = [s.strip() for s in re.split(r'[.!?]\s+', response_text) if s.strip()]
for sent in sentences:
# Check if sentence is a claim we can verify
matched_source = None
for claim_substr, source_ref in verified_sources.items():
if claim_substr.lower() in sent.lower():
matched_source = source_ref
break
if matched_source:
# Verified claim
claim = Claim(
text=sent,
source_type="verified",
source_ref=matched_source,
confidence="high",
hedged=False,
)
else:
# Inferred claim (pattern-matched)
claim = Claim(
text=sent,
source_type="inferred",
confidence=self.default_confidence,
hedged=self._has_hedge(sent),
)
if not claim.hedged:
has_unverified = True
claims.append(claim)
# Render the annotated response
rendered = self._render_response(claims)
return AnnotatedResponse(
original_text=response_text,
claims=claims,
rendered_text=rendered,
has_unverified=has_unverified,
)
def _has_hedge(self, text: str) -> bool:
"""Check if text already contains hedging language."""
text_lower = text.lower()
for prefix in self.HEDGE_PREFIXES:
if text_lower.startswith(prefix.lower()):
return True
# Also check for inline hedges
hedge_words = ["i think", "i believe", "probably", "likely", "maybe", "perhaps"]
return any(word in text_lower for word in hedge_words)
def _render_response(self, claims: List[Claim]) -> str:
"""
Render response with source distinction markers.
Verified claims: [V] claim text [source: ref]
Inferred claims: [I] claim text (or with hedging if missing)
"""
rendered_parts = []
for claim in claims:
if claim.source_type == "verified":
part = f"[V] {claim.text}"
if claim.source_ref:
part += f" [source: {claim.source_ref}]"
else: # inferred
if not claim.hedged:
# Add hedging if missing
hedged_text = f"I think {claim.text[0].lower()}{claim.text[1:]}" if claim.text else claim.text
part = f"[I] {hedged_text}"
else:
part = f"[I] {claim.text}"
rendered_parts.append(part)
return " ".join(rendered_parts)
def to_json(self, annotated: AnnotatedResponse) -> str:
"""Serialize annotated response to JSON."""
return json.dumps(
{
"original_text": annotated.original_text,
"rendered_text": annotated.rendered_text,
"has_unverified": annotated.has_unverified,
"claims": [asdict(c) for c in annotated.claims],
},
indent=2,
ensure_ascii=False,
)

View File

@@ -1,182 +0,0 @@
#!/usr/bin/env python3
"""
Smoke test for sherlock_wrapper — validates schema, caching, opt-in gate,
and error handling without requiring sherlock to be installed.
"""
import json
import os
import sys
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch, MagicMock
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "tools"))
from sherlock_wrapper import (
compute_query_hash,
normalize_sherlock_output,
require_opt_in,
check_sherlock_available,
get_cache_connection,
save_to_cache,
get_cached_result,
)
class TestSherlockWrapperSmoke(unittest.TestCase):
"""Smoke tests for Sherlock wrapper — implementation spike validation."""
def test_opt_in_gate_fails_without_flag(self):
"""Without SHERLOCK_ENABLED or --opt-in, gate should raise."""
with patch("sherlock_wrapper.SHERLOCK_ENABLED", False):
with self.assertRaises(RuntimeError) as ctx:
require_opt_in(opt_in=False)
self.assertIn("opt-in only", str(ctx.exception).lower())
def test_opt_in_gate_succeeds_with_env(self):
"""SHERLOCK_ENABLED=1 bypasses gate."""
with patch("sherlock_wrapper.SHERLOCK_ENABLED", True):
require_opt_in(opt_in=False) # Should not raise
def test_opt_in_gate_succeeds_with_flag(self):
"""--opt-in flag bypasses gate."""
with patch("sherlock_wrapper.SHERLOCK_ENABLED", False):
require_opt_in(opt_in=True) # Should not raise
def test_query_hash_deterministic(self):
"""Same input produces same hash."""
h1 = compute_query_hash("alice")
h2 = compute_query_hash("alice")
self.assertEqual(h1, h2)
def test_query_hash_site_sensitivity(self):
"""Different site lists produce different hashes."""
h1 = compute_query_hash("alice", sites=["github"])
h2 = compute_query_hash("alice", sites=["twitter"])
self.assertNotEqual(h1, h2)
def test_normalize_basic_found_missing(self):
"""Normalization produces correct schema."""
raw = {
"github": {"status": "found", "url": "https://github.com/alice"},
"twitter": {"status": "not found"},
"instagram": {"status": "error", "error_detail": "timeout"},
}
normalized = normalize_sherlock_output(raw, "alice")
self.assertEqual(normalized["query"], "alice")
self.assertEqual(normalized["metadata"]["found_count"], 1)
self.assertEqual(normalized["metadata"]["missing_count"], 1)
self.assertEqual(normalized["metadata"]["error_count"], 1)
self.assertEqual(len(normalized["found"]), 1)
self.assertEqual(normalized["found"][0]["site"], "github")
self.assertIn("twitter", normalized["missing"])
self.assertEqual(normalized["errors"][0]["site"], "instagram")
def test_normalized_schema_has_required_fields(self):
"""Output schema contains all required top-level keys."""
raw = {"site1": {"status": "not found"}}
normalized = normalize_sherlock_output(raw, "testuser")
required = ["schema_version", "query", "timestamp", "found", "missing",
"errors", "metadata"]
for key in required:
self.assertIn(key, normalized)
self.assertIsInstance(normalized["timestamp"], str)
self.assertIsInstance(normalized["found"], list)
self.assertIsInstance(normalized["missing"], list)
self.assertIsInstance(normalized["errors"], list)
self.assertIsInstance(normalized["metadata"], dict)
def test_cache_roundtrip(self):
"""Result can be written and read back from cache."""
with tempfile.TemporaryDirectory() as tmp:
with patch("sherlock_wrapper.CACHE_DB", Path(tmp) / "cache.db"):
test_result = {
"schema_version": "1.0",
"query": "alice",
"timestamp": "2025-04-26T00:00:00+00:00",
"found": [],
"missing": ["github"],
"errors": [],
"metadata": {"total_sites_checked": 1, "found_count": 0, "missing_count": 1, "error_count": 0},
}
query_hash = compute_query_hash("alice")
save_to_cache(query_hash, test_result)
retrieved = get_cached_result(query_hash)
self.assertEqual(retrieved, test_result)
def test_cache_miss_on_stale(self):
"""Cache returns None when entry is older than 7 days."""
with tempfile.TemporaryDirectory() as tmp:
db_path = Path(tmp) / "cache.db"
with patch("sherlock_wrapper.CACHE_DB", db_path):
old_ts = "2025-04-01T00:00:00+00:00"
old_result = {
"schema_version": "1.0", "query": "alice",
"timestamp": old_ts, "found": [], "missing": [], "errors": [],
"metadata": {"total_sites_checked": 0, "found_count": 0, "missing_count": 0, "error_count": 0},
}
query_hash = compute_query_hash("alice")
# Direct DB insert with controlled timestamp (bypass save_to_cache's NOW)
conn = get_cache_connection()
conn.execute(
"INSERT INTO cache (query_hash, result_json, timestamp) VALUES (?, ?, ?)",
(query_hash, json.dumps(old_result), old_ts)
)
conn.commit()
retrieved = get_cached_result(query_hash)
self.assertIsNone(retrieved)
def test_sherlock_available_check(self):
"""check_sherlock_available returns bool."""
available = check_sherlock_available()
self.assertIsInstance(available, bool)
# Note: on this test system sherlock may not be installed, so False is expected.
# The important thing is the function returns a bool.
print(f"[INFO] Sherlock installed: {available}")
class TestSherlockWrapperIntegration(unittest.TestCase):
"""Integration tests with mocked sherlock module."""
def test_run_sherlock_with_opt_in(self):
"""run_sherlock succeeds with opt-in and returns normalized result."""
fake_sherlock = MagicMock()
fake_sherlock.sherlock = MagicMock(return_value={
"github": {"status": "found", "url": "https://github.com/alice"},
"twitter": {"status": "not found"},
})
with patch.dict("sys.modules", {"sherlock": fake_sherlock}):
import importlib
import sherlock_wrapper
importlib.reload(sherlock_wrapper)
with patch.dict(os.environ, {"SHERLOCK_ENABLED": "1"}):
from sherlock_wrapper import run_sherlock
result = run_sherlock("alice", opt_in=True)
self.assertEqual(result["query"], "alice")
self.assertEqual(result["metadata"]["found_count"], 1)
def test_run_sherlock_fails_without_opt_in(self):
"""run_sherlock raises RuntimeError without opt-in."""
from sherlock_wrapper import run_sherlock
with self.assertRaises(RuntimeError) as ctx:
run_sherlock("alice", opt_in=False)
self.assertIn("opt-in only", str(ctx.exception).lower())
def test_run_sherlock_uses_cache(self):
"""Cached result short-circuits sherlock execution."""
cached = {
"schema_version": "1.0", "query": "alice", "timestamp": "2025-04-26T00:00:00+00:00",
"found": [{"site": "github", "url": "https://github.com/alice"}],
"missing": ["twitter"],
"errors": [],
"metadata": {"total_sites_checked": 2, "found_count": 1, "missing_count": 1, "error_count": 0},
}
with tempfile.TemporaryDirectory() as tmp:
with patch("sherlock_wrapper.CACHE_DB", Path(tmp) / "cache.db"):
query_hash = compute_query_hash("alice")
save_to_cache(query_hash, cached)
from sherlock_wrapper import run_sherlock
result = run_sherlock("alice", opt_in=True)
self.assertEqual(result, cached)

View File

@@ -0,0 +1,103 @@
#!/usr/bin/env python3
"""Tests for claim_annotator.py — verifies source distinction is present."""
import sys
import os
import json
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src"))
from timmy.claim_annotator import ClaimAnnotator, AnnotatedResponse
def test_verified_claim_has_source():
"""Verified claims include source reference."""
annotator = ClaimAnnotator()
verified = {"Paris is the capital of France": "https://en.wikipedia.org/wiki/Paris"}
response = "Paris is the capital of France. It is a beautiful city."
result = annotator.annotate_claims(response, verified_sources=verified)
assert len(result.claims) > 0
verified_claims = [c for c in result.claims if c.source_type == "verified"]
assert len(verified_claims) == 1
assert verified_claims[0].source_ref == "https://en.wikipedia.org/wiki/Paris"
assert "[V]" in result.rendered_text
assert "[source:" in result.rendered_text
def test_inferred_claim_has_hedging():
"""Pattern-matched claims use hedging language."""
annotator = ClaimAnnotator()
response = "The weather is nice today. It might rain tomorrow."
result = annotator.annotate_claims(response)
inferred_claims = [c for c in result.claims if c.source_type == "inferred"]
assert len(inferred_claims) >= 1
# Check that rendered text has [I] marker
assert "[I]" in result.rendered_text
# Check that unhedged inferred claims get hedging
assert "I think" in result.rendered_text or "I believe" in result.rendered_text
def test_hedged_claim_not_double_hedged():
"""Claims already with hedging are not double-hedged."""
annotator = ClaimAnnotator()
response = "I think the sky is blue. It is a nice day."
result = annotator.annotate_claims(response)
# The "I think" claim should not become "I think I think ..."
assert "I think I think" not in result.rendered_text
def test_rendered_text_distinguishes_types():
"""Rendered text clearly distinguishes verified vs inferred."""
annotator = ClaimAnnotator()
verified = {"Earth is round": "https://science.org/earth"}
response = "Earth is round. Stars are far away."
result = annotator.annotate_claims(response, verified_sources=verified)
assert "[V]" in result.rendered_text # verified marker
assert "[I]" in result.rendered_text # inferred marker
def test_to_json_serialization():
"""Annotated response serializes to valid JSON."""
annotator = ClaimAnnotator()
response = "Test claim."
result = annotator.annotate_claims(response)
json_str = annotator.to_json(result)
parsed = json.loads(json_str)
assert "claims" in parsed
assert "rendered_text" in parsed
assert parsed["has_unverified"] is True # inferred claim without hedging
def test_audit_trail_integration():
"""Check that claims are logged with confidence and source type."""
# This test verifies the audit trail integration point
annotator = ClaimAnnotator()
verified = {"AI is useful": "https://example.com/ai"}
response = "AI is useful. It can help with tasks."
result = annotator.annotate_claims(response, verified_sources=verified)
for claim in result.claims:
assert claim.source_type in ("verified", "inferred")
assert claim.confidence in ("high", "medium", "low", "unknown")
if claim.source_type == "verified":
assert claim.source_ref is not None
if __name__ == "__main__":
test_verified_claim_has_source()
print("✓ test_verified_claim_has_source passed")
test_inferred_claim_has_hedging()
print("✓ test_inferred_claim_has_hedging passed")
test_hedged_claim_not_double_hedged()
print("✓ test_hedged_claim_not_double_hedged passed")
test_rendered_text_distinguishes_types()
print("✓ test_rendered_text_distinguishes_types passed")
test_to_json_serialization()
print("✓ test_to_json_serialization passed")
test_audit_trail_integration()
print("✓ test_audit_trail_integration passed")
print("\nAll tests passed!")

View File

View File

@@ -1,249 +0,0 @@
#!/usr/bin/env python3
"""
Sherlock username recon wrapper — opt-in, cached, normalized JSON output.
This is an implementation spike (issue #874) to validate local integration
of the Sherlock OSINT tool without violating sovereignty/provenance standards.
"""
import argparse
import hashlib
import json
import os
import sqlite3
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Dict, Any, List
# Opt-in gate: must have SHERLOCK_ENABLED=1 or --opt-in flag
SHERLOCK_ENABLED = os.environ.get("SHERLOCK_ENABLED", "0") == "1"
# Cache location
CACHE_DIR = Path.home() / ".cache" / "timmy"
CACHE_DB = CACHE_DIR / "sherlock_cache.db"
# Normalized output schema version
SCHEMA_VERSION = "1.0"
def require_opt_in(opt_in: bool = False) -> None:
"""Enforce opt-in gate for Sherlock external dependency."""
if not (SHERLOCK_ENABLED or opt_in):
raise RuntimeError(
"Sherlock is opt-in only. Set SHERLOCK_ENABLED=1 or pass --opt-in."
)
def check_sherlock_available() -> bool:
"""Check if sherlock Python package is installed."""
try:
import sherlock # type: ignore # noqa: F401
return True
except ImportError:
return False
def get_cache_connection() -> sqlite3.Connection:
"""Initialize cache directory and return DB connection."""
CACHE_DIR.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(CACHE_DB))
conn.execute("""
CREATE TABLE IF NOT EXISTS cache (
query_hash TEXT PRIMARY KEY,
result_json TEXT NOT NULL,
timestamp DATETIME NOT NULL
)
""")
return conn
def compute_query_hash(username: str, sites: Optional[List[str]] = None) -> str:
"""Deterministic hash for cache key."""
components = [username.lower().strip()]
if sites:
components.extend(sorted(sites))
raw = "|".join(components)
return hashlib.sha256(raw.encode()).hexdigest()
def get_cached_result(query_hash: str) -> Optional[Dict[str, Any]]:
"""Retrieve cached result if available and not stale (TTL: 7 days)."""
conn = get_cache_connection()
cur = conn.execute(
"SELECT result_json, timestamp FROM cache WHERE query_hash = ?",
(query_hash,)
)
row = cur.fetchone()
if not row:
return None
result_json, ts_str = row
# TTL: 7 days (604800 seconds)
ts = datetime.fromisoformat(ts_str)
age_seconds = (datetime.now(timezone.utc) - ts).total_seconds()
if age_seconds >= 604800:
return None
return json.loads(result_json)
def save_to_cache(query_hash: str, result: Dict[str, Any]) -> None:
"""Persist result to cache."""
conn = get_cache_connection()
conn.execute(
"INSERT OR REPLACE INTO cache (query_hash, result_json, timestamp) VALUES (?, ?, ?)",
(query_hash, json.dumps(result), datetime.now(timezone.utc).isoformat())
)
conn.commit()
conn.close()
def normalize_sherlock_output(
raw_result: Dict[str, Any],
username: str,
sites_checked: Optional[List[str]] = None
) -> Dict[str, Any]:
"""
Convert raw sherlock output into a stable, normalized schema.
Expected sherlock result shape (via Python API):
{
"site_name": {"url": "...", "status": "found"|"not found"|"error", ...},
...
}
"""
found: List[Dict[str, str]] = []
missing: List[str] = []
errors: List[Dict[str, str]] = []
for site_name, site_data in raw_result.items():
status = site_data.get("status", "")
url = site_data.get("url", "")
if status == "found" and url:
found.append({"site": site_name, "url": url})
elif status == "not found":
missing.append(site_name)
else:
errors.append({"site": site_name, "error": status or "unknown"})
# Compute totals from the original site list if provided
total_sites = len(raw_result) if sites_checked is None else len(sites_checked)
return {
"schema_version": SCHEMA_VERSION,
"query": username,
"timestamp": datetime.now(timezone.utc).isoformat(),
"found": found,
"missing": missing,
"errors": errors,
"metadata": {
"total_sites_checked": total_sites,
"found_count": len(found),
"missing_count": len(missing),
"error_count": len(errors),
},
}
def run_sherlock(
username: str,
sites: Optional[List[str]] = None,
timeout: Optional[int] = None,
opt_in: bool = False
) -> Dict[str, Any]:
"""
Execute Sherlock wrapper with opt-in gate, caching, and normalization.
"""
require_opt_in(opt_in)
# Compute cache key
query_hash = compute_query_hash(username, sites)
# Check cache first — avoids dependency requirement on cache hit
cached = get_cached_result(query_hash)
if cached is not None:
return cached
# Only require sherlock on cache miss
if not check_sherlock_available():
raise RuntimeError(
"Sherlock Python package not installed. "
"Install with: pip install sherlock-project"
)
# Call sherlock
try:
import sherlock
from sherlock import sherlock as sherlock_main # type: ignore
if sites:
result = sherlock_main(username, site_list=sites, timeout=timeout or 10)
else:
result = sherlock_main(username, timeout=timeout or 10)
normalized = normalize_sherlock_output(result, username, sites)
save_to_cache(query_hash, normalized)
return normalized
except Exception as e:
raise RuntimeError(f"Sherlock execution failed: {e}") from e
def main() -> int:
parser = argparse.ArgumentParser(
description="Sherlock username OSINT wrapper — opt-in, cached, normalized JSON"
)
parser.add_argument(
"--query", "-q", required=True,
help="Username to search across sites"
)
parser.add_argument(
"--opt-in", action="store_true",
help="Explicit opt-in flag (alternatively set SHERLOCK_ENABLED=1)"
)
parser.add_argument(
"--sites", "-s", nargs="+",
help="Specific sites to check (default: all supported)"
)
parser.add_argument(
"--timeout", "-t", type=int, default=10,
help="Request timeout per site (default: 10)"
)
parser.add_argument(
"--json", action="store_true",
help="Output normalized JSON to stdout"
)
parser.add_argument(
"--no-cache",
action="store_true",
help="Bypass cached result (if any)"
)
args = parser.parse_args()
try:
result = run_sherlock(
username=args.query,
sites=args.sites,
timeout=args.timeout,
opt_in=args.opt_in
)
if args.json:
print(json.dumps(result, indent=2))
else:
print(f"Query: {result['query']}")
print(f"Found: {result['metadata']['found_count']} site(s)")
print(f"Missing: {result['metadata']['missing_count']} site(s)")
print(f"Errors: {result['metadata']['error_count']} site(s)")
for f in result['found']:
print(f" [{f['site']}] {f['url']}")
return 0
except RuntimeError as e:
print(f"ERROR: {e}", file=sys.stderr)
return 1
if __name__ == "__main__":
sys.exit(main())