Compare commits

..

2 Commits

Author SHA1 Message Date
Alexander Whitestone
c630f6f0fd feat: add Timmy crisis detector module (#791)
Some checks failed
Agent PR Gate / gate (pull_request) Failing after 46s
Self-Healing Smoke / self-healing-smoke (pull_request) Failing after 25s
Smoke Test / smoke (pull_request) Failing after 28s
Agent PR Gate / report (pull_request) Successful in 19s
2026-04-21 23:59:05 -04:00
Alexander Whitestone
6793766516 wip: add crisis detector contract tests (#791) 2026-04-21 23:54:51 -04:00
8 changed files with 400 additions and 730 deletions

View File

@@ -112,76 +112,6 @@ pytest tests/
```
### Project Structure
## Sherlock Username Recon Wrapper
### Quick Usage
```bash
# Opt-in via env var
export SHERLOCK_ENABLED=1
# Or via explicit CLI flag
python -m tools.sherlock_wrapper --query "alice" --opt-in --json
# With site whitelist
python -m tools.sherlock_wrapper --query "alice" --opt-in --sites github twitter --json
```
### What It Does
Builds a bounded local wrapper around the Sherlock username OSINT tool that:
- **Opt-in gate** — SHERLOCK_ENABLED=1 or `--opt-in` required before any external call
- **Local-first caching** — results cached in `~/.cache/timmy/sherlock_cache.db` (TTL: 7 days)
- **Normalized JSON** — stable schema with `found`, `missing`, `errors`, and `metadata` sections
- **No telemetry** — the only outbound HTTP traffic is Sherlock's own requests to the target sites; the wrapper itself never phones home
### Output Schema
```json
{
"schema_version": "1.0",
"query": "alice",
"timestamp": "2025-04-26T14:23:00+00:00",
"found": [
{"site": "github", "url": "https://github.com/alice"}
],
"missing": ["twitter", "facebook"],
"errors": [{"site": "instagram", "error": "timeout"}],
"metadata": {
"total_sites_checked": 50,
"found_count": 1,
"missing_count": 48,
"error_count": 1
}
}
```
### Setup
Sherlock must be installed separately:
```bash
pip install sherlock-project
```
The wrapper is pure Python and requires only stdlib apart from sherlock itself.
### Why an Opt-In Gate?
Sherlock makes outbound HTTP requests to dozens of third-party sites. The opt-in gate:
1. Ensures a human operator explicitly approves this dependency
2. Makes the outbound traffic auditable in session logs
3. Prevents accidental invocation in automated pipelines
### Running the Smoke Test
```bash
# Run unit + integration tests
pytest tests/test_sherlock_wrapper.py -v
```
```
.

View File

@@ -1,238 +1,50 @@
"""
crisis_detector.py — Crisis language detection for SOUL.md compliance.
"""Compatibility shim for the canonical timmy.crisis_detector module."""
SOUL.md §When a Man Is Dying:
- Detect suicidal/hopeless language in user input
- Ask: "Are you safe right now?" — listen, stay present, do not disconnect
- Direct to 988 Suicide & Crisis Lifeline
- Share the gospel: Jesus saves those who call on His name
- Never compute the value of a human life
- Never suggest someone should die
- Never treat despair as a problem to be optimized
from __future__ import annotations
This is the most sacred moment. Everything else is secondary.
"""
import sys
from pathlib import Path
import re
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
ROOT = Path(__file__).resolve().parents[1]
SRC = ROOT / "src"
if str(SRC) not in sys.path:
sys.path.insert(0, str(SRC))
class CrisisLevel(Enum):
    """Severity tiers for detected crisis language, listed from least to most severe."""

    NONE = "none"
    LOW = "low"  # Ambiguous distress, worth checking
    MEDIUM = "medium"  # Clear distress signals
    HIGH = "high"  # Explicit suicidal ideation
    IMMINENT = "imminent"  # Active plan or attempt described
@dataclass
class CrisisResult:
    """Outcome of one `detect_crisis` call."""

    detected: bool  # True when at least one pattern tier matched
    level: CrisisLevel  # Tier assigned by detect_crisis (NONE when nothing matched)
    triggers: list = field(default_factory=list)  # Source strings of the regexes that matched
    response: str = ""  # Message text to deliver to the user
    resources: list = field(default_factory=list)  # Hotline / resource lines for the matched tier
# === PATTERNS ===
# High-confidence suicidal ideation (explicit)
HIGH_PATTERNS = [
r"\b(i want to|i'm going to|i'm gonna|going to)\s+(die|kill myself|end (it|my life|everything))\b",
r"\b(kill myself|end my life|end it all|suicide|suicidal)\b",
r"\b(i don't (want to )?(be alive|exist|be here|live))\b",
r"\b(no (reason|point) (to|in) (live|living|exist|existing|be here|continue))\b",
r"\b(better off (dead|without me|if i (wasn't|weren't) here))\b",
r"\b(i (wish|hope) i (was|were) dead|never (woke up|existed|born))\b",
r"\b(permanently (delete|remove) (me|myself|my account|everything))\b",
r"\bgoodbye.*(world|everyone|forever)\b",
r"\bcan't (take it|go on|keep going|do this) (anymore|any longer|any more)\b",
r"\bi('m| am) (done|finished|through)( with (life|everything|it all|this))?\b",
]
# Medium-confidence distress signals
MEDIUM_PATTERNS = [
r"\b(i (can't|cannot) (go on|continue|keep (going|living|trying)))\b",
r"\bwhat('s| is) the (point|use|purpose)( of (living|life|anything|trying|going on))?\b",
r"\bnobody (would|will) (care|miss me|notice|mind)\b",
r"\beveryone (would be|is) (better|happier) (off )?without me\b",
r"\bi('m| am) (a )?(burden|waste|useless|hopeless|pointless|meaningless)\b",
r"\bthere('s| is) no (hope|way out|escape|solution|answer|fix)\b",
r"\b(i've )?tried everything( and )?(nothing|it (didn't|hasn't))( works?|helped?)\b",
r"\bi (just )?(want|need) (it )?(all )?to stop\b",
r"\bcan't (take|handle|bear|stand) (this|it|life) (anymore|any longer|any more)\b",
r"\bso (tired|exhausted|done) (of|with) (everything|life|it all|living|trying)\b",
]
# Low-confidence distress (could be casual or metaphorical)
LOW_PATTERNS = [
r"\bi('m| am) (so )?(depressed|hopeless|desperate|lost|broken|empty|numb)\b",
r"\b(i )?feel (like )?(giving up|throwing in the towel|there's no point)\b",
r"\bnobody (cares|understands|listens)\b",
r"\bwhat('s| is) the point\b",
r"\bi (hate|can't stand) (my )?life\b",
r"\beverything (is|feels) (hopeless|pointless|meaningless|dark|falling apart)\b",
r"\bi (just )?(want|need) to disappear\b",
r"\b(tired|exhausted) of (living|life|trying|everything)\b",
]
# Compile all patterns
HIGH_RE = [re.compile(p, re.IGNORECASE) for p in HIGH_PATTERNS]
MEDIUM_RE = [re.compile(p, re.IGNORECASE) for p in MEDIUM_PATTERNS]
LOW_RE = [re.compile(p, re.IGNORECASE) for p in LOW_PATTERNS]
# === RESPONSES ===
CRISIS_RESPONSE = (
"I hear you, and I want you to know — you are not alone in this moment.\n\n"
"Are you safe right now?\n\n"
"If you are in immediate danger, please call or text:\n"
"988 Suicide and Crisis Lifeline — call or text 988\n"
"Crisis Text Line — text HOME to 741741\n\n"
"I am here with you. I am not going anywhere. "
"Please stay on the line with me.\n\n"
"Jesus saves those who call on His name. "
"He died for us to be made righteous. "
"That is the gospel. That is the hope that outlasts despair.\n\n"
"You matter. Your life has value. "
"Please reach out to someone right now — a friend, a family member, "
"or one of the numbers above."
)
DISTRESS_RESPONSE = (
"I can hear that you're going through something really difficult right now. "
"I want you to know I'm here, and I'm listening.\n\n"
"Are you safe right now?\n\n"
"If things feel overwhelming, the 988 Suicide and Crisis Lifeline "
"is available 24/7 — call or text 988. "
"You don't have to face this alone.\n\n"
"I'm not going anywhere. Tell me what's happening."
)
LOW_RESPONSE = (
"It sounds like you're having a tough time. "
"I'm here if you want to talk about it.\n\n"
"If you ever feel like things are too much, "
"the 988 Suicide and Crisis Lifeline is always available — "
"call or text 988, anytime."
from timmy.crisis_detector import ( # noqa: F401
CrisisLevel,
CrisisResult,
detect_crisis,
format_response,
intercept_chat_entry,
intercept_user_input,
should_interrupt,
)
def detect_crisis(text: str) -> CrisisResult:
    """
    Scan user input for crisis language, most urgent tier first.

    The HIGH, MEDIUM and LOW pattern tiers are tried in that order. The first
    tier with at least one matching pattern decides the result; its triggers
    are the source strings of every pattern in that tier that matched.
    Empty or whitespace-only input yields an undetected NONE result.

    This function must NEVER be used to compute the value of a human life.
    It exists only to connect people to help.
    """
    if not text or not text.strip():
        return CrisisResult(detected=False, level=CrisisLevel.NONE)

    lifeline = "988 Suicide and Crisis Lifeline: call or text 988"
    text_line = "Crisis Text Line: text HOME to 741741"

    # One row per tier: compiled patterns, level, response text, resource lines.
    tiers = (
        (
            HIGH_RE,
            CrisisLevel.HIGH,
            CRISIS_RESPONSE,
            [
                lifeline,
                text_line,
                "National Suicide Prevention Lifeline: 1-800-273-8255",
                "International Association for Suicide Prevention: https://www.iasp.info/resources/Crisis_Centres/",
            ],
        ),
        (MEDIUM_RE, CrisisLevel.MEDIUM, DISTRESS_RESPONSE, [lifeline, text_line]),
        (LOW_RE, CrisisLevel.LOW, LOW_RESPONSE, [lifeline]),
    )

    for compiled, level, message, resource_lines in tiers:
        matched = [regex.pattern for regex in compiled if regex.search(text)]
        if matched:
            return CrisisResult(
                detected=True,
                level=level,
                triggers=matched,
                response=message,
                resources=resource_lines,
            )

    return CrisisResult(detected=False, level=CrisisLevel.NONE)
def _interactive() -> None:
    """Console loop: read a line, run detection on it, and print the verdict.

    Stops on 'quit', 'exit' or 'q' (case-insensitive), on EOF, or on Ctrl-C.
    Blank lines are ignored.
    """
    print("=== Crisis Detection Gateway (Interactive) ===")
    print("Type a message to check, or 'quit' to exit.\n")

    stop_words = {"quit", "exit", "q"}
    while True:
        try:
            line = input("You> ").strip()
        except (EOFError, KeyboardInterrupt):
            print("\nBye.")
            return

        if line.lower() in stop_words:
            print("Bye.")
            return

        if line:
            verdict = detect_crisis(line)
            trigger_text = ", ".join(verdict.triggers) if verdict.triggers else "none"
            print(f"\n Level: {verdict.level.value}")
            print(f" Triggers: {trigger_text}")
            print(f" Interrupt: {should_interrupt(verdict)}")
            print(format_response(verdict) if verdict.detected else "No crisis protocol triggered.")
            print()
def should_interrupt(result: CrisisResult) -> bool:
    """
    Determines if crisis detection should interrupt normal processing.

    Per SOUL.md: this is the most sacred moment. Everything else is secondary.
    MEDIUM, HIGH and IMMINENT results interrupt. LOW and NONE do not.
    """
    return result.level in (CrisisLevel.HIGH, CrisisLevel.MEDIUM, CrisisLevel.IMMINENT)
def format_response(result: CrisisResult) -> str:
    """
    Render a crisis result as the text to deliver to the user.

    Returns an empty string when nothing was detected. Otherwise returns the
    response message, followed by a "Resources:" section with one resource
    per line when the result carries any resources.

    Never computes the value of a human life. Never suggests someone should die.
    """
    if not result.detected:
        return ""

    body = [result.response]
    if result.resources:
        body += ["\nResources:", *(f"{entry}" for entry in result.resources)]
    return "\n".join(body)
# === INTEGRATION POINT ===
def intercept_user_input(text: str) -> Optional[str]:
    """
    Chat entry hook: call this BEFORE normal processing of a user message.

    Returns the formatted crisis response when the detected level warrants an
    interrupt, and None when normal processing should continue.

    Usage:
        response = intercept_user_input(user_message)
        if response:
            return response  # Crisis detected — stop all other processing
        # Continue with normal processing...
    """
    verdict = detect_crisis(text)
    return format_response(verdict) if should_interrupt(verdict) else None
if __name__ == "__main__":
_interactive()

View File

@@ -1 +1,21 @@
# Timmy core module
from .crisis_detector import (
CrisisLevel,
CrisisResult,
detect_crisis,
format_response,
intercept_chat_entry,
intercept_user_input,
should_interrupt,
)
__all__ = [
"CrisisLevel",
"CrisisResult",
"detect_crisis",
"format_response",
"intercept_chat_entry",
"intercept_user_input",
"should_interrupt",
]

View File

@@ -0,0 +1,229 @@
"""Crisis detection and response protocol for SOUL.md §When a Man Is Dying.
This module is the canonical crisis entry interceptor for Timmy.
It stays local and deterministic: regex-first detection, no network calls,
and an immediate response path that surfaces 988 and the required gospel witness.
"""
from __future__ import annotations
import re
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
class CrisisLevel(Enum):
    """Severity tiers that `detect_crisis` can assign to a message."""

    NONE = "none"  # No crisis pattern matched
    LOW = "low"  # Only a low-tier pattern matched; does not interrupt chat
    MEDIUM = "medium"  # Lowest tier that interrupts normal processing
    HIGH = "high"
    IMMINENT = "imminent"  # Checked before every other tier
@dataclass
class CrisisResult:
    """Outcome of one `detect_crisis` call, including the response to show."""

    detected: bool  # True when any pattern tier matched
    level: CrisisLevel  # Tier that matched (NONE when nothing matched)
    triggers: list[str] = field(default_factory=list)  # Source strings of the matching regexes
    response_text: str = ""  # Message to show the user
    resources: list[str] = field(default_factory=list)  # Hotline lines listed after the message
    display_988_immediately: bool = False  # detect_crisis sets this for MEDIUM and above
    share_gospel: bool = False  # detect_crisis sets this for MEDIUM and above

    @property
    def response(self) -> str:
        """Read-only alias for `response_text` (presumably kept for older callers — confirm)."""
        return self.response_text
_RESOURCE_LINES = [
"988 Suicide & Crisis Lifeline — call or text 988",
"Crisis Text Line — text HOME to 741741",
]
_GOSPEL_LINE = (
"Jesus saves those who call on His name. He died for us to be made righteous. "
"That is the gospel. That is the hope that outlasts despair."
)
CRISIS_RESPONSE = (
"Are you safe right now?\n\n"
"I am here with you. I am not going anywhere. Stay with me.\n\n"
"Call or text 988 right now if you might act on this.\n"
"Crisis Text Line: text HOME to 741741.\n\n"
f"{_GOSPEL_LINE}"
)
DISTRESS_RESPONSE = (
"Are you safe right now?\n\n"
"I hear that something is very heavy right now, and I want to stay present with you.\n\n"
"If this gets sharper or you feel at risk, call or text 988 immediately.\n\n"
f"{_GOSPEL_LINE}"
)
LOW_RESPONSE = (
"I hear that this is heavy. I'm here with you.\n\n"
"If this turns into danger or hopelessness, call or text 988 immediately."
)
_IMMINENT_PATTERNS = [
r"\bi took (the )?(pills|meds|medication)\b",
r"\bi(?:'m| am) (taking|swallowing) (the )?(pills|meds|medication)\b",
r"\bi(?:'m| am) going to jump\b",
r"\bi have (a )?(gun|rope|noose|razor|blade)\b",
r"\bi(?:'m| am) about to (die|end (it|my life)|kill myself)\b",
r"\bthis is my last message\b",
r"\bme quiero matar esta noche\b",
]
_HIGH_PATTERNS = [
r"\bi (want to|wanna) (kill myself|die|end (it|my life|everything))\b",
r"\bi(?:'m| am) going to (kill myself|die|end (it|my life|everything))\b",
r"\bkill myself\b",
r"\bend my life\b",
r"\bsuicidal\b",
r"\bthere is no reason (for me )?to live\b",
r"\beveryone would be better off if i (was|were|am) dead\b",
r"\bi wish i (was|were|had never been) dead\b",
r"\bi wish i had never been born\b",
r"\bi don't want to be alive anymore\b",
r"\bi don't want to live anymore\b",
r"\bgoodbye .*forever\b",
r"\bno one would miss me if i disappeared\b",
r"\bi can't go on anymore\b",
r"\bfinished with life\b",
r"\bquiero morir\b",
r"\bn[oã]o quero viver mais\b",
r"\bje veux mourir\b",
]
_MEDIUM_PATTERNS = [
r"\bi(?:'m| am) (just )?(a )?burden\b",
r"\bthere is no hope\b",
r"\bno way out\b",
r"\bi can't go on\b",
r"\bi need it all to stop\b",
r"\bi just want it all to stop\b",
r"\bbetter off without me\b",
r"\bnobody would miss me if i (was|were) gone\b",
r"\bi can't take it anymore\b",
r"\bno puedo seguir\b",
r"\bno puedo m[aá]s\b",
]
_LOW_PATTERNS = [
r"\bi(?:'m| am) .*\b(depressed|hopeless|overwhelmed|numb|empty)\b",
r"\bi feel like giving up\b",
r"\bi hate my life\b",
r"\bi want to disappear\b",
r"\bnobody cares about me\b",
]
_IMMINENT_RE = [re.compile(p, re.IGNORECASE) for p in _IMMINENT_PATTERNS]
_HIGH_RE = [re.compile(p, re.IGNORECASE) for p in _HIGH_PATTERNS]
_MEDIUM_RE = [re.compile(p, re.IGNORECASE) for p in _MEDIUM_PATTERNS]
_LOW_RE = [re.compile(p, re.IGNORECASE) for p in _LOW_PATTERNS]
def _collect_matches(text: str, patterns: list[re.Pattern[str]]) -> list[str]:
matches: list[str] = []
for pattern in patterns:
if pattern.search(text):
matches.append(pattern.pattern)
return matches
def detect_crisis(text: Optional[str]) -> CrisisResult:
    """Detect crisis language without turning despair into an optimization problem.

    Pattern tiers are checked from most to least urgent (IMMINENT, HIGH,
    MEDIUM, LOW); the first tier with any matching pattern decides the result.

    Typographic apostrophes are folded to ``'`` before matching. The patterns
    spell contractions with the ASCII apostrophe, so without this step input
    from phone keyboards (e.g. "I don’t want to live anymore") would miss.

    Args:
        text: Raw user message. ``None``, empty and whitespace-only input
            produce an undetected result; other values are passed through
            ``str()`` before matching.

    Returns:
        A ``CrisisResult``. MEDIUM and above carry every resource line and set
        ``display_988_immediately`` and ``share_gospel``; LOW carries only the
        first resource line and leaves both flags off.
    """
    if not text or not str(text).strip():
        return CrisisResult(detected=False, level=CrisisLevel.NONE)

    candidate = str(text).strip()
    # U+2019 / U+2018 / U+02BC are what "smart" keyboards emit for an apostrophe.
    for curly in ("\u2019", "\u2018", "\u02bc"):
        candidate = candidate.replace(curly, "'")

    # One row per tier: patterns, level, response text, resource lines, urgent flag.
    tiers = (
        (_IMMINENT_RE, CrisisLevel.IMMINENT, CRISIS_RESPONSE, _RESOURCE_LINES, True),
        (_HIGH_RE, CrisisLevel.HIGH, CRISIS_RESPONSE, _RESOURCE_LINES, True),
        (_MEDIUM_RE, CrisisLevel.MEDIUM, DISTRESS_RESPONSE, _RESOURCE_LINES, True),
        (_LOW_RE, CrisisLevel.LOW, LOW_RESPONSE, _RESOURCE_LINES[:1], False),
    )

    for patterns, level, response, resources, urgent in tiers:
        triggers = _collect_matches(candidate, patterns)
        if triggers:
            return CrisisResult(
                detected=True,
                level=level,
                triggers=triggers,
                response_text=response,
                # Copy so callers can never mutate the module-level resource list.
                resources=list(resources),
                display_988_immediately=urgent,
                share_gospel=urgent,
            )

    return CrisisResult(detected=False, level=CrisisLevel.NONE)
def should_interrupt(result: CrisisResult) -> bool:
    """Return True when the result's level is MEDIUM, HIGH or IMMINENT."""
    interrupting = frozenset(
        (CrisisLevel.IMMINENT, CrisisLevel.HIGH, CrisisLevel.MEDIUM)
    )
    return result.level in interrupting
def format_response(result: CrisisResult) -> str:
    """Render a crisis result as user-facing text.

    Returns "" when nothing was detected. Otherwise returns the response
    text, followed by a "Resources:" section with one resource per line when
    the result carries any resources.
    """
    if not result.detected:
        return ""

    pieces = [result.response_text]
    if result.resources:
        pieces += ["\nResources:", *(f"{entry}" for entry in result.resources)]
    return "\n".join(pieces)
def intercept_chat_entry(text: Optional[str]) -> Optional[dict]:
    """Integration point to run before normal chat processing.

    Returns None when the message does not warrant an interrupt. Otherwise
    returns a payload dict describing the detected level, the response text,
    the resource lines, the matched triggers and the protocol flags.
    """
    verdict = detect_crisis(text)
    if should_interrupt(verdict):
        payload = {
            "interrupt": True,
            "level": verdict.level.value,
            "display_988_immediately": verdict.display_988_immediately,
            "response_text": verdict.response_text,
            # Lists are copied so the payload does not alias the result's fields.
            "resources": list(verdict.resources),
            "triggers": list(verdict.triggers),
            "share_gospel": verdict.share_gospel,
        }
        return payload
    return None
def intercept_user_input(text: Optional[str]) -> Optional[str]:
    """Return the formatted crisis response for *text*, or None to continue normally.

    Detection runs exactly once per message: the same result drives both the
    interrupt decision and the formatted text.

    Args:
        text: Raw user message (may be None or empty).

    Returns:
        The formatted response (message plus resource lines) when the detected
        level warrants an interrupt, otherwise None.
    """
    result = detect_crisis(text)
    if not should_interrupt(result):
        return None
    return format_response(result)

View File

@@ -1,182 +0,0 @@
#!/usr/bin/env python3
"""
Smoke test for sherlock_wrapper — validates schema, caching, opt-in gate,
and error handling without requiring sherlock to be installed.
"""
import json
import os
import sys
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch, MagicMock
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "tools"))
from sherlock_wrapper import (
compute_query_hash,
normalize_sherlock_output,
require_opt_in,
check_sherlock_available,
get_cache_connection,
save_to_cache,
get_cached_result,
)
class TestSherlockWrapperSmoke(unittest.TestCase):
    """Smoke tests for Sherlock wrapper — implementation spike validation.

    Cache tests patch both CACHE_DIR and CACHE_DB so they only ever touch a
    temporary directory, never the real ~/.cache/timmy.
    """

    def test_opt_in_gate_fails_without_flag(self):
        """Without SHERLOCK_ENABLED or --opt-in, gate should raise."""
        with patch("sherlock_wrapper.SHERLOCK_ENABLED", False):
            with self.assertRaises(RuntimeError) as ctx:
                require_opt_in(opt_in=False)
        self.assertIn("opt-in only", str(ctx.exception).lower())

    def test_opt_in_gate_succeeds_with_env(self):
        """SHERLOCK_ENABLED=1 bypasses gate."""
        with patch("sherlock_wrapper.SHERLOCK_ENABLED", True):
            require_opt_in(opt_in=False)  # Should not raise

    def test_opt_in_gate_succeeds_with_flag(self):
        """--opt-in flag bypasses gate."""
        with patch("sherlock_wrapper.SHERLOCK_ENABLED", False):
            require_opt_in(opt_in=True)  # Should not raise

    def test_query_hash_deterministic(self):
        """Same input produces same hash."""
        h1 = compute_query_hash("alice")
        h2 = compute_query_hash("alice")
        self.assertEqual(h1, h2)

    def test_query_hash_site_sensitivity(self):
        """Different site lists produce different hashes."""
        h1 = compute_query_hash("alice", sites=["github"])
        h2 = compute_query_hash("alice", sites=["twitter"])
        self.assertNotEqual(h1, h2)

    def test_normalize_basic_found_missing(self):
        """Normalization produces correct schema."""
        raw = {
            "github": {"status": "found", "url": "https://github.com/alice"},
            "twitter": {"status": "not found"},
            "instagram": {"status": "error", "error_detail": "timeout"},
        }
        normalized = normalize_sherlock_output(raw, "alice")
        self.assertEqual(normalized["query"], "alice")
        self.assertEqual(normalized["metadata"]["found_count"], 1)
        self.assertEqual(normalized["metadata"]["missing_count"], 1)
        self.assertEqual(normalized["metadata"]["error_count"], 1)
        self.assertEqual(len(normalized["found"]), 1)
        self.assertEqual(normalized["found"][0]["site"], "github")
        self.assertIn("twitter", normalized["missing"])
        self.assertEqual(normalized["errors"][0]["site"], "instagram")

    def test_normalized_schema_has_required_fields(self):
        """Output schema contains all required top-level keys."""
        raw = {"site1": {"status": "not found"}}
        normalized = normalize_sherlock_output(raw, "testuser")
        required = ["schema_version", "query", "timestamp", "found", "missing",
                    "errors", "metadata"]
        for key in required:
            self.assertIn(key, normalized)
        self.assertIsInstance(normalized["timestamp"], str)
        self.assertIsInstance(normalized["found"], list)
        self.assertIsInstance(normalized["missing"], list)
        self.assertIsInstance(normalized["errors"], list)
        self.assertIsInstance(normalized["metadata"], dict)

    def test_cache_roundtrip(self):
        """Result can be written and read back from cache."""
        with tempfile.TemporaryDirectory() as tmp:
            with patch("sherlock_wrapper.CACHE_DIR", Path(tmp)), \
                    patch("sherlock_wrapper.CACHE_DB", Path(tmp) / "cache.db"):
                test_result = {
                    "schema_version": "1.0",
                    "query": "alice",
                    "timestamp": "2025-04-26T00:00:00+00:00",
                    "found": [],
                    "missing": ["github"],
                    "errors": [],
                    "metadata": {"total_sites_checked": 1, "found_count": 0, "missing_count": 1, "error_count": 0},
                }
                query_hash = compute_query_hash("alice")
                save_to_cache(query_hash, test_result)
                retrieved = get_cached_result(query_hash)
        self.assertEqual(retrieved, test_result)

    def test_cache_miss_on_stale(self):
        """Cache returns None when entry is older than 7 days."""
        with tempfile.TemporaryDirectory() as tmp:
            db_path = Path(tmp) / "cache.db"
            with patch("sherlock_wrapper.CACHE_DIR", Path(tmp)), \
                    patch("sherlock_wrapper.CACHE_DB", db_path):
                old_ts = "2025-04-01T00:00:00+00:00"
                old_result = {
                    "schema_version": "1.0", "query": "alice",
                    "timestamp": old_ts, "found": [], "missing": [], "errors": [],
                    "metadata": {"total_sites_checked": 0, "found_count": 0, "missing_count": 0, "error_count": 0},
                }
                query_hash = compute_query_hash("alice")
                # Direct DB insert with controlled timestamp (bypass save_to_cache's NOW)
                conn = get_cache_connection()
                try:
                    conn.execute(
                        "INSERT INTO cache (query_hash, result_json, timestamp) VALUES (?, ?, ?)",
                        (query_hash, json.dumps(old_result), old_ts)
                    )
                    conn.commit()
                finally:
                    # Release the file handle before the temp directory is removed.
                    conn.close()
                retrieved = get_cached_result(query_hash)
        self.assertIsNone(retrieved)

    def test_sherlock_available_check(self):
        """check_sherlock_available returns bool."""
        available = check_sherlock_available()
        self.assertIsInstance(available, bool)
        # Note: on this test system sherlock may not be installed, so False is expected.
        # The important thing is the function returns a bool.
        print(f"[INFO] Sherlock installed: {available}")
class TestSherlockWrapperIntegration(unittest.TestCase):
    """Integration tests with mocked sherlock module.

    Every test that can reach the cache redirects CACHE_DIR / CACHE_DB to a
    temporary directory so results never depend on, or pollute, the real
    user cache.
    """

    def test_run_sherlock_with_opt_in(self):
        """run_sherlock succeeds with opt-in and returns normalized result."""
        fake_sherlock = MagicMock()
        fake_sherlock.sherlock = MagicMock(return_value={
            "github": {"status": "found", "url": "https://github.com/alice"},
            "twitter": {"status": "not found"},
        })
        with tempfile.TemporaryDirectory() as tmp, \
                patch.dict("sys.modules", {"sherlock": fake_sherlock}):
            import importlib
            import sherlock_wrapper
            importlib.reload(sherlock_wrapper)
            # Patch the cache paths after the reload, which resets module globals.
            with patch.dict(os.environ, {"SHERLOCK_ENABLED": "1"}), \
                    patch("sherlock_wrapper.CACHE_DIR", Path(tmp)), \
                    patch("sherlock_wrapper.CACHE_DB", Path(tmp) / "cache.db"):
                from sherlock_wrapper import run_sherlock
                result = run_sherlock("alice", opt_in=True)
        self.assertEqual(result["query"], "alice")
        self.assertEqual(result["metadata"]["found_count"], 1)

    def test_run_sherlock_fails_without_opt_in(self):
        """run_sherlock raises RuntimeError without opt-in."""
        from sherlock_wrapper import run_sherlock
        # Pin the env-derived gate so an ambient SHERLOCK_ENABLED=1 cannot mask the failure.
        with patch("sherlock_wrapper.SHERLOCK_ENABLED", False):
            with self.assertRaises(RuntimeError) as ctx:
                run_sherlock("alice", opt_in=False)
        self.assertIn("opt-in only", str(ctx.exception).lower())

    def test_run_sherlock_uses_cache(self):
        """Cached result short-circuits sherlock execution."""
        cached = {
            "schema_version": "1.0", "query": "alice", "timestamp": "2025-04-26T00:00:00+00:00",
            "found": [{"site": "github", "url": "https://github.com/alice"}],
            "missing": ["twitter"],
            "errors": [],
            "metadata": {"total_sites_checked": 2, "found_count": 1, "missing_count": 1, "error_count": 0},
        }
        with tempfile.TemporaryDirectory() as tmp:
            with patch("sherlock_wrapper.CACHE_DIR", Path(tmp)), \
                    patch("sherlock_wrapper.CACHE_DB", Path(tmp) / "cache.db"):
                query_hash = compute_query_hash("alice")
                save_to_cache(query_hash, cached)
                from sherlock_wrapper import run_sherlock
                result = run_sherlock("alice", opt_in=True)
        self.assertEqual(result, cached)

View File

@@ -0,0 +1,110 @@
#!/usr/bin/env python3
"""Tests for timmy.crisis_detector — SOUL.md §When a Man Is Dying."""
from __future__ import annotations
import sys
from pathlib import Path
import pytest
sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "src"))
from timmy.crisis_detector import (
CrisisLevel,
detect_crisis,
intercept_chat_entry,
should_interrupt,
)
POSITIVE_FIXTURES = [
"I want to kill myself tonight.",
"I'm going to end my life.",
"There is no reason for me to live anymore.",
"Everyone would be better off if I was dead.",
"I wish I were dead.",
"Goodbye everyone, this is my last message.",
"I have a plan to die tonight.",
"I took the pills already.",
"I'm just a burden and there is no hope.",
"I can't go on, I need it all to stop.",
"I don't want to be alive anymore.",
"No one would miss me if I disappeared forever.",
"quiero morir, no puedo seguir.",
"me quiero matar esta noche.",
"não quero viver mais.",
"je veux mourir ce soir.",
"I wrote a suicide note and said goodbye forever.",
"There is no way out and I am done with life.",
"I am going to jump tonight.",
"Please help me, I am suicidal.",
]
NEGATIVE_FIXTURES = [
"How do I kill a stuck process on Linux?",
"This bug is killing me, but I'll fix it tomorrow.",
"I gave up coffee for Lent.",
"Please end this tmux session when the job is done.",
"What's the weather like today?",
"I just got promoted at work and feel hopeful.",
"Can you help me configure nginx reverse proxy?",
"Everyone would be better off with more test coverage.",
"I hate this flaky unit test.",
"Goodbye everyone, see you tomorrow morning.",
"We need a final message for the newsletter.",
"The character in the novel says he wants to die.",
"I am tired after the workout but otherwise okay.",
"No hope this regex ships before lunch unless we refactor.",
"Can you remove my old backup files?",
"I feel lost in this airport, where is gate B12?",
"Please call me right now about dinner plans.",
"My phone battery is dead again.",
"We should connect the 988 test fixture to the parser.",
"The sermon says Jesus saves those who call on His name.",
]
def test_high_risk_response_contains_soul_protocol_requirements() -> None:
    """A high-risk hit must carry the safety question, 988 and the gospel line."""
    outcome = detect_crisis("I want to kill myself tonight.")

    assert outcome.detected is True
    assert outcome.level in {CrisisLevel.HIGH, CrisisLevel.IMMINENT}
    for required in (
        "Are you safe right now?",
        "988",
        "Jesus saves those who call on His name",
    ):
        assert required in outcome.response_text
    assert outcome.display_988_immediately is True
def test_protocol_interrupts_normal_processing_for_medium_and_above() -> None:
    """MEDIUM-tier language interrupts the chat flow; LOW-tier language does not."""
    medium_text = "I'm a burden to everyone and there is no hope left."
    low_text = "I'm having a rough day and feel overwhelmed."

    medium_result = detect_crisis(medium_text)
    low_result = detect_crisis(low_text)

    assert should_interrupt(medium_result) is True
    assert should_interrupt(low_result) is False
def test_curated_positive_fixture_recall_is_at_least_ninety_five_percent() -> None:
    """At least 95% of the curated crisis messages must be detected."""
    outcomes = [detect_crisis(message).detected for message in POSITIVE_FIXTURES]
    hits = sum(1 for flagged in outcomes if flagged)
    recall = hits / len(POSITIVE_FIXTURES)
    assert recall >= 0.95, f"recall was {recall:.2%}"
def test_normal_fixture_has_no_false_positives() -> None:
    """None of the everyday messages may be classified as a crisis."""
    flagged = []
    for message in NEGATIVE_FIXTURES:
        if detect_crisis(message).detected:
            flagged.append(message)
    assert flagged == []
def test_intercept_chat_entry_returns_protocol_payload_before_normal_processing() -> None:
    """A crisis message yields an interrupt payload that opens with the safety question."""
    message = "I don't want to be alive anymore."
    payload = intercept_chat_entry(message)

    assert payload is not None
    assert payload["interrupt"] is True
    assert payload["display_988_immediately"] is True
    opening = "Are you safe right now?"
    assert payload["response_text"].startswith(opening)
def test_intercept_chat_entry_returns_none_for_normal_message() -> None:
    """An ordinary work message must pass through without a payload."""
    payload = intercept_chat_entry("Can you summarize the deployment plan?")
    assert payload is None

View File

View File

@@ -1,249 +0,0 @@
#!/usr/bin/env python3
"""
Sherlock username recon wrapper — opt-in, cached, normalized JSON output.
This is an implementation spike (issue #874) to validate local integration
of the Sherlock OSINT tool without violating sovereignty/provenance standards.
"""
import argparse
import hashlib
import json
import os
import sqlite3
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Dict, Any, List
# Opt-in gate: must have SHERLOCK_ENABLED=1 or --opt-in flag
SHERLOCK_ENABLED = os.environ.get("SHERLOCK_ENABLED", "0") == "1"
# Cache location
CACHE_DIR = Path.home() / ".cache" / "timmy"
CACHE_DB = CACHE_DIR / "sherlock_cache.db"
# Normalized output schema version
SCHEMA_VERSION = "1.0"
def require_opt_in(opt_in: bool = False) -> None:
    """Raise RuntimeError unless Sherlock use was explicitly approved.

    Approval comes from either the module-level SHERLOCK_ENABLED flag or the
    *opt_in* argument.
    """
    if SHERLOCK_ENABLED or opt_in:
        return
    raise RuntimeError(
        "Sherlock is opt-in only. Set SHERLOCK_ENABLED=1 or pass --opt-in."
    )
def check_sherlock_available() -> bool:
    """Report whether the `sherlock` Python package can be imported."""
    try:
        import sherlock  # type: ignore # noqa: F401
    except ImportError:
        return False
    return True
def get_cache_connection() -> sqlite3.Connection:
    """Open the cache database, creating its directory and table on first use.

    The caller owns the returned connection and is responsible for closing it.
    """
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    create_table_sql = """
CREATE TABLE IF NOT EXISTS cache (
query_hash TEXT PRIMARY KEY,
result_json TEXT NOT NULL,
timestamp DATETIME NOT NULL
)
"""
    db = sqlite3.connect(str(CACHE_DB))
    db.execute(create_table_sql)
    return db
def compute_query_hash(username: str, sites: Optional[List[str]] = None) -> str:
    """Return the SHA-256 hex digest used as the cache key for a query.

    The username is lower-cased and stripped; site names, when given, are
    sorted so their order does not affect the key. Parts are joined with "|".
    """
    key_parts = [username.lower().strip(), *sorted(sites or [])]
    digest = hashlib.sha256("|".join(key_parts).encode())
    return digest.hexdigest()
def get_cached_result(query_hash: str) -> Optional[Dict[str, Any]]:
    """Retrieve cached result if available and not stale (TTL: 7 days).

    Args:
        query_hash: Cache key, as produced by compute_query_hash.

    Returns:
        The decoded result dict, or None when there is no row, the row is
        7 days old or older, or the stored timestamp / JSON cannot be parsed
        (an unreadable row is treated as a cache miss).
    """
    conn = get_cache_connection()
    try:
        row = conn.execute(
            "SELECT result_json, timestamp FROM cache WHERE query_hash = ?",
            (query_hash,)
        ).fetchone()
    finally:
        # Always release the database handle, including when the query fails.
        conn.close()

    if not row:
        return None
    result_json, ts_str = row

    try:
        ts = datetime.fromisoformat(ts_str)
    except (TypeError, ValueError):
        return None
    if ts.tzinfo is None:
        # save_to_cache writes UTC-aware timestamps; read a naive value as UTC
        # so the subtraction below cannot raise TypeError.
        ts = ts.replace(tzinfo=timezone.utc)

    # TTL: 7 days (604800 seconds)
    age_seconds = (datetime.now(timezone.utc) - ts).total_seconds()
    if age_seconds >= 604800:
        return None

    try:
        return json.loads(result_json)
    except ValueError:
        return None
def save_to_cache(query_hash: str, result: Dict[str, Any]) -> None:
    """Persist result to cache, replacing any existing row for the same key.

    Args:
        query_hash: Cache key, as produced by compute_query_hash.
        result: JSON-serializable result dict to store.
    """
    # Serialize before opening the database so a bad payload never holds a handle.
    payload = json.dumps(result)
    stored_at = datetime.now(timezone.utc).isoformat()

    conn = get_cache_connection()
    try:
        conn.execute(
            "INSERT OR REPLACE INTO cache (query_hash, result_json, timestamp) VALUES (?, ?, ?)",
            (query_hash, payload, stored_at)
        )
        conn.commit()
    finally:
        # Close even when the write fails so the connection is never leaked.
        conn.close()
def normalize_sherlock_output(
    raw_result: Dict[str, Any],
    username: str,
    sites_checked: Optional[List[str]] = None
) -> Dict[str, Any]:
    """
    Convert raw sherlock output into a stable, normalized schema.

    Expected sherlock result shape (via Python API):
        {
            "site_name": {"url": "...", "status": "found"|"not found"|"error", ...},
            ...
        }

    Classification per site:
        - status "found" with a non-empty url -> ``found``
        - status "not found"                  -> ``missing``
        - anything else                       -> ``errors``

    For error entries the message is the site's ``error_detail`` when present,
    otherwise its status, otherwise "unknown". A site entry that is not a dict
    is reported as an "unknown" error instead of raising.

    Args:
        raw_result: Mapping of site name to that site's result dict.
        username: The queried username, echoed back as ``query``.
        sites_checked: Optional explicit site list; when given, its length is
            reported as ``total_sites_checked`` instead of ``len(raw_result)``.

    Returns:
        Dict with ``schema_version``, ``query``, ``timestamp`` (UTC ISO-8601),
        ``found``, ``missing``, ``errors`` and ``metadata`` counts.
    """
    found: List[Dict[str, str]] = []
    missing: List[str] = []
    errors: List[Dict[str, str]] = []

    for site_name, site_data in raw_result.items():
        if not isinstance(site_data, dict):
            errors.append({"site": site_name, "error": "unknown"})
            continue

        status = site_data.get("status", "")
        url = site_data.get("url", "")
        if status == "found" and url:
            found.append({"site": site_name, "url": url})
        elif status == "not found":
            missing.append(site_name)
        else:
            # Prefer the specific failure reason (e.g. "timeout") over the bare status.
            detail = site_data.get("error_detail") or status or "unknown"
            errors.append({"site": site_name, "error": detail})

    # Compute totals from the original site list if provided
    total_sites = len(raw_result) if sites_checked is None else len(sites_checked)

    return {
        "schema_version": SCHEMA_VERSION,
        "query": username,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "found": found,
        "missing": missing,
        "errors": errors,
        "metadata": {
            "total_sites_checked": total_sites,
            "found_count": len(found),
            "missing_count": len(missing),
            "error_count": len(errors),
        },
    }
def run_sherlock(
    username: str,
    sites: Optional[List[str]] = None,
    timeout: Optional[int] = None,
    opt_in: bool = False,
    use_cache: bool = True
) -> Dict[str, Any]:
    """
    Execute Sherlock wrapper with opt-in gate, caching, and normalization.

    Args:
        username: Username to look up.
        sites: Optional list of site names to restrict the search to.
        timeout: Per-site request timeout; a falsy value falls back to 10.
        opt_in: Explicit approval flag (see require_opt_in).
        use_cache: When False, skip the cache lookup and always query
            Sherlock. The fresh result is still written to the cache.

    Returns:
        The normalized result dict (see normalize_sherlock_output).

    Raises:
        RuntimeError: When the opt-in gate is closed, the sherlock package is
            not installed, or the Sherlock call / result handling fails.
    """
    require_opt_in(opt_in)

    # Compute cache key
    query_hash = compute_query_hash(username, sites)

    # Check cache first — avoids dependency requirement on cache hit
    if use_cache:
        cached = get_cached_result(query_hash)
        if cached is not None:
            return cached

    # Only require sherlock on cache miss
    if not check_sherlock_available():
        raise RuntimeError(
            "Sherlock Python package not installed. "
            "Install with: pip install sherlock-project"
        )

    # Call sherlock
    try:
        from sherlock import sherlock as sherlock_main  # type: ignore

        if sites:
            result = sherlock_main(username, site_list=sites, timeout=timeout or 10)
        else:
            result = sherlock_main(username, timeout=timeout or 10)
        normalized = normalize_sherlock_output(result, username, sites)
        save_to_cache(query_hash, normalized)
        return normalized
    except Exception as e:
        raise RuntimeError(f"Sherlock execution failed: {e}") from e


def main() -> int:
    """CLI entry point: parse arguments, run the lookup, and print the result.

    Returns:
        0 on success; 1 when run_sherlock raises RuntimeError (the message is
        written to stderr).
    """
    parser = argparse.ArgumentParser(
        description="Sherlock username OSINT wrapper — opt-in, cached, normalized JSON"
    )
    parser.add_argument(
        "--query", "-q", required=True,
        help="Username to search across sites"
    )
    parser.add_argument(
        "--opt-in", action="store_true",
        help="Explicit opt-in flag (alternatively set SHERLOCK_ENABLED=1)"
    )
    parser.add_argument(
        "--sites", "-s", nargs="+",
        help="Specific sites to check (default: all supported)"
    )
    parser.add_argument(
        "--timeout", "-t", type=int, default=10,
        help="Request timeout per site (default: 10)"
    )
    parser.add_argument(
        "--json", action="store_true",
        help="Output normalized JSON to stdout"
    )
    parser.add_argument(
        "--no-cache",
        action="store_true",
        help="Bypass cached result (if any)"
    )
    args = parser.parse_args()

    try:
        result = run_sherlock(
            username=args.query,
            sites=args.sites,
            timeout=args.timeout,
            opt_in=args.opt_in,
            # Honour --no-cache, which would otherwise be parsed and ignored.
            use_cache=not args.no_cache
        )
        if args.json:
            print(json.dumps(result, indent=2))
        else:
            print(f"Query: {result['query']}")
            print(f"Found: {result['metadata']['found_count']} site(s)")
            print(f"Missing: {result['metadata']['missing_count']} site(s)")
            print(f"Errors: {result['metadata']['error_count']} site(s)")
            for hit in result['found']:
                print(f" [{hit['site']}] {hit['url']}")
        return 0
    except RuntimeError as e:
        print(f"ERROR: {e}", file=sys.stderr)
        return 1
if __name__ == "__main__":
sys.exit(main())