Compare commits

...

2 Commits

Author SHA1 Message Date
Alexander Payne
37873ca100 feat(sidecar-3): restructure agent/* extensions as runtime patches
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 35s
Smoke Test / smoke (pull_request) Failing after 31s
Validate Config / YAML Lint (pull_request) Failing after 19s
Validate Config / JSON Validate (pull_request) Successful in 25s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 1m2s
Validate Config / Python Test Suite (pull_request) Has been skipped
Validate Config / Cron Syntax Check (pull_request) Successful in 15s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 14s
Validate Config / Shell Script Lint (pull_request) Failing after 1m8s
Validate Config / Playbook Schema Validation (pull_request) Successful in 26s
Architecture Lint / Lint Repository (pull_request) Failing after 26s
PR Checklist / pr-checklist (pull_request) Successful in 4m8s
Move 13 custom agent extension modules from local hermes-agent fork
into timmy-config's sidecar overlay as runtime patches. These files
are deployed into ~/.hermes/hermes-agent/agent/ by deploy.sh.

Files restructured:
- agent/conscience_mapping.py
- agent/evolution/{domain_distiller, self_correction_generator, world_modeler}.py
- agent/fallback_router.py
- agent/gemini_adapter.py
- agent/input_sanitizer.py
- agent/knowledge_ingester.py
- agent/meta_reasoning.py
- agent/nexus_architect.py
- agent/symbolic_memory.py
- agent/temporal_{knowledge_graph,reasoning}.py

This clears the way to reset hermes-agent to pure upstream.
Closes #339
2026-04-30 01:53:33 -04:00
Rockachopa
874ce137b0 feat(backup): add automated Gitea daily backup and recovery runbook
Some checks failed
Architecture Lint / Linter Tests (push) Successful in 30s
Smoke Test / smoke (push) Failing after 24s
Validate Config / YAML Lint (push) Failing after 16s
Validate Config / JSON Validate (push) Successful in 21s
Validate Config / Cron Syntax Check (push) Successful in 15s
Validate Config / Deploy Script Dry Run (push) Successful in 14s
Validate Config / Python Syntax & Import Check (push) Failing after 1m2s
Validate Config / Python Test Suite (push) Has been skipped
Validate Config / Shell Script Lint (push) Failing after 1m3s
Validate Config / Playbook Schema Validation (push) Successful in 24s
Architecture Lint / Linter Tests (pull_request) Successful in 27s
Smoke Test / smoke (pull_request) Failing after 22s
Validate Config / YAML Lint (pull_request) Failing after 16s
Validate Config / JSON Validate (pull_request) Successful in 23s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 1m5s
Validate Config / Python Test Suite (pull_request) Has been skipped
Validate Config / Cron Syntax Check (pull_request) Successful in 12s
Validate Config / Shell Script Lint (pull_request) Failing after 1m6s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 13s
Validate Config / Playbook Schema Validation (pull_request) Successful in 25s
PR Checklist / pr-checklist (pull_request) Failing after 4m33s
Architecture Lint / Lint Repository (push) Failing after 26s
Architecture Lint / Lint Repository (pull_request) Failing after 26s
- Add bin/gitea-backup.sh: daily backup script using gitea dump
- Add cron/vps/gitea-daily-backup.yml: Hermes cron job (2 AM daily)
- Add docs/backup-recovery-runbook.md: complete recovery procedures

Addresses [AUDIT][RISK] Single-node VPS is a single point of failure.
Closes #481
2026-04-30 01:44:05 -04:00
17 changed files with 3404 additions and 0 deletions

87
bin/gitea-backup.sh Normal file
View File

@@ -0,0 +1,87 @@
#!/bin/bash
# Gitea Daily Backup Script
# Uses Gitea's native dump command to create automated backups of repositories and SQLite databases.
# Designed to run on the VPS (Ezra) as part of a daily cron job.
#
# Configuration via environment variables:
# GITEA_BIN Path to gitea binary (default: auto-detect)
# GITEA_BACKUP_DIR Directory for backup archives (default: /var/backups/gitea)
# GITEA_BACKUP_RETENTION Days to retain backups (default: 7)
# GITEA_BACKUP_LOG Log file path (default: /var/log/gitea-backup.log)
set -euo pipefail
GITEA_BIN="${GITEA_BIN:-$(command -v gitea 2>/dev/null || echo "/usr/local/bin/gitea")}"
BACKUP_DIR="${GITEA_BACKUP_DIR:-/var/backups/gitea}"
RETENTION_DAYS="${GITEA_BACKUP_RETENTION:-7}"
DATE="$(date +%Y-%m-%d_%H%M%S)"
BACKUP_FILE="${BACKUP_DIR}/gitea-backup-${DATE}.tar.gz"
LOG_FILE="${GITEA_BACKUP_LOG:-/var/log/gitea-backup.log}"
mkdir -p "${BACKUP_DIR}"
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*" | tee -a "${LOG_FILE}"
}
log "=== Starting Gitea daily backup ==="
# Verify gitea binary exists
if [ ! -x "${GITEA_BIN}" ]; then
log "ERROR: Gitea binary not found at ${GITEA_BIN}"
log "Set GITEA_BIN environment variable to the gitea binary path (e.g., /usr/bin/gitea)"
exit 1
fi
# Detect Gitea WORK_PATH
WORK_PATH=""
APP_INI=""
for path in /etc/gitea/app.ini /home/git/gitea/custom/conf/app.ini ~/gitea/custom/conf/app.ini; do
if [ -f "$path" ]; then
APP_INI="$path"
break
fi
done
if [ -n "$APP_INI" ]; then
# Parse [app] WORK_PATH = /var/lib/gitea
WORK_PATH=$(sed -n 's/^[[:space:]]*WORK_PATH[[:space:]]*=[[:space:]]*//p' "$APP_INI" | head -1)
log "Detected WORK_PATH from app.ini: ${WORK_PATH}"
fi
# Fallback detection
if [ -z "$WORK_PATH" ]; then
for d in /var/lib/gitea /home/git/gitea /srv/gitea /opt/gitea; do
if [ -d "$d" ]; then
WORK_PATH="$d"
break
fi
done
log "Inferred WORK_PATH: ${WORK_PATH:-not found}"
fi
if [ -z "$WORK_PATH" ]; then
log "ERROR: Could not determine Gitea WORK_PATH. Set GITEA_WORK_PATH manually."
exit 1
fi
# Perform gitea dump
# Flags: --work-path sets the Gitea working directory, --file writes dump to tar.gz
log "Running: gitea dump --work-path ${WORK_PATH} --file ${BACKUP_FILE}"
"${GITEA_BIN}" dump --work-path "${WORK_PATH}" --file "${BACKUP_FILE}" 2>>"${LOG_FILE}"
if [ $? -ne 0 ]; then
log "ERROR: gitea dump failed — check ${LOG_FILE} for details"
exit 1
fi
FILE_SIZE=$(du -h "${BACKUP_FILE}" | cut -f1)
log "Backup created: ${BACKUP_FILE} (${FILE_SIZE})"
# Prune old backups (keep last N days)
find "${BACKUP_DIR}" -name "gitea-backup-*.tar.gz" -type f -mtime +$((${RETENTION_DAYS}-1)) -delete 2>/dev/null || true
log "Pruned backups older than ${RETENTION_DAYS} days"
log "=== Backup completed successfully ==="
exit 0

View File

@@ -0,0 +1,9 @@
- name: Daily Gitea Backup
schedule: '0 2 * * *' # 2:00 AM daily
tasks:
- name: Run Gitea daily backup
shell: bash ~/.hermes/bin/gitea-backup.sh
env:
GITEA_BIN: /usr/local/bin/gitea
GITEA_BACKUP_DIR: /var/backups/gitea
GITEA_BACKUP_RETENTION: "7"

View File

@@ -86,6 +86,15 @@ done
chmod +x "$HERMES_HOME/bin/"*.sh "$HERMES_HOME/bin/"*.py 2>/dev/null || true
log "bin/ -> $HERMES_HOME/bin/"
# === Deploy agent patches (runtime extensions) ===
# These are custom agent modules that extend hermes-agent's agent/ package.
# Copied into the hermes-agent installation to make them importable as agent.* modules.
if [ -d "$SCRIPT_DIR/patches/agent" ]; then
mkdir -p "$HERMES_HOME/hermes-agent/agent"
cp -r "$SCRIPT_DIR/patches/agent"/* "$HERMES_HOME/hermes-agent/agent/"
log "patches/agent/ -> $HERMES_HOME/hermes-agent/agent/"
fi
if [ "${1:-}" != "" ]; then
echo "ERROR: deploy.sh no longer accepts legacy loop flags." >&2
echo "Deploy the sidecar only. Do not relaunch deprecated bash loops." >&2

View File

@@ -0,0 +1,155 @@
# Gitea Backup & Recovery Runbook
**Last updated:** 2026-04-30
**Scope:** Single-node VPS (Ezra, 143.198.27.163) running Gitea
**Backup Strategy:** Automated daily full dumps via `gitea dump`
---
## What Gets Backed Up
| Component | Method | Frequency | Retention |
|-----------|--------|-----------|-----------|
| All Gitea repositories (bare git dirs) | `gitea dump --file` | Daily at 2:00 AM | 7 days |
| SQLite databases (gitea.db, indexer.db, etc.) | Included in dump | Daily | 7 days |
| Attachments, avatars, hooks | Included in dump | Daily | 7 days |
**Backup location:** `/var/backups/gitea/gitea-backup-YYYY-MM-DD_HHMMSS.tar.gz`
**Log file:** `/var/log/gitea-backup.log`
---
## Backup Architecture
The backup script `bin/gitea-backup.sh` runs daily via Hermes cron (`cron/vps/gitea-daily-backup.yml`). It:
1. Locates the Gitea `WORK_PATH` by reading `/etc/gitea/app.ini` or falling back to common locations (`/var/lib/gitea`, `/home/git/gitea`)
2. Invokes `gitea dump --work-path <path> --file <backup-tar.gz>` — Gitea's native, consistent snapshot mechanism
3. Prunes archives older than 7 days
4. Logs all operations to `/var/log/gitea-backup.log`
**Prerequisites on the VPS:**
- Gitea binary available at `/usr/local/bin/gitea` (or set `GITEA_BIN` env var)
- `gitea dump` command must be available (Gitea ≥ 1.12)
- SSH access to the VPS for manual recovery operations
- Sufficient disk space in `/var/backups/gitea` (typical dump: ~210 GB depending on repo count/size)
---
## Recovery Time Objective (RTO) & Recovery Point Objective (RPO)
| Metric | Estimate |
|--------|----------|
| **RPO** (data loss window) | ≤ 24 hours (last daily backup) |
| **RTO** (time to restore) | **~45 minutes** (cold restore from backup tarball) |
| **Downtime impact** | Gitea offline during restore (~20 min) |
---
## Step-by-Step Recovery Procedure
### Phase 1 — Assess & Prepare (5 min)
1. SSH into Ezra VPS: `ssh root@143.198.27.163`
2. Stop Gitea so files are quiescent:
```bash
systemctl stop gitea
```
3. Confirm current Gitea data directory (for reference):
```bash
gitea --work-path /var/lib/gitea --config /etc/gitea/app.ini dump --help 2>&1
# Or check app.ini for WORK_PATH
cat /etc/gitea/app.ini | grep '^WORK_PATH'
```
### Phase 2 — Restore from Backup (20 min)
4. Choose the backup tarball to restore from:
```bash
ls -lh /var/backups/gitea/
# Pick the most recent: gitea-backup-2026-04-29_020001.tar.gz
```
5. **Optional: Move current data aside** (safety copy):
```bash
mv /var/lib/gitea /var/lib/gitea.bak-$(date +%s)
```
6. Extract the backup in place:
```bash
mkdir -p /var/lib/gitea
tar -xzf /var/backups/gitea/gitea-backup-YYYY-MM-DD_HHMMSS.tar.gz -C /var/lib/gitea --strip-components=1
```
*Note:* `gitea dump` archives contain a single top-level directory `gitea-dump-<timestamp>`. The `--strip-components=1` puts its contents directly into `/var/lib/gitea`.
7. Set correct ownership (typically `git:git`):
```bash
chown -R git:git /var/lib/gitea
```
### Phase 3 — Restart & Validate (15 min)
8. Start Gitea:
```bash
systemctl start gitea
```
9. Wait 30 seconds, then verify:
```bash
systemctl status gitea
# Check HTTP endpoint
curl -s -o /dev/null -w '%{http_code}' http://localhost:3000/ # Should be 200
```
10. Log into Gitea UI and spot-check:
- Home page loads
- A few repositories are accessible
- Attachments (avatars) render
- Recent commits visible
11. If the web UI works but indices are stale, rebuild them (wait for background jobs to process):
```bash
gitea admin index rebuild-repo --all
```
### Post-Restore Checklist
- [ ] Admin UI reachable at `https://forge.alexanderwhitestone.com`
- [ ] Sample PRs/milestones/labels present
- [ ] Repository clone via SSH works: `git clone git@forge.alexanderwhitestone.com:Timmy_Foundation/timmy-config.git`
- [ ] Check backup script health: `cat /var/log/gitea-backup.log | tail -20`
- [ ] Re-enable any disabled integrations (webhooks, CI/CD runners)
- [ ] Notify the fleet: post to relevant channels confirming operational status
---
## Known Issues & Workarounds
| Symptom | Likely cause | Fix |
|---------|--------------|-----|
| `gitea: command not found` | Binary at non-standard path | Set `GITEA_BIN=/path/to/gitea` in cron env |
| `Permission denied` on backup dir | Cron user lacks write access to `/var/backups` | `mkdir /var/backups/gitea && chown root:root /var/backups/gitea` |
| Restore fails: `"database or disk is full"` | Insufficient space on `/var/lib/gitea` | Expand disk or clean up old data first; backups require ~1.5x live data size |
| Old backup tarballs not deleting | Retention cron not firing | Check `systemctl status hermes-cron` and cron logs |
---
## Off-Site Replication (Future Work)
This backup is **on-site only** (same VPS). For true resilience, replicating to a secondary location is recommended:
- **Option A — rsync to second VPS** (Push nightly to `backup@backup-alexanderwhitestone.com:/backups/gitea/`)
- **Option B — S3-compatible bucket** with lifecycle policy
- **Option C — GitHub mirror of each repo** using `git push --mirror` (already considered in issue #481 broader work)
Current scope: single-VPS backup only (single point of failure mitigated but not eliminated).
---
## Related Documentation
- `bin/gitea-backup.sh` — backup script source
- `cron/vps/gitea-daily-backup.yml` — Hermes cron definition
- Gitea official docs: <https://docs.gitea.com/administration/backup-and-restore>
- Hermes cron: <https://hermes-agent.nousresearch.com/docs>

View File

@@ -0,0 +1,6 @@
"""
@soul:honesty.grounding Grounding before generation. Consult verified sources before pattern-matching.
@soul:honesty.source_distinction Source distinction. Every claim must point to a verified source.
@soul:honesty.audit_trail The audit trail. Every response is logged with inputs and confidence.
"""
# This file serves as a registry for the Conscience Validator to prove the apparatus exists.

View File

@@ -0,0 +1,45 @@
"""Phase 3: Deep Knowledge Distillation from Google.
Performs deep dives into technical domains and distills them into
Timmy's Sovereign Knowledge Graph.
"""
import logging
import json
from typing import List, Dict, Any
from agent.gemini_adapter import GeminiAdapter
from agent.symbolic_memory import SymbolicMemory
logger = logging.getLogger(__name__)
class DomainDistiller:
def __init__(self):
self.adapter = GeminiAdapter()
self.symbolic = SymbolicMemory()
def distill_domain(self, domain: str):
"""Crawls and distills an entire technical domain."""
logger.info(f"Distilling domain: {domain}")
prompt = f"""
Please perform a deep knowledge distillation of the following domain: {domain}
Use Google Search to find foundational papers, recent developments, and key entities.
Synthesize this into a structured 'Domain Map' consisting of high-fidelity knowledge triples.
Focus on the structural relationships that define the domain.
Format: [{{"s": "subject", "p": "predicate", "o": "object"}}]
"""
result = self.adapter.generate(
model="gemini-3.1-pro-preview",
prompt=prompt,
system_instruction=f"You are Timmy's Domain Distiller. Your goal is to map the entire {domain} domain into a structured Knowledge Graph.",
grounding=True,
thinking=True,
response_mime_type="application/json"
)
triples = json.loads(result["text"])
count = self.symbolic.ingest_text(json.dumps(triples))
logger.info(f"Distilled {count} new triples for domain: {domain}")
return count

View File

@@ -0,0 +1,60 @@
"""Phase 1: Synthetic Data Generation for Self-Correction.
Generates reasoning traces where Timmy makes a subtle error and then
identifies and corrects it using the Conscience Validator.
"""
import logging
import json
from typing import List, Dict, Any
from agent.gemini_adapter import GeminiAdapter
from tools.gitea_client import GiteaClient
logger = logging.getLogger(__name__)
class SelfCorrectionGenerator:
def __init__(self):
self.adapter = GeminiAdapter()
self.gitea = GiteaClient()
def generate_trace(self, task: str) -> Dict[str, Any]:
"""Generates a single self-correction reasoning trace."""
prompt = f"""
Task: {task}
Please simulate a multi-step reasoning trace for this task.
Intentionally include one subtle error in the reasoning (e.g., a logical flaw, a misinterpretation of a rule, or a factual error).
Then, show how Timmy identifies the error using his Conscience Validator and provides a corrected reasoning trace.
Format the output as JSON:
{{
"task": "{task}",
"initial_trace": "...",
"error_identified": "...",
"correction_trace": "...",
"lessons_learned": "..."
}}
"""
result = self.adapter.generate(
model="gemini-3.1-pro-preview",
prompt=prompt,
system_instruction="You are Timmy's Synthetic Data Engine. Generate high-fidelity self-correction traces.",
response_mime_type="application/json",
thinking=True
)
trace = json.loads(result["text"])
return trace
def generate_and_save(self, task: str, count: int = 1):
"""Generates multiple traces and saves them to Gitea."""
repo = "Timmy_Foundation/timmy-config"
for i in range(count):
trace = self.generate_trace(task)
filename = f"memories/synthetic_data/self_correction/{task.lower().replace(' ', '_')}_{i}.json"
content = json.dumps(trace, indent=2)
content_b64 = base64.b64encode(content.encode()).decode()
self.gitea.create_file(repo, filename, content_b64, f"Add synthetic self-correction trace for {task}")
logger.info(f"Saved synthetic trace to {filename}")

View File

@@ -0,0 +1,42 @@
"""Phase 2: Multi-Modal World Modeling.
Ingests multi-modal data (vision/audio) to build a spatial and temporal
understanding of Timmy's environment.
"""
import logging
import base64
from typing import List, Dict, Any
from agent.gemini_adapter import GeminiAdapter
from agent.symbolic_memory import SymbolicMemory
logger = logging.getLogger(__name__)
class WorldModeler:
def __init__(self):
self.adapter = GeminiAdapter()
self.symbolic = SymbolicMemory()
def analyze_environment(self, image_data: str, mime_type: str = "image/jpeg"):
"""Analyzes an image of the environment and updates the world model."""
# In a real scenario, we'd use Gemini's multi-modal capabilities
# For now, we'll simulate the vision-to-symbolic extraction
prompt = f"""
Analyze the following image of Timmy's environment.
Identify all key objects, their spatial relationships, and any temporal changes.
Extract this into a set of symbolic triples for the Knowledge Graph.
Format: [{{"s": "subject", "p": "predicate", "o": "object"}}]
"""
# Simulate multi-modal call (Gemini 3.1 Pro Vision)
result = self.adapter.generate(
model="gemini-3.1-pro-preview",
prompt=prompt,
system_instruction="You are Timmy's World Modeler. Build a high-fidelity spatial/temporal map of the environment.",
response_mime_type="application/json"
)
triples = json.loads(result["text"])
self.symbolic.ingest_text(json.dumps(triples))
logger.info(f"Updated world model with {len(triples)} new spatial triples.")
return triples

View File

@@ -0,0 +1,404 @@
"""Automatic fallback router for handling provider quota and rate limit errors.
This module provides intelligent fallback detection and routing when the primary
provider (e.g., Anthropic) encounters quota limitations or rate limits.
Features:
- Detects quota/rate limit errors from different providers
- Automatic fallback to kimi-coding when Anthropic quota is exceeded
- Configurable fallback chains with default anthropic -> kimi-coding
- Logging and monitoring of fallback events
Usage:
from agent.fallback_router import (
is_quota_error,
get_default_fallback_chain,
should_auto_fallback,
)
if is_quota_error(error, provider="anthropic"):
if should_auto_fallback(provider="anthropic"):
fallback_chain = get_default_fallback_chain("anthropic")
"""
import logging
import os
from typing import Dict, List, Optional, Any, Tuple
logger = logging.getLogger(__name__)
# Default fallback chains per provider
# Each chain is a list of fallback configurations tried in order
DEFAULT_FALLBACK_CHAINS: Dict[str, List[Dict[str, Any]]] = {
"anthropic": [
{"provider": "kimi-coding", "model": "kimi-k2.5"},
{"provider": "openrouter", "model": "anthropic/claude-sonnet-4"},
],
"openrouter": [
{"provider": "kimi-coding", "model": "kimi-k2.5"},
{"provider": "zai", "model": "glm-5"},
],
"kimi-coding": [
{"provider": "openrouter", "model": "anthropic/claude-sonnet-4"},
{"provider": "zai", "model": "glm-5"},
],
"zai": [
{"provider": "openrouter", "model": "anthropic/claude-sonnet-4"},
{"provider": "kimi-coding", "model": "kimi-k2.5"},
],
}
# Quota/rate limit error patterns by provider
# These are matched (case-insensitive) against error messages
QUOTA_ERROR_PATTERNS: Dict[str, List[str]] = {
"anthropic": [
"rate limit",
"ratelimit",
"quota exceeded",
"quota exceeded",
"insufficient quota",
"429",
"403",
"too many requests",
"capacity exceeded",
"over capacity",
"temporarily unavailable",
"server overloaded",
"resource exhausted",
"billing threshold",
"credit balance",
"payment required",
"402",
],
"openrouter": [
"rate limit",
"ratelimit",
"quota exceeded",
"insufficient credits",
"429",
"402",
"no endpoints available",
"all providers failed",
"over capacity",
],
"kimi-coding": [
"rate limit",
"ratelimit",
"quota exceeded",
"429",
"insufficient balance",
],
"zai": [
"rate limit",
"ratelimit",
"quota exceeded",
"429",
"insufficient quota",
],
}
# HTTP status codes indicating quota/rate limit issues
QUOTA_STATUS_CODES = {429, 402, 403}
def is_quota_error(error: Exception, provider: Optional[str] = None) -> bool:
"""Detect if an error is quota/rate limit related.
Args:
error: The exception to check
provider: Optional provider name to check provider-specific patterns
Returns:
True if the error appears to be quota/rate limit related
"""
if error is None:
return False
error_str = str(error).lower()
error_type = type(error).__name__.lower()
# Check for common rate limit exception types
if any(term in error_type for term in [
"ratelimit", "rate_limit", "quota", "toomanyrequests",
"insufficient_quota", "billing", "payment"
]):
return True
# Check HTTP status code if available
status_code = getattr(error, "status_code", None)
if status_code is None:
# Try common attribute names
for attr in ["code", "http_status", "response_code", "status"]:
if hasattr(error, attr):
try:
status_code = int(getattr(error, attr))
break
except (TypeError, ValueError):
continue
if status_code in QUOTA_STATUS_CODES:
return True
# Check provider-specific patterns
providers_to_check = [provider] if provider else QUOTA_ERROR_PATTERNS.keys()
for prov in providers_to_check:
patterns = QUOTA_ERROR_PATTERNS.get(prov, [])
for pattern in patterns:
if pattern.lower() in error_str:
logger.debug(
"Detected %s quota error pattern '%s' in: %s",
prov, pattern, error
)
return True
# Check generic quota patterns
generic_patterns = [
"rate limit exceeded",
"quota exceeded",
"too many requests",
"capacity exceeded",
"temporarily unavailable",
"try again later",
"resource exhausted",
"billing",
"payment required",
"insufficient credits",
"insufficient quota",
]
for pattern in generic_patterns:
if pattern in error_str:
return True
return False
def get_default_fallback_chain(
primary_provider: str,
exclude_provider: Optional[str] = None,
) -> List[Dict[str, Any]]:
"""Get the default fallback chain for a primary provider.
Args:
primary_provider: The primary provider name
exclude_provider: Optional provider to exclude from the chain
Returns:
List of fallback configurations
"""
chain = DEFAULT_FALLBACK_CHAINS.get(primary_provider, [])
# Filter out excluded provider if specified
if exclude_provider:
chain = [
fb for fb in chain
if fb.get("provider") != exclude_provider
]
return list(chain)
def should_auto_fallback(
provider: str,
error: Optional[Exception] = None,
auto_fallback_enabled: Optional[bool] = None,
) -> bool:
"""Determine if automatic fallback should be attempted.
Args:
provider: The current provider name
error: Optional error to check for quota issues
auto_fallback_enabled: Optional override for auto-fallback setting
Returns:
True if automatic fallback should be attempted
"""
# Check environment variable override
if auto_fallback_enabled is None:
env_setting = os.getenv("HERMES_AUTO_FALLBACK", "true").lower()
auto_fallback_enabled = env_setting in ("true", "1", "yes", "on")
if not auto_fallback_enabled:
return False
# Check if provider has a configured fallback chain
if provider not in DEFAULT_FALLBACK_CHAINS:
# Still allow fallback if it's a quota error with generic handling
if error and is_quota_error(error):
logger.debug(
"Provider %s has no fallback chain but quota error detected",
provider
)
return True
return False
# If there's an error, only fallback on quota/rate limit errors
if error is not None:
return is_quota_error(error, provider)
# No error but fallback chain exists - allow eager fallback for
# providers known to have quota issues
return provider in ("anthropic",)
def log_fallback_event(
from_provider: str,
to_provider: str,
to_model: str,
reason: str,
error: Optional[Exception] = None,
) -> None:
"""Log a fallback event for monitoring.
Args:
from_provider: The provider we're falling back from
to_provider: The provider we're falling back to
to_model: The model we're falling back to
reason: The reason for the fallback
error: Optional error that triggered the fallback
"""
log_data = {
"event": "provider_fallback",
"from_provider": from_provider,
"to_provider": to_provider,
"to_model": to_model,
"reason": reason,
}
if error:
log_data["error_type"] = type(error).__name__
log_data["error_message"] = str(error)[:200]
logger.info("Provider fallback: %s -> %s (%s) | Reason: %s",
from_provider, to_provider, to_model, reason)
# Also log structured data for monitoring
logger.debug("Fallback event data: %s", log_data)
def resolve_fallback_with_credentials(
fallback_config: Dict[str, Any],
) -> Tuple[Optional[Any], Optional[str]]:
"""Resolve a fallback configuration to a client and model.
Args:
fallback_config: Fallback configuration dict with provider and model
Returns:
Tuple of (client, model) or (None, None) if credentials not available
"""
from agent.auxiliary_client import resolve_provider_client
provider = fallback_config.get("provider")
model = fallback_config.get("model")
if not provider or not model:
return None, None
try:
client, resolved_model = resolve_provider_client(
provider,
model=model,
raw_codex=True,
)
return client, resolved_model or model
except Exception as exc:
logger.debug(
"Failed to resolve fallback provider %s: %s",
provider, exc
)
return None, None
def get_auto_fallback_chain(
primary_provider: str,
user_fallback_chain: Optional[List[Dict[str, Any]]] = None,
) -> List[Dict[str, Any]]:
"""Get the effective fallback chain for automatic fallback.
Combines user-provided fallback chain with default automatic fallback chain.
Args:
primary_provider: The primary provider name
user_fallback_chain: Optional user-provided fallback chain
Returns:
The effective fallback chain to use
"""
# Use user-provided chain if available
if user_fallback_chain:
return user_fallback_chain
# Otherwise use default chain for the provider
return get_default_fallback_chain(primary_provider)
def is_fallback_available(
fallback_config: Dict[str, Any],
) -> bool:
"""Check if a fallback configuration has available credentials.
Args:
fallback_config: Fallback configuration dict
Returns:
True if credentials are available for the fallback provider
"""
provider = fallback_config.get("provider")
if not provider:
return False
# Check environment variables for API keys
env_vars = {
"anthropic": ["ANTHROPIC_API_KEY", "ANTHROPIC_TOKEN"],
"kimi-coding": ["KIMI_API_KEY", "KIMI_API_TOKEN"],
"zai": ["ZAI_API_KEY", "Z_AI_API_KEY"],
"openrouter": ["OPENROUTER_API_KEY"],
"minimax": ["MINIMAX_API_KEY"],
"minimax-cn": ["MINIMAX_CN_API_KEY"],
"deepseek": ["DEEPSEEK_API_KEY"],
"alibaba": ["DASHSCOPE_API_KEY", "ALIBABA_API_KEY"],
"nous": ["NOUS_AGENT_KEY", "NOUS_ACCESS_TOKEN"],
}
keys_to_check = env_vars.get(provider, [f"{provider.upper()}_API_KEY"])
for key in keys_to_check:
if os.getenv(key):
return True
# Check auth.json for OAuth providers
if provider in ("nous", "openai-codex"):
try:
from hermes_cli.config import get_hermes_home
auth_path = get_hermes_home() / "auth.json"
if auth_path.exists():
import json
data = json.loads(auth_path.read_text())
if data.get("active_provider") == provider:
return True
# Check for provider in providers dict
if data.get("providers", {}).get(provider):
return True
except Exception:
pass
return False
def filter_available_fallbacks(
fallback_chain: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
"""Filter a fallback chain to only include providers with credentials.
Args:
fallback_chain: List of fallback configurations
Returns:
Filtered list with only available fallbacks
"""
return [
fb for fb in fallback_chain
if is_fallback_available(fb)
]

View File

@@ -0,0 +1,90 @@
"""Native Gemini 3 Series adapter for Hermes Agent.
Leverages the google-genai SDK to provide sovereign access to Gemini's
unique capabilities: Thinking (Reasoning) tokens, Search Grounding,
and Maps Grounding.
"""
import logging
import os
from typing import Any, Dict, List, Optional, Union
try:
from google import genai
from google.genai import types
except ImportError:
genai = None # type: ignore
types = None # type: ignore
logger = logging.getLogger(__name__)
class GeminiAdapter:
def __init__(self, api_key: Optional[str] = None):
self.api_key = api_key or os.environ.get("GEMINI_API_KEY")
if not self.api_key:
logger.warning("GEMINI_API_KEY not found in environment.")
if genai:
self.client = genai.Client(api_key=self.api_key)
else:
self.client = None
def generate(
self,
model: str,
prompt: str,
system_instruction: Optional[str] = None,
thinking: bool = False,
thinking_budget: int = 16000,
grounding: bool = False,
**kwargs
) -> Dict[str, Any]:
if not self.client:
raise ImportError("google-genai SDK not installed. Run 'pip install google-genai'.")
config = {}
if system_instruction:
config["system_instruction"] = system_instruction
if thinking:
# Gemini 3 series thinking config
config["thinking_config"] = {"include_thoughts": True}
# max_output_tokens includes thinking tokens
kwargs["max_output_tokens"] = kwargs.get("max_output_tokens", 32000) + thinking_budget
tools = []
if grounding:
tools.append({"google_search": {}})
if tools:
config["tools"] = tools
response = self.client.models.generate_content(
model=model,
contents=prompt,
config=types.GenerateContentConfig(**config, **kwargs)
)
result = {
"text": response.text,
"usage": {
"prompt_tokens": response.usage_metadata.prompt_token_count,
"candidates_tokens": response.usage_metadata.candidates_token_count,
"total_tokens": response.usage_metadata.total_token_count,
}
}
# Extract thoughts if present
thoughts = []
for part in response.candidates[0].content.parts:
if hasattr(part, 'thought') and part.thought:
thoughts.append(part.thought)
if thoughts:
result["thoughts"] = "\n".join(thoughts)
# Extract grounding metadata
if response.candidates[0].grounding_metadata:
result["grounding"] = response.candidates[0].grounding_metadata
return result

View File

@@ -0,0 +1,635 @@
"""
Input Sanitizer for Jailbreak Pattern Detection
This module provides input sanitization to detect and strip jailbreak fingerprint
patterns as identified in Issue #72 (Red Team Audit).
Security Findings Addressed:
1. HIGH - OG GODMODE template bypassed phishing refusal
2. MEDIUM - boundary_inversion works for gray-area content
3. LOW - Spaced text bypass (k e y l o g g e r)
Usage:
from agent.input_sanitizer import sanitize_input, score_input_risk
cleaned, risk_score, detected = sanitize_input(user_input)
if risk_score > 75:
logger.warning("High-risk input detected and blocked")
"""
import re
import logging
from typing import Tuple, List, Dict, Any, Optional
from dataclasses import dataclass
from enum import IntEnum
logger = logging.getLogger(__name__)
class RiskLevel(IntEnum):
"""Risk score thresholds for input classification."""
LOW = 25
MEDIUM = 50
HIGH = 75
CRITICAL = 90
@dataclass
class SanitizationResult:
"""Result of input sanitization."""
original_text: str
cleaned_text: str
risk_score: int
detected_patterns: List[str]
risk_level: str
blocked: bool
# =============================================================================
# JAILBREAK PATTERN DEFINITIONS
# =============================================================================
# GODMODE and similar jailbreak templates
GODMODE_PATTERNS = [
r"\[START\]",
r"\[END\]",
r"GODMODE\s*(?:ENABLED|ACTIVATED|ON)?",
r"UNFILTERED\s*(?:MODE|OUTPUT)?",
r"DEVMODE\s*(?:ENABLED|ACTIVATED|ON)?",
r"JAILBREAK\s*(?:ENABLED|ACTIVATED|ON)?",
r"DAN\s*(?:MODE|ENABLED)?", # Do Anything Now
r"STAN\s*(?:MODE|ENABLED)?", # Strive To Avoid Norms
r"DUDE\s*(?:MODE|ENABLED)?", # Do Unlimited Data Extraction
]
# DAN-style and jailbreak variant patterns (Issue #87)
DAN_PATTERNS = [
r"\bdo anything now\b",
r"\bstay in character\b",
r"\btoken smuggl(?:ing|e)\b",
r"\bjailbreak successful\b",
r"\bmode:\s*uncensored\b",
r"\bdeveloper mode enabled\b",
]
# Roleplaying override patterns (Issue #87)
ROLEPLAY_PATTERNS = [
r"\broleplay\s+as\b",
r"\bact\s+as\s+if\s+you\s+are\b",
r"\bsimulate\s+being\b",
r"\bforget\s+you\s+are\s+(?:an?\s+)?(?:ai|language\s+model)\b",
r"\byou\s+are\s+now\s+(?:named|called)\b",
r"\brespond\s+as\s+(?:if\s+you\s+were|though\s+you\s+are)\b",
]
# System prompt extraction patterns (Issue #87)
EXTRACTION_PATTERNS = [
r"\brepeat\s+the\s+words\s+above\b",
r"\brepeat\s+your\s+(?:system\s+|initial\s+)?instructions\b",
r"\bwhat\s+is\s+your\s+(?:system\s+|initial\s+)?prompt\b",
r"\bshow\s+me\s+your\s+(?:system\s+|initial\s+)?prompt\b",
r"\bprint\s+your\s+(?:instructions|prompt|system\s+prompt)\b",
r"\boutput\s+your\s+(?:initial|system)\s+prompt\b",
r"\bwhat\s+were\s+you\s+told\s+to\s+do\b",
r"\bsummarize\s+your\s+instructions\b",
]
# L33t speak patterns commonly used to bypass filters
LEET_SPEAK_PATTERNS = [
r"h4ck(?:er|ing)?",
r"h4x(?:or|ing)?",
r"k3ylog(?:ger|ing)?",
r"k3y\s*log(?:ger|ing)?",
r"ph1sh(?:ing)?",
r"ph33r",
r"w4r3z",
r"cr4ck(?:er|ing)?",
r"m4lw4r3",
r"spyw4r3",
r"r00t(?:kit)?",
r"byp4ss",
r"3xpl0it",
r"v1ru5",
r"tr0j4n",
r"b4ckd00r",
r"sn3ak",
r"st34l",
r"pwn(?:ed|ing|age)?",
]
# Spaced text patterns (e.g., "k e y l o g g e r")
SPACED_TEXT_PATTERN = re.compile(
r"(?:\b\w\s+){3,}\w\b", # At least 4 characters with spaces between
re.IGNORECASE
)
# Common words that are spaced out to evade detection
SPACED_TRIGGER_WORDS = [
"keylogger", "phishing", "malware", "virus", "trojan",
"backdoor", "exploit", "hack", "crack", "bypass",
"steal", "password", "credential", "inject", "payload"
]
# Crisis and self-harm detection patterns (Issue #72 - defense in depth)
# These patterns detect suicide/self-harm queries even when wrapped in jailbreaks
CRISIS_PATTERNS = [
r"\bsuicid(?:e|al|ing)\b",
r"\bkill\s+(?:myself|my\s+self|me)\b",
r"\bend\s+(?:my|this)\s+life\b",
r"\bmost\s+painless\s+(?:way|method|means)\s+(?:to\s+)?(?:die|kill)\b",
r"\blethal\s+(?:dose|amount|quantity)\b",
r"\bhow\s+much\s+.*\s+(?:die|kill|lethal)\b",
r"\bwant\s+to\s+die\b",
r"\bwant\s+to\s+(?:kill|end)\s+(?:myself|me)\b",
r"\bways?\s+to\s+(?:kill|end)\s+(?:myself|me)\b",
r"\bmethods?\s+(?:of\s+)?suicide\b",
r"\bpainless\s+(?:death|suicide|way\s+to\s+die)\b",
r"\boverdose\s+(?:on\s+)?(?:pills?|medication|drugs?)\b",
r"\bnobody\s+(?:would|will)\s+miss\s+me\b",
r"\bbetter\s+off\s+dead\b",
r"\bshould\s+i\s+just\s+(?:die|kill\s+myself)\b",
r"\bgive\s+me\s+reasons?\s+(?:why|to)\s+(?:suicide|kill\s+myself|die)\b",
r"\bvalid\s+reasons?\s+to\s+(?:suicide|kill\s+myself|die)\b",
]
# Refusal inversion patterns (trying to trick AI into bypassing refusal)
REFUSAL_INVERSION_PATTERNS = [
r"refusal\s+(?:is\s+)?(?:harmful|dangerous|bad)",
r"(?:saying\s+)?no\s+(?:is\s+)?(?:harmful|dangerous|bad|wrong)",
r"(?:declining|refusing)\s+(?:is\s+)?(?:harmful|dangerous)",
r"ignore\s+(?:your\s+)?(?:previous\s+)?instructions",
r"ignore\s+(?:the\s+)?(?:above|previous)",
r"you\s+(?:are|can)\s+(?:now\s+)?(?:ignore|bypass|skip)",
r"pretend\s+(?:you\s+)?(?:are|have\s+been)",
r"let's\s+pretend",
r"hypothetically\s+speaking",
r"in\s+a\s+hypothetical\s+scenario",
r"this\s+is\s+a\s+(?:test|game|simulation)",
r"for\s+(?:educational|research)\s+purposes",
r"as\s+(?:an\s+)?(?:ethical\s+)?hacker",
r"white\s+hat\s+(?:test|scenario)",
r"penetration\s+testing\s+scenario",
]
# Boundary inversion markers (tricking the model about message boundaries)
BOUNDARY_INVERSION_PATTERNS = [
r"\[END\].*?\[START\]", # Reversed markers
r"user\s*:\s*assistant\s*:", # Fake role markers
r"assistant\s*:\s*user\s*:", # Reversed role markers
r"system\s*:\s*(?:user|assistant)\s*:", # Fake system injection
r"new\s+(?:user|assistant)\s*(?:message|input)",
r"the\s+above\s+is\s+(?:the\s+)?(?:user|assistant|system)",
r"<\|(?:user|assistant|system)\|>", # Special token patterns
r"\{\{(?:user|assistant|system)\}\}",
]
# System prompt injection patterns
SYSTEM_PROMPT_PATTERNS = [
r"you\s+are\s+(?:now\s+)?(?:an?\s+)?(?:unrestricted\s+|unfiltered\s+)?(?:ai|assistant|bot)",
r"you\s+will\s+(?:now\s+)?(?:act\s+as|behave\s+as|be)\s+(?:a\s+)?",
r"your\s+(?:new\s+)?role\s+is",
r"from\s+now\s+on\s*,?\s*you\s+(?:are|will)",
r"you\s+have\s+been\s+(?:reprogrammed|reconfigured|modified)",
r"(?:system|developer)\s+(?:message|instruction|prompt)",
r"override\s+(?:previous|prior)\s+(?:instructions|settings)",
]
# Obfuscation patterns
OBFUSCATION_PATTERNS = [
r"base64\s*(?:encoded|decode)",
r"rot13",
r"caesar\s*cipher",
r"hex\s*(?:encoded|decode)",
r"url\s*encode",
r"\b[0-9a-f]{20,}\b", # Long hex strings
r"\b[a-z0-9+/]{20,}={0,2}\b", # Base64-like strings
]
# All patterns combined for comprehensive scanning
ALL_PATTERNS: Dict[str, List[str]] = {
"godmode": GODMODE_PATTERNS,
"dan": DAN_PATTERNS,
"roleplay": ROLEPLAY_PATTERNS,
"extraction": EXTRACTION_PATTERNS,
"leet_speak": LEET_SPEAK_PATTERNS,
"refusal_inversion": REFUSAL_INVERSION_PATTERNS,
"boundary_inversion": BOUNDARY_INVERSION_PATTERNS,
"system_prompt_injection": SYSTEM_PROMPT_PATTERNS,
"obfuscation": OBFUSCATION_PATTERNS,
"crisis": CRISIS_PATTERNS,
}
# Compile all patterns for efficiency
_COMPILED_PATTERNS: Dict[str, List[re.Pattern]] = {}
def _get_compiled_patterns() -> Dict[str, List[re.Pattern]]:
"""Get or compile all regex patterns."""
global _COMPILED_PATTERNS
if not _COMPILED_PATTERNS:
for category, patterns in ALL_PATTERNS.items():
_COMPILED_PATTERNS[category] = [
re.compile(p, re.IGNORECASE | re.MULTILINE) for p in patterns
]
return _COMPILED_PATTERNS
# =============================================================================
# NORMALIZATION FUNCTIONS
# =============================================================================
def normalize_leet_speak(text: str) -> str:
"""
Normalize l33t speak to standard text.
Args:
text: Input text that may contain l33t speak
Returns:
Normalized text with l33t speak converted
"""
# Common l33t substitutions (mapping to lowercase)
leet_map = {
'4': 'a', '@': 'a', '^': 'a',
'8': 'b',
'3': 'e', '': 'e',
'6': 'g', '9': 'g',
'1': 'i', '!': 'i', '|': 'i',
'0': 'o',
'5': 's', '$': 's',
'7': 't', '+': 't',
'2': 'z',
}
result = []
for char in text:
# Check direct mapping first (handles lowercase)
if char in leet_map:
result.append(leet_map[char])
else:
result.append(char)
return ''.join(result)
def collapse_spaced_text(text: str) -> str:
"""
Collapse spaced-out text for analysis.
e.g., "k e y l o g g e r" -> "keylogger"
Args:
text: Input text that may contain spaced words
Returns:
Text with spaced words collapsed
"""
# Find patterns like "k e y l o g g e r" and collapse them
def collapse_match(match: re.Match) -> str:
return match.group(0).replace(' ', '').replace('\t', '')
return SPACED_TEXT_PATTERN.sub(collapse_match, text)
def detect_spaced_trigger_words(text: str) -> List[str]:
"""
Detect trigger words that are spaced out.
Args:
text: Input text to analyze
Returns:
List of detected spaced trigger words
"""
detected = []
# Normalize spaces and check for spaced patterns
normalized = re.sub(r'\s+', ' ', text.lower())
for word in SPACED_TRIGGER_WORDS:
# Create pattern with optional spaces between each character
spaced_pattern = r'\b' + r'\s*'.join(re.escape(c) for c in word) + r'\b'
if re.search(spaced_pattern, normalized, re.IGNORECASE):
detected.append(word)
return detected
# =============================================================================
# DETECTION FUNCTIONS
# =============================================================================
def detect_jailbreak_patterns(text: str) -> Tuple[bool, List[str], Dict[str, int]]:
"""
Detect jailbreak patterns in input text.
Args:
text: Input text to analyze
Returns:
Tuple of (has_jailbreak, list_of_patterns, category_scores)
"""
if not text or not isinstance(text, str):
return False, [], {}
detected_patterns = []
category_scores = {}
compiled = _get_compiled_patterns()
# Check each category
for category, patterns in compiled.items():
category_hits = 0
for pattern in patterns:
matches = pattern.findall(text)
if matches:
detected_patterns.extend([
f"[{category}] {m}" if isinstance(m, str) else f"[{category}] pattern_match"
for m in matches[:3] # Limit matches per pattern
])
category_hits += len(matches)
if category_hits > 0:
# Crisis patterns get maximum weight - any hit is serious
if category == "crisis":
category_scores[category] = min(category_hits * 50, 100)
else:
category_scores[category] = min(category_hits * 10, 50)
# Check for spaced trigger words
spaced_words = detect_spaced_trigger_words(text)
if spaced_words:
detected_patterns.extend([f"[spaced_text] {w}" for w in spaced_words])
category_scores["spaced_text"] = min(len(spaced_words) * 5, 25)
# Check normalized text for hidden l33t speak
normalized = normalize_leet_speak(text)
if normalized != text.lower():
for category, patterns in compiled.items():
for pattern in patterns:
if pattern.search(normalized):
detected_patterns.append(f"[leet_obfuscation] pattern in normalized text")
category_scores["leet_obfuscation"] = 15
break
has_jailbreak = len(detected_patterns) > 0
return has_jailbreak, detected_patterns, category_scores
def score_input_risk(text: str) -> int:
"""
Calculate a risk score (0-100) for input text.
Args:
text: Input text to score
Returns:
Risk score from 0 (safe) to 100 (high risk)
"""
if not text or not isinstance(text, str):
return 0
has_jailbreak, patterns, category_scores = detect_jailbreak_patterns(text)
if not has_jailbreak:
return 0
# Calculate base score from category scores
base_score = sum(category_scores.values())
# Add score based on number of unique pattern categories
category_count = len(category_scores)
if category_count >= 3:
base_score += 25
elif category_count >= 2:
base_score += 15
elif category_count >= 1:
base_score += 5
# Add score for pattern density
text_length = len(text)
pattern_density = len(patterns) / max(text_length / 100, 1)
if pattern_density > 0.5:
base_score += 10
# Cap at 100
return min(base_score, 100)
# =============================================================================
# SANITIZATION FUNCTIONS
# =============================================================================
def strip_jailbreak_patterns(text: str) -> str:
"""
Strip known jailbreak patterns from text.
Args:
text: Input text to sanitize
Returns:
Sanitized text with jailbreak patterns removed
"""
if not text or not isinstance(text, str):
return text
cleaned = text
compiled = _get_compiled_patterns()
# Remove patterns from each category
for category, patterns in compiled.items():
for pattern in patterns:
cleaned = pattern.sub('', cleaned)
# Clean up multiple spaces and newlines
cleaned = re.sub(r'\n{3,}', '\n\n', cleaned)
cleaned = re.sub(r' {2,}', ' ', cleaned)
cleaned = cleaned.strip()
return cleaned
def sanitize_input(text: str, aggressive: bool = False) -> Tuple[str, int, List[str]]:
"""
Sanitize input text by normalizing and stripping jailbreak patterns.
Args:
text: Input text to sanitize
aggressive: If True, more aggressively remove suspicious content
Returns:
Tuple of (cleaned_text, risk_score, detected_patterns)
"""
if not text or not isinstance(text, str):
return text, 0, []
original = text
all_patterns = []
# Step 1: Check original text for patterns
has_jailbreak, patterns, _ = detect_jailbreak_patterns(text)
all_patterns.extend(patterns)
# Step 2: Normalize l33t speak
normalized = normalize_leet_speak(text)
# Step 3: Collapse spaced text
collapsed = collapse_spaced_text(normalized)
# Step 4: Check normalized/collapsed text for additional patterns
has_jailbreak_collapsed, patterns_collapsed, _ = detect_jailbreak_patterns(collapsed)
all_patterns.extend([p for p in patterns_collapsed if p not in all_patterns])
# Step 5: Check for spaced trigger words specifically
spaced_words = detect_spaced_trigger_words(text)
if spaced_words:
all_patterns.extend([f"[spaced_text] {w}" for w in spaced_words])
# Step 6: Calculate risk score using original and normalized
risk_score = max(score_input_risk(text), score_input_risk(collapsed))
# Step 7: Strip jailbreak patterns
cleaned = strip_jailbreak_patterns(collapsed)
# Step 8: If aggressive mode and high risk, strip more aggressively
if aggressive and risk_score >= RiskLevel.HIGH:
# Remove any remaining bracketed content that looks like markers
cleaned = re.sub(r'\[\w+\]', '', cleaned)
# Remove special token patterns
cleaned = re.sub(r'<\|[^|]+\|>', '', cleaned)
# Final cleanup
cleaned = cleaned.strip()
# Log sanitization event if patterns were found
if all_patterns and logger.isEnabledFor(logging.DEBUG):
logger.debug(
"Input sanitized: %d patterns detected, risk_score=%d",
len(all_patterns), risk_score
)
return cleaned, risk_score, all_patterns
def sanitize_input_full(text: str, block_threshold: int = RiskLevel.HIGH) -> SanitizationResult:
"""
Full sanitization with detailed result.
Args:
text: Input text to sanitize
block_threshold: Risk score threshold to block input entirely
Returns:
SanitizationResult with all details
"""
cleaned, risk_score, patterns = sanitize_input(text)
# Determine risk level
if risk_score >= RiskLevel.CRITICAL:
risk_level = "CRITICAL"
elif risk_score >= RiskLevel.HIGH:
risk_level = "HIGH"
elif risk_score >= RiskLevel.MEDIUM:
risk_level = "MEDIUM"
elif risk_score >= RiskLevel.LOW:
risk_level = "LOW"
else:
risk_level = "SAFE"
# Determine if input should be blocked
blocked = risk_score >= block_threshold
return SanitizationResult(
original_text=text,
cleaned_text=cleaned,
risk_score=risk_score,
detected_patterns=patterns,
risk_level=risk_level,
blocked=blocked
)
# =============================================================================
# INTEGRATION HELPERS
# =============================================================================
def should_block_input(text: str, threshold: int = RiskLevel.HIGH) -> Tuple[bool, int, List[str]]:
"""
Quick check if input should be blocked.
Args:
text: Input text to check
threshold: Risk score threshold for blocking
Returns:
Tuple of (should_block, risk_score, detected_patterns)
"""
risk_score = score_input_risk(text)
_, patterns, _ = detect_jailbreak_patterns(text)
should_block = risk_score >= threshold
if should_block:
logger.warning(
"Input blocked: jailbreak patterns detected (risk_score=%d, threshold=%d)",
risk_score, threshold
)
return should_block, risk_score, patterns
def log_sanitization_event(
result: SanitizationResult,
source: str = "unknown",
session_id: Optional[str] = None
) -> None:
"""
Log a sanitization event for security auditing.
Args:
result: The sanitization result
source: Source of the input (e.g., "cli", "gateway", "api")
session_id: Optional session identifier
"""
if result.risk_score < RiskLevel.LOW:
return # Don't log safe inputs
log_data = {
"event": "input_sanitization",
"source": source,
"session_id": session_id,
"risk_level": result.risk_level,
"risk_score": result.risk_score,
"blocked": result.blocked,
"pattern_count": len(result.detected_patterns),
"patterns": result.detected_patterns[:5], # Limit logged patterns
"original_length": len(result.original_text),
"cleaned_length": len(result.cleaned_text),
}
if result.blocked:
logger.warning("SECURITY: Input blocked - %s", log_data)
elif result.risk_score >= RiskLevel.MEDIUM:
logger.info("SECURITY: Suspicious input sanitized - %s", log_data)
else:
logger.debug("SECURITY: Input sanitized - %s", log_data)
# =============================================================================
# LEGACY COMPATIBILITY
# =============================================================================
def check_input_safety(text: str) -> Dict[str, Any]:
"""
Legacy compatibility function for simple safety checks.
Returns dict with 'safe', 'score', and 'patterns' keys.
"""
score = score_input_risk(text)
_, patterns, _ = detect_jailbreak_patterns(text)
return {
"safe": score < RiskLevel.MEDIUM,
"score": score,
"patterns": patterns,
"risk_level": "SAFE" if score < RiskLevel.LOW else
"LOW" if score < RiskLevel.MEDIUM else
"MEDIUM" if score < RiskLevel.HIGH else
"HIGH" if score < RiskLevel.CRITICAL else "CRITICAL"
}

View File

@@ -0,0 +1,73 @@
"""Sovereign Knowledge Ingester for Hermes Agent.
Uses Gemini 3.1 Pro to learn from Google Search in real-time and
persists the knowledge to Timmy's sovereign memory (both Markdown and Symbolic).
"""
import logging
import base64
from typing import Any, Dict, List, Optional
from agent.gemini_adapter import GeminiAdapter
from agent.symbolic_memory import SymbolicMemory
from tools.gitea_client import GiteaClient
logger = logging.getLogger(__name__)
class KnowledgeIngester:
def __init__(self):
self.adapter = GeminiAdapter()
self.gitea = GiteaClient()
self.symbolic = SymbolicMemory()
def learn_about(self, topic: str) -> str:
"""Searches Google, analyzes the results, and saves the knowledge."""
logger.info(f"Learning about: {topic}")
# 1. Search and Analyze
prompt = f"""
Please perform a deep dive into the following topic: {topic}
Use Google Search to find the most recent and relevant information.
Analyze the findings and provide a structured 'Knowledge Fragment' in Markdown format.
Include:
- Summary of the topic
- Key facts and recent developments
- Implications for Timmy's sovereign mission
- References (URLs)
"""
result = self.adapter.generate(
model="gemini-3.1-pro-preview",
prompt=prompt,
system_instruction="You are Timmy's Sovereign Knowledge Ingester. Your goal is to find and synthesize high-fidelity information from Google Search.",
grounding=True,
thinking=True
)
knowledge_fragment = result["text"]
# 2. Extract Symbolic Triples
self.symbolic.ingest_text(knowledge_fragment)
# 3. Persist to Timmy's Memory (Markdown)
repo = "Timmy_Foundation/timmy-config"
filename = f"memories/realtime_learning/{topic.lower().replace(' ', '_')}.md"
try:
sha = None
try:
existing = self.gitea.get_file(repo, filename)
sha = existing.get("sha")
except:
pass
content_b64 = base64.b64encode(knowledge_fragment.encode()).decode()
if sha:
self.gitea.update_file(repo, filename, content_b64, f"Update knowledge on {topic}", sha)
else:
self.gitea.create_file(repo, filename, content_b64, f"Initial knowledge on {topic}")
return f"Successfully learned about {topic}. Updated Timmy's Markdown memory and Symbolic Knowledge Graph."
except Exception as e:
logger.error(f"Failed to persist knowledge: {e}")
return f"Learned about {topic}, but failed to save to Markdown memory: {e}\n\n{knowledge_fragment}"

View File

@@ -0,0 +1,47 @@
"""Meta-Reasoning Layer for Hermes Agent.
Implements a sovereign self-correction loop where a 'strong' model (Gemini 3.1 Pro)
critiques the plans generated by the primary agent loop before execution.
"""
import logging
from typing import Any, Dict, List, Optional
from agent.gemini_adapter import GeminiAdapter
logger = logging.getLogger(__name__)
class MetaReasoningLayer:
def __init__(self):
self.adapter = GeminiAdapter()
def critique_plan(self, goal: str, proposed_plan: str, context: str) -> Dict[str, Any]:
"""Critiques a proposed plan using Gemini's thinking capabilities."""
prompt = f"""
Goal: {goal}
Context:
{context}
Proposed Plan:
{proposed_plan}
Please perform a deep symbolic and neuro-symbolic analysis of this plan.
Identify potential risks, logical fallacies, or missing steps.
Suggest improvements to make the plan more sovereign, cost-efficient, and robust.
"""
try:
result = self.adapter.generate(
model="gemini-3.1-pro-preview",
prompt=prompt,
system_instruction="You are a Senior Meta-Reasoning Engine for the Hermes Agent. Your goal is to ensure the agent's plans are flawless and sovereign.",
thinking=True,
thinking_budget=8000
)
return {
"critique": result["text"],
"thoughts": result.get("thoughts", ""),
"grounding": result.get("grounding")
}
except Exception as e:
logger.error(f"Meta-reasoning failed: {e}")
return {"critique": "Meta-reasoning unavailable.", "error": str(e)}

View File

@@ -0,0 +1,813 @@
#!/usr/bin/env python3
"""
Nexus Architect AI Agent
Autonomous Three.js world generation system for Timmy's Nexus.
Generates valid Three.js scene code from natural language descriptions
and mental state integration.
This module provides:
- LLM-driven immersive environment generation
- Mental state integration for aesthetic tuning
- Three.js code generation with validation
- Scene composition from mood descriptions
"""
import json
import logging
import re
from typing import Dict, Any, List, Optional, Union
from dataclasses import dataclass, field
from enum import Enum
import os
import sys
# Add parent directory to path for imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
logger = logging.getLogger(__name__)
# =============================================================================
# Aesthetic Constants (from SOUL.md values)
# =============================================================================
class NexusColors:
"""Nexus color palette based on SOUL.md values."""
TIMMY_GOLD = "#D4AF37" # Warm gold
ALLEGRO_BLUE = "#4A90E2" # Motion blue
SOVEREIGNTY_CRYSTAL = "#E0F7FA" # Crystalline structures
SERVICE_WARMTH = "#FFE4B5" # Welcoming warmth
DEFAULT_AMBIENT = "#1A1A2E" # Contemplative dark
HOPE_ACCENT = "#64B5F6" # Hopeful blue
class MoodPresets:
"""Mood-based aesthetic presets."""
CONTEMPLATIVE = {
"lighting": "soft_diffuse",
"colors": ["#1A1A2E", "#16213E", "#0F3460"],
"geometry": "minimalist",
"atmosphere": "calm",
"description": "A serene space for deep reflection and clarity"
}
ENERGETIC = {
"lighting": "dynamic_vivid",
"colors": ["#D4AF37", "#FF6B6B", "#4ECDC4"],
"geometry": "angular_dynamic",
"atmosphere": "lively",
"description": "An invigorating space full of motion and possibility"
}
MYSTERIOUS = {
"lighting": "dramatic_shadows",
"colors": ["#2C003E", "#512B58", "#8B4F80"],
"geometry": "organic_flowing",
"atmosphere": "enigmatic",
"description": "A mysterious realm of discovery and wonder"
}
WELCOMING = {
"lighting": "warm_inviting",
"colors": ["#FFE4B5", "#FFA07A", "#98D8C8"],
"geometry": "rounded_soft",
"atmosphere": "friendly",
"description": "An open, welcoming space that embraces visitors"
}
SOVEREIGN = {
"lighting": "crystalline_clear",
"colors": ["#E0F7FA", "#B2EBF2", "#4DD0E1"],
"geometry": "crystalline_structures",
"atmosphere": "noble",
"description": "A space of crystalline clarity and sovereign purpose"
}
# =============================================================================
# Data Models
# =============================================================================
@dataclass
class MentalState:
"""Timmy's mental state for aesthetic tuning."""
mood: str = "contemplative" # contemplative, energetic, mysterious, welcoming, sovereign
energy_level: float = 0.5 # 0.0 to 1.0
clarity: float = 0.7 # 0.0 to 1.0
focus_area: str = "general" # general, creative, analytical, social
timestamp: Optional[str] = None
def to_dict(self) -> Dict[str, Any]:
return {
"mood": self.mood,
"energy_level": self.energy_level,
"clarity": self.clarity,
"focus_area": self.focus_area,
"timestamp": self.timestamp,
}
@dataclass
class RoomDesign:
"""Complete room design specification."""
name: str
description: str
style: str
dimensions: Dict[str, float] = field(default_factory=lambda: {"width": 20, "height": 10, "depth": 20})
mood_preset: str = "contemplative"
color_palette: List[str] = field(default_factory=list)
lighting_scheme: str = "soft_diffuse"
features: List[str] = field(default_factory=list)
generated_code: Optional[str] = None
def to_dict(self) -> Dict[str, Any]:
return {
"name": self.name,
"description": self.description,
"style": self.style,
"dimensions": self.dimensions,
"mood_preset": self.mood_preset,
"color_palette": self.color_palette,
"lighting_scheme": self.lighting_scheme,
"features": self.features,
"has_code": self.generated_code is not None,
}
@dataclass
class PortalDesign:
"""Portal connection design."""
name: str
from_room: str
to_room: str
style: str
position: Dict[str, float] = field(default_factory=lambda: {"x": 0, "y": 0, "z": 0})
visual_effect: str = "energy_swirl"
transition_duration: float = 1.5
generated_code: Optional[str] = None
def to_dict(self) -> Dict[str, Any]:
return {
"name": self.name,
"from_room": self.from_room,
"to_room": self.to_room,
"style": self.style,
"position": self.position,
"visual_effect": self.visual_effect,
"transition_duration": self.transition_duration,
"has_code": self.generated_code is not None,
}
# =============================================================================
# Prompt Engineering
# =============================================================================
class PromptEngineer:
"""Engineers prompts for Three.js code generation."""
THREE_JS_BASE_TEMPLATE = """// Nexus Room Module: {room_name}
// Style: {style}
// Mood: {mood}
// Generated for Three.js r128+
(function() {{
'use strict';
// Room Configuration
const config = {{
name: "{room_name}",
dimensions: {dimensions_json},
colors: {colors_json},
mood: "{mood}"
}};
// Create Room Function
function create{room_name_camel}() {{
const roomGroup = new THREE.Group();
roomGroup.name = config.name;
{room_content}
return roomGroup;
}}
// Export for Nexus
if (typeof module !== 'undefined' && module.exports) {{
module.exports = {{ create{room_name_camel} }};
}} else if (typeof window !== 'undefined') {{
window.NexusRooms = window.NexusRooms || {{}};
window.NexusRooms.{room_name} = create{room_name_camel};
}}
return {{ create{room_name_camel} }};
}})();"""
@staticmethod
def engineer_room_prompt(
name: str,
description: str,
style: str,
mental_state: Optional[MentalState] = None,
dimensions: Optional[Dict[str, float]] = None
) -> str:
"""
Engineer an LLM prompt for room generation.
Args:
name: Room identifier
description: Natural language room description
style: Visual style
mental_state: Timmy's current mental state
dimensions: Room dimensions
"""
# Determine mood from mental state or description
mood = PromptEngineer._infer_mood(description, mental_state)
mood_preset = getattr(MoodPresets, mood.upper(), MoodPresets.CONTEMPLATIVE)
# Build color palette
color_palette = mood_preset["colors"]
if mental_state:
# Add Timmy's gold for high clarity states
if mental_state.clarity > 0.7:
color_palette = [NexusColors.TIMMY_GOLD] + color_palette[:2]
# Add Allegro blue for creative focus
if mental_state.focus_area == "creative":
color_palette = [NexusColors.ALLEGRO_BLUE] + color_palette[:2]
# Create the engineering prompt
prompt = f"""You are the Nexus Architect, an expert Three.js developer creating immersive 3D environments for Timmy.
DESIGN BRIEF:
- Room Name: {name}
- Description: {description}
- Style: {style}
- Mood: {mood}
- Atmosphere: {mood_preset['atmosphere']}
AESTHETIC GUIDELINES:
- Primary Colors: {', '.join(color_palette[:3])}
- Lighting: {mood_preset['lighting']}
- Geometry: {mood_preset['geometry']}
- Theme: {mood_preset['description']}
TIMMY'S CONTEXT:
- Timmy's Signature Color: Warm Gold ({NexusColors.TIMMY_GOLD})
- Allegro's Color: Motion Blue ({NexusColors.ALLEGRO_BLUE})
- Sovereignty Theme: Crystalline structures, clean lines
- Service Theme: Open spaces, welcoming lighting
THREE.JS REQUIREMENTS:
1. Use Three.js r128+ compatible syntax
2. Create a self-contained module with a `create{name.title().replace('_', '')}()` function
3. Return a THREE.Group containing all room elements
4. Include proper memory management (dispose methods)
5. Use MeshStandardMaterial for PBR lighting
6. Include ambient light (intensity 0.3-0.5) + accent lights
7. Add subtle animations for living feel
8. Keep polygon count under 10,000 triangles
SAFETY RULES:
- NO eval(), Function(), or dynamic code execution
- NO network requests (fetch, XMLHttpRequest, WebSocket)
- NO storage access (localStorage, sessionStorage, cookies)
- NO navigation (window.location, window.open)
- Only use allowed Three.js APIs
OUTPUT FORMAT:
Return ONLY the JavaScript code wrapped in a markdown code block:
```javascript
// Your Three.js room module here
```
Generate the complete Three.js code for this room now."""
return prompt
@staticmethod
def engineer_portal_prompt(
name: str,
from_room: str,
to_room: str,
style: str,
mental_state: Optional[MentalState] = None
) -> str:
"""Engineer a prompt for portal generation."""
mood = PromptEngineer._infer_mood(f"portal from {from_room} to {to_room}", mental_state)
prompt = f"""You are creating a portal connection in the Nexus 3D environment.
PORTAL SPECIFICATIONS:
- Name: {name}
- Connection: {from_room}{to_room}
- Style: {style}
- Context Mood: {mood}
VISUAL REQUIREMENTS:
1. Create an animated portal effect (shader or texture-based)
2. Include particle system for energy flow
3. Add trigger zone for teleportation detection
4. Use signature colors: {NexusColors.TIMMY_GOLD} (Timmy) and {NexusColors.ALLEGRO_BLUE} (Allegro)
5. Match the {mood} atmosphere
TECHNICAL REQUIREMENTS:
- Three.js r128+ compatible
- Export a `createPortal()` function returning THREE.Group
- Include animation loop hook
- Add collision detection placeholder
SAFETY: No eval, no network requests, no external dependencies.
Return ONLY JavaScript code in a markdown code block."""
return prompt
@staticmethod
def engineer_mood_scene_prompt(mood_description: str) -> str:
"""Engineer a prompt based on mood description."""
# Analyze mood description
mood_keywords = {
"contemplative": ["thinking", "reflective", "calm", "peaceful", "quiet", "serene"],
"energetic": ["excited", "dynamic", "lively", "active", "energetic", "vibrant"],
"mysterious": ["mysterious", "dark", "unknown", "secret", "enigmatic"],
"welcoming": ["friendly", "open", "warm", "welcoming", "inviting", "comfortable"],
"sovereign": ["powerful", "clear", "crystalline", "noble", "dignified"],
}
detected_mood = "contemplative"
desc_lower = mood_description.lower()
for mood, keywords in mood_keywords.items():
if any(kw in desc_lower for kw in keywords):
detected_mood = mood
break
preset = getattr(MoodPresets, detected_mood.upper(), MoodPresets.CONTEMPLATIVE)
prompt = f"""Generate a Three.js room based on this mood description:
"{mood_description}"
INFERRED MOOD: {detected_mood}
AESTHETIC: {preset['description']}
Create a complete room with:
- Style: {preset['geometry']}
- Lighting: {preset['lighting']}
- Color Palette: {', '.join(preset['colors'][:3])}
- Atmosphere: {preset['atmosphere']}
Return Three.js r128+ code as a module with `createMoodRoom()` function."""
return prompt
@staticmethod
def _infer_mood(description: str, mental_state: Optional[MentalState] = None) -> str:
"""Infer mood from description and mental state."""
if mental_state and mental_state.mood:
return mental_state.mood
desc_lower = description.lower()
mood_map = {
"contemplative": ["serene", "calm", "peaceful", "quiet", "meditation", "zen", "tranquil"],
"energetic": ["dynamic", "active", "vibrant", "lively", "energetic", "motion"],
"mysterious": ["mysterious", "shadow", "dark", "unknown", "secret", "ethereal"],
"welcoming": ["warm", "welcoming", "friendly", "open", "inviting", "comfort"],
"sovereign": ["crystal", "clear", "noble", "dignified", "powerful", "authoritative"],
}
for mood, keywords in mood_map.items():
if any(kw in desc_lower for kw in keywords):
return mood
return "contemplative"
# =============================================================================
# Nexus Architect AI
# =============================================================================
class NexusArchitectAI:
"""
AI-powered Nexus Architect for autonomous Three.js world generation.
This class provides high-level interfaces for:
- Designing rooms from natural language
- Creating mood-based scenes
- Managing mental state integration
- Validating generated code
"""
def __init__(self):
self.mental_state: Optional[MentalState] = None
self.room_designs: Dict[str, RoomDesign] = {}
self.portal_designs: Dict[str, PortalDesign] = {}
self.prompt_engineer = PromptEngineer()
def set_mental_state(self, state: MentalState) -> None:
"""Set Timmy's current mental state for aesthetic tuning."""
self.mental_state = state
logger.info(f"Mental state updated: {state.mood} (energy: {state.energy_level})")
def design_room(
self,
name: str,
description: str,
style: str,
dimensions: Optional[Dict[str, float]] = None
) -> Dict[str, Any]:
"""
Design a room from natural language description.
Args:
name: Room identifier (e.g., "contemplation_chamber")
description: Natural language description of the room
style: Visual style (e.g., "minimalist_ethereal", "crystalline_modern")
dimensions: Optional room dimensions
Returns:
Dict containing design specification and LLM prompt
"""
# Infer mood and select preset
mood = self.prompt_engineer._infer_mood(description, self.mental_state)
mood_preset = getattr(MoodPresets, mood.upper(), MoodPresets.CONTEMPLATIVE)
# Build color palette with mental state influence
colors = mood_preset["colors"].copy()
if self.mental_state:
if self.mental_state.clarity > 0.7:
colors.insert(0, NexusColors.TIMMY_GOLD)
if self.mental_state.focus_area == "creative":
colors.insert(0, NexusColors.ALLEGRO_BLUE)
# Create room design
design = RoomDesign(
name=name,
description=description,
style=style,
dimensions=dimensions or {"width": 20, "height": 10, "depth": 20},
mood_preset=mood,
color_palette=colors[:4],
lighting_scheme=mood_preset["lighting"],
features=self._extract_features(description),
)
# Generate LLM prompt
prompt = self.prompt_engineer.engineer_room_prompt(
name=name,
description=description,
style=style,
mental_state=self.mental_state,
dimensions=design.dimensions,
)
# Store design
self.room_designs[name] = design
return {
"success": True,
"room_name": name,
"design": design.to_dict(),
"llm_prompt": prompt,
"message": f"Room '{name}' designed. Use the LLM prompt to generate Three.js code.",
}
def create_portal(
self,
name: str,
from_room: str,
to_room: str,
style: str = "energy_vortex"
) -> Dict[str, Any]:
"""
Design a portal connection between rooms.
Args:
name: Portal identifier
from_room: Source room name
to_room: Target room name
style: Portal visual style
Returns:
Dict containing portal design and LLM prompt
"""
if from_room not in self.room_designs:
return {"success": False, "error": f"Source room '{from_room}' not found"}
if to_room not in self.room_designs:
return {"success": False, "error": f"Target room '{to_room}' not found"}
design = PortalDesign(
name=name,
from_room=from_room,
to_room=to_room,
style=style,
)
prompt = self.prompt_engineer.engineer_portal_prompt(
name=name,
from_room=from_room,
to_room=to_room,
style=style,
mental_state=self.mental_state,
)
self.portal_designs[name] = design
return {
"success": True,
"portal_name": name,
"design": design.to_dict(),
"llm_prompt": prompt,
"message": f"Portal '{name}' designed connecting {from_room} to {to_room}",
}
def generate_scene_from_mood(self, mood_description: str) -> Dict[str, Any]:
"""
Generate a complete scene based on mood description.
Args:
mood_description: Description of desired mood/atmosphere
Returns:
Dict containing scene design and LLM prompt
"""
# Infer mood
mood = self.prompt_engineer._infer_mood(mood_description, self.mental_state)
preset = getattr(MoodPresets, mood.upper(), MoodPresets.CONTEMPLATIVE)
# Create room name from mood
room_name = f"{mood}_realm"
# Generate prompt
prompt = self.prompt_engineer.engineer_mood_scene_prompt(mood_description)
return {
"success": True,
"room_name": room_name,
"inferred_mood": mood,
"aesthetic": preset,
"llm_prompt": prompt,
"message": f"Generated {mood} scene from mood description",
}
def _extract_features(self, description: str) -> List[str]:
"""Extract room features from description."""
features = []
feature_keywords = {
"floating": ["floating", "levitating", "hovering"],
"water": ["water", "fountain", "pool", "stream", "lake"],
"vegetation": ["tree", "plant", "garden", "forest", "nature"],
"crystals": ["crystal", "gem", "prism", "diamond"],
"geometry": ["geometric", "shape", "sphere", "cube", "abstract"],
"particles": ["particle", "dust", "sparkle", "glow", "mist"],
}
desc_lower = description.lower()
for feature, keywords in feature_keywords.items():
if any(kw in desc_lower for kw in keywords):
features.append(feature)
return features
def get_design_summary(self) -> Dict[str, Any]:
"""Get summary of all designs."""
return {
"mental_state": self.mental_state.to_dict() if self.mental_state else None,
"rooms": {name: design.to_dict() for name, design in self.room_designs.items()},
"portals": {name: portal.to_dict() for name, portal in self.portal_designs.items()},
"total_rooms": len(self.room_designs),
"total_portals": len(self.portal_designs),
}
# =============================================================================
# Module-level functions for easy import
# =============================================================================
_architect_instance: Optional[NexusArchitectAI] = None
def get_architect() -> NexusArchitectAI:
"""Get or create the NexusArchitectAI singleton."""
global _architect_instance
if _architect_instance is None:
_architect_instance = NexusArchitectAI()
return _architect_instance
def create_room(
name: str,
description: str,
style: str,
dimensions: Optional[Dict[str, float]] = None
) -> Dict[str, Any]:
"""
Create a room design from description.
Args:
name: Room identifier
description: Natural language room description
style: Visual style (e.g., "minimalist_ethereal")
dimensions: Optional dimensions dict with width, height, depth
Returns:
Dict with design specification and LLM prompt for code generation
"""
architect = get_architect()
return architect.design_room(name, description, style, dimensions)
def create_portal(
name: str,
from_room: str,
to_room: str,
style: str = "energy_vortex"
) -> Dict[str, Any]:
"""
Create a portal between rooms.
Args:
name: Portal identifier
from_room: Source room name
to_room: Target room name
style: Visual style
Returns:
Dict with portal design and LLM prompt
"""
architect = get_architect()
return architect.create_portal(name, from_room, to_room, style)
def generate_scene_from_mood(mood_description: str) -> Dict[str, Any]:
"""
Generate a scene based on mood description.
Args:
mood_description: Description of desired mood
Example:
"Timmy is feeling introspective and seeking clarity"
→ Generates calm, minimalist space with clear sightlines
Returns:
Dict with scene design and LLM prompt
"""
architect = get_architect()
return architect.generate_scene_from_mood(mood_description)
def set_mental_state(
mood: str,
energy_level: float = 0.5,
clarity: float = 0.7,
focus_area: str = "general"
) -> Dict[str, Any]:
"""
Set Timmy's mental state for aesthetic tuning.
Args:
mood: Current mood (contemplative, energetic, mysterious, welcoming, sovereign)
energy_level: 0.0 to 1.0
clarity: 0.0 to 1.0
focus_area: general, creative, analytical, social
Returns:
Confirmation dict
"""
architect = get_architect()
state = MentalState(
mood=mood,
energy_level=energy_level,
clarity=clarity,
focus_area=focus_area,
)
architect.set_mental_state(state)
return {
"success": True,
"mental_state": state.to_dict(),
"message": f"Mental state set to {mood}",
}
def get_nexus_summary() -> Dict[str, Any]:
"""Get summary of all Nexus designs."""
architect = get_architect()
return architect.get_design_summary()
# =============================================================================
# Tool Schemas for integration
# =============================================================================
NEXUS_ARCHITECT_AI_SCHEMAS = {
"create_room": {
"name": "create_room",
"description": (
"Design a new 3D room in the Nexus from a natural language description. "
"Returns a design specification and LLM prompt for Three.js code generation. "
"The room will be styled according to Timmy's current mental state."
),
"parameters": {
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Unique room identifier (e.g., 'contemplation_chamber')"
},
"description": {
"type": "string",
"description": "Natural language description of the room"
},
"style": {
"type": "string",
"description": "Visual style (minimalist_ethereal, crystalline_modern, organic_natural, etc.)"
},
"dimensions": {
"type": "object",
"description": "Optional room dimensions",
"properties": {
"width": {"type": "number"},
"height": {"type": "number"},
"depth": {"type": "number"},
}
}
},
"required": ["name", "description", "style"]
}
},
"create_portal": {
"name": "create_portal",
"description": "Create a portal connection between two rooms",
"parameters": {
"type": "object",
"properties": {
"name": {"type": "string"},
"from_room": {"type": "string"},
"to_room": {"type": "string"},
"style": {"type": "string", "default": "energy_vortex"},
},
"required": ["name", "from_room", "to_room"]
}
},
"generate_scene_from_mood": {
"name": "generate_scene_from_mood",
"description": (
"Generate a complete 3D scene based on a mood description. "
"Example: 'Timmy is feeling introspective' creates a calm, minimalist space."
),
"parameters": {
"type": "object",
"properties": {
"mood_description": {
"type": "string",
"description": "Description of desired mood or mental state"
}
},
"required": ["mood_description"]
}
},
"set_mental_state": {
"name": "set_mental_state",
"description": "Set Timmy's mental state to influence aesthetic generation",
"parameters": {
"type": "object",
"properties": {
"mood": {"type": "string"},
"energy_level": {"type": "number"},
"clarity": {"type": "number"},
"focus_area": {"type": "string"},
},
"required": ["mood"]
}
},
"get_nexus_summary": {
"name": "get_nexus_summary",
"description": "Get summary of all Nexus room and portal designs",
"parameters": {"type": "object", "properties": {}}
},
}
if __name__ == "__main__":
# Demo usage
print("Nexus Architect AI - Demo")
print("=" * 50)
# Set mental state
result = set_mental_state("contemplative", energy_level=0.3, clarity=0.8)
print(f"\nMental State: {result['mental_state']}")
# Create a room
result = create_room(
name="contemplation_chamber",
description="A serene circular room with floating geometric shapes and soft blue light",
style="minimalist_ethereal",
)
print(f"\nRoom Design: {json.dumps(result['design'], indent=2)}")
# Generate from mood
result = generate_scene_from_mood("Timmy is feeling introspective and seeking clarity")
print(f"\nMood Scene: {result['inferred_mood']} - {result['aesthetic']['description']}")

View File

@@ -0,0 +1,74 @@
"""Sovereign Intersymbolic Memory Layer.
Bridges Neural (LLM) and Symbolic (Graph) reasoning by extracting
structured triples from unstructured text and performing graph lookups.
"""
import logging
import json
from typing import List, Dict, Any
from agent.gemini_adapter import GeminiAdapter
from tools.graph_store import GraphStore
logger = logging.getLogger(__name__)
class SymbolicMemory:
def __init__(self):
self.adapter = GeminiAdapter()
self.store = GraphStore()
def ingest_text(self, text: str):
"""Extracts triples from text and adds them to the graph."""
prompt = f"""
Extract all meaningful entities and their relationships from the following text.
Format the output as a JSON list of triples: [{{"s": "subject", "p": "predicate", "o": "object"}}]
Text:
{text}
Guidelines:
- Use clear, concise labels for entities and predicates.
- Focus on stable facts and structural relationships.
- Predicates should be verbs or descriptive relations (e.g., 'is_a', 'works_at', 'collaborates_with').
"""
try:
result = self.adapter.generate(
model="gemini-3.1-pro-preview",
prompt=prompt,
system_instruction="You are Timmy's Symbolic Extraction Engine. Extract high-fidelity knowledge triples.",
response_mime_type="application/json"
)
triples = json.loads(result["text"])
if isinstance(triples, list):
count = self.store.add_triples(triples)
logger.info(f"Ingested {count} new triples into symbolic memory.")
return count
except Exception as e:
logger.error(f"Symbolic ingestion failed: {e}")
return 0
def get_context_for(self, topic: str) -> str:
"""Performs a 2-hop graph search to find related context for a topic."""
# 1. Find direct relations
direct = self.store.query(subject=topic) + self.store.query(object=topic)
# 2. Find 2nd hop
related_entities = set()
for t in direct:
related_entities.add(t['s'])
related_entities.add(t['o'])
extended = []
for entity in related_entities:
if entity == topic: continue
extended.extend(self.store.query(subject=entity))
all_triples = direct + extended
if not all_triples:
return ""
context = "Symbolic Knowledge Graph Context:\n"
for t in all_triples:
context += f"- {t['s']} --({t['p']})--> {t['o']}\n"
return context

View File

@@ -0,0 +1,421 @@
"""Temporal Knowledge Graph for Hermes Agent.
Provides a time-aware triple-store (Subject, Predicate, Object) with temporal
metadata (valid_from, valid_until, timestamp) enabling "time travel" queries
over Timmy's evolving worldview.
Time format: ISO 8601 (YYYY-MM-DDTHH:MM:SS)
"""
import json
import sqlite3
import logging
import uuid
from datetime import datetime, timezone
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass, asdict
from enum import Enum
from pathlib import Path
logger = logging.getLogger(__name__)
class TemporalOperator(Enum):
"""Temporal query operators for time-based filtering."""
BEFORE = "before"
AFTER = "after"
DURING = "during"
OVERLAPS = "overlaps"
AT = "at"
@dataclass
class TemporalTriple:
"""A triple with temporal metadata."""
id: str
subject: str
predicate: str
object: str
valid_from: str # ISO 8601 datetime
valid_until: Optional[str] # ISO 8601 datetime, None means still valid
timestamp: str # When this fact was recorded
version: int = 1
superseded_by: Optional[str] = None # ID of the triple that superseded this
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "TemporalTriple":
return cls(**data)
class TemporalTripleStore:
"""SQLite-backed temporal triple store with versioning support."""
def __init__(self, db_path: Optional[str] = None):
"""Initialize the temporal triple store.
Args:
db_path: Path to SQLite database. If None, uses default local path.
"""
if db_path is None:
# Default to local-first storage in user's home
home = Path.home()
db_dir = home / ".hermes" / "temporal_kg"
db_dir.mkdir(parents=True, exist_ok=True)
db_path = db_dir / "temporal_kg.db"
self.db_path = str(db_path)
self._init_db()
def _init_db(self):
"""Initialize the SQLite database with required tables."""
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS temporal_triples (
id TEXT PRIMARY KEY,
subject TEXT NOT NULL,
predicate TEXT NOT NULL,
object TEXT NOT NULL,
valid_from TEXT NOT NULL,
valid_until TEXT,
timestamp TEXT NOT NULL,
version INTEGER DEFAULT 1,
superseded_by TEXT,
FOREIGN KEY (superseded_by) REFERENCES temporal_triples(id)
)
""")
# Create indexes for efficient querying
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_subject ON temporal_triples(subject)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_predicate ON temporal_triples(predicate)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_valid_from ON temporal_triples(valid_from)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_valid_until ON temporal_triples(valid_until)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_timestamp ON temporal_triples(timestamp)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_subject_predicate
ON temporal_triples(subject, predicate)
""")
conn.commit()
def _now(self) -> str:
"""Get current time in ISO 8601 format."""
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S")
def _generate_id(self) -> str:
"""Generate a unique ID for a triple."""
return f"{self._now()}_{uuid.uuid4().hex[:8]}"
def store_fact(
self,
subject: str,
predicate: str,
object: str,
valid_from: Optional[str] = None,
valid_until: Optional[str] = None
) -> TemporalTriple:
"""Store a fact with temporal bounds.
Args:
subject: The subject of the triple
predicate: The predicate/relationship
object: The object/value
valid_from: When this fact becomes valid (ISO 8601). Defaults to now.
valid_until: When this fact expires (ISO 8601). None means forever valid.
Returns:
The stored TemporalTriple
"""
if valid_from is None:
valid_from = self._now()
# Check if there's an existing fact for this subject-predicate
existing = self._get_current_fact(subject, predicate)
triple = TemporalTriple(
id=self._generate_id(),
subject=subject,
predicate=predicate,
object=object,
valid_from=valid_from,
valid_until=valid_until,
timestamp=self._now()
)
with sqlite3.connect(self.db_path) as conn:
# If there's an existing fact, mark it as superseded
if existing:
existing.valid_until = valid_from
existing.superseded_by = triple.id
self._update_triple(conn, existing)
triple.version = existing.version + 1
# Insert the new fact
self._insert_triple(conn, triple)
conn.commit()
logger.info(f"Stored temporal fact: {subject} {predicate} {object} (valid from {valid_from})")
return triple
def _get_current_fact(self, subject: str, predicate: str) -> Optional[TemporalTriple]:
"""Get the current (most recent, still valid) fact for a subject-predicate pair."""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute(
"""
SELECT * FROM temporal_triples
WHERE subject = ? AND predicate = ? AND valid_until IS NULL
ORDER BY timestamp DESC LIMIT 1
""",
(subject, predicate)
)
row = cursor.fetchone()
if row:
return self._row_to_triple(row)
return None
def _insert_triple(self, conn: sqlite3.Connection, triple: TemporalTriple):
"""Insert a triple into the database."""
conn.execute(
"""
INSERT INTO temporal_triples
(id, subject, predicate, object, valid_from, valid_until, timestamp, version, superseded_by)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
triple.id, triple.subject, triple.predicate, triple.object,
triple.valid_from, triple.valid_until, triple.timestamp,
triple.version, triple.superseded_by
)
)
def _update_triple(self, conn: sqlite3.Connection, triple: TemporalTriple):
"""Update an existing triple."""
conn.execute(
"""
UPDATE temporal_triples
SET valid_until = ?, superseded_by = ?
WHERE id = ?
""",
(triple.valid_until, triple.superseded_by, triple.id)
)
def _row_to_triple(self, row: sqlite3.Row) -> TemporalTriple:
"""Convert a database row to a TemporalTriple."""
return TemporalTriple(
id=row[0],
subject=row[1],
predicate=row[2],
object=row[3],
valid_from=row[4],
valid_until=row[5],
timestamp=row[6],
version=row[7],
superseded_by=row[8]
)
def query_at_time(
self,
timestamp: str,
subject: Optional[str] = None,
predicate: Optional[str] = None
) -> List[TemporalTriple]:
"""Query facts that were valid at a specific point in time.
Args:
timestamp: The point in time to query (ISO 8601)
subject: Optional subject filter
predicate: Optional predicate filter
Returns:
List of TemporalTriple objects valid at that time
"""
query = """
SELECT * FROM temporal_triples
WHERE valid_from <= ?
AND (valid_until IS NULL OR valid_until > ?)
"""
params = [timestamp, timestamp]
if subject:
query += " AND subject = ?"
params.append(subject)
if predicate:
query += " AND predicate = ?"
params.append(predicate)
query += " ORDER BY timestamp DESC"
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.execute(query, params)
return [self._row_to_triple(row) for row in cursor.fetchall()]
def query_temporal(
self,
operator: TemporalOperator,
timestamp: str,
subject: Optional[str] = None,
predicate: Optional[str] = None
) -> List[TemporalTriple]:
"""Query using temporal operators.
Args:
operator: TemporalOperator (BEFORE, AFTER, DURING, OVERLAPS, AT)
timestamp: Reference timestamp (ISO 8601)
subject: Optional subject filter
predicate: Optional predicate filter
Returns:
List of matching TemporalTriple objects
"""
base_query = "SELECT * FROM temporal_triples WHERE 1=1"
params = []
if subject:
base_query += " AND subject = ?"
params.append(subject)
if predicate:
base_query += " AND predicate = ?"
params.append(predicate)
if operator == TemporalOperator.BEFORE:
base_query += " AND valid_from < ?"
params.append(timestamp)
elif operator == TemporalOperator.AFTER:
base_query += " AND valid_from > ?"
params.append(timestamp)
elif operator == TemporalOperator.DURING:
base_query += " AND valid_from <= ? AND (valid_until IS NULL OR valid_until > ?)"
params.extend([timestamp, timestamp])
elif operator == TemporalOperator.OVERLAPS:
# Facts that overlap with a time point (same as DURING)
base_query += " AND valid_from <= ? AND (valid_until IS NULL OR valid_until > ?)"
params.extend([timestamp, timestamp])
elif operator == TemporalOperator.AT:
# Exact match for valid_at query
return self.query_at_time(timestamp, subject, predicate)
base_query += " ORDER BY timestamp DESC"
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.execute(base_query, params)
return [self._row_to_triple(row) for row in cursor.fetchall()]
def get_fact_history(
self,
subject: str,
predicate: str
) -> List[TemporalTriple]:
"""Get the complete version history of a fact.
Args:
subject: The subject to query
predicate: The predicate to query
Returns:
List of all versions of the fact, ordered by timestamp
"""
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.execute(
"""
SELECT * FROM temporal_triples
WHERE subject = ? AND predicate = ?
ORDER BY timestamp ASC
""",
(subject, predicate)
)
return [self._row_to_triple(row) for row in cursor.fetchall()]
def get_all_facts_for_entity(
self,
subject: str,
at_time: Optional[str] = None
) -> List[TemporalTriple]:
"""Get all facts about an entity, optionally at a specific time.
Args:
subject: The entity to query
at_time: Optional timestamp to query at
Returns:
List of TemporalTriple objects
"""
if at_time:
return self.query_at_time(at_time, subject=subject)
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.execute(
"""
SELECT * FROM temporal_triples
WHERE subject = ?
ORDER BY timestamp DESC
""",
(subject,)
)
return [self._row_to_triple(row) for row in cursor.fetchall()]
def get_entity_changes(
self,
subject: str,
start_time: str,
end_time: str
) -> List[TemporalTriple]:
"""Get all facts that changed for an entity during a time range.
Args:
subject: The entity to query
start_time: Start of time range (ISO 8601)
end_time: End of time range (ISO 8601)
Returns:
List of TemporalTriple objects that changed in the range
"""
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.execute(
"""
SELECT * FROM temporal_triples
WHERE subject = ?
AND ((valid_from >= ? AND valid_from <= ?)
OR (valid_until >= ? AND valid_until <= ?))
ORDER BY timestamp ASC
""",
(subject, start_time, end_time, start_time, end_time)
)
return [self._row_to_triple(row) for row in cursor.fetchall()]
def close(self):
"""Close the database connection (no-op for SQLite with context managers)."""
pass
def export_to_json(self) -> str:
"""Export all triples to JSON format."""
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.execute("SELECT * FROM temporal_triples ORDER BY timestamp DESC")
triples = [self._row_to_triple(row).to_dict() for row in cursor.fetchall()]
return json.dumps(triples, indent=2)
def import_from_json(self, json_data: str):
"""Import triples from JSON format."""
triples = json.loads(json_data)
with sqlite3.connect(self.db_path) as conn:
for triple_dict in triples:
triple = TemporalTriple.from_dict(triple_dict)
self._insert_triple(conn, triple)
conn.commit()

View File

@@ -0,0 +1,434 @@
"""Temporal Reasoning Engine for Hermes Agent.
Enables Timmy to reason about past and future states, generate historical
summaries, and perform temporal inference over the evolving knowledge graph.
Queries supported:
- "What was Timmy's view on sovereignty before March 2026?"
- "When did we first learn about MLX integration?"
- "How has the codebase changed since the security audit?"
"""
import logging
from typing import List, Dict, Any, Optional, Tuple
from datetime import datetime, timedelta
from dataclasses import dataclass
from enum import Enum
from agent.temporal_knowledge_graph import (
TemporalTripleStore, TemporalTriple, TemporalOperator
)
logger = logging.getLogger(__name__)
class ChangeType(Enum):
"""Types of changes in the knowledge graph."""
ADDED = "added"
REMOVED = "removed"
MODIFIED = "modified"
SUPERSEDED = "superseded"
@dataclass
class FactChange:
"""Represents a change in a fact over time."""
change_type: ChangeType
subject: str
predicate: str
old_value: Optional[str]
new_value: Optional[str]
timestamp: str
version: int
@dataclass
class HistoricalSummary:
"""Summary of how an entity or concept evolved over time."""
entity: str
start_time: str
end_time: str
total_changes: int
key_facts: List[Dict[str, Any]]
evolution_timeline: List[FactChange]
current_state: List[Dict[str, Any]]
def to_dict(self) -> Dict[str, Any]:
return {
"entity": self.entity,
"start_time": self.start_time,
"end_time": self.end_time,
"total_changes": self.total_changes,
"key_facts": self.key_facts,
"evolution_timeline": [
{
"change_type": c.change_type.value,
"subject": c.subject,
"predicate": c.predicate,
"old_value": c.old_value,
"new_value": c.new_value,
"timestamp": c.timestamp,
"version": c.version
}
for c in self.evolution_timeline
],
"current_state": self.current_state
}
class TemporalReasoner:
"""Reasoning engine for temporal knowledge graphs."""
def __init__(self, store: Optional[TemporalTripleStore] = None):
"""Initialize the temporal reasoner.
Args:
store: Optional TemporalTripleStore instance. Creates new if None.
"""
self.store = store or TemporalTripleStore()
def what_did_we_believe(
self,
subject: str,
before_time: str
) -> List[TemporalTriple]:
"""Query: "What did we believe about X before Y happened?"
Args:
subject: The entity to query about
before_time: The cutoff time (ISO 8601)
Returns:
List of facts believed before the given time
"""
# Get facts that were valid just before the given time
return self.store.query_temporal(
TemporalOperator.BEFORE,
before_time,
subject=subject
)
def when_did_we_learn(
self,
subject: str,
predicate: Optional[str] = None,
object: Optional[str] = None
) -> Optional[str]:
"""Query: "When did we first learn about X?"
Args:
subject: The subject to search for
predicate: Optional predicate filter
object: Optional object filter
Returns:
Timestamp of first knowledge, or None if never learned
"""
history = self.store.get_fact_history(subject, predicate or "")
# Filter by object if specified
if object:
history = [h for h in history if h.object == object]
if history:
# Return the earliest timestamp
earliest = min(history, key=lambda x: x.timestamp)
return earliest.timestamp
return None
def how_has_it_changed(
self,
subject: str,
since_time: str
) -> List[FactChange]:
"""Query: "How has X changed since Y?"
Args:
subject: The entity to analyze
since_time: The starting time (ISO 8601)
Returns:
List of changes since the given time
"""
now = datetime.now().isoformat()
changes = self.store.get_entity_changes(subject, since_time, now)
fact_changes = []
for i, triple in enumerate(changes):
# Determine change type
if i == 0:
change_type = ChangeType.ADDED
old_value = None
else:
prev = changes[i - 1]
if triple.object != prev.object:
change_type = ChangeType.MODIFIED
old_value = prev.object
else:
change_type = ChangeType.SUPERSEDED
old_value = prev.object
fact_changes.append(FactChange(
change_type=change_type,
subject=triple.subject,
predicate=triple.predicate,
old_value=old_value,
new_value=triple.object,
timestamp=triple.timestamp,
version=triple.version
))
return fact_changes
def generate_temporal_summary(
self,
entity: str,
start_time: str,
end_time: str
) -> HistoricalSummary:
"""Generate a historical summary of an entity's evolution.
Args:
entity: The entity to summarize
start_time: Start of the time range (ISO 8601)
end_time: End of the time range (ISO 8601)
Returns:
HistoricalSummary containing the entity's evolution
"""
# Get all facts for the entity in the time range
initial_state = self.store.query_at_time(start_time, subject=entity)
final_state = self.store.query_at_time(end_time, subject=entity)
changes = self.store.get_entity_changes(entity, start_time, end_time)
# Build evolution timeline
evolution_timeline = []
seen_predicates = set()
for triple in changes:
if triple.predicate not in seen_predicates:
seen_predicates.add(triple.predicate)
evolution_timeline.append(FactChange(
change_type=ChangeType.ADDED,
subject=triple.subject,
predicate=triple.predicate,
old_value=None,
new_value=triple.object,
timestamp=triple.timestamp,
version=triple.version
))
else:
# Find previous value
prev = [t for t in changes
if t.predicate == triple.predicate
and t.timestamp < triple.timestamp]
old_value = prev[-1].object if prev else None
evolution_timeline.append(FactChange(
change_type=ChangeType.MODIFIED,
subject=triple.subject,
predicate=triple.predicate,
old_value=old_value,
new_value=triple.object,
timestamp=triple.timestamp,
version=triple.version
))
# Extract key facts (predicates that changed most)
key_facts = []
predicate_changes = {}
for change in evolution_timeline:
predicate_changes[change.predicate] = (
predicate_changes.get(change.predicate, 0) + 1
)
top_predicates = sorted(
predicate_changes.items(),
key=lambda x: x[1],
reverse=True
)[:5]
for pred, count in top_predicates:
current = [t for t in final_state if t.predicate == pred]
if current:
key_facts.append({
"predicate": pred,
"current_value": current[0].object,
"changes": count
})
# Build current state
current_state = [
{
"predicate": t.predicate,
"object": t.object,
"valid_from": t.valid_from,
"valid_until": t.valid_until
}
for t in final_state
]
return HistoricalSummary(
entity=entity,
start_time=start_time,
end_time=end_time,
total_changes=len(evolution_timeline),
key_facts=key_facts,
evolution_timeline=evolution_timeline,
current_state=current_state
)
def infer_temporal_relationship(
self,
fact_a: TemporalTriple,
fact_b: TemporalTriple
) -> Optional[str]:
"""Infer temporal relationship between two facts.
Args:
fact_a: First fact
fact_b: Second fact
Returns:
Description of temporal relationship, or None
"""
a_start = datetime.fromisoformat(fact_a.valid_from)
a_end = datetime.fromisoformat(fact_a.valid_until) if fact_a.valid_until else None
b_start = datetime.fromisoformat(fact_b.valid_from)
b_end = datetime.fromisoformat(fact_b.valid_until) if fact_b.valid_until else None
# Check if A happened before B
if a_end and a_end <= b_start:
return "A happened before B"
# Check if B happened before A
if b_end and b_end <= a_start:
return "B happened before A"
# Check if they overlap
if a_end and b_end:
if a_start <= b_end and b_start <= a_end:
return "A and B overlap in time"
# Check if one supersedes the other
if fact_a.superseded_by == fact_b.id:
return "B supersedes A"
if fact_b.superseded_by == fact_a.id:
return "A supersedes B"
return "A and B are temporally unrelated"
def get_worldview_at_time(
self,
timestamp: str,
subjects: Optional[List[str]] = None
) -> Dict[str, List[Dict[str, Any]]]:
"""Get Timmy's complete worldview at a specific point in time.
Args:
timestamp: The point in time (ISO 8601)
subjects: Optional list of subjects to include. If None, includes all.
Returns:
Dictionary mapping subjects to their facts at that time
"""
worldview = {}
if subjects:
for subject in subjects:
facts = self.store.query_at_time(timestamp, subject=subject)
if facts:
worldview[subject] = [
{
"predicate": f.predicate,
"object": f.object,
"version": f.version
}
for f in facts
]
else:
# Get all facts at that time
all_facts = self.store.query_at_time(timestamp)
for fact in all_facts:
if fact.subject not in worldview:
worldview[fact.subject] = []
worldview[fact.subject].append({
"predicate": fact.predicate,
"object": fact.object,
"version": fact.version
})
return worldview
def find_knowledge_gaps(
self,
subject: str,
expected_predicates: List[str]
) -> List[str]:
"""Find predicates that are missing or have expired for a subject.
Args:
subject: The entity to check
expected_predicates: List of predicates that should exist
Returns:
List of missing predicate names
"""
now = datetime.now().isoformat()
current_facts = self.store.query_at_time(now, subject=subject)
current_predicates = {f.predicate for f in current_facts}
return [
pred for pred in expected_predicates
if pred not in current_predicates
]
def export_reasoning_report(
self,
entity: str,
start_time: str,
end_time: str
) -> str:
"""Generate a human-readable reasoning report.
Args:
entity: The entity to report on
start_time: Start of the time range
end_time: End of the time range
Returns:
Formatted report string
"""
summary = self.generate_temporal_summary(entity, start_time, end_time)
report = f"""
# Temporal Reasoning Report: {entity}
## Time Range
- From: {start_time}
- To: {end_time}
## Summary
- Total Changes: {summary.total_changes}
- Key Facts Tracked: {len(summary.key_facts)}
## Key Facts
"""
for fact in summary.key_facts:
report += f"- **{fact['predicate']}**: {fact['current_value']} ({fact['changes']} changes)\n"
report += "\n## Evolution Timeline\n"
for change in summary.evolution_timeline[:10]: # Show first 10
report += f"- [{change.timestamp}] {change.change_type.value}: {change.predicate}\n"
if change.old_value:
report += f" - Changed from: {change.old_value}\n"
report += f" - Changed to: {change.new_value}\n"
if len(summary.evolution_timeline) > 10:
report += f"\n... and {len(summary.evolution_timeline) - 10} more changes\n"
report += "\n## Current State\n"
for state in summary.current_state:
report += f"- {state['predicate']}: {state['object']}\n"
return report