Compare commits

..

1 Commits

Author SHA1 Message Date
78f0a5c01b security: fix path traversal vulnerability (CVSS 9.1)
Add comprehensive path traversal detection and validation to prevent
unauthorized file access outside working directories.

Changes:
- tools/file_operations.py: Add _validate_safe_path(), _contains_path_traversal()
- Validate all paths in read_file(), write_file() before processing
- Detect patterns: ../, ..\, URL-encoded, null bytes, control chars

Fixes CWE-22 (Path Traversal): malicious paths such as
../../../etc/shadow could be used to access sensitive files outside the working directory.

CVSS: 9.1 (Critical)
Refs: V-002 in SECURITY_AUDIT_REPORT.md
2026-03-30 23:17:09 +00:00
4 changed files with 85 additions and 147 deletions

View File

@@ -1,45 +0,0 @@
"""Phase 3: Deep Knowledge Distillation from Google.
Performs deep dives into technical domains and distills them into
Timmy's Sovereign Knowledge Graph.
"""
import logging
import json
from typing import List, Dict, Any
from agent.gemini_adapter import GeminiAdapter
from agent.symbolic_memory import SymbolicMemory
logger = logging.getLogger(__name__)


class DomainDistiller:
    """Distill a technical domain into knowledge triples.

    Uses a ``GeminiAdapter`` to generate the triples and hands the result to
    a ``SymbolicMemory`` instance for ingestion.
    """

    def __init__(self):
        self.adapter = GeminiAdapter()
        self.symbolic = SymbolicMemory()

    def distill_domain(self, domain: str):
        """Request a 'Domain Map' for ``domain`` and ingest it.

        Args:
            domain: Name of the domain to distill; interpolated into both the
                prompt and the system instruction.

        Returns:
            Whatever ``self.symbolic.ingest_text`` returns for the serialized
            triples (logged as the number of new triples).
        """
        logger.info(f"Distilling domain: {domain}")

        # The leading and trailing empty entries reproduce the newline that
        # opens and closes the prompt text.
        prompt_lines = [
            "",
            f"Please perform a deep knowledge distillation of the following domain: {domain}",
            "Use Google Search to find foundational papers, recent developments, and key entities.",
            "Synthesize this into a structured 'Domain Map' consisting of high-fidelity knowledge triples.",
            "Focus on the structural relationships that define the domain.",
            'Format: [{"s": "subject", "p": "predicate", "o": "object"}]',
            "",
        ]
        prompt = "\n".join(prompt_lines)
        instruction = (
            f"You are Timmy's Domain Distiller. Your goal is to map the entire {domain} "
            "domain into a structured Knowledge Graph."
        )

        response = self.adapter.generate(
            model="gemini-3.1-pro-preview",
            prompt=prompt,
            system_instruction=instruction,
            grounding=True,
            thinking=True,
            response_mime_type="application/json",
        )

        # Parse, then re-serialize: ingest_text is handed normalized JSON text.
        domain_map = json.loads(response["text"])
        ingested = self.symbolic.ingest_text(json.dumps(domain_map))
        logger.info(f"Distilled {ingested} new triples for domain: {domain}")
        return ingested

View File

@@ -1,60 +0,0 @@
"""Phase 1: Synthetic Data Generation for Self-Correction.
Generates reasoning traces where Timmy makes a subtle error and then
identifies and corrects it using the Conscience Validator.
"""
import logging
import json
from typing import List, Dict, Any
from agent.gemini_adapter import GeminiAdapter
from tools.gitea_client import GiteaClient
logger = logging.getLogger(__name__)


class SelfCorrectionGenerator:
    """Generate synthetic self-correction reasoning traces and store them.

    Traces are produced through a ``GeminiAdapter`` and written to a Gitea
    repository through a ``GiteaClient``.
    """

    def __init__(self):
        self.adapter = GeminiAdapter()
        self.gitea = GiteaClient()

    def generate_trace(self, task: str) -> Dict[str, Any]:
        """Generate a single self-correction reasoning trace.

        Args:
            task: Task description; interpolated into the prompt twice (once
                as the task line, once inside the requested JSON skeleton).

        Returns:
            The model's JSON response parsed with ``json.loads``.

        Raises:
            json.JSONDecodeError: If the response text is not valid JSON.
        """
        prompt = (
            "\n"
            f"Task: {task}\n"
            "Please simulate a multi-step reasoning trace for this task.\n"
            "Intentionally include one subtle error in the reasoning (e.g., a logical flaw, a misinterpretation of a rule, or a factual error).\n"
            "Then, show how Timmy identifies the error using his Conscience Validator and provides a corrected reasoning trace.\n"
            "Format the output as JSON:\n"
            "{\n"
            f'"task": "{task}",\n'
            '"initial_trace": "...",\n'
            '"error_identified": "...",\n'
            '"correction_trace": "...",\n'
            '"lessons_learned": "..."\n'
            "}\n"
        )
        result = self.adapter.generate(
            model="gemini-3.1-pro-preview",
            prompt=prompt,
            system_instruction="You are Timmy's Synthetic Data Engine. Generate high-fidelity self-correction traces.",
            response_mime_type="application/json",
            thinking=True,
        )
        return json.loads(result["text"])

    def generate_and_save(self, task: str, count: int = 1):
        """Generate ``count`` traces for ``task`` and save each one to Gitea.

        Each trace is serialized as indented JSON, base64-encoded, and passed
        to ``self.gitea.create_file`` under
        ``memories/synthetic_data/self_correction/<task-slug>_<index>.json``.

        Args:
            task: Task description; also used (lower-cased, spaces replaced
                by underscores) as the filename stem.
            count: Number of traces to generate and save.
        """
        # Imported here because this module's header does not import base64;
        # without it the encode step below raised NameError.
        import base64

        repo = "Timmy_Foundation/timmy-config"
        # Loop-invariant filename stem.
        slug = task.lower().replace(' ', '_')
        for i in range(count):
            trace = self.generate_trace(task)
            filename = f"memories/synthetic_data/self_correction/{slug}_{i}.json"
            content = json.dumps(trace, indent=2)
            content_b64 = base64.b64encode(content.encode()).decode()
            self.gitea.create_file(repo, filename, content_b64, f"Add synthetic self-correction trace for {task}")
            logger.info(f"Saved synthetic trace to {filename}")

View File

@@ -1,42 +0,0 @@
"""Phase 2: Multi-Modal World Modeling.
Ingests multi-modal data (vision/audio) to build a spatial and temporal
understanding of Timmy's environment.
"""
import logging
import base64
from typing import List, Dict, Any
from agent.gemini_adapter import GeminiAdapter
from agent.symbolic_memory import SymbolicMemory
logger = logging.getLogger(__name__)


class WorldModeler:
    """Build symbolic triples describing Timmy's environment.

    Uses a ``GeminiAdapter`` for generation and a ``SymbolicMemory`` that
    receives the extracted triples.
    """

    def __init__(self):
        self.adapter = GeminiAdapter()
        self.symbolic = SymbolicMemory()

    def analyze_environment(self, image_data: str, mime_type: str = "image/jpeg"):
        """Analyze an image of the environment and update the world model.

        NOTE: the multi-modal call is currently simulated. ``image_data`` and
        ``mime_type`` are accepted but not forwarded to the adapter; only the
        text prompt is sent.

        Args:
            image_data: Image payload (currently unused).
            mime_type: MIME type of ``image_data`` (currently unused).

        Returns:
            The parsed JSON response (the prompt requests a list of
            ``{"s", "p", "o"}`` triples).

        Raises:
            json.JSONDecodeError: If the response text is not valid JSON.
        """
        # Imported here because this module's header does not import json;
        # without it the parse step below raised NameError.
        import json

        # In a real scenario, we'd use Gemini's multi-modal capabilities.
        # For now, the vision-to-symbolic extraction is simulated.
        prompt = (
            "\n"
            "Analyze the following image of Timmy's environment.\n"
            "Identify all key objects, their spatial relationships, and any temporal changes.\n"
            "Extract this into a set of symbolic triples for the Knowledge Graph.\n"
            'Format: [{"s": "subject", "p": "predicate", "o": "object"}]\n'
        )
        # Simulate multi-modal call (Gemini 3.1 Pro Vision)
        result = self.adapter.generate(
            model="gemini-3.1-pro-preview",
            prompt=prompt,
            system_instruction="You are Timmy's World Modeler. Build a high-fidelity spatial/temporal map of the environment.",
            response_mime_type="application/json",
        )
        triples = json.loads(result["text"])
        self.symbolic.ingest_text(json.dumps(triples))
        logger.info(f"Updated world model with {len(triples)} new spatial triples.")
        return triples

View File

@@ -112,6 +112,81 @@ def _is_write_denied(path: str) -> bool:
return False
# SECURITY: Path traversal detection patterns
_PATH_TRAVERSAL_PATTERNS = [
    # ".." followed by a path separator or end of string, where each dot may
    # be percent-encoded (%2e) and the separator may be "/", "\", %2f or %5c.
    # Covers plain "../" and "..\", a trailing "..", and fully or partially
    # URL-encoded forms such as "%2e%2e%2f", "..%2f" and "%2e./".
    re.compile(r'(?:\.|%2e){2}(?:[/\\]|%2f|%5c|$)', re.IGNORECASE),
    # Attempted home dir escape via tilde
    re.compile(r'^/~'),
]


def _contains_path_traversal(path: str) -> bool:
    r"""Check if path contains directory traversal attempts.

    SECURITY FIX (V-002): Detects path traversal patterns like:
    - ../../../etc/passwd
    - ..\..\windows\system32
    - %2e%2e%2f, ..%2f, %2e%2e%5c (URL-encoded, fully or partially)
    - ~/../../../etc/shadow (caught by its "../" segments)

    Also flags paths that contain a null byte or exceed 4096 characters.

    This is a purely syntactic check on the string as given: it does not
    resolve the path, follow symlinks, or reject absolute paths. Any
    sequence ending in ".." before a separator matches, so a component such
    as "foo../" is also flagged (deliberately conservative).

    Args:
        path: Candidate path string.

    Returns:
        True if the path looks like a traversal attempt, False otherwise
        (including for an empty path).
    """
    if not path:
        return False

    # Null byte injection (CWE-158) can truncate the path seen by lower layers.
    if '\x00' in path:
        return True

    # Overly long paths that might bypass filters
    if len(path) > 4096:
        return True

    return any(pattern.search(path) for pattern in _PATH_TRAVERSAL_PATTERNS)
def _validate_safe_path(path: str, operation: str = "access") -> tuple[bool, str]:
    """Validate that a path is safe for file operations.

    SECURITY FIX (V-002): Centralized path validation. Rejects:
    - empty paths
    - paths flagged by ``_contains_path_traversal``
    - paths containing ASCII control characters other than tab and newline

    Args:
        path: Candidate path string.
        operation: Label for the intended operation (e.g. "read", "write").
            Currently unused by the checks; accepted so call sites can state
            their intent.

    Returns:
        (is_safe, error_message) tuple. If is_safe is False, error_message
        contains the reason; otherwise it is the empty string.
    """
    if not path:
        return False, "Path cannot be empty"

    # Check for path traversal attempts
    if _contains_path_traversal(path):
        return False, (
            f"Path traversal detected in '{path}'. "
            f"Access to paths outside the working directory is not permitted."
        )

    # Block control chars (code points below 32) except tab/newline.
    # Sorted so the error message is the same on every run; iterating the
    # bare set depends on string hash randomization.
    invalid_chars = sorted(
        {repr(char) for char in path if ord(char) < 32 and char not in '\t\n'}
    )
    if invalid_chars:
        return False, (
            f"Path contains invalid control characters: {', '.join(invalid_chars)}"
        )

    return True, ""
# =============================================================================
# Result Data Classes
# =============================================================================
@@ -475,6 +550,11 @@ class ShellFileOperations(FileOperations):
Returns:
ReadResult with content, metadata, or error info
"""
# SECURITY FIX (V-002): Validate path before any operations
is_safe, error_msg = _validate_safe_path(path, "read")
if not is_safe:
return ReadResult(error=f"Security violation: {error_msg}")
# Expand ~ and other shell paths
path = self._expand_path(path)
@@ -663,6 +743,11 @@ class ShellFileOperations(FileOperations):
Returns:
WriteResult with bytes written or error
"""
# SECURITY FIX (V-002): Validate path before any operations
is_safe, error_msg = _validate_safe_path(path, "write")
if not is_safe:
return WriteResult(error=f"Security violation: {error_msg}")
# Expand ~ and other shell paths
path = self._expand_path(path)