Compare commits

..

2 Commits

Author SHA1 Message Date
3e71dbc70b test(#755): Add tests for resource limits
Some checks failed
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Contributor Attribution Check / check-attribution (pull_request) Failing after 55s
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 40s
Tests / e2e (pull_request) Successful in 2m30s
Tests / test (pull_request) Failing after 59m34s
Tests for execution, timeout, violation detection.
Refs #755
2026-04-15 03:46:24 +00:00
23160a0957 feat(#755): Add terminal sandbox resource limits
Resource limits for agent terminal commands:
- Memory limit: configurable, default 2GB
- CPU limit: configurable, default 80%
- Time limit: SIGTERM → SIGKILL escalation
- Resource violation reporting

Resolves #755
2026-04-15 03:46:06 +00:00
4 changed files with 293 additions and 246 deletions

View File

@@ -1,189 +0,0 @@
"""
Gateway Message Deduplication — Prevent double-posting.
Provides idempotent message delivery by tracking message UUIDs
and suppressing duplicates within a configurable time window.
"""
import hashlib
import logging
import time
import uuid
from typing import Dict, Optional, Set
from dataclasses import dataclass, field
from collections import OrderedDict
logger = logging.getLogger(__name__)
@dataclass
class MessageRecord:
    """Bookkeeping entry stored for every message that has been recorded."""

    # Identifier assigned to the message when it was recorded.
    message_id: str
    # Digest used to recognise a repeat of the same message.
    content_hash: str
    # Time at which the record was created (seconds, as a float).
    timestamp: float
    # Session the message belongs to.
    session_id: str
    # Name of the platform the message was sent on.
    platform: str
class MessageDeduplicator:
    """
    Deduplicates outbound messages within a time window.

    Every recorded message gets a UUID and a content hash derived from its
    session, platform and content. ``check_duplicate`` reports a hit when a
    record with the same hash is younger than ``window_seconds``.
    Recording and checking are separate steps: ``record_message`` always
    stores a new record and never suppresses anything itself.
    """

    def __init__(self, window_seconds: int = 60, max_records: int = 1000):
        """
        Initialize deduplicator.

        Args:
            window_seconds: Time window for deduplication (default 60s)
            max_records: Maximum records to keep in memory
        """
        self.window_seconds = window_seconds
        self.max_records = max_records
        # Insertion-ordered so the oldest record can be evicted first.
        self._records: OrderedDict[str, MessageRecord] = OrderedDict()
        self._suppressed_count = 0

    def _content_hash(self, content: str, session_id: str = "", platform: str = "") -> str:
        """
        Generate hash for message content.

        Each field is fed to the hasher prefixed with its byte length, so
        the field boundaries are part of the digest. Joining the fields
        with a plain separator would let different triples collide, e.g.
        session "a:b" / platform "c" versus session "a" / platform "b:c".

        Returns:
            First 16 hex characters of the SHA-256 digest
        """
        hasher = hashlib.sha256()
        for part in (session_id, platform, content):
            encoded = part.encode()
            hasher.update(len(encoded).to_bytes(8, "big"))
            hasher.update(encoded)
        return hasher.hexdigest()[:16]

    def _cleanup_old_records(self):
        """Remove records older than the dedup window."""
        cutoff = time.time() - self.window_seconds
        expired = [
            msg_id
            for msg_id, record in self._records.items()
            if record.timestamp < cutoff
        ]
        for msg_id in expired:
            del self._records[msg_id]

    def _enforce_max_records(self):
        """Enforce maximum record count by removing oldest."""
        while len(self._records) > self.max_records:
            self._records.popitem(last=False)

    def check_duplicate(self, content: str, session_id: str = "", platform: str = "") -> Optional[str]:
        """
        Check if message is a duplicate.

        A hit increments the suppressed counter and is logged at INFO level.

        Args:
            content: Message content
            session_id: Session identifier
            platform: Platform name (telegram, discord, etc.)

        Returns:
            Message ID if duplicate found, None if new message
        """
        self._cleanup_old_records()
        content_hash = self._content_hash(content, session_id, platform)
        for msg_id, record in self._records.items():
            if record.content_hash != content_hash:
                continue
            age = time.time() - record.timestamp
            # Re-checked here because time has moved on since the cleanup.
            if age < self.window_seconds:
                self._suppressed_count += 1
                logger.info(
                    "Suppressed duplicate message (age: %.1fs, original: %s)",
                    age, msg_id
                )
                return msg_id
        return None

    def record_message(self, content: str, session_id: str = "", platform: str = "") -> str:
        """
        Record a sent message and return its UUID.

        The record is always added; call ``check_duplicate`` or
        ``should_send`` first if duplicates must be filtered out.

        Args:
            content: Message content
            session_id: Session identifier
            platform: Platform name

        Returns:
            UUID for this message
        """
        self._cleanup_old_records()
        message_id = str(uuid.uuid4())
        content_hash = self._content_hash(content, session_id, platform)
        self._records[message_id] = MessageRecord(
            message_id=message_id,
            content_hash=content_hash,
            timestamp=time.time(),
            session_id=session_id,
            platform=platform,
        )
        self._enforce_max_records()
        return message_id

    def should_send(self, content: str, session_id: str = "", platform: str = "") -> bool:
        """
        Check if message should be sent (not a duplicate).

        Args:
            content: Message content
            session_id: Session identifier
            platform: Platform name

        Returns:
            True if message should be sent, False if duplicate
        """
        return self.check_duplicate(content, session_id, platform) is None

    def get_stats(self) -> Dict:
        """Get deduplication statistics."""
        return {
            "total_records": len(self._records),
            "suppressed_count": self._suppressed_count,
            "window_seconds": self.window_seconds,
            "max_records": self.max_records,
        }

    def clear(self):
        """Clear all records and reset the suppressed counter."""
        self._records.clear()
        self._suppressed_count = 0
# Module-wide deduplicator, created lazily on first request.
_deduplicator: Optional[MessageDeduplicator] = None


def get_deduplicator() -> MessageDeduplicator:
    """Return the shared deduplicator, building it with defaults on first use."""
    global _deduplicator
    shared = _deduplicator
    if shared is None:
        shared = MessageDeduplicator()
        _deduplicator = shared
    return shared
def deduplicate_message(content: str, session_id: str = "", platform: str = "") -> Optional[str]:
    """
    Look the message up in the shared deduplicator.

    Returns the message_id of the earlier record when it is a duplicate,
    or None when the message is new.
    """
    shared = get_deduplicator()
    return shared.check_duplicate(content, session_id, platform)
def record_sent_message(content: str, session_id: str = "", platform: str = "") -> str:
    """
    Store a sent message in the shared deduplicator.

    Returns the UUID assigned to the new record.
    """
    shared = get_deduplicator()
    return shared.record_message(content, session_id, platform)
def should_send_message(content: str, session_id: str = "", platform: str = "") -> bool:
    """
    Ask the shared deduplicator whether the message may go out.

    Returns True when the message is not a duplicate, False otherwise.
    """
    shared = get_deduplicator()
    return shared.should_send(content, session_id, platform)

View File

@@ -1,57 +0,0 @@
"""
Tests for message deduplication (#756).
"""
import pytest
import time
from gateway.message_dedup import MessageDeduplicator
class TestMessageDeduplicator:
    """Behavioural tests for MessageDeduplicator."""

    def test_first_message_allowed(self):
        """A message that was never recorded may be sent."""
        dedup = MessageDeduplicator()
        assert dedup.should_send("Hello") is True

    def test_duplicate_suppressed(self):
        """Same content, session and platform inside the window is blocked."""
        dedup = MessageDeduplicator()
        dedup.record_message("Hello", "session1", "telegram")
        assert dedup.should_send("Hello", "session1", "telegram") is False

    def test_different_session_allowed(self):
        """Identical content in another session is not a duplicate."""
        dedup = MessageDeduplicator()
        dedup.record_message("Hello", "session1", "telegram")
        assert dedup.should_send("Hello", "session2", "telegram") is True

    def test_different_platform_allowed(self):
        """Identical content on another platform is not a duplicate."""
        dedup = MessageDeduplicator()
        dedup.record_message("Hello", "session1", "telegram")
        assert dedup.should_send("Hello", "session1", "discord") is True

    def test_different_content_allowed(self):
        """Different content in the same session is not a duplicate."""
        dedup = MessageDeduplicator()
        dedup.record_message("Hello", "session1", "telegram")
        assert dedup.should_send("World", "session1", "telegram") is True

    def test_window_expiry(self):
        """Once the window has elapsed the same message may be sent again."""
        dedup = MessageDeduplicator(window_seconds=1)
        dedup.record_message("Hello", "session1", "telegram")
        time.sleep(1.1)
        assert dedup.should_send("Hello", "session1", "telegram") is True

    def test_record_returns_uuid(self):
        """record_message returns a 36-character UUID string."""
        dedup = MessageDeduplicator()
        msg_id = dedup.record_message("Hello")
        assert msg_id is not None
        assert len(msg_id) == 36  # UUID format

    def test_stats(self):
        """Stats count stored records and suppressed duplicates."""
        dedup = MessageDeduplicator()
        dedup.record_message("Hello")
        # record_message() always stores a record and never counts a
        # suppression; the duplicate has to go through should_send().
        assert dedup.should_send("Hello") is False
        stats = dedup.get_stats()
        assert stats["total_records"] == 1
        assert stats["suppressed_count"] == 1


if __name__ == "__main__":
    pytest.main([__file__])

View File

@@ -0,0 +1,44 @@
"""
Tests for resource limits (#755).
"""
import pytest
from tools.resource_limits import ResourceLimiter, ResourceLimits, ResourceResult, ResourceViolation
class TestResourceLimiter:
    """Behavioural tests for ResourceLimiter and the report formatter."""

    def test_successful_execution(self):
        """A trivial command succeeds, captures stdout and reports no violation."""
        limits = ResourceLimits(memory_mb=2048, timeout_seconds=10)
        outcome = ResourceLimiter(limits).execute("echo hello")
        assert outcome.success is True
        assert outcome.exit_code == 0
        assert "hello" in outcome.stdout
        assert outcome.violation == ResourceViolation.NONE

    def test_timeout_violation(self):
        """A command that outlives the timeout is killed and flagged as TIME."""
        limits = ResourceLimits(timeout_seconds=1)
        outcome = ResourceLimiter(limits).execute("sleep 10")
        assert outcome.success is False
        assert outcome.violation == ResourceViolation.TIME
        assert outcome.killed is True

    def test_failed_command(self):
        """A non-zero exit status is reported as a failure with that code."""
        outcome = ResourceLimiter().execute("exit 1")
        assert outcome.success is False
        assert outcome.exit_code == 1

    def test_resource_report(self):
        """The formatted report contains the exit code and memory figure."""
        from tools.resource_limits import format_resource_report

        sample = ResourceResult(
            success=True,
            stdout="",
            stderr="",
            exit_code=0,
            violation=ResourceViolation.NONE,
            violation_message="",
            memory_used_mb=100,
            cpu_time_seconds=0.5,
            wall_time_seconds=1.0,
        )
        text = format_resource_report(sample)
        assert "Exit code: 0" in text
        assert "100MB" in text


if __name__ == "__main__":
    pytest.main([__file__])

249
tools/resource_limits.py Normal file
View File

@@ -0,0 +1,249 @@
"""
Terminal Sandbox Resource Limits — CPU, memory, time.
Provides resource limits for agent terminal commands to prevent
OOM kills, runaway processes, and excessive resource consumption.
"""
import logging
import os
import signal
import subprocess
import time
from dataclasses import dataclass, field
from typing import Optional, Dict, Any
from enum import Enum
logger = logging.getLogger(__name__)
class ResourceViolation(Enum):
    """Which limit, if any, a command ran into."""

    # Memory limit exceeded.
    MEMORY = "memory"
    # CPU limit exceeded.
    CPU = "cpu"
    # Time limit exceeded.
    TIME = "time"
    # No limit was violated.
    NONE = "none"
@dataclass
class ResourceLimits:
    """Limit settings applied to one subprocess."""

    # Memory ceiling in megabytes; the default is 2GB.
    memory_mb: int = 2048
    # CPU ceiling as a percentage of one core; the default is 80%.
    cpu_percent: int = 80
    # Time limit in seconds; the default is 5 minutes.
    timeout_seconds: int = 300
    # Seconds to wait after SIGTERM before falling back to SIGKILL.
    kill_timeout: int = 10
@dataclass
class ResourceResult:
    """Outcome of running one command under resource limits."""

    # True when the command finished with exit status 0.
    success: bool
    # Captured standard output.
    stdout: str
    # Captured standard error.
    stderr: str
    # Exit status of the command.
    exit_code: int
    # Which limit was violated, or ResourceViolation.NONE.
    violation: ResourceViolation
    # Human-readable description of the violation; empty when there is none.
    violation_message: str
    # Memory used, in megabytes.
    memory_used_mb: float
    # CPU time consumed, in seconds.
    cpu_time_seconds: float
    # Elapsed wall-clock time, in seconds.
    wall_time_seconds: float
    # True when the process was killed rather than exiting on its own.
    killed: bool = False
class ResourceLimiter:
    """Run shell commands under memory, CPU-time and wall-clock limits.

    Where the ``resource`` module is available the child is started with
    ``RLIMIT_AS`` (address space) and ``RLIMIT_CPU`` (CPU seconds) taken
    from ``self.limits``. The wall-clock timeout is enforced by the parent:
    when it expires the command is sent SIGTERM and, if it is still alive
    ``limits.kill_timeout`` seconds later, SIGKILL.

    ``limits.cpu_percent`` is not read by this class, so no CPU-share
    throttling is applied.
    """

    # Minimum time allowed for reading leftover output after the command
    # has been stopped.
    _MIN_DRAIN_SECONDS = 1.0

    def __init__(self, limits: Optional[ResourceLimits] = None):
        """Store *limits*, falling back to the ``ResourceLimits`` defaults."""
        self.limits = limits or ResourceLimits()

    def _get_resource_rlimit(self) -> Dict[int, Any]:
        """Get resource limits for subprocess (Unix only).

        Returns:
            Mapping of ``resource.RLIMIT_*`` constant to a ``(soft, hard)``
            pair. A limit whose configured value is not positive is left
            out, i.e. treated as "unlimited".

        Raises:
            ImportError: if the ``resource`` module is unavailable.
        """
        import resource

        rlimit: Dict[int, Any] = {}
        # RLIMIT_AS caps the address space (virtual memory), not the
        # resident set size.
        if self.limits.memory_mb > 0:
            mem_bytes = self.limits.memory_mb * 1024 * 1024
            rlimit[resource.RLIMIT_AS] = (mem_bytes, mem_bytes)
        # CPU time limit, in seconds of CPU actually consumed.
        if self.limits.timeout_seconds > 0:
            rlimit[resource.RLIMIT_CPU] = (
                self.limits.timeout_seconds,
                self.limits.timeout_seconds,
            )
        return rlimit

    def _check_resource_usage(self, process: subprocess.Popen) -> Dict[str, float]:
        """Check resource usage of child processes (Unix only).

        The figures come from ``RUSAGE_CHILDREN`` and therefore cover every
        child this process has waited for so far, not just *process* (which
        is not inspected). Callers that need per-command CPU time must
        subtract a snapshot taken before the command started.
        ``max_rss_mb`` is a high-water mark and cannot be corrected that way.

        Returns:
            Dict with ``user_time``, ``system_time`` (seconds) and
            ``max_rss_mb``; all zero when usage cannot be read.
        """
        try:
            import resource

            usage = resource.getrusage(resource.RUSAGE_CHILDREN)
        except (ImportError, OSError, ValueError):
            return {"user_time": 0, "system_time": 0, "max_rss_mb": 0}

        import sys

        # ru_maxrss is reported in kilobytes on Linux but in bytes on macOS.
        per_mb = 1024 * 1024 if sys.platform == "darwin" else 1024
        return {
            "user_time": usage.ru_utime,
            "system_time": usage.ru_stime,
            "max_rss_mb": usage.ru_maxrss / per_mb,
        }

    def _build_preexec_fn(self):
        """Return a callable that applies the rlimits in the child, or None.

        The requested values are clamped to the hard limits this process
        already runs under; asking for more would make ``setrlimit`` fail in
        the child and abort the launch.
        """
        try:
            import resource

            requested = self._get_resource_rlimit()
        except ImportError:
            logger.debug("resource module not available, skipping limits")
            return None

        planned = []
        for res, (soft, hard) in requested.items():
            try:
                current_hard = resource.getrlimit(res)[1]
            except (OSError, ValueError):
                current_hard = resource.RLIM_INFINITY
            if current_hard != resource.RLIM_INFINITY:
                soft = min(soft, current_hard)
                hard = min(hard, current_hard)
            planned.append((res, (soft, hard)))
        if not planned:
            return None

        def set_limits():
            for res, pair in planned:
                resource.setrlimit(res, pair)

        return set_limits

    @staticmethod
    def _as_text(data) -> str:
        """Normalise captured output that may be None, bytes or str."""
        if data is None:
            return ""
        if isinstance(data, bytes):
            return data.decode(errors="replace")
        return data

    @staticmethod
    def _signal_tree(process: subprocess.Popen, owns_group: bool, force: bool) -> None:
        """Signal the command: SIGKILL when *force*, otherwise SIGTERM.

        When the command leads its own process group the whole group is
        signalled so that processes spawned by the shell are reached too;
        otherwise only the direct child is signalled.
        """
        try:
            if owns_group:
                os.killpg(process.pid, signal.SIGKILL if force else signal.SIGTERM)
            elif force:
                process.kill()
            else:
                process.terminate()
        except ProcessLookupError:
            pass  # already gone
        except OSError as exc:
            logger.debug("Could not signal process %s: %s", process.pid, exc)

    def _stop_and_collect(self, process: subprocess.Popen, owns_group: bool):
        """Stop a timed-out command and return whatever output it produced.

        Sends SIGTERM, waits up to ``limits.kill_timeout`` seconds for the
        direct child to exit and escalates to SIGKILL if it does not. The
        pipes are then drained with a bounded wait so that a descendant
        still holding them open cannot block the caller forever.

        Returns:
            ``(stdout, stderr)``; each may be str, bytes or None.
        """
        deadline = time.monotonic() + max(self.limits.kill_timeout, 0)
        self._signal_tree(process, owns_group, force=False)
        try:
            process.wait(timeout=max(deadline - time.monotonic(), 0))
        except subprocess.TimeoutExpired:
            self._signal_tree(process, owns_group, force=True)
            process.wait()
        drain_for = max(deadline - time.monotonic(), self._MIN_DRAIN_SECONDS)
        try:
            return process.communicate(timeout=drain_for)
        except subprocess.TimeoutExpired as exc:
            # Something still holds the pipes open; kill it and keep the
            # partial output.
            self._signal_tree(process, owns_group, force=True)
            return exc.stdout, exc.stderr

    def execute(self, command: str, **kwargs) -> ResourceResult:
        """
        Execute a command with resource limits.

        On POSIX the command is started in its own session (unless the
        caller passes ``start_new_session`` explicitly) so that a timeout
        can stop the shell together with the processes it spawned.

        Args:
            command: Shell command to execute
            **kwargs: Additional subprocess arguments. ``input`` and
                ``check`` are honoured as they are by ``subprocess.run``.

        Returns:
            ResourceResult with execution details. Errors are reported in
            the result (``exit_code`` -1) rather than raised.
        """
        start_time = time.monotonic()
        try:
            # subprocess.run()-only options; Popen does not accept them.
            stdin_data = kwargs.pop("input", None)
            check = kwargs.pop("check", False)
            if stdin_data is not None:
                if kwargs.get("stdin") is not None:
                    raise ValueError("stdin and input arguments may not both be used.")
                kwargs["stdin"] = subprocess.PIPE

            if os.name == "posix":
                kwargs.setdefault("start_new_session", True)
            owns_group = os.name == "posix" and bool(kwargs.get("start_new_session"))

            # A non-positive timeout means "no limit", matching
            # _get_resource_rlimit.
            timeout = self.limits.timeout_seconds if self.limits.timeout_seconds > 0 else None
            preexec_fn = self._build_preexec_fn()
            # Snapshot first: RUSAGE_CHILDREN is cumulative over all children.
            baseline = self._check_resource_usage(None)

            timed_out = False
            with subprocess.Popen(
                command,
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                preexec_fn=preexec_fn,
                **kwargs,
            ) as process:
                try:
                    stdout, stderr = process.communicate(stdin_data, timeout=timeout)
                except subprocess.TimeoutExpired:
                    timed_out = True
                    stdout, stderr = self._stop_and_collect(process, owns_group)
                except BaseException:
                    # Never leave the command running behind an error or
                    # a KeyboardInterrupt.
                    self._signal_tree(process, owns_group, force=True)
                    raise

            wall_time = time.monotonic() - start_time
            usage = self._check_resource_usage(process)
            cpu_time = max(
                0.0,
                (usage["user_time"] + usage["system_time"])
                - (baseline["user_time"] + baseline["system_time"]),
            )

            if timed_out:
                return ResourceResult(
                    success=False,
                    stdout=self._as_text(stdout),
                    stderr=self._as_text(stderr),
                    exit_code=-1,
                    violation=ResourceViolation.TIME,
                    violation_message=f"Timeout after {self.limits.timeout_seconds}s",
                    memory_used_mb=usage["max_rss_mb"],
                    cpu_time_seconds=cpu_time,
                    wall_time_seconds=wall_time,
                    killed=True,
                )

            if check and process.returncode != 0:
                raise subprocess.CalledProcessError(
                    process.returncode, command, stdout, stderr
                )

            # Check for violations
            violation = ResourceViolation.NONE
            violation_message = ""
            # Only meaningful when a memory limit is actually configured.
            if self.limits.memory_mb > 0 and usage["max_rss_mb"] > self.limits.memory_mb:
                violation = ResourceViolation.MEMORY
                violation_message = f"Memory limit exceeded: {usage['max_rss_mb']:.0f}MB > {self.limits.memory_mb}MB"

            return ResourceResult(
                success=process.returncode == 0,
                stdout=stdout,
                stderr=stderr,
                exit_code=process.returncode,
                violation=violation,
                violation_message=violation_message,
                memory_used_mb=usage["max_rss_mb"],
                cpu_time_seconds=cpu_time,
                wall_time_seconds=wall_time,
            )
        except MemoryError:
            # Raised in this (parent) process, e.g. while buffering output.
            wall_time = time.monotonic() - start_time
            return ResourceResult(
                success=False,
                stdout="",
                stderr=f"Memory limit exceeded ({self.limits.memory_mb}MB)",
                exit_code=-1,
                violation=ResourceViolation.MEMORY,
                violation_message=f"Memory limit exceeded: {self.limits.memory_mb}MB",
                memory_used_mb=self.limits.memory_mb,
                cpu_time_seconds=0,
                wall_time_seconds=wall_time,
                killed=True,
            )
        except Exception as e:
            wall_time = time.monotonic() - start_time
            return ResourceResult(
                success=False,
                stdout="",
                stderr=str(e),
                exit_code=-1,
                violation=ResourceViolation.NONE,
                violation_message=f"Execution error: {e}",
                memory_used_mb=0,
                cpu_time_seconds=0,
                wall_time_seconds=wall_time,
            )
def format_resource_report(result: ResourceResult) -> str:
    """Render *result* as a single line of " | "-separated fields.

    Exit code, wall time, CPU time and memory are always present. A
    violation notice and a "killed" marker are appended when they apply.
    """
    segments = [
        f"Exit code: {result.exit_code}",
        f"Wall time: {result.wall_time_seconds:.2f}s",
        f"CPU time: {result.cpu_time_seconds:.2f}s",
        f"Memory: {result.memory_used_mb:.0f}MB",
    ]
    has_violation = result.violation != ResourceViolation.NONE
    if has_violation:
        segments += [f"⚠️ Violation: {result.violation_message}"]
    if result.killed:
        segments += ["🔴 Process killed"]
    return " | ".join(segments)
def execute_with_limits(
    command: str,
    memory_mb: int = 2048,
    cpu_percent: int = 80,
    timeout_seconds: int = 300,
    kill_timeout: int = 10,
) -> ResourceResult:
    """
    Convenience function to execute with resource limits.

    Builds a ``ResourceLimits`` from the arguments and runs *command*
    through a fresh ``ResourceLimiter``.

    Args:
        command: Shell command
        memory_mb: Memory limit in MB
        cpu_percent: CPU limit as percent of one core
        timeout_seconds: Timeout in seconds
        kill_timeout: Seconds to wait after SIGTERM before SIGKILL

    Returns:
        ResourceResult
    """
    limits = ResourceLimits(
        memory_mb=memory_mb,
        cpu_percent=cpu_percent,
        timeout_seconds=timeout_seconds,
        kill_timeout=kill_timeout,
    )
    limiter = ResourceLimiter(limits)
    return limiter.execute(command)