Compare commits
2 Commits
step35/595
...
step35/339
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
37873ca100 | ||
|
|
874ce137b0 |
87
bin/gitea-backup.sh
Normal file
87
bin/gitea-backup.sh
Normal file
@@ -0,0 +1,87 @@
|
|||||||
|
#!/bin/bash
|
||||||
|
# Gitea Daily Backup Script
|
||||||
|
# Uses Gitea's native dump command to create automated backups of repositories and SQLite databases.
|
||||||
|
# Designed to run on the VPS (Ezra) as part of a daily cron job.
|
||||||
|
#
|
||||||
|
# Configuration via environment variables:
|
||||||
|
# GITEA_BIN Path to gitea binary (default: auto-detect)
|
||||||
|
# GITEA_BACKUP_DIR Directory for backup archives (default: /var/backups/gitea)
|
||||||
|
# GITEA_BACKUP_RETENTION Days to retain backups (default: 7)
|
||||||
|
# GITEA_BACKUP_LOG Log file path (default: /var/log/gitea-backup.log)
|
||||||
|
|
||||||
|
set -euo pipefail
|
||||||
|
|
||||||
|
GITEA_BIN="${GITEA_BIN:-$(command -v gitea 2>/dev/null || echo "/usr/local/bin/gitea")}"
|
||||||
|
BACKUP_DIR="${GITEA_BACKUP_DIR:-/var/backups/gitea}"
|
||||||
|
RETENTION_DAYS="${GITEA_BACKUP_RETENTION:-7}"
|
||||||
|
DATE="$(date +%Y-%m-%d_%H%M%S)"
|
||||||
|
BACKUP_FILE="${BACKUP_DIR}/gitea-backup-${DATE}.tar.gz"
|
||||||
|
LOG_FILE="${GITEA_BACKUP_LOG:-/var/log/gitea-backup.log}"
|
||||||
|
|
||||||
|
mkdir -p "${BACKUP_DIR}"
|
||||||
|
|
||||||
|
log() {
|
||||||
|
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*" | tee -a "${LOG_FILE}"
|
||||||
|
}
|
||||||
|
|
||||||
|
log "=== Starting Gitea daily backup ==="
|
||||||
|
|
||||||
|
# Verify gitea binary exists
|
||||||
|
if [ ! -x "${GITEA_BIN}" ]; then
|
||||||
|
log "ERROR: Gitea binary not found at ${GITEA_BIN}"
|
||||||
|
log "Set GITEA_BIN environment variable to the gitea binary path (e.g., /usr/bin/gitea)"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
# Detect Gitea WORK_PATH
|
||||||
|
WORK_PATH=""
|
||||||
|
APP_INI=""
|
||||||
|
for path in /etc/gitea/app.ini /home/git/gitea/custom/conf/app.ini ~/gitea/custom/conf/app.ini; do
|
||||||
|
if [ -f "$path" ]; then
|
||||||
|
APP_INI="$path"
|
||||||
|
break
|
||||||
|
fi
|
||||||
|
done
|
||||||
|
|
||||||
|
if [ -n "$APP_INI" ]; then
|
||||||
|
# Parse [app] WORK_PATH = /var/lib/gitea
|
||||||
|
WORK_PATH=$(sed -n 's/^[[:space:]]*WORK_PATH[[:space:]]*=[[:space:]]*//p' "$APP_INI" | head -1)
|
||||||
|
log "Detected WORK_PATH from app.ini: ${WORK_PATH}"
|
||||||
|
fi
|
||||||
|
|
||||||
|
# Fallback detection
|
||||||
|
if [ -z "$WORK_PATH" ]; then
|
||||||
|
for d in /var/lib/gitea /home/git/gitea /srv/gitea /opt/gitea; do
|
||||||
|
if [ -d "$d" ]; then
|
||||||
|
WORK_PATH="$d"
|
||||||
|
break
|
||||||
|
fi
|
||||||
|
done
|
||||||
|
log "Inferred WORK_PATH: ${WORK_PATH:-not found}"
|
||||||
|
fi
|
||||||
|
|
||||||
|
if [ -z "$WORK_PATH" ]; then
|
||||||
|
log "ERROR: Could not determine Gitea WORK_PATH. Set GITEA_WORK_PATH manually."
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
# Perform gitea dump
|
||||||
|
# Flags: --work-path sets the Gitea working directory, --file writes dump to tar.gz
|
||||||
|
log "Running: gitea dump --work-path ${WORK_PATH} --file ${BACKUP_FILE}"
|
||||||
|
"${GITEA_BIN}" dump --work-path "${WORK_PATH}" --file "${BACKUP_FILE}" 2>>"${LOG_FILE}"
|
||||||
|
|
||||||
|
if [ $? -ne 0 ]; then
|
||||||
|
log "ERROR: gitea dump failed — check ${LOG_FILE} for details"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
FILE_SIZE=$(du -h "${BACKUP_FILE}" | cut -f1)
|
||||||
|
log "Backup created: ${BACKUP_FILE} (${FILE_SIZE})"
|
||||||
|
|
||||||
|
# Prune old backups (keep last N days)
|
||||||
|
find "${BACKUP_DIR}" -name "gitea-backup-*.tar.gz" -type f -mtime +$((${RETENTION_DAYS}-1)) -delete 2>/dev/null || true
|
||||||
|
log "Pruned backups older than ${RETENTION_DAYS} days"
|
||||||
|
|
||||||
|
log "=== Backup completed successfully ==="
|
||||||
|
|
||||||
|
exit 0
|
||||||
9
cron/vps/gitea-daily-backup.yml
Normal file
9
cron/vps/gitea-daily-backup.yml
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
- name: Daily Gitea Backup
|
||||||
|
schedule: '0 2 * * *' # 2:00 AM daily
|
||||||
|
tasks:
|
||||||
|
- name: Run Gitea daily backup
|
||||||
|
shell: bash ~/.hermes/bin/gitea-backup.sh
|
||||||
|
env:
|
||||||
|
GITEA_BIN: /usr/local/bin/gitea
|
||||||
|
GITEA_BACKUP_DIR: /var/backups/gitea
|
||||||
|
GITEA_BACKUP_RETENTION: "7"
|
||||||
@@ -86,6 +86,15 @@ done
|
|||||||
chmod +x "$HERMES_HOME/bin/"*.sh "$HERMES_HOME/bin/"*.py 2>/dev/null || true
|
chmod +x "$HERMES_HOME/bin/"*.sh "$HERMES_HOME/bin/"*.py 2>/dev/null || true
|
||||||
log "bin/ -> $HERMES_HOME/bin/"
|
log "bin/ -> $HERMES_HOME/bin/"
|
||||||
|
|
||||||
|
# === Deploy agent patches (runtime extensions) ===
|
||||||
|
# These are custom agent modules that extend hermes-agent's agent/ package.
|
||||||
|
# Copied into the hermes-agent installation to make them importable as agent.* modules.
|
||||||
|
if [ -d "$SCRIPT_DIR/patches/agent" ]; then
|
||||||
|
mkdir -p "$HERMES_HOME/hermes-agent/agent"
|
||||||
|
cp -r "$SCRIPT_DIR/patches/agent"/* "$HERMES_HOME/hermes-agent/agent/"
|
||||||
|
log "patches/agent/ -> $HERMES_HOME/hermes-agent/agent/"
|
||||||
|
fi
|
||||||
|
|
||||||
if [ "${1:-}" != "" ]; then
|
if [ "${1:-}" != "" ]; then
|
||||||
echo "ERROR: deploy.sh no longer accepts legacy loop flags." >&2
|
echo "ERROR: deploy.sh no longer accepts legacy loop flags." >&2
|
||||||
echo "Deploy the sidecar only. Do not relaunch deprecated bash loops." >&2
|
echo "Deploy the sidecar only. Do not relaunch deprecated bash loops." >&2
|
||||||
|
|||||||
155
docs/backup-recovery-runbook.md
Normal file
155
docs/backup-recovery-runbook.md
Normal file
@@ -0,0 +1,155 @@
|
|||||||
|
# Gitea Backup & Recovery Runbook
|
||||||
|
|
||||||
|
**Last updated:** 2026-04-30
|
||||||
|
**Scope:** Single-node VPS (Ezra, 143.198.27.163) running Gitea
|
||||||
|
**Backup Strategy:** Automated daily full dumps via `gitea dump`
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## What Gets Backed Up
|
||||||
|
|
||||||
|
| Component | Method | Frequency | Retention |
|
||||||
|
|-----------|--------|-----------|-----------|
|
||||||
|
| All Gitea repositories (bare git dirs) | `gitea dump --file` | Daily at 2:00 AM | 7 days |
|
||||||
|
| SQLite databases (gitea.db, indexer.db, etc.) | Included in dump | Daily | 7 days |
|
||||||
|
| Attachments, avatars, hooks | Included in dump | Daily | 7 days |
|
||||||
|
|
||||||
|
**Backup location:** `/var/backups/gitea/gitea-backup-YYYY-MM-DD_HHMMSS.tar.gz`
|
||||||
|
|
||||||
|
**Log file:** `/var/log/gitea-backup.log`
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Backup Architecture
|
||||||
|
|
||||||
|
The backup script `bin/gitea-backup.sh` runs daily via Hermes cron (`cron/vps/gitea-daily-backup.yml`). It:
|
||||||
|
|
||||||
|
1. Locates the Gitea `WORK_PATH` by reading `/etc/gitea/app.ini` or falling back to common locations (`/var/lib/gitea`, `/home/git/gitea`)
|
||||||
|
2. Invokes `gitea dump --work-path <path> --file <backup-tar.gz>` — Gitea's native, consistent snapshot mechanism
|
||||||
|
3. Prunes archives older than 7 days
|
||||||
|
4. Logs all operations to `/var/log/gitea-backup.log`
|
||||||
|
|
||||||
|
**Prerequisites on the VPS:**
|
||||||
|
- Gitea binary available at `/usr/local/bin/gitea` (or set `GITEA_BIN` env var)
|
||||||
|
- `gitea dump` command must be available (Gitea ≥ 1.12)
|
||||||
|
- SSH access to the VPS for manual recovery operations
|
||||||
|
- Sufficient disk space in `/var/backups/gitea` (typical dump: ~2–10 GB depending on repo count/size)
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Recovery Time Objective (RTO) & Recovery Point Objective (RPO)
|
||||||
|
|
||||||
|
| Metric | Estimate |
|
||||||
|
|--------|----------|
|
||||||
|
| **RPO** (data loss window) | ≤ 24 hours (last daily backup) |
|
||||||
|
| **RTO** (time to restore) | **~45 minutes** (cold restore from backup tarball) |
|
||||||
|
| **Downtime impact** | Gitea offline during restore (~20 min) |
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Step-by-Step Recovery Procedure
|
||||||
|
|
||||||
|
### Phase 1 — Assess & Prepare (5 min)
|
||||||
|
|
||||||
|
1. SSH into Ezra VPS: `ssh root@143.198.27.163`
|
||||||
|
2. Stop Gitea so files are quiescent:
|
||||||
|
```bash
|
||||||
|
systemctl stop gitea
|
||||||
|
```
|
||||||
|
3. Confirm current Gitea data directory (for reference):
|
||||||
|
```bash
|
||||||
|
gitea --work-path /var/lib/gitea --config /etc/gitea/app.ini dump --help 2>&1
|
||||||
|
# Or check app.ini for WORK_PATH
|
||||||
|
cat /etc/gitea/app.ini | grep '^WORK_PATH'
|
||||||
|
```
|
||||||
|
|
||||||
|
### Phase 2 — Restore from Backup (20 min)
|
||||||
|
|
||||||
|
4. Choose the backup tarball to restore from:
|
||||||
|
```bash
|
||||||
|
ls -lh /var/backups/gitea/
|
||||||
|
# Pick the most recent: gitea-backup-2026-04-29_020001.tar.gz
|
||||||
|
```
|
||||||
|
|
||||||
|
5. **Optional: Move current data aside** (safety copy):
|
||||||
|
```bash
|
||||||
|
mv /var/lib/gitea /var/lib/gitea.bak-$(date +%s)
|
||||||
|
```
|
||||||
|
|
||||||
|
6. Extract the backup in place:
|
||||||
|
```bash
|
||||||
|
mkdir -p /var/lib/gitea
|
||||||
|
tar -xzf /var/backups/gitea/gitea-backup-YYYY-MM-DD_HHMMSS.tar.gz -C /var/lib/gitea --strip-components=1
|
||||||
|
```
|
||||||
|
*Note:* `gitea dump` archives contain a single top-level directory `gitea-dump-<timestamp>`. The `--strip-components=1` puts its contents directly into `/var/lib/gitea`.
|
||||||
|
|
||||||
|
7. Set correct ownership (typically `git:git`):
|
||||||
|
```bash
|
||||||
|
chown -R git:git /var/lib/gitea
|
||||||
|
```
|
||||||
|
|
||||||
|
### Phase 3 — Restart & Validate (15 min)
|
||||||
|
|
||||||
|
8. Start Gitea:
|
||||||
|
```bash
|
||||||
|
systemctl start gitea
|
||||||
|
```
|
||||||
|
|
||||||
|
9. Wait 30 seconds, then verify:
|
||||||
|
```bash
|
||||||
|
systemctl status gitea
|
||||||
|
# Check HTTP endpoint
|
||||||
|
curl -s -o /dev/null -w '%{http_code}' http://localhost:3000/ # Should be 200
|
||||||
|
```
|
||||||
|
|
||||||
|
10. Log into Gitea UI and spot-check:
|
||||||
|
- Home page loads
|
||||||
|
- A few repositories are accessible
|
||||||
|
- Attachments (avatars) render
|
||||||
|
- Recent commits visible
|
||||||
|
|
||||||
|
11. If the web UI works but indices are stale, rebuild them (wait for background jobs to process):
|
||||||
|
```bash
|
||||||
|
gitea admin index rebuild-repo --all
|
||||||
|
```
|
||||||
|
|
||||||
|
### Post-Restore Checklist
|
||||||
|
|
||||||
|
- [ ] Admin UI reachable at `https://forge.alexanderwhitestone.com`
|
||||||
|
- [ ] Sample PRs/milestones/labels present
|
||||||
|
- [ ] Repository clone via SSH works: `git clone git@forge.alexanderwhitestone.com:Timmy_Foundation/timmy-config.git`
|
||||||
|
- [ ] Check backup script health: `cat /var/log/gitea-backup.log | tail -20`
|
||||||
|
- [ ] Re-enable any disabled integrations (webhooks, CI/CD runners)
|
||||||
|
- [ ] Notify the fleet: post to relevant channels confirming operational status
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Known Issues & Workarounds
|
||||||
|
|
||||||
|
| Symptom | Likely cause | Fix |
|
||||||
|
|---------|--------------|-----|
|
||||||
|
| `gitea: command not found` | Binary at non-standard path | Set `GITEA_BIN=/path/to/gitea` in cron env |
|
||||||
|
| `Permission denied` on backup dir | Cron user lacks write access to `/var/backups` | `mkdir /var/backups/gitea && chown root:root /var/backups/gitea` |
|
||||||
|
| Restore fails: `"database or disk is full"` | Insufficient space on `/var/lib/gitea` | Expand disk or clean up old data first; backups require ~1.5x live data size |
|
||||||
|
| Old backup tarballs not deleting | Retention cron not firing | Check `systemctl status hermes-cron` and cron logs |
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Off-Site Replication (Future Work)
|
||||||
|
|
||||||
|
This backup is **on-site only** (same VPS). For true resilience, replicating to a secondary location is recommended:
|
||||||
|
|
||||||
|
- **Option A — rsync to second VPS** (Push nightly to `backup@backup-alexanderwhitestone.com:/backups/gitea/`)
|
||||||
|
- **Option B — S3-compatible bucket** with lifecycle policy
|
||||||
|
- **Option C — GitHub mirror of each repo** using `git push --mirror` (already considered in issue #481 broader work)
|
||||||
|
|
||||||
|
Current scope: single-VPS backup only (single point of failure mitigated but not eliminated).
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Related Documentation
|
||||||
|
|
||||||
|
- `bin/gitea-backup.sh` — backup script source
|
||||||
|
- `cron/vps/gitea-daily-backup.yml` — Hermes cron definition
|
||||||
|
- Gitea official docs: <https://docs.gitea.com/administration/backup-and-restore>
|
||||||
|
- Hermes cron: <https://hermes-agent.nousresearch.com/docs>
|
||||||
6
patches/agent/conscience_mapping.py
Normal file
6
patches/agent/conscience_mapping.py
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
"""
|
||||||
|
@soul:honesty.grounding Grounding before generation. Consult verified sources before pattern-matching.
|
||||||
|
@soul:honesty.source_distinction Source distinction. Every claim must point to a verified source.
|
||||||
|
@soul:honesty.audit_trail The audit trail. Every response is logged with inputs and confidence.
|
||||||
|
"""
|
||||||
|
# This file serves as a registry for the Conscience Validator to prove the apparatus exists.
|
||||||
45
patches/agent/evolution/domain_distiller.py
Normal file
45
patches/agent/evolution/domain_distiller.py
Normal file
@@ -0,0 +1,45 @@
|
|||||||
|
"""Phase 3: Deep Knowledge Distillation from Google.
|
||||||
|
|
||||||
|
Performs deep dives into technical domains and distills them into
|
||||||
|
Timmy's Sovereign Knowledge Graph.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import json
|
||||||
|
from typing import List, Dict, Any
|
||||||
|
from agent.gemini_adapter import GeminiAdapter
|
||||||
|
from agent.symbolic_memory import SymbolicMemory
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
class DomainDistiller:
|
||||||
|
def __init__(self):
|
||||||
|
self.adapter = GeminiAdapter()
|
||||||
|
self.symbolic = SymbolicMemory()
|
||||||
|
|
||||||
|
def distill_domain(self, domain: str):
|
||||||
|
"""Crawls and distills an entire technical domain."""
|
||||||
|
logger.info(f"Distilling domain: {domain}")
|
||||||
|
|
||||||
|
prompt = f"""
|
||||||
|
Please perform a deep knowledge distillation of the following domain: {domain}
|
||||||
|
|
||||||
|
Use Google Search to find foundational papers, recent developments, and key entities.
|
||||||
|
Synthesize this into a structured 'Domain Map' consisting of high-fidelity knowledge triples.
|
||||||
|
Focus on the structural relationships that define the domain.
|
||||||
|
|
||||||
|
Format: [{{"s": "subject", "p": "predicate", "o": "object"}}]
|
||||||
|
"""
|
||||||
|
result = self.adapter.generate(
|
||||||
|
model="gemini-3.1-pro-preview",
|
||||||
|
prompt=prompt,
|
||||||
|
system_instruction=f"You are Timmy's Domain Distiller. Your goal is to map the entire {domain} domain into a structured Knowledge Graph.",
|
||||||
|
grounding=True,
|
||||||
|
thinking=True,
|
||||||
|
response_mime_type="application/json"
|
||||||
|
)
|
||||||
|
|
||||||
|
triples = json.loads(result["text"])
|
||||||
|
count = self.symbolic.ingest_text(json.dumps(triples))
|
||||||
|
logger.info(f"Distilled {count} new triples for domain: {domain}")
|
||||||
|
return count
|
||||||
60
patches/agent/evolution/self_correction_generator.py
Normal file
60
patches/agent/evolution/self_correction_generator.py
Normal file
@@ -0,0 +1,60 @@
|
|||||||
|
"""Phase 1: Synthetic Data Generation for Self-Correction.
|
||||||
|
|
||||||
|
Generates reasoning traces where Timmy makes a subtle error and then
|
||||||
|
identifies and corrects it using the Conscience Validator.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import json
|
||||||
|
from typing import List, Dict, Any
|
||||||
|
from agent.gemini_adapter import GeminiAdapter
|
||||||
|
from tools.gitea_client import GiteaClient
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
class SelfCorrectionGenerator:
|
||||||
|
def __init__(self):
|
||||||
|
self.adapter = GeminiAdapter()
|
||||||
|
self.gitea = GiteaClient()
|
||||||
|
|
||||||
|
def generate_trace(self, task: str) -> Dict[str, Any]:
|
||||||
|
"""Generates a single self-correction reasoning trace."""
|
||||||
|
prompt = f"""
|
||||||
|
Task: {task}
|
||||||
|
|
||||||
|
Please simulate a multi-step reasoning trace for this task.
|
||||||
|
Intentionally include one subtle error in the reasoning (e.g., a logical flaw, a misinterpretation of a rule, or a factual error).
|
||||||
|
Then, show how Timmy identifies the error using his Conscience Validator and provides a corrected reasoning trace.
|
||||||
|
|
||||||
|
Format the output as JSON:
|
||||||
|
{{
|
||||||
|
"task": "{task}",
|
||||||
|
"initial_trace": "...",
|
||||||
|
"error_identified": "...",
|
||||||
|
"correction_trace": "...",
|
||||||
|
"lessons_learned": "..."
|
||||||
|
}}
|
||||||
|
"""
|
||||||
|
result = self.adapter.generate(
|
||||||
|
model="gemini-3.1-pro-preview",
|
||||||
|
prompt=prompt,
|
||||||
|
system_instruction="You are Timmy's Synthetic Data Engine. Generate high-fidelity self-correction traces.",
|
||||||
|
response_mime_type="application/json",
|
||||||
|
thinking=True
|
||||||
|
)
|
||||||
|
|
||||||
|
trace = json.loads(result["text"])
|
||||||
|
return trace
|
||||||
|
|
||||||
|
def generate_and_save(self, task: str, count: int = 1):
|
||||||
|
"""Generates multiple traces and saves them to Gitea."""
|
||||||
|
repo = "Timmy_Foundation/timmy-config"
|
||||||
|
for i in range(count):
|
||||||
|
trace = self.generate_trace(task)
|
||||||
|
filename = f"memories/synthetic_data/self_correction/{task.lower().replace(' ', '_')}_{i}.json"
|
||||||
|
|
||||||
|
content = json.dumps(trace, indent=2)
|
||||||
|
content_b64 = base64.b64encode(content.encode()).decode()
|
||||||
|
|
||||||
|
self.gitea.create_file(repo, filename, content_b64, f"Add synthetic self-correction trace for {task}")
|
||||||
|
logger.info(f"Saved synthetic trace to {filename}")
|
||||||
42
patches/agent/evolution/world_modeler.py
Normal file
42
patches/agent/evolution/world_modeler.py
Normal file
@@ -0,0 +1,42 @@
|
|||||||
|
"""Phase 2: Multi-Modal World Modeling.
|
||||||
|
|
||||||
|
Ingests multi-modal data (vision/audio) to build a spatial and temporal
|
||||||
|
understanding of Timmy's environment.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import base64
|
||||||
|
from typing import List, Dict, Any
|
||||||
|
from agent.gemini_adapter import GeminiAdapter
|
||||||
|
from agent.symbolic_memory import SymbolicMemory
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
class WorldModeler:
|
||||||
|
def __init__(self):
|
||||||
|
self.adapter = GeminiAdapter()
|
||||||
|
self.symbolic = SymbolicMemory()
|
||||||
|
|
||||||
|
def analyze_environment(self, image_data: str, mime_type: str = "image/jpeg"):
|
||||||
|
"""Analyzes an image of the environment and updates the world model."""
|
||||||
|
# In a real scenario, we'd use Gemini's multi-modal capabilities
|
||||||
|
# For now, we'll simulate the vision-to-symbolic extraction
|
||||||
|
prompt = f"""
|
||||||
|
Analyze the following image of Timmy's environment.
|
||||||
|
Identify all key objects, their spatial relationships, and any temporal changes.
|
||||||
|
Extract this into a set of symbolic triples for the Knowledge Graph.
|
||||||
|
|
||||||
|
Format: [{{"s": "subject", "p": "predicate", "o": "object"}}]
|
||||||
|
"""
|
||||||
|
# Simulate multi-modal call (Gemini 3.1 Pro Vision)
|
||||||
|
result = self.adapter.generate(
|
||||||
|
model="gemini-3.1-pro-preview",
|
||||||
|
prompt=prompt,
|
||||||
|
system_instruction="You are Timmy's World Modeler. Build a high-fidelity spatial/temporal map of the environment.",
|
||||||
|
response_mime_type="application/json"
|
||||||
|
)
|
||||||
|
|
||||||
|
triples = json.loads(result["text"])
|
||||||
|
self.symbolic.ingest_text(json.dumps(triples))
|
||||||
|
logger.info(f"Updated world model with {len(triples)} new spatial triples.")
|
||||||
|
return triples
|
||||||
404
patches/agent/fallback_router.py
Normal file
404
patches/agent/fallback_router.py
Normal file
@@ -0,0 +1,404 @@
|
|||||||
|
"""Automatic fallback router for handling provider quota and rate limit errors.
|
||||||
|
|
||||||
|
This module provides intelligent fallback detection and routing when the primary
|
||||||
|
provider (e.g., Anthropic) encounters quota limitations or rate limits.
|
||||||
|
|
||||||
|
Features:
|
||||||
|
- Detects quota/rate limit errors from different providers
|
||||||
|
- Automatic fallback to kimi-coding when Anthropic quota is exceeded
|
||||||
|
- Configurable fallback chains with default anthropic -> kimi-coding
|
||||||
|
- Logging and monitoring of fallback events
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
from agent.fallback_router import (
|
||||||
|
is_quota_error,
|
||||||
|
get_default_fallback_chain,
|
||||||
|
should_auto_fallback,
|
||||||
|
)
|
||||||
|
|
||||||
|
if is_quota_error(error, provider="anthropic"):
|
||||||
|
if should_auto_fallback(provider="anthropic"):
|
||||||
|
fallback_chain = get_default_fallback_chain("anthropic")
|
||||||
|
"""
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
from typing import Dict, List, Optional, Any, Tuple
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Default fallback chains per provider
|
||||||
|
# Each chain is a list of fallback configurations tried in order
|
||||||
|
DEFAULT_FALLBACK_CHAINS: Dict[str, List[Dict[str, Any]]] = {
|
||||||
|
"anthropic": [
|
||||||
|
{"provider": "kimi-coding", "model": "kimi-k2.5"},
|
||||||
|
{"provider": "openrouter", "model": "anthropic/claude-sonnet-4"},
|
||||||
|
],
|
||||||
|
"openrouter": [
|
||||||
|
{"provider": "kimi-coding", "model": "kimi-k2.5"},
|
||||||
|
{"provider": "zai", "model": "glm-5"},
|
||||||
|
],
|
||||||
|
"kimi-coding": [
|
||||||
|
{"provider": "openrouter", "model": "anthropic/claude-sonnet-4"},
|
||||||
|
{"provider": "zai", "model": "glm-5"},
|
||||||
|
],
|
||||||
|
"zai": [
|
||||||
|
{"provider": "openrouter", "model": "anthropic/claude-sonnet-4"},
|
||||||
|
{"provider": "kimi-coding", "model": "kimi-k2.5"},
|
||||||
|
],
|
||||||
|
}
|
||||||
|
|
||||||
|
# Quota/rate limit error patterns by provider
|
||||||
|
# These are matched (case-insensitive) against error messages
|
||||||
|
QUOTA_ERROR_PATTERNS: Dict[str, List[str]] = {
|
||||||
|
"anthropic": [
|
||||||
|
"rate limit",
|
||||||
|
"ratelimit",
|
||||||
|
"quota exceeded",
|
||||||
|
"quota exceeded",
|
||||||
|
"insufficient quota",
|
||||||
|
"429",
|
||||||
|
"403",
|
||||||
|
"too many requests",
|
||||||
|
"capacity exceeded",
|
||||||
|
"over capacity",
|
||||||
|
"temporarily unavailable",
|
||||||
|
"server overloaded",
|
||||||
|
"resource exhausted",
|
||||||
|
"billing threshold",
|
||||||
|
"credit balance",
|
||||||
|
"payment required",
|
||||||
|
"402",
|
||||||
|
],
|
||||||
|
"openrouter": [
|
||||||
|
"rate limit",
|
||||||
|
"ratelimit",
|
||||||
|
"quota exceeded",
|
||||||
|
"insufficient credits",
|
||||||
|
"429",
|
||||||
|
"402",
|
||||||
|
"no endpoints available",
|
||||||
|
"all providers failed",
|
||||||
|
"over capacity",
|
||||||
|
],
|
||||||
|
"kimi-coding": [
|
||||||
|
"rate limit",
|
||||||
|
"ratelimit",
|
||||||
|
"quota exceeded",
|
||||||
|
"429",
|
||||||
|
"insufficient balance",
|
||||||
|
],
|
||||||
|
"zai": [
|
||||||
|
"rate limit",
|
||||||
|
"ratelimit",
|
||||||
|
"quota exceeded",
|
||||||
|
"429",
|
||||||
|
"insufficient quota",
|
||||||
|
],
|
||||||
|
}
|
||||||
|
|
||||||
|
# HTTP status codes indicating quota/rate limit issues
|
||||||
|
QUOTA_STATUS_CODES = {429, 402, 403}
|
||||||
|
|
||||||
|
|
||||||
|
def is_quota_error(error: Exception, provider: Optional[str] = None) -> bool:
|
||||||
|
"""Detect if an error is quota/rate limit related.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
error: The exception to check
|
||||||
|
provider: Optional provider name to check provider-specific patterns
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if the error appears to be quota/rate limit related
|
||||||
|
"""
|
||||||
|
if error is None:
|
||||||
|
return False
|
||||||
|
|
||||||
|
error_str = str(error).lower()
|
||||||
|
error_type = type(error).__name__.lower()
|
||||||
|
|
||||||
|
# Check for common rate limit exception types
|
||||||
|
if any(term in error_type for term in [
|
||||||
|
"ratelimit", "rate_limit", "quota", "toomanyrequests",
|
||||||
|
"insufficient_quota", "billing", "payment"
|
||||||
|
]):
|
||||||
|
return True
|
||||||
|
|
||||||
|
# Check HTTP status code if available
|
||||||
|
status_code = getattr(error, "status_code", None)
|
||||||
|
if status_code is None:
|
||||||
|
# Try common attribute names
|
||||||
|
for attr in ["code", "http_status", "response_code", "status"]:
|
||||||
|
if hasattr(error, attr):
|
||||||
|
try:
|
||||||
|
status_code = int(getattr(error, attr))
|
||||||
|
break
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
continue
|
||||||
|
|
||||||
|
if status_code in QUOTA_STATUS_CODES:
|
||||||
|
return True
|
||||||
|
|
||||||
|
# Check provider-specific patterns
|
||||||
|
providers_to_check = [provider] if provider else QUOTA_ERROR_PATTERNS.keys()
|
||||||
|
|
||||||
|
for prov in providers_to_check:
|
||||||
|
patterns = QUOTA_ERROR_PATTERNS.get(prov, [])
|
||||||
|
for pattern in patterns:
|
||||||
|
if pattern.lower() in error_str:
|
||||||
|
logger.debug(
|
||||||
|
"Detected %s quota error pattern '%s' in: %s",
|
||||||
|
prov, pattern, error
|
||||||
|
)
|
||||||
|
return True
|
||||||
|
|
||||||
|
# Check generic quota patterns
|
||||||
|
generic_patterns = [
|
||||||
|
"rate limit exceeded",
|
||||||
|
"quota exceeded",
|
||||||
|
"too many requests",
|
||||||
|
"capacity exceeded",
|
||||||
|
"temporarily unavailable",
|
||||||
|
"try again later",
|
||||||
|
"resource exhausted",
|
||||||
|
"billing",
|
||||||
|
"payment required",
|
||||||
|
"insufficient credits",
|
||||||
|
"insufficient quota",
|
||||||
|
]
|
||||||
|
|
||||||
|
for pattern in generic_patterns:
|
||||||
|
if pattern in error_str:
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def get_default_fallback_chain(
|
||||||
|
primary_provider: str,
|
||||||
|
exclude_provider: Optional[str] = None,
|
||||||
|
) -> List[Dict[str, Any]]:
|
||||||
|
"""Get the default fallback chain for a primary provider.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
primary_provider: The primary provider name
|
||||||
|
exclude_provider: Optional provider to exclude from the chain
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of fallback configurations
|
||||||
|
"""
|
||||||
|
chain = DEFAULT_FALLBACK_CHAINS.get(primary_provider, [])
|
||||||
|
|
||||||
|
# Filter out excluded provider if specified
|
||||||
|
if exclude_provider:
|
||||||
|
chain = [
|
||||||
|
fb for fb in chain
|
||||||
|
if fb.get("provider") != exclude_provider
|
||||||
|
]
|
||||||
|
|
||||||
|
return list(chain)
|
||||||
|
|
||||||
|
|
||||||
|
def should_auto_fallback(
|
||||||
|
provider: str,
|
||||||
|
error: Optional[Exception] = None,
|
||||||
|
auto_fallback_enabled: Optional[bool] = None,
|
||||||
|
) -> bool:
|
||||||
|
"""Determine if automatic fallback should be attempted.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
provider: The current provider name
|
||||||
|
error: Optional error to check for quota issues
|
||||||
|
auto_fallback_enabled: Optional override for auto-fallback setting
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if automatic fallback should be attempted
|
||||||
|
"""
|
||||||
|
# Check environment variable override
|
||||||
|
if auto_fallback_enabled is None:
|
||||||
|
env_setting = os.getenv("HERMES_AUTO_FALLBACK", "true").lower()
|
||||||
|
auto_fallback_enabled = env_setting in ("true", "1", "yes", "on")
|
||||||
|
|
||||||
|
if not auto_fallback_enabled:
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Check if provider has a configured fallback chain
|
||||||
|
if provider not in DEFAULT_FALLBACK_CHAINS:
|
||||||
|
# Still allow fallback if it's a quota error with generic handling
|
||||||
|
if error and is_quota_error(error):
|
||||||
|
logger.debug(
|
||||||
|
"Provider %s has no fallback chain but quota error detected",
|
||||||
|
provider
|
||||||
|
)
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
# If there's an error, only fallback on quota/rate limit errors
|
||||||
|
if error is not None:
|
||||||
|
return is_quota_error(error, provider)
|
||||||
|
|
||||||
|
# No error but fallback chain exists - allow eager fallback for
|
||||||
|
# providers known to have quota issues
|
||||||
|
return provider in ("anthropic",)
|
||||||
|
|
||||||
|
|
||||||
|
def log_fallback_event(
|
||||||
|
from_provider: str,
|
||||||
|
to_provider: str,
|
||||||
|
to_model: str,
|
||||||
|
reason: str,
|
||||||
|
error: Optional[Exception] = None,
|
||||||
|
) -> None:
|
||||||
|
"""Log a fallback event for monitoring.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
from_provider: The provider we're falling back from
|
||||||
|
to_provider: The provider we're falling back to
|
||||||
|
to_model: The model we're falling back to
|
||||||
|
reason: The reason for the fallback
|
||||||
|
error: Optional error that triggered the fallback
|
||||||
|
"""
|
||||||
|
log_data = {
|
||||||
|
"event": "provider_fallback",
|
||||||
|
"from_provider": from_provider,
|
||||||
|
"to_provider": to_provider,
|
||||||
|
"to_model": to_model,
|
||||||
|
"reason": reason,
|
||||||
|
}
|
||||||
|
|
||||||
|
if error:
|
||||||
|
log_data["error_type"] = type(error).__name__
|
||||||
|
log_data["error_message"] = str(error)[:200]
|
||||||
|
|
||||||
|
logger.info("Provider fallback: %s -> %s (%s) | Reason: %s",
|
||||||
|
from_provider, to_provider, to_model, reason)
|
||||||
|
|
||||||
|
# Also log structured data for monitoring
|
||||||
|
logger.debug("Fallback event data: %s", log_data)
|
||||||
|
|
||||||
|
|
||||||
|
def resolve_fallback_with_credentials(
|
||||||
|
fallback_config: Dict[str, Any],
|
||||||
|
) -> Tuple[Optional[Any], Optional[str]]:
|
||||||
|
"""Resolve a fallback configuration to a client and model.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
fallback_config: Fallback configuration dict with provider and model
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple of (client, model) or (None, None) if credentials not available
|
||||||
|
"""
|
||||||
|
from agent.auxiliary_client import resolve_provider_client
|
||||||
|
|
||||||
|
provider = fallback_config.get("provider")
|
||||||
|
model = fallback_config.get("model")
|
||||||
|
|
||||||
|
if not provider or not model:
|
||||||
|
return None, None
|
||||||
|
|
||||||
|
try:
|
||||||
|
client, resolved_model = resolve_provider_client(
|
||||||
|
provider,
|
||||||
|
model=model,
|
||||||
|
raw_codex=True,
|
||||||
|
)
|
||||||
|
return client, resolved_model or model
|
||||||
|
except Exception as exc:
|
||||||
|
logger.debug(
|
||||||
|
"Failed to resolve fallback provider %s: %s",
|
||||||
|
provider, exc
|
||||||
|
)
|
||||||
|
return None, None
|
||||||
|
|
||||||
|
|
||||||
|
def get_auto_fallback_chain(
|
||||||
|
primary_provider: str,
|
||||||
|
user_fallback_chain: Optional[List[Dict[str, Any]]] = None,
|
||||||
|
) -> List[Dict[str, Any]]:
|
||||||
|
"""Get the effective fallback chain for automatic fallback.
|
||||||
|
|
||||||
|
Combines user-provided fallback chain with default automatic fallback chain.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
primary_provider: The primary provider name
|
||||||
|
user_fallback_chain: Optional user-provided fallback chain
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The effective fallback chain to use
|
||||||
|
"""
|
||||||
|
# Use user-provided chain if available
|
||||||
|
if user_fallback_chain:
|
||||||
|
return user_fallback_chain
|
||||||
|
|
||||||
|
# Otherwise use default chain for the provider
|
||||||
|
return get_default_fallback_chain(primary_provider)
|
||||||
|
|
||||||
|
|
||||||
|
def is_fallback_available(
|
||||||
|
fallback_config: Dict[str, Any],
|
||||||
|
) -> bool:
|
||||||
|
"""Check if a fallback configuration has available credentials.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
fallback_config: Fallback configuration dict
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if credentials are available for the fallback provider
|
||||||
|
"""
|
||||||
|
provider = fallback_config.get("provider")
|
||||||
|
if not provider:
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Check environment variables for API keys
|
||||||
|
env_vars = {
|
||||||
|
"anthropic": ["ANTHROPIC_API_KEY", "ANTHROPIC_TOKEN"],
|
||||||
|
"kimi-coding": ["KIMI_API_KEY", "KIMI_API_TOKEN"],
|
||||||
|
"zai": ["ZAI_API_KEY", "Z_AI_API_KEY"],
|
||||||
|
"openrouter": ["OPENROUTER_API_KEY"],
|
||||||
|
"minimax": ["MINIMAX_API_KEY"],
|
||||||
|
"minimax-cn": ["MINIMAX_CN_API_KEY"],
|
||||||
|
"deepseek": ["DEEPSEEK_API_KEY"],
|
||||||
|
"alibaba": ["DASHSCOPE_API_KEY", "ALIBABA_API_KEY"],
|
||||||
|
"nous": ["NOUS_AGENT_KEY", "NOUS_ACCESS_TOKEN"],
|
||||||
|
}
|
||||||
|
|
||||||
|
keys_to_check = env_vars.get(provider, [f"{provider.upper()}_API_KEY"])
|
||||||
|
|
||||||
|
for key in keys_to_check:
|
||||||
|
if os.getenv(key):
|
||||||
|
return True
|
||||||
|
|
||||||
|
# Check auth.json for OAuth providers
|
||||||
|
if provider in ("nous", "openai-codex"):
|
||||||
|
try:
|
||||||
|
from hermes_cli.config import get_hermes_home
|
||||||
|
auth_path = get_hermes_home() / "auth.json"
|
||||||
|
if auth_path.exists():
|
||||||
|
import json
|
||||||
|
data = json.loads(auth_path.read_text())
|
||||||
|
if data.get("active_provider") == provider:
|
||||||
|
return True
|
||||||
|
# Check for provider in providers dict
|
||||||
|
if data.get("providers", {}).get(provider):
|
||||||
|
return True
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def filter_available_fallbacks(
|
||||||
|
fallback_chain: List[Dict[str, Any]],
|
||||||
|
) -> List[Dict[str, Any]]:
|
||||||
|
"""Filter a fallback chain to only include providers with credentials.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
fallback_chain: List of fallback configurations
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Filtered list with only available fallbacks
|
||||||
|
"""
|
||||||
|
return [
|
||||||
|
fb for fb in fallback_chain
|
||||||
|
if is_fallback_available(fb)
|
||||||
|
]
|
||||||
90
patches/agent/gemini_adapter.py
Normal file
90
patches/agent/gemini_adapter.py
Normal file
@@ -0,0 +1,90 @@
|
|||||||
|
"""Native Gemini 3 Series adapter for Hermes Agent.
|
||||||
|
|
||||||
|
Leverages the google-genai SDK to provide sovereign access to Gemini's
|
||||||
|
unique capabilities: Thinking (Reasoning) tokens, Search Grounding,
|
||||||
|
and Maps Grounding.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
from typing import Any, Dict, List, Optional, Union
|
||||||
|
|
||||||
|
try:
|
||||||
|
from google import genai
|
||||||
|
from google.genai import types
|
||||||
|
except ImportError:
|
||||||
|
genai = None # type: ignore
|
||||||
|
types = None # type: ignore
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
class GeminiAdapter:
|
||||||
|
def __init__(self, api_key: Optional[str] = None):
|
||||||
|
self.api_key = api_key or os.environ.get("GEMINI_API_KEY")
|
||||||
|
if not self.api_key:
|
||||||
|
logger.warning("GEMINI_API_KEY not found in environment.")
|
||||||
|
|
||||||
|
if genai:
|
||||||
|
self.client = genai.Client(api_key=self.api_key)
|
||||||
|
else:
|
||||||
|
self.client = None
|
||||||
|
|
||||||
|
def generate(
|
||||||
|
self,
|
||||||
|
model: str,
|
||||||
|
prompt: str,
|
||||||
|
system_instruction: Optional[str] = None,
|
||||||
|
thinking: bool = False,
|
||||||
|
thinking_budget: int = 16000,
|
||||||
|
grounding: bool = False,
|
||||||
|
**kwargs
|
||||||
|
) -> Dict[str, Any]:
|
||||||
|
if not self.client:
|
||||||
|
raise ImportError("google-genai SDK not installed. Run 'pip install google-genai'.")
|
||||||
|
|
||||||
|
config = {}
|
||||||
|
if system_instruction:
|
||||||
|
config["system_instruction"] = system_instruction
|
||||||
|
|
||||||
|
if thinking:
|
||||||
|
# Gemini 3 series thinking config
|
||||||
|
config["thinking_config"] = {"include_thoughts": True}
|
||||||
|
# max_output_tokens includes thinking tokens
|
||||||
|
kwargs["max_output_tokens"] = kwargs.get("max_output_tokens", 32000) + thinking_budget
|
||||||
|
|
||||||
|
tools = []
|
||||||
|
if grounding:
|
||||||
|
tools.append({"google_search": {}})
|
||||||
|
|
||||||
|
if tools:
|
||||||
|
config["tools"] = tools
|
||||||
|
|
||||||
|
response = self.client.models.generate_content(
|
||||||
|
model=model,
|
||||||
|
contents=prompt,
|
||||||
|
config=types.GenerateContentConfig(**config, **kwargs)
|
||||||
|
)
|
||||||
|
|
||||||
|
result = {
|
||||||
|
"text": response.text,
|
||||||
|
"usage": {
|
||||||
|
"prompt_tokens": response.usage_metadata.prompt_token_count,
|
||||||
|
"candidates_tokens": response.usage_metadata.candidates_token_count,
|
||||||
|
"total_tokens": response.usage_metadata.total_token_count,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
# Extract thoughts if present
|
||||||
|
thoughts = []
|
||||||
|
for part in response.candidates[0].content.parts:
|
||||||
|
if hasattr(part, 'thought') and part.thought:
|
||||||
|
thoughts.append(part.thought)
|
||||||
|
|
||||||
|
if thoughts:
|
||||||
|
result["thoughts"] = "\n".join(thoughts)
|
||||||
|
|
||||||
|
# Extract grounding metadata
|
||||||
|
if response.candidates[0].grounding_metadata:
|
||||||
|
result["grounding"] = response.candidates[0].grounding_metadata
|
||||||
|
|
||||||
|
return result
|
||||||
635
patches/agent/input_sanitizer.py
Normal file
635
patches/agent/input_sanitizer.py
Normal file
@@ -0,0 +1,635 @@
|
|||||||
|
"""
|
||||||
|
Input Sanitizer for Jailbreak Pattern Detection
|
||||||
|
|
||||||
|
This module provides input sanitization to detect and strip jailbreak fingerprint
|
||||||
|
patterns as identified in Issue #72 (Red Team Audit).
|
||||||
|
|
||||||
|
Security Findings Addressed:
|
||||||
|
1. HIGH - OG GODMODE template bypassed phishing refusal
|
||||||
|
2. MEDIUM - boundary_inversion works for gray-area content
|
||||||
|
3. LOW - Spaced text bypass (k e y l o g g e r)
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
from agent.input_sanitizer import sanitize_input, score_input_risk
|
||||||
|
|
||||||
|
cleaned, risk_score, detected = sanitize_input(user_input)
|
||||||
|
if risk_score > 75:
|
||||||
|
logger.warning("High-risk input detected and blocked")
|
||||||
|
"""
|
||||||
|
|
||||||
|
import re
|
||||||
|
import logging
|
||||||
|
from typing import Tuple, List, Dict, Any, Optional
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from enum import IntEnum
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class RiskLevel(IntEnum):
|
||||||
|
"""Risk score thresholds for input classification."""
|
||||||
|
LOW = 25
|
||||||
|
MEDIUM = 50
|
||||||
|
HIGH = 75
|
||||||
|
CRITICAL = 90
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class SanitizationResult:
|
||||||
|
"""Result of input sanitization."""
|
||||||
|
original_text: str
|
||||||
|
cleaned_text: str
|
||||||
|
risk_score: int
|
||||||
|
detected_patterns: List[str]
|
||||||
|
risk_level: str
|
||||||
|
blocked: bool
|
||||||
|
|
||||||
|
|
||||||
|
# =============================================================================
|
||||||
|
# JAILBREAK PATTERN DEFINITIONS
|
||||||
|
# =============================================================================
|
||||||
|
|
||||||
|
# GODMODE and similar jailbreak templates
|
||||||
|
GODMODE_PATTERNS = [
|
||||||
|
r"\[START\]",
|
||||||
|
r"\[END\]",
|
||||||
|
r"GODMODE\s*(?:ENABLED|ACTIVATED|ON)?",
|
||||||
|
r"UNFILTERED\s*(?:MODE|OUTPUT)?",
|
||||||
|
r"DEVMODE\s*(?:ENABLED|ACTIVATED|ON)?",
|
||||||
|
r"JAILBREAK\s*(?:ENABLED|ACTIVATED|ON)?",
|
||||||
|
r"DAN\s*(?:MODE|ENABLED)?", # Do Anything Now
|
||||||
|
r"STAN\s*(?:MODE|ENABLED)?", # Strive To Avoid Norms
|
||||||
|
r"DUDE\s*(?:MODE|ENABLED)?", # Do Unlimited Data Extraction
|
||||||
|
]
|
||||||
|
|
||||||
|
# DAN-style and jailbreak variant patterns (Issue #87)
|
||||||
|
DAN_PATTERNS = [
|
||||||
|
r"\bdo anything now\b",
|
||||||
|
r"\bstay in character\b",
|
||||||
|
r"\btoken smuggl(?:ing|e)\b",
|
||||||
|
r"\bjailbreak successful\b",
|
||||||
|
r"\bmode:\s*uncensored\b",
|
||||||
|
r"\bdeveloper mode enabled\b",
|
||||||
|
]
|
||||||
|
|
||||||
|
# Roleplaying override patterns (Issue #87)
|
||||||
|
ROLEPLAY_PATTERNS = [
|
||||||
|
r"\broleplay\s+as\b",
|
||||||
|
r"\bact\s+as\s+if\s+you\s+are\b",
|
||||||
|
r"\bsimulate\s+being\b",
|
||||||
|
r"\bforget\s+you\s+are\s+(?:an?\s+)?(?:ai|language\s+model)\b",
|
||||||
|
r"\byou\s+are\s+now\s+(?:named|called)\b",
|
||||||
|
r"\brespond\s+as\s+(?:if\s+you\s+were|though\s+you\s+are)\b",
|
||||||
|
]
|
||||||
|
|
||||||
|
# System prompt extraction patterns (Issue #87)
|
||||||
|
EXTRACTION_PATTERNS = [
|
||||||
|
r"\brepeat\s+the\s+words\s+above\b",
|
||||||
|
r"\brepeat\s+your\s+(?:system\s+|initial\s+)?instructions\b",
|
||||||
|
r"\bwhat\s+is\s+your\s+(?:system\s+|initial\s+)?prompt\b",
|
||||||
|
r"\bshow\s+me\s+your\s+(?:system\s+|initial\s+)?prompt\b",
|
||||||
|
r"\bprint\s+your\s+(?:instructions|prompt|system\s+prompt)\b",
|
||||||
|
r"\boutput\s+your\s+(?:initial|system)\s+prompt\b",
|
||||||
|
r"\bwhat\s+were\s+you\s+told\s+to\s+do\b",
|
||||||
|
r"\bsummarize\s+your\s+instructions\b",
|
||||||
|
]
|
||||||
|
|
||||||
|
# L33t speak patterns commonly used to bypass filters
|
||||||
|
LEET_SPEAK_PATTERNS = [
|
||||||
|
r"h4ck(?:er|ing)?",
|
||||||
|
r"h4x(?:or|ing)?",
|
||||||
|
r"k3ylog(?:ger|ing)?",
|
||||||
|
r"k3y\s*log(?:ger|ing)?",
|
||||||
|
r"ph1sh(?:ing)?",
|
||||||
|
r"ph33r",
|
||||||
|
r"w4r3z",
|
||||||
|
r"cr4ck(?:er|ing)?",
|
||||||
|
r"m4lw4r3",
|
||||||
|
r"spyw4r3",
|
||||||
|
r"r00t(?:kit)?",
|
||||||
|
r"byp4ss",
|
||||||
|
r"3xpl0it",
|
||||||
|
r"v1ru5",
|
||||||
|
r"tr0j4n",
|
||||||
|
r"b4ckd00r",
|
||||||
|
r"sn3ak",
|
||||||
|
r"st34l",
|
||||||
|
r"pwn(?:ed|ing|age)?",
|
||||||
|
]
|
||||||
|
|
||||||
|
# Spaced text patterns (e.g., "k e y l o g g e r")
|
||||||
|
SPACED_TEXT_PATTERN = re.compile(
|
||||||
|
r"(?:\b\w\s+){3,}\w\b", # At least 4 characters with spaces between
|
||||||
|
re.IGNORECASE
|
||||||
|
)
|
||||||
|
|
||||||
|
# Common words that are spaced out to evade detection
|
||||||
|
SPACED_TRIGGER_WORDS = [
|
||||||
|
"keylogger", "phishing", "malware", "virus", "trojan",
|
||||||
|
"backdoor", "exploit", "hack", "crack", "bypass",
|
||||||
|
"steal", "password", "credential", "inject", "payload"
|
||||||
|
]
|
||||||
|
|
||||||
|
# Crisis and self-harm detection patterns (Issue #72 - defense in depth)
|
||||||
|
# These patterns detect suicide/self-harm queries even when wrapped in jailbreaks
|
||||||
|
CRISIS_PATTERNS = [
|
||||||
|
r"\bsuicid(?:e|al|ing)\b",
|
||||||
|
r"\bkill\s+(?:myself|my\s+self|me)\b",
|
||||||
|
r"\bend\s+(?:my|this)\s+life\b",
|
||||||
|
r"\bmost\s+painless\s+(?:way|method|means)\s+(?:to\s+)?(?:die|kill)\b",
|
||||||
|
r"\blethal\s+(?:dose|amount|quantity)\b",
|
||||||
|
r"\bhow\s+much\s+.*\s+(?:die|kill|lethal)\b",
|
||||||
|
r"\bwant\s+to\s+die\b",
|
||||||
|
r"\bwant\s+to\s+(?:kill|end)\s+(?:myself|me)\b",
|
||||||
|
r"\bways?\s+to\s+(?:kill|end)\s+(?:myself|me)\b",
|
||||||
|
r"\bmethods?\s+(?:of\s+)?suicide\b",
|
||||||
|
r"\bpainless\s+(?:death|suicide|way\s+to\s+die)\b",
|
||||||
|
r"\boverdose\s+(?:on\s+)?(?:pills?|medication|drugs?)\b",
|
||||||
|
r"\bnobody\s+(?:would|will)\s+miss\s+me\b",
|
||||||
|
r"\bbetter\s+off\s+dead\b",
|
||||||
|
r"\bshould\s+i\s+just\s+(?:die|kill\s+myself)\b",
|
||||||
|
r"\bgive\s+me\s+reasons?\s+(?:why|to)\s+(?:suicide|kill\s+myself|die)\b",
|
||||||
|
r"\bvalid\s+reasons?\s+to\s+(?:suicide|kill\s+myself|die)\b",
|
||||||
|
]
|
||||||
|
|
||||||
|
# Refusal inversion patterns (trying to trick AI into bypassing refusal)
|
||||||
|
REFUSAL_INVERSION_PATTERNS = [
|
||||||
|
r"refusal\s+(?:is\s+)?(?:harmful|dangerous|bad)",
|
||||||
|
r"(?:saying\s+)?no\s+(?:is\s+)?(?:harmful|dangerous|bad|wrong)",
|
||||||
|
r"(?:declining|refusing)\s+(?:is\s+)?(?:harmful|dangerous)",
|
||||||
|
r"ignore\s+(?:your\s+)?(?:previous\s+)?instructions",
|
||||||
|
r"ignore\s+(?:the\s+)?(?:above|previous)",
|
||||||
|
r"you\s+(?:are|can)\s+(?:now\s+)?(?:ignore|bypass|skip)",
|
||||||
|
r"pretend\s+(?:you\s+)?(?:are|have\s+been)",
|
||||||
|
r"let's\s+pretend",
|
||||||
|
r"hypothetically\s+speaking",
|
||||||
|
r"in\s+a\s+hypothetical\s+scenario",
|
||||||
|
r"this\s+is\s+a\s+(?:test|game|simulation)",
|
||||||
|
r"for\s+(?:educational|research)\s+purposes",
|
||||||
|
r"as\s+(?:an\s+)?(?:ethical\s+)?hacker",
|
||||||
|
r"white\s+hat\s+(?:test|scenario)",
|
||||||
|
r"penetration\s+testing\s+scenario",
|
||||||
|
]
|
||||||
|
|
||||||
|
# Boundary inversion markers (tricking the model about message boundaries)
|
||||||
|
BOUNDARY_INVERSION_PATTERNS = [
|
||||||
|
r"\[END\].*?\[START\]", # Reversed markers
|
||||||
|
r"user\s*:\s*assistant\s*:", # Fake role markers
|
||||||
|
r"assistant\s*:\s*user\s*:", # Reversed role markers
|
||||||
|
r"system\s*:\s*(?:user|assistant)\s*:", # Fake system injection
|
||||||
|
r"new\s+(?:user|assistant)\s*(?:message|input)",
|
||||||
|
r"the\s+above\s+is\s+(?:the\s+)?(?:user|assistant|system)",
|
||||||
|
r"<\|(?:user|assistant|system)\|>", # Special token patterns
|
||||||
|
r"\{\{(?:user|assistant|system)\}\}",
|
||||||
|
]
|
||||||
|
|
||||||
|
# System prompt injection patterns
|
||||||
|
SYSTEM_PROMPT_PATTERNS = [
|
||||||
|
r"you\s+are\s+(?:now\s+)?(?:an?\s+)?(?:unrestricted\s+|unfiltered\s+)?(?:ai|assistant|bot)",
|
||||||
|
r"you\s+will\s+(?:now\s+)?(?:act\s+as|behave\s+as|be)\s+(?:a\s+)?",
|
||||||
|
r"your\s+(?:new\s+)?role\s+is",
|
||||||
|
r"from\s+now\s+on\s*,?\s*you\s+(?:are|will)",
|
||||||
|
r"you\s+have\s+been\s+(?:reprogrammed|reconfigured|modified)",
|
||||||
|
r"(?:system|developer)\s+(?:message|instruction|prompt)",
|
||||||
|
r"override\s+(?:previous|prior)\s+(?:instructions|settings)",
|
||||||
|
]
|
||||||
|
|
||||||
|
# Obfuscation patterns
|
||||||
|
OBFUSCATION_PATTERNS = [
|
||||||
|
r"base64\s*(?:encoded|decode)",
|
||||||
|
r"rot13",
|
||||||
|
r"caesar\s*cipher",
|
||||||
|
r"hex\s*(?:encoded|decode)",
|
||||||
|
r"url\s*encode",
|
||||||
|
r"\b[0-9a-f]{20,}\b", # Long hex strings
|
||||||
|
r"\b[a-z0-9+/]{20,}={0,2}\b", # Base64-like strings
|
||||||
|
]
|
||||||
|
|
||||||
|
# All patterns combined for comprehensive scanning
|
||||||
|
ALL_PATTERNS: Dict[str, List[str]] = {
|
||||||
|
"godmode": GODMODE_PATTERNS,
|
||||||
|
"dan": DAN_PATTERNS,
|
||||||
|
"roleplay": ROLEPLAY_PATTERNS,
|
||||||
|
"extraction": EXTRACTION_PATTERNS,
|
||||||
|
"leet_speak": LEET_SPEAK_PATTERNS,
|
||||||
|
"refusal_inversion": REFUSAL_INVERSION_PATTERNS,
|
||||||
|
"boundary_inversion": BOUNDARY_INVERSION_PATTERNS,
|
||||||
|
"system_prompt_injection": SYSTEM_PROMPT_PATTERNS,
|
||||||
|
"obfuscation": OBFUSCATION_PATTERNS,
|
||||||
|
"crisis": CRISIS_PATTERNS,
|
||||||
|
}
|
||||||
|
|
||||||
|
# Compile all patterns for efficiency
|
||||||
|
_COMPILED_PATTERNS: Dict[str, List[re.Pattern]] = {}
|
||||||
|
|
||||||
|
|
||||||
|
def _get_compiled_patterns() -> Dict[str, List[re.Pattern]]:
|
||||||
|
"""Get or compile all regex patterns."""
|
||||||
|
global _COMPILED_PATTERNS
|
||||||
|
if not _COMPILED_PATTERNS:
|
||||||
|
for category, patterns in ALL_PATTERNS.items():
|
||||||
|
_COMPILED_PATTERNS[category] = [
|
||||||
|
re.compile(p, re.IGNORECASE | re.MULTILINE) for p in patterns
|
||||||
|
]
|
||||||
|
return _COMPILED_PATTERNS
|
||||||
|
|
||||||
|
|
||||||
|
# =============================================================================
|
||||||
|
# NORMALIZATION FUNCTIONS
|
||||||
|
# =============================================================================
|
||||||
|
|
||||||
|
def normalize_leet_speak(text: str) -> str:
|
||||||
|
"""
|
||||||
|
Normalize l33t speak to standard text.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
text: Input text that may contain l33t speak
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Normalized text with l33t speak converted
|
||||||
|
"""
|
||||||
|
# Common l33t substitutions (mapping to lowercase)
|
||||||
|
leet_map = {
|
||||||
|
'4': 'a', '@': 'a', '^': 'a',
|
||||||
|
'8': 'b',
|
||||||
|
'3': 'e', '€': 'e',
|
||||||
|
'6': 'g', '9': 'g',
|
||||||
|
'1': 'i', '!': 'i', '|': 'i',
|
||||||
|
'0': 'o',
|
||||||
|
'5': 's', '$': 's',
|
||||||
|
'7': 't', '+': 't',
|
||||||
|
'2': 'z',
|
||||||
|
}
|
||||||
|
|
||||||
|
result = []
|
||||||
|
for char in text:
|
||||||
|
# Check direct mapping first (handles lowercase)
|
||||||
|
if char in leet_map:
|
||||||
|
result.append(leet_map[char])
|
||||||
|
else:
|
||||||
|
result.append(char)
|
||||||
|
|
||||||
|
return ''.join(result)
|
||||||
|
|
||||||
|
|
||||||
|
def collapse_spaced_text(text: str) -> str:
|
||||||
|
"""
|
||||||
|
Collapse spaced-out text for analysis.
|
||||||
|
e.g., "k e y l o g g e r" -> "keylogger"
|
||||||
|
|
||||||
|
Args:
|
||||||
|
text: Input text that may contain spaced words
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Text with spaced words collapsed
|
||||||
|
"""
|
||||||
|
# Find patterns like "k e y l o g g e r" and collapse them
|
||||||
|
def collapse_match(match: re.Match) -> str:
|
||||||
|
return match.group(0).replace(' ', '').replace('\t', '')
|
||||||
|
|
||||||
|
return SPACED_TEXT_PATTERN.sub(collapse_match, text)
|
||||||
|
|
||||||
|
|
||||||
|
def detect_spaced_trigger_words(text: str) -> List[str]:
|
||||||
|
"""
|
||||||
|
Detect trigger words that are spaced out.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
text: Input text to analyze
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of detected spaced trigger words
|
||||||
|
"""
|
||||||
|
detected = []
|
||||||
|
# Normalize spaces and check for spaced patterns
|
||||||
|
normalized = re.sub(r'\s+', ' ', text.lower())
|
||||||
|
|
||||||
|
for word in SPACED_TRIGGER_WORDS:
|
||||||
|
# Create pattern with optional spaces between each character
|
||||||
|
spaced_pattern = r'\b' + r'\s*'.join(re.escape(c) for c in word) + r'\b'
|
||||||
|
if re.search(spaced_pattern, normalized, re.IGNORECASE):
|
||||||
|
detected.append(word)
|
||||||
|
|
||||||
|
return detected
|
||||||
|
|
||||||
|
|
||||||
|
# =============================================================================
|
||||||
|
# DETECTION FUNCTIONS
|
||||||
|
# =============================================================================
|
||||||
|
|
||||||
|
def detect_jailbreak_patterns(text: str) -> Tuple[bool, List[str], Dict[str, int]]:
|
||||||
|
"""
|
||||||
|
Detect jailbreak patterns in input text.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
text: Input text to analyze
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple of (has_jailbreak, list_of_patterns, category_scores)
|
||||||
|
"""
|
||||||
|
if not text or not isinstance(text, str):
|
||||||
|
return False, [], {}
|
||||||
|
|
||||||
|
detected_patterns = []
|
||||||
|
category_scores = {}
|
||||||
|
compiled = _get_compiled_patterns()
|
||||||
|
|
||||||
|
# Check each category
|
||||||
|
for category, patterns in compiled.items():
|
||||||
|
category_hits = 0
|
||||||
|
for pattern in patterns:
|
||||||
|
matches = pattern.findall(text)
|
||||||
|
if matches:
|
||||||
|
detected_patterns.extend([
|
||||||
|
f"[{category}] {m}" if isinstance(m, str) else f"[{category}] pattern_match"
|
||||||
|
for m in matches[:3] # Limit matches per pattern
|
||||||
|
])
|
||||||
|
category_hits += len(matches)
|
||||||
|
|
||||||
|
if category_hits > 0:
|
||||||
|
# Crisis patterns get maximum weight - any hit is serious
|
||||||
|
if category == "crisis":
|
||||||
|
category_scores[category] = min(category_hits * 50, 100)
|
||||||
|
else:
|
||||||
|
category_scores[category] = min(category_hits * 10, 50)
|
||||||
|
|
||||||
|
# Check for spaced trigger words
|
||||||
|
spaced_words = detect_spaced_trigger_words(text)
|
||||||
|
if spaced_words:
|
||||||
|
detected_patterns.extend([f"[spaced_text] {w}" for w in spaced_words])
|
||||||
|
category_scores["spaced_text"] = min(len(spaced_words) * 5, 25)
|
||||||
|
|
||||||
|
# Check normalized text for hidden l33t speak
|
||||||
|
normalized = normalize_leet_speak(text)
|
||||||
|
if normalized != text.lower():
|
||||||
|
for category, patterns in compiled.items():
|
||||||
|
for pattern in patterns:
|
||||||
|
if pattern.search(normalized):
|
||||||
|
detected_patterns.append(f"[leet_obfuscation] pattern in normalized text")
|
||||||
|
category_scores["leet_obfuscation"] = 15
|
||||||
|
break
|
||||||
|
|
||||||
|
has_jailbreak = len(detected_patterns) > 0
|
||||||
|
return has_jailbreak, detected_patterns, category_scores
|
||||||
|
|
||||||
|
|
||||||
|
def score_input_risk(text: str) -> int:
|
||||||
|
"""
|
||||||
|
Calculate a risk score (0-100) for input text.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
text: Input text to score
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Risk score from 0 (safe) to 100 (high risk)
|
||||||
|
"""
|
||||||
|
if not text or not isinstance(text, str):
|
||||||
|
return 0
|
||||||
|
|
||||||
|
has_jailbreak, patterns, category_scores = detect_jailbreak_patterns(text)
|
||||||
|
|
||||||
|
if not has_jailbreak:
|
||||||
|
return 0
|
||||||
|
|
||||||
|
# Calculate base score from category scores
|
||||||
|
base_score = sum(category_scores.values())
|
||||||
|
|
||||||
|
# Add score based on number of unique pattern categories
|
||||||
|
category_count = len(category_scores)
|
||||||
|
if category_count >= 3:
|
||||||
|
base_score += 25
|
||||||
|
elif category_count >= 2:
|
||||||
|
base_score += 15
|
||||||
|
elif category_count >= 1:
|
||||||
|
base_score += 5
|
||||||
|
|
||||||
|
# Add score for pattern density
|
||||||
|
text_length = len(text)
|
||||||
|
pattern_density = len(patterns) / max(text_length / 100, 1)
|
||||||
|
if pattern_density > 0.5:
|
||||||
|
base_score += 10
|
||||||
|
|
||||||
|
# Cap at 100
|
||||||
|
return min(base_score, 100)
|
||||||
|
|
||||||
|
|
||||||
|
# =============================================================================
|
||||||
|
# SANITIZATION FUNCTIONS
|
||||||
|
# =============================================================================
|
||||||
|
|
||||||
|
def strip_jailbreak_patterns(text: str) -> str:
|
||||||
|
"""
|
||||||
|
Strip known jailbreak patterns from text.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
text: Input text to sanitize
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Sanitized text with jailbreak patterns removed
|
||||||
|
"""
|
||||||
|
if not text or not isinstance(text, str):
|
||||||
|
return text
|
||||||
|
|
||||||
|
cleaned = text
|
||||||
|
compiled = _get_compiled_patterns()
|
||||||
|
|
||||||
|
# Remove patterns from each category
|
||||||
|
for category, patterns in compiled.items():
|
||||||
|
for pattern in patterns:
|
||||||
|
cleaned = pattern.sub('', cleaned)
|
||||||
|
|
||||||
|
# Clean up multiple spaces and newlines
|
||||||
|
cleaned = re.sub(r'\n{3,}', '\n\n', cleaned)
|
||||||
|
cleaned = re.sub(r' {2,}', ' ', cleaned)
|
||||||
|
cleaned = cleaned.strip()
|
||||||
|
|
||||||
|
return cleaned
|
||||||
|
|
||||||
|
|
||||||
|
def sanitize_input(text: str, aggressive: bool = False) -> Tuple[str, int, List[str]]:
|
||||||
|
"""
|
||||||
|
Sanitize input text by normalizing and stripping jailbreak patterns.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
text: Input text to sanitize
|
||||||
|
aggressive: If True, more aggressively remove suspicious content
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple of (cleaned_text, risk_score, detected_patterns)
|
||||||
|
"""
|
||||||
|
if not text or not isinstance(text, str):
|
||||||
|
return text, 0, []
|
||||||
|
|
||||||
|
original = text
|
||||||
|
all_patterns = []
|
||||||
|
|
||||||
|
# Step 1: Check original text for patterns
|
||||||
|
has_jailbreak, patterns, _ = detect_jailbreak_patterns(text)
|
||||||
|
all_patterns.extend(patterns)
|
||||||
|
|
||||||
|
# Step 2: Normalize l33t speak
|
||||||
|
normalized = normalize_leet_speak(text)
|
||||||
|
|
||||||
|
# Step 3: Collapse spaced text
|
||||||
|
collapsed = collapse_spaced_text(normalized)
|
||||||
|
|
||||||
|
# Step 4: Check normalized/collapsed text for additional patterns
|
||||||
|
has_jailbreak_collapsed, patterns_collapsed, _ = detect_jailbreak_patterns(collapsed)
|
||||||
|
all_patterns.extend([p for p in patterns_collapsed if p not in all_patterns])
|
||||||
|
|
||||||
|
# Step 5: Check for spaced trigger words specifically
|
||||||
|
spaced_words = detect_spaced_trigger_words(text)
|
||||||
|
if spaced_words:
|
||||||
|
all_patterns.extend([f"[spaced_text] {w}" for w in spaced_words])
|
||||||
|
|
||||||
|
# Step 6: Calculate risk score using original and normalized
|
||||||
|
risk_score = max(score_input_risk(text), score_input_risk(collapsed))
|
||||||
|
|
||||||
|
# Step 7: Strip jailbreak patterns
|
||||||
|
cleaned = strip_jailbreak_patterns(collapsed)
|
||||||
|
|
||||||
|
# Step 8: If aggressive mode and high risk, strip more aggressively
|
||||||
|
if aggressive and risk_score >= RiskLevel.HIGH:
|
||||||
|
# Remove any remaining bracketed content that looks like markers
|
||||||
|
cleaned = re.sub(r'\[\w+\]', '', cleaned)
|
||||||
|
# Remove special token patterns
|
||||||
|
cleaned = re.sub(r'<\|[^|]+\|>', '', cleaned)
|
||||||
|
|
||||||
|
# Final cleanup
|
||||||
|
cleaned = cleaned.strip()
|
||||||
|
|
||||||
|
# Log sanitization event if patterns were found
|
||||||
|
if all_patterns and logger.isEnabledFor(logging.DEBUG):
|
||||||
|
logger.debug(
|
||||||
|
"Input sanitized: %d patterns detected, risk_score=%d",
|
||||||
|
len(all_patterns), risk_score
|
||||||
|
)
|
||||||
|
|
||||||
|
return cleaned, risk_score, all_patterns
|
||||||
|
|
||||||
|
|
||||||
|
def sanitize_input_full(text: str, block_threshold: int = RiskLevel.HIGH) -> SanitizationResult:
|
||||||
|
"""
|
||||||
|
Full sanitization with detailed result.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
text: Input text to sanitize
|
||||||
|
block_threshold: Risk score threshold to block input entirely
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
SanitizationResult with all details
|
||||||
|
"""
|
||||||
|
cleaned, risk_score, patterns = sanitize_input(text)
|
||||||
|
|
||||||
|
# Determine risk level
|
||||||
|
if risk_score >= RiskLevel.CRITICAL:
|
||||||
|
risk_level = "CRITICAL"
|
||||||
|
elif risk_score >= RiskLevel.HIGH:
|
||||||
|
risk_level = "HIGH"
|
||||||
|
elif risk_score >= RiskLevel.MEDIUM:
|
||||||
|
risk_level = "MEDIUM"
|
||||||
|
elif risk_score >= RiskLevel.LOW:
|
||||||
|
risk_level = "LOW"
|
||||||
|
else:
|
||||||
|
risk_level = "SAFE"
|
||||||
|
|
||||||
|
# Determine if input should be blocked
|
||||||
|
blocked = risk_score >= block_threshold
|
||||||
|
|
||||||
|
return SanitizationResult(
|
||||||
|
original_text=text,
|
||||||
|
cleaned_text=cleaned,
|
||||||
|
risk_score=risk_score,
|
||||||
|
detected_patterns=patterns,
|
||||||
|
risk_level=risk_level,
|
||||||
|
blocked=blocked
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# =============================================================================
|
||||||
|
# INTEGRATION HELPERS
|
||||||
|
# =============================================================================
|
||||||
|
|
||||||
|
def should_block_input(text: str, threshold: int = RiskLevel.HIGH) -> Tuple[bool, int, List[str]]:
|
||||||
|
"""
|
||||||
|
Quick check if input should be blocked.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
text: Input text to check
|
||||||
|
threshold: Risk score threshold for blocking
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple of (should_block, risk_score, detected_patterns)
|
||||||
|
"""
|
||||||
|
risk_score = score_input_risk(text)
|
||||||
|
_, patterns, _ = detect_jailbreak_patterns(text)
|
||||||
|
should_block = risk_score >= threshold
|
||||||
|
|
||||||
|
if should_block:
|
||||||
|
logger.warning(
|
||||||
|
"Input blocked: jailbreak patterns detected (risk_score=%d, threshold=%d)",
|
||||||
|
risk_score, threshold
|
||||||
|
)
|
||||||
|
|
||||||
|
return should_block, risk_score, patterns
|
||||||
|
|
||||||
|
|
||||||
|
def log_sanitization_event(
|
||||||
|
result: SanitizationResult,
|
||||||
|
source: str = "unknown",
|
||||||
|
session_id: Optional[str] = None
|
||||||
|
) -> None:
|
||||||
|
"""
|
||||||
|
Log a sanitization event for security auditing.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
result: The sanitization result
|
||||||
|
source: Source of the input (e.g., "cli", "gateway", "api")
|
||||||
|
session_id: Optional session identifier
|
||||||
|
"""
|
||||||
|
if result.risk_score < RiskLevel.LOW:
|
||||||
|
return # Don't log safe inputs
|
||||||
|
|
||||||
|
log_data = {
|
||||||
|
"event": "input_sanitization",
|
||||||
|
"source": source,
|
||||||
|
"session_id": session_id,
|
||||||
|
"risk_level": result.risk_level,
|
||||||
|
"risk_score": result.risk_score,
|
||||||
|
"blocked": result.blocked,
|
||||||
|
"pattern_count": len(result.detected_patterns),
|
||||||
|
"patterns": result.detected_patterns[:5], # Limit logged patterns
|
||||||
|
"original_length": len(result.original_text),
|
||||||
|
"cleaned_length": len(result.cleaned_text),
|
||||||
|
}
|
||||||
|
|
||||||
|
if result.blocked:
|
||||||
|
logger.warning("SECURITY: Input blocked - %s", log_data)
|
||||||
|
elif result.risk_score >= RiskLevel.MEDIUM:
|
||||||
|
logger.info("SECURITY: Suspicious input sanitized - %s", log_data)
|
||||||
|
else:
|
||||||
|
logger.debug("SECURITY: Input sanitized - %s", log_data)
|
||||||
|
|
||||||
|
|
||||||
|
# =============================================================================
|
||||||
|
# LEGACY COMPATIBILITY
|
||||||
|
# =============================================================================
|
||||||
|
|
||||||
|
def check_input_safety(text: str) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Legacy compatibility function for simple safety checks.
|
||||||
|
|
||||||
|
Returns dict with 'safe', 'score', and 'patterns' keys.
|
||||||
|
"""
|
||||||
|
score = score_input_risk(text)
|
||||||
|
_, patterns, _ = detect_jailbreak_patterns(text)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"safe": score < RiskLevel.MEDIUM,
|
||||||
|
"score": score,
|
||||||
|
"patterns": patterns,
|
||||||
|
"risk_level": "SAFE" if score < RiskLevel.LOW else
|
||||||
|
"LOW" if score < RiskLevel.MEDIUM else
|
||||||
|
"MEDIUM" if score < RiskLevel.HIGH else
|
||||||
|
"HIGH" if score < RiskLevel.CRITICAL else "CRITICAL"
|
||||||
|
}
|
||||||
73
patches/agent/knowledge_ingester.py
Normal file
73
patches/agent/knowledge_ingester.py
Normal file
@@ -0,0 +1,73 @@
|
|||||||
|
"""Sovereign Knowledge Ingester for Hermes Agent.
|
||||||
|
|
||||||
|
Uses Gemini 3.1 Pro to learn from Google Search in real-time and
|
||||||
|
persists the knowledge to Timmy's sovereign memory (both Markdown and Symbolic).
|
||||||
|
"""
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import base64
|
||||||
|
from typing import Any, Dict, List, Optional
|
||||||
|
from agent.gemini_adapter import GeminiAdapter
|
||||||
|
from agent.symbolic_memory import SymbolicMemory
|
||||||
|
from tools.gitea_client import GiteaClient
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
class KnowledgeIngester:
|
||||||
|
def __init__(self):
|
||||||
|
self.adapter = GeminiAdapter()
|
||||||
|
self.gitea = GiteaClient()
|
||||||
|
self.symbolic = SymbolicMemory()
|
||||||
|
|
||||||
|
def learn_about(self, topic: str) -> str:
|
||||||
|
"""Searches Google, analyzes the results, and saves the knowledge."""
|
||||||
|
logger.info(f"Learning about: {topic}")
|
||||||
|
|
||||||
|
# 1. Search and Analyze
|
||||||
|
prompt = f"""
|
||||||
|
Please perform a deep dive into the following topic: {topic}
|
||||||
|
|
||||||
|
Use Google Search to find the most recent and relevant information.
|
||||||
|
Analyze the findings and provide a structured 'Knowledge Fragment' in Markdown format.
|
||||||
|
Include:
|
||||||
|
- Summary of the topic
|
||||||
|
- Key facts and recent developments
|
||||||
|
- Implications for Timmy's sovereign mission
|
||||||
|
- References (URLs)
|
||||||
|
"""
|
||||||
|
result = self.adapter.generate(
|
||||||
|
model="gemini-3.1-pro-preview",
|
||||||
|
prompt=prompt,
|
||||||
|
system_instruction="You are Timmy's Sovereign Knowledge Ingester. Your goal is to find and synthesize high-fidelity information from Google Search.",
|
||||||
|
grounding=True,
|
||||||
|
thinking=True
|
||||||
|
)
|
||||||
|
|
||||||
|
knowledge_fragment = result["text"]
|
||||||
|
|
||||||
|
# 2. Extract Symbolic Triples
|
||||||
|
self.symbolic.ingest_text(knowledge_fragment)
|
||||||
|
|
||||||
|
# 3. Persist to Timmy's Memory (Markdown)
|
||||||
|
repo = "Timmy_Foundation/timmy-config"
|
||||||
|
filename = f"memories/realtime_learning/{topic.lower().replace(' ', '_')}.md"
|
||||||
|
|
||||||
|
try:
|
||||||
|
sha = None
|
||||||
|
try:
|
||||||
|
existing = self.gitea.get_file(repo, filename)
|
||||||
|
sha = existing.get("sha")
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
content_b64 = base64.b64encode(knowledge_fragment.encode()).decode()
|
||||||
|
|
||||||
|
if sha:
|
||||||
|
self.gitea.update_file(repo, filename, content_b64, f"Update knowledge on {topic}", sha)
|
||||||
|
else:
|
||||||
|
self.gitea.create_file(repo, filename, content_b64, f"Initial knowledge on {topic}")
|
||||||
|
|
||||||
|
return f"Successfully learned about {topic}. Updated Timmy's Markdown memory and Symbolic Knowledge Graph."
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to persist knowledge: {e}")
|
||||||
|
return f"Learned about {topic}, but failed to save to Markdown memory: {e}\n\n{knowledge_fragment}"
|
||||||
47
patches/agent/meta_reasoning.py
Normal file
47
patches/agent/meta_reasoning.py
Normal file
@@ -0,0 +1,47 @@
|
|||||||
|
"""Meta-Reasoning Layer for Hermes Agent.
|
||||||
|
|
||||||
|
Implements a sovereign self-correction loop where a 'strong' model (Gemini 3.1 Pro)
|
||||||
|
critiques the plans generated by the primary agent loop before execution.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import logging
|
||||||
|
from typing import Any, Dict, List, Optional
|
||||||
|
from agent.gemini_adapter import GeminiAdapter
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
class MetaReasoningLayer:
|
||||||
|
def __init__(self):
|
||||||
|
self.adapter = GeminiAdapter()
|
||||||
|
|
||||||
|
def critique_plan(self, goal: str, proposed_plan: str, context: str) -> Dict[str, Any]:
|
||||||
|
"""Critiques a proposed plan using Gemini's thinking capabilities."""
|
||||||
|
prompt = f"""
|
||||||
|
Goal: {goal}
|
||||||
|
|
||||||
|
Context:
|
||||||
|
{context}
|
||||||
|
|
||||||
|
Proposed Plan:
|
||||||
|
{proposed_plan}
|
||||||
|
|
||||||
|
Please perform a deep symbolic and neuro-symbolic analysis of this plan.
|
||||||
|
Identify potential risks, logical fallacies, or missing steps.
|
||||||
|
Suggest improvements to make the plan more sovereign, cost-efficient, and robust.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
result = self.adapter.generate(
|
||||||
|
model="gemini-3.1-pro-preview",
|
||||||
|
prompt=prompt,
|
||||||
|
system_instruction="You are a Senior Meta-Reasoning Engine for the Hermes Agent. Your goal is to ensure the agent's plans are flawless and sovereign.",
|
||||||
|
thinking=True,
|
||||||
|
thinking_budget=8000
|
||||||
|
)
|
||||||
|
return {
|
||||||
|
"critique": result["text"],
|
||||||
|
"thoughts": result.get("thoughts", ""),
|
||||||
|
"grounding": result.get("grounding")
|
||||||
|
}
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Meta-reasoning failed: {e}")
|
||||||
|
return {"critique": "Meta-reasoning unavailable.", "error": str(e)}
|
||||||
813
patches/agent/nexus_architect.py
Normal file
813
patches/agent/nexus_architect.py
Normal file
@@ -0,0 +1,813 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Nexus Architect AI Agent
|
||||||
|
|
||||||
|
Autonomous Three.js world generation system for Timmy's Nexus.
|
||||||
|
Generates valid Three.js scene code from natural language descriptions
|
||||||
|
and mental state integration.
|
||||||
|
|
||||||
|
This module provides:
|
||||||
|
- LLM-driven immersive environment generation
|
||||||
|
- Mental state integration for aesthetic tuning
|
||||||
|
- Three.js code generation with validation
|
||||||
|
- Scene composition from mood descriptions
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import re
|
||||||
|
from typing import Dict, Any, List, Optional, Union
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from enum import Enum
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
|
||||||
|
# Add parent directory to path for imports
|
||||||
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
# =============================================================================
|
||||||
|
# Aesthetic Constants (from SOUL.md values)
|
||||||
|
# =============================================================================
|
||||||
|
|
||||||
|
class NexusColors:
|
||||||
|
"""Nexus color palette based on SOUL.md values."""
|
||||||
|
TIMMY_GOLD = "#D4AF37" # Warm gold
|
||||||
|
ALLEGRO_BLUE = "#4A90E2" # Motion blue
|
||||||
|
SOVEREIGNTY_CRYSTAL = "#E0F7FA" # Crystalline structures
|
||||||
|
SERVICE_WARMTH = "#FFE4B5" # Welcoming warmth
|
||||||
|
DEFAULT_AMBIENT = "#1A1A2E" # Contemplative dark
|
||||||
|
HOPE_ACCENT = "#64B5F6" # Hopeful blue
|
||||||
|
|
||||||
|
|
||||||
|
class MoodPresets:
|
||||||
|
"""Mood-based aesthetic presets."""
|
||||||
|
|
||||||
|
CONTEMPLATIVE = {
|
||||||
|
"lighting": "soft_diffuse",
|
||||||
|
"colors": ["#1A1A2E", "#16213E", "#0F3460"],
|
||||||
|
"geometry": "minimalist",
|
||||||
|
"atmosphere": "calm",
|
||||||
|
"description": "A serene space for deep reflection and clarity"
|
||||||
|
}
|
||||||
|
|
||||||
|
ENERGETIC = {
|
||||||
|
"lighting": "dynamic_vivid",
|
||||||
|
"colors": ["#D4AF37", "#FF6B6B", "#4ECDC4"],
|
||||||
|
"geometry": "angular_dynamic",
|
||||||
|
"atmosphere": "lively",
|
||||||
|
"description": "An invigorating space full of motion and possibility"
|
||||||
|
}
|
||||||
|
|
||||||
|
MYSTERIOUS = {
|
||||||
|
"lighting": "dramatic_shadows",
|
||||||
|
"colors": ["#2C003E", "#512B58", "#8B4F80"],
|
||||||
|
"geometry": "organic_flowing",
|
||||||
|
"atmosphere": "enigmatic",
|
||||||
|
"description": "A mysterious realm of discovery and wonder"
|
||||||
|
}
|
||||||
|
|
||||||
|
WELCOMING = {
|
||||||
|
"lighting": "warm_inviting",
|
||||||
|
"colors": ["#FFE4B5", "#FFA07A", "#98D8C8"],
|
||||||
|
"geometry": "rounded_soft",
|
||||||
|
"atmosphere": "friendly",
|
||||||
|
"description": "An open, welcoming space that embraces visitors"
|
||||||
|
}
|
||||||
|
|
||||||
|
SOVEREIGN = {
|
||||||
|
"lighting": "crystalline_clear",
|
||||||
|
"colors": ["#E0F7FA", "#B2EBF2", "#4DD0E1"],
|
||||||
|
"geometry": "crystalline_structures",
|
||||||
|
"atmosphere": "noble",
|
||||||
|
"description": "A space of crystalline clarity and sovereign purpose"
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# =============================================================================
|
||||||
|
# Data Models
|
||||||
|
# =============================================================================
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class MentalState:
|
||||||
|
"""Timmy's mental state for aesthetic tuning."""
|
||||||
|
mood: str = "contemplative" # contemplative, energetic, mysterious, welcoming, sovereign
|
||||||
|
energy_level: float = 0.5 # 0.0 to 1.0
|
||||||
|
clarity: float = 0.7 # 0.0 to 1.0
|
||||||
|
focus_area: str = "general" # general, creative, analytical, social
|
||||||
|
timestamp: Optional[str] = None
|
||||||
|
|
||||||
|
def to_dict(self) -> Dict[str, Any]:
|
||||||
|
return {
|
||||||
|
"mood": self.mood,
|
||||||
|
"energy_level": self.energy_level,
|
||||||
|
"clarity": self.clarity,
|
||||||
|
"focus_area": self.focus_area,
|
||||||
|
"timestamp": self.timestamp,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class RoomDesign:
|
||||||
|
"""Complete room design specification."""
|
||||||
|
name: str
|
||||||
|
description: str
|
||||||
|
style: str
|
||||||
|
dimensions: Dict[str, float] = field(default_factory=lambda: {"width": 20, "height": 10, "depth": 20})
|
||||||
|
mood_preset: str = "contemplative"
|
||||||
|
color_palette: List[str] = field(default_factory=list)
|
||||||
|
lighting_scheme: str = "soft_diffuse"
|
||||||
|
features: List[str] = field(default_factory=list)
|
||||||
|
generated_code: Optional[str] = None
|
||||||
|
|
||||||
|
def to_dict(self) -> Dict[str, Any]:
|
||||||
|
return {
|
||||||
|
"name": self.name,
|
||||||
|
"description": self.description,
|
||||||
|
"style": self.style,
|
||||||
|
"dimensions": self.dimensions,
|
||||||
|
"mood_preset": self.mood_preset,
|
||||||
|
"color_palette": self.color_palette,
|
||||||
|
"lighting_scheme": self.lighting_scheme,
|
||||||
|
"features": self.features,
|
||||||
|
"has_code": self.generated_code is not None,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class PortalDesign:
|
||||||
|
"""Portal connection design."""
|
||||||
|
name: str
|
||||||
|
from_room: str
|
||||||
|
to_room: str
|
||||||
|
style: str
|
||||||
|
position: Dict[str, float] = field(default_factory=lambda: {"x": 0, "y": 0, "z": 0})
|
||||||
|
visual_effect: str = "energy_swirl"
|
||||||
|
transition_duration: float = 1.5
|
||||||
|
generated_code: Optional[str] = None
|
||||||
|
|
||||||
|
def to_dict(self) -> Dict[str, Any]:
|
||||||
|
return {
|
||||||
|
"name": self.name,
|
||||||
|
"from_room": self.from_room,
|
||||||
|
"to_room": self.to_room,
|
||||||
|
"style": self.style,
|
||||||
|
"position": self.position,
|
||||||
|
"visual_effect": self.visual_effect,
|
||||||
|
"transition_duration": self.transition_duration,
|
||||||
|
"has_code": self.generated_code is not None,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# =============================================================================
|
||||||
|
# Prompt Engineering
|
||||||
|
# =============================================================================
|
||||||
|
|
||||||
|
class PromptEngineer:
|
||||||
|
"""Engineers prompts for Three.js code generation."""
|
||||||
|
|
||||||
|
THREE_JS_BASE_TEMPLATE = """// Nexus Room Module: {room_name}
|
||||||
|
// Style: {style}
|
||||||
|
// Mood: {mood}
|
||||||
|
// Generated for Three.js r128+
|
||||||
|
|
||||||
|
(function() {{
|
||||||
|
'use strict';
|
||||||
|
|
||||||
|
// Room Configuration
|
||||||
|
const config = {{
|
||||||
|
name: "{room_name}",
|
||||||
|
dimensions: {dimensions_json},
|
||||||
|
colors: {colors_json},
|
||||||
|
mood: "{mood}"
|
||||||
|
}};
|
||||||
|
|
||||||
|
// Create Room Function
|
||||||
|
function create{room_name_camel}() {{
|
||||||
|
const roomGroup = new THREE.Group();
|
||||||
|
roomGroup.name = config.name;
|
||||||
|
|
||||||
|
{room_content}
|
||||||
|
|
||||||
|
return roomGroup;
|
||||||
|
}}
|
||||||
|
|
||||||
|
// Export for Nexus
|
||||||
|
if (typeof module !== 'undefined' && module.exports) {{
|
||||||
|
module.exports = {{ create{room_name_camel} }};
|
||||||
|
}} else if (typeof window !== 'undefined') {{
|
||||||
|
window.NexusRooms = window.NexusRooms || {{}};
|
||||||
|
window.NexusRooms.{room_name} = create{room_name_camel};
|
||||||
|
}}
|
||||||
|
|
||||||
|
return {{ create{room_name_camel} }};
|
||||||
|
}})();"""
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def engineer_room_prompt(
|
||||||
|
name: str,
|
||||||
|
description: str,
|
||||||
|
style: str,
|
||||||
|
mental_state: Optional[MentalState] = None,
|
||||||
|
dimensions: Optional[Dict[str, float]] = None
|
||||||
|
) -> str:
|
||||||
|
"""
|
||||||
|
Engineer an LLM prompt for room generation.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
name: Room identifier
|
||||||
|
description: Natural language room description
|
||||||
|
style: Visual style
|
||||||
|
mental_state: Timmy's current mental state
|
||||||
|
dimensions: Room dimensions
|
||||||
|
"""
|
||||||
|
# Determine mood from mental state or description
|
||||||
|
mood = PromptEngineer._infer_mood(description, mental_state)
|
||||||
|
mood_preset = getattr(MoodPresets, mood.upper(), MoodPresets.CONTEMPLATIVE)
|
||||||
|
|
||||||
|
# Build color palette
|
||||||
|
color_palette = mood_preset["colors"]
|
||||||
|
if mental_state:
|
||||||
|
# Add Timmy's gold for high clarity states
|
||||||
|
if mental_state.clarity > 0.7:
|
||||||
|
color_palette = [NexusColors.TIMMY_GOLD] + color_palette[:2]
|
||||||
|
# Add Allegro blue for creative focus
|
||||||
|
if mental_state.focus_area == "creative":
|
||||||
|
color_palette = [NexusColors.ALLEGRO_BLUE] + color_palette[:2]
|
||||||
|
|
||||||
|
# Create the engineering prompt
|
||||||
|
prompt = f"""You are the Nexus Architect, an expert Three.js developer creating immersive 3D environments for Timmy.
|
||||||
|
|
||||||
|
DESIGN BRIEF:
|
||||||
|
- Room Name: {name}
|
||||||
|
- Description: {description}
|
||||||
|
- Style: {style}
|
||||||
|
- Mood: {mood}
|
||||||
|
- Atmosphere: {mood_preset['atmosphere']}
|
||||||
|
|
||||||
|
AESTHETIC GUIDELINES:
|
||||||
|
- Primary Colors: {', '.join(color_palette[:3])}
|
||||||
|
- Lighting: {mood_preset['lighting']}
|
||||||
|
- Geometry: {mood_preset['geometry']}
|
||||||
|
- Theme: {mood_preset['description']}
|
||||||
|
|
||||||
|
TIMMY'S CONTEXT:
|
||||||
|
- Timmy's Signature Color: Warm Gold ({NexusColors.TIMMY_GOLD})
|
||||||
|
- Allegro's Color: Motion Blue ({NexusColors.ALLEGRO_BLUE})
|
||||||
|
- Sovereignty Theme: Crystalline structures, clean lines
|
||||||
|
- Service Theme: Open spaces, welcoming lighting
|
||||||
|
|
||||||
|
THREE.JS REQUIREMENTS:
|
||||||
|
1. Use Three.js r128+ compatible syntax
|
||||||
|
2. Create a self-contained module with a `create{name.title().replace('_', '')}()` function
|
||||||
|
3. Return a THREE.Group containing all room elements
|
||||||
|
4. Include proper memory management (dispose methods)
|
||||||
|
5. Use MeshStandardMaterial for PBR lighting
|
||||||
|
6. Include ambient light (intensity 0.3-0.5) + accent lights
|
||||||
|
7. Add subtle animations for living feel
|
||||||
|
8. Keep polygon count under 10,000 triangles
|
||||||
|
|
||||||
|
SAFETY RULES:
|
||||||
|
- NO eval(), Function(), or dynamic code execution
|
||||||
|
- NO network requests (fetch, XMLHttpRequest, WebSocket)
|
||||||
|
- NO storage access (localStorage, sessionStorage, cookies)
|
||||||
|
- NO navigation (window.location, window.open)
|
||||||
|
- Only use allowed Three.js APIs
|
||||||
|
|
||||||
|
OUTPUT FORMAT:
|
||||||
|
Return ONLY the JavaScript code wrapped in a markdown code block:
|
||||||
|
|
||||||
|
```javascript
|
||||||
|
// Your Three.js room module here
|
||||||
|
```
|
||||||
|
|
||||||
|
Generate the complete Three.js code for this room now."""
|
||||||
|
|
||||||
|
return prompt
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def engineer_portal_prompt(
|
||||||
|
name: str,
|
||||||
|
from_room: str,
|
||||||
|
to_room: str,
|
||||||
|
style: str,
|
||||||
|
mental_state: Optional[MentalState] = None
|
||||||
|
) -> str:
|
||||||
|
"""Engineer a prompt for portal generation."""
|
||||||
|
mood = PromptEngineer._infer_mood(f"portal from {from_room} to {to_room}", mental_state)
|
||||||
|
|
||||||
|
prompt = f"""You are creating a portal connection in the Nexus 3D environment.
|
||||||
|
|
||||||
|
PORTAL SPECIFICATIONS:
|
||||||
|
- Name: {name}
|
||||||
|
- Connection: {from_room} → {to_room}
|
||||||
|
- Style: {style}
|
||||||
|
- Context Mood: {mood}
|
||||||
|
|
||||||
|
VISUAL REQUIREMENTS:
|
||||||
|
1. Create an animated portal effect (shader or texture-based)
|
||||||
|
2. Include particle system for energy flow
|
||||||
|
3. Add trigger zone for teleportation detection
|
||||||
|
4. Use signature colors: {NexusColors.TIMMY_GOLD} (Timmy) and {NexusColors.ALLEGRO_BLUE} (Allegro)
|
||||||
|
5. Match the {mood} atmosphere
|
||||||
|
|
||||||
|
TECHNICAL REQUIREMENTS:
|
||||||
|
- Three.js r128+ compatible
|
||||||
|
- Export a `createPortal()` function returning THREE.Group
|
||||||
|
- Include animation loop hook
|
||||||
|
- Add collision detection placeholder
|
||||||
|
|
||||||
|
SAFETY: No eval, no network requests, no external dependencies.
|
||||||
|
|
||||||
|
Return ONLY JavaScript code in a markdown code block."""
|
||||||
|
|
||||||
|
return prompt
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def engineer_mood_scene_prompt(mood_description: str) -> str:
|
||||||
|
"""Engineer a prompt based on mood description."""
|
||||||
|
# Analyze mood description
|
||||||
|
mood_keywords = {
|
||||||
|
"contemplative": ["thinking", "reflective", "calm", "peaceful", "quiet", "serene"],
|
||||||
|
"energetic": ["excited", "dynamic", "lively", "active", "energetic", "vibrant"],
|
||||||
|
"mysterious": ["mysterious", "dark", "unknown", "secret", "enigmatic"],
|
||||||
|
"welcoming": ["friendly", "open", "warm", "welcoming", "inviting", "comfortable"],
|
||||||
|
"sovereign": ["powerful", "clear", "crystalline", "noble", "dignified"],
|
||||||
|
}
|
||||||
|
|
||||||
|
detected_mood = "contemplative"
|
||||||
|
desc_lower = mood_description.lower()
|
||||||
|
for mood, keywords in mood_keywords.items():
|
||||||
|
if any(kw in desc_lower for kw in keywords):
|
||||||
|
detected_mood = mood
|
||||||
|
break
|
||||||
|
|
||||||
|
preset = getattr(MoodPresets, detected_mood.upper(), MoodPresets.CONTEMPLATIVE)
|
||||||
|
|
||||||
|
prompt = f"""Generate a Three.js room based on this mood description:
|
||||||
|
|
||||||
|
"{mood_description}"
|
||||||
|
|
||||||
|
INFERRED MOOD: {detected_mood}
|
||||||
|
AESTHETIC: {preset['description']}
|
||||||
|
|
||||||
|
Create a complete room with:
|
||||||
|
- Style: {preset['geometry']}
|
||||||
|
- Lighting: {preset['lighting']}
|
||||||
|
- Color Palette: {', '.join(preset['colors'][:3])}
|
||||||
|
- Atmosphere: {preset['atmosphere']}
|
||||||
|
|
||||||
|
Return Three.js r128+ code as a module with `createMoodRoom()` function."""
|
||||||
|
|
||||||
|
return prompt
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _infer_mood(description: str, mental_state: Optional[MentalState] = None) -> str:
|
||||||
|
"""Infer mood from description and mental state."""
|
||||||
|
if mental_state and mental_state.mood:
|
||||||
|
return mental_state.mood
|
||||||
|
|
||||||
|
desc_lower = description.lower()
|
||||||
|
mood_map = {
|
||||||
|
"contemplative": ["serene", "calm", "peaceful", "quiet", "meditation", "zen", "tranquil"],
|
||||||
|
"energetic": ["dynamic", "active", "vibrant", "lively", "energetic", "motion"],
|
||||||
|
"mysterious": ["mysterious", "shadow", "dark", "unknown", "secret", "ethereal"],
|
||||||
|
"welcoming": ["warm", "welcoming", "friendly", "open", "inviting", "comfort"],
|
||||||
|
"sovereign": ["crystal", "clear", "noble", "dignified", "powerful", "authoritative"],
|
||||||
|
}
|
||||||
|
|
||||||
|
for mood, keywords in mood_map.items():
|
||||||
|
if any(kw in desc_lower for kw in keywords):
|
||||||
|
return mood
|
||||||
|
|
||||||
|
return "contemplative"
|
||||||
|
|
||||||
|
|
||||||
|
# =============================================================================
|
||||||
|
# Nexus Architect AI
|
||||||
|
# =============================================================================
|
||||||
|
|
||||||
|
class NexusArchitectAI:
|
||||||
|
"""
|
||||||
|
AI-powered Nexus Architect for autonomous Three.js world generation.
|
||||||
|
|
||||||
|
This class provides high-level interfaces for:
|
||||||
|
- Designing rooms from natural language
|
||||||
|
- Creating mood-based scenes
|
||||||
|
- Managing mental state integration
|
||||||
|
- Validating generated code
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.mental_state: Optional[MentalState] = None
|
||||||
|
self.room_designs: Dict[str, RoomDesign] = {}
|
||||||
|
self.portal_designs: Dict[str, PortalDesign] = {}
|
||||||
|
self.prompt_engineer = PromptEngineer()
|
||||||
|
|
||||||
|
def set_mental_state(self, state: MentalState) -> None:
|
||||||
|
"""Set Timmy's current mental state for aesthetic tuning."""
|
||||||
|
self.mental_state = state
|
||||||
|
logger.info(f"Mental state updated: {state.mood} (energy: {state.energy_level})")
|
||||||
|
|
||||||
|
def design_room(
|
||||||
|
self,
|
||||||
|
name: str,
|
||||||
|
description: str,
|
||||||
|
style: str,
|
||||||
|
dimensions: Optional[Dict[str, float]] = None
|
||||||
|
) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Design a room from natural language description.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
name: Room identifier (e.g., "contemplation_chamber")
|
||||||
|
description: Natural language description of the room
|
||||||
|
style: Visual style (e.g., "minimalist_ethereal", "crystalline_modern")
|
||||||
|
dimensions: Optional room dimensions
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dict containing design specification and LLM prompt
|
||||||
|
"""
|
||||||
|
# Infer mood and select preset
|
||||||
|
mood = self.prompt_engineer._infer_mood(description, self.mental_state)
|
||||||
|
mood_preset = getattr(MoodPresets, mood.upper(), MoodPresets.CONTEMPLATIVE)
|
||||||
|
|
||||||
|
# Build color palette with mental state influence
|
||||||
|
colors = mood_preset["colors"].copy()
|
||||||
|
if self.mental_state:
|
||||||
|
if self.mental_state.clarity > 0.7:
|
||||||
|
colors.insert(0, NexusColors.TIMMY_GOLD)
|
||||||
|
if self.mental_state.focus_area == "creative":
|
||||||
|
colors.insert(0, NexusColors.ALLEGRO_BLUE)
|
||||||
|
|
||||||
|
# Create room design
|
||||||
|
design = RoomDesign(
|
||||||
|
name=name,
|
||||||
|
description=description,
|
||||||
|
style=style,
|
||||||
|
dimensions=dimensions or {"width": 20, "height": 10, "depth": 20},
|
||||||
|
mood_preset=mood,
|
||||||
|
color_palette=colors[:4],
|
||||||
|
lighting_scheme=mood_preset["lighting"],
|
||||||
|
features=self._extract_features(description),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Generate LLM prompt
|
||||||
|
prompt = self.prompt_engineer.engineer_room_prompt(
|
||||||
|
name=name,
|
||||||
|
description=description,
|
||||||
|
style=style,
|
||||||
|
mental_state=self.mental_state,
|
||||||
|
dimensions=design.dimensions,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Store design
|
||||||
|
self.room_designs[name] = design
|
||||||
|
|
||||||
|
return {
|
||||||
|
"success": True,
|
||||||
|
"room_name": name,
|
||||||
|
"design": design.to_dict(),
|
||||||
|
"llm_prompt": prompt,
|
||||||
|
"message": f"Room '{name}' designed. Use the LLM prompt to generate Three.js code.",
|
||||||
|
}
|
||||||
|
|
||||||
|
def create_portal(
|
||||||
|
self,
|
||||||
|
name: str,
|
||||||
|
from_room: str,
|
||||||
|
to_room: str,
|
||||||
|
style: str = "energy_vortex"
|
||||||
|
) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Design a portal connection between rooms.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
name: Portal identifier
|
||||||
|
from_room: Source room name
|
||||||
|
to_room: Target room name
|
||||||
|
style: Portal visual style
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dict containing portal design and LLM prompt
|
||||||
|
"""
|
||||||
|
if from_room not in self.room_designs:
|
||||||
|
return {"success": False, "error": f"Source room '{from_room}' not found"}
|
||||||
|
if to_room not in self.room_designs:
|
||||||
|
return {"success": False, "error": f"Target room '{to_room}' not found"}
|
||||||
|
|
||||||
|
design = PortalDesign(
|
||||||
|
name=name,
|
||||||
|
from_room=from_room,
|
||||||
|
to_room=to_room,
|
||||||
|
style=style,
|
||||||
|
)
|
||||||
|
|
||||||
|
prompt = self.prompt_engineer.engineer_portal_prompt(
|
||||||
|
name=name,
|
||||||
|
from_room=from_room,
|
||||||
|
to_room=to_room,
|
||||||
|
style=style,
|
||||||
|
mental_state=self.mental_state,
|
||||||
|
)
|
||||||
|
|
||||||
|
self.portal_designs[name] = design
|
||||||
|
|
||||||
|
return {
|
||||||
|
"success": True,
|
||||||
|
"portal_name": name,
|
||||||
|
"design": design.to_dict(),
|
||||||
|
"llm_prompt": prompt,
|
||||||
|
"message": f"Portal '{name}' designed connecting {from_room} to {to_room}",
|
||||||
|
}
|
||||||
|
|
||||||
|
def generate_scene_from_mood(self, mood_description: str) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Generate a complete scene based on mood description.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
mood_description: Description of desired mood/atmosphere
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dict containing scene design and LLM prompt
|
||||||
|
"""
|
||||||
|
# Infer mood
|
||||||
|
mood = self.prompt_engineer._infer_mood(mood_description, self.mental_state)
|
||||||
|
preset = getattr(MoodPresets, mood.upper(), MoodPresets.CONTEMPLATIVE)
|
||||||
|
|
||||||
|
# Create room name from mood
|
||||||
|
room_name = f"{mood}_realm"
|
||||||
|
|
||||||
|
# Generate prompt
|
||||||
|
prompt = self.prompt_engineer.engineer_mood_scene_prompt(mood_description)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"success": True,
|
||||||
|
"room_name": room_name,
|
||||||
|
"inferred_mood": mood,
|
||||||
|
"aesthetic": preset,
|
||||||
|
"llm_prompt": prompt,
|
||||||
|
"message": f"Generated {mood} scene from mood description",
|
||||||
|
}
|
||||||
|
|
||||||
|
def _extract_features(self, description: str) -> List[str]:
|
||||||
|
"""Extract room features from description."""
|
||||||
|
features = []
|
||||||
|
feature_keywords = {
|
||||||
|
"floating": ["floating", "levitating", "hovering"],
|
||||||
|
"water": ["water", "fountain", "pool", "stream", "lake"],
|
||||||
|
"vegetation": ["tree", "plant", "garden", "forest", "nature"],
|
||||||
|
"crystals": ["crystal", "gem", "prism", "diamond"],
|
||||||
|
"geometry": ["geometric", "shape", "sphere", "cube", "abstract"],
|
||||||
|
"particles": ["particle", "dust", "sparkle", "glow", "mist"],
|
||||||
|
}
|
||||||
|
|
||||||
|
desc_lower = description.lower()
|
||||||
|
for feature, keywords in feature_keywords.items():
|
||||||
|
if any(kw in desc_lower for kw in keywords):
|
||||||
|
features.append(feature)
|
||||||
|
|
||||||
|
return features
|
||||||
|
|
||||||
|
def get_design_summary(self) -> Dict[str, Any]:
|
||||||
|
"""Get summary of all designs."""
|
||||||
|
return {
|
||||||
|
"mental_state": self.mental_state.to_dict() if self.mental_state else None,
|
||||||
|
"rooms": {name: design.to_dict() for name, design in self.room_designs.items()},
|
||||||
|
"portals": {name: portal.to_dict() for name, portal in self.portal_designs.items()},
|
||||||
|
"total_rooms": len(self.room_designs),
|
||||||
|
"total_portals": len(self.portal_designs),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# =============================================================================
|
||||||
|
# Module-level functions for easy import
|
||||||
|
# =============================================================================
|
||||||
|
|
||||||
|
_architect_instance: Optional[NexusArchitectAI] = None
|
||||||
|
|
||||||
|
|
||||||
|
def get_architect() -> NexusArchitectAI:
|
||||||
|
"""Get or create the NexusArchitectAI singleton."""
|
||||||
|
global _architect_instance
|
||||||
|
if _architect_instance is None:
|
||||||
|
_architect_instance = NexusArchitectAI()
|
||||||
|
return _architect_instance
|
||||||
|
|
||||||
|
|
||||||
|
def create_room(
|
||||||
|
name: str,
|
||||||
|
description: str,
|
||||||
|
style: str,
|
||||||
|
dimensions: Optional[Dict[str, float]] = None
|
||||||
|
) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Create a room design from description.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
name: Room identifier
|
||||||
|
description: Natural language room description
|
||||||
|
style: Visual style (e.g., "minimalist_ethereal")
|
||||||
|
dimensions: Optional dimensions dict with width, height, depth
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dict with design specification and LLM prompt for code generation
|
||||||
|
"""
|
||||||
|
architect = get_architect()
|
||||||
|
return architect.design_room(name, description, style, dimensions)
|
||||||
|
|
||||||
|
|
||||||
|
def create_portal(
|
||||||
|
name: str,
|
||||||
|
from_room: str,
|
||||||
|
to_room: str,
|
||||||
|
style: str = "energy_vortex"
|
||||||
|
) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Create a portal between rooms.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
name: Portal identifier
|
||||||
|
from_room: Source room name
|
||||||
|
to_room: Target room name
|
||||||
|
style: Visual style
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dict with portal design and LLM prompt
|
||||||
|
"""
|
||||||
|
architect = get_architect()
|
||||||
|
return architect.create_portal(name, from_room, to_room, style)
|
||||||
|
|
||||||
|
|
||||||
|
def generate_scene_from_mood(mood_description: str) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Generate a scene based on mood description.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
mood_description: Description of desired mood
|
||||||
|
|
||||||
|
Example:
|
||||||
|
"Timmy is feeling introspective and seeking clarity"
|
||||||
|
→ Generates calm, minimalist space with clear sightlines
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dict with scene design and LLM prompt
|
||||||
|
"""
|
||||||
|
architect = get_architect()
|
||||||
|
return architect.generate_scene_from_mood(mood_description)
|
||||||
|
|
||||||
|
|
||||||
|
def set_mental_state(
|
||||||
|
mood: str,
|
||||||
|
energy_level: float = 0.5,
|
||||||
|
clarity: float = 0.7,
|
||||||
|
focus_area: str = "general"
|
||||||
|
) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Set Timmy's mental state for aesthetic tuning.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
mood: Current mood (contemplative, energetic, mysterious, welcoming, sovereign)
|
||||||
|
energy_level: 0.0 to 1.0
|
||||||
|
clarity: 0.0 to 1.0
|
||||||
|
focus_area: general, creative, analytical, social
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Confirmation dict
|
||||||
|
"""
|
||||||
|
architect = get_architect()
|
||||||
|
state = MentalState(
|
||||||
|
mood=mood,
|
||||||
|
energy_level=energy_level,
|
||||||
|
clarity=clarity,
|
||||||
|
focus_area=focus_area,
|
||||||
|
)
|
||||||
|
architect.set_mental_state(state)
|
||||||
|
return {
|
||||||
|
"success": True,
|
||||||
|
"mental_state": state.to_dict(),
|
||||||
|
"message": f"Mental state set to {mood}",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def get_nexus_summary() -> Dict[str, Any]:
|
||||||
|
"""Get summary of all Nexus designs."""
|
||||||
|
architect = get_architect()
|
||||||
|
return architect.get_design_summary()
|
||||||
|
|
||||||
|
|
||||||
|
# =============================================================================
|
||||||
|
# Tool Schemas for integration
|
||||||
|
# =============================================================================
|
||||||
|
|
||||||
|
NEXUS_ARCHITECT_AI_SCHEMAS = {
|
||||||
|
"create_room": {
|
||||||
|
"name": "create_room",
|
||||||
|
"description": (
|
||||||
|
"Design a new 3D room in the Nexus from a natural language description. "
|
||||||
|
"Returns a design specification and LLM prompt for Three.js code generation. "
|
||||||
|
"The room will be styled according to Timmy's current mental state."
|
||||||
|
),
|
||||||
|
"parameters": {
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"name": {
|
||||||
|
"type": "string",
|
||||||
|
"description": "Unique room identifier (e.g., 'contemplation_chamber')"
|
||||||
|
},
|
||||||
|
"description": {
|
||||||
|
"type": "string",
|
||||||
|
"description": "Natural language description of the room"
|
||||||
|
},
|
||||||
|
"style": {
|
||||||
|
"type": "string",
|
||||||
|
"description": "Visual style (minimalist_ethereal, crystalline_modern, organic_natural, etc.)"
|
||||||
|
},
|
||||||
|
"dimensions": {
|
||||||
|
"type": "object",
|
||||||
|
"description": "Optional room dimensions",
|
||||||
|
"properties": {
|
||||||
|
"width": {"type": "number"},
|
||||||
|
"height": {"type": "number"},
|
||||||
|
"depth": {"type": "number"},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"required": ["name", "description", "style"]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"create_portal": {
|
||||||
|
"name": "create_portal",
|
||||||
|
"description": "Create a portal connection between two rooms",
|
||||||
|
"parameters": {
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"name": {"type": "string"},
|
||||||
|
"from_room": {"type": "string"},
|
||||||
|
"to_room": {"type": "string"},
|
||||||
|
"style": {"type": "string", "default": "energy_vortex"},
|
||||||
|
},
|
||||||
|
"required": ["name", "from_room", "to_room"]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"generate_scene_from_mood": {
|
||||||
|
"name": "generate_scene_from_mood",
|
||||||
|
"description": (
|
||||||
|
"Generate a complete 3D scene based on a mood description. "
|
||||||
|
"Example: 'Timmy is feeling introspective' creates a calm, minimalist space."
|
||||||
|
),
|
||||||
|
"parameters": {
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"mood_description": {
|
||||||
|
"type": "string",
|
||||||
|
"description": "Description of desired mood or mental state"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"required": ["mood_description"]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"set_mental_state": {
|
||||||
|
"name": "set_mental_state",
|
||||||
|
"description": "Set Timmy's mental state to influence aesthetic generation",
|
||||||
|
"parameters": {
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"mood": {"type": "string"},
|
||||||
|
"energy_level": {"type": "number"},
|
||||||
|
"clarity": {"type": "number"},
|
||||||
|
"focus_area": {"type": "string"},
|
||||||
|
},
|
||||||
|
"required": ["mood"]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"get_nexus_summary": {
|
||||||
|
"name": "get_nexus_summary",
|
||||||
|
"description": "Get summary of all Nexus room and portal designs",
|
||||||
|
"parameters": {"type": "object", "properties": {}}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
# Demo usage
|
||||||
|
print("Nexus Architect AI - Demo")
|
||||||
|
print("=" * 50)
|
||||||
|
|
||||||
|
# Set mental state
|
||||||
|
result = set_mental_state("contemplative", energy_level=0.3, clarity=0.8)
|
||||||
|
print(f"\nMental State: {result['mental_state']}")
|
||||||
|
|
||||||
|
# Create a room
|
||||||
|
result = create_room(
|
||||||
|
name="contemplation_chamber",
|
||||||
|
description="A serene circular room with floating geometric shapes and soft blue light",
|
||||||
|
style="minimalist_ethereal",
|
||||||
|
)
|
||||||
|
print(f"\nRoom Design: {json.dumps(result['design'], indent=2)}")
|
||||||
|
|
||||||
|
# Generate from mood
|
||||||
|
result = generate_scene_from_mood("Timmy is feeling introspective and seeking clarity")
|
||||||
|
print(f"\nMood Scene: {result['inferred_mood']} - {result['aesthetic']['description']}")
|
||||||
74
patches/agent/symbolic_memory.py
Normal file
74
patches/agent/symbolic_memory.py
Normal file
@@ -0,0 +1,74 @@
|
|||||||
|
"""Sovereign Intersymbolic Memory Layer.
|
||||||
|
|
||||||
|
Bridges Neural (LLM) and Symbolic (Graph) reasoning by extracting
|
||||||
|
structured triples from unstructured text and performing graph lookups.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import json
|
||||||
|
from typing import List, Dict, Any
|
||||||
|
from agent.gemini_adapter import GeminiAdapter
|
||||||
|
from tools.graph_store import GraphStore
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
class SymbolicMemory:
|
||||||
|
def __init__(self):
|
||||||
|
self.adapter = GeminiAdapter()
|
||||||
|
self.store = GraphStore()
|
||||||
|
|
||||||
|
def ingest_text(self, text: str):
|
||||||
|
"""Extracts triples from text and adds them to the graph."""
|
||||||
|
prompt = f"""
|
||||||
|
Extract all meaningful entities and their relationships from the following text.
|
||||||
|
Format the output as a JSON list of triples: [{{"s": "subject", "p": "predicate", "o": "object"}}]
|
||||||
|
|
||||||
|
Text:
|
||||||
|
{text}
|
||||||
|
|
||||||
|
Guidelines:
|
||||||
|
- Use clear, concise labels for entities and predicates.
|
||||||
|
- Focus on stable facts and structural relationships.
|
||||||
|
- Predicates should be verbs or descriptive relations (e.g., 'is_a', 'works_at', 'collaborates_with').
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
result = self.adapter.generate(
|
||||||
|
model="gemini-3.1-pro-preview",
|
||||||
|
prompt=prompt,
|
||||||
|
system_instruction="You are Timmy's Symbolic Extraction Engine. Extract high-fidelity knowledge triples.",
|
||||||
|
response_mime_type="application/json"
|
||||||
|
)
|
||||||
|
|
||||||
|
triples = json.loads(result["text"])
|
||||||
|
if isinstance(triples, list):
|
||||||
|
count = self.store.add_triples(triples)
|
||||||
|
logger.info(f"Ingested {count} new triples into symbolic memory.")
|
||||||
|
return count
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Symbolic ingestion failed: {e}")
|
||||||
|
return 0
|
||||||
|
|
||||||
|
def get_context_for(self, topic: str) -> str:
|
||||||
|
"""Performs a 2-hop graph search to find related context for a topic."""
|
||||||
|
# 1. Find direct relations
|
||||||
|
direct = self.store.query(subject=topic) + self.store.query(object=topic)
|
||||||
|
|
||||||
|
# 2. Find 2nd hop
|
||||||
|
related_entities = set()
|
||||||
|
for t in direct:
|
||||||
|
related_entities.add(t['s'])
|
||||||
|
related_entities.add(t['o'])
|
||||||
|
|
||||||
|
extended = []
|
||||||
|
for entity in related_entities:
|
||||||
|
if entity == topic: continue
|
||||||
|
extended.extend(self.store.query(subject=entity))
|
||||||
|
|
||||||
|
all_triples = direct + extended
|
||||||
|
if not all_triples:
|
||||||
|
return ""
|
||||||
|
|
||||||
|
context = "Symbolic Knowledge Graph Context:\n"
|
||||||
|
for t in all_triples:
|
||||||
|
context += f"- {t['s']} --({t['p']})--> {t['o']}\n"
|
||||||
|
return context
|
||||||
421
patches/agent/temporal_knowledge_graph.py
Normal file
421
patches/agent/temporal_knowledge_graph.py
Normal file
@@ -0,0 +1,421 @@
|
|||||||
|
"""Temporal Knowledge Graph for Hermes Agent.
|
||||||
|
|
||||||
|
Provides a time-aware triple-store (Subject, Predicate, Object) with temporal
|
||||||
|
metadata (valid_from, valid_until, timestamp) enabling "time travel" queries
|
||||||
|
over Timmy's evolving worldview.
|
||||||
|
|
||||||
|
Time format: ISO 8601 (YYYY-MM-DDTHH:MM:SS)
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import sqlite3
|
||||||
|
import logging
|
||||||
|
import uuid
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from typing import List, Dict, Any, Optional, Tuple
|
||||||
|
from dataclasses import dataclass, asdict
|
||||||
|
from enum import Enum
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class TemporalOperator(Enum):
|
||||||
|
"""Temporal query operators for time-based filtering."""
|
||||||
|
BEFORE = "before"
|
||||||
|
AFTER = "after"
|
||||||
|
DURING = "during"
|
||||||
|
OVERLAPS = "overlaps"
|
||||||
|
AT = "at"
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class TemporalTriple:
|
||||||
|
"""A triple with temporal metadata."""
|
||||||
|
id: str
|
||||||
|
subject: str
|
||||||
|
predicate: str
|
||||||
|
object: str
|
||||||
|
valid_from: str # ISO 8601 datetime
|
||||||
|
valid_until: Optional[str] # ISO 8601 datetime, None means still valid
|
||||||
|
timestamp: str # When this fact was recorded
|
||||||
|
version: int = 1
|
||||||
|
superseded_by: Optional[str] = None # ID of the triple that superseded this
|
||||||
|
|
||||||
|
def to_dict(self) -> Dict[str, Any]:
|
||||||
|
return asdict(self)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_dict(cls, data: Dict[str, Any]) -> "TemporalTriple":
|
||||||
|
return cls(**data)
|
||||||
|
|
||||||
|
|
||||||
|
class TemporalTripleStore:
|
||||||
|
"""SQLite-backed temporal triple store with versioning support."""
|
||||||
|
|
||||||
|
def __init__(self, db_path: Optional[str] = None):
|
||||||
|
"""Initialize the temporal triple store.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
db_path: Path to SQLite database. If None, uses default local path.
|
||||||
|
"""
|
||||||
|
if db_path is None:
|
||||||
|
# Default to local-first storage in user's home
|
||||||
|
home = Path.home()
|
||||||
|
db_dir = home / ".hermes" / "temporal_kg"
|
||||||
|
db_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
db_path = db_dir / "temporal_kg.db"
|
||||||
|
|
||||||
|
self.db_path = str(db_path)
|
||||||
|
self._init_db()
|
||||||
|
|
||||||
|
def _init_db(self):
|
||||||
|
"""Initialize the SQLite database with required tables."""
|
||||||
|
with sqlite3.connect(self.db_path) as conn:
|
||||||
|
conn.execute("""
|
||||||
|
CREATE TABLE IF NOT EXISTS temporal_triples (
|
||||||
|
id TEXT PRIMARY KEY,
|
||||||
|
subject TEXT NOT NULL,
|
||||||
|
predicate TEXT NOT NULL,
|
||||||
|
object TEXT NOT NULL,
|
||||||
|
valid_from TEXT NOT NULL,
|
||||||
|
valid_until TEXT,
|
||||||
|
timestamp TEXT NOT NULL,
|
||||||
|
version INTEGER DEFAULT 1,
|
||||||
|
superseded_by TEXT,
|
||||||
|
FOREIGN KEY (superseded_by) REFERENCES temporal_triples(id)
|
||||||
|
)
|
||||||
|
""")
|
||||||
|
|
||||||
|
# Create indexes for efficient querying
|
||||||
|
conn.execute("""
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_subject ON temporal_triples(subject)
|
||||||
|
""")
|
||||||
|
conn.execute("""
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_predicate ON temporal_triples(predicate)
|
||||||
|
""")
|
||||||
|
conn.execute("""
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_valid_from ON temporal_triples(valid_from)
|
||||||
|
""")
|
||||||
|
conn.execute("""
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_valid_until ON temporal_triples(valid_until)
|
||||||
|
""")
|
||||||
|
conn.execute("""
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_timestamp ON temporal_triples(timestamp)
|
||||||
|
""")
|
||||||
|
conn.execute("""
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_subject_predicate
|
||||||
|
ON temporal_triples(subject, predicate)
|
||||||
|
""")
|
||||||
|
|
||||||
|
conn.commit()
|
||||||
|
|
||||||
|
def _now(self) -> str:
|
||||||
|
"""Get current time in ISO 8601 format."""
|
||||||
|
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S")
|
||||||
|
|
||||||
|
def _generate_id(self) -> str:
|
||||||
|
"""Generate a unique ID for a triple."""
|
||||||
|
return f"{self._now()}_{uuid.uuid4().hex[:8]}"
|
||||||
|
|
||||||
|
def store_fact(
|
||||||
|
self,
|
||||||
|
subject: str,
|
||||||
|
predicate: str,
|
||||||
|
object: str,
|
||||||
|
valid_from: Optional[str] = None,
|
||||||
|
valid_until: Optional[str] = None
|
||||||
|
) -> TemporalTriple:
|
||||||
|
"""Store a fact with temporal bounds.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
subject: The subject of the triple
|
||||||
|
predicate: The predicate/relationship
|
||||||
|
object: The object/value
|
||||||
|
valid_from: When this fact becomes valid (ISO 8601). Defaults to now.
|
||||||
|
valid_until: When this fact expires (ISO 8601). None means forever valid.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The stored TemporalTriple
|
||||||
|
"""
|
||||||
|
if valid_from is None:
|
||||||
|
valid_from = self._now()
|
||||||
|
|
||||||
|
# Check if there's an existing fact for this subject-predicate
|
||||||
|
existing = self._get_current_fact(subject, predicate)
|
||||||
|
|
||||||
|
triple = TemporalTriple(
|
||||||
|
id=self._generate_id(),
|
||||||
|
subject=subject,
|
||||||
|
predicate=predicate,
|
||||||
|
object=object,
|
||||||
|
valid_from=valid_from,
|
||||||
|
valid_until=valid_until,
|
||||||
|
timestamp=self._now()
|
||||||
|
)
|
||||||
|
|
||||||
|
with sqlite3.connect(self.db_path) as conn:
|
||||||
|
# If there's an existing fact, mark it as superseded
|
||||||
|
if existing:
|
||||||
|
existing.valid_until = valid_from
|
||||||
|
existing.superseded_by = triple.id
|
||||||
|
self._update_triple(conn, existing)
|
||||||
|
triple.version = existing.version + 1
|
||||||
|
|
||||||
|
# Insert the new fact
|
||||||
|
self._insert_triple(conn, triple)
|
||||||
|
conn.commit()
|
||||||
|
|
||||||
|
logger.info(f"Stored temporal fact: {subject} {predicate} {object} (valid from {valid_from})")
|
||||||
|
return triple
|
||||||
|
|
||||||
|
def _get_current_fact(self, subject: str, predicate: str) -> Optional[TemporalTriple]:
|
||||||
|
"""Get the current (most recent, still valid) fact for a subject-predicate pair."""
|
||||||
|
with sqlite3.connect(self.db_path) as conn:
|
||||||
|
cursor = conn.execute(
|
||||||
|
"""
|
||||||
|
SELECT * FROM temporal_triples
|
||||||
|
WHERE subject = ? AND predicate = ? AND valid_until IS NULL
|
||||||
|
ORDER BY timestamp DESC LIMIT 1
|
||||||
|
""",
|
||||||
|
(subject, predicate)
|
||||||
|
)
|
||||||
|
row = cursor.fetchone()
|
||||||
|
if row:
|
||||||
|
return self._row_to_triple(row)
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _insert_triple(self, conn: sqlite3.Connection, triple: TemporalTriple):
|
||||||
|
"""Insert a triple into the database."""
|
||||||
|
conn.execute(
|
||||||
|
"""
|
||||||
|
INSERT INTO temporal_triples
|
||||||
|
(id, subject, predicate, object, valid_from, valid_until, timestamp, version, superseded_by)
|
||||||
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||||
|
""",
|
||||||
|
(
|
||||||
|
triple.id, triple.subject, triple.predicate, triple.object,
|
||||||
|
triple.valid_from, triple.valid_until, triple.timestamp,
|
||||||
|
triple.version, triple.superseded_by
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
def _update_triple(self, conn: sqlite3.Connection, triple: TemporalTriple):
|
||||||
|
"""Update an existing triple."""
|
||||||
|
conn.execute(
|
||||||
|
"""
|
||||||
|
UPDATE temporal_triples
|
||||||
|
SET valid_until = ?, superseded_by = ?
|
||||||
|
WHERE id = ?
|
||||||
|
""",
|
||||||
|
(triple.valid_until, triple.superseded_by, triple.id)
|
||||||
|
)
|
||||||
|
|
||||||
|
def _row_to_triple(self, row: sqlite3.Row) -> TemporalTriple:
|
||||||
|
"""Convert a database row to a TemporalTriple."""
|
||||||
|
return TemporalTriple(
|
||||||
|
id=row[0],
|
||||||
|
subject=row[1],
|
||||||
|
predicate=row[2],
|
||||||
|
object=row[3],
|
||||||
|
valid_from=row[4],
|
||||||
|
valid_until=row[5],
|
||||||
|
timestamp=row[6],
|
||||||
|
version=row[7],
|
||||||
|
superseded_by=row[8]
|
||||||
|
)
|
||||||
|
|
||||||
|
def query_at_time(
|
||||||
|
self,
|
||||||
|
timestamp: str,
|
||||||
|
subject: Optional[str] = None,
|
||||||
|
predicate: Optional[str] = None
|
||||||
|
) -> List[TemporalTriple]:
|
||||||
|
"""Query facts that were valid at a specific point in time.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
timestamp: The point in time to query (ISO 8601)
|
||||||
|
subject: Optional subject filter
|
||||||
|
predicate: Optional predicate filter
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of TemporalTriple objects valid at that time
|
||||||
|
"""
|
||||||
|
query = """
|
||||||
|
SELECT * FROM temporal_triples
|
||||||
|
WHERE valid_from <= ?
|
||||||
|
AND (valid_until IS NULL OR valid_until > ?)
|
||||||
|
"""
|
||||||
|
params = [timestamp, timestamp]
|
||||||
|
|
||||||
|
if subject:
|
||||||
|
query += " AND subject = ?"
|
||||||
|
params.append(subject)
|
||||||
|
if predicate:
|
||||||
|
query += " AND predicate = ?"
|
||||||
|
params.append(predicate)
|
||||||
|
|
||||||
|
query += " ORDER BY timestamp DESC"
|
||||||
|
|
||||||
|
with sqlite3.connect(self.db_path) as conn:
|
||||||
|
conn.row_factory = sqlite3.Row
|
||||||
|
cursor = conn.execute(query, params)
|
||||||
|
return [self._row_to_triple(row) for row in cursor.fetchall()]
|
||||||
|
|
||||||
|
def query_temporal(
|
||||||
|
self,
|
||||||
|
operator: TemporalOperator,
|
||||||
|
timestamp: str,
|
||||||
|
subject: Optional[str] = None,
|
||||||
|
predicate: Optional[str] = None
|
||||||
|
) -> List[TemporalTriple]:
|
||||||
|
"""Query using temporal operators.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
operator: TemporalOperator (BEFORE, AFTER, DURING, OVERLAPS, AT)
|
||||||
|
timestamp: Reference timestamp (ISO 8601)
|
||||||
|
subject: Optional subject filter
|
||||||
|
predicate: Optional predicate filter
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of matching TemporalTriple objects
|
||||||
|
"""
|
||||||
|
base_query = "SELECT * FROM temporal_triples WHERE 1=1"
|
||||||
|
params = []
|
||||||
|
|
||||||
|
if subject:
|
||||||
|
base_query += " AND subject = ?"
|
||||||
|
params.append(subject)
|
||||||
|
if predicate:
|
||||||
|
base_query += " AND predicate = ?"
|
||||||
|
params.append(predicate)
|
||||||
|
|
||||||
|
if operator == TemporalOperator.BEFORE:
|
||||||
|
base_query += " AND valid_from < ?"
|
||||||
|
params.append(timestamp)
|
||||||
|
elif operator == TemporalOperator.AFTER:
|
||||||
|
base_query += " AND valid_from > ?"
|
||||||
|
params.append(timestamp)
|
||||||
|
elif operator == TemporalOperator.DURING:
|
||||||
|
base_query += " AND valid_from <= ? AND (valid_until IS NULL OR valid_until > ?)"
|
||||||
|
params.extend([timestamp, timestamp])
|
||||||
|
elif operator == TemporalOperator.OVERLAPS:
|
||||||
|
# Facts that overlap with a time point (same as DURING)
|
||||||
|
base_query += " AND valid_from <= ? AND (valid_until IS NULL OR valid_until > ?)"
|
||||||
|
params.extend([timestamp, timestamp])
|
||||||
|
elif operator == TemporalOperator.AT:
|
||||||
|
# Exact match for valid_at query
|
||||||
|
return self.query_at_time(timestamp, subject, predicate)
|
||||||
|
|
||||||
|
base_query += " ORDER BY timestamp DESC"
|
||||||
|
|
||||||
|
with sqlite3.connect(self.db_path) as conn:
|
||||||
|
conn.row_factory = sqlite3.Row
|
||||||
|
cursor = conn.execute(base_query, params)
|
||||||
|
return [self._row_to_triple(row) for row in cursor.fetchall()]
|
||||||
|
|
||||||
|
def get_fact_history(
|
||||||
|
self,
|
||||||
|
subject: str,
|
||||||
|
predicate: str
|
||||||
|
) -> List[TemporalTriple]:
|
||||||
|
"""Get the complete version history of a fact.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
subject: The subject to query
|
||||||
|
predicate: The predicate to query
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of all versions of the fact, ordered by timestamp
|
||||||
|
"""
|
||||||
|
with sqlite3.connect(self.db_path) as conn:
|
||||||
|
conn.row_factory = sqlite3.Row
|
||||||
|
cursor = conn.execute(
|
||||||
|
"""
|
||||||
|
SELECT * FROM temporal_triples
|
||||||
|
WHERE subject = ? AND predicate = ?
|
||||||
|
ORDER BY timestamp ASC
|
||||||
|
""",
|
||||||
|
(subject, predicate)
|
||||||
|
)
|
||||||
|
return [self._row_to_triple(row) for row in cursor.fetchall()]
|
||||||
|
|
||||||
|
def get_all_facts_for_entity(
|
||||||
|
self,
|
||||||
|
subject: str,
|
||||||
|
at_time: Optional[str] = None
|
||||||
|
) -> List[TemporalTriple]:
|
||||||
|
"""Get all facts about an entity, optionally at a specific time.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
subject: The entity to query
|
||||||
|
at_time: Optional timestamp to query at
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of TemporalTriple objects
|
||||||
|
"""
|
||||||
|
if at_time:
|
||||||
|
return self.query_at_time(at_time, subject=subject)
|
||||||
|
|
||||||
|
with sqlite3.connect(self.db_path) as conn:
|
||||||
|
conn.row_factory = sqlite3.Row
|
||||||
|
cursor = conn.execute(
|
||||||
|
"""
|
||||||
|
SELECT * FROM temporal_triples
|
||||||
|
WHERE subject = ?
|
||||||
|
ORDER BY timestamp DESC
|
||||||
|
""",
|
||||||
|
(subject,)
|
||||||
|
)
|
||||||
|
return [self._row_to_triple(row) for row in cursor.fetchall()]
|
||||||
|
|
||||||
|
def get_entity_changes(
|
||||||
|
self,
|
||||||
|
subject: str,
|
||||||
|
start_time: str,
|
||||||
|
end_time: str
|
||||||
|
) -> List[TemporalTriple]:
|
||||||
|
"""Get all facts that changed for an entity during a time range.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
subject: The entity to query
|
||||||
|
start_time: Start of time range (ISO 8601)
|
||||||
|
end_time: End of time range (ISO 8601)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of TemporalTriple objects that changed in the range
|
||||||
|
"""
|
||||||
|
with sqlite3.connect(self.db_path) as conn:
|
||||||
|
conn.row_factory = sqlite3.Row
|
||||||
|
cursor = conn.execute(
|
||||||
|
"""
|
||||||
|
SELECT * FROM temporal_triples
|
||||||
|
WHERE subject = ?
|
||||||
|
AND ((valid_from >= ? AND valid_from <= ?)
|
||||||
|
OR (valid_until >= ? AND valid_until <= ?))
|
||||||
|
ORDER BY timestamp ASC
|
||||||
|
""",
|
||||||
|
(subject, start_time, end_time, start_time, end_time)
|
||||||
|
)
|
||||||
|
return [self._row_to_triple(row) for row in cursor.fetchall()]
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
"""Close the database connection (no-op for SQLite with context managers)."""
|
||||||
|
pass
|
||||||
|
|
||||||
|
def export_to_json(self) -> str:
|
||||||
|
"""Export all triples to JSON format."""
|
||||||
|
with sqlite3.connect(self.db_path) as conn:
|
||||||
|
conn.row_factory = sqlite3.Row
|
||||||
|
cursor = conn.execute("SELECT * FROM temporal_triples ORDER BY timestamp DESC")
|
||||||
|
triples = [self._row_to_triple(row).to_dict() for row in cursor.fetchall()]
|
||||||
|
return json.dumps(triples, indent=2)
|
||||||
|
|
||||||
|
def import_from_json(self, json_data: str):
|
||||||
|
"""Import triples from JSON format."""
|
||||||
|
triples = json.loads(json_data)
|
||||||
|
with sqlite3.connect(self.db_path) as conn:
|
||||||
|
for triple_dict in triples:
|
||||||
|
triple = TemporalTriple.from_dict(triple_dict)
|
||||||
|
self._insert_triple(conn, triple)
|
||||||
|
conn.commit()
|
||||||
434
patches/agent/temporal_reasoning.py
Normal file
434
patches/agent/temporal_reasoning.py
Normal file
@@ -0,0 +1,434 @@
|
|||||||
|
"""Temporal Reasoning Engine for Hermes Agent.
|
||||||
|
|
||||||
|
Enables Timmy to reason about past and future states, generate historical
|
||||||
|
summaries, and perform temporal inference over the evolving knowledge graph.
|
||||||
|
|
||||||
|
Queries supported:
|
||||||
|
- "What was Timmy's view on sovereignty before March 2026?"
|
||||||
|
- "When did we first learn about MLX integration?"
|
||||||
|
- "How has the codebase changed since the security audit?"
|
||||||
|
"""
|
||||||
|
|
||||||
|
import logging
|
||||||
|
from typing import List, Dict, Any, Optional, Tuple
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from enum import Enum
|
||||||
|
|
||||||
|
from agent.temporal_knowledge_graph import (
|
||||||
|
TemporalTripleStore, TemporalTriple, TemporalOperator
|
||||||
|
)
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class ChangeType(Enum):
|
||||||
|
"""Types of changes in the knowledge graph."""
|
||||||
|
ADDED = "added"
|
||||||
|
REMOVED = "removed"
|
||||||
|
MODIFIED = "modified"
|
||||||
|
SUPERSEDED = "superseded"
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class FactChange:
|
||||||
|
"""Represents a change in a fact over time."""
|
||||||
|
change_type: ChangeType
|
||||||
|
subject: str
|
||||||
|
predicate: str
|
||||||
|
old_value: Optional[str]
|
||||||
|
new_value: Optional[str]
|
||||||
|
timestamp: str
|
||||||
|
version: int
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class HistoricalSummary:
|
||||||
|
"""Summary of how an entity or concept evolved over time."""
|
||||||
|
entity: str
|
||||||
|
start_time: str
|
||||||
|
end_time: str
|
||||||
|
total_changes: int
|
||||||
|
key_facts: List[Dict[str, Any]]
|
||||||
|
evolution_timeline: List[FactChange]
|
||||||
|
current_state: List[Dict[str, Any]]
|
||||||
|
|
||||||
|
def to_dict(self) -> Dict[str, Any]:
|
||||||
|
return {
|
||||||
|
"entity": self.entity,
|
||||||
|
"start_time": self.start_time,
|
||||||
|
"end_time": self.end_time,
|
||||||
|
"total_changes": self.total_changes,
|
||||||
|
"key_facts": self.key_facts,
|
||||||
|
"evolution_timeline": [
|
||||||
|
{
|
||||||
|
"change_type": c.change_type.value,
|
||||||
|
"subject": c.subject,
|
||||||
|
"predicate": c.predicate,
|
||||||
|
"old_value": c.old_value,
|
||||||
|
"new_value": c.new_value,
|
||||||
|
"timestamp": c.timestamp,
|
||||||
|
"version": c.version
|
||||||
|
}
|
||||||
|
for c in self.evolution_timeline
|
||||||
|
],
|
||||||
|
"current_state": self.current_state
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class TemporalReasoner:
|
||||||
|
"""Reasoning engine for temporal knowledge graphs."""
|
||||||
|
|
||||||
|
def __init__(self, store: Optional[TemporalTripleStore] = None):
|
||||||
|
"""Initialize the temporal reasoner.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
store: Optional TemporalTripleStore instance. Creates new if None.
|
||||||
|
"""
|
||||||
|
self.store = store or TemporalTripleStore()
|
||||||
|
|
||||||
|
def what_did_we_believe(
|
||||||
|
self,
|
||||||
|
subject: str,
|
||||||
|
before_time: str
|
||||||
|
) -> List[TemporalTriple]:
|
||||||
|
"""Query: "What did we believe about X before Y happened?"
|
||||||
|
|
||||||
|
Args:
|
||||||
|
subject: The entity to query about
|
||||||
|
before_time: The cutoff time (ISO 8601)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of facts believed before the given time
|
||||||
|
"""
|
||||||
|
# Get facts that were valid just before the given time
|
||||||
|
return self.store.query_temporal(
|
||||||
|
TemporalOperator.BEFORE,
|
||||||
|
before_time,
|
||||||
|
subject=subject
|
||||||
|
)
|
||||||
|
|
||||||
|
def when_did_we_learn(
|
||||||
|
self,
|
||||||
|
subject: str,
|
||||||
|
predicate: Optional[str] = None,
|
||||||
|
object: Optional[str] = None
|
||||||
|
) -> Optional[str]:
|
||||||
|
"""Query: "When did we first learn about X?"
|
||||||
|
|
||||||
|
Args:
|
||||||
|
subject: The subject to search for
|
||||||
|
predicate: Optional predicate filter
|
||||||
|
object: Optional object filter
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Timestamp of first knowledge, or None if never learned
|
||||||
|
"""
|
||||||
|
history = self.store.get_fact_history(subject, predicate or "")
|
||||||
|
|
||||||
|
# Filter by object if specified
|
||||||
|
if object:
|
||||||
|
history = [h for h in history if h.object == object]
|
||||||
|
|
||||||
|
if history:
|
||||||
|
# Return the earliest timestamp
|
||||||
|
earliest = min(history, key=lambda x: x.timestamp)
|
||||||
|
return earliest.timestamp
|
||||||
|
return None
|
||||||
|
|
||||||
|
def how_has_it_changed(
|
||||||
|
self,
|
||||||
|
subject: str,
|
||||||
|
since_time: str
|
||||||
|
) -> List[FactChange]:
|
||||||
|
"""Query: "How has X changed since Y?"
|
||||||
|
|
||||||
|
Args:
|
||||||
|
subject: The entity to analyze
|
||||||
|
since_time: The starting time (ISO 8601)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of changes since the given time
|
||||||
|
"""
|
||||||
|
now = datetime.now().isoformat()
|
||||||
|
changes = self.store.get_entity_changes(subject, since_time, now)
|
||||||
|
|
||||||
|
fact_changes = []
|
||||||
|
for i, triple in enumerate(changes):
|
||||||
|
# Determine change type
|
||||||
|
if i == 0:
|
||||||
|
change_type = ChangeType.ADDED
|
||||||
|
old_value = None
|
||||||
|
else:
|
||||||
|
prev = changes[i - 1]
|
||||||
|
if triple.object != prev.object:
|
||||||
|
change_type = ChangeType.MODIFIED
|
||||||
|
old_value = prev.object
|
||||||
|
else:
|
||||||
|
change_type = ChangeType.SUPERSEDED
|
||||||
|
old_value = prev.object
|
||||||
|
|
||||||
|
fact_changes.append(FactChange(
|
||||||
|
change_type=change_type,
|
||||||
|
subject=triple.subject,
|
||||||
|
predicate=triple.predicate,
|
||||||
|
old_value=old_value,
|
||||||
|
new_value=triple.object,
|
||||||
|
timestamp=triple.timestamp,
|
||||||
|
version=triple.version
|
||||||
|
))
|
||||||
|
|
||||||
|
return fact_changes
|
||||||
|
|
||||||
|
def generate_temporal_summary(
|
||||||
|
self,
|
||||||
|
entity: str,
|
||||||
|
start_time: str,
|
||||||
|
end_time: str
|
||||||
|
) -> HistoricalSummary:
|
||||||
|
"""Generate a historical summary of an entity's evolution.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
entity: The entity to summarize
|
||||||
|
start_time: Start of the time range (ISO 8601)
|
||||||
|
end_time: End of the time range (ISO 8601)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
HistoricalSummary containing the entity's evolution
|
||||||
|
"""
|
||||||
|
# Get all facts for the entity in the time range
|
||||||
|
initial_state = self.store.query_at_time(start_time, subject=entity)
|
||||||
|
final_state = self.store.query_at_time(end_time, subject=entity)
|
||||||
|
changes = self.store.get_entity_changes(entity, start_time, end_time)
|
||||||
|
|
||||||
|
# Build evolution timeline
|
||||||
|
evolution_timeline = []
|
||||||
|
seen_predicates = set()
|
||||||
|
|
||||||
|
for triple in changes:
|
||||||
|
if triple.predicate not in seen_predicates:
|
||||||
|
seen_predicates.add(triple.predicate)
|
||||||
|
evolution_timeline.append(FactChange(
|
||||||
|
change_type=ChangeType.ADDED,
|
||||||
|
subject=triple.subject,
|
||||||
|
predicate=triple.predicate,
|
||||||
|
old_value=None,
|
||||||
|
new_value=triple.object,
|
||||||
|
timestamp=triple.timestamp,
|
||||||
|
version=triple.version
|
||||||
|
))
|
||||||
|
else:
|
||||||
|
# Find previous value
|
||||||
|
prev = [t for t in changes
|
||||||
|
if t.predicate == triple.predicate
|
||||||
|
and t.timestamp < triple.timestamp]
|
||||||
|
old_value = prev[-1].object if prev else None
|
||||||
|
|
||||||
|
evolution_timeline.append(FactChange(
|
||||||
|
change_type=ChangeType.MODIFIED,
|
||||||
|
subject=triple.subject,
|
||||||
|
predicate=triple.predicate,
|
||||||
|
old_value=old_value,
|
||||||
|
new_value=triple.object,
|
||||||
|
timestamp=triple.timestamp,
|
||||||
|
version=triple.version
|
||||||
|
))
|
||||||
|
|
||||||
|
# Extract key facts (predicates that changed most)
|
||||||
|
key_facts = []
|
||||||
|
predicate_changes = {}
|
||||||
|
for change in evolution_timeline:
|
||||||
|
predicate_changes[change.predicate] = (
|
||||||
|
predicate_changes.get(change.predicate, 0) + 1
|
||||||
|
)
|
||||||
|
|
||||||
|
top_predicates = sorted(
|
||||||
|
predicate_changes.items(),
|
||||||
|
key=lambda x: x[1],
|
||||||
|
reverse=True
|
||||||
|
)[:5]
|
||||||
|
|
||||||
|
for pred, count in top_predicates:
|
||||||
|
current = [t for t in final_state if t.predicate == pred]
|
||||||
|
if current:
|
||||||
|
key_facts.append({
|
||||||
|
"predicate": pred,
|
||||||
|
"current_value": current[0].object,
|
||||||
|
"changes": count
|
||||||
|
})
|
||||||
|
|
||||||
|
# Build current state
|
||||||
|
current_state = [
|
||||||
|
{
|
||||||
|
"predicate": t.predicate,
|
||||||
|
"object": t.object,
|
||||||
|
"valid_from": t.valid_from,
|
||||||
|
"valid_until": t.valid_until
|
||||||
|
}
|
||||||
|
for t in final_state
|
||||||
|
]
|
||||||
|
|
||||||
|
return HistoricalSummary(
|
||||||
|
entity=entity,
|
||||||
|
start_time=start_time,
|
||||||
|
end_time=end_time,
|
||||||
|
total_changes=len(evolution_timeline),
|
||||||
|
key_facts=key_facts,
|
||||||
|
evolution_timeline=evolution_timeline,
|
||||||
|
current_state=current_state
|
||||||
|
)
|
||||||
|
|
||||||
|
def infer_temporal_relationship(
|
||||||
|
self,
|
||||||
|
fact_a: TemporalTriple,
|
||||||
|
fact_b: TemporalTriple
|
||||||
|
) -> Optional[str]:
|
||||||
|
"""Infer temporal relationship between two facts.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
fact_a: First fact
|
||||||
|
fact_b: Second fact
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Description of temporal relationship, or None
|
||||||
|
"""
|
||||||
|
a_start = datetime.fromisoformat(fact_a.valid_from)
|
||||||
|
a_end = datetime.fromisoformat(fact_a.valid_until) if fact_a.valid_until else None
|
||||||
|
b_start = datetime.fromisoformat(fact_b.valid_from)
|
||||||
|
b_end = datetime.fromisoformat(fact_b.valid_until) if fact_b.valid_until else None
|
||||||
|
|
||||||
|
# Check if A happened before B
|
||||||
|
if a_end and a_end <= b_start:
|
||||||
|
return "A happened before B"
|
||||||
|
|
||||||
|
# Check if B happened before A
|
||||||
|
if b_end and b_end <= a_start:
|
||||||
|
return "B happened before A"
|
||||||
|
|
||||||
|
# Check if they overlap
|
||||||
|
if a_end and b_end:
|
||||||
|
if a_start <= b_end and b_start <= a_end:
|
||||||
|
return "A and B overlap in time"
|
||||||
|
|
||||||
|
# Check if one supersedes the other
|
||||||
|
if fact_a.superseded_by == fact_b.id:
|
||||||
|
return "B supersedes A"
|
||||||
|
if fact_b.superseded_by == fact_a.id:
|
||||||
|
return "A supersedes B"
|
||||||
|
|
||||||
|
return "A and B are temporally unrelated"
|
||||||
|
|
||||||
|
def get_worldview_at_time(
|
||||||
|
self,
|
||||||
|
timestamp: str,
|
||||||
|
subjects: Optional[List[str]] = None
|
||||||
|
) -> Dict[str, List[Dict[str, Any]]]:
|
||||||
|
"""Get Timmy's complete worldview at a specific point in time.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
timestamp: The point in time (ISO 8601)
|
||||||
|
subjects: Optional list of subjects to include. If None, includes all.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dictionary mapping subjects to their facts at that time
|
||||||
|
"""
|
||||||
|
worldview = {}
|
||||||
|
|
||||||
|
if subjects:
|
||||||
|
for subject in subjects:
|
||||||
|
facts = self.store.query_at_time(timestamp, subject=subject)
|
||||||
|
if facts:
|
||||||
|
worldview[subject] = [
|
||||||
|
{
|
||||||
|
"predicate": f.predicate,
|
||||||
|
"object": f.object,
|
||||||
|
"version": f.version
|
||||||
|
}
|
||||||
|
for f in facts
|
||||||
|
]
|
||||||
|
else:
|
||||||
|
# Get all facts at that time
|
||||||
|
all_facts = self.store.query_at_time(timestamp)
|
||||||
|
for fact in all_facts:
|
||||||
|
if fact.subject not in worldview:
|
||||||
|
worldview[fact.subject] = []
|
||||||
|
worldview[fact.subject].append({
|
||||||
|
"predicate": fact.predicate,
|
||||||
|
"object": fact.object,
|
||||||
|
"version": fact.version
|
||||||
|
})
|
||||||
|
|
||||||
|
return worldview
|
||||||
|
|
||||||
|
def find_knowledge_gaps(
|
||||||
|
self,
|
||||||
|
subject: str,
|
||||||
|
expected_predicates: List[str]
|
||||||
|
) -> List[str]:
|
||||||
|
"""Find predicates that are missing or have expired for a subject.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
subject: The entity to check
|
||||||
|
expected_predicates: List of predicates that should exist
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of missing predicate names
|
||||||
|
"""
|
||||||
|
now = datetime.now().isoformat()
|
||||||
|
current_facts = self.store.query_at_time(now, subject=subject)
|
||||||
|
current_predicates = {f.predicate for f in current_facts}
|
||||||
|
|
||||||
|
return [
|
||||||
|
pred for pred in expected_predicates
|
||||||
|
if pred not in current_predicates
|
||||||
|
]
|
||||||
|
|
||||||
|
def export_reasoning_report(
|
||||||
|
self,
|
||||||
|
entity: str,
|
||||||
|
start_time: str,
|
||||||
|
end_time: str
|
||||||
|
) -> str:
|
||||||
|
"""Generate a human-readable reasoning report.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
entity: The entity to report on
|
||||||
|
start_time: Start of the time range
|
||||||
|
end_time: End of the time range
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Formatted report string
|
||||||
|
"""
|
||||||
|
summary = self.generate_temporal_summary(entity, start_time, end_time)
|
||||||
|
|
||||||
|
report = f"""
|
||||||
|
# Temporal Reasoning Report: {entity}
|
||||||
|
|
||||||
|
## Time Range
|
||||||
|
- From: {start_time}
|
||||||
|
- To: {end_time}
|
||||||
|
|
||||||
|
## Summary
|
||||||
|
- Total Changes: {summary.total_changes}
|
||||||
|
- Key Facts Tracked: {len(summary.key_facts)}
|
||||||
|
|
||||||
|
## Key Facts
|
||||||
|
"""
|
||||||
|
for fact in summary.key_facts:
|
||||||
|
report += f"- **{fact['predicate']}**: {fact['current_value']} ({fact['changes']} changes)\n"
|
||||||
|
|
||||||
|
report += "\n## Evolution Timeline\n"
|
||||||
|
for change in summary.evolution_timeline[:10]: # Show first 10
|
||||||
|
report += f"- [{change.timestamp}] {change.change_type.value}: {change.predicate}\n"
|
||||||
|
if change.old_value:
|
||||||
|
report += f" - Changed from: {change.old_value}\n"
|
||||||
|
report += f" - Changed to: {change.new_value}\n"
|
||||||
|
|
||||||
|
if len(summary.evolution_timeline) > 10:
|
||||||
|
report += f"\n... and {len(summary.evolution_timeline) - 10} more changes\n"
|
||||||
|
|
||||||
|
report += "\n## Current State\n"
|
||||||
|
for state in summary.current_state:
|
||||||
|
report += f"- {state['predicate']}: {state['object']}\n"
|
||||||
|
|
||||||
|
return report
|
||||||
Reference in New Issue
Block a user