Compare commits
1 Commit
feat/gen-a
...
security/f
| Author | SHA1 | Date | |
|---|---|---|---|
| 78f0a5c01b |
@@ -1,45 +0,0 @@
|
||||
"""Phase 3: Deep Knowledge Distillation from Google.
|
||||
|
||||
Performs deep dives into technical domains and distills them into
|
||||
Timmy's Sovereign Knowledge Graph.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import json
|
||||
from typing import List, Dict, Any
|
||||
from agent.gemini_adapter import GeminiAdapter
|
||||
from agent.symbolic_memory import SymbolicMemory
|
||||
|
||||
logger = logging.getLogger(__name__)


class DomainDistiller:
    """Distill a technical domain into knowledge-graph triples.

    Holds a ``GeminiAdapter`` (asked to produce the triples) and a
    ``SymbolicMemory`` (which ingests them).
    """

    def __init__(self):
        self.adapter = GeminiAdapter()
        self.symbolic = SymbolicMemory()

    def distill_domain(self, domain: str):
        """Crawls and distills an entire technical domain.

        Builds a prompt asking for a 'Domain Map' of subject/predicate/object
        triples about ``domain``, sends it through ``self.adapter.generate``,
        parses the ``"text"`` field of the result as JSON, and hands the
        re-serialized triples to ``self.symbolic.ingest_text``.

        Args:
            domain: Name of the domain to distill; interpolated verbatim into
                both the prompt and the system instruction.

        Returns:
            Whatever ``self.symbolic.ingest_text`` returns (logged as the
            number of new triples).

        Raises:
            json.JSONDecodeError: If the adapter's ``"text"`` is not valid JSON.
        """
        logger.info("Distilling domain: %s", domain)

        prompt = f"""
        Please perform a deep knowledge distillation of the following domain: {domain}

        Use Google Search to find foundational papers, recent developments, and key entities.
        Synthesize this into a structured 'Domain Map' consisting of high-fidelity knowledge triples.
        Focus on the structural relationships that define the domain.

        Format: [{{"s": "subject", "p": "predicate", "o": "object"}}]
        """
        result = self.adapter.generate(
            model="gemini-3.1-pro-preview",
            prompt=prompt,
            system_instruction=f"You are Timmy's Domain Distiller. Your goal is to map the entire {domain} domain into a structured Knowledge Graph.",
            grounding=True,
            thinking=True,
            response_mime_type="application/json"
        )

        # Parsing and re-serializing ensures only well-formed JSON reaches
        # the symbolic store; malformed model output fails here instead.
        triples = json.loads(result["text"])
        count = self.symbolic.ingest_text(json.dumps(triples))
        logger.info("Distilled %s new triples for domain: %s", count, domain)
        return count
|
||||
@@ -1,60 +0,0 @@
|
||||
"""Phase 1: Synthetic Data Generation for Self-Correction.
|
||||
|
||||
Generates reasoning traces where Timmy makes a subtle error and then
|
||||
identifies and corrects it using the Conscience Validator.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import json
|
||||
from typing import List, Dict, Any
|
||||
from agent.gemini_adapter import GeminiAdapter
|
||||
from tools.gitea_client import GiteaClient
|
||||
|
||||
logger = logging.getLogger(__name__)


class SelfCorrectionGenerator:
    """Generate synthetic self-correction reasoning traces and store them in Gitea.

    Holds a ``GeminiAdapter`` (produces the traces) and a ``GiteaClient``
    (persists them as files).
    """

    def __init__(self):
        self.adapter = GeminiAdapter()
        self.gitea = GiteaClient()

    def generate_trace(self, task: str) -> Dict[str, Any]:
        """Generates a single self-correction reasoning trace.

        Args:
            task: Task description; interpolated verbatim into the prompt.

        Returns:
            The adapter's ``"text"`` output parsed as JSON.

        Raises:
            json.JSONDecodeError: If the adapter's ``"text"`` is not valid JSON.
        """
        prompt = f"""
        Task: {task}

        Please simulate a multi-step reasoning trace for this task.
        Intentionally include one subtle error in the reasoning (e.g., a logical flaw, a misinterpretation of a rule, or a factual error).
        Then, show how Timmy identifies the error using his Conscience Validator and provides a corrected reasoning trace.

        Format the output as JSON:
        {{
            "task": "{task}",
            "initial_trace": "...",
            "error_identified": "...",
            "correction_trace": "...",
            "lessons_learned": "..."
        }}
        """
        result = self.adapter.generate(
            model="gemini-3.1-pro-preview",
            prompt=prompt,
            system_instruction="You are Timmy's Synthetic Data Engine. Generate high-fidelity self-correction traces.",
            response_mime_type="application/json",
            thinking=True
        )

        return json.loads(result["text"])

    def generate_and_save(self, task: str, count: int = 1):
        """Generates multiple traces and saves them to Gitea.

        Each trace is serialized as indented JSON, base64-encoded, and passed
        to ``self.gitea.create_file`` under
        ``memories/synthetic_data/self_correction/<task_slug>_<index>.json``.

        Args:
            task: Task description; lower-cased with spaces replaced by
                underscores to form the filename stem.
            count: Number of traces to generate and save.
        """
        # Function-scope import: the surrounding module imports only logging,
        # json and typing, so base64 would otherwise be an undefined name here.
        import base64

        repo = "Timmy_Foundation/timmy-config"
        # The slug does not depend on the loop index, so compute it once.
        slug = task.lower().replace(' ', '_')
        for i in range(count):
            trace = self.generate_trace(task)
            filename = f"memories/synthetic_data/self_correction/{slug}_{i}.json"

            content = json.dumps(trace, indent=2)
            content_b64 = base64.b64encode(content.encode()).decode()

            self.gitea.create_file(repo, filename, content_b64, f"Add synthetic self-correction trace for {task}")
            logger.info("Saved synthetic trace to %s", filename)
|
||||
@@ -1,42 +0,0 @@
|
||||
"""Phase 2: Multi-Modal World Modeling.
|
||||
|
||||
Ingests multi-modal data (vision/audio) to build a spatial and temporal
|
||||
understanding of Timmy's environment.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import base64
|
||||
from typing import List, Dict, Any
|
||||
from agent.gemini_adapter import GeminiAdapter
|
||||
from agent.symbolic_memory import SymbolicMemory
|
||||
|
||||
logger = logging.getLogger(__name__)


class WorldModeler:
    """Turn environment observations into symbolic triples.

    Holds a ``GeminiAdapter`` (produces the triples) and a ``SymbolicMemory``
    (which ingests them).
    """

    def __init__(self):
        self.adapter = GeminiAdapter()
        self.symbolic = SymbolicMemory()

    def analyze_environment(self, image_data: str, mime_type: str = "image/jpeg"):
        """Analyzes an image of the environment and updates the world model.

        NOTE: this is a simulation. ``image_data`` and ``mime_type`` are
        accepted but never read; only a fixed text prompt is sent to the
        adapter.

        Args:
            image_data: Image payload (currently unused).
            mime_type: MIME type of ``image_data`` (currently unused).

        Returns:
            The adapter's ``"text"`` output parsed as JSON (the triples).

        Raises:
            json.JSONDecodeError: If the adapter's ``"text"`` is not valid JSON.
        """
        # Function-scope import: the surrounding module imports logging and
        # base64 but not json, so json would otherwise be an undefined name.
        import json

        # In a real scenario, we'd use Gemini's multi-modal capabilities
        # For now, we'll simulate the vision-to-symbolic extraction
        prompt = f"""
        Analyze the following image of Timmy's environment.
        Identify all key objects, their spatial relationships, and any temporal changes.
        Extract this into a set of symbolic triples for the Knowledge Graph.

        Format: [{{"s": "subject", "p": "predicate", "o": "object"}}]
        """
        # Simulate multi-modal call (Gemini 3.1 Pro Vision)
        result = self.adapter.generate(
            model="gemini-3.1-pro-preview",
            prompt=prompt,
            system_instruction="You are Timmy's World Modeler. Build a high-fidelity spatial/temporal map of the environment.",
            response_mime_type="application/json"
        )

        triples = json.loads(result["text"])
        self.symbolic.ingest_text(json.dumps(triples))
        logger.info("Updated world model with %s new spatial triples.", len(triples))
        return triples
|
||||
@@ -112,6 +112,81 @@ def _is_write_denied(path: str) -> bool:
|
||||
return False
|
||||
|
||||
|
||||
# SECURITY: Path traversal detection patterns.
# Each dot may be literal or URL-encoded (%2e); each separator may be "/",
# "\" or URL-encoded (%2f / %5c). Matching is case-insensitive so %2E/%2F/%5C
# are caught as well.
_PATH_TRAVERSAL_PATTERNS = [
    # ".." followed by a separator: ../  ..\  ..//  %2e%2e/  %2e%2e%2f  ..%2f
    re.compile(r'(?:\.|%2e){2}(?:[/\\]|%2f|%5c)', re.IGNORECASE),
    # Bare ".." (literal or encoded) at end of path
    re.compile(r'(?:\.|%2e){2}$', re.IGNORECASE),
    # Attempted home dir escape via tilde
    re.compile(r'^/~'),
]


def _contains_path_traversal(path: str) -> bool:
    """Check if path contains directory traversal attempts.

    SECURITY FIX (V-002): Detects path traversal patterns like:
    - ../../../etc/passwd
    - ..\\..\\windows\\system32
    - %2e%2e%2f (URL-encoded, fully or partially)
    - ~/../../../etc/shadow (caught via its "../" segments)

    Also flags paths containing a null byte and paths longer than 4096
    characters.

    Args:
        path: The raw, unexpanded path string to inspect.

    Returns:
        True if the path looks unsafe, False otherwise. Empty or ``None``
        paths return False.
    """
    if not path:
        return False

    # Check against all traversal patterns
    if any(pattern.search(path) for pattern in _PATH_TRAVERSAL_PATTERNS):
        return True

    # Check for null byte injection (CWE-73)
    if '\x00' in path:
        return True

    # Check for overly long paths that might bypass filters
    return len(path) > 4096
|
||||
|
||||
|
||||
def _validate_safe_path(path: str, operation: str = "access") -> tuple[bool, str]:
    """Validate that a path is safe for file operations.

    SECURITY FIX (V-002): Centralized path validation. Rejects, in order:
    - empty paths
    - anything flagged by ``_contains_path_traversal``
    - ASCII control characters (code points below 32) other than tab and
      newline

    Args:
        path: The raw path string, before any expansion.
        operation: Label for the intended operation (e.g. "read", "write").
            Currently not used by the validation logic.

    Returns:
        (is_safe, error_message) tuple. If is_safe is False, error_message
        contains the reason; otherwise it is the empty string.
    """
    # NOTE(review): "~user/..." home-directory forms are not rejected by any
    # check in this function — confirm whether the later expansion step makes
    # that a concern.
    if not path:
        return False, "Path cannot be empty"

    # Check for path traversal attempts
    if _contains_path_traversal(path):
        return False, (
            f"Path traversal detected in '{path}'. "
            "Access to paths outside the working directory is not permitted."
        )

    # Block control chars except tab/newline. Sorting the de-duplicated reprs
    # keeps the error message stable across runs (set order is not).
    invalid_chars = sorted(
        {repr(char) for char in path if ord(char) < 32 and char not in '\t\n'}
    )
    if invalid_chars:
        return False, (
            f"Path contains invalid control characters: {', '.join(invalid_chars)}"
        )

    return True, ""
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Result Data Classes
|
||||
# =============================================================================
|
||||
@@ -475,6 +550,11 @@ class ShellFileOperations(FileOperations):
|
||||
Returns:
|
||||
ReadResult with content, metadata, or error info
|
||||
"""
|
||||
# SECURITY FIX (V-002): Validate path before any operations
|
||||
is_safe, error_msg = _validate_safe_path(path, "read")
|
||||
if not is_safe:
|
||||
return ReadResult(error=f"Security violation: {error_msg}")
|
||||
|
||||
# Expand ~ and other shell paths
|
||||
path = self._expand_path(path)
|
||||
|
||||
@@ -663,6 +743,11 @@ class ShellFileOperations(FileOperations):
|
||||
Returns:
|
||||
WriteResult with bytes written or error
|
||||
"""
|
||||
# SECURITY FIX (V-002): Validate path before any operations
|
||||
is_safe, error_msg = _validate_safe_path(path, "write")
|
||||
if not is_safe:
|
||||
return WriteResult(error=f"Security violation: {error_msg}")
|
||||
|
||||
# Expand ~ and other shell paths
|
||||
path = self._expand_path(path)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user