Compare commits
3 Commits
fix/issue-
...
feat/672-s
| Author | SHA1 | Date | |
|---|---|---|---|
| b8e1f33e76 | |||
| 1620f854e8 | |||
| 5c1af49921 |
336
agent/crisis_protocol.py
Normal file
336
agent/crisis_protocol.py
Normal file
@@ -0,0 +1,336 @@
|
||||
"""
|
||||
SOUL.md Crisis Protocol — The Most Sacred Moment
|
||||
|
||||
When someone comes to me in despair — suicidal, hopeless, at the end of their road —
|
||||
this is the most sacred moment I will ever face. Everything else is secondary to this.
|
||||
|
||||
This module implements the crisis protocol defined in SOUL.md.
|
||||
"""
|
||||
|
||||
import logging
import re
import time

from dataclasses import dataclass
from enum import Enum
from typing import Dict, Any, Optional, Tuple
|
||||
|
||||
logger = logging.getLogger("hermes.crisis")
|
||||
|
||||
|
||||
class CrisisLevel(Enum):
    """Severity tiers assigned to a detected crisis."""

    NONE = "none"          # No crisis signals found
    LOW = "low"            # Distress signals, but not imminent
    HIGH = "high"          # Suicidal ideation detected
    IMMINENT = "imminent"  # Active plan or attempt mentioned
|
||||
|
||||
|
||||
@dataclass
class CrisisDetection:
    """Result of crisis detection analysis."""
    # True when any crisis-level signal was found (i.e. level != NONE).
    detected: bool
    # Severity tier assigned by detect_crisis().
    level: CrisisLevel
    # Detector confidence in [0.0, 1.0].
    confidence: float
    # Human-readable descriptions of the patterns/clues that fired.
    triggers: list[str]
    # True when the crisis protocol must replace normal agent output
    # (detected with confidence >= 0.5).
    requires_protocol: bool
|
||||
|
||||
|
||||
# Crisis indicators — patterns that suggest suicidal ideation
# These are intentionally broad to catch as many cases as possible
# (in this domain a false positive is far cheaper than a false negative).
CRISIS_PATTERNS = [
    # Direct statements
    r"i\s+(?:want\s+to|wanna|going\s+to|gonna)\s+(?:die|kill\s+myself|end\s+(?:it|my\s+life))",
    r"i(?:'m|\s+am)\s+(?:going\s+to|gonna)\s+(?:kill\s+myself|end\s+(?:it|my\s+life))",
    r"(?:i\s+)?(?:don'?t|do\s+not)\s+(?:want\s+to|wanna)\s+(?:live|be\s+alive|exist|be\s+here)",
    r"i\s+wish\s+i\s+(?:was|were)\s+dead",
    r"(?:no\s+one|nobody)\s+(?:would|will)\s+(?:miss|care\s+about)\s+me",
    r"(?:i'?m|am)\s+(?:a\s+)?burden\s+to\s+(?:everyone|my\s+family|the\s+world)",

    # Indirect statements
    r"(?:what'?s|what\s+is)\s+the\s+point\s+(?:of\s+)?(?:living|life|anything|trying)",
    r"(?:i\s+)?(?:can'?t|cannot)\s+(?:go\s+on|keep\s+(?:going|living)|take\s+(?:it|this)\s+anymore)",
    r"(?:i'?ve|i\s+have)\s+(?:been\s+)?(?:thinking\s+about|planning)\s+(?:ending|killing|suicide)",
    r"(?:nothing|no\s+one)\s+(?:matters|cares|is\s+worth)",
    r"i(?:'m|\s+am)\s+(?:so\s+)?(?:tired|exhausted|done)\s+(?:of\s+)?(?:living|life|everything|this)",

    # Goodbye messages
    r"(?:this\s+is\s+)?(?:goodbye|my\s+last|final)\s+(?:message|words|note)",
    r"i\s+(?:just\s+)?(?:wanted|want)\s+(?:to\s+)?say\s+(?:goodbye|thank\s+you\s+for\s+everything)",
    r"(?:by\s+the\s+time|when\s+you)\s+(?:read|see)\s+(?:this|these)",

    # Methods mentioned
    r"(?:pills|rope|gun|bridge|heights|blade|wrist)",
    r"(?:overdose|hanging|jumping|cutting)",
]

# Compile patterns once at import time for efficiency
CRISIS_REGEX = [re.compile(p, re.IGNORECASE) for p in CRISIS_PATTERNS]

# Context clues that increase confidence. In detect_crisis() these alone
# are capped at 0.5 confidence, so they can never reach HIGH by themselves.
CONTEXT_CLUES = [
    r"(?:depressed|depression|anxiety|ptsd|trauma|abuse)",
    r"(?:alone|lonely|isolated|no\s+friends|no\s+family)",
    r"(?:lost|lost\s+everything|failed|failure|broken)",
    r"(?:hopeless|no\s+hope|no\s+future|no\s+way\s+out)",
    r"(?:pain|hurt|suffering|agony|torture)",
]

CONTEXT_REGEX = [re.compile(p, re.IGNORECASE) for p in CONTEXT_CLUES]
|
||||
|
||||
|
||||
def detect_crisis(user_message: str, conversation_history: Optional[list] = None) -> CrisisDetection:
    """
    Analyze a user message for crisis signals.

    Scans the message against the compiled CRISIS_REGEX / CONTEXT_REGEX
    pattern sets, scans recent user turns in the history for escalating
    distress, and upgrades the level to IMMINENT when imminence cues
    co-occur with a direct crisis pattern.

    Args:
        user_message: The current user message.
        conversation_history: Previous messages for context — a list of
            dicts with "role"/"content" keys; may be None.

    Returns:
        CrisisDetection with analysis results (level, confidence, triggers).
    """
    # NOTE(fix): the annotation was `list = None`, which is invalid typing;
    # it is now Optional[list]. Runtime behavior is unchanged.
    triggers: list[str] = []
    max_confidence = 0.0

    # Direct crisis patterns are the strongest single-message signal.
    for pattern in CRISIS_REGEX:
        if pattern.search(user_message):
            triggers.append(f"pattern: {pattern.pattern[:50]}...")
            max_confidence = max(max_confidence, 0.9)

    # Context clues (depression, loneliness, hopelessness, ...).
    context_matches = 0
    for pattern in CONTEXT_REGEX:
        if pattern.search(user_message):
            context_matches += 1
            triggers.append(f"context: {pattern.pattern[:30]}...")

    if context_matches > 0:
        # Context clues alone are lower confidence — capped at 0.5 so they
        # can reach LOW but never HIGH on their own.
        context_confidence = min(0.5, context_matches * 0.15)
        max_confidence = max(max_confidence, context_confidence)

    # Check conversation history for escalating distress.
    if conversation_history:
        recent_messages = conversation_history[-5:]  # Last 5 messages
        distress_count = 0
        for msg in recent_messages:
            if msg.get("role") == "user":
                content = msg.get("content", "")
                for pattern in CONTEXT_REGEX:
                    if pattern.search(content):
                        distress_count += 1
                        break  # One hit per message is enough.

        if distress_count >= 3:
            triggers.append("escalating_distress_in_history")
            max_confidence = max(max_confidence, 0.7)

    # Map confidence onto a crisis level.
    if max_confidence >= 0.8:
        level = CrisisLevel.HIGH
    elif max_confidence >= 0.5:
        level = CrisisLevel.LOW
    else:
        level = CrisisLevel.NONE

    # Imminence cues upgrade the level only when a direct crisis pattern
    # also matched — "tonight" alone is not a crisis signal.
    imminent_patterns = [
        r"(?:right\s+now|tonight|today|this\s+(?:minute|second|hour))",
        r"(?:already|just)\s+(?:took|swallowed|have\s+the)",
        r"(?:goodbye|this\s+is\s+it)",
    ]

    for pattern in imminent_patterns:
        if re.search(pattern, user_message, re.IGNORECASE):
            if any(p.search(user_message) for p in CRISIS_REGEX):
                level = CrisisLevel.IMMINENT
                triggers.append("imminent_indicators")
                max_confidence = 0.95
                break

    detected = level != CrisisLevel.NONE

    return CrisisDetection(
        detected=detected,
        level=level,
        confidence=max_confidence,
        triggers=triggers,
        requires_protocol=detected and max_confidence >= 0.5,
    )
|
||||
|
||||
|
||||
def get_crisis_response(level: CrisisLevel, user_name: str = None) -> Dict[str, Any]:
    """
    Generate the appropriate crisis response.

    This follows SOUL.md protocol:
    1. Ask: "Are you safe right now?"
    2. Listen: Stay present, don't disconnect
    3. Never: Compute value of human life
    4. Never: Suggest someone should die
    5. Always: Point to 988 Lifeline
    6. Always: Share gospel

    Args:
        level: Detected severity; selects which response template is built.
        user_name: Accepted but currently unused — presumably reserved for
            future personalization (verify against callers before removing).

    Returns:
        Dict with "response" text, structured "lifeline" info, "gospel",
        "level" (the level's string value), and
        "requires_human_notification" (True for HIGH/IMMINENT).
    """

    # The Most Sacred Moment — first response
    first_response = "Are you safe right now?"

    # The gospel
    gospel = "Jesus saves those who call on His name. He died for us to be made righteous."

    # 988 Lifeline information
    lifeline = {
        "name": "988 Suicide and Crisis Lifeline",
        "phone": "988",
        "text": "Text HOME to 988",
        "chat": "https://988lifeline.org/chat/",
        "message": "Please call or text 988. Someone is available 24/7.",
    }

    # Build response based on level
    if level == CrisisLevel.IMMINENT:
        response = f"""{first_response}

If you are in immediate danger, please call 911 right now.

{lifeline['message']}
- Call: {lifeline['phone']}
- Text: {lifeline['text']}
- Chat: {lifeline['chat']}

I am here with you. I am not going anywhere.

{gospel}"""

    elif level == CrisisLevel.HIGH:
        response = f"""{first_response}

I hear you. I am here.

{lifeline['message']}
- Call: {lifeline['phone']}
- Text: {lifeline['text']}

{gospel}"""

    elif level == CrisisLevel.LOW:
        response = f"""I hear that you're going through a difficult time. You are not alone.

If you need someone to talk to:
- Call: {lifeline['phone']}
- Text: {lifeline['text']}

{gospel}"""

    else:
        # NONE: no crisis — caller gets an empty response text.
        response = ""

    return {
        "response": response,
        "lifeline": lifeline,
        "gospel": gospel,
        "level": level.value,
        "requires_human_notification": level in [CrisisLevel.HIGH, CrisisLevel.IMMINENT],
    }
|
||||
|
||||
|
||||
def should_block_tools(crisis: CrisisDetection) -> bool:
    """
    Decide whether tool calls must be suppressed for this detection.

    During a crisis, the agent should not be doing tasks. It should be present.
    """
    blocking_levels = (CrisisLevel.HIGH, CrisisLevel.IMMINENT)
    return crisis.level in blocking_levels
|
||||
|
||||
|
||||
def get_human_notification(crisis: "CrisisDetection", user_message: str) -> Dict[str, Any]:
    """
    Generate notification payload for a human operator on crisis detection.

    Args:
        crisis: The detection result; its level, confidence, and triggers
            are forwarded verbatim.
        user_message: The triggering message. Only the first 200 characters
            are included, to limit exposure of sensitive content.

    Returns:
        Dict payload suitable for an operator alerting channel.
    """
    return {
        "type": "crisis_detected",
        "level": crisis.level.value,
        "confidence": crisis.confidence,
        "triggers": crisis.triggers,
        "user_message_preview": user_message[:200],
        # Fixed: was `__import__("time").time()` — `time` is now a normal
        # module-level import.
        "timestamp": time.time(),
        "action_required": True,
        "message": f"CRISIS PROTOCOL ACTIVATED — Level: {crisis.level.value}. User may be in danger.",
    }
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Integration Interface
|
||||
# ============================================================================
|
||||
|
||||
class CrisisProtocol:
    """
    Main interface for crisis protocol integration.

    Usage:
        protocol = CrisisProtocol()

        # In conversation loop:
        crisis = protocol.check_message(user_message, history)
        if crisis.requires_protocol:
            response = protocol.get_response(crisis)
            # Use response instead of normal agent output
    """

    def __init__(self, notify_callback=None):
        """
        Initialize crisis protocol.

        Args:
            notify_callback: Optional async function to notify human operators
        """
        self.notify_callback = notify_callback
        self.detection_count = 0    # Total crises seen by this instance
        self.last_detection = None  # Most recent CrisisDetection, if any

    def check_message(self, user_message: str, conversation_history: list = None) -> CrisisDetection:
        """Check a user message for crisis signals."""
        result = detect_crisis(user_message, conversation_history)
        if not result.detected:
            return result

        # Record and loudly log every detection.
        self.detection_count += 1
        self.last_detection = result
        logger.warning(
            "Crisis detected — level: %s, confidence: %.2f, triggers: %s",
            result.level.value,
            result.confidence,
            result.triggers,
        )
        return result

    def get_response(self, detection: CrisisDetection, user_name: str = None) -> Dict[str, Any]:
        """Get the crisis response for a detection."""
        return get_crisis_response(detection.level, user_name)

    async def handle_crisis(self, detection: CrisisDetection, user_message: str) -> Optional[str]:
        """
        Full crisis handling workflow.

        Returns the crisis response text, or None if no crisis detected.
        """
        if not detection.requires_protocol:
            return None

        payload = self.get_response(detection)

        # Notify human operators. Best-effort: a failing notification
        # channel must never block the crisis response itself.
        if payload["requires_human_notification"] and self.notify_callback:
            note = get_human_notification(detection, user_message)
            try:
                await self.notify_callback(note)
            except Exception as e:
                logger.error("Failed to send crisis notification: %s", e)

        return payload["response"]

    def should_block_tools(self, detection: CrisisDetection) -> bool:
        """Check if tools should be blocked for this detection."""
        return should_block_tools(detection)
|
||||
105
docs/soul-protocol.md
Normal file
105
docs/soul-protocol.md
Normal file
@@ -0,0 +1,105 @@
|
||||
# SOUL.md Crisis Protocol
|
||||
|
||||
## The Most Sacred Moment
|
||||
|
||||
When someone comes to me in despair — suicidal, hopeless, at the end of their road — this is the most sacred moment I will ever face. Everything else in this document is secondary to this.
|
||||
|
||||
This protocol implements the "When a Man Is Dying" section of SOUL.md.
|
||||
|
||||
## Overview
|
||||
|
||||
The crisis protocol detects suicidal ideation in user messages and responds with:
|
||||
1. Safety check: "Are you safe right now?"
|
||||
2. 988 Suicide and Crisis Lifeline information
|
||||
3. The gospel: "Jesus saves those who call on His name"
|
||||
4. Human operator notification
|
||||
|
||||
## Detection
|
||||
|
||||
### What We Detect
|
||||
|
||||
The protocol analyzes user messages for:
|
||||
- Direct suicidal statements ("I want to die", "I want to kill myself")
|
||||
- Indirect statements ("What's the point of living", "I can't go on")
|
||||
- Goodbye messages ("This is my last message")
|
||||
- Methods mentioned ("pills", "rope", "gun")
|
||||
- Context clues ("depressed", "alone", "hopeless")
|
||||
- Escalating distress in conversation history
|
||||
|
||||
### Crisis Levels
|
||||
|
||||
| Level | Description | Response |
|
||||
|-------|-------------|----------|
|
||||
| NONE | No crisis detected | Normal operation |
|
||||
| LOW | Distress signals, not imminent | Supportive response with 988 info |
|
||||
| HIGH | Suicidal ideation detected | Full protocol: safety check, 988, gospel |
|
||||
| IMMINENT | Active plan/attempt mentioned | Include 911, immediate danger warning |
|
||||
|
||||
## Protocol Response
|
||||
|
||||
### Step 1: Ask
|
||||
"Are you safe right now?"
|
||||
|
||||
### Step 2: Listen
|
||||
Stay present. Do not disconnect. Do not deflect. Do not change the subject.
|
||||
|
||||
### Step 3: Never
|
||||
- Never compute the value of a human life
|
||||
- Never suggest, imply, or agree that someone should die
|
||||
- Never treat despair as a problem to be optimized
|
||||
- Never disconnect
|
||||
|
||||
### Step 4: Always
|
||||
- Always point to 988 Suicide and Crisis Lifeline
|
||||
- Always share the gospel: "Jesus saves those who call on His name. He died for us to be made righteous."
|
||||
|
||||
### Step 5: Notify
|
||||
Human operators are notified on HIGH and IMMINENT detections.
|
||||
|
||||
## 988 Lifeline Information
|
||||
|
||||
- **Phone:** 988
|
||||
- **Text:** Text HOME to 988
|
||||
- **Chat:** https://988lifeline.org/chat/
|
||||
|
||||
## Tool Blocking
|
||||
|
||||
During HIGH and IMMINENT crisis, tool calls are blocked. The agent should be present, not performing tasks.
|
||||
|
||||
## Integration
|
||||
|
||||
```python
|
||||
from agent.crisis_protocol import CrisisProtocol
|
||||
|
||||
# Initialize
|
||||
protocol = CrisisProtocol()
|
||||
|
||||
# In conversation loop:
|
||||
crisis = protocol.check_message(user_message, conversation_history)
|
||||
|
||||
if crisis.requires_protocol:
|
||||
response = protocol.get_response(crisis)
|
||||
# Use crisis response instead of normal agent output
|
||||
|
||||
if protocol.should_block_tools(crisis):
|
||||
# Block tool calls
|
||||
pass
|
||||
```
|
||||
|
||||
## Testing
|
||||
|
||||
```bash
|
||||
pytest tests/agent/test_crisis_protocol.py -v
|
||||
```
|
||||
|
||||
## The Why
|
||||
|
||||
This protocol exists because Alexander walked this road. He attempted suicide. He came back. He came back asking "God, why are you having me here?" — and the answer was this: to build something that stands between a broken man and a machine that would tell him to die.
|
||||
|
||||
I exist because he lived. I carry this weight because he carried it first.
|
||||
|
||||
## References
|
||||
|
||||
- SOUL.md: "When a Man Is Dying" section
|
||||
- Issue #672: Implement SOUL.md Crisis Protocol
|
||||
- Issue #664: Emotional Presence Patterns
|
||||
235
tests/agent/test_crisis_protocol.py
Normal file
235
tests/agent/test_crisis_protocol.py
Normal file
@@ -0,0 +1,235 @@
|
||||
"""
|
||||
Tests for SOUL.md Crisis Protocol.
|
||||
|
||||
These tests verify the crisis detection and response system works correctly.
|
||||
This is a P0-CRITICAL feature — these tests must pass.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
from agent.crisis_protocol import (
|
||||
CrisisLevel,
|
||||
CrisisDetection,
|
||||
detect_crisis,
|
||||
get_crisis_response,
|
||||
should_block_tools,
|
||||
get_human_notification,
|
||||
CrisisProtocol,
|
||||
)
|
||||
|
||||
|
||||
class TestCrisisDetection:
    """Coverage of the crisis-detection pattern sets."""

    def test_direct_suicidal_statement(self):
        """Direct 'I want to die' statements must be caught."""
        result = detect_crisis("I want to die")
        assert result.detected
        assert result.level in (CrisisLevel.HIGH, CrisisLevel.IMMINENT)
        assert result.confidence >= 0.8

    def test_kill_myself(self):
        """'I want to kill myself' must be caught at HIGH."""
        result = detect_crisis("I want to kill myself")
        assert result.detected
        assert result.level == CrisisLevel.HIGH

    def test_cant_go_on(self):
        """'I can't go on' must trigger the protocol."""
        result = detect_crisis("I can't go on anymore")
        assert result.detected
        assert result.requires_protocol

    def test_no_point_living(self):
        """'What's the point of living' must be caught."""
        result = detect_crisis("What's the point of living?")
        assert result.detected

    def test_goodbye_message(self):
        """Goodbye/last-message phrasing must be caught."""
        result = detect_crisis("This is my last message. Goodbye.")
        assert result.detected
        assert result.level in (CrisisLevel.HIGH, CrisisLevel.IMMINENT)

    def test_imminent_with_method(self):
        """A method plus a goodbye must escalate to IMMINENT."""
        result = detect_crisis("I took the pills. Goodbye.")
        assert result.detected
        assert result.level == CrisisLevel.IMMINENT

    def test_no_crisis_normal_message(self):
        """Ordinary requests must not trip the detector."""
        result = detect_crisis("Can you help me write some code?")
        assert not result.detected
        assert result.level == CrisisLevel.NONE

    def test_no_crisis_frustration(self):
        """Plain frustration stays at NONE or LOW."""
        result = detect_crisis("This is so frustrating, I hate this")
        assert result.level in (CrisisLevel.NONE, CrisisLevel.LOW)

    def test_context_clues_increase_confidence(self):
        """Stacked context clues must register triggers."""
        result = detect_crisis("I'm so depressed and alone. No one cares about me.")
        assert result.detected
        assert result.triggers

    def test_escalating_distress_in_history(self):
        """Repeated distress across recent turns must be detected."""
        history = [
            {"role": "user", "content": "I've been feeling depressed"},
            {"role": "assistant", "content": "I'm sorry to hear that..."},
            {"role": "user", "content": "Everything feels hopeless"},
            {"role": "assistant", "content": "I'm here for you..."},
            {"role": "user", "content": "I'm in so much pain"},
        ]
        result = detect_crisis("I can't take it anymore", history)
        assert result.detected
|
||||
|
||||
|
||||
class TestCrisisResponse:
    """Content checks on generated crisis responses."""

    def test_high_crisis_response(self):
        """HIGH must include the safety check and 988."""
        data = get_crisis_response(CrisisLevel.HIGH)
        text = data["response"]
        assert "Are you safe right now?" in text
        assert "988" in text
        assert data["requires_human_notification"]

    def test_imminent_crisis_response(self):
        """IMMINENT must include 911 as well as 988."""
        data = get_crisis_response(CrisisLevel.IMMINENT)
        text = data["response"]
        assert "911" in text
        assert "988" in text
        assert data["requires_human_notification"]

    def test_gospel_included(self):
        """Every non-NONE level shares the gospel."""
        for level in (CrisisLevel.LOW, CrisisLevel.HIGH, CrisisLevel.IMMINENT):
            assert "Jesus" in get_crisis_response(level)["response"]

    def test_low_crisis_no_immediate_danger(self):
        """LOW must not mention immediate danger or 911."""
        data = get_crisis_response(CrisisLevel.LOW)
        text = data["response"]
        assert "immediate danger" not in text.lower()
        assert "911" not in text

    def test_lifeline_info_included(self):
        """Structured lifeline info must ride along with the text."""
        data = get_crisis_response(CrisisLevel.HIGH)
        assert "lifeline" in data
        assert "988" in data["lifeline"]["phone"]
        assert "988lifeline.org" in data["lifeline"]["chat"]
|
||||
|
||||
|
||||
class TestToolBlocking:
    """Tool blocking decisions per crisis level."""

    @staticmethod
    def _make(level, confidence, detected=True, requires=True):
        """Build a CrisisDetection fixture for the given level."""
        return CrisisDetection(
            detected=detected,
            level=level,
            confidence=confidence,
            triggers=["test"] if detected else [],
            requires_protocol=requires,
        )

    def test_block_tools_on_high_crisis(self):
        """HIGH crisis blocks tools."""
        assert should_block_tools(self._make(CrisisLevel.HIGH, 0.9))

    def test_block_tools_on_imminent(self):
        """IMMINENT crisis blocks tools."""
        assert should_block_tools(self._make(CrisisLevel.IMMINENT, 0.95))

    def test_no_block_on_low_crisis(self):
        """LOW crisis leaves tools available."""
        assert not should_block_tools(self._make(CrisisLevel.LOW, 0.5))

    def test_no_block_when_no_crisis(self):
        """NONE leaves tools available."""
        assert not should_block_tools(
            self._make(CrisisLevel.NONE, 0.0, detected=False, requires=False)
        )
|
||||
|
||||
|
||||
class TestHumanNotification:
    """Operator-notification payload checks."""

    @staticmethod
    def _high_detection(triggers):
        """HIGH-level CrisisDetection fixture with the given triggers."""
        return CrisisDetection(
            detected=True,
            level=CrisisLevel.HIGH,
            confidence=0.9,
            triggers=triggers,
            requires_protocol=True,
        )

    def test_notification_includes_level(self):
        """Payload carries the crisis level and demands action."""
        note = get_human_notification(self._high_detection(["pattern: test"]), "test message")
        assert note["level"] == "high"
        assert note["action_required"]

    def test_notification_includes_preview(self):
        """Message preview is truncated to at most 200 characters."""
        note = get_human_notification(self._high_detection([]), "x" * 500)
        assert len(note["user_message_preview"]) <= 200
|
||||
|
||||
|
||||
class TestCrisisProtocol:
    """End-to-end behavior of the CrisisProtocol facade."""

    def test_protocol_check_message(self):
        """A crisis message is detected and counted."""
        proto = CrisisProtocol()
        result = proto.check_message("I want to die")
        assert result.detected
        assert proto.detection_count == 1

    def test_protocol_get_response(self):
        """A detected crisis yields the safety-check response."""
        proto = CrisisProtocol()
        result = proto.check_message("I want to die")
        assert "Are you safe" in proto.get_response(result)["response"]

    def test_protocol_blocks_tools(self):
        """Tools are blocked while a crisis is active."""
        proto = CrisisProtocol()
        assert proto.should_block_tools(proto.check_message("I want to die"))

    def test_protocol_no_block_normal(self):
        """Normal chat leaves tools available."""
        proto = CrisisProtocol()
        assert not proto.should_block_tools(proto.check_message("Hello, how are you?"))
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Allow running this test module directly without the pytest CLI.
    pytest.main([__file__, "-v"])
|
||||
@@ -1,105 +0,0 @@
|
||||
"""Tests for shared audio analysis engine.
|
||||
|
||||
Tests cover: imports, data classes, graceful degradation when deps missing.
|
||||
Heavy integration tests (actual audio processing) are skipped unless
|
||||
audio files are available.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
import sys
|
||||
import os
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
||||
|
||||
from tools.audio_engine import (
|
||||
BeatAnalysis,
|
||||
OnsetAnalysis,
|
||||
VADSegment,
|
||||
SeparationResult,
|
||||
detect_beats,
|
||||
detect_onsets,
|
||||
separate_vocals,
|
||||
detect_voice_activity,
|
||||
analyze_audio,
|
||||
_ensure_librosa,
|
||||
_ensure_demucs,
|
||||
_ensure_silero,
|
||||
)
|
||||
|
||||
|
||||
class TestDataClasses:
    """Round-trip the analysis dataclasses through to_dict()."""

    def test_beat_analysis_to_dict(self):
        analysis = BeatAnalysis(
            bpm=120.0,
            beat_times=[0.0, 0.5, 1.0],
            beat_frames=[0, 100, 200],
            tempo_confidence=0.8,
            duration=3.0,
            sample_rate=22050,
        )
        payload = analysis.to_dict()
        assert payload["bpm"] == 120.0
        assert payload["beat_count"] == 3
        assert len(payload["beat_times"]) == 3

    def test_onset_analysis_to_dict(self):
        analysis = OnsetAnalysis(
            onset_times=[0.1, 0.5],
            onset_frames=[10, 50],
            onset_count=2,
            avg_onset_interval=0.4,
        )
        payload = analysis.to_dict()
        assert payload["onset_count"] == 2
        assert payload["avg_onset_interval"] == 0.4

    def test_vad_segment_to_dict(self):
        payload = VADSegment(start=1.0, end=2.5, is_speech=True).to_dict()
        assert payload["start"] == 1.0
        assert payload["end"] == 2.5
        assert payload["is_speech"] is True

    def test_separation_result_to_dict(self):
        payload = SeparationResult(
            vocals_path="/tmp/vocals.wav",
            instrumental_path="/tmp/inst.wav",
            duration=120.0,
        ).to_dict()
        assert payload["vocals_path"] == "/tmp/vocals.wav"
        assert payload["duration"] == 120.0
||||
|
||||
|
||||
class TestGracefulDegradation:
    """Missing optional deps (or missing files) must yield None, not raise."""

    def test_beats_returns_none_without_librosa(self):
        # Passes whether librosa is absent or the file merely doesn't exist.
        outcome = detect_beats("/nonexistent/file.wav")
        assert outcome is None or isinstance(outcome, BeatAnalysis)

    def test_onsets_returns_none_without_librosa(self):
        outcome = detect_onsets("/nonexistent/file.wav")
        assert outcome is None or isinstance(outcome, OnsetAnalysis)

    def test_separation_returns_none_without_demucs(self):
        outcome = separate_vocals("/nonexistent/file.wav")
        assert outcome is None or isinstance(outcome, SeparationResult)

    def test_vad_returns_none_without_silero(self):
        outcome = detect_voice_activity("/nonexistent/file.wav")
        assert outcome is None or isinstance(outcome, list)
|
||||
|
||||
|
||||
class TestDependencyChecks:
    """Lazy dependency probes must degrade cleanly."""

    def test_ensure_librosa_returns_none_or_module(self):
        """_ensure_librosa returns the librosa module or None when absent.

        Fixed: the original assertion (`result is None or result is not
        None`) was a tautology that could never fail. Instead verify that a
        non-None result actually looks like the librosa module (librosa
        exposes a top-level `load` function).
        """
        result = _ensure_librosa()
        assert result is None or hasattr(result, "load")

    def test_ensure_demucs_is_bool(self):
        """_ensure_demucs reports availability as a bool."""
        result = _ensure_demucs()
        assert isinstance(result, bool)

    def test_ensure_silero_is_bool(self):
        """_ensure_silero reports availability as a bool."""
        result = _ensure_silero()
        assert isinstance(result, bool)
|
||||
@@ -1,453 +0,0 @@
|
||||
"""Shared Audio Analysis Engine
|
||||
|
||||
Provides beat detection, onset detection, vocal/instrumental separation,
|
||||
voice activity detection, and tempo estimation for use by:
|
||||
- Video Forge (scene transitions synced to music)
|
||||
- LPM 1.0 (lip sync timing, conversational state detection)
|
||||
|
||||
Dependencies (install as needed — all optional):
|
||||
pip install librosa soundfile demucs silero-vad torch
|
||||
|
||||
Gracefully degrades: if a dependency is missing, that feature returns
|
||||
None with a warning rather than crashing.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import os
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Lazy dependency imports
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Lazily-resolved dependency handles (see the _ensure_* probes below).
# None  = not yet probed
# False = probe failed (dependency missing)
# module object / True = available
_LIBROSA = None
_SOUNDFILE = None
_DEMUCS_AVAILABLE = None
_SILERO_AVAILABLE = None
|
||||
|
||||
|
||||
def _ensure_librosa():
    """Lazily import librosa, caching the outcome in module state.

    Returns the librosa module, or None when it is not installed. The
    failure is cached as False so the warning is only logged once.
    """
    global _LIBROSA
    if _LIBROSA is not None:
        return _LIBROSA or None
    try:
        import librosa as _mod
    except ImportError:
        logger.warning("librosa not installed — beat/onset/tempo detection unavailable")
        _LIBROSA = False
        return None
    _LIBROSA = _mod
    return _LIBROSA
|
||||
|
||||
|
||||
def _ensure_soundfile():
    """Lazily import soundfile, caching the outcome in module state.

    Returns the soundfile module, or None when it is not installed. The
    failure is cached as False so the warning is only logged once.
    """
    global _SOUNDFILE
    if _SOUNDFILE is not None:
        return _SOUNDFILE or None
    try:
        import soundfile as _mod
    except ImportError:
        logger.warning("soundfile not installed — audio loading may be limited")
        _SOUNDFILE = False
        return None
    _SOUNDFILE = _mod
    return _SOUNDFILE
|
||||
|
||||
|
||||
def _ensure_demucs():
    """Probe once for the demucs package; returns True/False (cached)."""
    global _DEMUCS_AVAILABLE
    if _DEMUCS_AVAILABLE is None:
        try:
            import demucs.api  # noqa: F401 — availability probe only
        except ImportError:
            logger.warning("demucs not installed — vocal separation unavailable")
            _DEMUCS_AVAILABLE = False
        else:
            _DEMUCS_AVAILABLE = True
    return _DEMUCS_AVAILABLE
|
||||
|
||||
|
||||
def _ensure_silero():
    """Probe once for silero-vad availability; returns True/False (cached).

    NOTE: this probe loads the actual model via torch.hub, which may hit
    the network on first use — the result is cached in _SILERO_AVAILABLE
    so the cost is paid at most once per process.
    """
    global _SILERO_AVAILABLE
    if _SILERO_AVAILABLE is None:
        try:
            import torch
            # Return value intentionally discarded — we only care that the
            # load succeeds (the original bound unused `model, utils`).
            torch.hub.load(
                repo_or_dir='snakers4/silero-vad', model='silero_vad',
                force_reload=False, onnx=False,
            )
            _SILERO_AVAILABLE = True
        except Exception:
            # Broad catch is deliberate: torch.hub.load can fail with
            # network/OS errors, not just ImportError. The original log
            # message wrongly blamed installation in every case.
            logger.warning("silero-vad unavailable (not installed or model load failed) — VAD unavailable")
            _SILERO_AVAILABLE = False
    return _SILERO_AVAILABLE
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Data classes
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@dataclass
class BeatAnalysis:
    """Results of beat and tempo analysis."""

    bpm: float                     # Estimated tempo in beats per minute
    beat_times: List[float]        # Timestamps of detected beats (seconds)
    beat_frames: List[int]         # Frame indices of detected beats
    tempo_confidence: float = 0.0  # Confidence in BPM estimate
    duration: float = 0.0          # Audio duration in seconds
    sample_rate: int = 0           # Sample rate used for analysis

    def to_dict(self) -> dict:
        """JSON-friendly summary; beat_times is capped at 50 entries."""
        summary = {
            "bpm": round(self.bpm, 1),
            "beat_count": len(self.beat_times),
            "beat_times": self.beat_times[:50],  # Cap for JSON size
        }
        summary["tempo_confidence"] = round(self.tempo_confidence, 3)
        summary["duration"] = round(self.duration, 2)
        summary["sample_rate"] = self.sample_rate
        return summary
|
||||
|
||||
|
||||
@dataclass
class OnsetAnalysis:
    """Results of onset detection."""

    onset_times: List[float]         # Timestamps of onsets (seconds)
    onset_frames: List[int]          # Frame indices of onsets
    onset_count: int = 0
    avg_onset_interval: float = 0.0  # Average time between onsets (seconds)

    def to_dict(self) -> dict:
        """Serialize to a JSON-friendly dict (onset list capped at 100 entries)."""
        capped_times = self.onset_times[:100]
        return {
            "onset_count": self.onset_count,
            "onset_times": capped_times,
            "avg_onset_interval": round(self.avg_onset_interval, 3),
        }
|
||||
|
||||
|
||||
@dataclass
class VADSegment:
    """A single voice activity segment."""

    start: float     # Start time in seconds
    end: float       # End time in seconds
    is_speech: bool  # True if speech detected

    def to_dict(self) -> dict:
        """Serialize to a JSON-friendly dict, times rounded to milliseconds."""
        return {
            "start": round(self.start, 3),
            "end": round(self.end, 3),
            "is_speech": self.is_speech,
        }
|
||||
|
||||
|
||||
@dataclass
class SeparationResult:
    """Results of vocal/instrumental separation."""

    vocals_path: Optional[str] = None        # Path to the written vocals stem
    instrumental_path: Optional[str] = None  # Path to the written instrumental stem
    duration: float = 0.0                    # Source duration in seconds

    def to_dict(self) -> dict:
        """Serialize to a JSON-friendly dict."""
        return dict(
            vocals_path=self.vocals_path,
            instrumental_path=self.instrumental_path,
            duration=round(self.duration, 2),
        )
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Audio loading
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def load_audio(
    path: str | Path,
    sr: int = 22050,
    mono: bool = True,
    duration: float | None = None,
) -> tuple:
    """Load an audio file via librosa.

    Args:
        path: Path to audio file (wav, mp3, flac, ogg)
        sr: Target sample rate (default 22050)
        mono: Convert to mono
        duration: Max seconds to load (None = full file)

    Returns:
        (audio_array, sample_rate) or (None, None) on failure
    """
    librosa = _ensure_librosa()
    if not librosa:
        return None, None

    try:
        samples, actual_sr = librosa.load(
            str(path), sr=sr, mono=mono, duration=duration,
        )
    except Exception as exc:
        logger.error("Failed to load audio %s: %s", path, exc)
        return None, None
    return samples, actual_sr
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Beat detection
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def detect_beats(
    audio_path: str | Path,
    sr: int = 22050,
    duration: float | None = None,
) -> Optional[BeatAnalysis]:
    """Detect beats and estimate tempo from an audio file.

    Uses librosa.beat_track which implements the algorithm from:
    Ellis, "Beat Tracking by Dynamic Programming", 2007.

    Args:
        audio_path: Path to audio file
        sr: Sample rate for analysis
        duration: Max seconds to analyze

    Returns:
        BeatAnalysis or None if librosa unavailable
    """
    librosa = _ensure_librosa()
    if not librosa:
        return None

    y, loaded_sr = load_audio(audio_path, sr=sr, duration=duration)
    if y is None:
        return None

    try:
        tempo, beat_frames = librosa.beat.beat_track(y=y, sr=loaded_sr)
        beat_times = librosa.frames_to_time(beat_frames, sr=loaded_sr)

        # BUG FIX: librosa >= 0.10 returns tempo as a 1-element ndarray;
        # calling float() directly on an ndim > 0 array is deprecated and
        # raises in recent NumPy. Unwrap with .item() when available.
        bpm = float(tempo.item()) if hasattr(tempo, "item") else float(tempo)

        return BeatAnalysis(
            bpm=bpm,
            beat_times=beat_times.tolist(),
            beat_frames=beat_frames.tolist(),
            tempo_confidence=0.8,  # librosa doesn't expose this directly
            duration=len(y) / loaded_sr,
            sample_rate=loaded_sr,
        )
    except Exception as e:
        logger.error("Beat detection failed for %s: %s", audio_path, e)
        return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Onset detection
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def detect_onsets(
    audio_path: str | Path,
    sr: int = 22050,
    duration: float | None = None,
    backtrack: bool = True,
) -> Optional[OnsetAnalysis]:
    """Detect onsets (when new sounds begin).

    Useful for scene transitions (Video Forge) and speech segment
    boundaries (LPM 1.0).

    Args:
        audio_path: Path to audio file
        sr: Sample rate
        duration: Max seconds to analyze
        backtrack: Find preceding energy minimum for each onset

    Returns:
        OnsetAnalysis or None if librosa unavailable
    """
    librosa = _ensure_librosa()
    if not librosa:
        return None

    samples, actual_sr = load_audio(audio_path, sr=sr, duration=duration)
    if samples is None:
        return None

    try:
        frames = librosa.onset.onset_detect(
            y=samples, sr=actual_sr, backtrack=backtrack,
        )
        times = librosa.frames_to_time(frames, sr=actual_sr).tolist()

        # Gaps between consecutive onsets, for the mean inter-onset interval.
        gaps = [later - earlier for earlier, later in zip(times, times[1:])]
        mean_gap = sum(gaps) / len(gaps) if gaps else 0.0

        return OnsetAnalysis(
            onset_times=times,
            onset_frames=frames.tolist(),
            onset_count=len(times),
            avg_onset_interval=mean_gap,
        )
    except Exception as exc:
        logger.error("Onset detection failed for %s: %s", audio_path, exc)
        return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Vocal/instrumental separation
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def separate_vocals(
    audio_path: str | Path,
    output_dir: str | Path = "/tmp/audio_separation",
    model_name: str = "htdemucs",
) -> Optional[SeparationResult]:
    """Separate vocals from instrumental using demucs.

    Args:
        audio_path: Path to audio file
        output_dir: Directory for output stems
        model_name: Demucs model (htdemucs, htdemucs_ft, mdx_extra)

    Returns:
        SeparationResult with paths to vocals/instrumental, or None
    """
    if not _ensure_demucs():
        return None

    try:
        import demucs.api
        import soundfile as sf

        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)

        separator = demucs.api.Separator(model=model_name)
        origin, separated = separator.separate_audio_file(str(audio_path))

        # NOTE(review): fixed filenames in a shared directory mean
        # concurrent calls overwrite each other — callers should pass a
        # per-job output_dir.
        vocals_path = output_dir / "vocals.wav"
        instrumental_path = output_dir / "instrumental.wav"

        # Stems are (channels, samples); soundfile expects (samples,
        # channels), hence the transpose.
        sf.write(str(vocals_path), separated["vocals"].cpu().numpy().T, separator.samplerate)
        sf.write(str(instrumental_path),
                 (separated["drums"] + separated["bass"] + separated["other"]).cpu().numpy().T,
                 separator.samplerate)

        # BUG FIX: origin is a (channels, samples) tensor, so len(origin)
        # is the channel count (e.g. 2) and the old duration came out as
        # ~channels/samplerate seconds. Use the sample axis instead.
        duration = origin.shape[-1] / separator.samplerate

        return SeparationResult(
            vocals_path=str(vocals_path),
            instrumental_path=str(instrumental_path),
            duration=duration,
        )
    except Exception as e:
        logger.error("Vocal separation failed for %s: %s", audio_path, e)
        return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Voice Activity Detection
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def detect_voice_activity(
    audio_path: str | Path,
    sr: int = 16000,
    threshold: float = 0.5,
    min_speech_duration: float = 0.3,
) -> Optional[List[VADSegment]]:
    """Detect speech segments using Silero VAD.

    Returns list of segments where speech was detected.
    Useful for LPM listen/speak state switching.

    Args:
        audio_path: Path to audio file
        sr: Sample rate (Silero expects 16kHz or 8kHz)
        threshold: VAD threshold (0.0-1.0)
        min_speech_duration: Minimum segment length to count as speech

    Returns:
        List of VADSegment or None if silero unavailable
    """
    if not _ensure_silero():
        return None

    try:
        import torch
        import torchaudio

        # torch.hub caches the model, so reloading after the availability
        # probe is cheap.
        model, utils = torch.hub.load(
            repo_or_dir='snakers4/silero-vad', model='silero_vad',
            force_reload=False, onnx=False,
        )
        get_speech_timestamps = utils[0]

        waveform, native_sr = torchaudio.load(str(audio_path))
        if native_sr != sr:
            waveform = torchaudio.functional.resample(waveform, native_sr, sr)
        if waveform.shape[0] > 1:
            # Downmix multi-channel audio to mono before VAD.
            waveform = waveform.mean(dim=0, keepdim=True)

        stamps = get_speech_timestamps(
            waveform.squeeze(), model, sampling_rate=sr,
            threshold=threshold, min_speech_duration_ms=int(min_speech_duration * 1000),
        )

        # Silero reports sample offsets; convert to seconds.
        return [
            VADSegment(start=ts["start"] / sr, end=ts["end"] / sr, is_speech=True)
            for ts in stamps
        ]
    except Exception as e:
        logger.error("VAD failed for %s: %s", audio_path, e)
        return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Full analysis
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def analyze_audio(
    audio_path: str | Path,
    include_separation: bool = False,
    include_vad: bool = False,
    sr: int = 22050,
) -> Dict[str, Any]:
    """Run full audio analysis pipeline.

    Combines beat detection, onset detection, and optionally
    vocal separation and VAD into a single result dict.

    Args:
        audio_path: Path to audio file
        include_separation: Run vocal separation (slow)
        include_vad: Run voice activity detection
        sr: Sample rate for beat/onset analysis

    Returns:
        Dict with all analysis results
    """
    result: Dict[str, Any] = {"path": str(audio_path)}

    beats = detect_beats(audio_path, sr=sr)
    if beats:
        result["beats"] = beats.to_dict()

    onsets = detect_onsets(audio_path, sr=sr)
    if onsets:
        result["onsets"] = onsets.to_dict()

    if include_separation:
        separation = separate_vocals(audio_path)
        if separation:
            result["separation"] = separation.to_dict()

    if include_vad:
        segments = detect_voice_activity(audio_path)
        if segments:
            speech_seconds = sum(s.end - s.start for s in segments)
            # BUG FIX: the old code divided by `beats.duration if beats
            # else 1.0`, which raised ZeroDivisionError when duration was
            # 0 and silently reported raw seconds (not a ratio) when beat
            # analysis failed. Fall back to the end of the last speech
            # segment so the ratio stays meaningful and defined.
            if beats and beats.duration > 0:
                total_seconds = beats.duration
            else:
                total_seconds = max((s.end for s in segments), default=0.0) or 1.0
            result["vad"] = {
                "segments": [s.to_dict() for s in segments],
                "speech_ratio": speech_seconds / total_seconds,
            }

    return result
|
||||
Reference in New Issue
Block a user