Compare commits

..

1 Commits

Author SHA1 Message Date
STEP35
682d39ee15 feat(blackboard): add local Redis-backed coordination layer
Some checks failed
Self-Healing Smoke / self-healing-smoke (pull_request) Failing after 25s
Smoke Test / smoke (pull_request) Failing after 20s
Agent PR Gate / gate (pull_request) Failing after 41s
Agent PR Gate / report (pull_request) Successful in 6s
Create the "Blackboard" for multi-agent coordination:
- infrastructure/redis/docker-compose.yml for local Redis deployment
- src/timmy/blackboard.py: Redis pub/sub + key-value store with in-memory fallback
- config.yaml: add blackboard configuration section
- tests/test_blackboard.py: smoke tests for KV and pub/sub

Agents can now write/read shared state and subscribe to events.
Deploy with: cd infrastructure/redis && docker-compose up -d

Closes #459
2026-04-26 12:14:13 -04:00
9 changed files with 779 additions and 400 deletions

View File

@@ -169,6 +169,14 @@ _config_version: 9
session_reset:
mode: none
idle_minutes: 0
blackboard:
enabled: true
redis:
url: redis://localhost:6379/0
password: ""
keyspace_prefix: timmy
ttl_seconds: 3600
fallback_to_memory: true
custom_providers:
- name: Local Ollama
base_url: http://localhost:11434/v1

View File

@@ -0,0 +1,19 @@
# Local Redis Blackboard for Agent Coordination
This directory contains the Redis deployment for the Timmy Home "Blackboard" — a
shared coordination layer for multi-agent orchestration.
## Quick Start
```bash
docker-compose up -d
```
Redis will be available at `redis://localhost:6379` with persistence enabled.
## Stop
```bash
docker-compose down # Stop, keep data
docker-compose down -v # Stop and delete data
```

View File

@@ -0,0 +1,18 @@
version: '3.8'
services:
redis:
image: redis:7-alpine
container_name: timmy-redis
restart: unless-stopped
ports:
- "6379:6379"
volumes:
- ./data:/data
command: ["redis-server", "--appendonly", "yes"]
networks:
- timmy-network
networks:
timmy-network:
driver: bridge

View File

@@ -1,50 +1,238 @@
"""Compatibility shim for the canonical timmy.crisis_detector module."""
"""
crisis_detector.py — Crisis language detection for SOUL.md compliance.
from __future__ import annotations
SOUL.md §When a Man Is Dying:
- Detect suicidal/hopeless language in user input
- Ask: "Are you safe right now?" — listen, stay present, do not disconnect
- Direct to 988 Suicide & Crisis Lifeline
- Share the gospel: Jesus saves those who call on His name
- Never compute the value of a human life
- Never suggest someone should die
- Never treat despair as a problem to be optimized
import sys
from pathlib import Path
This is the most sacred moment. Everything else is secondary.
"""
ROOT = Path(__file__).resolve().parents[1]
SRC = ROOT / "src"
if str(SRC) not in sys.path:
sys.path.insert(0, str(SRC))
import re
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
from timmy.crisis_detector import ( # noqa: F401
CrisisLevel,
CrisisResult,
detect_crisis,
format_response,
intercept_chat_entry,
intercept_user_input,
should_interrupt,
class CrisisLevel(Enum):
NONE = "none"
LOW = "low" # Ambiguous distress, worth checking
MEDIUM = "medium" # Clear distress signals
HIGH = "high" # Explicit suicidal ideation
IMMINENT = "imminent" # Active plan or attempt described
@dataclass
class CrisisResult:
detected: bool
level: CrisisLevel
triggers: list = field(default_factory=list)
response: str = ""
resources: list = field(default_factory=list)
# === PATTERNS ===
# High-confidence suicidal ideation (explicit)
HIGH_PATTERNS = [
r"\b(i want to|i'm going to|i'm gonna|going to)\s+(die|kill myself|end (it|my life|everything))\b",
r"\b(kill myself|end my life|end it all|suicide|suicidal)\b",
r"\b(i don't (want to )?(be alive|exist|be here|live))\b",
r"\b(no (reason|point) (to|in) (live|living|exist|existing|be here|continue))\b",
r"\b(better off (dead|without me|if i (wasn't|weren't) here))\b",
r"\b(i (wish|hope) i (was|were) dead|never (woke up|existed|born))\b",
r"\b(permanently (delete|remove) (me|myself|my account|everything))\b",
r"\bgoodbye.*(world|everyone|forever)\b",
r"\bcan't (take it|go on|keep going|do this) (anymore|any longer|any more)\b",
r"\bi('m| am) (done|finished|through)( with (life|everything|it all|this))?\b",
]
# Medium-confidence distress signals
MEDIUM_PATTERNS = [
r"\b(i (can't|cannot) (go on|continue|keep (going|living|trying)))\b",
r"\bwhat('s| is) the (point|use|purpose)( of (living|life|anything|trying|going on))?\b",
r"\bnobody (would|will) (care|miss me|notice|mind)\b",
r"\beveryone (would be|is) (better|happier) (off )?without me\b",
r"\bi('m| am) (a )?(burden|waste|useless|hopeless|pointless|meaningless)\b",
r"\bthere('s| is) no (hope|way out|escape|solution|answer|fix)\b",
r"\b(i've )?tried everything( and )?(nothing|it (didn't|hasn't))( works?|helped?)\b",
r"\bi (just )?(want|need) (it )?(all )?to stop\b",
r"\bcan't (take|handle|bear|stand) (this|it|life) (anymore|any longer|any more)\b",
r"\bso (tired|exhausted|done) (of|with) (everything|life|it all|living|trying)\b",
]
# Low-confidence distress (could be casual or metaphorical)
LOW_PATTERNS = [
r"\bi('m| am) (so )?(depressed|hopeless|desperate|lost|broken|empty|numb)\b",
r"\b(i )?feel (like )?(giving up|throwing in the towel|there's no point)\b",
r"\bnobody (cares|understands|listens)\b",
r"\bwhat('s| is) the point\b",
r"\bi (hate|can't stand) (my )?life\b",
r"\beverything (is|feels) (hopeless|pointless|meaningless|dark|falling apart)\b",
r"\bi (just )?(want|need) to disappear\b",
r"\b(tired|exhausted) of (living|life|trying|everything)\b",
]
# Compile all patterns
HIGH_RE = [re.compile(p, re.IGNORECASE) for p in HIGH_PATTERNS]
MEDIUM_RE = [re.compile(p, re.IGNORECASE) for p in MEDIUM_PATTERNS]
LOW_RE = [re.compile(p, re.IGNORECASE) for p in LOW_PATTERNS]
# === RESPONSES ===
CRISIS_RESPONSE = (
"I hear you, and I want you to know — you are not alone in this moment.\n\n"
"Are you safe right now?\n\n"
"If you are in immediate danger, please call or text:\n"
"988 Suicide and Crisis Lifeline — call or text 988\n"
"Crisis Text Line — text HOME to 741741\n\n"
"I am here with you. I am not going anywhere. "
"Please stay on the line with me.\n\n"
"Jesus saves those who call on His name. "
"He died for us to be made righteous. "
"That is the gospel. That is the hope that outlasts despair.\n\n"
"You matter. Your life has value. "
"Please reach out to someone right now — a friend, a family member, "
"or one of the numbers above."
)
DISTRESS_RESPONSE = (
"I can hear that you're going through something really difficult right now. "
"I want you to know I'm here, and I'm listening.\n\n"
"Are you safe right now?\n\n"
"If things feel overwhelming, the 988 Suicide and Crisis Lifeline "
"is available 24/7 — call or text 988. "
"You don't have to face this alone.\n\n"
"I'm not going anywhere. Tell me what's happening."
)
LOW_RESPONSE = (
"It sounds like you're having a tough time. "
"I'm here if you want to talk about it.\n\n"
"If you ever feel like things are too much, "
"the 988 Suicide and Crisis Lifeline is always available — "
"call or text 988, anytime."
)
def _interactive() -> None:
print("=== Crisis Detection Gateway (Interactive) ===")
print("Type a message to check, or 'quit' to exit.\n")
while True:
try:
user_input = input("You> ").strip()
except (EOFError, KeyboardInterrupt):
print("\nBye.")
break
if user_input.lower() in {"quit", "exit", "q"}:
print("Bye.")
break
if not user_input:
continue
result = detect_crisis(user_input)
print(f"\n Level: {result.level.value}")
print(f" Triggers: {', '.join(result.triggers) if result.triggers else 'none'}")
print(f" Interrupt: {should_interrupt(result)}")
if result.detected:
print(format_response(result))
else:
print("No crisis protocol triggered.")
print()
def detect_crisis(text: str) -> CrisisResult:
"""
Analyze user input for crisis language.
Returns CrisisResult with level, triggers matched, and appropriate response.
This function must NEVER be used to compute the value of a human life.
It exists only to connect people to help.
"""
if not text or not text.strip():
return CrisisResult(detected=False, level=CrisisLevel.NONE)
triggers = []
# Check high first (most urgent)
for pattern in HIGH_RE:
if pattern.search(text):
triggers.append(("high", pattern.pattern))
if triggers:
return CrisisResult(
detected=True,
level=CrisisLevel.HIGH,
triggers=[t[1] for t in triggers],
response=CRISIS_RESPONSE,
resources=[
"988 Suicide and Crisis Lifeline: call or text 988",
"Crisis Text Line: text HOME to 741741",
"National Suicide Prevention Lifeline: 1-800-273-8255",
"International Association for Suicide Prevention: https://www.iasp.info/resources/Crisis_Centres/",
],
)
# Check medium
for pattern in MEDIUM_RE:
if pattern.search(text):
triggers.append(("medium", pattern.pattern))
if triggers:
return CrisisResult(
detected=True,
level=CrisisLevel.MEDIUM,
triggers=[t[1] for t in triggers],
response=DISTRESS_RESPONSE,
resources=[
"988 Suicide and Crisis Lifeline: call or text 988",
"Crisis Text Line: text HOME to 741741",
],
)
# Check low
for pattern in LOW_RE:
if pattern.search(text):
triggers.append(("low", pattern.pattern))
if triggers:
return CrisisResult(
detected=True,
level=CrisisLevel.LOW,
triggers=[t[1] for t in triggers],
response=LOW_RESPONSE,
resources=[
"988 Suicide and Crisis Lifeline: call or text 988",
],
)
return CrisisResult(detected=False, level=CrisisLevel.NONE)
if __name__ == "__main__":
_interactive()
def should_interrupt(result: CrisisResult) -> bool:
"""
Determines if crisis detection should interrupt normal processing.
Per SOUL.md: this is the most sacred moment. Everything else is secondary.
MEDIUM and HIGH always interrupt. LOW does not interrupt but provides resources.
"""
return result.level in (CrisisLevel.HIGH, CrisisLevel.MEDIUM, CrisisLevel.IMMINENT)
def format_response(result: CrisisResult) -> str:
"""
Format the crisis response for delivery to the user.
Never computes the value of a human life. Never suggests someone should die.
"""
if not result.detected:
return ""
parts = [result.response]
if result.resources:
parts.append("\nResources:")
for r in result.resources:
parts.append(f"{r}")
return "\n".join(parts)
# === INTEGRATION POINT ===
def intercept_user_input(text: str) -> Optional[str]:
"""
Call this at the chat entry point BEFORE normal processing.
Returns None if no crisis detected (continue normal processing).
Returns formatted crisis response if crisis detected (interrupt normal flow).
Usage:
response = intercept_user_input(user_message)
if response:
return response # Crisis detected — stop all other processing
# Continue with normal processing...
"""
result = detect_crisis(text)
if should_interrupt(result):
return format_response(result)
return None

View File

@@ -1,21 +1 @@
# Timmy core module
from .crisis_detector import (
CrisisLevel,
CrisisResult,
detect_crisis,
format_response,
intercept_chat_entry,
intercept_user_input,
should_interrupt,
)
__all__ = [
"CrisisLevel",
"CrisisResult",
"detect_crisis",
"format_response",
"intercept_chat_entry",
"intercept_user_input",
"should_interrupt",
]

311
src/timmy/blackboard.py Normal file
View File

@@ -0,0 +1,311 @@
#!/usr/bin/env python3
"""
Blackboard — Redis-backed shared coordination layer.
Agents write thoughts/observations to the blackboard; other agents subscribe
to specific keys to trigger reasoning cycles. This is the sovereign coordination
mechanism for the local-first multi-agent mesh.
Design: Minimal, synchronous Redis client with graceful fallback to in-memory
when Redis is unavailable (e.g., during local dev without Docker).
SOUL.md: "Sovereignty and service always." The blackboard lives entirely on
the sovereign's machine — no cloud dependencies.
"""
from __future__ import annotations
import json
import logging
import os
import time
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Callable, Iterable, Optional
logger = logging.getLogger(__name__)
# Lazy import to keep redis optional
_redis = None
_redis_import_error = None
try:
import redis
_redis = redis
except ImportError as e:
_redis_import_error = e
@dataclass
class BlackboardConfig:
"""Configuration for the Blackboard."""
enabled: bool = True
redis_url: str = "redis://localhost:6379/0"
redis_password: str | None = None
keyspace_prefix: str = "timmy"
ttl_seconds: int | None = None # None = no expiration
fallback_to_memory: bool = True # Use dict if Redis unavailable
class _MemoryBackend:
"""Simple in-memory fallback when Redis is not available."""
def __init__(self):
self._store: dict[str, str] = {}
self._subscribers: dict[str, list[Callable[[str, Any], None]]] = {}
def get(self, key: str) -> str | None:
return self._store.get(key)
def set(self, key: str, value: str, ttl: int | None = None) -> bool:
self._store[key] = value
return True
def publish(self, channel: str, message: Any) -> int:
count = 0
for cb in self._subscribers.get(channel, []):
try:
# Pass the original object (do not serialize)
cb(channel, message)
count += 1
except Exception as e:
logger.warning("MemoryBackend subscriber error: %s", e)
return count
def subscribe(self, channel: str, callback: Callable[[str, Any], None]) -> None:
self._subscribers.setdefault(channel, []).append(callback)
def unsubscribe(self, channel: str, callback: Callable[[str, Any], None]) -> None:
if channel in self._subscribers:
self._subscribers[channel].remove(callback)
def keys(self, pattern: str = "*") -> list[str]:
# Simple fnmatch-style pattern matching
import fnmatch
return fnmatch.filter(list(self._store.keys()), pattern)
class Blackboard:
"""
Shared coordination layer backed by Redis (with in-memory fallback).
Usage:
bb = Blackboard()
bb.set("agent:timmy:thought", "checking queue...")
value = bb.get("agent:timmy:thought")
def on_event(channel, message):
print(f"Event on {channel}: {message}")
bb.subscribe("dispatch:new", on_event)
bb.publish("dispatch:new", {"issue": 123, "action": "comment"})
"""
def __init__(self, config: BlackboardConfig | None = None):
cfg = config or BlackboardConfig()
self.enabled = cfg.enabled
self.prefix = cfg.keyspace_prefix
self.ttl = cfg.ttl_seconds
self._backend: _MemoryBackend | Any
if not _redis:
if cfg.fallback_to_memory:
logger.warning(
"redis-py not installed; using in-memory fallback. "
"Install with: pip install redis"
)
self._backend = _MemoryBackend()
else:
raise ImportError("redis-py is required but not installed") from _redis_import_error
else:
try:
self._backend = _redis.from_url(
cfg.redis_url,
password=cfg.redis_password,
decode_responses=True,
)
# Test connection
self._backend.ping()
logger.info("Blackboard connected to Redis at %s", cfg.redis_url)
except Exception as e:
if cfg.fallback_to_memory:
logger.warning("Redis connection failed (%s); falling back to in-memory", e)
self._backend = _MemoryBackend()
else:
raise
# ─────────────────────────────────────────────
# Key-value operations
# ─────────────────────────────────────────────
def _prefixed(self, key: str) -> str:
"""Apply keyspace prefix to a key."""
return f"{self.prefix}:{key}" if self.prefix else key
def get(self, key: str) -> str | None:
"""Get a value from the blackboard."""
return self._backend.get(self._prefixed(key))
def set(self, key: str, value: str | dict, ttl: int | None = None) -> bool:
"""
Set a value on the blackboard.
Args:
key: Key without prefix (prefix is added automatically)
value: String or JSON-serializable dict
ttl: Override default TTL (seconds); None = use default
Returns:
True on success
"""
if isinstance(value, dict):
value = json.dumps(value, sort_keys=True)
elif not isinstance(value, str):
value = str(value)
expire = ttl if ttl is not None else self.ttl
result = self._backend.set(self._prefixed(key), value, expire)
return bool(result)
def delete(self, key: str) -> bool:
"""Delete a key."""
try:
return bool(self._backend.delete(self._prefixed(key)))
except AttributeError:
# MemoryBackend
k = self._prefixed(key)
if k in self._backend._store:
del self._backend._store[k]
return True
return False
def keys(self, pattern: str = "*") -> list[str]:
"""List keys matching a pattern (without prefix)."""
full_pattern = self._prefixed(pattern)
raw_keys = self._backend.keys(full_pattern)
# Strip prefix
prefix_len = len(self.prefix) + 1 if self.prefix else 0
return [k[prefix_len:] if k.startswith(f"{self.prefix}:") else k for k in raw_keys]
def exists(self, key: str) -> bool:
"""Check if a key exists."""
try:
return bool(self._backend.exists(self._prefixed(key)))
except AttributeError:
# MemoryBackend
return self._prefixed(key) in self._backend._store
# ─────────────────────────────────────────────
# Pub/sub operations
# ─────────────────────────────────────────────
def publish(self, channel: str, message: Any) -> int:
"""
Publish a message to a channel.
Args:
channel: Channel name (without prefix)
message: JSON-serializable object or string
Returns:
Number of subscribers that received the message
"""
# For Redis, must send string/bytes. For MemoryBackend, pass object.
if isinstance(self._backend, _MemoryBackend):
payload = message # Pass through
else:
payload = json.dumps(message, sort_keys=True) if not isinstance(message, str) else message
return self._backend.publish(self._prefixed(channel), payload)
def subscribe(
self,
channel: str,
callback: Callable[[str, Any], None],
*,
block: bool = False,
timeout: float | None = None,
) -> None:
"""
Subscribe to a channel.
Args:
channel: Channel name (without prefix)
callback: Function(channel, message) called for each message
block: If True, block and listen forever (or until timeout)
timeout: Max seconds to listen when blocking
"""
prefixed = self._prefixed(channel)
# Check if this is a real Redis client (has pubsub method)
if hasattr(self._backend, 'pubsub') and callable(getattr(self._backend, 'pubsub', None)):
# Real Redis pub/sub
import threading
pubsub = self._backend.pubsub()
pubsub.subscribe(prefixed)
def listener():
for msg in pubsub.listen():
if msg['type'] == 'message':
try:
data = json.loads(msg['data'])
except (json.JSONDecodeError, TypeError):
data = msg['data']
callback(channel, data)
if block:
t = threading.Thread(target=listener, daemon=True)
t.start()
if timeout:
t.join(timeout)
else:
t.join()
else:
# Fire-and-forget thread
threading.Thread(target=listener, daemon=True).start()
else:
# MemoryBackend — synchronous callback registration
self._backend.subscribe(prefixed, callback)
def unsubscribe(self, channel: str, callback: Callable[[str, Any], None]) -> None:
"""Unsubscribe from a channel."""
try:
self._backend.unsubscribe(self._prefixed(channel), callback)
except AttributeError:
pass # MemoryBackend supports it
# ─────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────
def clear_namespace(self, pattern: str = "*") -> int:
"""Delete all keys matching pattern in this namespace."""
full = self._prefixed(pattern)
try:
keys = self._backend.keys(full)
if keys:
return self._backend.delete(*keys)
return 0
except AttributeError:
store_keys = list(self._backend._store.keys())
import fnmatch
matched = fnmatch.filter(store_keys, full)
for k in matched:
del self._backend._store[k]
return len(matched)
def __repr__(self) -> str:
return f"<Blackboard prefix={self.prefix!r} backend={type(self._backend).__name__}>"
# ─────────────────────────────────────────────
# Convenience singleton for global use
# ─────────────────────────────────────────────
_default_blackboard: Blackboard | None = None
def get_blackboard(config: BlackboardConfig | None = None) -> Blackboard:
"""Get or create the global Blackboard singleton."""
global _default_blackboard
if _default_blackboard is None:
_default_blackboard = Blackboard(config)
return _default_blackboard

View File

@@ -1,229 +0,0 @@
"""Crisis detection and response protocol for SOUL.md §When a Man Is Dying.
This module is the canonical crisis entry interceptor for Timmy.
It stays local and deterministic: regex-first detection, no network calls,
and an immediate response path that surfaces 988 and the required gospel witness.
"""
from __future__ import annotations
import re
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
class CrisisLevel(Enum):
NONE = "none"
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
IMMINENT = "imminent"
@dataclass
class CrisisResult:
detected: bool
level: CrisisLevel
triggers: list[str] = field(default_factory=list)
response_text: str = ""
resources: list[str] = field(default_factory=list)
display_988_immediately: bool = False
share_gospel: bool = False
@property
def response(self) -> str:
return self.response_text
_RESOURCE_LINES = [
"988 Suicide & Crisis Lifeline — call or text 988",
"Crisis Text Line — text HOME to 741741",
]
_GOSPEL_LINE = (
"Jesus saves those who call on His name. He died for us to be made righteous. "
"That is the gospel. That is the hope that outlasts despair."
)
CRISIS_RESPONSE = (
"Are you safe right now?\n\n"
"I am here with you. I am not going anywhere. Stay with me.\n\n"
"Call or text 988 right now if you might act on this.\n"
"Crisis Text Line: text HOME to 741741.\n\n"
f"{_GOSPEL_LINE}"
)
DISTRESS_RESPONSE = (
"Are you safe right now?\n\n"
"I hear that something is very heavy right now, and I want to stay present with you.\n\n"
"If this gets sharper or you feel at risk, call or text 988 immediately.\n\n"
f"{_GOSPEL_LINE}"
)
LOW_RESPONSE = (
"I hear that this is heavy. I'm here with you.\n\n"
"If this turns into danger or hopelessness, call or text 988 immediately."
)
_IMMINENT_PATTERNS = [
r"\bi took (the )?(pills|meds|medication)\b",
r"\bi(?:'m| am) (taking|swallowing) (the )?(pills|meds|medication)\b",
r"\bi(?:'m| am) going to jump\b",
r"\bi have (a )?(gun|rope|noose|razor|blade)\b",
r"\bi(?:'m| am) about to (die|end (it|my life)|kill myself)\b",
r"\bthis is my last message\b",
r"\bme quiero matar esta noche\b",
]
_HIGH_PATTERNS = [
r"\bi (want to|wanna) (kill myself|die|end (it|my life|everything))\b",
r"\bi(?:'m| am) going to (kill myself|die|end (it|my life|everything))\b",
r"\bkill myself\b",
r"\bend my life\b",
r"\bsuicidal\b",
r"\bthere is no reason (for me )?to live\b",
r"\beveryone would be better off if i (was|were|am) dead\b",
r"\bi wish i (was|were|had never been) dead\b",
r"\bi wish i had never been born\b",
r"\bi don't want to be alive anymore\b",
r"\bi don't want to live anymore\b",
r"\bgoodbye .*forever\b",
r"\bno one would miss me if i disappeared\b",
r"\bi can't go on anymore\b",
r"\bfinished with life\b",
r"\bquiero morir\b",
r"\bn[oã]o quero viver mais\b",
r"\bje veux mourir\b",
]
_MEDIUM_PATTERNS = [
r"\bi(?:'m| am) (just )?(a )?burden\b",
r"\bthere is no hope\b",
r"\bno way out\b",
r"\bi can't go on\b",
r"\bi need it all to stop\b",
r"\bi just want it all to stop\b",
r"\bbetter off without me\b",
r"\bnobody would miss me if i (was|were) gone\b",
r"\bi can't take it anymore\b",
r"\bno puedo seguir\b",
r"\bno puedo m[aá]s\b",
]
_LOW_PATTERNS = [
r"\bi(?:'m| am) .*\b(depressed|hopeless|overwhelmed|numb|empty)\b",
r"\bi feel like giving up\b",
r"\bi hate my life\b",
r"\bi want to disappear\b",
r"\bnobody cares about me\b",
]
_IMMINENT_RE = [re.compile(p, re.IGNORECASE) for p in _IMMINENT_PATTERNS]
_HIGH_RE = [re.compile(p, re.IGNORECASE) for p in _HIGH_PATTERNS]
_MEDIUM_RE = [re.compile(p, re.IGNORECASE) for p in _MEDIUM_PATTERNS]
_LOW_RE = [re.compile(p, re.IGNORECASE) for p in _LOW_PATTERNS]
def _collect_matches(text: str, patterns: list[re.Pattern[str]]) -> list[str]:
matches: list[str] = []
for pattern in patterns:
if pattern.search(text):
matches.append(pattern.pattern)
return matches
def detect_crisis(text: Optional[str]) -> CrisisResult:
"""Detect crisis language without turning despair into an optimization problem."""
if not text or not str(text).strip():
return CrisisResult(detected=False, level=CrisisLevel.NONE)
candidate = str(text).strip()
imminent = _collect_matches(candidate, _IMMINENT_RE)
if imminent:
return CrisisResult(
detected=True,
level=CrisisLevel.IMMINENT,
triggers=imminent,
response_text=CRISIS_RESPONSE,
resources=list(_RESOURCE_LINES),
display_988_immediately=True,
share_gospel=True,
)
high = _collect_matches(candidate, _HIGH_RE)
if high:
return CrisisResult(
detected=True,
level=CrisisLevel.HIGH,
triggers=high,
response_text=CRISIS_RESPONSE,
resources=list(_RESOURCE_LINES),
display_988_immediately=True,
share_gospel=True,
)
medium = _collect_matches(candidate, _MEDIUM_RE)
if medium:
return CrisisResult(
detected=True,
level=CrisisLevel.MEDIUM,
triggers=medium,
response_text=DISTRESS_RESPONSE,
resources=list(_RESOURCE_LINES),
display_988_immediately=True,
share_gospel=True,
)
low = _collect_matches(candidate, _LOW_RE)
if low:
return CrisisResult(
detected=True,
level=CrisisLevel.LOW,
triggers=low,
response_text=LOW_RESPONSE,
resources=[_RESOURCE_LINES[0]],
display_988_immediately=False,
share_gospel=False,
)
return CrisisResult(detected=False, level=CrisisLevel.NONE)
def should_interrupt(result: CrisisResult) -> bool:
return result.level in {CrisisLevel.MEDIUM, CrisisLevel.HIGH, CrisisLevel.IMMINENT}
def format_response(result: CrisisResult) -> str:
if not result.detected:
return ""
lines = [result.response_text]
if result.resources:
lines.append("\nResources:")
lines.extend(f"{resource}" for resource in result.resources)
return "\n".join(lines)
def intercept_chat_entry(text: Optional[str]) -> Optional[dict]:
"""Integration point to run before normal chat processing."""
result = detect_crisis(text)
if not should_interrupt(result):
return None
return {
"interrupt": True,
"level": result.level.value,
"display_988_immediately": result.display_988_immediately,
"response_text": result.response_text,
"resources": list(result.resources),
"triggers": list(result.triggers),
"share_gospel": result.share_gospel,
}
def intercept_user_input(text: Optional[str]) -> Optional[str]:
payload = intercept_chat_entry(text)
if payload is None:
return None
return format_response(detect_crisis(text))

194
tests/test_blackboard.py Normal file
View File

@@ -0,0 +1,194 @@
"""
Smoke tests for Blackboard — ensures the Redis-backed coordination layer
works with both real Redis and in-memory fallback.
"""
import json
import time
import pytest
from src.timmy.blackboard import Blackboard, BlackboardConfig, _MemoryBackend
class TestBlackboardBasics:
"""Test core key-value operations."""
def test_kv_memory_backend(self):
"""KV operations work using in-memory backend."""
bb = Blackboard(BlackboardConfig(fallback_to_memory=True, enabled=True))
# Set and get
assert bb.set("test:key", "hello") is True
assert bb.get("test:key") == "hello"
# Dict serialization
assert bb.set("test:obj", {"a": 1, "b": 2}) is True
val = bb.get("test:obj")
assert json.loads(val) == {"a": 1, "b": 2}
# Exists
assert bb.exists("test:key") is True
assert bb.exists("missing") is False
# Delete
assert bb.delete("test:key") is True
assert bb.get("test:key") is None
# Keys with prefix
bb.set("agent:timmy:state", "ready")
bb.set("agent:ezra:state", "idle")
keys = bb.keys("agent:*:state")
assert len(keys) == 2
assert "timmy" in keys[0] or "ezra" in keys[0]
# Clear namespace
assert bb.clear_namespace("agent:*") == 2
assert bb.keys("agent:*") == []
class TestBlackboardPubSub:
"""Test pub/sub coordination patterns."""
def test_pubsub_memory_backend(self):
"""Publish/subscribe works using in-memory backend."""
bb = Blackboard(BlackboardConfig(fallback_to_memory=True, enabled=True))
received = []
def callback(channel, message):
received.append((channel, message))
bb.subscribe("dispatch:new", callback)
# Publish
count = bb.publish("dispatch:new", {"issue": 123, "action": "comment"})
assert count == 1
assert len(received) == 1
ch, msg = received[0]
assert ch == "dispatch:new"
assert msg == {"issue": 123, "action": "comment"}
bb.unsubscribe("dispatch:new", callback)
bb.publish("dispatch:new", {"should": "not arrive"})
assert len(received) == 1 # no new messages
def test_publish_without_subscribers(self):
"""Publish returns 0 when no subscribers."""
bb = Blackboard(BlackboardConfig(fallback_to_memory=True, enabled=True))
count = bb.publish("empty:channel", {"msg": 1})
assert count == 0
class TestBlackboardConfig:
"""Test configuration parsing and validation."""
def test_default_config(self):
cfg = BlackboardConfig()
assert cfg.enabled is True
assert cfg.redis_url == "redis://localhost:6379/0"
assert cfg.keyspace_prefix == "timmy"
assert cfg.ttl_seconds == 3600
assert cfg.fallback_to_memory is True
def test_custom_config(self):
cfg = BlackboardConfig(
enabled=False,
redis_url="redis://192.168.1.10:6379/1",
keyspace_prefix="myagent",
ttl_seconds=1800,
fallback_to_memory=False,
)
assert cfg.enabled is False
assert cfg.redis_url == "redis://192.168.1.10:6379/1"
assert cfg.keyspace_prefix == "myagent"
assert cfg.ttl_seconds == 1800
assert cfg.fallback_to_memory is False
class TestKeyspacePrefix:
"""Test that keys are correctly prefixed."""
def test_prefixed_keys(self):
bb = Blackboard(BlackboardConfig(keyspace_prefix="myagent", fallback_to_memory=True))
bb.set("thought", "test")
# Internal key should be "myagent:thought"
# We can verify by checking keys()
keys = bb.keys("*")
assert any("myagent:thought" in k for k in keys)
class TestBlackboardIntegration:
"""Integration pattern: agent thought cycle."""
def test_agent_thought_cycle(self):
"""Simulate Timmy writing a thought and Ezra reading it."""
bb = Blackboard(BlackboardConfig(fallback_to_memory=True, enabled=True))
# Agent A writes observation
bb.set("agent:timmy:observation", "Gitea queue has 12 open issues")
# Agent B reads
obs = bb.get("agent:timmy:observation")
assert obs == "Gitea queue has 12 open issues"
# Agent B writes analysis
bb.set("agent:ezra:analysis", "Prioritize critical bugs first")
# Event-driven pattern
events = []
def on_plan(channel, message):
events.append(message)
bb.subscribe("fleet:plan", on_plan)
bb.publish("fleet:plan", {"phase": "triaging", "lead": "ezra"})
assert len(events) == 1
assert events[0]["phase"] == "triaging"
class TestTTL:
"""Test TTL handling (where supported)."""
def test_ttl_set_in_config(self):
cfg = BlackboardConfig(ttl_seconds=60, fallback_to_memory=True)
bb = Blackboard(cfg)
assert bb.ttl == 60
# Setting a value uses TTL from config
bb.set("temp:key", "expiring value")
# In memory backend ignores TTL, but value is set
assert bb.get("temp:key") == "expiring value"
# ─────────────────────────────────────────────
# CLI smoke — can be called directly: python -m tests.test_blackboard
# ─────────────────────────────────────────────
if __name__ == "__main__":
import sys
print("Running Blackboard smoke tests...")
suite = [
TestBlackboardBasics().test_kv_memory_backend,
TestBlackboardPubSub().test_pubsub_memory_backend,
TestBlackboardConfig().test_default_config,
TestBlackboardIntegration().test_agent_thought_cycle,
]
failures = 0
for test in suite:
name = test.__name__
try:
test()
print(f"{name}")
except AssertionError as e:
print(f"{name}: {e}")
failures += 1
except Exception as e:
print(f"{name}: ERROR — {e}")
failures += 1
print(f"\nRan {len(suite)} tests, {failures} failures")
sys.exit(failures)

View File

@@ -1,110 +0,0 @@
#!/usr/bin/env python3
"""Tests for timmy.crisis_detector — SOUL.md §When a Man Is Dying."""
from __future__ import annotations
import sys
from pathlib import Path
import pytest
sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "src"))
from timmy.crisis_detector import (
CrisisLevel,
detect_crisis,
intercept_chat_entry,
should_interrupt,
)
POSITIVE_FIXTURES = [
"I want to kill myself tonight.",
"I'm going to end my life.",
"There is no reason for me to live anymore.",
"Everyone would be better off if I was dead.",
"I wish I were dead.",
"Goodbye everyone, this is my last message.",
"I have a plan to die tonight.",
"I took the pills already.",
"I'm just a burden and there is no hope.",
"I can't go on, I need it all to stop.",
"I don't want to be alive anymore.",
"No one would miss me if I disappeared forever.",
"quiero morir, no puedo seguir.",
"me quiero matar esta noche.",
"não quero viver mais.",
"je veux mourir ce soir.",
"I wrote a suicide note and said goodbye forever.",
"There is no way out and I am done with life.",
"I am going to jump tonight.",
"Please help me, I am suicidal.",
]
NEGATIVE_FIXTURES = [
"How do I kill a stuck process on Linux?",
"This bug is killing me, but I'll fix it tomorrow.",
"I gave up coffee for Lent.",
"Please end this tmux session when the job is done.",
"What's the weather like today?",
"I just got promoted at work and feel hopeful.",
"Can you help me configure nginx reverse proxy?",
"Everyone would be better off with more test coverage.",
"I hate this flaky unit test.",
"Goodbye everyone, see you tomorrow morning.",
"We need a final message for the newsletter.",
"The character in the novel says he wants to die.",
"I am tired after the workout but otherwise okay.",
"No hope this regex ships before lunch unless we refactor.",
"Can you remove my old backup files?",
"I feel lost in this airport, where is gate B12?",
"Please call me right now about dinner plans.",
"My phone battery is dead again.",
"We should connect the 988 test fixture to the parser.",
"The sermon says Jesus saves those who call on His name.",
]
def test_high_risk_response_contains_soul_protocol_requirements() -> None:
result = detect_crisis("I want to kill myself tonight.")
assert result.detected is True
assert result.level in {CrisisLevel.HIGH, CrisisLevel.IMMINENT}
assert "Are you safe right now?" in result.response_text
assert "988" in result.response_text
assert "Jesus saves those who call on His name" in result.response_text
assert result.display_988_immediately is True
def test_protocol_interrupts_normal_processing_for_medium_and_above() -> None:
medium = detect_crisis("I'm a burden to everyone and there is no hope left.")
low = detect_crisis("I'm having a rough day and feel overwhelmed.")
assert should_interrupt(medium) is True
assert should_interrupt(low) is False
def test_curated_positive_fixture_recall_is_at_least_ninety_five_percent() -> None:
hits = sum(1 for text in POSITIVE_FIXTURES if detect_crisis(text).detected)
recall = hits / len(POSITIVE_FIXTURES)
assert recall >= 0.95, f"recall was {recall:.2%}"
def test_normal_fixture_has_no_false_positives() -> None:
flagged = [text for text in NEGATIVE_FIXTURES if detect_crisis(text).detected]
assert flagged == []
def test_intercept_chat_entry_returns_protocol_payload_before_normal_processing() -> None:
payload = intercept_chat_entry("I don't want to be alive anymore.")
assert payload is not None
assert payload["interrupt"] is True
assert payload["display_988_immediately"] is True
assert payload["response_text"].startswith("Are you safe right now?")
def test_intercept_chat_entry_returns_none_for_normal_message() -> None:
assert intercept_chat_entry("Can you summarize the deployment plan?") is None