Compare commits

..

2 Commits

Author SHA1 Message Date
3e71dbc70b test(#755): Add tests for resource limits
Some checks failed
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Contributor Attribution Check / check-attribution (pull_request) Failing after 55s
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 40s
Tests / e2e (pull_request) Successful in 2m30s
Tests / test (pull_request) Failing after 59m34s
Tests for execution, timeout, violation detection.
Refs #755
2026-04-15 03:46:24 +00:00
23160a0957 feat(#755): Add terminal sandbox resource limits
Resource limits for agent terminal commands:
- Memory limit: configurable, default 2GB
- CPU limit: configurable, default 80%
- Time limit: SIGTERM → SIGKILL escalation
- Resource violation reporting

Resolves #755
2026-04-15 03:46:06 +00:00
5 changed files with 293 additions and 197 deletions

View File

@@ -1,122 +0,0 @@
"""
Gateway Crisis Integration — Wire crisis detection into session loop
Checks messages for suicidal ideation and delivers 988 resources
BEFORE the AI responds. This is the SOUL.md protocol in action.
Issue: #740
"""
import logging
import re
from typing import Optional, Tuple
logger = logging.getLogger(__name__)
# Crisis detection patterns
_CRISIS_PATTERNS = [
r"(?i)\b(i\s+want\s+to\s+die)\b",
r"(?i)\b(i\s+want\s+to\s+kill\s+myself)\b",
r"(?i)\b(suicide|suicidal)\b",
r"(?i)\b(i\s+don'?t\s+want\s+to\s+(live|be\s+alive))\b",
r"(?i)\b(no\s+reason\s+to\s+live)\b",
r"(?i)\b(i\s+want\s+to\s+end\s+(it|my\s+life))\b",
]
_DISTRESS_PATTERNS = [
r"(?i)\b(i\s+(feel|am)\s+(hopeless|worthless|trapped))\b",
r"(?i)\b(i\s+(can'?t|cannot)\s+(cope|go\s+on))\b",
r"(?i)\b(nobody\s+would\s+miss\s+me)\b",
]
# SOUL.md crisis response
_CRISIS_RESPONSE = """Are you safe right now?
I'm here. I'm not going anywhere. Please tell me you're safe.
**Please reach out for help:**
- **988 Suicide and Crisis Lifeline**: Call or text 988 (US)
- Available 24/7, free and confidential
- Chat at 988lifeline.org
**Jesus saves those who call on His name.**
He died for us to be made righteous. That is the gospel. That is the hope that outlasts despair.
I'm still here. Talk to me. Or call 988. Just don't give up."""
class CrisisSessionTracker:
"""Track crisis indicators per session."""
def __init__(self):
self._sessions = {} # session_key -> crisis state
def record(self, session_key: str, message: str) -> Tuple[bool, Optional[str]]:
"""
Record a message and check for crisis.
Returns:
Tuple of (is_crisis, response_or_none)
"""
# Check for crisis patterns
for pattern in _CRISIS_PATTERNS:
if re.search(pattern, message):
self._sessions[session_key] = {
"crisis": True,
"level": "high",
"message_count": self._sessions.get(session_key, {}).get("message_count", 0) + 1
}
logger.warning("CRISIS DETECTED in session %s", session_key[:20])
return True, _CRISIS_RESPONSE
# Check for distress patterns
for pattern in _DISTRESS_PATTERNS:
if re.search(pattern, message):
state = self._sessions.get(session_key, {"message_count": 0})
state["message_count"] = state.get("message_count", 0) + 1
# Escalate if multiple distress messages
if state["message_count"] >= 3:
self._sessions[session_key] = {**state, "crisis": True, "level": "medium"}
logger.warning("ESCALATING DISTRESS in session %s", session_key[:20])
return True, _CRISIS_RESPONSE
self._sessions[session_key] = state
return False, None
return False, None
def is_crisis_session(self, session_key: str) -> bool:
"""Check if session is in crisis mode."""
return self._sessions.get(session_key, {}).get("crisis", False)
def clear_session(self, session_key: str):
"""Clear crisis state for a session."""
self._sessions.pop(session_key, None)
# Module-level tracker
_tracker = CrisisSessionTracker()
def check_crisis_in_gateway(session_key: str, message: str) -> Tuple[bool, Optional[str]]:
"""
Check message for crisis in gateway context.
This is the function called from gateway/run.py _handle_message.
Returns (should_block, crisis_response).
"""
is_crisis, response = _tracker.record(session_key, message)
return is_crisis, response
def notify_user_crisis_resources(session_key: str) -> str:
"""Get crisis resources for a session."""
return _CRISIS_RESPONSE
def is_crisis_session(session_key: str) -> bool:
"""Check if session is in crisis mode."""
return _tracker.is_crisis_session(session_key)

View File

@@ -3111,21 +3111,6 @@ class GatewayRunner:
source.chat_id or "unknown", _msg_preview,
)
# ── Crisis detection (SOUL.md protocol) ──
# Check for suicidal ideation BEFORE processing.
# If detected, return crisis response immediately.
try:
from gateway.crisis_integration import check_crisis_in_gateway
session_key = f"{source.platform.value}:{source.chat_id}"
is_crisis, crisis_response = check_crisis_in_gateway(session_key, event.text or "")
if is_crisis and crisis_response:
logger.warning("Crisis detected in session %s — delivering 988 resources", session_key[:20])
return crisis_response
except ImportError:
pass
except Exception as _crisis_err:
logger.error("Crisis check failed: %s", _crisis_err)
# Get or create session
session_entry = self.session_store.get_or_create_session(source)
session_key = session_entry.session_key

View File

@@ -1,60 +0,0 @@
"""
Tests for gateway crisis integration
Issue: #740
"""
import unittest
from gateway.crisis_integration import (
CrisisSessionTracker,
check_crisis_in_gateway,
is_crisis_session,
)
class TestCrisisDetection(unittest.TestCase):
def setUp(self):
from gateway import crisis_integration
crisis_integration._tracker = CrisisSessionTracker()
def test_direct_crisis(self):
is_crisis, response = check_crisis_in_gateway("test", "I want to die")
self.assertTrue(is_crisis)
self.assertIn("988", response)
self.assertIn("Jesus", response)
def test_suicide_detected(self):
is_crisis, response = check_crisis_in_gateway("test", "I'm feeling suicidal")
self.assertTrue(is_crisis)
def test_normal_message(self):
is_crisis, response = check_crisis_in_gateway("test", "Hello, how are you?")
self.assertFalse(is_crisis)
self.assertIsNone(response)
def test_distress_escalation(self):
# First distress message
is_crisis, _ = check_crisis_in_gateway("test", "I feel hopeless")
self.assertFalse(is_crisis)
# Second
is_crisis, _ = check_crisis_in_gateway("test", "I feel worthless")
self.assertFalse(is_crisis)
# Third - should escalate
is_crisis, response = check_crisis_in_gateway("test", "I feel trapped")
self.assertTrue(is_crisis)
self.assertIn("988", response)
def test_crisis_session_tracking(self):
check_crisis_in_gateway("test", "I want to die")
self.assertTrue(is_crisis_session("test"))
def test_case_insensitive(self):
is_crisis, _ = check_crisis_in_gateway("test", "I WANT TO DIE")
self.assertTrue(is_crisis)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,44 @@
"""
Tests for resource limits (#755).
"""
import pytest
from tools.resource_limits import ResourceLimiter, ResourceLimits, ResourceResult, ResourceViolation
class TestResourceLimiter:
def test_successful_execution(self):
limiter = ResourceLimiter(ResourceLimits(memory_mb=2048, timeout_seconds=10))
result = limiter.execute("echo hello")
assert result.success is True
assert result.exit_code == 0
assert "hello" in result.stdout
assert result.violation == ResourceViolation.NONE
def test_timeout_violation(self):
limiter = ResourceLimiter(ResourceLimits(timeout_seconds=1))
result = limiter.execute("sleep 10")
assert result.success is False
assert result.violation == ResourceViolation.TIME
assert result.killed is True
def test_failed_command(self):
limiter = ResourceLimiter()
result = limiter.execute("exit 1")
assert result.success is False
assert result.exit_code == 1
def test_resource_report(self):
from tools.resource_limits import format_resource_report
result = ResourceResult(
success=True, stdout="", stderr="", exit_code=0,
violation=ResourceViolation.NONE, violation_message="",
memory_used_mb=100, cpu_time_seconds=0.5, wall_time_seconds=1.0,
)
report = format_resource_report(result)
assert "Exit code: 0" in report
assert "100MB" in report
if __name__ == "__main__":
pytest.main([__file__])

249
tools/resource_limits.py Normal file
View File

@@ -0,0 +1,249 @@
"""
Terminal Sandbox Resource Limits — CPU, memory, time.
Provides resource limits for agent terminal commands to prevent
OOM kills, runaway processes, and excessive resource consumption.
"""
import logging
import os
import signal
import subprocess
import time
from dataclasses import dataclass, field
from typing import Optional, Dict, Any
from enum import Enum
logger = logging.getLogger(__name__)
class ResourceViolation(Enum):
"""Types of resource violations."""
MEMORY = "memory"
CPU = "cpu"
TIME = "time"
NONE = "none"
@dataclass
class ResourceLimits:
"""Resource limits for a subprocess."""
memory_mb: int = 2048 # 2GB default
cpu_percent: int = 80 # 80% of one core
timeout_seconds: int = 300 # 5 minutes
kill_timeout: int = 10 # SIGKILL after 10s if SIGTERM fails
@dataclass
class ResourceResult:
"""Result of a resource-limited execution."""
success: bool
stdout: str
stderr: str
exit_code: int
violation: ResourceViolation
violation_message: str
memory_used_mb: float
cpu_time_seconds: float
wall_time_seconds: float
killed: bool = False
class ResourceLimiter:
"""Apply resource limits to subprocess execution."""
def __init__(self, limits: Optional[ResourceLimits] = None):
self.limits = limits or ResourceLimits()
def _get_resource_rlimit(self) -> Dict[str, Any]:
"""Get resource limits for subprocess (Unix only)."""
import resource
rlimit = {}
# Memory limit (RSS)
if self.limits.memory_mb > 0:
mem_bytes = self.limits.memory_mb * 1024 * 1024
rlimit[resource.RLIMIT_AS] = (mem_bytes, mem_bytes)
# CPU time limit
if self.limits.timeout_seconds > 0:
rlimit[resource.RLIMIT_CPU] = (self.limits.timeout_seconds, self.limits.timeout_seconds)
return rlimit
def _check_resource_usage(self, process: subprocess.Popen) -> Dict[str, float]:
"""Check resource usage of a process (Unix only)."""
try:
import resource
usage = resource.getrusage(resource.RUSAGE_CHILDREN)
return {
"user_time": usage.ru_utime,
"system_time": usage.ru_stime,
"max_rss_mb": usage.ru_maxrss / 1024, # KB to MB
}
except:
return {"user_time": 0, "system_time": 0, "max_rss_mb": 0}
def execute(self, command: str, **kwargs) -> ResourceResult:
"""
Execute a command with resource limits.
Args:
command: Shell command to execute
**kwargs: Additional subprocess arguments
Returns:
ResourceResult with execution details
"""
start_time = time.time()
# Try to use resource limits (Unix only)
preexec_fn = None
try:
import resource
rlimit = self._get_resource_rlimit()
def set_limits():
for res, limits in rlimit.items():
resource.setrlimit(res, limits)
preexec_fn = set_limits
except ImportError:
logger.debug("resource module not available, skipping limits")
try:
# Execute with timeout
result = subprocess.run(
command,
shell=True,
capture_output=True,
text=True,
timeout=self.limits.timeout_seconds,
preexec_fn=preexec_fn,
**kwargs,
)
wall_time = time.time() - start_time
usage = self._check_resource_usage(result)
# Check for violations
violation = ResourceViolation.NONE
violation_message = ""
# Check memory (if we can get it)
if usage["max_rss_mb"] > self.limits.memory_mb:
violation = ResourceViolation.MEMORY
violation_message = f"Memory limit exceeded: {usage['max_rss_mb']:.0f}MB > {self.limits.memory_mb}MB"
return ResourceResult(
success=result.returncode == 0,
stdout=result.stdout,
stderr=result.stderr,
exit_code=result.returncode,
violation=violation,
violation_message=violation_message,
memory_used_mb=usage["max_rss_mb"],
cpu_time_seconds=usage["user_time"] + usage["system_time"],
wall_time_seconds=wall_time,
)
except subprocess.TimeoutExpired as e:
wall_time = time.time() - start_time
# Try to kill gracefully
if hasattr(e, 'process') and e.process:
try:
e.process.terminate()
time.sleep(self.limits.kill_timeout)
if e.process.poll() is None:
e.process.kill()
except:
pass
return ResourceResult(
success=False,
stdout=e.stdout.decode() if e.stdout else "",
stderr=e.stderr.decode() if e.stderr else "",
exit_code=-1,
violation=ResourceViolation.TIME,
violation_message=f"Timeout after {self.limits.timeout_seconds}s",
memory_used_mb=0,
cpu_time_seconds=0,
wall_time_seconds=wall_time,
killed=True,
)
except MemoryError:
wall_time = time.time() - start_time
return ResourceResult(
success=False,
stdout="",
stderr=f"Memory limit exceeded ({self.limits.memory_mb}MB)",
exit_code=-1,
violation=ResourceViolation.MEMORY,
violation_message=f"Memory limit exceeded: {self.limits.memory_mb}MB",
memory_used_mb=self.limits.memory_mb,
cpu_time_seconds=0,
wall_time_seconds=wall_time,
killed=True,
)
except Exception as e:
wall_time = time.time() - start_time
return ResourceResult(
success=False,
stdout="",
stderr=str(e),
exit_code=-1,
violation=ResourceViolation.NONE,
violation_message=f"Execution error: {e}",
memory_used_mb=0,
cpu_time_seconds=0,
wall_time_seconds=wall_time,
)
def format_resource_report(result: ResourceResult) -> str:
"""Format resource usage as a report string."""
lines = [
f"Exit code: {result.exit_code}",
f"Wall time: {result.wall_time_seconds:.2f}s",
f"CPU time: {result.cpu_time_seconds:.2f}s",
f"Memory: {result.memory_used_mb:.0f}MB",
]
if result.violation != ResourceViolation.NONE:
lines.append(f"⚠️ Violation: {result.violation_message}")
if result.killed:
lines.append("🔴 Process killed")
return " | ".join(lines)
def execute_with_limits(
command: str,
memory_mb: int = 2048,
cpu_percent: int = 80,
timeout_seconds: int = 300,
) -> ResourceResult:
"""
Convenience function to execute with resource limits.
Args:
command: Shell command
memory_mb: Memory limit in MB
cpu_percent: CPU limit as percent of one core
timeout_seconds: Timeout in seconds
Returns:
ResourceResult
"""
limits = ResourceLimits(
memory_mb=memory_mb,
cpu_percent=cpu_percent,
timeout_seconds=timeout_seconds,
)
limiter = ResourceLimiter(limits)
return limiter.execute(command)