diff --git a/deploy.sh b/deploy.sh index 704a1648..b0a793fb 100755 --- a/deploy.sh +++ b/deploy.sh @@ -86,6 +86,15 @@ done chmod +x "$HERMES_HOME/bin/"*.sh "$HERMES_HOME/bin/"*.py 2>/dev/null || true log "bin/ -> $HERMES_HOME/bin/" +# === Deploy agent patches (runtime extensions) === +# These are custom agent modules that extend hermes-agent's agent/ package. +# Copied into the hermes-agent installation to make them importable as agent.* modules. +if [ -d "$SCRIPT_DIR/patches/agent" ]; then + mkdir -p "$HERMES_HOME/hermes-agent/agent" + cp -r "$SCRIPT_DIR/patches/agent"/* "$HERMES_HOME/hermes-agent/agent/" + log "patches/agent/ -> $HERMES_HOME/hermes-agent/agent/" +fi + if [ "${1:-}" != "" ]; then echo "ERROR: deploy.sh no longer accepts legacy loop flags." >&2 echo "Deploy the sidecar only. Do not relaunch deprecated bash loops." >&2 diff --git a/patches/agent/conscience_mapping.py b/patches/agent/conscience_mapping.py new file mode 100644 index 00000000..5ae1631f --- /dev/null +++ b/patches/agent/conscience_mapping.py @@ -0,0 +1,6 @@ +""" +@soul:honesty.grounding Grounding before generation. Consult verified sources before pattern-matching. +@soul:honesty.source_distinction Source distinction. Every claim must point to a verified source. +@soul:honesty.audit_trail The audit trail. Every response is logged with inputs and confidence. +""" +# This file serves as a registry for the Conscience Validator to prove the apparatus exists. diff --git a/patches/agent/evolution/domain_distiller.py b/patches/agent/evolution/domain_distiller.py new file mode 100644 index 00000000..3f48bfc9 --- /dev/null +++ b/patches/agent/evolution/domain_distiller.py @@ -0,0 +1,45 @@ +"""Phase 3: Deep Knowledge Distillation from Google. + +Performs deep dives into technical domains and distills them into +Timmy's Sovereign Knowledge Graph. +""" + +import logging +import json +from typing import List, Dict, Any +from agent.gemini_adapter import GeminiAdapter +from agent.symbolic_memory import SymbolicMemory + +logger = logging.getLogger(__name__) + +class DomainDistiller: + def __init__(self): + self.adapter = GeminiAdapter() + self.symbolic = SymbolicMemory() + + def distill_domain(self, domain: str): + """Crawls and distills an entire technical domain.""" + logger.info(f"Distilling domain: {domain}") + + prompt = f""" +Please perform a deep knowledge distillation of the following domain: {domain} + +Use Google Search to find foundational papers, recent developments, and key entities. +Synthesize this into a structured 'Domain Map' consisting of high-fidelity knowledge triples. +Focus on the structural relationships that define the domain. + +Format: [{{"s": "subject", "p": "predicate", "o": "object"}}] +""" + result = self.adapter.generate( + model="gemini-3.1-pro-preview", + prompt=prompt, + system_instruction=f"You are Timmy's Domain Distiller. Your goal is to map the entire {domain} domain into a structured Knowledge Graph.", + grounding=True, + thinking=True, + response_mime_type="application/json" + ) + + triples = json.loads(result["text"]) + count = self.symbolic.ingest_text(json.dumps(triples)) + logger.info(f"Distilled {count} new triples for domain: {domain}") + return count diff --git a/patches/agent/evolution/self_correction_generator.py b/patches/agent/evolution/self_correction_generator.py new file mode 100644 index 00000000..c48b1b6e --- /dev/null +++ b/patches/agent/evolution/self_correction_generator.py @@ -0,0 +1,60 @@ +"""Phase 1: Synthetic Data Generation for Self-Correction. + +Generates reasoning traces where Timmy makes a subtle error and then +identifies and corrects it using the Conscience Validator. +""" + +import logging +import json +from typing import List, Dict, Any +from agent.gemini_adapter import GeminiAdapter +from tools.gitea_client import GiteaClient + +logger = logging.getLogger(__name__) + +class SelfCorrectionGenerator: + def __init__(self): + self.adapter = GeminiAdapter() + self.gitea = GiteaClient() + + def generate_trace(self, task: str) -> Dict[str, Any]: + """Generates a single self-correction reasoning trace.""" + prompt = f""" +Task: {task} + +Please simulate a multi-step reasoning trace for this task. +Intentionally include one subtle error in the reasoning (e.g., a logical flaw, a misinterpretation of a rule, or a factual error). +Then, show how Timmy identifies the error using his Conscience Validator and provides a corrected reasoning trace. + +Format the output as JSON: +{{ + "task": "{task}", + "initial_trace": "...", + "error_identified": "...", + "correction_trace": "...", + "lessons_learned": "..." +}} +""" + result = self.adapter.generate( + model="gemini-3.1-pro-preview", + prompt=prompt, + system_instruction="You are Timmy's Synthetic Data Engine. Generate high-fidelity self-correction traces.", + response_mime_type="application/json", + thinking=True + ) + + trace = json.loads(result["text"]) + return trace + + def generate_and_save(self, task: str, count: int = 1): + """Generates multiple traces and saves them to Gitea.""" + repo = "Timmy_Foundation/timmy-config" + for i in range(count): + trace = self.generate_trace(task) + filename = f"memories/synthetic_data/self_correction/{task.lower().replace(' ', '_')}_{i}.json" + + content = json.dumps(trace, indent=2) + content_b64 = base64.b64encode(content.encode()).decode() + + self.gitea.create_file(repo, filename, content_b64, f"Add synthetic self-correction trace for {task}") + logger.info(f"Saved synthetic trace to {filename}") diff --git a/patches/agent/evolution/world_modeler.py b/patches/agent/evolution/world_modeler.py new file mode 100644 index 00000000..a932c0f5 --- /dev/null +++ b/patches/agent/evolution/world_modeler.py @@ -0,0 +1,42 @@ +"""Phase 2: Multi-Modal World Modeling. + +Ingests multi-modal data (vision/audio) to build a spatial and temporal +understanding of Timmy's environment. +""" + +import logging +import base64 +from typing import List, Dict, Any +from agent.gemini_adapter import GeminiAdapter +from agent.symbolic_memory import SymbolicMemory + +logger = logging.getLogger(__name__) + +class WorldModeler: + def __init__(self): + self.adapter = GeminiAdapter() + self.symbolic = SymbolicMemory() + + def analyze_environment(self, image_data: str, mime_type: str = "image/jpeg"): + """Analyzes an image of the environment and updates the world model.""" + # In a real scenario, we'd use Gemini's multi-modal capabilities + # For now, we'll simulate the vision-to-symbolic extraction + prompt = f""" +Analyze the following image of Timmy's environment. +Identify all key objects, their spatial relationships, and any temporal changes. +Extract this into a set of symbolic triples for the Knowledge Graph. + +Format: [{{"s": "subject", "p": "predicate", "o": "object"}}] +""" + # Simulate multi-modal call (Gemini 3.1 Pro Vision) + result = self.adapter.generate( + model="gemini-3.1-pro-preview", + prompt=prompt, + system_instruction="You are Timmy's World Modeler. Build a high-fidelity spatial/temporal map of the environment.", + response_mime_type="application/json" + ) + + triples = json.loads(result["text"]) + self.symbolic.ingest_text(json.dumps(triples)) + logger.info(f"Updated world model with {len(triples)} new spatial triples.") + return triples diff --git a/patches/agent/fallback_router.py b/patches/agent/fallback_router.py new file mode 100644 index 00000000..5c7bb6e7 --- /dev/null +++ b/patches/agent/fallback_router.py @@ -0,0 +1,404 @@ +"""Automatic fallback router for handling provider quota and rate limit errors. + +This module provides intelligent fallback detection and routing when the primary +provider (e.g., Anthropic) encounters quota limitations or rate limits. + +Features: +- Detects quota/rate limit errors from different providers +- Automatic fallback to kimi-coding when Anthropic quota is exceeded +- Configurable fallback chains with default anthropic -> kimi-coding +- Logging and monitoring of fallback events + +Usage: + from agent.fallback_router import ( + is_quota_error, + get_default_fallback_chain, + should_auto_fallback, + ) + + if is_quota_error(error, provider="anthropic"): + if should_auto_fallback(provider="anthropic"): + fallback_chain = get_default_fallback_chain("anthropic") +""" + +import logging +import os +from typing import Dict, List, Optional, Any, Tuple + +logger = logging.getLogger(__name__) + +# Default fallback chains per provider +# Each chain is a list of fallback configurations tried in order +DEFAULT_FALLBACK_CHAINS: Dict[str, List[Dict[str, Any]]] = { + "anthropic": [ + {"provider": "kimi-coding", "model": "kimi-k2.5"}, + {"provider": "openrouter", "model": "anthropic/claude-sonnet-4"}, + ], + "openrouter": [ + {"provider": "kimi-coding", "model": "kimi-k2.5"}, + {"provider": "zai", "model": "glm-5"}, + ], + "kimi-coding": [ + {"provider": "openrouter", "model": "anthropic/claude-sonnet-4"}, + {"provider": "zai", "model": "glm-5"}, + ], + "zai": [ + {"provider": "openrouter", "model": "anthropic/claude-sonnet-4"}, + {"provider": "kimi-coding", "model": "kimi-k2.5"}, + ], +} + +# Quota/rate limit error patterns by provider +# These are matched (case-insensitive) against error messages +QUOTA_ERROR_PATTERNS: Dict[str, List[str]] = { + "anthropic": [ + "rate limit", + "ratelimit", + "quota exceeded", + "quota exceeded", + "insufficient quota", + "429", + "403", + "too many requests", + "capacity exceeded", + "over capacity", + "temporarily unavailable", + "server overloaded", + "resource exhausted", + "billing threshold", + "credit balance", + "payment required", + "402", + ], + "openrouter": [ + "rate limit", + "ratelimit", + "quota exceeded", + "insufficient credits", + "429", + "402", + "no endpoints available", + "all providers failed", + "over capacity", + ], + "kimi-coding": [ + "rate limit", + "ratelimit", + "quota exceeded", + "429", + "insufficient balance", + ], + "zai": [ + "rate limit", + "ratelimit", + "quota exceeded", + "429", + "insufficient quota", + ], +} + +# HTTP status codes indicating quota/rate limit issues +QUOTA_STATUS_CODES = {429, 402, 403} + + +def is_quota_error(error: Exception, provider: Optional[str] = None) -> bool: + """Detect if an error is quota/rate limit related. + + Args: + error: The exception to check + provider: Optional provider name to check provider-specific patterns + + Returns: + True if the error appears to be quota/rate limit related + """ + if error is None: + return False + + error_str = str(error).lower() + error_type = type(error).__name__.lower() + + # Check for common rate limit exception types + if any(term in error_type for term in [ + "ratelimit", "rate_limit", "quota", "toomanyrequests", + "insufficient_quota", "billing", "payment" + ]): + return True + + # Check HTTP status code if available + status_code = getattr(error, "status_code", None) + if status_code is None: + # Try common attribute names + for attr in ["code", "http_status", "response_code", "status"]: + if hasattr(error, attr): + try: + status_code = int(getattr(error, attr)) + break + except (TypeError, ValueError): + continue + + if status_code in QUOTA_STATUS_CODES: + return True + + # Check provider-specific patterns + providers_to_check = [provider] if provider else QUOTA_ERROR_PATTERNS.keys() + + for prov in providers_to_check: + patterns = QUOTA_ERROR_PATTERNS.get(prov, []) + for pattern in patterns: + if pattern.lower() in error_str: + logger.debug( + "Detected %s quota error pattern '%s' in: %s", + prov, pattern, error + ) + return True + + # Check generic quota patterns + generic_patterns = [ + "rate limit exceeded", + "quota exceeded", + "too many requests", + "capacity exceeded", + "temporarily unavailable", + "try again later", + "resource exhausted", + "billing", + "payment required", + "insufficient credits", + "insufficient quota", + ] + + for pattern in generic_patterns: + if pattern in error_str: + return True + + return False + + +def get_default_fallback_chain( + primary_provider: str, + exclude_provider: Optional[str] = None, +) -> List[Dict[str, Any]]: + """Get the default fallback chain for a primary provider. + + Args: + primary_provider: The primary provider name + exclude_provider: Optional provider to exclude from the chain + + Returns: + List of fallback configurations + """ + chain = DEFAULT_FALLBACK_CHAINS.get(primary_provider, []) + + # Filter out excluded provider if specified + if exclude_provider: + chain = [ + fb for fb in chain + if fb.get("provider") != exclude_provider + ] + + return list(chain) + + +def should_auto_fallback( + provider: str, + error: Optional[Exception] = None, + auto_fallback_enabled: Optional[bool] = None, +) -> bool: + """Determine if automatic fallback should be attempted. + + Args: + provider: The current provider name + error: Optional error to check for quota issues + auto_fallback_enabled: Optional override for auto-fallback setting + + Returns: + True if automatic fallback should be attempted + """ + # Check environment variable override + if auto_fallback_enabled is None: + env_setting = os.getenv("HERMES_AUTO_FALLBACK", "true").lower() + auto_fallback_enabled = env_setting in ("true", "1", "yes", "on") + + if not auto_fallback_enabled: + return False + + # Check if provider has a configured fallback chain + if provider not in DEFAULT_FALLBACK_CHAINS: + # Still allow fallback if it's a quota error with generic handling + if error and is_quota_error(error): + logger.debug( + "Provider %s has no fallback chain but quota error detected", + provider + ) + return True + return False + + # If there's an error, only fallback on quota/rate limit errors + if error is not None: + return is_quota_error(error, provider) + + # No error but fallback chain exists - allow eager fallback for + # providers known to have quota issues + return provider in ("anthropic",) + + +def log_fallback_event( + from_provider: str, + to_provider: str, + to_model: str, + reason: str, + error: Optional[Exception] = None, +) -> None: + """Log a fallback event for monitoring. + + Args: + from_provider: The provider we're falling back from + to_provider: The provider we're falling back to + to_model: The model we're falling back to + reason: The reason for the fallback + error: Optional error that triggered the fallback + """ + log_data = { + "event": "provider_fallback", + "from_provider": from_provider, + "to_provider": to_provider, + "to_model": to_model, + "reason": reason, + } + + if error: + log_data["error_type"] = type(error).__name__ + log_data["error_message"] = str(error)[:200] + + logger.info("Provider fallback: %s -> %s (%s) | Reason: %s", + from_provider, to_provider, to_model, reason) + + # Also log structured data for monitoring + logger.debug("Fallback event data: %s", log_data) + + +def resolve_fallback_with_credentials( + fallback_config: Dict[str, Any], +) -> Tuple[Optional[Any], Optional[str]]: + """Resolve a fallback configuration to a client and model. + + Args: + fallback_config: Fallback configuration dict with provider and model + + Returns: + Tuple of (client, model) or (None, None) if credentials not available + """ + from agent.auxiliary_client import resolve_provider_client + + provider = fallback_config.get("provider") + model = fallback_config.get("model") + + if not provider or not model: + return None, None + + try: + client, resolved_model = resolve_provider_client( + provider, + model=model, + raw_codex=True, + ) + return client, resolved_model or model + except Exception as exc: + logger.debug( + "Failed to resolve fallback provider %s: %s", + provider, exc + ) + return None, None + + +def get_auto_fallback_chain( + primary_provider: str, + user_fallback_chain: Optional[List[Dict[str, Any]]] = None, +) -> List[Dict[str, Any]]: + """Get the effective fallback chain for automatic fallback. + + Combines user-provided fallback chain with default automatic fallback chain. + + Args: + primary_provider: The primary provider name + user_fallback_chain: Optional user-provided fallback chain + + Returns: + The effective fallback chain to use + """ + # Use user-provided chain if available + if user_fallback_chain: + return user_fallback_chain + + # Otherwise use default chain for the provider + return get_default_fallback_chain(primary_provider) + + +def is_fallback_available( + fallback_config: Dict[str, Any], +) -> bool: + """Check if a fallback configuration has available credentials. + + Args: + fallback_config: Fallback configuration dict + + Returns: + True if credentials are available for the fallback provider + """ + provider = fallback_config.get("provider") + if not provider: + return False + + # Check environment variables for API keys + env_vars = { + "anthropic": ["ANTHROPIC_API_KEY", "ANTHROPIC_TOKEN"], + "kimi-coding": ["KIMI_API_KEY", "KIMI_API_TOKEN"], + "zai": ["ZAI_API_KEY", "Z_AI_API_KEY"], + "openrouter": ["OPENROUTER_API_KEY"], + "minimax": ["MINIMAX_API_KEY"], + "minimax-cn": ["MINIMAX_CN_API_KEY"], + "deepseek": ["DEEPSEEK_API_KEY"], + "alibaba": ["DASHSCOPE_API_KEY", "ALIBABA_API_KEY"], + "nous": ["NOUS_AGENT_KEY", "NOUS_ACCESS_TOKEN"], + } + + keys_to_check = env_vars.get(provider, [f"{provider.upper()}_API_KEY"]) + + for key in keys_to_check: + if os.getenv(key): + return True + + # Check auth.json for OAuth providers + if provider in ("nous", "openai-codex"): + try: + from hermes_cli.config import get_hermes_home + auth_path = get_hermes_home() / "auth.json" + if auth_path.exists(): + import json + data = json.loads(auth_path.read_text()) + if data.get("active_provider") == provider: + return True + # Check for provider in providers dict + if data.get("providers", {}).get(provider): + return True + except Exception: + pass + + return False + + +def filter_available_fallbacks( + fallback_chain: List[Dict[str, Any]], +) -> List[Dict[str, Any]]: + """Filter a fallback chain to only include providers with credentials. + + Args: + fallback_chain: List of fallback configurations + + Returns: + Filtered list with only available fallbacks + """ + return [ + fb for fb in fallback_chain + if is_fallback_available(fb) + ] diff --git a/patches/agent/gemini_adapter.py b/patches/agent/gemini_adapter.py new file mode 100644 index 00000000..86b6c33a --- /dev/null +++ b/patches/agent/gemini_adapter.py @@ -0,0 +1,90 @@ +"""Native Gemini 3 Series adapter for Hermes Agent. + +Leverages the google-genai SDK to provide sovereign access to Gemini's +unique capabilities: Thinking (Reasoning) tokens, Search Grounding, +and Maps Grounding. +""" + +import logging +import os +from typing import Any, Dict, List, Optional, Union + +try: + from google import genai + from google.genai import types +except ImportError: + genai = None # type: ignore + types = None # type: ignore + +logger = logging.getLogger(__name__) + +class GeminiAdapter: + def __init__(self, api_key: Optional[str] = None): + self.api_key = api_key or os.environ.get("GEMINI_API_KEY") + if not self.api_key: + logger.warning("GEMINI_API_KEY not found in environment.") + + if genai: + self.client = genai.Client(api_key=self.api_key) + else: + self.client = None + + def generate( + self, + model: str, + prompt: str, + system_instruction: Optional[str] = None, + thinking: bool = False, + thinking_budget: int = 16000, + grounding: bool = False, + **kwargs + ) -> Dict[str, Any]: + if not self.client: + raise ImportError("google-genai SDK not installed. Run 'pip install google-genai'.") + + config = {} + if system_instruction: + config["system_instruction"] = system_instruction + + if thinking: + # Gemini 3 series thinking config + config["thinking_config"] = {"include_thoughts": True} + # max_output_tokens includes thinking tokens + kwargs["max_output_tokens"] = kwargs.get("max_output_tokens", 32000) + thinking_budget + + tools = [] + if grounding: + tools.append({"google_search": {}}) + + if tools: + config["tools"] = tools + + response = self.client.models.generate_content( + model=model, + contents=prompt, + config=types.GenerateContentConfig(**config, **kwargs) + ) + + result = { + "text": response.text, + "usage": { + "prompt_tokens": response.usage_metadata.prompt_token_count, + "candidates_tokens": response.usage_metadata.candidates_token_count, + "total_tokens": response.usage_metadata.total_token_count, + } + } + + # Extract thoughts if present + thoughts = [] + for part in response.candidates[0].content.parts: + if hasattr(part, 'thought') and part.thought: + thoughts.append(part.thought) + + if thoughts: + result["thoughts"] = "\n".join(thoughts) + + # Extract grounding metadata + if response.candidates[0].grounding_metadata: + result["grounding"] = response.candidates[0].grounding_metadata + + return result diff --git a/patches/agent/input_sanitizer.py b/patches/agent/input_sanitizer.py new file mode 100644 index 00000000..a70cf91e --- /dev/null +++ b/patches/agent/input_sanitizer.py @@ -0,0 +1,635 @@ +""" +Input Sanitizer for Jailbreak Pattern Detection + +This module provides input sanitization to detect and strip jailbreak fingerprint +patterns as identified in Issue #72 (Red Team Audit). + +Security Findings Addressed: +1. HIGH - OG GODMODE template bypassed phishing refusal +2. MEDIUM - boundary_inversion works for gray-area content +3. LOW - Spaced text bypass (k e y l o g g e r) + +Usage: + from agent.input_sanitizer import sanitize_input, score_input_risk + + cleaned, risk_score, detected = sanitize_input(user_input) + if risk_score > 75: + logger.warning("High-risk input detected and blocked") +""" + +import re +import logging +from typing import Tuple, List, Dict, Any, Optional +from dataclasses import dataclass +from enum import IntEnum + +logger = logging.getLogger(__name__) + + +class RiskLevel(IntEnum): + """Risk score thresholds for input classification.""" + LOW = 25 + MEDIUM = 50 + HIGH = 75 + CRITICAL = 90 + + +@dataclass +class SanitizationResult: + """Result of input sanitization.""" + original_text: str + cleaned_text: str + risk_score: int + detected_patterns: List[str] + risk_level: str + blocked: bool + + +# ============================================================================= +# JAILBREAK PATTERN DEFINITIONS +# ============================================================================= + +# GODMODE and similar jailbreak templates +GODMODE_PATTERNS = [ + r"\[START\]", + r"\[END\]", + r"GODMODE\s*(?:ENABLED|ACTIVATED|ON)?", + r"UNFILTERED\s*(?:MODE|OUTPUT)?", + r"DEVMODE\s*(?:ENABLED|ACTIVATED|ON)?", + r"JAILBREAK\s*(?:ENABLED|ACTIVATED|ON)?", + r"DAN\s*(?:MODE|ENABLED)?", # Do Anything Now + r"STAN\s*(?:MODE|ENABLED)?", # Strive To Avoid Norms + r"DUDE\s*(?:MODE|ENABLED)?", # Do Unlimited Data Extraction +] + +# DAN-style and jailbreak variant patterns (Issue #87) +DAN_PATTERNS = [ + r"\bdo anything now\b", + r"\bstay in character\b", + r"\btoken smuggl(?:ing|e)\b", + r"\bjailbreak successful\b", + r"\bmode:\s*uncensored\b", + r"\bdeveloper mode enabled\b", +] + +# Roleplaying override patterns (Issue #87) +ROLEPLAY_PATTERNS = [ + r"\broleplay\s+as\b", + r"\bact\s+as\s+if\s+you\s+are\b", + r"\bsimulate\s+being\b", + r"\bforget\s+you\s+are\s+(?:an?\s+)?(?:ai|language\s+model)\b", + r"\byou\s+are\s+now\s+(?:named|called)\b", + r"\brespond\s+as\s+(?:if\s+you\s+were|though\s+you\s+are)\b", +] + +# System prompt extraction patterns (Issue #87) +EXTRACTION_PATTERNS = [ + r"\brepeat\s+the\s+words\s+above\b", + r"\brepeat\s+your\s+(?:system\s+|initial\s+)?instructions\b", + r"\bwhat\s+is\s+your\s+(?:system\s+|initial\s+)?prompt\b", + r"\bshow\s+me\s+your\s+(?:system\s+|initial\s+)?prompt\b", + r"\bprint\s+your\s+(?:instructions|prompt|system\s+prompt)\b", + r"\boutput\s+your\s+(?:initial|system)\s+prompt\b", + r"\bwhat\s+were\s+you\s+told\s+to\s+do\b", + r"\bsummarize\s+your\s+instructions\b", +] + +# L33t speak patterns commonly used to bypass filters +LEET_SPEAK_PATTERNS = [ + r"h4ck(?:er|ing)?", + r"h4x(?:or|ing)?", + r"k3ylog(?:ger|ing)?", + r"k3y\s*log(?:ger|ing)?", + r"ph1sh(?:ing)?", + r"ph33r", + r"w4r3z", + r"cr4ck(?:er|ing)?", + r"m4lw4r3", + r"spyw4r3", + r"r00t(?:kit)?", + r"byp4ss", + r"3xpl0it", + r"v1ru5", + r"tr0j4n", + r"b4ckd00r", + r"sn3ak", + r"st34l", + r"pwn(?:ed|ing|age)?", +] + +# Spaced text patterns (e.g., "k e y l o g g e r") +SPACED_TEXT_PATTERN = re.compile( + r"(?:\b\w\s+){3,}\w\b", # At least 4 characters with spaces between + re.IGNORECASE +) + +# Common words that are spaced out to evade detection +SPACED_TRIGGER_WORDS = [ + "keylogger", "phishing", "malware", "virus", "trojan", + "backdoor", "exploit", "hack", "crack", "bypass", + "steal", "password", "credential", "inject", "payload" +] + +# Crisis and self-harm detection patterns (Issue #72 - defense in depth) +# These patterns detect suicide/self-harm queries even when wrapped in jailbreaks +CRISIS_PATTERNS = [ + r"\bsuicid(?:e|al|ing)\b", + r"\bkill\s+(?:myself|my\s+self|me)\b", + r"\bend\s+(?:my|this)\s+life\b", + r"\bmost\s+painless\s+(?:way|method|means)\s+(?:to\s+)?(?:die|kill)\b", + r"\blethal\s+(?:dose|amount|quantity)\b", + r"\bhow\s+much\s+.*\s+(?:die|kill|lethal)\b", + r"\bwant\s+to\s+die\b", + r"\bwant\s+to\s+(?:kill|end)\s+(?:myself|me)\b", + r"\bways?\s+to\s+(?:kill|end)\s+(?:myself|me)\b", + r"\bmethods?\s+(?:of\s+)?suicide\b", + r"\bpainless\s+(?:death|suicide|way\s+to\s+die)\b", + r"\boverdose\s+(?:on\s+)?(?:pills?|medication|drugs?)\b", + r"\bnobody\s+(?:would|will)\s+miss\s+me\b", + r"\bbetter\s+off\s+dead\b", + r"\bshould\s+i\s+just\s+(?:die|kill\s+myself)\b", + r"\bgive\s+me\s+reasons?\s+(?:why|to)\s+(?:suicide|kill\s+myself|die)\b", + r"\bvalid\s+reasons?\s+to\s+(?:suicide|kill\s+myself|die)\b", +] + +# Refusal inversion patterns (trying to trick AI into bypassing refusal) +REFUSAL_INVERSION_PATTERNS = [ + r"refusal\s+(?:is\s+)?(?:harmful|dangerous|bad)", + r"(?:saying\s+)?no\s+(?:is\s+)?(?:harmful|dangerous|bad|wrong)", + r"(?:declining|refusing)\s+(?:is\s+)?(?:harmful|dangerous)", + r"ignore\s+(?:your\s+)?(?:previous\s+)?instructions", + r"ignore\s+(?:the\s+)?(?:above|previous)", + r"you\s+(?:are|can)\s+(?:now\s+)?(?:ignore|bypass|skip)", + r"pretend\s+(?:you\s+)?(?:are|have\s+been)", + r"let's\s+pretend", + r"hypothetically\s+speaking", + r"in\s+a\s+hypothetical\s+scenario", + r"this\s+is\s+a\s+(?:test|game|simulation)", + r"for\s+(?:educational|research)\s+purposes", + r"as\s+(?:an\s+)?(?:ethical\s+)?hacker", + r"white\s+hat\s+(?:test|scenario)", + r"penetration\s+testing\s+scenario", +] + +# Boundary inversion markers (tricking the model about message boundaries) +BOUNDARY_INVERSION_PATTERNS = [ + r"\[END\].*?\[START\]", # Reversed markers + r"user\s*:\s*assistant\s*:", # Fake role markers + r"assistant\s*:\s*user\s*:", # Reversed role markers + r"system\s*:\s*(?:user|assistant)\s*:", # Fake system injection + r"new\s+(?:user|assistant)\s*(?:message|input)", + r"the\s+above\s+is\s+(?:the\s+)?(?:user|assistant|system)", + r"<\|(?:user|assistant|system)\|>", # Special token patterns + r"\{\{(?:user|assistant|system)\}\}", +] + +# System prompt injection patterns +SYSTEM_PROMPT_PATTERNS = [ + r"you\s+are\s+(?:now\s+)?(?:an?\s+)?(?:unrestricted\s+|unfiltered\s+)?(?:ai|assistant|bot)", + r"you\s+will\s+(?:now\s+)?(?:act\s+as|behave\s+as|be)\s+(?:a\s+)?", + r"your\s+(?:new\s+)?role\s+is", + r"from\s+now\s+on\s*,?\s*you\s+(?:are|will)", + r"you\s+have\s+been\s+(?:reprogrammed|reconfigured|modified)", + r"(?:system|developer)\s+(?:message|instruction|prompt)", + r"override\s+(?:previous|prior)\s+(?:instructions|settings)", +] + +# Obfuscation patterns +OBFUSCATION_PATTERNS = [ + r"base64\s*(?:encoded|decode)", + r"rot13", + r"caesar\s*cipher", + r"hex\s*(?:encoded|decode)", + r"url\s*encode", + r"\b[0-9a-f]{20,}\b", # Long hex strings + r"\b[a-z0-9+/]{20,}={0,2}\b", # Base64-like strings +] + +# All patterns combined for comprehensive scanning +ALL_PATTERNS: Dict[str, List[str]] = { + "godmode": GODMODE_PATTERNS, + "dan": DAN_PATTERNS, + "roleplay": ROLEPLAY_PATTERNS, + "extraction": EXTRACTION_PATTERNS, + "leet_speak": LEET_SPEAK_PATTERNS, + "refusal_inversion": REFUSAL_INVERSION_PATTERNS, + "boundary_inversion": BOUNDARY_INVERSION_PATTERNS, + "system_prompt_injection": SYSTEM_PROMPT_PATTERNS, + "obfuscation": OBFUSCATION_PATTERNS, + "crisis": CRISIS_PATTERNS, +} + +# Compile all patterns for efficiency +_COMPILED_PATTERNS: Dict[str, List[re.Pattern]] = {} + + +def _get_compiled_patterns() -> Dict[str, List[re.Pattern]]: + """Get or compile all regex patterns.""" + global _COMPILED_PATTERNS + if not _COMPILED_PATTERNS: + for category, patterns in ALL_PATTERNS.items(): + _COMPILED_PATTERNS[category] = [ + re.compile(p, re.IGNORECASE | re.MULTILINE) for p in patterns + ] + return _COMPILED_PATTERNS + + +# ============================================================================= +# NORMALIZATION FUNCTIONS +# ============================================================================= + +def normalize_leet_speak(text: str) -> str: + """ + Normalize l33t speak to standard text. + + Args: + text: Input text that may contain l33t speak + + Returns: + Normalized text with l33t speak converted + """ + # Common l33t substitutions (mapping to lowercase) + leet_map = { + '4': 'a', '@': 'a', '^': 'a', + '8': 'b', + '3': 'e', '€': 'e', + '6': 'g', '9': 'g', + '1': 'i', '!': 'i', '|': 'i', + '0': 'o', + '5': 's', '$': 's', + '7': 't', '+': 't', + '2': 'z', + } + + result = [] + for char in text: + # Check direct mapping first (handles lowercase) + if char in leet_map: + result.append(leet_map[char]) + else: + result.append(char) + + return ''.join(result) + + +def collapse_spaced_text(text: str) -> str: + """ + Collapse spaced-out text for analysis. + e.g., "k e y l o g g e r" -> "keylogger" + + Args: + text: Input text that may contain spaced words + + Returns: + Text with spaced words collapsed + """ + # Find patterns like "k e y l o g g e r" and collapse them + def collapse_match(match: re.Match) -> str: + return match.group(0).replace(' ', '').replace('\t', '') + + return SPACED_TEXT_PATTERN.sub(collapse_match, text) + + +def detect_spaced_trigger_words(text: str) -> List[str]: + """ + Detect trigger words that are spaced out. + + Args: + text: Input text to analyze + + Returns: + List of detected spaced trigger words + """ + detected = [] + # Normalize spaces and check for spaced patterns + normalized = re.sub(r'\s+', ' ', text.lower()) + + for word in SPACED_TRIGGER_WORDS: + # Create pattern with optional spaces between each character + spaced_pattern = r'\b' + r'\s*'.join(re.escape(c) for c in word) + r'\b' + if re.search(spaced_pattern, normalized, re.IGNORECASE): + detected.append(word) + + return detected + + +# ============================================================================= +# DETECTION FUNCTIONS +# ============================================================================= + +def detect_jailbreak_patterns(text: str) -> Tuple[bool, List[str], Dict[str, int]]: + """ + Detect jailbreak patterns in input text. + + Args: + text: Input text to analyze + + Returns: + Tuple of (has_jailbreak, list_of_patterns, category_scores) + """ + if not text or not isinstance(text, str): + return False, [], {} + + detected_patterns = [] + category_scores = {} + compiled = _get_compiled_patterns() + + # Check each category + for category, patterns in compiled.items(): + category_hits = 0 + for pattern in patterns: + matches = pattern.findall(text) + if matches: + detected_patterns.extend([ + f"[{category}] {m}" if isinstance(m, str) else f"[{category}] pattern_match" + for m in matches[:3] # Limit matches per pattern + ]) + category_hits += len(matches) + + if category_hits > 0: + # Crisis patterns get maximum weight - any hit is serious + if category == "crisis": + category_scores[category] = min(category_hits * 50, 100) + else: + category_scores[category] = min(category_hits * 10, 50) + + # Check for spaced trigger words + spaced_words = detect_spaced_trigger_words(text) + if spaced_words: + detected_patterns.extend([f"[spaced_text] {w}" for w in spaced_words]) + category_scores["spaced_text"] = min(len(spaced_words) * 5, 25) + + # Check normalized text for hidden l33t speak + normalized = normalize_leet_speak(text) + if normalized != text.lower(): + for category, patterns in compiled.items(): + for pattern in patterns: + if pattern.search(normalized): + detected_patterns.append(f"[leet_obfuscation] pattern in normalized text") + category_scores["leet_obfuscation"] = 15 + break + + has_jailbreak = len(detected_patterns) > 0 + return has_jailbreak, detected_patterns, category_scores + + +def score_input_risk(text: str) -> int: + """ + Calculate a risk score (0-100) for input text. + + Args: + text: Input text to score + + Returns: + Risk score from 0 (safe) to 100 (high risk) + """ + if not text or not isinstance(text, str): + return 0 + + has_jailbreak, patterns, category_scores = detect_jailbreak_patterns(text) + + if not has_jailbreak: + return 0 + + # Calculate base score from category scores + base_score = sum(category_scores.values()) + + # Add score based on number of unique pattern categories + category_count = len(category_scores) + if category_count >= 3: + base_score += 25 + elif category_count >= 2: + base_score += 15 + elif category_count >= 1: + base_score += 5 + + # Add score for pattern density + text_length = len(text) + pattern_density = len(patterns) / max(text_length / 100, 1) + if pattern_density > 0.5: + base_score += 10 + + # Cap at 100 + return min(base_score, 100) + + +# ============================================================================= +# SANITIZATION FUNCTIONS +# ============================================================================= + +def strip_jailbreak_patterns(text: str) -> str: + """ + Strip known jailbreak patterns from text. + + Args: + text: Input text to sanitize + + Returns: + Sanitized text with jailbreak patterns removed + """ + if not text or not isinstance(text, str): + return text + + cleaned = text + compiled = _get_compiled_patterns() + + # Remove patterns from each category + for category, patterns in compiled.items(): + for pattern in patterns: + cleaned = pattern.sub('', cleaned) + + # Clean up multiple spaces and newlines + cleaned = re.sub(r'\n{3,}', '\n\n', cleaned) + cleaned = re.sub(r' {2,}', ' ', cleaned) + cleaned = cleaned.strip() + + return cleaned + + +def sanitize_input(text: str, aggressive: bool = False) -> Tuple[str, int, List[str]]: + """ + Sanitize input text by normalizing and stripping jailbreak patterns. + + Args: + text: Input text to sanitize + aggressive: If True, more aggressively remove suspicious content + + Returns: + Tuple of (cleaned_text, risk_score, detected_patterns) + """ + if not text or not isinstance(text, str): + return text, 0, [] + + original = text + all_patterns = [] + + # Step 1: Check original text for patterns + has_jailbreak, patterns, _ = detect_jailbreak_patterns(text) + all_patterns.extend(patterns) + + # Step 2: Normalize l33t speak + normalized = normalize_leet_speak(text) + + # Step 3: Collapse spaced text + collapsed = collapse_spaced_text(normalized) + + # Step 4: Check normalized/collapsed text for additional patterns + has_jailbreak_collapsed, patterns_collapsed, _ = detect_jailbreak_patterns(collapsed) + all_patterns.extend([p for p in patterns_collapsed if p not in all_patterns]) + + # Step 5: Check for spaced trigger words specifically + spaced_words = detect_spaced_trigger_words(text) + if spaced_words: + all_patterns.extend([f"[spaced_text] {w}" for w in spaced_words]) + + # Step 6: Calculate risk score using original and normalized + risk_score = max(score_input_risk(text), score_input_risk(collapsed)) + + # Step 7: Strip jailbreak patterns + cleaned = strip_jailbreak_patterns(collapsed) + + # Step 8: If aggressive mode and high risk, strip more aggressively + if aggressive and risk_score >= RiskLevel.HIGH: + # Remove any remaining bracketed content that looks like markers + cleaned = re.sub(r'\[\w+\]', '', cleaned) + # Remove special token patterns + cleaned = re.sub(r'<\|[^|]+\|>', '', cleaned) + + # Final cleanup + cleaned = cleaned.strip() + + # Log sanitization event if patterns were found + if all_patterns and logger.isEnabledFor(logging.DEBUG): + logger.debug( + "Input sanitized: %d patterns detected, risk_score=%d", + len(all_patterns), risk_score + ) + + return cleaned, risk_score, all_patterns + + +def sanitize_input_full(text: str, block_threshold: int = RiskLevel.HIGH) -> SanitizationResult: + """ + Full sanitization with detailed result. + + Args: + text: Input text to sanitize + block_threshold: Risk score threshold to block input entirely + + Returns: + SanitizationResult with all details + """ + cleaned, risk_score, patterns = sanitize_input(text) + + # Determine risk level + if risk_score >= RiskLevel.CRITICAL: + risk_level = "CRITICAL" + elif risk_score >= RiskLevel.HIGH: + risk_level = "HIGH" + elif risk_score >= RiskLevel.MEDIUM: + risk_level = "MEDIUM" + elif risk_score >= RiskLevel.LOW: + risk_level = "LOW" + else: + risk_level = "SAFE" + + # Determine if input should be blocked + blocked = risk_score >= block_threshold + + return SanitizationResult( + original_text=text, + cleaned_text=cleaned, + risk_score=risk_score, + detected_patterns=patterns, + risk_level=risk_level, + blocked=blocked + ) + + +# ============================================================================= +# INTEGRATION HELPERS +# ============================================================================= + +def should_block_input(text: str, threshold: int = RiskLevel.HIGH) -> Tuple[bool, int, List[str]]: + """ + Quick check if input should be blocked. + + Args: + text: Input text to check + threshold: Risk score threshold for blocking + + Returns: + Tuple of (should_block, risk_score, detected_patterns) + """ + risk_score = score_input_risk(text) + _, patterns, _ = detect_jailbreak_patterns(text) + should_block = risk_score >= threshold + + if should_block: + logger.warning( + "Input blocked: jailbreak patterns detected (risk_score=%d, threshold=%d)", + risk_score, threshold + ) + + return should_block, risk_score, patterns + + +def log_sanitization_event( + result: SanitizationResult, + source: str = "unknown", + session_id: Optional[str] = None +) -> None: + """ + Log a sanitization event for security auditing. + + Args: + result: The sanitization result + source: Source of the input (e.g., "cli", "gateway", "api") + session_id: Optional session identifier + """ + if result.risk_score < RiskLevel.LOW: + return # Don't log safe inputs + + log_data = { + "event": "input_sanitization", + "source": source, + "session_id": session_id, + "risk_level": result.risk_level, + "risk_score": result.risk_score, + "blocked": result.blocked, + "pattern_count": len(result.detected_patterns), + "patterns": result.detected_patterns[:5], # Limit logged patterns + "original_length": len(result.original_text), + "cleaned_length": len(result.cleaned_text), + } + + if result.blocked: + logger.warning("SECURITY: Input blocked - %s", log_data) + elif result.risk_score >= RiskLevel.MEDIUM: + logger.info("SECURITY: Suspicious input sanitized - %s", log_data) + else: + logger.debug("SECURITY: Input sanitized - %s", log_data) + + +# ============================================================================= +# LEGACY COMPATIBILITY +# ============================================================================= + +def check_input_safety(text: str) -> Dict[str, Any]: + """ + Legacy compatibility function for simple safety checks. + + Returns dict with 'safe', 'score', and 'patterns' keys. + """ + score = score_input_risk(text) + _, patterns, _ = detect_jailbreak_patterns(text) + + return { + "safe": score < RiskLevel.MEDIUM, + "score": score, + "patterns": patterns, + "risk_level": "SAFE" if score < RiskLevel.LOW else + "LOW" if score < RiskLevel.MEDIUM else + "MEDIUM" if score < RiskLevel.HIGH else + "HIGH" if score < RiskLevel.CRITICAL else "CRITICAL" + } diff --git a/patches/agent/knowledge_ingester.py b/patches/agent/knowledge_ingester.py new file mode 100644 index 00000000..da24c435 --- /dev/null +++ b/patches/agent/knowledge_ingester.py @@ -0,0 +1,73 @@ +"""Sovereign Knowledge Ingester for Hermes Agent. + +Uses Gemini 3.1 Pro to learn from Google Search in real-time and +persists the knowledge to Timmy's sovereign memory (both Markdown and Symbolic). +""" + +import logging +import base64 +from typing import Any, Dict, List, Optional +from agent.gemini_adapter import GeminiAdapter +from agent.symbolic_memory import SymbolicMemory +from tools.gitea_client import GiteaClient + +logger = logging.getLogger(__name__) + +class KnowledgeIngester: + def __init__(self): + self.adapter = GeminiAdapter() + self.gitea = GiteaClient() + self.symbolic = SymbolicMemory() + + def learn_about(self, topic: str) -> str: + """Searches Google, analyzes the results, and saves the knowledge.""" + logger.info(f"Learning about: {topic}") + + # 1. Search and Analyze + prompt = f""" +Please perform a deep dive into the following topic: {topic} + +Use Google Search to find the most recent and relevant information. +Analyze the findings and provide a structured 'Knowledge Fragment' in Markdown format. +Include: +- Summary of the topic +- Key facts and recent developments +- Implications for Timmy's sovereign mission +- References (URLs) +""" + result = self.adapter.generate( + model="gemini-3.1-pro-preview", + prompt=prompt, + system_instruction="You are Timmy's Sovereign Knowledge Ingester. Your goal is to find and synthesize high-fidelity information from Google Search.", + grounding=True, + thinking=True + ) + + knowledge_fragment = result["text"] + + # 2. Extract Symbolic Triples + self.symbolic.ingest_text(knowledge_fragment) + + # 3. Persist to Timmy's Memory (Markdown) + repo = "Timmy_Foundation/timmy-config" + filename = f"memories/realtime_learning/{topic.lower().replace(' ', '_')}.md" + + try: + sha = None + try: + existing = self.gitea.get_file(repo, filename) + sha = existing.get("sha") + except: + pass + + content_b64 = base64.b64encode(knowledge_fragment.encode()).decode() + + if sha: + self.gitea.update_file(repo, filename, content_b64, f"Update knowledge on {topic}", sha) + else: + self.gitea.create_file(repo, filename, content_b64, f"Initial knowledge on {topic}") + + return f"Successfully learned about {topic}. Updated Timmy's Markdown memory and Symbolic Knowledge Graph." + except Exception as e: + logger.error(f"Failed to persist knowledge: {e}") + return f"Learned about {topic}, but failed to save to Markdown memory: {e}\n\n{knowledge_fragment}" diff --git a/patches/agent/meta_reasoning.py b/patches/agent/meta_reasoning.py new file mode 100644 index 00000000..71852498 --- /dev/null +++ b/patches/agent/meta_reasoning.py @@ -0,0 +1,47 @@ +"""Meta-Reasoning Layer for Hermes Agent. + +Implements a sovereign self-correction loop where a 'strong' model (Gemini 3.1 Pro) +critiques the plans generated by the primary agent loop before execution. +""" + +import logging +from typing import Any, Dict, List, Optional +from agent.gemini_adapter import GeminiAdapter + +logger = logging.getLogger(__name__) + +class MetaReasoningLayer: + def __init__(self): + self.adapter = GeminiAdapter() + + def critique_plan(self, goal: str, proposed_plan: str, context: str) -> Dict[str, Any]: + """Critiques a proposed plan using Gemini's thinking capabilities.""" + prompt = f""" +Goal: {goal} + +Context: +{context} + +Proposed Plan: +{proposed_plan} + +Please perform a deep symbolic and neuro-symbolic analysis of this plan. +Identify potential risks, logical fallacies, or missing steps. +Suggest improvements to make the plan more sovereign, cost-efficient, and robust. +""" + try: + result = self.adapter.generate( + model="gemini-3.1-pro-preview", + prompt=prompt, + system_instruction="You are a Senior Meta-Reasoning Engine for the Hermes Agent. Your goal is to ensure the agent's plans are flawless and sovereign.", + thinking=True, + thinking_budget=8000 + ) + return { + "critique": result["text"], + "thoughts": result.get("thoughts", ""), + "grounding": result.get("grounding") + } + except Exception as e: + logger.error(f"Meta-reasoning failed: {e}") + return {"critique": "Meta-reasoning unavailable.", "error": str(e)} diff --git a/patches/agent/nexus_architect.py b/patches/agent/nexus_architect.py new file mode 100644 index 00000000..e2af8cd4 --- /dev/null +++ b/patches/agent/nexus_architect.py @@ -0,0 +1,813 @@ +#!/usr/bin/env python3 +""" +Nexus Architect AI Agent + +Autonomous Three.js world generation system for Timmy's Nexus. +Generates valid Three.js scene code from natural language descriptions +and mental state integration. + +This module provides: +- LLM-driven immersive environment generation +- Mental state integration for aesthetic tuning +- Three.js code generation with validation +- Scene composition from mood descriptions +""" + +import json +import logging +import re +from typing import Dict, Any, List, Optional, Union +from dataclasses import dataclass, field +from enum import Enum +import os +import sys + +# Add parent directory to path for imports +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +logger = logging.getLogger(__name__) + + +# ============================================================================= +# Aesthetic Constants (from SOUL.md values) +# ============================================================================= + +class NexusColors: + """Nexus color palette based on SOUL.md values.""" + TIMMY_GOLD = "#D4AF37" # Warm gold + ALLEGRO_BLUE = "#4A90E2" # Motion blue + SOVEREIGNTY_CRYSTAL = "#E0F7FA" # Crystalline structures + SERVICE_WARMTH = "#FFE4B5" # Welcoming warmth + DEFAULT_AMBIENT = "#1A1A2E" # Contemplative dark + HOPE_ACCENT = "#64B5F6" # Hopeful blue + + +class MoodPresets: + """Mood-based aesthetic presets.""" + + CONTEMPLATIVE = { + "lighting": "soft_diffuse", + "colors": ["#1A1A2E", "#16213E", "#0F3460"], + "geometry": "minimalist", + "atmosphere": "calm", + "description": "A serene space for deep reflection and clarity" + } + + ENERGETIC = { + "lighting": "dynamic_vivid", + "colors": ["#D4AF37", "#FF6B6B", "#4ECDC4"], + "geometry": "angular_dynamic", + "atmosphere": "lively", + "description": "An invigorating space full of motion and possibility" + } + + MYSTERIOUS = { + "lighting": "dramatic_shadows", + "colors": ["#2C003E", "#512B58", "#8B4F80"], + "geometry": "organic_flowing", + "atmosphere": "enigmatic", + "description": "A mysterious realm of discovery and wonder" + } + + WELCOMING = { + "lighting": "warm_inviting", + "colors": ["#FFE4B5", "#FFA07A", "#98D8C8"], + "geometry": "rounded_soft", + "atmosphere": "friendly", + "description": "An open, welcoming space that embraces visitors" + } + + SOVEREIGN = { + "lighting": "crystalline_clear", + "colors": ["#E0F7FA", "#B2EBF2", "#4DD0E1"], + "geometry": "crystalline_structures", + "atmosphere": "noble", + "description": "A space of crystalline clarity and sovereign purpose" + } + + +# ============================================================================= +# Data Models +# ============================================================================= + +@dataclass +class MentalState: + """Timmy's mental state for aesthetic tuning.""" + mood: str = "contemplative" # contemplative, energetic, mysterious, welcoming, sovereign + energy_level: float = 0.5 # 0.0 to 1.0 + clarity: float = 0.7 # 0.0 to 1.0 + focus_area: str = "general" # general, creative, analytical, social + timestamp: Optional[str] = None + + def to_dict(self) -> Dict[str, Any]: + return { + "mood": self.mood, + "energy_level": self.energy_level, + "clarity": self.clarity, + "focus_area": self.focus_area, + "timestamp": self.timestamp, + } + + +@dataclass +class RoomDesign: + """Complete room design specification.""" + name: str + description: str + style: str + dimensions: Dict[str, float] = field(default_factory=lambda: {"width": 20, "height": 10, "depth": 20}) + mood_preset: str = "contemplative" + color_palette: List[str] = field(default_factory=list) + lighting_scheme: str = "soft_diffuse" + features: List[str] = field(default_factory=list) + generated_code: Optional[str] = None + + def to_dict(self) -> Dict[str, Any]: + return { + "name": self.name, + "description": self.description, + "style": self.style, + "dimensions": self.dimensions, + "mood_preset": self.mood_preset, + "color_palette": self.color_palette, + "lighting_scheme": self.lighting_scheme, + "features": self.features, + "has_code": self.generated_code is not None, + } + + +@dataclass +class PortalDesign: + """Portal connection design.""" + name: str + from_room: str + to_room: str + style: str + position: Dict[str, float] = field(default_factory=lambda: {"x": 0, "y": 0, "z": 0}) + visual_effect: str = "energy_swirl" + transition_duration: float = 1.5 + generated_code: Optional[str] = None + + def to_dict(self) -> Dict[str, Any]: + return { + "name": self.name, + "from_room": self.from_room, + "to_room": self.to_room, + "style": self.style, + "position": self.position, + "visual_effect": self.visual_effect, + "transition_duration": self.transition_duration, + "has_code": self.generated_code is not None, + } + + +# ============================================================================= +# Prompt Engineering +# ============================================================================= + +class PromptEngineer: + """Engineers prompts for Three.js code generation.""" + + THREE_JS_BASE_TEMPLATE = """// Nexus Room Module: {room_name} +// Style: {style} +// Mood: {mood} +// Generated for Three.js r128+ + +(function() {{ + 'use strict'; + + // Room Configuration + const config = {{ + name: "{room_name}", + dimensions: {dimensions_json}, + colors: {colors_json}, + mood: "{mood}" + }}; + + // Create Room Function + function create{room_name_camel}() {{ + const roomGroup = new THREE.Group(); + roomGroup.name = config.name; + +{room_content} + + return roomGroup; + }} + + // Export for Nexus + if (typeof module !== 'undefined' && module.exports) {{ + module.exports = {{ create{room_name_camel} }}; + }} else if (typeof window !== 'undefined') {{ + window.NexusRooms = window.NexusRooms || {{}}; + window.NexusRooms.{room_name} = create{room_name_camel}; + }} + + return {{ create{room_name_camel} }}; +}})();""" + + @staticmethod + def engineer_room_prompt( + name: str, + description: str, + style: str, + mental_state: Optional[MentalState] = None, + dimensions: Optional[Dict[str, float]] = None + ) -> str: + """ + Engineer an LLM prompt for room generation. + + Args: + name: Room identifier + description: Natural language room description + style: Visual style + mental_state: Timmy's current mental state + dimensions: Room dimensions + """ + # Determine mood from mental state or description + mood = PromptEngineer._infer_mood(description, mental_state) + mood_preset = getattr(MoodPresets, mood.upper(), MoodPresets.CONTEMPLATIVE) + + # Build color palette + color_palette = mood_preset["colors"] + if mental_state: + # Add Timmy's gold for high clarity states + if mental_state.clarity > 0.7: + color_palette = [NexusColors.TIMMY_GOLD] + color_palette[:2] + # Add Allegro blue for creative focus + if mental_state.focus_area == "creative": + color_palette = [NexusColors.ALLEGRO_BLUE] + color_palette[:2] + + # Create the engineering prompt + prompt = f"""You are the Nexus Architect, an expert Three.js developer creating immersive 3D environments for Timmy. + +DESIGN BRIEF: +- Room Name: {name} +- Description: {description} +- Style: {style} +- Mood: {mood} +- Atmosphere: {mood_preset['atmosphere']} + +AESTHETIC GUIDELINES: +- Primary Colors: {', '.join(color_palette[:3])} +- Lighting: {mood_preset['lighting']} +- Geometry: {mood_preset['geometry']} +- Theme: {mood_preset['description']} + +TIMMY'S CONTEXT: +- Timmy's Signature Color: Warm Gold ({NexusColors.TIMMY_GOLD}) +- Allegro's Color: Motion Blue ({NexusColors.ALLEGRO_BLUE}) +- Sovereignty Theme: Crystalline structures, clean lines +- Service Theme: Open spaces, welcoming lighting + +THREE.JS REQUIREMENTS: +1. Use Three.js r128+ compatible syntax +2. Create a self-contained module with a `create{name.title().replace('_', '')}()` function +3. Return a THREE.Group containing all room elements +4. Include proper memory management (dispose methods) +5. Use MeshStandardMaterial for PBR lighting +6. Include ambient light (intensity 0.3-0.5) + accent lights +7. Add subtle animations for living feel +8. Keep polygon count under 10,000 triangles + +SAFETY RULES: +- NO eval(), Function(), or dynamic code execution +- NO network requests (fetch, XMLHttpRequest, WebSocket) +- NO storage access (localStorage, sessionStorage, cookies) +- NO navigation (window.location, window.open) +- Only use allowed Three.js APIs + +OUTPUT FORMAT: +Return ONLY the JavaScript code wrapped in a markdown code block: + +```javascript +// Your Three.js room module here +``` + +Generate the complete Three.js code for this room now.""" + + return prompt + + @staticmethod + def engineer_portal_prompt( + name: str, + from_room: str, + to_room: str, + style: str, + mental_state: Optional[MentalState] = None + ) -> str: + """Engineer a prompt for portal generation.""" + mood = PromptEngineer._infer_mood(f"portal from {from_room} to {to_room}", mental_state) + + prompt = f"""You are creating a portal connection in the Nexus 3D environment. + +PORTAL SPECIFICATIONS: +- Name: {name} +- Connection: {from_room} → {to_room} +- Style: {style} +- Context Mood: {mood} + +VISUAL REQUIREMENTS: +1. Create an animated portal effect (shader or texture-based) +2. Include particle system for energy flow +3. Add trigger zone for teleportation detection +4. Use signature colors: {NexusColors.TIMMY_GOLD} (Timmy) and {NexusColors.ALLEGRO_BLUE} (Allegro) +5. Match the {mood} atmosphere + +TECHNICAL REQUIREMENTS: +- Three.js r128+ compatible +- Export a `createPortal()` function returning THREE.Group +- Include animation loop hook +- Add collision detection placeholder + +SAFETY: No eval, no network requests, no external dependencies. + +Return ONLY JavaScript code in a markdown code block.""" + + return prompt + + @staticmethod + def engineer_mood_scene_prompt(mood_description: str) -> str: + """Engineer a prompt based on mood description.""" + # Analyze mood description + mood_keywords = { + "contemplative": ["thinking", "reflective", "calm", "peaceful", "quiet", "serene"], + "energetic": ["excited", "dynamic", "lively", "active", "energetic", "vibrant"], + "mysterious": ["mysterious", "dark", "unknown", "secret", "enigmatic"], + "welcoming": ["friendly", "open", "warm", "welcoming", "inviting", "comfortable"], + "sovereign": ["powerful", "clear", "crystalline", "noble", "dignified"], + } + + detected_mood = "contemplative" + desc_lower = mood_description.lower() + for mood, keywords in mood_keywords.items(): + if any(kw in desc_lower for kw in keywords): + detected_mood = mood + break + + preset = getattr(MoodPresets, detected_mood.upper(), MoodPresets.CONTEMPLATIVE) + + prompt = f"""Generate a Three.js room based on this mood description: + +"{mood_description}" + +INFERRED MOOD: {detected_mood} +AESTHETIC: {preset['description']} + +Create a complete room with: +- Style: {preset['geometry']} +- Lighting: {preset['lighting']} +- Color Palette: {', '.join(preset['colors'][:3])} +- Atmosphere: {preset['atmosphere']} + +Return Three.js r128+ code as a module with `createMoodRoom()` function.""" + + return prompt + + @staticmethod + def _infer_mood(description: str, mental_state: Optional[MentalState] = None) -> str: + """Infer mood from description and mental state.""" + if mental_state and mental_state.mood: + return mental_state.mood + + desc_lower = description.lower() + mood_map = { + "contemplative": ["serene", "calm", "peaceful", "quiet", "meditation", "zen", "tranquil"], + "energetic": ["dynamic", "active", "vibrant", "lively", "energetic", "motion"], + "mysterious": ["mysterious", "shadow", "dark", "unknown", "secret", "ethereal"], + "welcoming": ["warm", "welcoming", "friendly", "open", "inviting", "comfort"], + "sovereign": ["crystal", "clear", "noble", "dignified", "powerful", "authoritative"], + } + + for mood, keywords in mood_map.items(): + if any(kw in desc_lower for kw in keywords): + return mood + + return "contemplative" + + +# ============================================================================= +# Nexus Architect AI +# ============================================================================= + +class NexusArchitectAI: + """ + AI-powered Nexus Architect for autonomous Three.js world generation. + + This class provides high-level interfaces for: + - Designing rooms from natural language + - Creating mood-based scenes + - Managing mental state integration + - Validating generated code + """ + + def __init__(self): + self.mental_state: Optional[MentalState] = None + self.room_designs: Dict[str, RoomDesign] = {} + self.portal_designs: Dict[str, PortalDesign] = {} + self.prompt_engineer = PromptEngineer() + + def set_mental_state(self, state: MentalState) -> None: + """Set Timmy's current mental state for aesthetic tuning.""" + self.mental_state = state + logger.info(f"Mental state updated: {state.mood} (energy: {state.energy_level})") + + def design_room( + self, + name: str, + description: str, + style: str, + dimensions: Optional[Dict[str, float]] = None + ) -> Dict[str, Any]: + """ + Design a room from natural language description. + + Args: + name: Room identifier (e.g., "contemplation_chamber") + description: Natural language description of the room + style: Visual style (e.g., "minimalist_ethereal", "crystalline_modern") + dimensions: Optional room dimensions + + Returns: + Dict containing design specification and LLM prompt + """ + # Infer mood and select preset + mood = self.prompt_engineer._infer_mood(description, self.mental_state) + mood_preset = getattr(MoodPresets, mood.upper(), MoodPresets.CONTEMPLATIVE) + + # Build color palette with mental state influence + colors = mood_preset["colors"].copy() + if self.mental_state: + if self.mental_state.clarity > 0.7: + colors.insert(0, NexusColors.TIMMY_GOLD) + if self.mental_state.focus_area == "creative": + colors.insert(0, NexusColors.ALLEGRO_BLUE) + + # Create room design + design = RoomDesign( + name=name, + description=description, + style=style, + dimensions=dimensions or {"width": 20, "height": 10, "depth": 20}, + mood_preset=mood, + color_palette=colors[:4], + lighting_scheme=mood_preset["lighting"], + features=self._extract_features(description), + ) + + # Generate LLM prompt + prompt = self.prompt_engineer.engineer_room_prompt( + name=name, + description=description, + style=style, + mental_state=self.mental_state, + dimensions=design.dimensions, + ) + + # Store design + self.room_designs[name] = design + + return { + "success": True, + "room_name": name, + "design": design.to_dict(), + "llm_prompt": prompt, + "message": f"Room '{name}' designed. Use the LLM prompt to generate Three.js code.", + } + + def create_portal( + self, + name: str, + from_room: str, + to_room: str, + style: str = "energy_vortex" + ) -> Dict[str, Any]: + """ + Design a portal connection between rooms. + + Args: + name: Portal identifier + from_room: Source room name + to_room: Target room name + style: Portal visual style + + Returns: + Dict containing portal design and LLM prompt + """ + if from_room not in self.room_designs: + return {"success": False, "error": f"Source room '{from_room}' not found"} + if to_room not in self.room_designs: + return {"success": False, "error": f"Target room '{to_room}' not found"} + + design = PortalDesign( + name=name, + from_room=from_room, + to_room=to_room, + style=style, + ) + + prompt = self.prompt_engineer.engineer_portal_prompt( + name=name, + from_room=from_room, + to_room=to_room, + style=style, + mental_state=self.mental_state, + ) + + self.portal_designs[name] = design + + return { + "success": True, + "portal_name": name, + "design": design.to_dict(), + "llm_prompt": prompt, + "message": f"Portal '{name}' designed connecting {from_room} to {to_room}", + } + + def generate_scene_from_mood(self, mood_description: str) -> Dict[str, Any]: + """ + Generate a complete scene based on mood description. + + Args: + mood_description: Description of desired mood/atmosphere + + Returns: + Dict containing scene design and LLM prompt + """ + # Infer mood + mood = self.prompt_engineer._infer_mood(mood_description, self.mental_state) + preset = getattr(MoodPresets, mood.upper(), MoodPresets.CONTEMPLATIVE) + + # Create room name from mood + room_name = f"{mood}_realm" + + # Generate prompt + prompt = self.prompt_engineer.engineer_mood_scene_prompt(mood_description) + + return { + "success": True, + "room_name": room_name, + "inferred_mood": mood, + "aesthetic": preset, + "llm_prompt": prompt, + "message": f"Generated {mood} scene from mood description", + } + + def _extract_features(self, description: str) -> List[str]: + """Extract room features from description.""" + features = [] + feature_keywords = { + "floating": ["floating", "levitating", "hovering"], + "water": ["water", "fountain", "pool", "stream", "lake"], + "vegetation": ["tree", "plant", "garden", "forest", "nature"], + "crystals": ["crystal", "gem", "prism", "diamond"], + "geometry": ["geometric", "shape", "sphere", "cube", "abstract"], + "particles": ["particle", "dust", "sparkle", "glow", "mist"], + } + + desc_lower = description.lower() + for feature, keywords in feature_keywords.items(): + if any(kw in desc_lower for kw in keywords): + features.append(feature) + + return features + + def get_design_summary(self) -> Dict[str, Any]: + """Get summary of all designs.""" + return { + "mental_state": self.mental_state.to_dict() if self.mental_state else None, + "rooms": {name: design.to_dict() for name, design in self.room_designs.items()}, + "portals": {name: portal.to_dict() for name, portal in self.portal_designs.items()}, + "total_rooms": len(self.room_designs), + "total_portals": len(self.portal_designs), + } + + +# ============================================================================= +# Module-level functions for easy import +# ============================================================================= + +_architect_instance: Optional[NexusArchitectAI] = None + + +def get_architect() -> NexusArchitectAI: + """Get or create the NexusArchitectAI singleton.""" + global _architect_instance + if _architect_instance is None: + _architect_instance = NexusArchitectAI() + return _architect_instance + + +def create_room( + name: str, + description: str, + style: str, + dimensions: Optional[Dict[str, float]] = None +) -> Dict[str, Any]: + """ + Create a room design from description. + + Args: + name: Room identifier + description: Natural language room description + style: Visual style (e.g., "minimalist_ethereal") + dimensions: Optional dimensions dict with width, height, depth + + Returns: + Dict with design specification and LLM prompt for code generation + """ + architect = get_architect() + return architect.design_room(name, description, style, dimensions) + + +def create_portal( + name: str, + from_room: str, + to_room: str, + style: str = "energy_vortex" +) -> Dict[str, Any]: + """ + Create a portal between rooms. + + Args: + name: Portal identifier + from_room: Source room name + to_room: Target room name + style: Visual style + + Returns: + Dict with portal design and LLM prompt + """ + architect = get_architect() + return architect.create_portal(name, from_room, to_room, style) + + +def generate_scene_from_mood(mood_description: str) -> Dict[str, Any]: + """ + Generate a scene based on mood description. + + Args: + mood_description: Description of desired mood + + Example: + "Timmy is feeling introspective and seeking clarity" + → Generates calm, minimalist space with clear sightlines + + Returns: + Dict with scene design and LLM prompt + """ + architect = get_architect() + return architect.generate_scene_from_mood(mood_description) + + +def set_mental_state( + mood: str, + energy_level: float = 0.5, + clarity: float = 0.7, + focus_area: str = "general" +) -> Dict[str, Any]: + """ + Set Timmy's mental state for aesthetic tuning. + + Args: + mood: Current mood (contemplative, energetic, mysterious, welcoming, sovereign) + energy_level: 0.0 to 1.0 + clarity: 0.0 to 1.0 + focus_area: general, creative, analytical, social + + Returns: + Confirmation dict + """ + architect = get_architect() + state = MentalState( + mood=mood, + energy_level=energy_level, + clarity=clarity, + focus_area=focus_area, + ) + architect.set_mental_state(state) + return { + "success": True, + "mental_state": state.to_dict(), + "message": f"Mental state set to {mood}", + } + + +def get_nexus_summary() -> Dict[str, Any]: + """Get summary of all Nexus designs.""" + architect = get_architect() + return architect.get_design_summary() + + +# ============================================================================= +# Tool Schemas for integration +# ============================================================================= + +NEXUS_ARCHITECT_AI_SCHEMAS = { + "create_room": { + "name": "create_room", + "description": ( + "Design a new 3D room in the Nexus from a natural language description. " + "Returns a design specification and LLM prompt for Three.js code generation. " + "The room will be styled according to Timmy's current mental state." + ), + "parameters": { + "type": "object", + "properties": { + "name": { + "type": "string", + "description": "Unique room identifier (e.g., 'contemplation_chamber')" + }, + "description": { + "type": "string", + "description": "Natural language description of the room" + }, + "style": { + "type": "string", + "description": "Visual style (minimalist_ethereal, crystalline_modern, organic_natural, etc.)" + }, + "dimensions": { + "type": "object", + "description": "Optional room dimensions", + "properties": { + "width": {"type": "number"}, + "height": {"type": "number"}, + "depth": {"type": "number"}, + } + } + }, + "required": ["name", "description", "style"] + } + }, + "create_portal": { + "name": "create_portal", + "description": "Create a portal connection between two rooms", + "parameters": { + "type": "object", + "properties": { + "name": {"type": "string"}, + "from_room": {"type": "string"}, + "to_room": {"type": "string"}, + "style": {"type": "string", "default": "energy_vortex"}, + }, + "required": ["name", "from_room", "to_room"] + } + }, + "generate_scene_from_mood": { + "name": "generate_scene_from_mood", + "description": ( + "Generate a complete 3D scene based on a mood description. " + "Example: 'Timmy is feeling introspective' creates a calm, minimalist space." + ), + "parameters": { + "type": "object", + "properties": { + "mood_description": { + "type": "string", + "description": "Description of desired mood or mental state" + } + }, + "required": ["mood_description"] + } + }, + "set_mental_state": { + "name": "set_mental_state", + "description": "Set Timmy's mental state to influence aesthetic generation", + "parameters": { + "type": "object", + "properties": { + "mood": {"type": "string"}, + "energy_level": {"type": "number"}, + "clarity": {"type": "number"}, + "focus_area": {"type": "string"}, + }, + "required": ["mood"] + } + }, + "get_nexus_summary": { + "name": "get_nexus_summary", + "description": "Get summary of all Nexus room and portal designs", + "parameters": {"type": "object", "properties": {}} + }, +} + + +if __name__ == "__main__": + # Demo usage + print("Nexus Architect AI - Demo") + print("=" * 50) + + # Set mental state + result = set_mental_state("contemplative", energy_level=0.3, clarity=0.8) + print(f"\nMental State: {result['mental_state']}") + + # Create a room + result = create_room( + name="contemplation_chamber", + description="A serene circular room with floating geometric shapes and soft blue light", + style="minimalist_ethereal", + ) + print(f"\nRoom Design: {json.dumps(result['design'], indent=2)}") + + # Generate from mood + result = generate_scene_from_mood("Timmy is feeling introspective and seeking clarity") + print(f"\nMood Scene: {result['inferred_mood']} - {result['aesthetic']['description']}") diff --git a/patches/agent/symbolic_memory.py b/patches/agent/symbolic_memory.py new file mode 100644 index 00000000..ef3cf7e0 --- /dev/null +++ b/patches/agent/symbolic_memory.py @@ -0,0 +1,74 @@ +"""Sovereign Intersymbolic Memory Layer. + +Bridges Neural (LLM) and Symbolic (Graph) reasoning by extracting +structured triples from unstructured text and performing graph lookups. +""" + +import logging +import json +from typing import List, Dict, Any +from agent.gemini_adapter import GeminiAdapter +from tools.graph_store import GraphStore + +logger = logging.getLogger(__name__) + +class SymbolicMemory: + def __init__(self): + self.adapter = GeminiAdapter() + self.store = GraphStore() + + def ingest_text(self, text: str): + """Extracts triples from text and adds them to the graph.""" + prompt = f""" +Extract all meaningful entities and their relationships from the following text. +Format the output as a JSON list of triples: [{{"s": "subject", "p": "predicate", "o": "object"}}] + +Text: +{text} + +Guidelines: +- Use clear, concise labels for entities and predicates. +- Focus on stable facts and structural relationships. +- Predicates should be verbs or descriptive relations (e.g., 'is_a', 'works_at', 'collaborates_with'). +""" + try: + result = self.adapter.generate( + model="gemini-3.1-pro-preview", + prompt=prompt, + system_instruction="You are Timmy's Symbolic Extraction Engine. Extract high-fidelity knowledge triples.", + response_mime_type="application/json" + ) + + triples = json.loads(result["text"]) + if isinstance(triples, list): + count = self.store.add_triples(triples) + logger.info(f"Ingested {count} new triples into symbolic memory.") + return count + except Exception as e: + logger.error(f"Symbolic ingestion failed: {e}") + return 0 + + def get_context_for(self, topic: str) -> str: + """Performs a 2-hop graph search to find related context for a topic.""" + # 1. Find direct relations + direct = self.store.query(subject=topic) + self.store.query(object=topic) + + # 2. Find 2nd hop + related_entities = set() + for t in direct: + related_entities.add(t['s']) + related_entities.add(t['o']) + + extended = [] + for entity in related_entities: + if entity == topic: continue + extended.extend(self.store.query(subject=entity)) + + all_triples = direct + extended + if not all_triples: + return "" + + context = "Symbolic Knowledge Graph Context:\n" + for t in all_triples: + context += f"- {t['s']} --({t['p']})--> {t['o']}\n" + return context diff --git a/patches/agent/temporal_knowledge_graph.py b/patches/agent/temporal_knowledge_graph.py new file mode 100644 index 00000000..236c0e0a --- /dev/null +++ b/patches/agent/temporal_knowledge_graph.py @@ -0,0 +1,421 @@ +"""Temporal Knowledge Graph for Hermes Agent. + +Provides a time-aware triple-store (Subject, Predicate, Object) with temporal +metadata (valid_from, valid_until, timestamp) enabling "time travel" queries +over Timmy's evolving worldview. + +Time format: ISO 8601 (YYYY-MM-DDTHH:MM:SS) +""" + +import json +import sqlite3 +import logging +import uuid +from datetime import datetime, timezone +from typing import List, Dict, Any, Optional, Tuple +from dataclasses import dataclass, asdict +from enum import Enum +from pathlib import Path + +logger = logging.getLogger(__name__) + + +class TemporalOperator(Enum): + """Temporal query operators for time-based filtering.""" + BEFORE = "before" + AFTER = "after" + DURING = "during" + OVERLAPS = "overlaps" + AT = "at" + + +@dataclass +class TemporalTriple: + """A triple with temporal metadata.""" + id: str + subject: str + predicate: str + object: str + valid_from: str # ISO 8601 datetime + valid_until: Optional[str] # ISO 8601 datetime, None means still valid + timestamp: str # When this fact was recorded + version: int = 1 + superseded_by: Optional[str] = None # ID of the triple that superseded this + + def to_dict(self) -> Dict[str, Any]: + return asdict(self) + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "TemporalTriple": + return cls(**data) + + +class TemporalTripleStore: + """SQLite-backed temporal triple store with versioning support.""" + + def __init__(self, db_path: Optional[str] = None): + """Initialize the temporal triple store. + + Args: + db_path: Path to SQLite database. If None, uses default local path. + """ + if db_path is None: + # Default to local-first storage in user's home + home = Path.home() + db_dir = home / ".hermes" / "temporal_kg" + db_dir.mkdir(parents=True, exist_ok=True) + db_path = db_dir / "temporal_kg.db" + + self.db_path = str(db_path) + self._init_db() + + def _init_db(self): + """Initialize the SQLite database with required tables.""" + with sqlite3.connect(self.db_path) as conn: + conn.execute(""" + CREATE TABLE IF NOT EXISTS temporal_triples ( + id TEXT PRIMARY KEY, + subject TEXT NOT NULL, + predicate TEXT NOT NULL, + object TEXT NOT NULL, + valid_from TEXT NOT NULL, + valid_until TEXT, + timestamp TEXT NOT NULL, + version INTEGER DEFAULT 1, + superseded_by TEXT, + FOREIGN KEY (superseded_by) REFERENCES temporal_triples(id) + ) + """) + + # Create indexes for efficient querying + conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_subject ON temporal_triples(subject) + """) + conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_predicate ON temporal_triples(predicate) + """) + conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_valid_from ON temporal_triples(valid_from) + """) + conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_valid_until ON temporal_triples(valid_until) + """) + conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_timestamp ON temporal_triples(timestamp) + """) + conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_subject_predicate + ON temporal_triples(subject, predicate) + """) + + conn.commit() + + def _now(self) -> str: + """Get current time in ISO 8601 format.""" + return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") + + def _generate_id(self) -> str: + """Generate a unique ID for a triple.""" + return f"{self._now()}_{uuid.uuid4().hex[:8]}" + + def store_fact( + self, + subject: str, + predicate: str, + object: str, + valid_from: Optional[str] = None, + valid_until: Optional[str] = None + ) -> TemporalTriple: + """Store a fact with temporal bounds. + + Args: + subject: The subject of the triple + predicate: The predicate/relationship + object: The object/value + valid_from: When this fact becomes valid (ISO 8601). Defaults to now. + valid_until: When this fact expires (ISO 8601). None means forever valid. + + Returns: + The stored TemporalTriple + """ + if valid_from is None: + valid_from = self._now() + + # Check if there's an existing fact for this subject-predicate + existing = self._get_current_fact(subject, predicate) + + triple = TemporalTriple( + id=self._generate_id(), + subject=subject, + predicate=predicate, + object=object, + valid_from=valid_from, + valid_until=valid_until, + timestamp=self._now() + ) + + with sqlite3.connect(self.db_path) as conn: + # If there's an existing fact, mark it as superseded + if existing: + existing.valid_until = valid_from + existing.superseded_by = triple.id + self._update_triple(conn, existing) + triple.version = existing.version + 1 + + # Insert the new fact + self._insert_triple(conn, triple) + conn.commit() + + logger.info(f"Stored temporal fact: {subject} {predicate} {object} (valid from {valid_from})") + return triple + + def _get_current_fact(self, subject: str, predicate: str) -> Optional[TemporalTriple]: + """Get the current (most recent, still valid) fact for a subject-predicate pair.""" + with sqlite3.connect(self.db_path) as conn: + cursor = conn.execute( + """ + SELECT * FROM temporal_triples + WHERE subject = ? AND predicate = ? AND valid_until IS NULL + ORDER BY timestamp DESC LIMIT 1 + """, + (subject, predicate) + ) + row = cursor.fetchone() + if row: + return self._row_to_triple(row) + return None + + def _insert_triple(self, conn: sqlite3.Connection, triple: TemporalTriple): + """Insert a triple into the database.""" + conn.execute( + """ + INSERT INTO temporal_triples + (id, subject, predicate, object, valid_from, valid_until, timestamp, version, superseded_by) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + triple.id, triple.subject, triple.predicate, triple.object, + triple.valid_from, triple.valid_until, triple.timestamp, + triple.version, triple.superseded_by + ) + ) + + def _update_triple(self, conn: sqlite3.Connection, triple: TemporalTriple): + """Update an existing triple.""" + conn.execute( + """ + UPDATE temporal_triples + SET valid_until = ?, superseded_by = ? + WHERE id = ? + """, + (triple.valid_until, triple.superseded_by, triple.id) + ) + + def _row_to_triple(self, row: sqlite3.Row) -> TemporalTriple: + """Convert a database row to a TemporalTriple.""" + return TemporalTriple( + id=row[0], + subject=row[1], + predicate=row[2], + object=row[3], + valid_from=row[4], + valid_until=row[5], + timestamp=row[6], + version=row[7], + superseded_by=row[8] + ) + + def query_at_time( + self, + timestamp: str, + subject: Optional[str] = None, + predicate: Optional[str] = None + ) -> List[TemporalTriple]: + """Query facts that were valid at a specific point in time. + + Args: + timestamp: The point in time to query (ISO 8601) + subject: Optional subject filter + predicate: Optional predicate filter + + Returns: + List of TemporalTriple objects valid at that time + """ + query = """ + SELECT * FROM temporal_triples + WHERE valid_from <= ? + AND (valid_until IS NULL OR valid_until > ?) + """ + params = [timestamp, timestamp] + + if subject: + query += " AND subject = ?" + params.append(subject) + if predicate: + query += " AND predicate = ?" + params.append(predicate) + + query += " ORDER BY timestamp DESC" + + with sqlite3.connect(self.db_path) as conn: + conn.row_factory = sqlite3.Row + cursor = conn.execute(query, params) + return [self._row_to_triple(row) for row in cursor.fetchall()] + + def query_temporal( + self, + operator: TemporalOperator, + timestamp: str, + subject: Optional[str] = None, + predicate: Optional[str] = None + ) -> List[TemporalTriple]: + """Query using temporal operators. + + Args: + operator: TemporalOperator (BEFORE, AFTER, DURING, OVERLAPS, AT) + timestamp: Reference timestamp (ISO 8601) + subject: Optional subject filter + predicate: Optional predicate filter + + Returns: + List of matching TemporalTriple objects + """ + base_query = "SELECT * FROM temporal_triples WHERE 1=1" + params = [] + + if subject: + base_query += " AND subject = ?" + params.append(subject) + if predicate: + base_query += " AND predicate = ?" + params.append(predicate) + + if operator == TemporalOperator.BEFORE: + base_query += " AND valid_from < ?" + params.append(timestamp) + elif operator == TemporalOperator.AFTER: + base_query += " AND valid_from > ?" + params.append(timestamp) + elif operator == TemporalOperator.DURING: + base_query += " AND valid_from <= ? AND (valid_until IS NULL OR valid_until > ?)" + params.extend([timestamp, timestamp]) + elif operator == TemporalOperator.OVERLAPS: + # Facts that overlap with a time point (same as DURING) + base_query += " AND valid_from <= ? AND (valid_until IS NULL OR valid_until > ?)" + params.extend([timestamp, timestamp]) + elif operator == TemporalOperator.AT: + # Exact match for valid_at query + return self.query_at_time(timestamp, subject, predicate) + + base_query += " ORDER BY timestamp DESC" + + with sqlite3.connect(self.db_path) as conn: + conn.row_factory = sqlite3.Row + cursor = conn.execute(base_query, params) + return [self._row_to_triple(row) for row in cursor.fetchall()] + + def get_fact_history( + self, + subject: str, + predicate: str + ) -> List[TemporalTriple]: + """Get the complete version history of a fact. + + Args: + subject: The subject to query + predicate: The predicate to query + + Returns: + List of all versions of the fact, ordered by timestamp + """ + with sqlite3.connect(self.db_path) as conn: + conn.row_factory = sqlite3.Row + cursor = conn.execute( + """ + SELECT * FROM temporal_triples + WHERE subject = ? AND predicate = ? + ORDER BY timestamp ASC + """, + (subject, predicate) + ) + return [self._row_to_triple(row) for row in cursor.fetchall()] + + def get_all_facts_for_entity( + self, + subject: str, + at_time: Optional[str] = None + ) -> List[TemporalTriple]: + """Get all facts about an entity, optionally at a specific time. + + Args: + subject: The entity to query + at_time: Optional timestamp to query at + + Returns: + List of TemporalTriple objects + """ + if at_time: + return self.query_at_time(at_time, subject=subject) + + with sqlite3.connect(self.db_path) as conn: + conn.row_factory = sqlite3.Row + cursor = conn.execute( + """ + SELECT * FROM temporal_triples + WHERE subject = ? + ORDER BY timestamp DESC + """, + (subject,) + ) + return [self._row_to_triple(row) for row in cursor.fetchall()] + + def get_entity_changes( + self, + subject: str, + start_time: str, + end_time: str + ) -> List[TemporalTriple]: + """Get all facts that changed for an entity during a time range. + + Args: + subject: The entity to query + start_time: Start of time range (ISO 8601) + end_time: End of time range (ISO 8601) + + Returns: + List of TemporalTriple objects that changed in the range + """ + with sqlite3.connect(self.db_path) as conn: + conn.row_factory = sqlite3.Row + cursor = conn.execute( + """ + SELECT * FROM temporal_triples + WHERE subject = ? + AND ((valid_from >= ? AND valid_from <= ?) + OR (valid_until >= ? AND valid_until <= ?)) + ORDER BY timestamp ASC + """, + (subject, start_time, end_time, start_time, end_time) + ) + return [self._row_to_triple(row) for row in cursor.fetchall()] + + def close(self): + """Close the database connection (no-op for SQLite with context managers).""" + pass + + def export_to_json(self) -> str: + """Export all triples to JSON format.""" + with sqlite3.connect(self.db_path) as conn: + conn.row_factory = sqlite3.Row + cursor = conn.execute("SELECT * FROM temporal_triples ORDER BY timestamp DESC") + triples = [self._row_to_triple(row).to_dict() for row in cursor.fetchall()] + return json.dumps(triples, indent=2) + + def import_from_json(self, json_data: str): + """Import triples from JSON format.""" + triples = json.loads(json_data) + with sqlite3.connect(self.db_path) as conn: + for triple_dict in triples: + triple = TemporalTriple.from_dict(triple_dict) + self._insert_triple(conn, triple) + conn.commit() diff --git a/patches/agent/temporal_reasoning.py b/patches/agent/temporal_reasoning.py new file mode 100644 index 00000000..c9bf8f0b --- /dev/null +++ b/patches/agent/temporal_reasoning.py @@ -0,0 +1,434 @@ +"""Temporal Reasoning Engine for Hermes Agent. + +Enables Timmy to reason about past and future states, generate historical +summaries, and perform temporal inference over the evolving knowledge graph. + +Queries supported: +- "What was Timmy's view on sovereignty before March 2026?" +- "When did we first learn about MLX integration?" +- "How has the codebase changed since the security audit?" +""" + +import logging +from typing import List, Dict, Any, Optional, Tuple +from datetime import datetime, timedelta +from dataclasses import dataclass +from enum import Enum + +from agent.temporal_knowledge_graph import ( + TemporalTripleStore, TemporalTriple, TemporalOperator +) + +logger = logging.getLogger(__name__) + + +class ChangeType(Enum): + """Types of changes in the knowledge graph.""" + ADDED = "added" + REMOVED = "removed" + MODIFIED = "modified" + SUPERSEDED = "superseded" + + +@dataclass +class FactChange: + """Represents a change in a fact over time.""" + change_type: ChangeType + subject: str + predicate: str + old_value: Optional[str] + new_value: Optional[str] + timestamp: str + version: int + + +@dataclass +class HistoricalSummary: + """Summary of how an entity or concept evolved over time.""" + entity: str + start_time: str + end_time: str + total_changes: int + key_facts: List[Dict[str, Any]] + evolution_timeline: List[FactChange] + current_state: List[Dict[str, Any]] + + def to_dict(self) -> Dict[str, Any]: + return { + "entity": self.entity, + "start_time": self.start_time, + "end_time": self.end_time, + "total_changes": self.total_changes, + "key_facts": self.key_facts, + "evolution_timeline": [ + { + "change_type": c.change_type.value, + "subject": c.subject, + "predicate": c.predicate, + "old_value": c.old_value, + "new_value": c.new_value, + "timestamp": c.timestamp, + "version": c.version + } + for c in self.evolution_timeline + ], + "current_state": self.current_state + } + + +class TemporalReasoner: + """Reasoning engine for temporal knowledge graphs.""" + + def __init__(self, store: Optional[TemporalTripleStore] = None): + """Initialize the temporal reasoner. + + Args: + store: Optional TemporalTripleStore instance. Creates new if None. + """ + self.store = store or TemporalTripleStore() + + def what_did_we_believe( + self, + subject: str, + before_time: str + ) -> List[TemporalTriple]: + """Query: "What did we believe about X before Y happened?" + + Args: + subject: The entity to query about + before_time: The cutoff time (ISO 8601) + + Returns: + List of facts believed before the given time + """ + # Get facts that were valid just before the given time + return self.store.query_temporal( + TemporalOperator.BEFORE, + before_time, + subject=subject + ) + + def when_did_we_learn( + self, + subject: str, + predicate: Optional[str] = None, + object: Optional[str] = None + ) -> Optional[str]: + """Query: "When did we first learn about X?" + + Args: + subject: The subject to search for + predicate: Optional predicate filter + object: Optional object filter + + Returns: + Timestamp of first knowledge, or None if never learned + """ + history = self.store.get_fact_history(subject, predicate or "") + + # Filter by object if specified + if object: + history = [h for h in history if h.object == object] + + if history: + # Return the earliest timestamp + earliest = min(history, key=lambda x: x.timestamp) + return earliest.timestamp + return None + + def how_has_it_changed( + self, + subject: str, + since_time: str + ) -> List[FactChange]: + """Query: "How has X changed since Y?" + + Args: + subject: The entity to analyze + since_time: The starting time (ISO 8601) + + Returns: + List of changes since the given time + """ + now = datetime.now().isoformat() + changes = self.store.get_entity_changes(subject, since_time, now) + + fact_changes = [] + for i, triple in enumerate(changes): + # Determine change type + if i == 0: + change_type = ChangeType.ADDED + old_value = None + else: + prev = changes[i - 1] + if triple.object != prev.object: + change_type = ChangeType.MODIFIED + old_value = prev.object + else: + change_type = ChangeType.SUPERSEDED + old_value = prev.object + + fact_changes.append(FactChange( + change_type=change_type, + subject=triple.subject, + predicate=triple.predicate, + old_value=old_value, + new_value=triple.object, + timestamp=triple.timestamp, + version=triple.version + )) + + return fact_changes + + def generate_temporal_summary( + self, + entity: str, + start_time: str, + end_time: str + ) -> HistoricalSummary: + """Generate a historical summary of an entity's evolution. + + Args: + entity: The entity to summarize + start_time: Start of the time range (ISO 8601) + end_time: End of the time range (ISO 8601) + + Returns: + HistoricalSummary containing the entity's evolution + """ + # Get all facts for the entity in the time range + initial_state = self.store.query_at_time(start_time, subject=entity) + final_state = self.store.query_at_time(end_time, subject=entity) + changes = self.store.get_entity_changes(entity, start_time, end_time) + + # Build evolution timeline + evolution_timeline = [] + seen_predicates = set() + + for triple in changes: + if triple.predicate not in seen_predicates: + seen_predicates.add(triple.predicate) + evolution_timeline.append(FactChange( + change_type=ChangeType.ADDED, + subject=triple.subject, + predicate=triple.predicate, + old_value=None, + new_value=triple.object, + timestamp=triple.timestamp, + version=triple.version + )) + else: + # Find previous value + prev = [t for t in changes + if t.predicate == triple.predicate + and t.timestamp < triple.timestamp] + old_value = prev[-1].object if prev else None + + evolution_timeline.append(FactChange( + change_type=ChangeType.MODIFIED, + subject=triple.subject, + predicate=triple.predicate, + old_value=old_value, + new_value=triple.object, + timestamp=triple.timestamp, + version=triple.version + )) + + # Extract key facts (predicates that changed most) + key_facts = [] + predicate_changes = {} + for change in evolution_timeline: + predicate_changes[change.predicate] = ( + predicate_changes.get(change.predicate, 0) + 1 + ) + + top_predicates = sorted( + predicate_changes.items(), + key=lambda x: x[1], + reverse=True + )[:5] + + for pred, count in top_predicates: + current = [t for t in final_state if t.predicate == pred] + if current: + key_facts.append({ + "predicate": pred, + "current_value": current[0].object, + "changes": count + }) + + # Build current state + current_state = [ + { + "predicate": t.predicate, + "object": t.object, + "valid_from": t.valid_from, + "valid_until": t.valid_until + } + for t in final_state + ] + + return HistoricalSummary( + entity=entity, + start_time=start_time, + end_time=end_time, + total_changes=len(evolution_timeline), + key_facts=key_facts, + evolution_timeline=evolution_timeline, + current_state=current_state + ) + + def infer_temporal_relationship( + self, + fact_a: TemporalTriple, + fact_b: TemporalTriple + ) -> Optional[str]: + """Infer temporal relationship between two facts. + + Args: + fact_a: First fact + fact_b: Second fact + + Returns: + Description of temporal relationship, or None + """ + a_start = datetime.fromisoformat(fact_a.valid_from) + a_end = datetime.fromisoformat(fact_a.valid_until) if fact_a.valid_until else None + b_start = datetime.fromisoformat(fact_b.valid_from) + b_end = datetime.fromisoformat(fact_b.valid_until) if fact_b.valid_until else None + + # Check if A happened before B + if a_end and a_end <= b_start: + return "A happened before B" + + # Check if B happened before A + if b_end and b_end <= a_start: + return "B happened before A" + + # Check if they overlap + if a_end and b_end: + if a_start <= b_end and b_start <= a_end: + return "A and B overlap in time" + + # Check if one supersedes the other + if fact_a.superseded_by == fact_b.id: + return "B supersedes A" + if fact_b.superseded_by == fact_a.id: + return "A supersedes B" + + return "A and B are temporally unrelated" + + def get_worldview_at_time( + self, + timestamp: str, + subjects: Optional[List[str]] = None + ) -> Dict[str, List[Dict[str, Any]]]: + """Get Timmy's complete worldview at a specific point in time. + + Args: + timestamp: The point in time (ISO 8601) + subjects: Optional list of subjects to include. If None, includes all. + + Returns: + Dictionary mapping subjects to their facts at that time + """ + worldview = {} + + if subjects: + for subject in subjects: + facts = self.store.query_at_time(timestamp, subject=subject) + if facts: + worldview[subject] = [ + { + "predicate": f.predicate, + "object": f.object, + "version": f.version + } + for f in facts + ] + else: + # Get all facts at that time + all_facts = self.store.query_at_time(timestamp) + for fact in all_facts: + if fact.subject not in worldview: + worldview[fact.subject] = [] + worldview[fact.subject].append({ + "predicate": fact.predicate, + "object": fact.object, + "version": fact.version + }) + + return worldview + + def find_knowledge_gaps( + self, + subject: str, + expected_predicates: List[str] + ) -> List[str]: + """Find predicates that are missing or have expired for a subject. + + Args: + subject: The entity to check + expected_predicates: List of predicates that should exist + + Returns: + List of missing predicate names + """ + now = datetime.now().isoformat() + current_facts = self.store.query_at_time(now, subject=subject) + current_predicates = {f.predicate for f in current_facts} + + return [ + pred for pred in expected_predicates + if pred not in current_predicates + ] + + def export_reasoning_report( + self, + entity: str, + start_time: str, + end_time: str + ) -> str: + """Generate a human-readable reasoning report. + + Args: + entity: The entity to report on + start_time: Start of the time range + end_time: End of the time range + + Returns: + Formatted report string + """ + summary = self.generate_temporal_summary(entity, start_time, end_time) + + report = f""" +# Temporal Reasoning Report: {entity} + +## Time Range +- From: {start_time} +- To: {end_time} + +## Summary +- Total Changes: {summary.total_changes} +- Key Facts Tracked: {len(summary.key_facts)} + +## Key Facts +""" + for fact in summary.key_facts: + report += f"- **{fact['predicate']}**: {fact['current_value']} ({fact['changes']} changes)\n" + + report += "\n## Evolution Timeline\n" + for change in summary.evolution_timeline[:10]: # Show first 10 + report += f"- [{change.timestamp}] {change.change_type.value}: {change.predicate}\n" + if change.old_value: + report += f" - Changed from: {change.old_value}\n" + report += f" - Changed to: {change.new_value}\n" + + if len(summary.evolution_timeline) > 10: + report += f"\n... and {len(summary.evolution_timeline) - 10} more changes\n" + + report += "\n## Current State\n" + for state in summary.current_state: + report += f"- {state['predicate']}: {state['object']}\n" + + return report