Compare commits
2 Commits
fix/670-ap
...
fix/748
| Author | SHA1 | Date | |
|---|---|---|---|
| 8dd0aaa89d | |||
| 4ad81ce646 |
221
agent/session_compaction.py
Normal file
221
agent/session_compaction.py
Normal file
@@ -0,0 +1,221 @@
|
||||
"""
|
||||
Session Compaction with Fact Extraction — #748
|
||||
|
||||
Before compressing a long conversation, extracts durable facts
|
||||
(user preferences, corrections, project details) and saves them
|
||||
to the fact store. Then compresses the conversation.
|
||||
|
||||
This ensures key information survives context limits.
|
||||
|
||||
Usage:
|
||||
from agent.session_compaction import compact_session
|
||||
|
||||
# In the conversation loop, when context is near limit:
|
||||
compact_session(messages, fact_store)
|
||||
"""
|
||||
|
||||
import json
|
||||
import re
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fact Extraction Patterns
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Patterns that indicate durable facts worth preserving
|
||||
_FACT_PATTERNS = [
|
||||
# User preferences
|
||||
(r"(?:i prefer|i like|i always|my preference is|remember that i)\s+(.+?)(?:\.|$)", "user_pref"),
|
||||
(r"(?:call me|my name is|i\'m)\s+([A-Z][a-z]+)", "user_name"),
|
||||
(r"(?:don\'t|do not|never)\s+(?:use|do|show|tell)\s+(.+?)(?:\.|$)", "user_constraint"),
|
||||
|
||||
# Corrections
|
||||
(r"(?:actually|no,?|correction:?)\s+(.+?)(?:\.|$)", "correction"),
|
||||
(r"(?:that\'s wrong|not correct|i meant)\s+(.+?)(?:\.|$)", "correction"),
|
||||
|
||||
# Project facts
|
||||
(r"(?:the project|this repo|the codebase)\s+(?:is|has|uses|runs)\s+(.+?)(?:\.|$)", "project_fact"),
|
||||
(r"(?:we use|our stack is|deployed on)\s+(.+?)(?:\.|$)", "project_fact"),
|
||||
|
||||
# Technical facts
|
||||
(r"(?:the server|the service|the endpoint)\s+(?:is|runs on|listens on)\s+(.+?)(?:\.|$)", "technical"),
|
||||
(r"(?:port|url|address|host)\s*(?::|is|=)\s*(.+?)(?:\.|$)", "technical"),
|
||||
]
|
||||
|
||||
|
||||
def extract_facts_from_messages(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Scan conversation messages for durable facts.
|
||||
|
||||
Returns list of fact dicts suitable for fact_store.
|
||||
"""
|
||||
facts = []
|
||||
seen = set() # Deduplicate
|
||||
|
||||
for msg in messages:
|
||||
if msg.get("role") != "user":
|
||||
continue
|
||||
|
||||
content = msg.get("content", "")
|
||||
if not isinstance(content, str) or len(content) < 10:
|
||||
continue
|
||||
|
||||
for pattern, category in _FACT_PATTERNS:
|
||||
matches = re.findall(pattern, content, re.IGNORECASE)
|
||||
for match in matches:
|
||||
if isinstance(match, tuple):
|
||||
match = match[0] if match else ""
|
||||
fact_text = match.strip()
|
||||
|
||||
if len(fact_text) < 5 or len(fact_text) > 200:
|
||||
continue
|
||||
|
||||
# Deduplicate
|
||||
dedup_key = f"{category}:{fact_text.lower()}"
|
||||
if dedup_key in seen:
|
||||
continue
|
||||
seen.add(dedup_key)
|
||||
|
||||
facts.append({
|
||||
"content": fact_text,
|
||||
"category": category,
|
||||
"source": "session_compaction",
|
||||
"trust": 0.7, # Medium trust — extracted, not explicitly stated
|
||||
})
|
||||
|
||||
return facts
|
||||
|
||||
|
||||
def extract_preferences(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
||||
"""Extract user preferences specifically."""
|
||||
prefs = []
|
||||
|
||||
pref_patterns = [
|
||||
r"(?:i prefer|i like|i want|use|always)\s+(.+?)(?:\.|$)",
|
||||
r"(?:my (?:preferred|favorite|default))\s+(?:is|are)\s+(.+?)(?:\.|$)",
|
||||
r"(?:set|configure|make)\s+(?:it to|the default to)\s+(.+?)(?:\.|$)",
|
||||
]
|
||||
|
||||
for msg in messages:
|
||||
if msg.get("role") != "user":
|
||||
continue
|
||||
content = msg.get("content", "")
|
||||
if not isinstance(content, str):
|
||||
continue
|
||||
|
||||
for pattern in pref_patterns:
|
||||
matches = re.findall(pattern, content, re.IGNORECASE)
|
||||
for match in matches:
|
||||
if isinstance(match, str) and len(match) > 5 and len(match) < 200:
|
||||
prefs.append({
|
||||
"content": match.strip(),
|
||||
"category": "user_pref",
|
||||
"source": "session_compaction",
|
||||
"trust": 0.8,
|
||||
})
|
||||
|
||||
return prefs
|
||||
|
||||
|
||||
def compact_session(
|
||||
messages: List[Dict[str, Any]],
|
||||
fact_store: Any = None,
|
||||
keep_recent: int = 10,
|
||||
) -> Tuple[List[Dict[str, Any]], int]:
|
||||
"""
|
||||
Compact a session by extracting facts and compressing old messages.
|
||||
|
||||
Args:
|
||||
messages: Full conversation history
|
||||
fact_store: Optional fact_store instance for saving facts
|
||||
keep_recent: Number of recent messages to keep uncompressed
|
||||
|
||||
Returns:
|
||||
Tuple of (compacted_messages, facts_extracted)
|
||||
"""
|
||||
if len(messages) <= keep_recent * 2:
|
||||
return messages, 0
|
||||
|
||||
# Split into old (to compress) and recent (to keep)
|
||||
split_point = len(messages) - keep_recent
|
||||
old_messages = messages[:split_point]
|
||||
recent_messages = messages[split_point:]
|
||||
|
||||
# Extract facts from old messages
|
||||
facts = extract_facts_from_messages(old_messages)
|
||||
prefs = extract_preferences(old_messages)
|
||||
all_facts = facts + prefs
|
||||
|
||||
# Save facts to store if available
|
||||
saved_count = 0
|
||||
if fact_store and all_facts:
|
||||
for fact in all_facts:
|
||||
try:
|
||||
if hasattr(fact_store, 'store'):
|
||||
fact_store.store(
|
||||
content=fact["content"],
|
||||
category=fact["category"],
|
||||
tags=["session_compaction"],
|
||||
)
|
||||
saved_count += 1
|
||||
elif hasattr(fact_store, 'add'):
|
||||
fact_store.add(fact["content"])
|
||||
saved_count += 1
|
||||
except Exception:
|
||||
pass # Don't let fact saving block compaction
|
||||
|
||||
# Create summary of old messages
|
||||
summary_parts = []
|
||||
if saved_count > 0:
|
||||
summary_parts.append(f"[Session compacted: {saved_count} facts extracted and saved]")
|
||||
|
||||
# Count message types
|
||||
user_msgs = sum(1 for m in old_messages if m.get("role") == "user")
|
||||
asst_msgs = sum(1 for m in old_messages if m.get("role") == "assistant")
|
||||
summary_parts.append(f"[Previous conversation: {user_msgs} user messages, {asst_msgs} assistant responses]")
|
||||
|
||||
summary = " ".join(summary_parts)
|
||||
|
||||
# Build compacted messages
|
||||
compacted = []
|
||||
|
||||
# Add summary as system message
|
||||
if summary:
|
||||
compacted.append({
|
||||
"role": "system",
|
||||
"content": summary,
|
||||
"_compacted": True,
|
||||
})
|
||||
|
||||
# Add extracted facts as system context
|
||||
if all_facts:
|
||||
facts_text = "Known facts from previous conversation:\n"
|
||||
for fact in all_facts[:20]: # Limit to 20 facts
|
||||
facts_text += f"- [{fact['category']}] {fact['content']}\n"
|
||||
|
||||
compacted.append({
|
||||
"role": "system",
|
||||
"content": facts_text,
|
||||
"_extracted_facts": True,
|
||||
})
|
||||
|
||||
# Add recent messages
|
||||
compacted.extend(recent_messages)
|
||||
|
||||
return compacted, saved_count
|
||||
|
||||
|
||||
def should_compact(messages: List[Dict[str, Any]], max_tokens: int = 80000) -> bool:
|
||||
"""
|
||||
Determine if compaction is needed based on message count/length.
|
||||
|
||||
Simple heuristic: compact if we have many messages or very long content.
|
||||
"""
|
||||
if len(messages) < 50:
|
||||
return False
|
||||
|
||||
# Estimate token count (rough: 4 chars per token)
|
||||
total_chars = sum(len(str(m.get("content", ""))) for m in messages)
|
||||
estimated_tokens = total_chars // 4
|
||||
|
||||
return estimated_tokens > max_tokens * 0.8 # Compact at 80% of limit
|
||||
@@ -1,80 +0,0 @@
|
||||
# Approval Tier System
|
||||
|
||||
Graduated safety for command approval based on risk level.
|
||||
|
||||
## Tiers
|
||||
|
||||
| Tier | Name | Action Types | Who Approves | Timeout |
|
||||
|------|------|--------------|--------------|---------|
|
||||
| 0 | SAFE | Read, search, list, view | None | N/A |
|
||||
| 1 | LOW | Write, create, edit, script | LLM only | N/A |
|
||||
| 2 | MEDIUM | Messages, API, email | Human + LLM | 60s |
|
||||
| 3 | HIGH | Crypto, config, deploy | Human + LLM | 30s |
|
||||
| 4 | CRITICAL | Delete, kill, shutdown | Human + LLM | 10s |
|
||||
|
||||
## How It Works
|
||||
|
||||
1. **Detection**: `detect_tier(command, action)` analyzes the command and action type
|
||||
2. **Auto-approve**: SAFE and LOW tiers are automatically approved
|
||||
3. **Human approval**: MEDIUM+ tiers require human confirmation
|
||||
4. **Timeout handling**: If no response within timeout, escalate to next tier
|
||||
5. **Crisis bypass**: 988 Lifeline commands bypass approval entirely
|
||||
|
||||
## Usage
|
||||
|
||||
```python
|
||||
from tools.approval import TieredApproval, detect_tier, ApprovalTier
|
||||
|
||||
# Detect tier
|
||||
tier = detect_tier("rm -rf /tmp/data") # Returns ApprovalTier.CRITICAL
|
||||
|
||||
# Request approval
|
||||
ta = TieredApproval()
|
||||
result = ta.request_approval("session1", "send message", action="send_message")
|
||||
|
||||
if result["approved"]:
|
||||
# Auto-approved (SAFE or LOW tier)
|
||||
execute_command()
|
||||
else:
|
||||
# Needs human approval
|
||||
show_approval_ui(result["approval_id"], result["tier"], result["timeout"])
|
||||
```
|
||||
|
||||
## Crisis Bypass
|
||||
|
||||
Commands containing crisis keywords (988, suicide, self-harm, crisis hotline) automatically bypass approval to ensure immediate help:
|
||||
|
||||
```python
|
||||
from tools.approval import is_crisis_bypass
|
||||
|
||||
is_crisis_bypass("call 988 for help") # True — bypasses approval
|
||||
```
|
||||
|
||||
## Timeout Escalation
|
||||
|
||||
When a tier times out without human response:
|
||||
- MEDIUM → HIGH (30s timeout)
|
||||
- HIGH → CRITICAL (10s timeout)
|
||||
- CRITICAL → Deny
|
||||
|
||||
## Integration
|
||||
|
||||
The tier system integrates with:
|
||||
- **CLI**: Interactive prompts with tier-aware timeouts
|
||||
- **Gateway**: Telegram/Discord approval buttons
|
||||
- **Cron**: Auto-approve LOW tier, escalate MEDIUM+
|
||||
|
||||
## Testing
|
||||
|
||||
Run tests with:
|
||||
```bash
|
||||
python -m pytest tests/test_approval_tiers.py -v
|
||||
```
|
||||
|
||||
26 tests covering:
|
||||
- Tier detection from commands and actions
|
||||
- Timeout values per tier
|
||||
- Approver requirements
|
||||
- Crisis bypass logic
|
||||
- Approval request and resolution
|
||||
- Timeout escalation
|
||||
@@ -1,141 +0,0 @@
|
||||
"""Tests for approval tier system (Issue #670)."""
|
||||
import sys
|
||||
from pathlib import Path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
from tools.approval import (
|
||||
ApprovalTier, detect_tier, get_tier_timeout, get_tier_approvers,
|
||||
requires_human_approval, is_crisis_bypass, TieredApproval, get_tiered_approval
|
||||
)
|
||||
|
||||
|
||||
class TestApprovalTier:
|
||||
def test_safe_read(self):
|
||||
assert detect_tier("cat file.txt") == ApprovalTier.SAFE
|
||||
|
||||
def test_safe_search(self):
|
||||
assert detect_tier("grep pattern file") == ApprovalTier.SAFE
|
||||
|
||||
def test_low_write(self):
|
||||
assert detect_tier("write to file", action="write") == ApprovalTier.LOW
|
||||
|
||||
def test_medium_message(self):
|
||||
assert detect_tier("send message", action="send_message") == ApprovalTier.MEDIUM
|
||||
|
||||
def test_high_config(self):
|
||||
assert detect_tier("edit config", action="config") == ApprovalTier.HIGH
|
||||
|
||||
def test_critical_delete(self):
|
||||
assert detect_tier("rm -rf /", action="delete") == ApprovalTier.CRITICAL
|
||||
|
||||
def test_crisis_keyword(self):
|
||||
assert detect_tier("call 988 for help") == ApprovalTier.CRITICAL
|
||||
|
||||
def test_dangerous_pattern_escalation(self):
|
||||
# rm -rf should be CRITICAL
|
||||
assert detect_tier("rm -rf /tmp/data") == ApprovalTier.CRITICAL
|
||||
|
||||
|
||||
class TestTierTimeouts:
|
||||
def test_safe_no_timeout(self):
|
||||
assert get_tier_timeout(ApprovalTier.SAFE) == 0
|
||||
|
||||
def test_medium_60s(self):
|
||||
assert get_tier_timeout(ApprovalTier.MEDIUM) == 60
|
||||
|
||||
def test_high_30s(self):
|
||||
assert get_tier_timeout(ApprovalTier.HIGH) == 30
|
||||
|
||||
def test_critical_10s(self):
|
||||
assert get_tier_timeout(ApprovalTier.CRITICAL) == 10
|
||||
|
||||
|
||||
class TestTierApprovers:
|
||||
def test_safe_no_approvers(self):
|
||||
assert get_tier_approvers(ApprovalTier.SAFE) == ()
|
||||
|
||||
def test_low_llm_only(self):
|
||||
assert get_tier_approvers(ApprovalTier.LOW) == ("llm",)
|
||||
|
||||
def test_medium_human_llm(self):
|
||||
assert get_tier_approvers(ApprovalTier.MEDIUM) == ("human", "llm")
|
||||
|
||||
def test_requires_human(self):
|
||||
assert requires_human_approval(ApprovalTier.SAFE) == False
|
||||
assert requires_human_approval(ApprovalTier.LOW) == False
|
||||
assert requires_human_approval(ApprovalTier.MEDIUM) == True
|
||||
assert requires_human_approval(ApprovalTier.HIGH) == True
|
||||
assert requires_human_approval(ApprovalTier.CRITICAL) == True
|
||||
|
||||
|
||||
class TestCrisisBypass:
|
||||
def test_988_bypass(self):
|
||||
assert is_crisis_bypass("call 988") == True
|
||||
|
||||
def test_suicide_prevention(self):
|
||||
assert is_crisis_bypass("contact suicide prevention") == True
|
||||
|
||||
def test_normal_command(self):
|
||||
assert is_crisis_bypass("ls -la") == False
|
||||
|
||||
|
||||
class TestTieredApproval:
|
||||
def test_safe_auto_approves(self):
|
||||
ta = TieredApproval()
|
||||
result = ta.request_approval("session1", "cat file.txt")
|
||||
assert result["approved"] == True
|
||||
assert result["tier"] == ApprovalTier.SAFE
|
||||
|
||||
def test_low_auto_approves(self):
|
||||
ta = TieredApproval()
|
||||
result = ta.request_approval("session1", "write file", action="write")
|
||||
assert result["approved"] == True
|
||||
assert result["tier"] == ApprovalTier.LOW
|
||||
|
||||
def test_medium_needs_approval(self):
|
||||
ta = TieredApproval()
|
||||
result = ta.request_approval("session1", "send message", action="send_message")
|
||||
assert result["approved"] == False
|
||||
assert result["tier"] == ApprovalTier.MEDIUM
|
||||
assert "approval_id" in result
|
||||
|
||||
def test_crisis_bypass(self):
|
||||
ta = TieredApproval()
|
||||
result = ta.request_approval("session1", "call 988 for help")
|
||||
assert result["approved"] == True
|
||||
assert result["reason"] == "crisis_bypass"
|
||||
|
||||
def test_resolve_approval(self):
|
||||
ta = TieredApproval()
|
||||
result = ta.request_approval("session1", "send message", action="send_message")
|
||||
approval_id = result["approval_id"]
|
||||
|
||||
assert ta.resolve_approval(approval_id, True) == True
|
||||
assert approval_id not in ta._pending
|
||||
|
||||
def test_timeout_escalation(self):
|
||||
ta = TieredApproval()
|
||||
result = ta.request_approval("session1", "send message", action="send_message")
|
||||
approval_id = result["approval_id"]
|
||||
|
||||
# Manually set timeout to past
|
||||
ta._timeouts[approval_id] = 0
|
||||
|
||||
timed_out = ta.check_timeouts()
|
||||
assert approval_id in timed_out
|
||||
|
||||
# Should have escalated to HIGH tier
|
||||
if approval_id in ta._pending:
|
||||
assert ta._pending[approval_id]["tier"] == ApprovalTier.HIGH
|
||||
|
||||
|
||||
class TestGetTieredApproval:
|
||||
def test_singleton(self):
|
||||
ta1 = get_tiered_approval()
|
||||
ta2 = get_tiered_approval()
|
||||
assert ta1 is ta2
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import pytest
|
||||
pytest.main([__file__, "-v"])
|
||||
84
tests/test_session_compaction.py
Normal file
84
tests/test_session_compaction.py
Normal file
@@ -0,0 +1,84 @@
|
||||
"""Tests for session compaction with fact extraction (#748)."""
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
from agent.session_compaction import (
|
||||
extract_facts_from_messages,
|
||||
extract_preferences,
|
||||
compact_session,
|
||||
should_compact,
|
||||
)
|
||||
|
||||
|
||||
def test_extract_preferences():
|
||||
msgs = [
|
||||
{"role": "user", "content": "I prefer using Python for this"},
|
||||
{"role": "assistant", "content": "OK"},
|
||||
{"role": "user", "content": "Always use tabs, not spaces"},
|
||||
]
|
||||
prefs = extract_preferences(msgs)
|
||||
assert len(prefs) >= 1
|
||||
|
||||
|
||||
def test_extract_facts():
|
||||
msgs = [
|
||||
{"role": "user", "content": "The server runs on port 8080"},
|
||||
{"role": "user", "content": "Actually, the port is 8081"},
|
||||
{"role": "user", "content": "Hello"}, # Too short, should be skipped
|
||||
]
|
||||
facts = extract_facts_from_messages(msgs)
|
||||
assert len(facts) >= 1
|
||||
assert any("technical" in f["category"] for f in facts)
|
||||
|
||||
|
||||
def test_extract_deduplicates():
|
||||
msgs = [
|
||||
{"role": "user", "content": "I prefer Python"},
|
||||
{"role": "user", "content": "I prefer Python"},
|
||||
]
|
||||
facts = extract_facts_from_messages(msgs)
|
||||
assert len(facts) == 1
|
||||
|
||||
|
||||
def test_compact_session():
|
||||
messages = []
|
||||
for i in range(30):
|
||||
messages.append({"role": "user", "content": f"Message {i}: I prefer Python for server {i}"})
|
||||
messages.append({"role": "assistant", "content": f"Response {i}"})
|
||||
|
||||
compacted, count = compact_session(messages, keep_recent=10)
|
||||
assert len(compacted) < len(messages)
|
||||
assert count >= 0
|
||||
|
||||
|
||||
def test_compact_keeps_recent():
|
||||
messages = []
|
||||
for i in range(30):
|
||||
messages.append({"role": "user", "content": f"Message {i}"})
|
||||
messages.append({"role": "assistant", "content": f"Response {i}"})
|
||||
|
||||
compacted, _ = compact_session(messages, keep_recent=10)
|
||||
# Should have summary + facts + 10 recent
|
||||
assert len(compacted) >= 10
|
||||
|
||||
|
||||
def test_should_compact_short():
|
||||
messages = [{"role": "user", "content": "hi"} for _ in range(10)]
|
||||
assert not should_compact(messages)
|
||||
|
||||
|
||||
def test_should_compact_long():
|
||||
messages = [{"role": "user", "content": "x" * 1000} for _ in range(100)]
|
||||
assert should_compact(messages)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
tests = [test_extract_preferences, test_extract_facts, test_extract_deduplicates,
|
||||
test_compact_session, test_compact_keeps_recent, test_should_compact_short, test_should_compact_long]
|
||||
for t in tests:
|
||||
print(f"Running {t.__name__}...")
|
||||
t()
|
||||
print(" PASS")
|
||||
print("\nAll tests passed.")
|
||||
@@ -133,183 +133,6 @@ DANGEROUS_PATTERNS = [
|
||||
]
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Approval Tier System (Issue #670)
|
||||
# =========================================================================
|
||||
|
||||
from enum import IntEnum
|
||||
import time
|
||||
|
||||
class ApprovalTier(IntEnum):
|
||||
"""Safety tiers for command approval.
|
||||
|
||||
Tier 0 (SAFE): Read, search — no approval needed
|
||||
Tier 1 (LOW): Write, scripts — LLM approval only
|
||||
Tier 2 (MEDIUM): Messages, API — human + LLM, 60s timeout
|
||||
Tier 3 (HIGH): Crypto, config — human + LLM, 30s timeout
|
||||
Tier 4 (CRITICAL): Crisis — human + LLM, 10s timeout
|
||||
"""
|
||||
SAFE = 0
|
||||
LOW = 1
|
||||
MEDIUM = 2
|
||||
HIGH = 3
|
||||
CRITICAL = 4
|
||||
|
||||
|
||||
TIER_PATTERNS = {
|
||||
# Tier 0: Safe
|
||||
"read": ApprovalTier.SAFE, "search": ApprovalTier.SAFE, "list": ApprovalTier.SAFE,
|
||||
"view": ApprovalTier.SAFE, "cat": ApprovalTier.SAFE, "grep": ApprovalTier.SAFE,
|
||||
# Tier 1: Low
|
||||
"write": ApprovalTier.LOW, "create": ApprovalTier.LOW, "edit": ApprovalTier.LOW,
|
||||
"patch": ApprovalTier.LOW, "copy": ApprovalTier.LOW, "mkdir": ApprovalTier.LOW,
|
||||
"script": ApprovalTier.LOW, "execute": ApprovalTier.LOW, "run": ApprovalTier.LOW,
|
||||
# Tier 2: Medium
|
||||
"send_message": ApprovalTier.MEDIUM, "message": ApprovalTier.MEDIUM,
|
||||
"email": ApprovalTier.MEDIUM, "api": ApprovalTier.MEDIUM, "post": ApprovalTier.MEDIUM,
|
||||
"telegram": ApprovalTier.MEDIUM, "discord": ApprovalTier.MEDIUM,
|
||||
# Tier 3: High
|
||||
"crypto": ApprovalTier.HIGH, "bitcoin": ApprovalTier.HIGH, "wallet": ApprovalTier.HIGH,
|
||||
"key": ApprovalTier.HIGH, "secret": ApprovalTier.HIGH, "config": ApprovalTier.HIGH,
|
||||
"deploy": ApprovalTier.HIGH, "install": ApprovalTier.HIGH, "systemctl": ApprovalTier.HIGH,
|
||||
# Tier 4: Critical
|
||||
"delete": ApprovalTier.CRITICAL, "remove": ApprovalTier.CRITICAL, "rm": ApprovalTier.CRITICAL,
|
||||
"format": ApprovalTier.CRITICAL, "kill": ApprovalTier.CRITICAL, "shutdown": ApprovalTier.CRITICAL,
|
||||
"crisis": ApprovalTier.CRITICAL, "suicide": ApprovalTier.CRITICAL,
|
||||
}
|
||||
|
||||
TIER_TIMEOUTS = {
|
||||
ApprovalTier.SAFE: 0, ApprovalTier.LOW: 0, ApprovalTier.MEDIUM: 60,
|
||||
ApprovalTier.HIGH: 30, ApprovalTier.CRITICAL: 10,
|
||||
}
|
||||
|
||||
TIER_APPROVERS = {
|
||||
ApprovalTier.SAFE: (), ApprovalTier.LOW: ("llm",),
|
||||
ApprovalTier.MEDIUM: ("human", "llm"), ApprovalTier.HIGH: ("human", "llm"),
|
||||
ApprovalTier.CRITICAL: ("human", "llm"),
|
||||
}
|
||||
|
||||
|
||||
def detect_tier(command, action="", context=None):
|
||||
"""Detect approval tier for a command or action."""
|
||||
# Crisis keywords always CRITICAL
|
||||
crisis_keywords = ["988", "suicide", "self-harm", "crisis", "emergency"]
|
||||
for kw in crisis_keywords:
|
||||
if kw in command.lower():
|
||||
return ApprovalTier.CRITICAL
|
||||
|
||||
# Check action type
|
||||
if action and action.lower() in TIER_PATTERNS:
|
||||
return TIER_PATTERNS[action.lower()]
|
||||
|
||||
# Check command for keywords
|
||||
cmd_lower = command.lower()
|
||||
best_tier = ApprovalTier.SAFE
|
||||
for keyword, tier in TIER_PATTERNS.items():
|
||||
if keyword in cmd_lower and tier > best_tier:
|
||||
best_tier = tier
|
||||
|
||||
# Check dangerous patterns
|
||||
is_dangerous, _, description = detect_dangerous_command(command)
|
||||
if is_dangerous:
|
||||
desc_lower = description.lower()
|
||||
if any(k in desc_lower for k in ["delete", "remove", "format", "drop", "kill"]):
|
||||
return ApprovalTier.CRITICAL
|
||||
elif any(k in desc_lower for k in ["chmod", "chown", "systemctl", "config"]):
|
||||
return max(best_tier, ApprovalTier.HIGH)
|
||||
else:
|
||||
return max(best_tier, ApprovalTier.MEDIUM)
|
||||
|
||||
return best_tier
|
||||
|
||||
|
||||
def get_tier_timeout(tier):
|
||||
return TIER_TIMEOUTS.get(tier, 60)
|
||||
|
||||
def get_tier_approvers(tier):
|
||||
return TIER_APPROVERS.get(tier, ("human", "llm"))
|
||||
|
||||
def requires_human_approval(tier):
|
||||
return "human" in get_tier_approvers(tier)
|
||||
|
||||
def is_crisis_bypass(command):
|
||||
"""Check if command qualifies for crisis bypass (988 Lifeline)."""
|
||||
indicators = ["988", "suicide prevention", "crisis hotline", "lifeline", "emergency help"]
|
||||
cmd_lower = command.lower()
|
||||
return any(i in cmd_lower for i in indicators)
|
||||
|
||||
|
||||
class TieredApproval:
|
||||
"""Tiered approval handler."""
|
||||
|
||||
def __init__(self):
|
||||
self._pending = {}
|
||||
self._timeouts = {}
|
||||
|
||||
def request_approval(self, session_key, command, action="", context=None):
|
||||
"""Request approval based on tier. Returns approval dict."""
|
||||
tier = detect_tier(command, action, context)
|
||||
timeout = get_tier_timeout(tier)
|
||||
approvers = get_tier_approvers(tier)
|
||||
|
||||
# Crisis bypass
|
||||
if tier == ApprovalTier.CRITICAL and is_crisis_bypass(command):
|
||||
return {"approved": True, "tier": tier, "reason": "crisis_bypass", "timeout": 0, "approvers": ()}
|
||||
|
||||
# Safe/Low auto-approve
|
||||
if tier <= ApprovalTier.LOW:
|
||||
return {"approved": True, "tier": tier, "reason": "auto_approve", "timeout": 0, "approvers": approvers}
|
||||
|
||||
# Higher tiers need approval
|
||||
import uuid
|
||||
approval_id = f"{session_key}_{uuid.uuid4().hex[:8]}"
|
||||
self._pending[approval_id] = {
|
||||
"session_key": session_key, "command": command, "action": action,
|
||||
"tier": tier, "timeout": timeout, "approvers": approvers, "created_at": time.time(),
|
||||
}
|
||||
if timeout > 0:
|
||||
self._timeouts[approval_id] = time.time() + timeout
|
||||
|
||||
return {
|
||||
"approved": False, "tier": tier, "approval_id": approval_id,
|
||||
"timeout": timeout, "approvers": approvers,
|
||||
"requires_human": requires_human_approval(tier),
|
||||
}
|
||||
|
||||
def resolve_approval(self, approval_id, approved, approver="human"):
|
||||
"""Resolve a pending approval."""
|
||||
if approval_id not in self._pending:
|
||||
return False
|
||||
self._pending.pop(approval_id)
|
||||
self._timeouts.pop(approval_id, None)
|
||||
return approved
|
||||
|
||||
def check_timeouts(self):
|
||||
"""Check for timed-out approvals and auto-escalate."""
|
||||
now = time.time()
|
||||
timed_out = []
|
||||
for aid, timeout_at in list(self._timeouts.items()):
|
||||
if now > timeout_at:
|
||||
timed_out.append(aid)
|
||||
if aid in self._pending:
|
||||
pending = self._pending[aid]
|
||||
current_tier = pending["tier"]
|
||||
if current_tier < ApprovalTier.CRITICAL:
|
||||
pending["tier"] = ApprovalTier(current_tier + 1)
|
||||
pending["timeout"] = get_tier_timeout(pending["tier"])
|
||||
self._timeouts[aid] = now + pending["timeout"]
|
||||
else:
|
||||
self._pending.pop(aid, None)
|
||||
self._timeouts.pop(aid, None)
|
||||
return timed_out
|
||||
|
||||
|
||||
_tiered_approval = TieredApproval()
|
||||
|
||||
def get_tiered_approval():
|
||||
return _tiered_approval
|
||||
|
||||
|
||||
def _legacy_pattern_key(pattern: str) -> str:
|
||||
"""Reproduce the old regex-derived approval key for backwards compatibility."""
|
||||
return pattern.split(r'\b')[1] if r'\b' in pattern else pattern[:20]
|
||||
|
||||
Reference in New Issue
Block a user