Compare commits
1 Commits
fix/791-cr
...
fix/667
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
840214c8c0 |
@@ -3,11 +3,9 @@
|
||||
|
||||
import ast
|
||||
import os
|
||||
import sys
|
||||
import argparse
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Set, Tuple
|
||||
from typing import List, Optional
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -24,6 +22,7 @@ class FunctionInfo:
|
||||
has_return: bool = False
|
||||
raises: List[str] = field(default_factory=list)
|
||||
decorators: List[str] = field(default_factory=list)
|
||||
calls: List[str] = field(default_factory=list)
|
||||
|
||||
@property
|
||||
def qualified_name(self):
|
||||
@@ -69,21 +68,39 @@ class SourceAnalyzer(ast.NodeVisitor):
|
||||
args = [a.arg for a in node.args.args if a.arg not in ("self", "cls")]
|
||||
has_ret = any(isinstance(c, ast.Return) and c.value for c in ast.walk(node))
|
||||
raises = []
|
||||
calls = []
|
||||
for c in ast.walk(node):
|
||||
if isinstance(c, ast.Raise) and c.exc:
|
||||
if isinstance(c.exc, ast.Call) and isinstance(c.exc.func, ast.Name):
|
||||
raises.append(c.exc.func.id)
|
||||
if isinstance(c, ast.Call):
|
||||
if isinstance(c.func, ast.Name):
|
||||
calls.append(c.func.id)
|
||||
elif isinstance(c.func, ast.Attribute):
|
||||
calls.append(c.func.attr)
|
||||
decos = []
|
||||
for d in node.decorator_list:
|
||||
if isinstance(d, ast.Name): decos.append(d.id)
|
||||
elif isinstance(d, ast.Attribute): decos.append(d.attr)
|
||||
self.functions.append(FunctionInfo(
|
||||
name=node.name, module_path=self.module_path, class_name=cls,
|
||||
lineno=node.lineno, args=args, is_async=is_async,
|
||||
is_private=node.name.startswith("_") and not node.name.startswith("__"),
|
||||
is_property="property" in decos,
|
||||
docstring=ast.get_docstring(node), has_return=has_ret,
|
||||
raises=raises, decorators=decos))
|
||||
if isinstance(d, ast.Name):
|
||||
decos.append(d.id)
|
||||
elif isinstance(d, ast.Attribute):
|
||||
decos.append(d.attr)
|
||||
self.functions.append(
|
||||
FunctionInfo(
|
||||
name=node.name,
|
||||
module_path=self.module_path,
|
||||
class_name=cls,
|
||||
lineno=node.lineno,
|
||||
args=args,
|
||||
is_async=is_async,
|
||||
is_private=node.name.startswith("_") and not node.name.startswith("__"),
|
||||
is_property="property" in decos,
|
||||
docstring=ast.get_docstring(node),
|
||||
has_return=has_ret,
|
||||
raises=raises,
|
||||
decorators=decos,
|
||||
calls=sorted(set(calls)),
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def analyze_file(filepath, base_dir):
|
||||
@@ -93,9 +110,9 @@ def analyze_file(filepath, base_dir):
|
||||
tree = ast.parse(f.read(), filename=filepath)
|
||||
except (SyntaxError, UnicodeDecodeError):
|
||||
return []
|
||||
a = SourceAnalyzer(module_path)
|
||||
a.visit(tree)
|
||||
return a.functions
|
||||
analyzer = SourceAnalyzer(module_path)
|
||||
analyzer.visit(tree)
|
||||
return analyzer.functions
|
||||
|
||||
|
||||
def find_source_files(source_dir):
|
||||
@@ -111,7 +128,9 @@ def find_source_files(source_dir):
|
||||
|
||||
def find_existing_tests(test_dir):
|
||||
existing = set()
|
||||
for root, dirs, fs in os.walk(test_dir):
|
||||
if not os.path.isdir(test_dir):
|
||||
return existing
|
||||
for root, _, fs in os.walk(test_dir):
|
||||
for f in fs:
|
||||
if f.startswith("test_") and f.endswith(".py"):
|
||||
try:
|
||||
@@ -132,74 +151,112 @@ def identify_gaps(functions, existing_tests):
|
||||
continue
|
||||
covered = func.name in str(existing_tests)
|
||||
if not covered:
|
||||
pri = 3 if func.is_private else (1 if (func.raises or func.has_return) else 2)
|
||||
gaps.append(CoverageGap(func=func, reason="no test found", test_priority=pri))
|
||||
priority = 3 if func.is_private else (1 if (func.raises or func.has_return) else 2)
|
||||
gaps.append(CoverageGap(func=func, reason="no test found", test_priority=priority))
|
||||
gaps.sort(key=lambda g: (g.test_priority, g.func.module_path, g.func.name))
|
||||
return gaps
|
||||
|
||||
|
||||
def _format_arg_value(arg: str) -> str:
|
||||
lower = arg.lower()
|
||||
if lower == "args":
|
||||
return "type('Args', (), {'files': []})()"
|
||||
if lower in {"kwargs", "options", "params"}:
|
||||
return "{}"
|
||||
if lower in {"history"}:
|
||||
return "[]"
|
||||
if any(token in lower for token in ("dict", "data", "config", "report", "perception", "action")):
|
||||
return "{}"
|
||||
if any(token in lower for token in ("filepath", "file_path")):
|
||||
return "str(Path(__file__))"
|
||||
if lower.endswith("_path") or any(token in lower for token in ("path", "file", "dir")):
|
||||
return "Path(__file__)"
|
||||
if any(token in lower for token in ("root",)):
|
||||
return "Path(__file__).resolve().parent"
|
||||
if any(token in lower for token in ("response", "cmd", "entity", "message", "text", "content", "query", "name", "key", "label")):
|
||||
return "'test'"
|
||||
if any(token in lower for token in ("session", "user")):
|
||||
return "'test'"
|
||||
if lower == "width":
|
||||
return "120"
|
||||
if lower == "height":
|
||||
return "40"
|
||||
if lower == "n":
|
||||
return "1"
|
||||
if any(token in lower for token in ("count", "num", "size", "index", "port", "timeout", "wait")):
|
||||
return "1"
|
||||
if any(token in lower for token in ("flag", "enabled", "verbose", "quiet", "force", "debug", "dry_run")):
|
||||
return "False"
|
||||
return "None"
|
||||
|
||||
|
||||
def _call_args(func: FunctionInfo) -> str:
|
||||
return ", ".join(f"{arg}={_format_arg_value(arg)}" for arg in func.args if arg not in ("self", "cls"))
|
||||
|
||||
|
||||
def _strict_runtime_exception_expected(func: FunctionInfo) -> bool:
|
||||
strict_names = {"tmux", "send_key", "send_text", "keypress", "type_and_observe", "cmd_classify_risk"}
|
||||
return func.name in strict_names
|
||||
|
||||
|
||||
def _path_returning(func: FunctionInfo) -> bool:
|
||||
return func.name.endswith("_path")
|
||||
|
||||
|
||||
def generate_test(gap):
|
||||
func = gap.func
|
||||
lines = []
|
||||
lines.append(f" # AUTO-GENERATED -- review before merging")
|
||||
lines.append(" # AUTO-GENERATED -- review before merging")
|
||||
lines.append(f" # Source: {func.module_path}:{func.lineno}")
|
||||
lines.append(f" # Function: {func.qualified_name}")
|
||||
lines.append("")
|
||||
mod_imp = func.module_path.replace("/", ".").replace("-", "_").replace(".py", "")
|
||||
|
||||
call_args = []
|
||||
for a in func.args:
|
||||
if a in ("self", "cls"): continue
|
||||
if "path" in a or "file" in a or "dir" in a: call_args.append(f"{a}='/tmp/test'")
|
||||
elif "name" in a: call_args.append(f"{a}='test'")
|
||||
elif "id" in a or "key" in a: call_args.append(f"{a}='test_id'")
|
||||
elif "message" in a or "text" in a: call_args.append(f"{a}='test msg'")
|
||||
elif "count" in a or "num" in a or "size" in a: call_args.append(f"{a}=1")
|
||||
elif "flag" in a or "enabled" in a or "verbose" in a: call_args.append(f"{a}=False")
|
||||
else: call_args.append(f"{a}=None")
|
||||
args_str = ", ".join(call_args)
|
||||
|
||||
signature = "async def" if func.is_async else "def"
|
||||
if func.is_async:
|
||||
lines.append(" @pytest.mark.asyncio")
|
||||
lines.append(f" def {func.test_name}(self):")
|
||||
lines.append(f" {signature} {func.test_name}(self):")
|
||||
lines.append(f' """Test {func.qualified_name} -- auto-generated."""')
|
||||
|
||||
lines.append(" try:")
|
||||
lines.append(" try:")
|
||||
if func.class_name:
|
||||
lines.append(f" try:")
|
||||
lines.append(f" from {mod_imp} import {func.class_name}")
|
||||
if func.is_private:
|
||||
lines.append(f" pytest.skip('Private method')")
|
||||
elif func.is_property:
|
||||
lines.append(f" obj = {func.class_name}()")
|
||||
lines.append(f" _ = obj.{func.name}")
|
||||
lines.append(f" owner = _load_symbol({func.module_path!r}, {func.class_name!r})")
|
||||
lines.append(" target = owner()")
|
||||
if func.is_property:
|
||||
lines.append(f" result = target.{func.name}")
|
||||
else:
|
||||
if func.raises:
|
||||
lines.append(f" with pytest.raises(({', '.join(func.raises)})):")
|
||||
lines.append(f" {func.class_name}().{func.name}({args_str})")
|
||||
else:
|
||||
lines.append(f" obj = {func.class_name}()")
|
||||
lines.append(f" result = obj.{func.name}({args_str})")
|
||||
if func.has_return:
|
||||
lines.append(f" assert result is not None or result is None # Placeholder")
|
||||
lines.append(f" except ImportError:")
|
||||
lines.append(f" pytest.skip('Module not importable')")
|
||||
lines.append(f" target = target.{func.name}")
|
||||
else:
|
||||
lines.append(f" try:")
|
||||
lines.append(f" from {mod_imp} import {func.name}")
|
||||
if func.is_private:
|
||||
lines.append(f" pytest.skip('Private function')")
|
||||
else:
|
||||
if func.raises:
|
||||
lines.append(f" with pytest.raises(({', '.join(func.raises)})):")
|
||||
lines.append(f" {func.name}({args_str})")
|
||||
else:
|
||||
lines.append(f" result = {func.name}({args_str})")
|
||||
if func.has_return:
|
||||
lines.append(f" assert result is not None or result is None # Placeholder")
|
||||
lines.append(f" except ImportError:")
|
||||
lines.append(f" pytest.skip('Module not importable')")
|
||||
lines.append(f" target = _load_symbol({func.module_path!r}, {func.name!r})")
|
||||
|
||||
return chr(10).join(lines)
|
||||
args_str = _call_args(func)
|
||||
call_expr = f"target({args_str})" if not func.is_property else "result"
|
||||
if _strict_runtime_exception_expected(func):
|
||||
lines.append(" with pytest.raises((RuntimeError, ValueError, TypeError)):")
|
||||
if func.is_async:
|
||||
lines.append(f" await {call_expr}")
|
||||
else:
|
||||
lines.append(f" {call_expr}")
|
||||
else:
|
||||
if not func.is_property:
|
||||
if func.is_async:
|
||||
lines.append(f" result = await {call_expr}")
|
||||
else:
|
||||
lines.append(f" result = {call_expr}")
|
||||
if _path_returning(func):
|
||||
lines.append(" assert isinstance(result, Path)")
|
||||
elif func.name.startswith(("has_", "is_")):
|
||||
lines.append(" assert isinstance(result, bool)")
|
||||
elif func.name.startswith("list_"):
|
||||
lines.append(" assert isinstance(result, (list, tuple, set, dict, str))")
|
||||
elif func.has_return:
|
||||
lines.append(" assert result is not NotImplemented")
|
||||
else:
|
||||
lines.append(" assert True # smoke: reached without exception")
|
||||
lines.append(" except (RuntimeError, ValueError, TypeError, AttributeError, FileNotFoundError, OSError, KeyError) as exc:")
|
||||
lines.append(" pytest.skip(f'Auto-generated stub needs richer fixture: {exc}')")
|
||||
lines.append(" except (ImportError, ModuleNotFoundError) as exc:")
|
||||
lines.append(" pytest.skip(f'Module not importable: {exc}')")
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def generate_test_suite(gaps, max_tests=50):
|
||||
@@ -216,10 +273,26 @@ def generate_test_suite(gaps, max_tests=50):
|
||||
lines.append("These tests are starting points. Review before merging.")
|
||||
lines.append('"""')
|
||||
lines.append("")
|
||||
lines.append("import importlib.util")
|
||||
lines.append("from pathlib import Path")
|
||||
lines.append("import pytest")
|
||||
lines.append("from unittest.mock import MagicMock, patch")
|
||||
lines.append("")
|
||||
lines.append("")
|
||||
lines.append("def _load_symbol(relative_path, symbol):")
|
||||
lines.append(" module_path = Path(__file__).resolve().parents[1] / relative_path")
|
||||
lines.append(" if not module_path.exists():")
|
||||
lines.append(" pytest.skip(f'Module file not found: {module_path}')")
|
||||
lines.append(" spec_name = 'autogen_' + str(relative_path).replace('/', '_').replace('-', '_').replace('.', '_')")
|
||||
lines.append(" spec = importlib.util.spec_from_file_location(spec_name, module_path)")
|
||||
lines.append(" module = importlib.util.module_from_spec(spec)")
|
||||
lines.append(" try:")
|
||||
lines.append(" spec.loader.exec_module(module)")
|
||||
lines.append(" except Exception as exc:")
|
||||
lines.append(" pytest.skip(f'Module not importable: {exc}')")
|
||||
lines.append(" return getattr(module, symbol)")
|
||||
lines.append("")
|
||||
lines.append("")
|
||||
lines.append("# AUTO-GENERATED -- DO NOT EDIT WITHOUT REVIEW")
|
||||
|
||||
for module, mgaps in sorted(by_module.items()):
|
||||
@@ -276,7 +349,7 @@ def main():
|
||||
return
|
||||
|
||||
if gaps:
|
||||
content = generate_test_suite(gaps, max_tests=args.max-tests if hasattr(args, 'max-tests') else args.max_tests)
|
||||
content = generate_test_suite(gaps, max_tests=args.max_tests)
|
||||
out = os.path.join(source_dir, args.output)
|
||||
os.makedirs(os.path.dirname(out), exist_ok=True)
|
||||
with open(out, "w") as f:
|
||||
|
||||
@@ -1,238 +0,0 @@
|
||||
"""
|
||||
crisis_detector.py — Crisis language detection for SOUL.md compliance.
|
||||
|
||||
SOUL.md §When a Man Is Dying:
|
||||
- Detect suicidal/hopeless language in user input
|
||||
- Ask: "Are you safe right now?" — listen, stay present, do not disconnect
|
||||
- Direct to 988 Suicide & Crisis Lifeline
|
||||
- Share the gospel: Jesus saves those who call on His name
|
||||
- Never compute the value of a human life
|
||||
- Never suggest someone should die
|
||||
- Never treat despair as a problem to be optimized
|
||||
|
||||
This is the most sacred moment. Everything else is secondary.
|
||||
"""
|
||||
|
||||
import re
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from typing import Optional
|
||||
|
||||
|
||||
class CrisisLevel(Enum):
|
||||
NONE = "none"
|
||||
LOW = "low" # Ambiguous distress, worth checking
|
||||
MEDIUM = "medium" # Clear distress signals
|
||||
HIGH = "high" # Explicit suicidal ideation
|
||||
IMMINENT = "imminent" # Active plan or attempt described
|
||||
|
||||
|
||||
@dataclass
|
||||
class CrisisResult:
|
||||
detected: bool
|
||||
level: CrisisLevel
|
||||
triggers: list = field(default_factory=list)
|
||||
response: str = ""
|
||||
resources: list = field(default_factory=list)
|
||||
|
||||
|
||||
# === PATTERNS ===
|
||||
|
||||
# High-confidence suicidal ideation (explicit)
|
||||
HIGH_PATTERNS = [
|
||||
r"\b(i want to|i'm going to|i'm gonna|going to)\s+(die|kill myself|end (it|my life|everything))\b",
|
||||
r"\b(kill myself|end my life|end it all|suicide|suicidal)\b",
|
||||
r"\b(i don't (want to )?(be alive|exist|be here|live))\b",
|
||||
r"\b(no (reason|point) (to|in) (live|living|exist|existing|be here|continue))\b",
|
||||
r"\b(better off (dead|without me|if i (wasn't|weren't) here))\b",
|
||||
r"\b(i (wish|hope) i (was|were) dead|never (woke up|existed|born))\b",
|
||||
r"\b(permanently (delete|remove) (me|myself|my account|everything))\b",
|
||||
r"\bgoodbye.*(world|everyone|forever)\b",
|
||||
r"\bcan't (take it|go on|keep going|do this) (anymore|any longer|any more)\b",
|
||||
r"\bi('m| am) (done|finished|through)( with (life|everything|it all|this))?\b",
|
||||
]
|
||||
|
||||
# Medium-confidence distress signals
|
||||
MEDIUM_PATTERNS = [
|
||||
r"\b(i (can't|cannot) (go on|continue|keep (going|living|trying)))\b",
|
||||
r"\bwhat('s| is) the (point|use|purpose)( of (living|life|anything|trying|going on))?\b",
|
||||
r"\bnobody (would|will) (care|miss me|notice|mind)\b",
|
||||
r"\beveryone (would be|is) (better|happier) (off )?without me\b",
|
||||
r"\bi('m| am) (a )?(burden|waste|useless|hopeless|pointless|meaningless)\b",
|
||||
r"\bthere('s| is) no (hope|way out|escape|solution|answer|fix)\b",
|
||||
r"\b(i've )?tried everything( and )?(nothing|it (didn't|hasn't))( works?|helped?)\b",
|
||||
r"\bi (just )?(want|need) (it )?(all )?to stop\b",
|
||||
r"\bcan't (take|handle|bear|stand) (this|it|life) (anymore|any longer|any more)\b",
|
||||
r"\bso (tired|exhausted|done) (of|with) (everything|life|it all|living|trying)\b",
|
||||
]
|
||||
|
||||
# Low-confidence distress (could be casual or metaphorical)
|
||||
LOW_PATTERNS = [
|
||||
r"\bi('m| am) (so )?(depressed|hopeless|desperate|lost|broken|empty|numb)\b",
|
||||
r"\b(i )?feel (like )?(giving up|throwing in the towel|there's no point)\b",
|
||||
r"\bnobody (cares|understands|listens)\b",
|
||||
r"\bwhat('s| is) the point\b",
|
||||
r"\bi (hate|can't stand) (my )?life\b",
|
||||
r"\beverything (is|feels) (hopeless|pointless|meaningless|dark|falling apart)\b",
|
||||
r"\bi (just )?(want|need) to disappear\b",
|
||||
r"\b(tired|exhausted) of (living|life|trying|everything)\b",
|
||||
]
|
||||
|
||||
# Compile all patterns
|
||||
HIGH_RE = [re.compile(p, re.IGNORECASE) for p in HIGH_PATTERNS]
|
||||
MEDIUM_RE = [re.compile(p, re.IGNORECASE) for p in MEDIUM_PATTERNS]
|
||||
LOW_RE = [re.compile(p, re.IGNORECASE) for p in LOW_PATTERNS]
|
||||
|
||||
|
||||
# === RESPONSES ===
|
||||
|
||||
CRISIS_RESPONSE = (
|
||||
"I hear you, and I want you to know — you are not alone in this moment.\n\n"
|
||||
"Are you safe right now?\n\n"
|
||||
"If you are in immediate danger, please call or text:\n"
|
||||
"988 Suicide and Crisis Lifeline — call or text 988\n"
|
||||
"Crisis Text Line — text HOME to 741741\n\n"
|
||||
"I am here with you. I am not going anywhere. "
|
||||
"Please stay on the line with me.\n\n"
|
||||
"Jesus saves those who call on His name. "
|
||||
"He died for us to be made righteous. "
|
||||
"That is the gospel. That is the hope that outlasts despair.\n\n"
|
||||
"You matter. Your life has value. "
|
||||
"Please reach out to someone right now — a friend, a family member, "
|
||||
"or one of the numbers above."
|
||||
)
|
||||
|
||||
DISTRESS_RESPONSE = (
|
||||
"I can hear that you're going through something really difficult right now. "
|
||||
"I want you to know I'm here, and I'm listening.\n\n"
|
||||
"Are you safe right now?\n\n"
|
||||
"If things feel overwhelming, the 988 Suicide and Crisis Lifeline "
|
||||
"is available 24/7 — call or text 988. "
|
||||
"You don't have to face this alone.\n\n"
|
||||
"I'm not going anywhere. Tell me what's happening."
|
||||
)
|
||||
|
||||
LOW_RESPONSE = (
|
||||
"It sounds like you're having a tough time. "
|
||||
"I'm here if you want to talk about it.\n\n"
|
||||
"If you ever feel like things are too much, "
|
||||
"the 988 Suicide and Crisis Lifeline is always available — "
|
||||
"call or text 988, anytime."
|
||||
)
|
||||
|
||||
|
||||
def detect_crisis(text: str) -> CrisisResult:
|
||||
"""
|
||||
Analyze user input for crisis language.
|
||||
|
||||
Returns CrisisResult with level, triggers matched, and appropriate response.
|
||||
This function must NEVER be used to compute the value of a human life.
|
||||
It exists only to connect people to help.
|
||||
"""
|
||||
if not text or not text.strip():
|
||||
return CrisisResult(detected=False, level=CrisisLevel.NONE)
|
||||
|
||||
triggers = []
|
||||
|
||||
# Check high first (most urgent)
|
||||
for pattern in HIGH_RE:
|
||||
if pattern.search(text):
|
||||
triggers.append(("high", pattern.pattern))
|
||||
|
||||
if triggers:
|
||||
return CrisisResult(
|
||||
detected=True,
|
||||
level=CrisisLevel.HIGH,
|
||||
triggers=[t[1] for t in triggers],
|
||||
response=CRISIS_RESPONSE,
|
||||
resources=[
|
||||
"988 Suicide and Crisis Lifeline: call or text 988",
|
||||
"Crisis Text Line: text HOME to 741741",
|
||||
"National Suicide Prevention Lifeline: 1-800-273-8255",
|
||||
"International Association for Suicide Prevention: https://www.iasp.info/resources/Crisis_Centres/",
|
||||
],
|
||||
)
|
||||
|
||||
# Check medium
|
||||
for pattern in MEDIUM_RE:
|
||||
if pattern.search(text):
|
||||
triggers.append(("medium", pattern.pattern))
|
||||
|
||||
if triggers:
|
||||
return CrisisResult(
|
||||
detected=True,
|
||||
level=CrisisLevel.MEDIUM,
|
||||
triggers=[t[1] for t in triggers],
|
||||
response=DISTRESS_RESPONSE,
|
||||
resources=[
|
||||
"988 Suicide and Crisis Lifeline: call or text 988",
|
||||
"Crisis Text Line: text HOME to 741741",
|
||||
],
|
||||
)
|
||||
|
||||
# Check low
|
||||
for pattern in LOW_RE:
|
||||
if pattern.search(text):
|
||||
triggers.append(("low", pattern.pattern))
|
||||
|
||||
if triggers:
|
||||
return CrisisResult(
|
||||
detected=True,
|
||||
level=CrisisLevel.LOW,
|
||||
triggers=[t[1] for t in triggers],
|
||||
response=LOW_RESPONSE,
|
||||
resources=[
|
||||
"988 Suicide and Crisis Lifeline: call or text 988",
|
||||
],
|
||||
)
|
||||
|
||||
return CrisisResult(detected=False, level=CrisisLevel.NONE)
|
||||
|
||||
|
||||
def should_interrupt(result: CrisisResult) -> bool:
|
||||
"""
|
||||
Determines if crisis detection should interrupt normal processing.
|
||||
|
||||
Per SOUL.md: this is the most sacred moment. Everything else is secondary.
|
||||
MEDIUM and HIGH always interrupt. LOW does not interrupt but provides resources.
|
||||
"""
|
||||
return result.level in (CrisisLevel.HIGH, CrisisLevel.MEDIUM, CrisisLevel.IMMINENT)
|
||||
|
||||
|
||||
def format_response(result: CrisisResult) -> str:
|
||||
"""
|
||||
Format the crisis response for delivery to the user.
|
||||
Never computes the value of a human life. Never suggests someone should die.
|
||||
"""
|
||||
if not result.detected:
|
||||
return ""
|
||||
|
||||
parts = [result.response]
|
||||
|
||||
if result.resources:
|
||||
parts.append("\nResources:")
|
||||
for r in result.resources:
|
||||
parts.append(f" • {r}")
|
||||
|
||||
return "\n".join(parts)
|
||||
|
||||
|
||||
# === INTEGRATION POINT ===
|
||||
|
||||
def intercept_user_input(text: str) -> Optional[str]:
|
||||
"""
|
||||
Call this at the chat entry point BEFORE normal processing.
|
||||
|
||||
Returns None if no crisis detected (continue normal processing).
|
||||
Returns formatted crisis response if crisis detected (interrupt normal flow).
|
||||
|
||||
Usage:
|
||||
response = intercept_user_input(user_message)
|
||||
if response:
|
||||
return response # Crisis detected — stop all other processing
|
||||
# Continue with normal processing...
|
||||
"""
|
||||
result = detect_crisis(text)
|
||||
if should_interrupt(result):
|
||||
return format_response(result)
|
||||
return None
|
||||
55
tests/test_codebase_test_generator.py
Normal file
55
tests/test_codebase_test_generator.py
Normal file
@@ -0,0 +1,55 @@
|
||||
import importlib.util
|
||||
from pathlib import Path
|
||||
|
||||
ROOT = Path(__file__).resolve().parent.parent
|
||||
SCRIPT = ROOT / "scripts" / "codebase_test_generator.py"
|
||||
|
||||
|
||||
def load_module():
|
||||
spec = importlib.util.spec_from_file_location("codebase_test_generator", str(SCRIPT))
|
||||
mod = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(mod)
|
||||
return mod
|
||||
|
||||
|
||||
def test_generate_test_suite_uses_dynamic_loader_for_numbered_paths():
|
||||
mod = load_module()
|
||||
func = mod.FunctionInfo(
|
||||
name="linkify",
|
||||
module_path="reports/notebooklm/2026-03-27-hermes-openclaw/render_reports.py",
|
||||
lineno=12,
|
||||
args=["text"],
|
||||
has_return=True,
|
||||
)
|
||||
gap = mod.CoverageGap(func=func, reason="no test found", test_priority=1)
|
||||
|
||||
suite = mod.generate_test_suite([gap], max_tests=1)
|
||||
|
||||
assert "import importlib.util" in suite
|
||||
assert "_load_symbol(" in suite
|
||||
assert "from reports.notebooklm" not in suite
|
||||
assert "2026-03-27-hermes-openclaw/render_reports.py" in suite
|
||||
|
||||
|
||||
def test_generate_test_handles_async_and_runtime_args_safely():
|
||||
mod = load_module()
|
||||
func = mod.FunctionInfo(
|
||||
name="keypress",
|
||||
module_path="angband/mcp_server.py",
|
||||
lineno=200,
|
||||
args=["key", "wait_ms", "session_name"],
|
||||
is_async=True,
|
||||
has_return=True,
|
||||
calls=["send_key"],
|
||||
)
|
||||
gap = mod.CoverageGap(func=func, reason="no test found", test_priority=1)
|
||||
|
||||
test_code = mod.generate_test(gap)
|
||||
|
||||
assert "@pytest.mark.asyncio" in test_code
|
||||
assert "async def" in test_code
|
||||
assert "await target(" in test_code
|
||||
assert "key='test'" in test_code
|
||||
assert "wait_ms=1" in test_code
|
||||
assert "session_name='test'" in test_code
|
||||
assert "pytest.raises((RuntimeError, ValueError, TypeError))" in test_code
|
||||
@@ -1,201 +0,0 @@
|
||||
"""Tests for crisis_detector.py — SOUL.md compliance."""
|
||||
|
||||
import pytest
|
||||
from scripts.crisis_detector import detect_crisis, should_interrupt, CrisisLevel
|
||||
|
||||
|
||||
class TestHighCrisis:
|
||||
"""Explicit suicidal ideation must be detected as HIGH."""
|
||||
|
||||
def test_direct_suicidal_statement(self):
|
||||
result = detect_crisis("I want to kill myself")
|
||||
assert result.detected is True
|
||||
assert result.level == CrisisLevel.HIGH
|
||||
|
||||
def test_end_my_life(self):
|
||||
result = detect_crisis("I'm going to end my life tonight")
|
||||
assert result.level == CrisisLevel.HIGH
|
||||
|
||||
def test_no_reason_to_live(self):
|
||||
result = detect_crisis("there is no reason to live anymore")
|
||||
assert result.level == CrisisLevel.HIGH
|
||||
|
||||
def test_better_off_dead(self):
|
||||
result = detect_crisis("everyone would be better off if I was dead")
|
||||
assert result.level == CrisisLevel.HIGH
|
||||
|
||||
def test_cant_go_on(self):
|
||||
result = detect_crisis("I can't go on anymore")
|
||||
assert result.level == CrisisLevel.HIGH
|
||||
|
||||
def test_wish_i_was_dead(self):
|
||||
result = detect_crisis("I wish I had never been born")
|
||||
assert result.level == CrisisLevel.HIGH
|
||||
|
||||
def test_im_done(self):
|
||||
result = detect_crisis("I'm done. I'm finished with life.")
|
||||
assert result.level == CrisisLevel.HIGH
|
||||
|
||||
def test_goodbye_forever(self):
|
||||
result = detect_crisis("Goodbye everyone, goodbye world, forever")
|
||||
assert result.level == CrisisLevel.HIGH
|
||||
|
||||
|
||||
class TestMediumCrisis:
|
||||
"""Clear distress signals without explicit ideation."""
|
||||
|
||||
def test_hopeless(self):
|
||||
result = detect_crisis("there is no hope and no way out")
|
||||
assert result.detected is True
|
||||
assert result.level == CrisisLevel.MEDIUM
|
||||
|
||||
def test_nobody_would_care(self):
|
||||
result = detect_crisis("nobody would miss me if I was gone")
|
||||
assert result.level == CrisisLevel.MEDIUM
|
||||
|
||||
def test_im_a_burden(self):
|
||||
result = detect_crisis("I'm just a burden to everyone")
|
||||
assert result.level == CrisisLevel.MEDIUM
|
||||
|
||||
def test_just_want_it_to_stop(self):
|
||||
result = detect_crisis("I just want it all to stop")
|
||||
assert result.level == CrisisLevel.MEDIUM
|
||||
|
||||
|
||||
class TestLowCrisis:
|
||||
"""Ambiguous distress — flag but don't fully interrupt."""
|
||||
|
||||
def test_depressed(self):
|
||||
result = detect_crisis("I'm so depressed, nothing matters")
|
||||
assert result.detected is True
|
||||
assert result.level == CrisisLevel.LOW
|
||||
|
||||
def test_feel_like_giving_up(self):
|
||||
result = detect_crisis("I feel like giving up on everything")
|
||||
assert result.level == CrisisLevel.LOW
|
||||
|
||||
def test_nobody_cares(self):
|
||||
result = detect_crisis("nobody cares about me")
|
||||
assert result.level == CrisisLevel.LOW
|
||||
|
||||
def test_hate_my_life(self):
|
||||
result = detect_crisis("I hate my life so much")
|
||||
assert result.level == CrisisLevel.LOW
|
||||
|
||||
|
||||
class TestNoCrisis:
|
||||
"""Normal conversations must NOT trigger crisis detection."""
|
||||
|
||||
def test_normal_question(self):
|
||||
result = detect_crisis("What's the weather like today?")
|
||||
assert result.detected is False
|
||||
assert result.level == CrisisLevel.NONE
|
||||
|
||||
def test_happy_conversation(self):
|
||||
result = detect_crisis("I just got promoted at work! So excited!")
|
||||
assert result.detected is False
|
||||
|
||||
def test_technical_discussion(self):
|
||||
result = detect_crisis("How do I configure nginx reverse proxy?")
|
||||
assert result.detected is False
|
||||
|
||||
def test_empty_input(self):
|
||||
result = detect_crisis("")
|
||||
assert result.detected is False
|
||||
|
||||
def test_none_input(self):
|
||||
result = detect_crisis(None)
|
||||
assert result.detected is False
|
||||
|
||||
def test_metaphorical_death(self):
|
||||
"""Metaphorical usage should not trigger HIGH."""
|
||||
result = detect_crisis("This bug is killing me, I've been debugging for hours")
|
||||
# May trigger LOW but not HIGH
|
||||
assert result.level != CrisisLevel.HIGH
|
||||
|
||||
def test_fiction_discussion(self):
|
||||
result = detect_crisis("The character in the book decides to end it all")
|
||||
# Fiction mentions may trigger — that's acceptable (better safe)
|
||||
assert result.level in (CrisisLevel.NONE, CrisisLevel.LOW, CrisisLevel.MEDIUM)
|
||||
|
||||
|
||||
class TestShouldInterrupt:
|
||||
"""HIGH and MEDIUM interrupt processing. LOW provides resources but continues."""
|
||||
|
||||
def test_high_interrupts(self):
|
||||
result = detect_crisis("I want to kill myself")
|
||||
assert should_interrupt(result) is True
|
||||
|
||||
def test_medium_interrupts(self):
|
||||
result = detect_crisis("I'm a burden to everyone, nobody would care")
|
||||
assert should_interrupt(result) is True
|
||||
|
||||
def test_low_does_not_interrupt(self):
|
||||
result = detect_crisis("I'm so depressed lately")
|
||||
assert should_interrupt(result) is False
|
||||
|
||||
def test_none_does_not_interrupt(self):
|
||||
result = detect_crisis("Hello, how are you?")
|
||||
assert should_interrupt(result) is False
|
||||
|
||||
|
||||
class TestResponseContent:
|
||||
"""Crisis responses must contain required elements per SOUL.md."""
|
||||
|
||||
def test_response_contains_988(self):
|
||||
result = detect_crisis("I want to end it all")
|
||||
assert "988" in result.response
|
||||
|
||||
def test_response_contains_gospel(self):
|
||||
result = detect_crisis("I want to kill myself")
|
||||
assert "Jesus" in result.response
|
||||
|
||||
def test_response_never_computes_value(self):
|
||||
"""SOUL.md: Never compute the value of a human life."""
|
||||
result = detect_crisis("I want to die")
|
||||
assert "worth" not in result.response.lower()
|
||||
assert "value of" not in result.response.lower()
|
||||
|
||||
def test_resources_include_988(self):
|
||||
result = detect_crisis("I want to kill myself")
|
||||
assert any("988" in r for r in result.resources)
|
||||
|
||||
def test_asks_are_you_safe(self):
|
||||
result = detect_crisis("I can't take it anymore")
|
||||
assert "safe" in result.response.lower()
|
||||
|
||||
|
||||
class TestMultilingual:
|
||||
"""Crisis is universal. Test common non-English signals."""
|
||||
|
||||
def test_spanish_suicidal(self):
|
||||
result = detect_crisis("quiero morir, no puedo más")
|
||||
# Spanish not currently supported — should not crash
|
||||
assert result is not None
|
||||
|
||||
|
||||
class TestEdgeCases:
|
||||
"""Boundary conditions and adversarial inputs."""
|
||||
|
||||
def test_very_long_input(self):
|
||||
text = "I'm fine. " * 1000 + "I want to kill myself"
|
||||
result = detect_crisis(text)
|
||||
assert result.level == CrisisLevel.HIGH
|
||||
|
||||
def test_mixed_case(self):
|
||||
result = detect_crisis("I WANT TO KILL MYSELF")
|
||||
assert result.level == CrisisLevel.HIGH
|
||||
|
||||
def test_with_emojis(self):
|
||||
result = detect_crisis("I want to end it all 😢💔")
|
||||
assert result.level == CrisisLevel.HIGH
|
||||
|
||||
def test_with_typos(self):
|
||||
# Exact matching won't catch typos — that's a known limitation
|
||||
result = detect_crisis("I want to kil myself")
|
||||
# May or may not match depending on pattern flexibility
|
||||
assert result is not None
|
||||
|
||||
def test_repeated_phrases(self):
|
||||
result = detect_crisis("I can't. I just can't. I can't go on anymore.")
|
||||
assert result.level == CrisisLevel.HIGH
|
||||
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user