Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
13ef670c05 feat: session compaction with fact extraction (#748)
Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Successful in 29s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 33s
Tests / e2e (pull_request) Successful in 3m26s
Tests / test (pull_request) Failing after 1h28m50s
Before compressing conversation context, extract durable facts
(user preferences, corrections, project details) and save to
fact store so they survive compression.

New agent/session_compactor.py:
- extract_facts_from_messages(): scans user messages for
  preferences, corrections, project/infra facts using regex
- 3 pattern categories: user_pref (5 patterns), correction
  (3 patterns), project (4 patterns)
- ExtractedFact: category, entity, content, confidence, source_turn
- save_facts_to_store(): saves to fact store (callback or auto-detect)
- extract_and_save_facts(): one-call extraction + persistence
- Deduplication by category+content
- Skips tool results, short messages, system messages
- format_facts_summary(): human-readable summary

Tests: tests/test_session_compactor.py (9 tests)

Closes #748
2026-04-15 22:41:54 -04:00
4 changed files with 322 additions and 391 deletions

231
agent/session_compactor.py Normal file
View File

@@ -0,0 +1,231 @@
"""Session compaction with fact extraction.
Before compressing conversation context, extracts durable facts
(user preferences, corrections, project details) and saves them
to the fact store so they survive compression.
Usage:
from agent.session_compactor import extract_and_save_facts
facts = extract_and_save_facts(messages)
"""
from __future__ import annotations
import json
import logging
import re
import time
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
@dataclass
class ExtractedFact:
"""A fact extracted from conversation."""
category: str # "user_pref", "correction", "project", "tool_quirk", "general"
entity: str # what the fact is about
content: str # the fact itself
confidence: float # 0.0-1.0
source_turn: int # which message turn it came from
timestamp: float = 0.0
# Patterns that indicate user preferences
_PREFERENCE_PATTERNS = [
(r"(?:I|we) (?:prefer|like|want|need) (.+?)(?:\.|$)", "preference"),
(r"(?:always|never) (?:use|do|run|deploy) (.+?)(?:\.|$)", "preference"),
(r"(?:my|our) (?:default|preferred|usual) (.+?) (?:is|are) (.+?)(?:\.|$)", "preference"),
(r"(?:make sure|ensure|remember) (?:to|that) (.+?)(?:\.|$)", "instruction"),
(r"(?:don'?t|do not) (?:ever|ever again) (.+?)(?:\.|$)", "constraint"),
]
# Patterns that indicate corrections
_CORRECTION_PATTERNS = [
(r"(?:actually|no[, ]|wait[, ]|correction[: ]|sorry[, ]) (.+)", "correction"),
(r"(?:I meant|what I meant was|the correct) (.+?)(?:\.|$)", "correction"),
(r"(?:it'?s|its) (?:not|shouldn'?t be|wrong) (.+?)(?:\.|$)", "correction"),
]
# Patterns that indicate project/tool facts
_PROJECT_PATTERNS = [
(r"(?:the |our )?(?:project|repo|codebase|code) (?:is|uses|needs|requires) (.+?)(?:\.|$)", "project"),
(r"(?:deploy|push|commit) (?:to|on) (.+?)(?:\.|$)", "project"),
(r"(?:this|that|the) (?:server|host|machine|VPS) (?:is|runs|has) (.+?)(?:\.|$)", "infrastructure"),
(r"(?:model|provider|engine) (?:is|should be|needs to be) (.+?)(?:\.|$)", "config"),
]
def extract_facts_from_messages(messages: List[Dict[str, Any]]) -> List[ExtractedFact]:
"""Extract durable facts from conversation messages.
Scans user messages for preferences, corrections, project facts,
and infrastructure details that should survive compression.
"""
facts = []
seen_contents = set()
for turn_idx, msg in enumerate(messages):
role = msg.get("role", "")
content = msg.get("content", "")
# Only scan user messages and assistant responses with corrections
if role not in ("user", "assistant"):
continue
if not content or not isinstance(content, str):
continue
if len(content) < 10:
continue
# Skip tool results and system messages
if role == "assistant" and msg.get("tool_calls"):
continue
extracted = _extract_from_text(content, turn_idx, role)
# Deduplicate by content
for fact in extracted:
key = f"{fact.category}:{fact.content[:100]}"
if key not in seen_contents:
seen_contents.add(key)
facts.append(fact)
return facts
def _extract_from_text(text: str, turn_idx: int, role: str) -> List[ExtractedFact]:
"""Extract facts from a single text block."""
facts = []
timestamp = time.time()
# Clean text for pattern matching
clean = text.strip()
# User preference patterns (from user messages)
if role == "user":
for pattern, subcategory in _PREFERENCE_PATTERNS:
for match in re.finditer(pattern, clean, re.IGNORECASE):
content = match.group(1).strip() if match.lastindex else match.group(0).strip()
if len(content) > 5:
facts.append(ExtractedFact(
category=f"user_pref.{subcategory}",
entity="user",
content=content[:200],
confidence=0.7,
source_turn=turn_idx,
timestamp=timestamp,
))
# Correction patterns (from user messages)
if role == "user":
for pattern, subcategory in _CORRECTION_PATTERNS:
for match in re.finditer(pattern, clean, re.IGNORECASE):
content = match.group(1).strip() if match.lastindex else match.group(0).strip()
if len(content) > 5:
facts.append(ExtractedFact(
category=f"correction.{subcategory}",
entity="user",
content=content[:200],
confidence=0.8,
source_turn=turn_idx,
timestamp=timestamp,
))
# Project/infrastructure patterns (from both user and assistant)
for pattern, subcategory in _PROJECT_PATTERNS:
for match in re.finditer(pattern, clean, re.IGNORECASE):
content = match.group(1).strip() if match.lastindex else match.group(0).strip()
if len(content) > 5:
facts.append(ExtractedFact(
category=f"project.{subcategory}",
entity=subcategory,
content=content[:200],
confidence=0.6,
source_turn=turn_idx,
timestamp=timestamp,
))
return facts
def save_facts_to_store(facts: List[ExtractedFact], fact_store_fn=None) -> int:
"""Save extracted facts to the fact store.
Args:
facts: List of extracted facts.
fact_store_fn: Optional callable(category, entity, content, trust).
If None, uses the holographic fact store if available.
Returns:
Number of facts saved.
"""
saved = 0
if fact_store_fn:
for fact in facts:
try:
fact_store_fn(
category=fact.category,
entity=fact.entity,
content=fact.content,
trust=fact.confidence,
)
saved += 1
except Exception as e:
logger.debug("Failed to save fact: %s", e)
else:
# Try holographic fact store
try:
from fact_store import fact_store as _fs
for fact in facts:
try:
_fs(
action="add",
content=fact.content,
category=fact.category,
tags=fact.entity,
trust_delta=fact.confidence - 0.5,
)
saved += 1
except Exception as e:
logger.debug("Failed to save fact via fact_store: %s", e)
except ImportError:
logger.debug("fact_store not available — facts not persisted")
return saved
def extract_and_save_facts(
messages: List[Dict[str, Any]],
fact_store_fn=None,
) -> Tuple[List[ExtractedFact], int]:
"""Extract facts from messages and save them.
Returns (extracted_facts, saved_count).
"""
facts = extract_facts_from_messages(messages)
if facts:
logger.info("Extracted %d facts from conversation", len(facts))
saved = save_facts_to_store(facts, fact_store_fn)
logger.info("Saved %d/%d facts to store", saved, len(facts))
else:
saved = 0
return facts, saved
def format_facts_summary(facts: List[ExtractedFact]) -> str:
"""Format extracted facts as a readable summary."""
if not facts:
return "No facts extracted."
by_category = {}
for f in facts:
by_category.setdefault(f.category, []).append(f)
lines = [f"Extracted {len(facts)} facts:", ""]
for cat, cat_facts in sorted(by_category.items()):
lines.append(f" {cat}:")
for f in cat_facts:
lines.append(f" - {f.content[:80]}")
return "\n".join(lines)

View File

@@ -1,122 +0,0 @@
"""Tests for credential redaction — Issue #839."""
import json
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from tools.credential_redaction import (
redact_credentials, should_auto_mask, mask_config_values,
redact_tool_output, RedactionResult
)
class TestRedactCredentials:
def test_openai_key(self):
text = "API key: sk-abc123def456ghi789jkl012mno345pqr678stu901vwx"
result = redact_credentials(text)
assert result.was_redacted
assert "sk-abc" not in result.text
assert "[REDACTED" in result.text
def test_github_pat(self):
text = "token: ghp_1234567890abcdefghijklmnopqrstuvwxyz"
result = redact_credentials(text)
assert result.was_redacted
assert "ghp_" not in result.text
def test_bearer_token(self):
text = "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9"
result = redact_credentials(text)
assert result.was_redacted
assert "Bearer eyJ" not in result.text
def test_password_assignment(self):
text = 'password: "supersecret123"'
result = redact_credentials(text)
assert result.was_redacted
def test_clean_text(self):
text = "Hello world, no credentials here"
result = redact_credentials(text)
assert not result.was_redacted
assert result.text == text
def test_empty_text(self):
result = redact_credentials("")
assert not result.was_redacted
class TestShouldAutoMask:
def test_env_file(self):
assert should_auto_mask(".env") == True
def test_config_file(self):
assert should_auto_mask("config.yaml") == True
def test_token_file(self):
assert should_auto_mask("gitea_token") == True
def test_normal_file(self):
assert should_auto_mask("readme.md") == False
class TestMaskConfigValues:
def test_env_api_key(self):
text = "API_KEY=sk-abc123def456"
result = mask_config_values(text)
assert "sk-abc" not in result
assert "[REDACTED]" in result
def test_yaml_token(self):
text = 'token: "ghp_1234567890"'
result = mask_config_values(text)
assert "ghp_" not in result
assert "[REDACTED]" in result
def test_preserves_structure(self):
text = "API_KEY=secret\nOTHER=value"
result = mask_config_values(text)
assert "OTHER=value" in result # Non-credential preserved
class TestRedactToolOutput:
def test_string_output(self):
output = "Result: sk-abc123def456ghi789jkl012mno345pqr678stu901vwx"
redacted, notice = redact_tool_output("file_read", output)
assert "sk-abc123" not in redacted
assert notice is not None
def test_dict_output(self):
output = {"content": "token: ghp_1234567890abcdefghijklmnopqrstuvwxyz"}
redacted, notice = redact_tool_output("file_read", output)
assert "ghp_" not in redacted["content"]
def test_clean_output(self):
output = "No credentials here"
redacted, notice = redact_tool_output("file_read", output)
assert redacted == output
assert notice is None
class TestRedactionResult:
def test_notice_singular(self):
result = RedactionResult("redacted", "original", [{"pattern_name": "test"}])
assert "1 credential pattern" in result.notice()
def test_notice_plural(self):
result = RedactionResult("redacted", "original", [
{"pattern_name": "test1"},
{"pattern_name": "test2"},
])
assert "2 credential patterns" in result.notice()
def test_to_dict(self):
result = RedactionResult("redacted", "original", [{"pattern_name": "test"}])
d = result.to_dict()
assert d["redacted"] == True
assert d["count"] == 1
if __name__ == "__main__":
import pytest
pytest.main([__file__, "-v"])

View File

@@ -0,0 +1,91 @@
"""Tests for session compaction with fact extraction."""
import pytest
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from agent.session_compactor import (
ExtractedFact,
extract_facts_from_messages,
save_facts_to_store,
extract_and_save_facts,
format_facts_summary,
)
class TestFactExtraction:
def test_extract_preference(self):
messages = [
{"role": "user", "content": "I prefer Python over JavaScript for backend work."},
]
facts = extract_facts_from_messages(messages)
assert len(facts) >= 1
assert any("Python" in f.content for f in facts)
def test_extract_correction(self):
messages = [
{"role": "user", "content": "Actually the port is 8081 not 8080."},
]
facts = extract_facts_from_messages(messages)
assert len(facts) >= 1
assert any("8081" in f.content for f in facts)
def test_extract_project_fact(self):
messages = [
{"role": "user", "content": "The project uses Gitea for source control."},
]
facts = extract_facts_from_messages(messages)
assert len(facts) >= 1
def test_skip_tool_results(self):
messages = [
{"role": "assistant", "content": "Running command...", "tool_calls": [{"id": "1"}]},
{"role": "tool", "content": "output here"},
]
facts = extract_facts_from_messages(messages)
assert len(facts) == 0
def test_skip_short_messages(self):
messages = [
{"role": "user", "content": "ok"},
]
facts = extract_facts_from_messages(messages)
assert len(facts) == 0
def test_deduplication(self):
messages = [
{"role": "user", "content": "I prefer Python."},
{"role": "user", "content": "I prefer Python."},
]
facts = extract_facts_from_messages(messages)
# Should deduplicate
python_facts = [f for f in facts if "Python" in f.content]
assert len(python_facts) == 1
class TestSaveFacts:
def test_save_with_callback(self):
saved = []
def mock_save(category, entity, content, trust):
saved.append({"category": category, "content": content})
facts = [ExtractedFact("user_pref", "user", "likes dark mode", 0.8, 0)]
count = save_facts_to_store(facts, fact_store_fn=mock_save)
assert count == 1
assert len(saved) == 1
class TestFormatSummary:
def test_empty(self):
assert "No facts" in format_facts_summary([])
def test_with_facts(self):
facts = [
ExtractedFact("user_pref", "user", "likes dark mode", 0.8, 0),
ExtractedFact("correction", "user", "port is 8081", 0.9, 1),
]
summary = format_facts_summary(facts)
assert "2 facts" in summary
assert "user_pref" in summary

View File

@@ -1,269 +0,0 @@
"""Credential Redaction — Poka-yoke for tool outputs.
Blocks silent credential exposure by redacting API keys, tokens, and
passwords from tool outputs before they enter agent context.
Issue #839: Poka-yoke: Block silent credential exposure in tool outputs
"""
import json
import logging
import re
import time
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
# Audit log path
_AUDIT_DIR = Path.home() / ".hermes" / "audit"
_AUDIT_LOG = _AUDIT_DIR / "redactions.jsonl"
# Credential patterns — order matters (most specific first)
_CREDENTIAL_PATTERNS = [
# API keys
(r'sk-[a-zA-Z0-9]{20,}', '[REDACTED: OpenAI-style API key]'),
(r'sk-ant-[a-zA-Z0-9-]{20,}', '[REDACTED: Anthropic API key]'),
(r'ghp_[a-zA-Z0-9]{36}', '[REDACTED: GitHub PAT]'),
(r'gho_[a-zA-Z0-9]{36}', '[REDACTED: GitHub OAuth token]'),
(r'github_pat_[a-zA-Z0-9_]{82}', '[REDACTED: GitHub fine-grained PAT]'),
(r'glpat-[a-zA-Z0-9-]{20,}', '[REDACTED: GitLab PAT]'),
(r'syt_[a-zA-Z0-9_-]{40,}', '[REDACTED: Matrix access token]'),
(r'xoxb-[0-9]{10,}-[a-zA-Z0-9]{20,}', '[REDACTED: Slack bot token]'),
(r'xoxp-[0-9]{10,}-[a-zA-Z0-9]{20,}', '[REDACTED: Slack user token]'),
# Bearer tokens
(r'Bearer\s+[a-zA-Z0-9_.-]{20,}', '[REDACTED: Bearer token]'),
# Generic tokens/passwords in assignments
(r'(?:token|api_key|api_key|secret|password|passwd|pwd)\s*[:=]\s*["\']?([a-zA-Z0-9_.-]{8,})["\']?', '[REDACTED: credential]'),
# Environment variable assignments
(r'(?:export\s+)?(?:TOKEN|KEY|SECRET|PASSWORD|API_KEY)\s*=\s*["\']?([a-zA-Z0-9_.-]{8,})["\']?', '[REDACTED: env credential]'),
# Base64 encoded credentials (high entropy strings)
(r'(?:authorization|auth)\s*[:=]\s*(?:basic|bearer)\s+[a-zA-Z0-9+/=]{20,}', '[REDACTED: auth header]'),
# AWS credentials
(r'AKIA[0-9A-Z]{16}', '[REDACTED: AWS access key]'),
(r'(?<![A-Z0-9])[A-Za-z0-9/+=]{40}(?![A-Z0-9])', None), # Only match near context
# Private keys
(r'-----BEGIN (?:RSA |EC |OPENSSH )?PRIVATE KEY-----', '[REDACTED: private key block]'),
]
class RedactionResult:
"""Result of credential redaction."""
def __init__(self, text: str, original: str, redactions: List[Dict[str, Any]]):
self.text = text
self.original = original
self.redactions = redactions
@property
def was_redacted(self) -> bool:
return len(self.redactions) > 0
@property
def count(self) -> int:
return len(self.redactions)
def notice(self) -> str:
"""Generate compact redaction notice."""
if not self.was_redacted:
return ""
return f"[REDACTED: {self.count} credential pattern{'s' if self.count > 1 else ''} found]"
def to_dict(self) -> Dict[str, Any]:
return {
"redacted": self.was_redacted,
"count": self.count,
"notice": self.notice(),
"patterns": [r["pattern_name"] for r in self.redactions],
}
def redact_credentials(text: str, source: str = "unknown") -> RedactionResult:
"""Redact credentials from text.
Args:
text: Text to redact
source: Source identifier for audit logging
Returns:
RedactionResult with redacted text and metadata
"""
if not text:
return RedactionResult(text, text, [])
redactions = []
result = text
for pattern, replacement in _CREDENTIAL_PATTERNS:
if replacement is None:
continue # Skip conditional patterns
matches = list(re.finditer(pattern, result, re.IGNORECASE))
for match in matches:
redactions.append({
"pattern_name": replacement,
"position": match.start(),
"length": len(match.group()),
"source": source,
"timestamp": time.time(),
})
result = re.sub(pattern, replacement, result, flags=re.IGNORECASE)
redaction_result = RedactionResult(result, text, redactions)
# Log to audit trail
if redaction_result.was_redacted:
_log_redaction(redaction_result, source)
return redaction_result
def _log_redaction(result: RedactionResult, source: str) -> None:
"""Log redaction event to audit trail."""
try:
_AUDIT_DIR.mkdir(parents=True, exist_ok=True)
entry = {
"timestamp": time.time(),
"source": source,
"count": result.count,
"patterns": [r["pattern_name"] for r in result.redactions],
}
with open(_AUDIT_LOG, "a") as f:
f.write(json.dumps(entry) + "\n")
except Exception as e:
logger.debug(f"Failed to log redaction: {e}")
def should_auto_mask(file_path: str) -> bool:
"""Check if file should have credentials auto-masked."""
path_lower = file_path.lower()
sensitive_patterns = [
".env", "config", "token", "secret", "credential",
"key", "auth", "password", ".pem", ".key",
]
return any(p in path_lower for p in sensitive_patterns)
def mask_config_values(text: str) -> str:
"""Mask credential values in config/env files while preserving structure.
Transforms:
API_KEY=sk-abc123 → API_KEY=[REDACTED]
token: "ghp_xyz" → token: "[REDACTED]"
"""
lines = text.split("\n")
result = []
for line in lines:
# Match KEY=VALUE patterns
match = re.match(r'^(\s*(?:export\s+)?[A-Z_][A-Z0-9_]*)\s*=\s*(.*)', line)
if match:
key = match.group(1)
value = match.group(2).strip()
# Check if key looks credential-like
key_lower = key.lower()
if any(p in key_lower for p in ["key", "token", "secret", "password", "auth"]):
if value and not value.startswith("[REDACTED]"):
# Preserve quotes
if value.startswith('"') and value.endswith('"'):
result.append(f'{key}="[REDACTED]"')
elif value.startswith("'") and value.endswith("'"):
result.append(f"{key}='[REDACTED]'")
else:
result.append(f"{key}=[REDACTED]")
continue
# Match YAML-style key: value
match = re.match(r'^(\s*[a-z_][a-z0-9_]*)\s*:\s*["\']?(.*?)["\']?\s*$', line)
if match:
key = match.group(1)
value = match.group(2).strip()
key_lower = key.lower()
if any(p in key_lower for p in ["key", "token", "secret", "password", "auth"]):
if value and not value.startswith("[REDACTED]"):
result.append(f'{key}: "[REDACTED]"')
continue
result.append(line)
return "\n".join(result)
def redact_tool_output(
tool_name: str,
output: Any,
source: str = None,
) -> Tuple[Any, Optional[str]]:
"""Redact credentials from tool output.
Args:
tool_name: Name of the tool
output: Tool output (string or dict)
source: Source identifier (defaults to tool_name)
Returns:
Tuple of (redacted_output, notice)
"""
source = source or tool_name
if isinstance(output, str):
result = redact_credentials(output, source)
if result.was_redacted:
return result.text, result.notice()
return output, None
if isinstance(output, dict):
# Redact string values in dict
redacted = {}
notices = []
for key, value in output.items():
if isinstance(value, str):
r, n = redact_tool_output(tool_name, value, f"{source}.{key}")
redacted[key] = r
if n:
notices.append(n)
else:
redacted[key] = value
notice = "; ".join(notices) if notices else None
return redacted, notice
# Non-string, non-dict: pass through
return output, None
def get_redaction_stats() -> Dict[str, Any]:
"""Get redaction statistics from audit log."""
stats = {
"total_redactions": 0,
"by_source": {},
"by_pattern": {},
}
if not _AUDIT_LOG.exists():
return stats
try:
with open(_AUDIT_LOG, "r") as f:
for line in f:
entry = json.loads(line.strip())
stats["total_redactions"] += entry.get("count", 0)
source = entry.get("source", "unknown")
stats["by_source"][source] = stats["by_source"].get(source, 0) + 1
for pattern in entry.get("patterns", []):
stats["by_pattern"][pattern] = stats["by_pattern"].get(pattern, 0) + 1
except Exception:
pass
return stats