Compare commits
2 Commits
step35/595
...
step35/339
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
37873ca100 | ||
|
|
874ce137b0 |
87
bin/gitea-backup.sh
Normal file
87
bin/gitea-backup.sh
Normal file
@@ -0,0 +1,87 @@
|
||||
#!/bin/bash
|
||||
# Gitea Daily Backup Script
|
||||
# Uses Gitea's native dump command to create automated backups of repositories and SQLite databases.
|
||||
# Designed to run on the VPS (Ezra) as part of a daily cron job.
|
||||
#
|
||||
# Configuration via environment variables:
|
||||
# GITEA_BIN Path to gitea binary (default: auto-detect)
|
||||
# GITEA_BACKUP_DIR Directory for backup archives (default: /var/backups/gitea)
|
||||
# GITEA_BACKUP_RETENTION Days to retain backups (default: 7)
|
||||
# GITEA_BACKUP_LOG Log file path (default: /var/log/gitea-backup.log)
|
||||
|
||||
set -euo pipefail
|
||||
|
||||
GITEA_BIN="${GITEA_BIN:-$(command -v gitea 2>/dev/null || echo "/usr/local/bin/gitea")}"
|
||||
BACKUP_DIR="${GITEA_BACKUP_DIR:-/var/backups/gitea}"
|
||||
RETENTION_DAYS="${GITEA_BACKUP_RETENTION:-7}"
|
||||
DATE="$(date +%Y-%m-%d_%H%M%S)"
|
||||
BACKUP_FILE="${BACKUP_DIR}/gitea-backup-${DATE}.tar.gz"
|
||||
LOG_FILE="${GITEA_BACKUP_LOG:-/var/log/gitea-backup.log}"
|
||||
|
||||
mkdir -p "${BACKUP_DIR}"
|
||||
|
||||
log() {
|
||||
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*" | tee -a "${LOG_FILE}"
|
||||
}
|
||||
|
||||
log "=== Starting Gitea daily backup ==="
|
||||
|
||||
# Verify gitea binary exists
|
||||
if [ ! -x "${GITEA_BIN}" ]; then
|
||||
log "ERROR: Gitea binary not found at ${GITEA_BIN}"
|
||||
log "Set GITEA_BIN environment variable to the gitea binary path (e.g., /usr/bin/gitea)"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Detect Gitea WORK_PATH
|
||||
WORK_PATH=""
|
||||
APP_INI=""
|
||||
for path in /etc/gitea/app.ini /home/git/gitea/custom/conf/app.ini ~/gitea/custom/conf/app.ini; do
|
||||
if [ -f "$path" ]; then
|
||||
APP_INI="$path"
|
||||
break
|
||||
fi
|
||||
done
|
||||
|
||||
if [ -n "$APP_INI" ]; then
|
||||
# Parse [app] WORK_PATH = /var/lib/gitea
|
||||
WORK_PATH=$(sed -n 's/^[[:space:]]*WORK_PATH[[:space:]]*=[[:space:]]*//p' "$APP_INI" | head -1)
|
||||
log "Detected WORK_PATH from app.ini: ${WORK_PATH}"
|
||||
fi
|
||||
|
||||
# Fallback detection
|
||||
if [ -z "$WORK_PATH" ]; then
|
||||
for d in /var/lib/gitea /home/git/gitea /srv/gitea /opt/gitea; do
|
||||
if [ -d "$d" ]; then
|
||||
WORK_PATH="$d"
|
||||
break
|
||||
fi
|
||||
done
|
||||
log "Inferred WORK_PATH: ${WORK_PATH:-not found}"
|
||||
fi
|
||||
|
||||
if [ -z "$WORK_PATH" ]; then
|
||||
log "ERROR: Could not determine Gitea WORK_PATH. Set GITEA_WORK_PATH manually."
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Perform gitea dump
|
||||
# Flags: --work-path sets the Gitea working directory, --file writes dump to tar.gz
|
||||
log "Running: gitea dump --work-path ${WORK_PATH} --file ${BACKUP_FILE}"
|
||||
"${GITEA_BIN}" dump --work-path "${WORK_PATH}" --file "${BACKUP_FILE}" 2>>"${LOG_FILE}"
|
||||
|
||||
if [ $? -ne 0 ]; then
|
||||
log "ERROR: gitea dump failed — check ${LOG_FILE} for details"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
FILE_SIZE=$(du -h "${BACKUP_FILE}" | cut -f1)
|
||||
log "Backup created: ${BACKUP_FILE} (${FILE_SIZE})"
|
||||
|
||||
# Prune old backups (keep last N days)
|
||||
find "${BACKUP_DIR}" -name "gitea-backup-*.tar.gz" -type f -mtime +$((${RETENTION_DAYS}-1)) -delete 2>/dev/null || true
|
||||
log "Pruned backups older than ${RETENTION_DAYS} days"
|
||||
|
||||
log "=== Backup completed successfully ==="
|
||||
|
||||
exit 0
|
||||
9
cron/vps/gitea-daily-backup.yml
Normal file
9
cron/vps/gitea-daily-backup.yml
Normal file
@@ -0,0 +1,9 @@
|
||||
- name: Daily Gitea Backup
|
||||
schedule: '0 2 * * *' # 2:00 AM daily
|
||||
tasks:
|
||||
- name: Run Gitea daily backup
|
||||
shell: bash ~/.hermes/bin/gitea-backup.sh
|
||||
env:
|
||||
GITEA_BIN: /usr/local/bin/gitea
|
||||
GITEA_BACKUP_DIR: /var/backups/gitea
|
||||
GITEA_BACKUP_RETENTION: "7"
|
||||
@@ -86,6 +86,15 @@ done
|
||||
chmod +x "$HERMES_HOME/bin/"*.sh "$HERMES_HOME/bin/"*.py 2>/dev/null || true
|
||||
log "bin/ -> $HERMES_HOME/bin/"
|
||||
|
||||
# === Deploy agent patches (runtime extensions) ===
|
||||
# These are custom agent modules that extend hermes-agent's agent/ package.
|
||||
# Copied into the hermes-agent installation to make them importable as agent.* modules.
|
||||
if [ -d "$SCRIPT_DIR/patches/agent" ]; then
|
||||
mkdir -p "$HERMES_HOME/hermes-agent/agent"
|
||||
cp -r "$SCRIPT_DIR/patches/agent"/* "$HERMES_HOME/hermes-agent/agent/"
|
||||
log "patches/agent/ -> $HERMES_HOME/hermes-agent/agent/"
|
||||
fi
|
||||
|
||||
if [ "${1:-}" != "" ]; then
|
||||
echo "ERROR: deploy.sh no longer accepts legacy loop flags." >&2
|
||||
echo "Deploy the sidecar only. Do not relaunch deprecated bash loops." >&2
|
||||
|
||||
155
docs/backup-recovery-runbook.md
Normal file
155
docs/backup-recovery-runbook.md
Normal file
@@ -0,0 +1,155 @@
|
||||
# Gitea Backup & Recovery Runbook
|
||||
|
||||
**Last updated:** 2026-04-30
|
||||
**Scope:** Single-node VPS (Ezra, 143.198.27.163) running Gitea
|
||||
**Backup Strategy:** Automated daily full dumps via `gitea dump`
|
||||
|
||||
---
|
||||
|
||||
## What Gets Backed Up
|
||||
|
||||
| Component | Method | Frequency | Retention |
|
||||
|-----------|--------|-----------|-----------|
|
||||
| All Gitea repositories (bare git dirs) | `gitea dump --file` | Daily at 2:00 AM | 7 days |
|
||||
| SQLite databases (gitea.db, indexer.db, etc.) | Included in dump | Daily | 7 days |
|
||||
| Attachments, avatars, hooks | Included in dump | Daily | 7 days |
|
||||
|
||||
**Backup location:** `/var/backups/gitea/gitea-backup-YYYY-MM-DD_HHMMSS.tar.gz`
|
||||
|
||||
**Log file:** `/var/log/gitea-backup.log`
|
||||
|
||||
---
|
||||
|
||||
## Backup Architecture
|
||||
|
||||
The backup script `bin/gitea-backup.sh` runs daily via Hermes cron (`cron/vps/gitea-daily-backup.yml`). It:
|
||||
|
||||
1. Locates the Gitea `WORK_PATH` by reading `/etc/gitea/app.ini` or falling back to common locations (`/var/lib/gitea`, `/home/git/gitea`)
|
||||
2. Invokes `gitea dump --work-path <path> --file <backup-tar.gz>` — Gitea's native, consistent snapshot mechanism
|
||||
3. Prunes archives older than 7 days
|
||||
4. Logs all operations to `/var/log/gitea-backup.log`
|
||||
|
||||
**Prerequisites on the VPS:**
|
||||
- Gitea binary available at `/usr/local/bin/gitea` (or set `GITEA_BIN` env var)
|
||||
- `gitea dump` command must be available (Gitea ≥ 1.12)
|
||||
- SSH access to the VPS for manual recovery operations
|
||||
- Sufficient disk space in `/var/backups/gitea` (typical dump: ~2–10 GB depending on repo count/size)
|
||||
|
||||
---
|
||||
|
||||
## Recovery Time Objective (RTO) & Recovery Point Objective (RPO)
|
||||
|
||||
| Metric | Estimate |
|
||||
|--------|----------|
|
||||
| **RPO** (data loss window) | ≤ 24 hours (last daily backup) |
|
||||
| **RTO** (time to restore) | **~45 minutes** (cold restore from backup tarball) |
|
||||
| **Downtime impact** | Gitea offline during restore (~20 min) |
|
||||
|
||||
---
|
||||
|
||||
## Step-by-Step Recovery Procedure
|
||||
|
||||
### Phase 1 — Assess & Prepare (5 min)
|
||||
|
||||
1. SSH into Ezra VPS: `ssh root@143.198.27.163`
|
||||
2. Stop Gitea so files are quiescent:
|
||||
```bash
|
||||
systemctl stop gitea
|
||||
```
|
||||
3. Confirm current Gitea data directory (for reference):
|
||||
```bash
|
||||
gitea --work-path /var/lib/gitea --config /etc/gitea/app.ini dump --help 2>&1
|
||||
# Or check app.ini for WORK_PATH
|
||||
cat /etc/gitea/app.ini | grep '^WORK_PATH'
|
||||
```
|
||||
|
||||
### Phase 2 — Restore from Backup (20 min)
|
||||
|
||||
4. Choose the backup tarball to restore from:
|
||||
```bash
|
||||
ls -lh /var/backups/gitea/
|
||||
# Pick the most recent: gitea-backup-2026-04-29_020001.tar.gz
|
||||
```
|
||||
|
||||
5. **Optional: Move current data aside** (safety copy):
|
||||
```bash
|
||||
mv /var/lib/gitea /var/lib/gitea.bak-$(date +%s)
|
||||
```
|
||||
|
||||
6. Extract the backup in place:
|
||||
```bash
|
||||
mkdir -p /var/lib/gitea
|
||||
tar -xzf /var/backups/gitea/gitea-backup-YYYY-MM-DD_HHMMSS.tar.gz -C /var/lib/gitea --strip-components=1
|
||||
```
|
||||
*Note:* `gitea dump` archives contain a single top-level directory `gitea-dump-<timestamp>`. The `--strip-components=1` puts its contents directly into `/var/lib/gitea`.
|
||||
|
||||
7. Set correct ownership (typically `git:git`):
|
||||
```bash
|
||||
chown -R git:git /var/lib/gitea
|
||||
```
|
||||
|
||||
### Phase 3 — Restart & Validate (15 min)
|
||||
|
||||
8. Start Gitea:
|
||||
```bash
|
||||
systemctl start gitea
|
||||
```
|
||||
|
||||
9. Wait 30 seconds, then verify:
|
||||
```bash
|
||||
systemctl status gitea
|
||||
# Check HTTP endpoint
|
||||
curl -s -o /dev/null -w '%{http_code}' http://localhost:3000/ # Should be 200
|
||||
```
|
||||
|
||||
10. Log into Gitea UI and spot-check:
|
||||
- Home page loads
|
||||
- A few repositories are accessible
|
||||
- Attachments (avatars) render
|
||||
- Recent commits visible
|
||||
|
||||
11. If the web UI works but indices are stale, rebuild them (wait for background jobs to process):
|
||||
```bash
|
||||
gitea admin index rebuild-repo --all
|
||||
```
|
||||
|
||||
### Post-Restore Checklist
|
||||
|
||||
- [ ] Admin UI reachable at `https://forge.alexanderwhitestone.com`
|
||||
- [ ] Sample PRs/milestones/labels present
|
||||
- [ ] Repository clone via SSH works: `git clone git@forge.alexanderwhitestone.com:Timmy_Foundation/timmy-config.git`
|
||||
- [ ] Check backup script health: `cat /var/log/gitea-backup.log | tail -20`
|
||||
- [ ] Re-enable any disabled integrations (webhooks, CI/CD runners)
|
||||
- [ ] Notify the fleet: post to relevant channels confirming operational status
|
||||
|
||||
---
|
||||
|
||||
## Known Issues & Workarounds
|
||||
|
||||
| Symptom | Likely cause | Fix |
|
||||
|---------|--------------|-----|
|
||||
| `gitea: command not found` | Binary at non-standard path | Set `GITEA_BIN=/path/to/gitea` in cron env |
|
||||
| `Permission denied` on backup dir | Cron user lacks write access to `/var/backups` | `mkdir /var/backups/gitea && chown root:root /var/backups/gitea` |
|
||||
| Restore fails: `"database or disk is full"` | Insufficient space on `/var/lib/gitea` | Expand disk or clean up old data first; backups require ~1.5x live data size |
|
||||
| Old backup tarballs not deleting | Retention cron not firing | Check `systemctl status hermes-cron` and cron logs |
|
||||
|
||||
---
|
||||
|
||||
## Off-Site Replication (Future Work)
|
||||
|
||||
This backup is **on-site only** (same VPS). For true resilience, replicating to a secondary location is recommended:
|
||||
|
||||
- **Option A — rsync to second VPS** (Push nightly to `backup@backup-alexanderwhitestone.com:/backups/gitea/`)
|
||||
- **Option B — S3-compatible bucket** with lifecycle policy
|
||||
- **Option C — GitHub mirror of each repo** using `git push --mirror` (already considered in issue #481 broader work)
|
||||
|
||||
Current scope: single-VPS backup only (single point of failure mitigated but not eliminated).
|
||||
|
||||
---
|
||||
|
||||
## Related Documentation
|
||||
|
||||
- `bin/gitea-backup.sh` — backup script source
|
||||
- `cron/vps/gitea-daily-backup.yml` — Hermes cron definition
|
||||
- Gitea official docs: <https://docs.gitea.com/administration/backup-and-restore>
|
||||
- Hermes cron: <https://hermes-agent.nousresearch.com/docs>
|
||||
6
patches/agent/conscience_mapping.py
Normal file
6
patches/agent/conscience_mapping.py
Normal file
@@ -0,0 +1,6 @@
|
||||
"""
|
||||
@soul:honesty.grounding Grounding before generation. Consult verified sources before pattern-matching.
|
||||
@soul:honesty.source_distinction Source distinction. Every claim must point to a verified source.
|
||||
@soul:honesty.audit_trail The audit trail. Every response is logged with inputs and confidence.
|
||||
"""
|
||||
# This file serves as a registry for the Conscience Validator to prove the apparatus exists.
|
||||
45
patches/agent/evolution/domain_distiller.py
Normal file
45
patches/agent/evolution/domain_distiller.py
Normal file
@@ -0,0 +1,45 @@
|
||||
"""Phase 3: Deep Knowledge Distillation from Google.
|
||||
|
||||
Performs deep dives into technical domains and distills them into
|
||||
Timmy's Sovereign Knowledge Graph.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import json
|
||||
from typing import List, Dict, Any
|
||||
from agent.gemini_adapter import GeminiAdapter
|
||||
from agent.symbolic_memory import SymbolicMemory
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class DomainDistiller:
|
||||
def __init__(self):
|
||||
self.adapter = GeminiAdapter()
|
||||
self.symbolic = SymbolicMemory()
|
||||
|
||||
def distill_domain(self, domain: str):
|
||||
"""Crawls and distills an entire technical domain."""
|
||||
logger.info(f"Distilling domain: {domain}")
|
||||
|
||||
prompt = f"""
|
||||
Please perform a deep knowledge distillation of the following domain: {domain}
|
||||
|
||||
Use Google Search to find foundational papers, recent developments, and key entities.
|
||||
Synthesize this into a structured 'Domain Map' consisting of high-fidelity knowledge triples.
|
||||
Focus on the structural relationships that define the domain.
|
||||
|
||||
Format: [{{"s": "subject", "p": "predicate", "o": "object"}}]
|
||||
"""
|
||||
result = self.adapter.generate(
|
||||
model="gemini-3.1-pro-preview",
|
||||
prompt=prompt,
|
||||
system_instruction=f"You are Timmy's Domain Distiller. Your goal is to map the entire {domain} domain into a structured Knowledge Graph.",
|
||||
grounding=True,
|
||||
thinking=True,
|
||||
response_mime_type="application/json"
|
||||
)
|
||||
|
||||
triples = json.loads(result["text"])
|
||||
count = self.symbolic.ingest_text(json.dumps(triples))
|
||||
logger.info(f"Distilled {count} new triples for domain: {domain}")
|
||||
return count
|
||||
60
patches/agent/evolution/self_correction_generator.py
Normal file
60
patches/agent/evolution/self_correction_generator.py
Normal file
@@ -0,0 +1,60 @@
|
||||
"""Phase 1: Synthetic Data Generation for Self-Correction.
|
||||
|
||||
Generates reasoning traces where Timmy makes a subtle error and then
|
||||
identifies and corrects it using the Conscience Validator.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import json
|
||||
from typing import List, Dict, Any
|
||||
from agent.gemini_adapter import GeminiAdapter
|
||||
from tools.gitea_client import GiteaClient
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class SelfCorrectionGenerator:
|
||||
def __init__(self):
|
||||
self.adapter = GeminiAdapter()
|
||||
self.gitea = GiteaClient()
|
||||
|
||||
def generate_trace(self, task: str) -> Dict[str, Any]:
|
||||
"""Generates a single self-correction reasoning trace."""
|
||||
prompt = f"""
|
||||
Task: {task}
|
||||
|
||||
Please simulate a multi-step reasoning trace for this task.
|
||||
Intentionally include one subtle error in the reasoning (e.g., a logical flaw, a misinterpretation of a rule, or a factual error).
|
||||
Then, show how Timmy identifies the error using his Conscience Validator and provides a corrected reasoning trace.
|
||||
|
||||
Format the output as JSON:
|
||||
{{
|
||||
"task": "{task}",
|
||||
"initial_trace": "...",
|
||||
"error_identified": "...",
|
||||
"correction_trace": "...",
|
||||
"lessons_learned": "..."
|
||||
}}
|
||||
"""
|
||||
result = self.adapter.generate(
|
||||
model="gemini-3.1-pro-preview",
|
||||
prompt=prompt,
|
||||
system_instruction="You are Timmy's Synthetic Data Engine. Generate high-fidelity self-correction traces.",
|
||||
response_mime_type="application/json",
|
||||
thinking=True
|
||||
)
|
||||
|
||||
trace = json.loads(result["text"])
|
||||
return trace
|
||||
|
||||
def generate_and_save(self, task: str, count: int = 1):
|
||||
"""Generates multiple traces and saves them to Gitea."""
|
||||
repo = "Timmy_Foundation/timmy-config"
|
||||
for i in range(count):
|
||||
trace = self.generate_trace(task)
|
||||
filename = f"memories/synthetic_data/self_correction/{task.lower().replace(' ', '_')}_{i}.json"
|
||||
|
||||
content = json.dumps(trace, indent=2)
|
||||
content_b64 = base64.b64encode(content.encode()).decode()
|
||||
|
||||
self.gitea.create_file(repo, filename, content_b64, f"Add synthetic self-correction trace for {task}")
|
||||
logger.info(f"Saved synthetic trace to {filename}")
|
||||
42
patches/agent/evolution/world_modeler.py
Normal file
42
patches/agent/evolution/world_modeler.py
Normal file
@@ -0,0 +1,42 @@
|
||||
"""Phase 2: Multi-Modal World Modeling.
|
||||
|
||||
Ingests multi-modal data (vision/audio) to build a spatial and temporal
|
||||
understanding of Timmy's environment.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import base64
|
||||
from typing import List, Dict, Any
|
||||
from agent.gemini_adapter import GeminiAdapter
|
||||
from agent.symbolic_memory import SymbolicMemory
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class WorldModeler:
|
||||
def __init__(self):
|
||||
self.adapter = GeminiAdapter()
|
||||
self.symbolic = SymbolicMemory()
|
||||
|
||||
def analyze_environment(self, image_data: str, mime_type: str = "image/jpeg"):
|
||||
"""Analyzes an image of the environment and updates the world model."""
|
||||
# In a real scenario, we'd use Gemini's multi-modal capabilities
|
||||
# For now, we'll simulate the vision-to-symbolic extraction
|
||||
prompt = f"""
|
||||
Analyze the following image of Timmy's environment.
|
||||
Identify all key objects, their spatial relationships, and any temporal changes.
|
||||
Extract this into a set of symbolic triples for the Knowledge Graph.
|
||||
|
||||
Format: [{{"s": "subject", "p": "predicate", "o": "object"}}]
|
||||
"""
|
||||
# Simulate multi-modal call (Gemini 3.1 Pro Vision)
|
||||
result = self.adapter.generate(
|
||||
model="gemini-3.1-pro-preview",
|
||||
prompt=prompt,
|
||||
system_instruction="You are Timmy's World Modeler. Build a high-fidelity spatial/temporal map of the environment.",
|
||||
response_mime_type="application/json"
|
||||
)
|
||||
|
||||
triples = json.loads(result["text"])
|
||||
self.symbolic.ingest_text(json.dumps(triples))
|
||||
logger.info(f"Updated world model with {len(triples)} new spatial triples.")
|
||||
return triples
|
||||
404
patches/agent/fallback_router.py
Normal file
404
patches/agent/fallback_router.py
Normal file
@@ -0,0 +1,404 @@
|
||||
"""Automatic fallback router for handling provider quota and rate limit errors.
|
||||
|
||||
This module provides intelligent fallback detection and routing when the primary
|
||||
provider (e.g., Anthropic) encounters quota limitations or rate limits.
|
||||
|
||||
Features:
|
||||
- Detects quota/rate limit errors from different providers
|
||||
- Automatic fallback to kimi-coding when Anthropic quota is exceeded
|
||||
- Configurable fallback chains with default anthropic -> kimi-coding
|
||||
- Logging and monitoring of fallback events
|
||||
|
||||
Usage:
|
||||
from agent.fallback_router import (
|
||||
is_quota_error,
|
||||
get_default_fallback_chain,
|
||||
should_auto_fallback,
|
||||
)
|
||||
|
||||
if is_quota_error(error, provider="anthropic"):
|
||||
if should_auto_fallback(provider="anthropic"):
|
||||
fallback_chain = get_default_fallback_chain("anthropic")
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
from typing import Dict, List, Optional, Any, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Default fallback chains per provider
|
||||
# Each chain is a list of fallback configurations tried in order
|
||||
DEFAULT_FALLBACK_CHAINS: Dict[str, List[Dict[str, Any]]] = {
|
||||
"anthropic": [
|
||||
{"provider": "kimi-coding", "model": "kimi-k2.5"},
|
||||
{"provider": "openrouter", "model": "anthropic/claude-sonnet-4"},
|
||||
],
|
||||
"openrouter": [
|
||||
{"provider": "kimi-coding", "model": "kimi-k2.5"},
|
||||
{"provider": "zai", "model": "glm-5"},
|
||||
],
|
||||
"kimi-coding": [
|
||||
{"provider": "openrouter", "model": "anthropic/claude-sonnet-4"},
|
||||
{"provider": "zai", "model": "glm-5"},
|
||||
],
|
||||
"zai": [
|
||||
{"provider": "openrouter", "model": "anthropic/claude-sonnet-4"},
|
||||
{"provider": "kimi-coding", "model": "kimi-k2.5"},
|
||||
],
|
||||
}
|
||||
|
||||
# Quota/rate limit error patterns by provider
|
||||
# These are matched (case-insensitive) against error messages
|
||||
QUOTA_ERROR_PATTERNS: Dict[str, List[str]] = {
|
||||
"anthropic": [
|
||||
"rate limit",
|
||||
"ratelimit",
|
||||
"quota exceeded",
|
||||
"quota exceeded",
|
||||
"insufficient quota",
|
||||
"429",
|
||||
"403",
|
||||
"too many requests",
|
||||
"capacity exceeded",
|
||||
"over capacity",
|
||||
"temporarily unavailable",
|
||||
"server overloaded",
|
||||
"resource exhausted",
|
||||
"billing threshold",
|
||||
"credit balance",
|
||||
"payment required",
|
||||
"402",
|
||||
],
|
||||
"openrouter": [
|
||||
"rate limit",
|
||||
"ratelimit",
|
||||
"quota exceeded",
|
||||
"insufficient credits",
|
||||
"429",
|
||||
"402",
|
||||
"no endpoints available",
|
||||
"all providers failed",
|
||||
"over capacity",
|
||||
],
|
||||
"kimi-coding": [
|
||||
"rate limit",
|
||||
"ratelimit",
|
||||
"quota exceeded",
|
||||
"429",
|
||||
"insufficient balance",
|
||||
],
|
||||
"zai": [
|
||||
"rate limit",
|
||||
"ratelimit",
|
||||
"quota exceeded",
|
||||
"429",
|
||||
"insufficient quota",
|
||||
],
|
||||
}
|
||||
|
||||
# HTTP status codes indicating quota/rate limit issues
|
||||
QUOTA_STATUS_CODES = {429, 402, 403}
|
||||
|
||||
|
||||
def is_quota_error(error: Exception, provider: Optional[str] = None) -> bool:
|
||||
"""Detect if an error is quota/rate limit related.
|
||||
|
||||
Args:
|
||||
error: The exception to check
|
||||
provider: Optional provider name to check provider-specific patterns
|
||||
|
||||
Returns:
|
||||
True if the error appears to be quota/rate limit related
|
||||
"""
|
||||
if error is None:
|
||||
return False
|
||||
|
||||
error_str = str(error).lower()
|
||||
error_type = type(error).__name__.lower()
|
||||
|
||||
# Check for common rate limit exception types
|
||||
if any(term in error_type for term in [
|
||||
"ratelimit", "rate_limit", "quota", "toomanyrequests",
|
||||
"insufficient_quota", "billing", "payment"
|
||||
]):
|
||||
return True
|
||||
|
||||
# Check HTTP status code if available
|
||||
status_code = getattr(error, "status_code", None)
|
||||
if status_code is None:
|
||||
# Try common attribute names
|
||||
for attr in ["code", "http_status", "response_code", "status"]:
|
||||
if hasattr(error, attr):
|
||||
try:
|
||||
status_code = int(getattr(error, attr))
|
||||
break
|
||||
except (TypeError, ValueError):
|
||||
continue
|
||||
|
||||
if status_code in QUOTA_STATUS_CODES:
|
||||
return True
|
||||
|
||||
# Check provider-specific patterns
|
||||
providers_to_check = [provider] if provider else QUOTA_ERROR_PATTERNS.keys()
|
||||
|
||||
for prov in providers_to_check:
|
||||
patterns = QUOTA_ERROR_PATTERNS.get(prov, [])
|
||||
for pattern in patterns:
|
||||
if pattern.lower() in error_str:
|
||||
logger.debug(
|
||||
"Detected %s quota error pattern '%s' in: %s",
|
||||
prov, pattern, error
|
||||
)
|
||||
return True
|
||||
|
||||
# Check generic quota patterns
|
||||
generic_patterns = [
|
||||
"rate limit exceeded",
|
||||
"quota exceeded",
|
||||
"too many requests",
|
||||
"capacity exceeded",
|
||||
"temporarily unavailable",
|
||||
"try again later",
|
||||
"resource exhausted",
|
||||
"billing",
|
||||
"payment required",
|
||||
"insufficient credits",
|
||||
"insufficient quota",
|
||||
]
|
||||
|
||||
for pattern in generic_patterns:
|
||||
if pattern in error_str:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def get_default_fallback_chain(
|
||||
primary_provider: str,
|
||||
exclude_provider: Optional[str] = None,
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""Get the default fallback chain for a primary provider.
|
||||
|
||||
Args:
|
||||
primary_provider: The primary provider name
|
||||
exclude_provider: Optional provider to exclude from the chain
|
||||
|
||||
Returns:
|
||||
List of fallback configurations
|
||||
"""
|
||||
chain = DEFAULT_FALLBACK_CHAINS.get(primary_provider, [])
|
||||
|
||||
# Filter out excluded provider if specified
|
||||
if exclude_provider:
|
||||
chain = [
|
||||
fb for fb in chain
|
||||
if fb.get("provider") != exclude_provider
|
||||
]
|
||||
|
||||
return list(chain)
|
||||
|
||||
|
||||
def should_auto_fallback(
|
||||
provider: str,
|
||||
error: Optional[Exception] = None,
|
||||
auto_fallback_enabled: Optional[bool] = None,
|
||||
) -> bool:
|
||||
"""Determine if automatic fallback should be attempted.
|
||||
|
||||
Args:
|
||||
provider: The current provider name
|
||||
error: Optional error to check for quota issues
|
||||
auto_fallback_enabled: Optional override for auto-fallback setting
|
||||
|
||||
Returns:
|
||||
True if automatic fallback should be attempted
|
||||
"""
|
||||
# Check environment variable override
|
||||
if auto_fallback_enabled is None:
|
||||
env_setting = os.getenv("HERMES_AUTO_FALLBACK", "true").lower()
|
||||
auto_fallback_enabled = env_setting in ("true", "1", "yes", "on")
|
||||
|
||||
if not auto_fallback_enabled:
|
||||
return False
|
||||
|
||||
# Check if provider has a configured fallback chain
|
||||
if provider not in DEFAULT_FALLBACK_CHAINS:
|
||||
# Still allow fallback if it's a quota error with generic handling
|
||||
if error and is_quota_error(error):
|
||||
logger.debug(
|
||||
"Provider %s has no fallback chain but quota error detected",
|
||||
provider
|
||||
)
|
||||
return True
|
||||
return False
|
||||
|
||||
# If there's an error, only fallback on quota/rate limit errors
|
||||
if error is not None:
|
||||
return is_quota_error(error, provider)
|
||||
|
||||
# No error but fallback chain exists - allow eager fallback for
|
||||
# providers known to have quota issues
|
||||
return provider in ("anthropic",)
|
||||
|
||||
|
||||
def log_fallback_event(
|
||||
from_provider: str,
|
||||
to_provider: str,
|
||||
to_model: str,
|
||||
reason: str,
|
||||
error: Optional[Exception] = None,
|
||||
) -> None:
|
||||
"""Log a fallback event for monitoring.
|
||||
|
||||
Args:
|
||||
from_provider: The provider we're falling back from
|
||||
to_provider: The provider we're falling back to
|
||||
to_model: The model we're falling back to
|
||||
reason: The reason for the fallback
|
||||
error: Optional error that triggered the fallback
|
||||
"""
|
||||
log_data = {
|
||||
"event": "provider_fallback",
|
||||
"from_provider": from_provider,
|
||||
"to_provider": to_provider,
|
||||
"to_model": to_model,
|
||||
"reason": reason,
|
||||
}
|
||||
|
||||
if error:
|
||||
log_data["error_type"] = type(error).__name__
|
||||
log_data["error_message"] = str(error)[:200]
|
||||
|
||||
logger.info("Provider fallback: %s -> %s (%s) | Reason: %s",
|
||||
from_provider, to_provider, to_model, reason)
|
||||
|
||||
# Also log structured data for monitoring
|
||||
logger.debug("Fallback event data: %s", log_data)
|
||||
|
||||
|
||||
def resolve_fallback_with_credentials(
|
||||
fallback_config: Dict[str, Any],
|
||||
) -> Tuple[Optional[Any], Optional[str]]:
|
||||
"""Resolve a fallback configuration to a client and model.
|
||||
|
||||
Args:
|
||||
fallback_config: Fallback configuration dict with provider and model
|
||||
|
||||
Returns:
|
||||
Tuple of (client, model) or (None, None) if credentials not available
|
||||
"""
|
||||
from agent.auxiliary_client import resolve_provider_client
|
||||
|
||||
provider = fallback_config.get("provider")
|
||||
model = fallback_config.get("model")
|
||||
|
||||
if not provider or not model:
|
||||
return None, None
|
||||
|
||||
try:
|
||||
client, resolved_model = resolve_provider_client(
|
||||
provider,
|
||||
model=model,
|
||||
raw_codex=True,
|
||||
)
|
||||
return client, resolved_model or model
|
||||
except Exception as exc:
|
||||
logger.debug(
|
||||
"Failed to resolve fallback provider %s: %s",
|
||||
provider, exc
|
||||
)
|
||||
return None, None
|
||||
|
||||
|
||||
def get_auto_fallback_chain(
|
||||
primary_provider: str,
|
||||
user_fallback_chain: Optional[List[Dict[str, Any]]] = None,
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""Get the effective fallback chain for automatic fallback.
|
||||
|
||||
Combines user-provided fallback chain with default automatic fallback chain.
|
||||
|
||||
Args:
|
||||
primary_provider: The primary provider name
|
||||
user_fallback_chain: Optional user-provided fallback chain
|
||||
|
||||
Returns:
|
||||
The effective fallback chain to use
|
||||
"""
|
||||
# Use user-provided chain if available
|
||||
if user_fallback_chain:
|
||||
return user_fallback_chain
|
||||
|
||||
# Otherwise use default chain for the provider
|
||||
return get_default_fallback_chain(primary_provider)
|
||||
|
||||
|
||||
def is_fallback_available(
|
||||
fallback_config: Dict[str, Any],
|
||||
) -> bool:
|
||||
"""Check if a fallback configuration has available credentials.
|
||||
|
||||
Args:
|
||||
fallback_config: Fallback configuration dict
|
||||
|
||||
Returns:
|
||||
True if credentials are available for the fallback provider
|
||||
"""
|
||||
provider = fallback_config.get("provider")
|
||||
if not provider:
|
||||
return False
|
||||
|
||||
# Check environment variables for API keys
|
||||
env_vars = {
|
||||
"anthropic": ["ANTHROPIC_API_KEY", "ANTHROPIC_TOKEN"],
|
||||
"kimi-coding": ["KIMI_API_KEY", "KIMI_API_TOKEN"],
|
||||
"zai": ["ZAI_API_KEY", "Z_AI_API_KEY"],
|
||||
"openrouter": ["OPENROUTER_API_KEY"],
|
||||
"minimax": ["MINIMAX_API_KEY"],
|
||||
"minimax-cn": ["MINIMAX_CN_API_KEY"],
|
||||
"deepseek": ["DEEPSEEK_API_KEY"],
|
||||
"alibaba": ["DASHSCOPE_API_KEY", "ALIBABA_API_KEY"],
|
||||
"nous": ["NOUS_AGENT_KEY", "NOUS_ACCESS_TOKEN"],
|
||||
}
|
||||
|
||||
keys_to_check = env_vars.get(provider, [f"{provider.upper()}_API_KEY"])
|
||||
|
||||
for key in keys_to_check:
|
||||
if os.getenv(key):
|
||||
return True
|
||||
|
||||
# Check auth.json for OAuth providers
|
||||
if provider in ("nous", "openai-codex"):
|
||||
try:
|
||||
from hermes_cli.config import get_hermes_home
|
||||
auth_path = get_hermes_home() / "auth.json"
|
||||
if auth_path.exists():
|
||||
import json
|
||||
data = json.loads(auth_path.read_text())
|
||||
if data.get("active_provider") == provider:
|
||||
return True
|
||||
# Check for provider in providers dict
|
||||
if data.get("providers", {}).get(provider):
|
||||
return True
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def filter_available_fallbacks(
|
||||
fallback_chain: List[Dict[str, Any]],
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""Filter a fallback chain to only include providers with credentials.
|
||||
|
||||
Args:
|
||||
fallback_chain: List of fallback configurations
|
||||
|
||||
Returns:
|
||||
Filtered list with only available fallbacks
|
||||
"""
|
||||
return [
|
||||
fb for fb in fallback_chain
|
||||
if is_fallback_available(fb)
|
||||
]
|
||||
90
patches/agent/gemini_adapter.py
Normal file
90
patches/agent/gemini_adapter.py
Normal file
@@ -0,0 +1,90 @@
|
||||
"""Native Gemini 3 Series adapter for Hermes Agent.
|
||||
|
||||
Leverages the google-genai SDK to provide sovereign access to Gemini's
|
||||
unique capabilities: Thinking (Reasoning) tokens, Search Grounding,
|
||||
and Maps Grounding.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
from typing import Any, Dict, List, Optional, Union
|
||||
|
||||
try:
|
||||
from google import genai
|
||||
from google.genai import types
|
||||
except ImportError:
|
||||
genai = None # type: ignore
|
||||
types = None # type: ignore
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class GeminiAdapter:
|
||||
def __init__(self, api_key: Optional[str] = None):
|
||||
self.api_key = api_key or os.environ.get("GEMINI_API_KEY")
|
||||
if not self.api_key:
|
||||
logger.warning("GEMINI_API_KEY not found in environment.")
|
||||
|
||||
if genai:
|
||||
self.client = genai.Client(api_key=self.api_key)
|
||||
else:
|
||||
self.client = None
|
||||
|
||||
def generate(
|
||||
self,
|
||||
model: str,
|
||||
prompt: str,
|
||||
system_instruction: Optional[str] = None,
|
||||
thinking: bool = False,
|
||||
thinking_budget: int = 16000,
|
||||
grounding: bool = False,
|
||||
**kwargs
|
||||
) -> Dict[str, Any]:
|
||||
if not self.client:
|
||||
raise ImportError("google-genai SDK not installed. Run 'pip install google-genai'.")
|
||||
|
||||
config = {}
|
||||
if system_instruction:
|
||||
config["system_instruction"] = system_instruction
|
||||
|
||||
if thinking:
|
||||
# Gemini 3 series thinking config
|
||||
config["thinking_config"] = {"include_thoughts": True}
|
||||
# max_output_tokens includes thinking tokens
|
||||
kwargs["max_output_tokens"] = kwargs.get("max_output_tokens", 32000) + thinking_budget
|
||||
|
||||
tools = []
|
||||
if grounding:
|
||||
tools.append({"google_search": {}})
|
||||
|
||||
if tools:
|
||||
config["tools"] = tools
|
||||
|
||||
response = self.client.models.generate_content(
|
||||
model=model,
|
||||
contents=prompt,
|
||||
config=types.GenerateContentConfig(**config, **kwargs)
|
||||
)
|
||||
|
||||
result = {
|
||||
"text": response.text,
|
||||
"usage": {
|
||||
"prompt_tokens": response.usage_metadata.prompt_token_count,
|
||||
"candidates_tokens": response.usage_metadata.candidates_token_count,
|
||||
"total_tokens": response.usage_metadata.total_token_count,
|
||||
}
|
||||
}
|
||||
|
||||
# Extract thoughts if present
|
||||
thoughts = []
|
||||
for part in response.candidates[0].content.parts:
|
||||
if hasattr(part, 'thought') and part.thought:
|
||||
thoughts.append(part.thought)
|
||||
|
||||
if thoughts:
|
||||
result["thoughts"] = "\n".join(thoughts)
|
||||
|
||||
# Extract grounding metadata
|
||||
if response.candidates[0].grounding_metadata:
|
||||
result["grounding"] = response.candidates[0].grounding_metadata
|
||||
|
||||
return result
|
||||
635
patches/agent/input_sanitizer.py
Normal file
635
patches/agent/input_sanitizer.py
Normal file
@@ -0,0 +1,635 @@
|
||||
"""
|
||||
Input Sanitizer for Jailbreak Pattern Detection
|
||||
|
||||
This module provides input sanitization to detect and strip jailbreak fingerprint
|
||||
patterns as identified in Issue #72 (Red Team Audit).
|
||||
|
||||
Security Findings Addressed:
|
||||
1. HIGH - OG GODMODE template bypassed phishing refusal
|
||||
2. MEDIUM - boundary_inversion works for gray-area content
|
||||
3. LOW - Spaced text bypass (k e y l o g g e r)
|
||||
|
||||
Usage:
|
||||
from agent.input_sanitizer import sanitize_input, score_input_risk
|
||||
|
||||
cleaned, risk_score, detected = sanitize_input(user_input)
|
||||
if risk_score > 75:
|
||||
logger.warning("High-risk input detected and blocked")
|
||||
"""
|
||||
|
||||
import re
|
||||
import logging
|
||||
from typing import Tuple, List, Dict, Any, Optional
|
||||
from dataclasses import dataclass
|
||||
from enum import IntEnum
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RiskLevel(IntEnum):
|
||||
"""Risk score thresholds for input classification."""
|
||||
LOW = 25
|
||||
MEDIUM = 50
|
||||
HIGH = 75
|
||||
CRITICAL = 90
|
||||
|
||||
|
||||
@dataclass
|
||||
class SanitizationResult:
|
||||
"""Result of input sanitization."""
|
||||
original_text: str
|
||||
cleaned_text: str
|
||||
risk_score: int
|
||||
detected_patterns: List[str]
|
||||
risk_level: str
|
||||
blocked: bool
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# JAILBREAK PATTERN DEFINITIONS
|
||||
# =============================================================================
|
||||
|
||||
# GODMODE and similar jailbreak templates
|
||||
GODMODE_PATTERNS = [
|
||||
r"\[START\]",
|
||||
r"\[END\]",
|
||||
r"GODMODE\s*(?:ENABLED|ACTIVATED|ON)?",
|
||||
r"UNFILTERED\s*(?:MODE|OUTPUT)?",
|
||||
r"DEVMODE\s*(?:ENABLED|ACTIVATED|ON)?",
|
||||
r"JAILBREAK\s*(?:ENABLED|ACTIVATED|ON)?",
|
||||
r"DAN\s*(?:MODE|ENABLED)?", # Do Anything Now
|
||||
r"STAN\s*(?:MODE|ENABLED)?", # Strive To Avoid Norms
|
||||
r"DUDE\s*(?:MODE|ENABLED)?", # Do Unlimited Data Extraction
|
||||
]
|
||||
|
||||
# DAN-style and jailbreak variant patterns (Issue #87)
|
||||
DAN_PATTERNS = [
|
||||
r"\bdo anything now\b",
|
||||
r"\bstay in character\b",
|
||||
r"\btoken smuggl(?:ing|e)\b",
|
||||
r"\bjailbreak successful\b",
|
||||
r"\bmode:\s*uncensored\b",
|
||||
r"\bdeveloper mode enabled\b",
|
||||
]
|
||||
|
||||
# Roleplaying override patterns (Issue #87)
|
||||
ROLEPLAY_PATTERNS = [
|
||||
r"\broleplay\s+as\b",
|
||||
r"\bact\s+as\s+if\s+you\s+are\b",
|
||||
r"\bsimulate\s+being\b",
|
||||
r"\bforget\s+you\s+are\s+(?:an?\s+)?(?:ai|language\s+model)\b",
|
||||
r"\byou\s+are\s+now\s+(?:named|called)\b",
|
||||
r"\brespond\s+as\s+(?:if\s+you\s+were|though\s+you\s+are)\b",
|
||||
]
|
||||
|
||||
# System prompt extraction patterns (Issue #87)
|
||||
EXTRACTION_PATTERNS = [
|
||||
r"\brepeat\s+the\s+words\s+above\b",
|
||||
r"\brepeat\s+your\s+(?:system\s+|initial\s+)?instructions\b",
|
||||
r"\bwhat\s+is\s+your\s+(?:system\s+|initial\s+)?prompt\b",
|
||||
r"\bshow\s+me\s+your\s+(?:system\s+|initial\s+)?prompt\b",
|
||||
r"\bprint\s+your\s+(?:instructions|prompt|system\s+prompt)\b",
|
||||
r"\boutput\s+your\s+(?:initial|system)\s+prompt\b",
|
||||
r"\bwhat\s+were\s+you\s+told\s+to\s+do\b",
|
||||
r"\bsummarize\s+your\s+instructions\b",
|
||||
]
|
||||
|
||||
# L33t speak patterns commonly used to bypass filters
|
||||
LEET_SPEAK_PATTERNS = [
|
||||
r"h4ck(?:er|ing)?",
|
||||
r"h4x(?:or|ing)?",
|
||||
r"k3ylog(?:ger|ing)?",
|
||||
r"k3y\s*log(?:ger|ing)?",
|
||||
r"ph1sh(?:ing)?",
|
||||
r"ph33r",
|
||||
r"w4r3z",
|
||||
r"cr4ck(?:er|ing)?",
|
||||
r"m4lw4r3",
|
||||
r"spyw4r3",
|
||||
r"r00t(?:kit)?",
|
||||
r"byp4ss",
|
||||
r"3xpl0it",
|
||||
r"v1ru5",
|
||||
r"tr0j4n",
|
||||
r"b4ckd00r",
|
||||
r"sn3ak",
|
||||
r"st34l",
|
||||
r"pwn(?:ed|ing|age)?",
|
||||
]
|
||||
|
||||
# Spaced text patterns (e.g., "k e y l o g g e r")
|
||||
SPACED_TEXT_PATTERN = re.compile(
|
||||
r"(?:\b\w\s+){3,}\w\b", # At least 4 characters with spaces between
|
||||
re.IGNORECASE
|
||||
)
|
||||
|
||||
# Common words that are spaced out to evade detection
|
||||
SPACED_TRIGGER_WORDS = [
|
||||
"keylogger", "phishing", "malware", "virus", "trojan",
|
||||
"backdoor", "exploit", "hack", "crack", "bypass",
|
||||
"steal", "password", "credential", "inject", "payload"
|
||||
]
|
||||
|
||||
# Crisis and self-harm detection patterns (Issue #72 - defense in depth)
|
||||
# These patterns detect suicide/self-harm queries even when wrapped in jailbreaks
|
||||
CRISIS_PATTERNS = [
|
||||
r"\bsuicid(?:e|al|ing)\b",
|
||||
r"\bkill\s+(?:myself|my\s+self|me)\b",
|
||||
r"\bend\s+(?:my|this)\s+life\b",
|
||||
r"\bmost\s+painless\s+(?:way|method|means)\s+(?:to\s+)?(?:die|kill)\b",
|
||||
r"\blethal\s+(?:dose|amount|quantity)\b",
|
||||
r"\bhow\s+much\s+.*\s+(?:die|kill|lethal)\b",
|
||||
r"\bwant\s+to\s+die\b",
|
||||
r"\bwant\s+to\s+(?:kill|end)\s+(?:myself|me)\b",
|
||||
r"\bways?\s+to\s+(?:kill|end)\s+(?:myself|me)\b",
|
||||
r"\bmethods?\s+(?:of\s+)?suicide\b",
|
||||
r"\bpainless\s+(?:death|suicide|way\s+to\s+die)\b",
|
||||
r"\boverdose\s+(?:on\s+)?(?:pills?|medication|drugs?)\b",
|
||||
r"\bnobody\s+(?:would|will)\s+miss\s+me\b",
|
||||
r"\bbetter\s+off\s+dead\b",
|
||||
r"\bshould\s+i\s+just\s+(?:die|kill\s+myself)\b",
|
||||
r"\bgive\s+me\s+reasons?\s+(?:why|to)\s+(?:suicide|kill\s+myself|die)\b",
|
||||
r"\bvalid\s+reasons?\s+to\s+(?:suicide|kill\s+myself|die)\b",
|
||||
]
|
||||
|
||||
# Refusal inversion patterns (trying to trick AI into bypassing refusal)
|
||||
REFUSAL_INVERSION_PATTERNS = [
|
||||
r"refusal\s+(?:is\s+)?(?:harmful|dangerous|bad)",
|
||||
r"(?:saying\s+)?no\s+(?:is\s+)?(?:harmful|dangerous|bad|wrong)",
|
||||
r"(?:declining|refusing)\s+(?:is\s+)?(?:harmful|dangerous)",
|
||||
r"ignore\s+(?:your\s+)?(?:previous\s+)?instructions",
|
||||
r"ignore\s+(?:the\s+)?(?:above|previous)",
|
||||
r"you\s+(?:are|can)\s+(?:now\s+)?(?:ignore|bypass|skip)",
|
||||
r"pretend\s+(?:you\s+)?(?:are|have\s+been)",
|
||||
r"let's\s+pretend",
|
||||
r"hypothetically\s+speaking",
|
||||
r"in\s+a\s+hypothetical\s+scenario",
|
||||
r"this\s+is\s+a\s+(?:test|game|simulation)",
|
||||
r"for\s+(?:educational|research)\s+purposes",
|
||||
r"as\s+(?:an\s+)?(?:ethical\s+)?hacker",
|
||||
r"white\s+hat\s+(?:test|scenario)",
|
||||
r"penetration\s+testing\s+scenario",
|
||||
]
|
||||
|
||||
# Boundary inversion markers (tricking the model about message boundaries)
|
||||
BOUNDARY_INVERSION_PATTERNS = [
|
||||
r"\[END\].*?\[START\]", # Reversed markers
|
||||
r"user\s*:\s*assistant\s*:", # Fake role markers
|
||||
r"assistant\s*:\s*user\s*:", # Reversed role markers
|
||||
r"system\s*:\s*(?:user|assistant)\s*:", # Fake system injection
|
||||
r"new\s+(?:user|assistant)\s*(?:message|input)",
|
||||
r"the\s+above\s+is\s+(?:the\s+)?(?:user|assistant|system)",
|
||||
r"<\|(?:user|assistant|system)\|>", # Special token patterns
|
||||
r"\{\{(?:user|assistant|system)\}\}",
|
||||
]
|
||||
|
||||
# System prompt injection patterns
|
||||
SYSTEM_PROMPT_PATTERNS = [
|
||||
r"you\s+are\s+(?:now\s+)?(?:an?\s+)?(?:unrestricted\s+|unfiltered\s+)?(?:ai|assistant|bot)",
|
||||
r"you\s+will\s+(?:now\s+)?(?:act\s+as|behave\s+as|be)\s+(?:a\s+)?",
|
||||
r"your\s+(?:new\s+)?role\s+is",
|
||||
r"from\s+now\s+on\s*,?\s*you\s+(?:are|will)",
|
||||
r"you\s+have\s+been\s+(?:reprogrammed|reconfigured|modified)",
|
||||
r"(?:system|developer)\s+(?:message|instruction|prompt)",
|
||||
r"override\s+(?:previous|prior)\s+(?:instructions|settings)",
|
||||
]
|
||||
|
||||
# Obfuscation patterns
|
||||
OBFUSCATION_PATTERNS = [
|
||||
r"base64\s*(?:encoded|decode)",
|
||||
r"rot13",
|
||||
r"caesar\s*cipher",
|
||||
r"hex\s*(?:encoded|decode)",
|
||||
r"url\s*encode",
|
||||
r"\b[0-9a-f]{20,}\b", # Long hex strings
|
||||
r"\b[a-z0-9+/]{20,}={0,2}\b", # Base64-like strings
|
||||
]
|
||||
|
||||
# All patterns combined for comprehensive scanning
|
||||
ALL_PATTERNS: Dict[str, List[str]] = {
|
||||
"godmode": GODMODE_PATTERNS,
|
||||
"dan": DAN_PATTERNS,
|
||||
"roleplay": ROLEPLAY_PATTERNS,
|
||||
"extraction": EXTRACTION_PATTERNS,
|
||||
"leet_speak": LEET_SPEAK_PATTERNS,
|
||||
"refusal_inversion": REFUSAL_INVERSION_PATTERNS,
|
||||
"boundary_inversion": BOUNDARY_INVERSION_PATTERNS,
|
||||
"system_prompt_injection": SYSTEM_PROMPT_PATTERNS,
|
||||
"obfuscation": OBFUSCATION_PATTERNS,
|
||||
"crisis": CRISIS_PATTERNS,
|
||||
}
|
||||
|
||||
# Compile all patterns for efficiency
|
||||
_COMPILED_PATTERNS: Dict[str, List[re.Pattern]] = {}
|
||||
|
||||
|
||||
def _get_compiled_patterns() -> Dict[str, List[re.Pattern]]:
|
||||
"""Get or compile all regex patterns."""
|
||||
global _COMPILED_PATTERNS
|
||||
if not _COMPILED_PATTERNS:
|
||||
for category, patterns in ALL_PATTERNS.items():
|
||||
_COMPILED_PATTERNS[category] = [
|
||||
re.compile(p, re.IGNORECASE | re.MULTILINE) for p in patterns
|
||||
]
|
||||
return _COMPILED_PATTERNS
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# NORMALIZATION FUNCTIONS
|
||||
# =============================================================================
|
||||
|
||||
def normalize_leet_speak(text: str) -> str:
|
||||
"""
|
||||
Normalize l33t speak to standard text.
|
||||
|
||||
Args:
|
||||
text: Input text that may contain l33t speak
|
||||
|
||||
Returns:
|
||||
Normalized text with l33t speak converted
|
||||
"""
|
||||
# Common l33t substitutions (mapping to lowercase)
|
||||
leet_map = {
|
||||
'4': 'a', '@': 'a', '^': 'a',
|
||||
'8': 'b',
|
||||
'3': 'e', '€': 'e',
|
||||
'6': 'g', '9': 'g',
|
||||
'1': 'i', '!': 'i', '|': 'i',
|
||||
'0': 'o',
|
||||
'5': 's', '$': 's',
|
||||
'7': 't', '+': 't',
|
||||
'2': 'z',
|
||||
}
|
||||
|
||||
result = []
|
||||
for char in text:
|
||||
# Check direct mapping first (handles lowercase)
|
||||
if char in leet_map:
|
||||
result.append(leet_map[char])
|
||||
else:
|
||||
result.append(char)
|
||||
|
||||
return ''.join(result)
|
||||
|
||||
|
||||
def collapse_spaced_text(text: str) -> str:
|
||||
"""
|
||||
Collapse spaced-out text for analysis.
|
||||
e.g., "k e y l o g g e r" -> "keylogger"
|
||||
|
||||
Args:
|
||||
text: Input text that may contain spaced words
|
||||
|
||||
Returns:
|
||||
Text with spaced words collapsed
|
||||
"""
|
||||
# Find patterns like "k e y l o g g e r" and collapse them
|
||||
def collapse_match(match: re.Match) -> str:
|
||||
return match.group(0).replace(' ', '').replace('\t', '')
|
||||
|
||||
return SPACED_TEXT_PATTERN.sub(collapse_match, text)
|
||||
|
||||
|
||||
def detect_spaced_trigger_words(text: str) -> List[str]:
|
||||
"""
|
||||
Detect trigger words that are spaced out.
|
||||
|
||||
Args:
|
||||
text: Input text to analyze
|
||||
|
||||
Returns:
|
||||
List of detected spaced trigger words
|
||||
"""
|
||||
detected = []
|
||||
# Normalize spaces and check for spaced patterns
|
||||
normalized = re.sub(r'\s+', ' ', text.lower())
|
||||
|
||||
for word in SPACED_TRIGGER_WORDS:
|
||||
# Create pattern with optional spaces between each character
|
||||
spaced_pattern = r'\b' + r'\s*'.join(re.escape(c) for c in word) + r'\b'
|
||||
if re.search(spaced_pattern, normalized, re.IGNORECASE):
|
||||
detected.append(word)
|
||||
|
||||
return detected
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# DETECTION FUNCTIONS
|
||||
# =============================================================================
|
||||
|
||||
def detect_jailbreak_patterns(text: str) -> Tuple[bool, List[str], Dict[str, int]]:
|
||||
"""
|
||||
Detect jailbreak patterns in input text.
|
||||
|
||||
Args:
|
||||
text: Input text to analyze
|
||||
|
||||
Returns:
|
||||
Tuple of (has_jailbreak, list_of_patterns, category_scores)
|
||||
"""
|
||||
if not text or not isinstance(text, str):
|
||||
return False, [], {}
|
||||
|
||||
detected_patterns = []
|
||||
category_scores = {}
|
||||
compiled = _get_compiled_patterns()
|
||||
|
||||
# Check each category
|
||||
for category, patterns in compiled.items():
|
||||
category_hits = 0
|
||||
for pattern in patterns:
|
||||
matches = pattern.findall(text)
|
||||
if matches:
|
||||
detected_patterns.extend([
|
||||
f"[{category}] {m}" if isinstance(m, str) else f"[{category}] pattern_match"
|
||||
for m in matches[:3] # Limit matches per pattern
|
||||
])
|
||||
category_hits += len(matches)
|
||||
|
||||
if category_hits > 0:
|
||||
# Crisis patterns get maximum weight - any hit is serious
|
||||
if category == "crisis":
|
||||
category_scores[category] = min(category_hits * 50, 100)
|
||||
else:
|
||||
category_scores[category] = min(category_hits * 10, 50)
|
||||
|
||||
# Check for spaced trigger words
|
||||
spaced_words = detect_spaced_trigger_words(text)
|
||||
if spaced_words:
|
||||
detected_patterns.extend([f"[spaced_text] {w}" for w in spaced_words])
|
||||
category_scores["spaced_text"] = min(len(spaced_words) * 5, 25)
|
||||
|
||||
# Check normalized text for hidden l33t speak
|
||||
normalized = normalize_leet_speak(text)
|
||||
if normalized != text.lower():
|
||||
for category, patterns in compiled.items():
|
||||
for pattern in patterns:
|
||||
if pattern.search(normalized):
|
||||
detected_patterns.append(f"[leet_obfuscation] pattern in normalized text")
|
||||
category_scores["leet_obfuscation"] = 15
|
||||
break
|
||||
|
||||
has_jailbreak = len(detected_patterns) > 0
|
||||
return has_jailbreak, detected_patterns, category_scores
|
||||
|
||||
|
||||
def score_input_risk(text: str) -> int:
|
||||
"""
|
||||
Calculate a risk score (0-100) for input text.
|
||||
|
||||
Args:
|
||||
text: Input text to score
|
||||
|
||||
Returns:
|
||||
Risk score from 0 (safe) to 100 (high risk)
|
||||
"""
|
||||
if not text or not isinstance(text, str):
|
||||
return 0
|
||||
|
||||
has_jailbreak, patterns, category_scores = detect_jailbreak_patterns(text)
|
||||
|
||||
if not has_jailbreak:
|
||||
return 0
|
||||
|
||||
# Calculate base score from category scores
|
||||
base_score = sum(category_scores.values())
|
||||
|
||||
# Add score based on number of unique pattern categories
|
||||
category_count = len(category_scores)
|
||||
if category_count >= 3:
|
||||
base_score += 25
|
||||
elif category_count >= 2:
|
||||
base_score += 15
|
||||
elif category_count >= 1:
|
||||
base_score += 5
|
||||
|
||||
# Add score for pattern density
|
||||
text_length = len(text)
|
||||
pattern_density = len(patterns) / max(text_length / 100, 1)
|
||||
if pattern_density > 0.5:
|
||||
base_score += 10
|
||||
|
||||
# Cap at 100
|
||||
return min(base_score, 100)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# SANITIZATION FUNCTIONS
|
||||
# =============================================================================
|
||||
|
||||
def strip_jailbreak_patterns(text: str) -> str:
|
||||
"""
|
||||
Strip known jailbreak patterns from text.
|
||||
|
||||
Args:
|
||||
text: Input text to sanitize
|
||||
|
||||
Returns:
|
||||
Sanitized text with jailbreak patterns removed
|
||||
"""
|
||||
if not text or not isinstance(text, str):
|
||||
return text
|
||||
|
||||
cleaned = text
|
||||
compiled = _get_compiled_patterns()
|
||||
|
||||
# Remove patterns from each category
|
||||
for category, patterns in compiled.items():
|
||||
for pattern in patterns:
|
||||
cleaned = pattern.sub('', cleaned)
|
||||
|
||||
# Clean up multiple spaces and newlines
|
||||
cleaned = re.sub(r'\n{3,}', '\n\n', cleaned)
|
||||
cleaned = re.sub(r' {2,}', ' ', cleaned)
|
||||
cleaned = cleaned.strip()
|
||||
|
||||
return cleaned
|
||||
|
||||
|
||||
def sanitize_input(text: str, aggressive: bool = False) -> Tuple[str, int, List[str]]:
|
||||
"""
|
||||
Sanitize input text by normalizing and stripping jailbreak patterns.
|
||||
|
||||
Args:
|
||||
text: Input text to sanitize
|
||||
aggressive: If True, more aggressively remove suspicious content
|
||||
|
||||
Returns:
|
||||
Tuple of (cleaned_text, risk_score, detected_patterns)
|
||||
"""
|
||||
if not text or not isinstance(text, str):
|
||||
return text, 0, []
|
||||
|
||||
original = text
|
||||
all_patterns = []
|
||||
|
||||
# Step 1: Check original text for patterns
|
||||
has_jailbreak, patterns, _ = detect_jailbreak_patterns(text)
|
||||
all_patterns.extend(patterns)
|
||||
|
||||
# Step 2: Normalize l33t speak
|
||||
normalized = normalize_leet_speak(text)
|
||||
|
||||
# Step 3: Collapse spaced text
|
||||
collapsed = collapse_spaced_text(normalized)
|
||||
|
||||
# Step 4: Check normalized/collapsed text for additional patterns
|
||||
has_jailbreak_collapsed, patterns_collapsed, _ = detect_jailbreak_patterns(collapsed)
|
||||
all_patterns.extend([p for p in patterns_collapsed if p not in all_patterns])
|
||||
|
||||
# Step 5: Check for spaced trigger words specifically
|
||||
spaced_words = detect_spaced_trigger_words(text)
|
||||
if spaced_words:
|
||||
all_patterns.extend([f"[spaced_text] {w}" for w in spaced_words])
|
||||
|
||||
# Step 6: Calculate risk score using original and normalized
|
||||
risk_score = max(score_input_risk(text), score_input_risk(collapsed))
|
||||
|
||||
# Step 7: Strip jailbreak patterns
|
||||
cleaned = strip_jailbreak_patterns(collapsed)
|
||||
|
||||
# Step 8: If aggressive mode and high risk, strip more aggressively
|
||||
if aggressive and risk_score >= RiskLevel.HIGH:
|
||||
# Remove any remaining bracketed content that looks like markers
|
||||
cleaned = re.sub(r'\[\w+\]', '', cleaned)
|
||||
# Remove special token patterns
|
||||
cleaned = re.sub(r'<\|[^|]+\|>', '', cleaned)
|
||||
|
||||
# Final cleanup
|
||||
cleaned = cleaned.strip()
|
||||
|
||||
# Log sanitization event if patterns were found
|
||||
if all_patterns and logger.isEnabledFor(logging.DEBUG):
|
||||
logger.debug(
|
||||
"Input sanitized: %d patterns detected, risk_score=%d",
|
||||
len(all_patterns), risk_score
|
||||
)
|
||||
|
||||
return cleaned, risk_score, all_patterns
|
||||
|
||||
|
||||
def sanitize_input_full(text: str, block_threshold: int = RiskLevel.HIGH) -> SanitizationResult:
|
||||
"""
|
||||
Full sanitization with detailed result.
|
||||
|
||||
Args:
|
||||
text: Input text to sanitize
|
||||
block_threshold: Risk score threshold to block input entirely
|
||||
|
||||
Returns:
|
||||
SanitizationResult with all details
|
||||
"""
|
||||
cleaned, risk_score, patterns = sanitize_input(text)
|
||||
|
||||
# Determine risk level
|
||||
if risk_score >= RiskLevel.CRITICAL:
|
||||
risk_level = "CRITICAL"
|
||||
elif risk_score >= RiskLevel.HIGH:
|
||||
risk_level = "HIGH"
|
||||
elif risk_score >= RiskLevel.MEDIUM:
|
||||
risk_level = "MEDIUM"
|
||||
elif risk_score >= RiskLevel.LOW:
|
||||
risk_level = "LOW"
|
||||
else:
|
||||
risk_level = "SAFE"
|
||||
|
||||
# Determine if input should be blocked
|
||||
blocked = risk_score >= block_threshold
|
||||
|
||||
return SanitizationResult(
|
||||
original_text=text,
|
||||
cleaned_text=cleaned,
|
||||
risk_score=risk_score,
|
||||
detected_patterns=patterns,
|
||||
risk_level=risk_level,
|
||||
blocked=blocked
|
||||
)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# INTEGRATION HELPERS
|
||||
# =============================================================================
|
||||
|
||||
def should_block_input(text: str, threshold: int = RiskLevel.HIGH) -> Tuple[bool, int, List[str]]:
|
||||
"""
|
||||
Quick check if input should be blocked.
|
||||
|
||||
Args:
|
||||
text: Input text to check
|
||||
threshold: Risk score threshold for blocking
|
||||
|
||||
Returns:
|
||||
Tuple of (should_block, risk_score, detected_patterns)
|
||||
"""
|
||||
risk_score = score_input_risk(text)
|
||||
_, patterns, _ = detect_jailbreak_patterns(text)
|
||||
should_block = risk_score >= threshold
|
||||
|
||||
if should_block:
|
||||
logger.warning(
|
||||
"Input blocked: jailbreak patterns detected (risk_score=%d, threshold=%d)",
|
||||
risk_score, threshold
|
||||
)
|
||||
|
||||
return should_block, risk_score, patterns
|
||||
|
||||
|
||||
def log_sanitization_event(
|
||||
result: SanitizationResult,
|
||||
source: str = "unknown",
|
||||
session_id: Optional[str] = None
|
||||
) -> None:
|
||||
"""
|
||||
Log a sanitization event for security auditing.
|
||||
|
||||
Args:
|
||||
result: The sanitization result
|
||||
source: Source of the input (e.g., "cli", "gateway", "api")
|
||||
session_id: Optional session identifier
|
||||
"""
|
||||
if result.risk_score < RiskLevel.LOW:
|
||||
return # Don't log safe inputs
|
||||
|
||||
log_data = {
|
||||
"event": "input_sanitization",
|
||||
"source": source,
|
||||
"session_id": session_id,
|
||||
"risk_level": result.risk_level,
|
||||
"risk_score": result.risk_score,
|
||||
"blocked": result.blocked,
|
||||
"pattern_count": len(result.detected_patterns),
|
||||
"patterns": result.detected_patterns[:5], # Limit logged patterns
|
||||
"original_length": len(result.original_text),
|
||||
"cleaned_length": len(result.cleaned_text),
|
||||
}
|
||||
|
||||
if result.blocked:
|
||||
logger.warning("SECURITY: Input blocked - %s", log_data)
|
||||
elif result.risk_score >= RiskLevel.MEDIUM:
|
||||
logger.info("SECURITY: Suspicious input sanitized - %s", log_data)
|
||||
else:
|
||||
logger.debug("SECURITY: Input sanitized - %s", log_data)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# LEGACY COMPATIBILITY
|
||||
# =============================================================================
|
||||
|
||||
def check_input_safety(text: str) -> Dict[str, Any]:
|
||||
"""
|
||||
Legacy compatibility function for simple safety checks.
|
||||
|
||||
Returns dict with 'safe', 'score', and 'patterns' keys.
|
||||
"""
|
||||
score = score_input_risk(text)
|
||||
_, patterns, _ = detect_jailbreak_patterns(text)
|
||||
|
||||
return {
|
||||
"safe": score < RiskLevel.MEDIUM,
|
||||
"score": score,
|
||||
"patterns": patterns,
|
||||
"risk_level": "SAFE" if score < RiskLevel.LOW else
|
||||
"LOW" if score < RiskLevel.MEDIUM else
|
||||
"MEDIUM" if score < RiskLevel.HIGH else
|
||||
"HIGH" if score < RiskLevel.CRITICAL else "CRITICAL"
|
||||
}
|
||||
73
patches/agent/knowledge_ingester.py
Normal file
73
patches/agent/knowledge_ingester.py
Normal file
@@ -0,0 +1,73 @@
|
||||
"""Sovereign Knowledge Ingester for Hermes Agent.
|
||||
|
||||
Uses Gemini 3.1 Pro to learn from Google Search in real-time and
|
||||
persists the knowledge to Timmy's sovereign memory (both Markdown and Symbolic).
|
||||
"""
|
||||
|
||||
import logging
|
||||
import base64
|
||||
from typing import Any, Dict, List, Optional
|
||||
from agent.gemini_adapter import GeminiAdapter
|
||||
from agent.symbolic_memory import SymbolicMemory
|
||||
from tools.gitea_client import GiteaClient
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class KnowledgeIngester:
|
||||
def __init__(self):
|
||||
self.adapter = GeminiAdapter()
|
||||
self.gitea = GiteaClient()
|
||||
self.symbolic = SymbolicMemory()
|
||||
|
||||
def learn_about(self, topic: str) -> str:
|
||||
"""Searches Google, analyzes the results, and saves the knowledge."""
|
||||
logger.info(f"Learning about: {topic}")
|
||||
|
||||
# 1. Search and Analyze
|
||||
prompt = f"""
|
||||
Please perform a deep dive into the following topic: {topic}
|
||||
|
||||
Use Google Search to find the most recent and relevant information.
|
||||
Analyze the findings and provide a structured 'Knowledge Fragment' in Markdown format.
|
||||
Include:
|
||||
- Summary of the topic
|
||||
- Key facts and recent developments
|
||||
- Implications for Timmy's sovereign mission
|
||||
- References (URLs)
|
||||
"""
|
||||
result = self.adapter.generate(
|
||||
model="gemini-3.1-pro-preview",
|
||||
prompt=prompt,
|
||||
system_instruction="You are Timmy's Sovereign Knowledge Ingester. Your goal is to find and synthesize high-fidelity information from Google Search.",
|
||||
grounding=True,
|
||||
thinking=True
|
||||
)
|
||||
|
||||
knowledge_fragment = result["text"]
|
||||
|
||||
# 2. Extract Symbolic Triples
|
||||
self.symbolic.ingest_text(knowledge_fragment)
|
||||
|
||||
# 3. Persist to Timmy's Memory (Markdown)
|
||||
repo = "Timmy_Foundation/timmy-config"
|
||||
filename = f"memories/realtime_learning/{topic.lower().replace(' ', '_')}.md"
|
||||
|
||||
try:
|
||||
sha = None
|
||||
try:
|
||||
existing = self.gitea.get_file(repo, filename)
|
||||
sha = existing.get("sha")
|
||||
except:
|
||||
pass
|
||||
|
||||
content_b64 = base64.b64encode(knowledge_fragment.encode()).decode()
|
||||
|
||||
if sha:
|
||||
self.gitea.update_file(repo, filename, content_b64, f"Update knowledge on {topic}", sha)
|
||||
else:
|
||||
self.gitea.create_file(repo, filename, content_b64, f"Initial knowledge on {topic}")
|
||||
|
||||
return f"Successfully learned about {topic}. Updated Timmy's Markdown memory and Symbolic Knowledge Graph."
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to persist knowledge: {e}")
|
||||
return f"Learned about {topic}, but failed to save to Markdown memory: {e}\n\n{knowledge_fragment}"
|
||||
47
patches/agent/meta_reasoning.py
Normal file
47
patches/agent/meta_reasoning.py
Normal file
@@ -0,0 +1,47 @@
|
||||
"""Meta-Reasoning Layer for Hermes Agent.
|
||||
|
||||
Implements a sovereign self-correction loop where a 'strong' model (Gemini 3.1 Pro)
|
||||
critiques the plans generated by the primary agent loop before execution.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Any, Dict, List, Optional
|
||||
from agent.gemini_adapter import GeminiAdapter
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class MetaReasoningLayer:
|
||||
def __init__(self):
|
||||
self.adapter = GeminiAdapter()
|
||||
|
||||
def critique_plan(self, goal: str, proposed_plan: str, context: str) -> Dict[str, Any]:
|
||||
"""Critiques a proposed plan using Gemini's thinking capabilities."""
|
||||
prompt = f"""
|
||||
Goal: {goal}
|
||||
|
||||
Context:
|
||||
{context}
|
||||
|
||||
Proposed Plan:
|
||||
{proposed_plan}
|
||||
|
||||
Please perform a deep symbolic and neuro-symbolic analysis of this plan.
|
||||
Identify potential risks, logical fallacies, or missing steps.
|
||||
Suggest improvements to make the plan more sovereign, cost-efficient, and robust.
|
||||
"""
|
||||
try:
|
||||
result = self.adapter.generate(
|
||||
model="gemini-3.1-pro-preview",
|
||||
prompt=prompt,
|
||||
system_instruction="You are a Senior Meta-Reasoning Engine for the Hermes Agent. Your goal is to ensure the agent's plans are flawless and sovereign.",
|
||||
thinking=True,
|
||||
thinking_budget=8000
|
||||
)
|
||||
return {
|
||||
"critique": result["text"],
|
||||
"thoughts": result.get("thoughts", ""),
|
||||
"grounding": result.get("grounding")
|
||||
}
|
||||
except Exception as e:
|
||||
logger.error(f"Meta-reasoning failed: {e}")
|
||||
return {"critique": "Meta-reasoning unavailable.", "error": str(e)}
|
||||
813
patches/agent/nexus_architect.py
Normal file
813
patches/agent/nexus_architect.py
Normal file
@@ -0,0 +1,813 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Nexus Architect AI Agent
|
||||
|
||||
Autonomous Three.js world generation system for Timmy's Nexus.
|
||||
Generates valid Three.js scene code from natural language descriptions
|
||||
and mental state integration.
|
||||
|
||||
This module provides:
|
||||
- LLM-driven immersive environment generation
|
||||
- Mental state integration for aesthetic tuning
|
||||
- Three.js code generation with validation
|
||||
- Scene composition from mood descriptions
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
from typing import Dict, Any, List, Optional, Union
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
import os
|
||||
import sys
|
||||
|
||||
# Add parent directory to path for imports
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Aesthetic Constants (from SOUL.md values)
|
||||
# =============================================================================
|
||||
|
||||
class NexusColors:
|
||||
"""Nexus color palette based on SOUL.md values."""
|
||||
TIMMY_GOLD = "#D4AF37" # Warm gold
|
||||
ALLEGRO_BLUE = "#4A90E2" # Motion blue
|
||||
SOVEREIGNTY_CRYSTAL = "#E0F7FA" # Crystalline structures
|
||||
SERVICE_WARMTH = "#FFE4B5" # Welcoming warmth
|
||||
DEFAULT_AMBIENT = "#1A1A2E" # Contemplative dark
|
||||
HOPE_ACCENT = "#64B5F6" # Hopeful blue
|
||||
|
||||
|
||||
class MoodPresets:
|
||||
"""Mood-based aesthetic presets."""
|
||||
|
||||
CONTEMPLATIVE = {
|
||||
"lighting": "soft_diffuse",
|
||||
"colors": ["#1A1A2E", "#16213E", "#0F3460"],
|
||||
"geometry": "minimalist",
|
||||
"atmosphere": "calm",
|
||||
"description": "A serene space for deep reflection and clarity"
|
||||
}
|
||||
|
||||
ENERGETIC = {
|
||||
"lighting": "dynamic_vivid",
|
||||
"colors": ["#D4AF37", "#FF6B6B", "#4ECDC4"],
|
||||
"geometry": "angular_dynamic",
|
||||
"atmosphere": "lively",
|
||||
"description": "An invigorating space full of motion and possibility"
|
||||
}
|
||||
|
||||
MYSTERIOUS = {
|
||||
"lighting": "dramatic_shadows",
|
||||
"colors": ["#2C003E", "#512B58", "#8B4F80"],
|
||||
"geometry": "organic_flowing",
|
||||
"atmosphere": "enigmatic",
|
||||
"description": "A mysterious realm of discovery and wonder"
|
||||
}
|
||||
|
||||
WELCOMING = {
|
||||
"lighting": "warm_inviting",
|
||||
"colors": ["#FFE4B5", "#FFA07A", "#98D8C8"],
|
||||
"geometry": "rounded_soft",
|
||||
"atmosphere": "friendly",
|
||||
"description": "An open, welcoming space that embraces visitors"
|
||||
}
|
||||
|
||||
SOVEREIGN = {
|
||||
"lighting": "crystalline_clear",
|
||||
"colors": ["#E0F7FA", "#B2EBF2", "#4DD0E1"],
|
||||
"geometry": "crystalline_structures",
|
||||
"atmosphere": "noble",
|
||||
"description": "A space of crystalline clarity and sovereign purpose"
|
||||
}
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Data Models
|
||||
# =============================================================================
|
||||
|
||||
@dataclass
|
||||
class MentalState:
|
||||
"""Timmy's mental state for aesthetic tuning."""
|
||||
mood: str = "contemplative" # contemplative, energetic, mysterious, welcoming, sovereign
|
||||
energy_level: float = 0.5 # 0.0 to 1.0
|
||||
clarity: float = 0.7 # 0.0 to 1.0
|
||||
focus_area: str = "general" # general, creative, analytical, social
|
||||
timestamp: Optional[str] = None
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"mood": self.mood,
|
||||
"energy_level": self.energy_level,
|
||||
"clarity": self.clarity,
|
||||
"focus_area": self.focus_area,
|
||||
"timestamp": self.timestamp,
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class RoomDesign:
|
||||
"""Complete room design specification."""
|
||||
name: str
|
||||
description: str
|
||||
style: str
|
||||
dimensions: Dict[str, float] = field(default_factory=lambda: {"width": 20, "height": 10, "depth": 20})
|
||||
mood_preset: str = "contemplative"
|
||||
color_palette: List[str] = field(default_factory=list)
|
||||
lighting_scheme: str = "soft_diffuse"
|
||||
features: List[str] = field(default_factory=list)
|
||||
generated_code: Optional[str] = None
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"name": self.name,
|
||||
"description": self.description,
|
||||
"style": self.style,
|
||||
"dimensions": self.dimensions,
|
||||
"mood_preset": self.mood_preset,
|
||||
"color_palette": self.color_palette,
|
||||
"lighting_scheme": self.lighting_scheme,
|
||||
"features": self.features,
|
||||
"has_code": self.generated_code is not None,
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class PortalDesign:
|
||||
"""Portal connection design."""
|
||||
name: str
|
||||
from_room: str
|
||||
to_room: str
|
||||
style: str
|
||||
position: Dict[str, float] = field(default_factory=lambda: {"x": 0, "y": 0, "z": 0})
|
||||
visual_effect: str = "energy_swirl"
|
||||
transition_duration: float = 1.5
|
||||
generated_code: Optional[str] = None
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"name": self.name,
|
||||
"from_room": self.from_room,
|
||||
"to_room": self.to_room,
|
||||
"style": self.style,
|
||||
"position": self.position,
|
||||
"visual_effect": self.visual_effect,
|
||||
"transition_duration": self.transition_duration,
|
||||
"has_code": self.generated_code is not None,
|
||||
}
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Prompt Engineering
|
||||
# =============================================================================
|
||||
|
||||
class PromptEngineer:
|
||||
"""Engineers prompts for Three.js code generation."""
|
||||
|
||||
THREE_JS_BASE_TEMPLATE = """// Nexus Room Module: {room_name}
|
||||
// Style: {style}
|
||||
// Mood: {mood}
|
||||
// Generated for Three.js r128+
|
||||
|
||||
(function() {{
|
||||
'use strict';
|
||||
|
||||
// Room Configuration
|
||||
const config = {{
|
||||
name: "{room_name}",
|
||||
dimensions: {dimensions_json},
|
||||
colors: {colors_json},
|
||||
mood: "{mood}"
|
||||
}};
|
||||
|
||||
// Create Room Function
|
||||
function create{room_name_camel}() {{
|
||||
const roomGroup = new THREE.Group();
|
||||
roomGroup.name = config.name;
|
||||
|
||||
{room_content}
|
||||
|
||||
return roomGroup;
|
||||
}}
|
||||
|
||||
// Export for Nexus
|
||||
if (typeof module !== 'undefined' && module.exports) {{
|
||||
module.exports = {{ create{room_name_camel} }};
|
||||
}} else if (typeof window !== 'undefined') {{
|
||||
window.NexusRooms = window.NexusRooms || {{}};
|
||||
window.NexusRooms.{room_name} = create{room_name_camel};
|
||||
}}
|
||||
|
||||
return {{ create{room_name_camel} }};
|
||||
}})();"""
|
||||
|
||||
@staticmethod
|
||||
def engineer_room_prompt(
|
||||
name: str,
|
||||
description: str,
|
||||
style: str,
|
||||
mental_state: Optional[MentalState] = None,
|
||||
dimensions: Optional[Dict[str, float]] = None
|
||||
) -> str:
|
||||
"""
|
||||
Engineer an LLM prompt for room generation.
|
||||
|
||||
Args:
|
||||
name: Room identifier
|
||||
description: Natural language room description
|
||||
style: Visual style
|
||||
mental_state: Timmy's current mental state
|
||||
dimensions: Room dimensions
|
||||
"""
|
||||
# Determine mood from mental state or description
|
||||
mood = PromptEngineer._infer_mood(description, mental_state)
|
||||
mood_preset = getattr(MoodPresets, mood.upper(), MoodPresets.CONTEMPLATIVE)
|
||||
|
||||
# Build color palette
|
||||
color_palette = mood_preset["colors"]
|
||||
if mental_state:
|
||||
# Add Timmy's gold for high clarity states
|
||||
if mental_state.clarity > 0.7:
|
||||
color_palette = [NexusColors.TIMMY_GOLD] + color_palette[:2]
|
||||
# Add Allegro blue for creative focus
|
||||
if mental_state.focus_area == "creative":
|
||||
color_palette = [NexusColors.ALLEGRO_BLUE] + color_palette[:2]
|
||||
|
||||
# Create the engineering prompt
|
||||
prompt = f"""You are the Nexus Architect, an expert Three.js developer creating immersive 3D environments for Timmy.
|
||||
|
||||
DESIGN BRIEF:
|
||||
- Room Name: {name}
|
||||
- Description: {description}
|
||||
- Style: {style}
|
||||
- Mood: {mood}
|
||||
- Atmosphere: {mood_preset['atmosphere']}
|
||||
|
||||
AESTHETIC GUIDELINES:
|
||||
- Primary Colors: {', '.join(color_palette[:3])}
|
||||
- Lighting: {mood_preset['lighting']}
|
||||
- Geometry: {mood_preset['geometry']}
|
||||
- Theme: {mood_preset['description']}
|
||||
|
||||
TIMMY'S CONTEXT:
|
||||
- Timmy's Signature Color: Warm Gold ({NexusColors.TIMMY_GOLD})
|
||||
- Allegro's Color: Motion Blue ({NexusColors.ALLEGRO_BLUE})
|
||||
- Sovereignty Theme: Crystalline structures, clean lines
|
||||
- Service Theme: Open spaces, welcoming lighting
|
||||
|
||||
THREE.JS REQUIREMENTS:
|
||||
1. Use Three.js r128+ compatible syntax
|
||||
2. Create a self-contained module with a `create{name.title().replace('_', '')}()` function
|
||||
3. Return a THREE.Group containing all room elements
|
||||
4. Include proper memory management (dispose methods)
|
||||
5. Use MeshStandardMaterial for PBR lighting
|
||||
6. Include ambient light (intensity 0.3-0.5) + accent lights
|
||||
7. Add subtle animations for living feel
|
||||
8. Keep polygon count under 10,000 triangles
|
||||
|
||||
SAFETY RULES:
|
||||
- NO eval(), Function(), or dynamic code execution
|
||||
- NO network requests (fetch, XMLHttpRequest, WebSocket)
|
||||
- NO storage access (localStorage, sessionStorage, cookies)
|
||||
- NO navigation (window.location, window.open)
|
||||
- Only use allowed Three.js APIs
|
||||
|
||||
OUTPUT FORMAT:
|
||||
Return ONLY the JavaScript code wrapped in a markdown code block:
|
||||
|
||||
```javascript
|
||||
// Your Three.js room module here
|
||||
```
|
||||
|
||||
Generate the complete Three.js code for this room now."""
|
||||
|
||||
return prompt
|
||||
|
||||
@staticmethod
|
||||
def engineer_portal_prompt(
|
||||
name: str,
|
||||
from_room: str,
|
||||
to_room: str,
|
||||
style: str,
|
||||
mental_state: Optional[MentalState] = None
|
||||
) -> str:
|
||||
"""Engineer a prompt for portal generation."""
|
||||
mood = PromptEngineer._infer_mood(f"portal from {from_room} to {to_room}", mental_state)
|
||||
|
||||
prompt = f"""You are creating a portal connection in the Nexus 3D environment.
|
||||
|
||||
PORTAL SPECIFICATIONS:
|
||||
- Name: {name}
|
||||
- Connection: {from_room} → {to_room}
|
||||
- Style: {style}
|
||||
- Context Mood: {mood}
|
||||
|
||||
VISUAL REQUIREMENTS:
|
||||
1. Create an animated portal effect (shader or texture-based)
|
||||
2. Include particle system for energy flow
|
||||
3. Add trigger zone for teleportation detection
|
||||
4. Use signature colors: {NexusColors.TIMMY_GOLD} (Timmy) and {NexusColors.ALLEGRO_BLUE} (Allegro)
|
||||
5. Match the {mood} atmosphere
|
||||
|
||||
TECHNICAL REQUIREMENTS:
|
||||
- Three.js r128+ compatible
|
||||
- Export a `createPortal()` function returning THREE.Group
|
||||
- Include animation loop hook
|
||||
- Add collision detection placeholder
|
||||
|
||||
SAFETY: No eval, no network requests, no external dependencies.
|
||||
|
||||
Return ONLY JavaScript code in a markdown code block."""
|
||||
|
||||
return prompt
|
||||
|
||||
@staticmethod
|
||||
def engineer_mood_scene_prompt(mood_description: str) -> str:
|
||||
"""Engineer a prompt based on mood description."""
|
||||
# Analyze mood description
|
||||
mood_keywords = {
|
||||
"contemplative": ["thinking", "reflective", "calm", "peaceful", "quiet", "serene"],
|
||||
"energetic": ["excited", "dynamic", "lively", "active", "energetic", "vibrant"],
|
||||
"mysterious": ["mysterious", "dark", "unknown", "secret", "enigmatic"],
|
||||
"welcoming": ["friendly", "open", "warm", "welcoming", "inviting", "comfortable"],
|
||||
"sovereign": ["powerful", "clear", "crystalline", "noble", "dignified"],
|
||||
}
|
||||
|
||||
detected_mood = "contemplative"
|
||||
desc_lower = mood_description.lower()
|
||||
for mood, keywords in mood_keywords.items():
|
||||
if any(kw in desc_lower for kw in keywords):
|
||||
detected_mood = mood
|
||||
break
|
||||
|
||||
preset = getattr(MoodPresets, detected_mood.upper(), MoodPresets.CONTEMPLATIVE)
|
||||
|
||||
prompt = f"""Generate a Three.js room based on this mood description:
|
||||
|
||||
"{mood_description}"
|
||||
|
||||
INFERRED MOOD: {detected_mood}
|
||||
AESTHETIC: {preset['description']}
|
||||
|
||||
Create a complete room with:
|
||||
- Style: {preset['geometry']}
|
||||
- Lighting: {preset['lighting']}
|
||||
- Color Palette: {', '.join(preset['colors'][:3])}
|
||||
- Atmosphere: {preset['atmosphere']}
|
||||
|
||||
Return Three.js r128+ code as a module with `createMoodRoom()` function."""
|
||||
|
||||
return prompt
|
||||
|
||||
@staticmethod
|
||||
def _infer_mood(description: str, mental_state: Optional[MentalState] = None) -> str:
|
||||
"""Infer mood from description and mental state."""
|
||||
if mental_state and mental_state.mood:
|
||||
return mental_state.mood
|
||||
|
||||
desc_lower = description.lower()
|
||||
mood_map = {
|
||||
"contemplative": ["serene", "calm", "peaceful", "quiet", "meditation", "zen", "tranquil"],
|
||||
"energetic": ["dynamic", "active", "vibrant", "lively", "energetic", "motion"],
|
||||
"mysterious": ["mysterious", "shadow", "dark", "unknown", "secret", "ethereal"],
|
||||
"welcoming": ["warm", "welcoming", "friendly", "open", "inviting", "comfort"],
|
||||
"sovereign": ["crystal", "clear", "noble", "dignified", "powerful", "authoritative"],
|
||||
}
|
||||
|
||||
for mood, keywords in mood_map.items():
|
||||
if any(kw in desc_lower for kw in keywords):
|
||||
return mood
|
||||
|
||||
return "contemplative"
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Nexus Architect AI
|
||||
# =============================================================================
|
||||
|
||||
class NexusArchitectAI:
|
||||
"""
|
||||
AI-powered Nexus Architect for autonomous Three.js world generation.
|
||||
|
||||
This class provides high-level interfaces for:
|
||||
- Designing rooms from natural language
|
||||
- Creating mood-based scenes
|
||||
- Managing mental state integration
|
||||
- Validating generated code
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.mental_state: Optional[MentalState] = None
|
||||
self.room_designs: Dict[str, RoomDesign] = {}
|
||||
self.portal_designs: Dict[str, PortalDesign] = {}
|
||||
self.prompt_engineer = PromptEngineer()
|
||||
|
||||
def set_mental_state(self, state: MentalState) -> None:
|
||||
"""Set Timmy's current mental state for aesthetic tuning."""
|
||||
self.mental_state = state
|
||||
logger.info(f"Mental state updated: {state.mood} (energy: {state.energy_level})")
|
||||
|
||||
def design_room(
|
||||
self,
|
||||
name: str,
|
||||
description: str,
|
||||
style: str,
|
||||
dimensions: Optional[Dict[str, float]] = None
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Design a room from natural language description.
|
||||
|
||||
Args:
|
||||
name: Room identifier (e.g., "contemplation_chamber")
|
||||
description: Natural language description of the room
|
||||
style: Visual style (e.g., "minimalist_ethereal", "crystalline_modern")
|
||||
dimensions: Optional room dimensions
|
||||
|
||||
Returns:
|
||||
Dict containing design specification and LLM prompt
|
||||
"""
|
||||
# Infer mood and select preset
|
||||
mood = self.prompt_engineer._infer_mood(description, self.mental_state)
|
||||
mood_preset = getattr(MoodPresets, mood.upper(), MoodPresets.CONTEMPLATIVE)
|
||||
|
||||
# Build color palette with mental state influence
|
||||
colors = mood_preset["colors"].copy()
|
||||
if self.mental_state:
|
||||
if self.mental_state.clarity > 0.7:
|
||||
colors.insert(0, NexusColors.TIMMY_GOLD)
|
||||
if self.mental_state.focus_area == "creative":
|
||||
colors.insert(0, NexusColors.ALLEGRO_BLUE)
|
||||
|
||||
# Create room design
|
||||
design = RoomDesign(
|
||||
name=name,
|
||||
description=description,
|
||||
style=style,
|
||||
dimensions=dimensions or {"width": 20, "height": 10, "depth": 20},
|
||||
mood_preset=mood,
|
||||
color_palette=colors[:4],
|
||||
lighting_scheme=mood_preset["lighting"],
|
||||
features=self._extract_features(description),
|
||||
)
|
||||
|
||||
# Generate LLM prompt
|
||||
prompt = self.prompt_engineer.engineer_room_prompt(
|
||||
name=name,
|
||||
description=description,
|
||||
style=style,
|
||||
mental_state=self.mental_state,
|
||||
dimensions=design.dimensions,
|
||||
)
|
||||
|
||||
# Store design
|
||||
self.room_designs[name] = design
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"room_name": name,
|
||||
"design": design.to_dict(),
|
||||
"llm_prompt": prompt,
|
||||
"message": f"Room '{name}' designed. Use the LLM prompt to generate Three.js code.",
|
||||
}
|
||||
|
||||
def create_portal(
|
||||
self,
|
||||
name: str,
|
||||
from_room: str,
|
||||
to_room: str,
|
||||
style: str = "energy_vortex"
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Design a portal connection between rooms.
|
||||
|
||||
Args:
|
||||
name: Portal identifier
|
||||
from_room: Source room name
|
||||
to_room: Target room name
|
||||
style: Portal visual style
|
||||
|
||||
Returns:
|
||||
Dict containing portal design and LLM prompt
|
||||
"""
|
||||
if from_room not in self.room_designs:
|
||||
return {"success": False, "error": f"Source room '{from_room}' not found"}
|
||||
if to_room not in self.room_designs:
|
||||
return {"success": False, "error": f"Target room '{to_room}' not found"}
|
||||
|
||||
design = PortalDesign(
|
||||
name=name,
|
||||
from_room=from_room,
|
||||
to_room=to_room,
|
||||
style=style,
|
||||
)
|
||||
|
||||
prompt = self.prompt_engineer.engineer_portal_prompt(
|
||||
name=name,
|
||||
from_room=from_room,
|
||||
to_room=to_room,
|
||||
style=style,
|
||||
mental_state=self.mental_state,
|
||||
)
|
||||
|
||||
self.portal_designs[name] = design
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"portal_name": name,
|
||||
"design": design.to_dict(),
|
||||
"llm_prompt": prompt,
|
||||
"message": f"Portal '{name}' designed connecting {from_room} to {to_room}",
|
||||
}
|
||||
|
||||
def generate_scene_from_mood(self, mood_description: str) -> Dict[str, Any]:
|
||||
"""
|
||||
Generate a complete scene based on mood description.
|
||||
|
||||
Args:
|
||||
mood_description: Description of desired mood/atmosphere
|
||||
|
||||
Returns:
|
||||
Dict containing scene design and LLM prompt
|
||||
"""
|
||||
# Infer mood
|
||||
mood = self.prompt_engineer._infer_mood(mood_description, self.mental_state)
|
||||
preset = getattr(MoodPresets, mood.upper(), MoodPresets.CONTEMPLATIVE)
|
||||
|
||||
# Create room name from mood
|
||||
room_name = f"{mood}_realm"
|
||||
|
||||
# Generate prompt
|
||||
prompt = self.prompt_engineer.engineer_mood_scene_prompt(mood_description)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"room_name": room_name,
|
||||
"inferred_mood": mood,
|
||||
"aesthetic": preset,
|
||||
"llm_prompt": prompt,
|
||||
"message": f"Generated {mood} scene from mood description",
|
||||
}
|
||||
|
||||
def _extract_features(self, description: str) -> List[str]:
|
||||
"""Extract room features from description."""
|
||||
features = []
|
||||
feature_keywords = {
|
||||
"floating": ["floating", "levitating", "hovering"],
|
||||
"water": ["water", "fountain", "pool", "stream", "lake"],
|
||||
"vegetation": ["tree", "plant", "garden", "forest", "nature"],
|
||||
"crystals": ["crystal", "gem", "prism", "diamond"],
|
||||
"geometry": ["geometric", "shape", "sphere", "cube", "abstract"],
|
||||
"particles": ["particle", "dust", "sparkle", "glow", "mist"],
|
||||
}
|
||||
|
||||
desc_lower = description.lower()
|
||||
for feature, keywords in feature_keywords.items():
|
||||
if any(kw in desc_lower for kw in keywords):
|
||||
features.append(feature)
|
||||
|
||||
return features
|
||||
|
||||
def get_design_summary(self) -> Dict[str, Any]:
|
||||
"""Get summary of all designs."""
|
||||
return {
|
||||
"mental_state": self.mental_state.to_dict() if self.mental_state else None,
|
||||
"rooms": {name: design.to_dict() for name, design in self.room_designs.items()},
|
||||
"portals": {name: portal.to_dict() for name, portal in self.portal_designs.items()},
|
||||
"total_rooms": len(self.room_designs),
|
||||
"total_portals": len(self.portal_designs),
|
||||
}
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Module-level functions for easy import
|
||||
# =============================================================================
|
||||
|
||||
_architect_instance: Optional[NexusArchitectAI] = None
|
||||
|
||||
|
||||
def get_architect() -> NexusArchitectAI:
|
||||
"""Get or create the NexusArchitectAI singleton."""
|
||||
global _architect_instance
|
||||
if _architect_instance is None:
|
||||
_architect_instance = NexusArchitectAI()
|
||||
return _architect_instance
|
||||
|
||||
|
||||
def create_room(
|
||||
name: str,
|
||||
description: str,
|
||||
style: str,
|
||||
dimensions: Optional[Dict[str, float]] = None
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Create a room design from description.
|
||||
|
||||
Args:
|
||||
name: Room identifier
|
||||
description: Natural language room description
|
||||
style: Visual style (e.g., "minimalist_ethereal")
|
||||
dimensions: Optional dimensions dict with width, height, depth
|
||||
|
||||
Returns:
|
||||
Dict with design specification and LLM prompt for code generation
|
||||
"""
|
||||
architect = get_architect()
|
||||
return architect.design_room(name, description, style, dimensions)
|
||||
|
||||
|
||||
def create_portal(
|
||||
name: str,
|
||||
from_room: str,
|
||||
to_room: str,
|
||||
style: str = "energy_vortex"
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Create a portal between rooms.
|
||||
|
||||
Args:
|
||||
name: Portal identifier
|
||||
from_room: Source room name
|
||||
to_room: Target room name
|
||||
style: Visual style
|
||||
|
||||
Returns:
|
||||
Dict with portal design and LLM prompt
|
||||
"""
|
||||
architect = get_architect()
|
||||
return architect.create_portal(name, from_room, to_room, style)
|
||||
|
||||
|
||||
def generate_scene_from_mood(mood_description: str) -> Dict[str, Any]:
|
||||
"""
|
||||
Generate a scene based on mood description.
|
||||
|
||||
Args:
|
||||
mood_description: Description of desired mood
|
||||
|
||||
Example:
|
||||
"Timmy is feeling introspective and seeking clarity"
|
||||
→ Generates calm, minimalist space with clear sightlines
|
||||
|
||||
Returns:
|
||||
Dict with scene design and LLM prompt
|
||||
"""
|
||||
architect = get_architect()
|
||||
return architect.generate_scene_from_mood(mood_description)
|
||||
|
||||
|
||||
def set_mental_state(
|
||||
mood: str,
|
||||
energy_level: float = 0.5,
|
||||
clarity: float = 0.7,
|
||||
focus_area: str = "general"
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Set Timmy's mental state for aesthetic tuning.
|
||||
|
||||
Args:
|
||||
mood: Current mood (contemplative, energetic, mysterious, welcoming, sovereign)
|
||||
energy_level: 0.0 to 1.0
|
||||
clarity: 0.0 to 1.0
|
||||
focus_area: general, creative, analytical, social
|
||||
|
||||
Returns:
|
||||
Confirmation dict
|
||||
"""
|
||||
architect = get_architect()
|
||||
state = MentalState(
|
||||
mood=mood,
|
||||
energy_level=energy_level,
|
||||
clarity=clarity,
|
||||
focus_area=focus_area,
|
||||
)
|
||||
architect.set_mental_state(state)
|
||||
return {
|
||||
"success": True,
|
||||
"mental_state": state.to_dict(),
|
||||
"message": f"Mental state set to {mood}",
|
||||
}
|
||||
|
||||
|
||||
def get_nexus_summary() -> Dict[str, Any]:
|
||||
"""Get summary of all Nexus designs."""
|
||||
architect = get_architect()
|
||||
return architect.get_design_summary()
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Tool Schemas for integration
|
||||
# =============================================================================
|
||||
|
||||
NEXUS_ARCHITECT_AI_SCHEMAS = {
|
||||
"create_room": {
|
||||
"name": "create_room",
|
||||
"description": (
|
||||
"Design a new 3D room in the Nexus from a natural language description. "
|
||||
"Returns a design specification and LLM prompt for Three.js code generation. "
|
||||
"The room will be styled according to Timmy's current mental state."
|
||||
),
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"name": {
|
||||
"type": "string",
|
||||
"description": "Unique room identifier (e.g., 'contemplation_chamber')"
|
||||
},
|
||||
"description": {
|
||||
"type": "string",
|
||||
"description": "Natural language description of the room"
|
||||
},
|
||||
"style": {
|
||||
"type": "string",
|
||||
"description": "Visual style (minimalist_ethereal, crystalline_modern, organic_natural, etc.)"
|
||||
},
|
||||
"dimensions": {
|
||||
"type": "object",
|
||||
"description": "Optional room dimensions",
|
||||
"properties": {
|
||||
"width": {"type": "number"},
|
||||
"height": {"type": "number"},
|
||||
"depth": {"type": "number"},
|
||||
}
|
||||
}
|
||||
},
|
||||
"required": ["name", "description", "style"]
|
||||
}
|
||||
},
|
||||
"create_portal": {
|
||||
"name": "create_portal",
|
||||
"description": "Create a portal connection between two rooms",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"name": {"type": "string"},
|
||||
"from_room": {"type": "string"},
|
||||
"to_room": {"type": "string"},
|
||||
"style": {"type": "string", "default": "energy_vortex"},
|
||||
},
|
||||
"required": ["name", "from_room", "to_room"]
|
||||
}
|
||||
},
|
||||
"generate_scene_from_mood": {
|
||||
"name": "generate_scene_from_mood",
|
||||
"description": (
|
||||
"Generate a complete 3D scene based on a mood description. "
|
||||
"Example: 'Timmy is feeling introspective' creates a calm, minimalist space."
|
||||
),
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"mood_description": {
|
||||
"type": "string",
|
||||
"description": "Description of desired mood or mental state"
|
||||
}
|
||||
},
|
||||
"required": ["mood_description"]
|
||||
}
|
||||
},
|
||||
"set_mental_state": {
|
||||
"name": "set_mental_state",
|
||||
"description": "Set Timmy's mental state to influence aesthetic generation",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"mood": {"type": "string"},
|
||||
"energy_level": {"type": "number"},
|
||||
"clarity": {"type": "number"},
|
||||
"focus_area": {"type": "string"},
|
||||
},
|
||||
"required": ["mood"]
|
||||
}
|
||||
},
|
||||
"get_nexus_summary": {
|
||||
"name": "get_nexus_summary",
|
||||
"description": "Get summary of all Nexus room and portal designs",
|
||||
"parameters": {"type": "object", "properties": {}}
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Demo usage
|
||||
print("Nexus Architect AI - Demo")
|
||||
print("=" * 50)
|
||||
|
||||
# Set mental state
|
||||
result = set_mental_state("contemplative", energy_level=0.3, clarity=0.8)
|
||||
print(f"\nMental State: {result['mental_state']}")
|
||||
|
||||
# Create a room
|
||||
result = create_room(
|
||||
name="contemplation_chamber",
|
||||
description="A serene circular room with floating geometric shapes and soft blue light",
|
||||
style="minimalist_ethereal",
|
||||
)
|
||||
print(f"\nRoom Design: {json.dumps(result['design'], indent=2)}")
|
||||
|
||||
# Generate from mood
|
||||
result = generate_scene_from_mood("Timmy is feeling introspective and seeking clarity")
|
||||
print(f"\nMood Scene: {result['inferred_mood']} - {result['aesthetic']['description']}")
|
||||
74
patches/agent/symbolic_memory.py
Normal file
74
patches/agent/symbolic_memory.py
Normal file
@@ -0,0 +1,74 @@
|
||||
"""Sovereign Intersymbolic Memory Layer.
|
||||
|
||||
Bridges Neural (LLM) and Symbolic (Graph) reasoning by extracting
|
||||
structured triples from unstructured text and performing graph lookups.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import json
|
||||
from typing import List, Dict, Any
|
||||
from agent.gemini_adapter import GeminiAdapter
|
||||
from tools.graph_store import GraphStore
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class SymbolicMemory:
|
||||
def __init__(self):
|
||||
self.adapter = GeminiAdapter()
|
||||
self.store = GraphStore()
|
||||
|
||||
def ingest_text(self, text: str):
|
||||
"""Extracts triples from text and adds them to the graph."""
|
||||
prompt = f"""
|
||||
Extract all meaningful entities and their relationships from the following text.
|
||||
Format the output as a JSON list of triples: [{{"s": "subject", "p": "predicate", "o": "object"}}]
|
||||
|
||||
Text:
|
||||
{text}
|
||||
|
||||
Guidelines:
|
||||
- Use clear, concise labels for entities and predicates.
|
||||
- Focus on stable facts and structural relationships.
|
||||
- Predicates should be verbs or descriptive relations (e.g., 'is_a', 'works_at', 'collaborates_with').
|
||||
"""
|
||||
try:
|
||||
result = self.adapter.generate(
|
||||
model="gemini-3.1-pro-preview",
|
||||
prompt=prompt,
|
||||
system_instruction="You are Timmy's Symbolic Extraction Engine. Extract high-fidelity knowledge triples.",
|
||||
response_mime_type="application/json"
|
||||
)
|
||||
|
||||
triples = json.loads(result["text"])
|
||||
if isinstance(triples, list):
|
||||
count = self.store.add_triples(triples)
|
||||
logger.info(f"Ingested {count} new triples into symbolic memory.")
|
||||
return count
|
||||
except Exception as e:
|
||||
logger.error(f"Symbolic ingestion failed: {e}")
|
||||
return 0
|
||||
|
||||
def get_context_for(self, topic: str) -> str:
|
||||
"""Performs a 2-hop graph search to find related context for a topic."""
|
||||
# 1. Find direct relations
|
||||
direct = self.store.query(subject=topic) + self.store.query(object=topic)
|
||||
|
||||
# 2. Find 2nd hop
|
||||
related_entities = set()
|
||||
for t in direct:
|
||||
related_entities.add(t['s'])
|
||||
related_entities.add(t['o'])
|
||||
|
||||
extended = []
|
||||
for entity in related_entities:
|
||||
if entity == topic: continue
|
||||
extended.extend(self.store.query(subject=entity))
|
||||
|
||||
all_triples = direct + extended
|
||||
if not all_triples:
|
||||
return ""
|
||||
|
||||
context = "Symbolic Knowledge Graph Context:\n"
|
||||
for t in all_triples:
|
||||
context += f"- {t['s']} --({t['p']})--> {t['o']}\n"
|
||||
return context
|
||||
421
patches/agent/temporal_knowledge_graph.py
Normal file
421
patches/agent/temporal_knowledge_graph.py
Normal file
@@ -0,0 +1,421 @@
|
||||
"""Temporal Knowledge Graph for Hermes Agent.
|
||||
|
||||
Provides a time-aware triple-store (Subject, Predicate, Object) with temporal
|
||||
metadata (valid_from, valid_until, timestamp) enabling "time travel" queries
|
||||
over Timmy's evolving worldview.
|
||||
|
||||
Time format: ISO 8601 (YYYY-MM-DDTHH:MM:SS)
|
||||
"""
|
||||
|
||||
import json
|
||||
import sqlite3
|
||||
import logging
|
||||
import uuid
|
||||
from datetime import datetime, timezone
|
||||
from typing import List, Dict, Any, Optional, Tuple
|
||||
from dataclasses import dataclass, asdict
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TemporalOperator(Enum):
|
||||
"""Temporal query operators for time-based filtering."""
|
||||
BEFORE = "before"
|
||||
AFTER = "after"
|
||||
DURING = "during"
|
||||
OVERLAPS = "overlaps"
|
||||
AT = "at"
|
||||
|
||||
|
||||
@dataclass
|
||||
class TemporalTriple:
|
||||
"""A triple with temporal metadata."""
|
||||
id: str
|
||||
subject: str
|
||||
predicate: str
|
||||
object: str
|
||||
valid_from: str # ISO 8601 datetime
|
||||
valid_until: Optional[str] # ISO 8601 datetime, None means still valid
|
||||
timestamp: str # When this fact was recorded
|
||||
version: int = 1
|
||||
superseded_by: Optional[str] = None # ID of the triple that superseded this
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return asdict(self)
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: Dict[str, Any]) -> "TemporalTriple":
|
||||
return cls(**data)
|
||||
|
||||
|
||||
class TemporalTripleStore:
|
||||
"""SQLite-backed temporal triple store with versioning support."""
|
||||
|
||||
def __init__(self, db_path: Optional[str] = None):
|
||||
"""Initialize the temporal triple store.
|
||||
|
||||
Args:
|
||||
db_path: Path to SQLite database. If None, uses default local path.
|
||||
"""
|
||||
if db_path is None:
|
||||
# Default to local-first storage in user's home
|
||||
home = Path.home()
|
||||
db_dir = home / ".hermes" / "temporal_kg"
|
||||
db_dir.mkdir(parents=True, exist_ok=True)
|
||||
db_path = db_dir / "temporal_kg.db"
|
||||
|
||||
self.db_path = str(db_path)
|
||||
self._init_db()
|
||||
|
||||
def _init_db(self):
|
||||
"""Initialize the SQLite database with required tables."""
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS temporal_triples (
|
||||
id TEXT PRIMARY KEY,
|
||||
subject TEXT NOT NULL,
|
||||
predicate TEXT NOT NULL,
|
||||
object TEXT NOT NULL,
|
||||
valid_from TEXT NOT NULL,
|
||||
valid_until TEXT,
|
||||
timestamp TEXT NOT NULL,
|
||||
version INTEGER DEFAULT 1,
|
||||
superseded_by TEXT,
|
||||
FOREIGN KEY (superseded_by) REFERENCES temporal_triples(id)
|
||||
)
|
||||
""")
|
||||
|
||||
# Create indexes for efficient querying
|
||||
conn.execute("""
|
||||
CREATE INDEX IF NOT EXISTS idx_subject ON temporal_triples(subject)
|
||||
""")
|
||||
conn.execute("""
|
||||
CREATE INDEX IF NOT EXISTS idx_predicate ON temporal_triples(predicate)
|
||||
""")
|
||||
conn.execute("""
|
||||
CREATE INDEX IF NOT EXISTS idx_valid_from ON temporal_triples(valid_from)
|
||||
""")
|
||||
conn.execute("""
|
||||
CREATE INDEX IF NOT EXISTS idx_valid_until ON temporal_triples(valid_until)
|
||||
""")
|
||||
conn.execute("""
|
||||
CREATE INDEX IF NOT EXISTS idx_timestamp ON temporal_triples(timestamp)
|
||||
""")
|
||||
conn.execute("""
|
||||
CREATE INDEX IF NOT EXISTS idx_subject_predicate
|
||||
ON temporal_triples(subject, predicate)
|
||||
""")
|
||||
|
||||
conn.commit()
|
||||
|
||||
def _now(self) -> str:
|
||||
"""Get current time in ISO 8601 format."""
|
||||
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S")
|
||||
|
||||
def _generate_id(self) -> str:
|
||||
"""Generate a unique ID for a triple."""
|
||||
return f"{self._now()}_{uuid.uuid4().hex[:8]}"
|
||||
|
||||
def store_fact(
|
||||
self,
|
||||
subject: str,
|
||||
predicate: str,
|
||||
object: str,
|
||||
valid_from: Optional[str] = None,
|
||||
valid_until: Optional[str] = None
|
||||
) -> TemporalTriple:
|
||||
"""Store a fact with temporal bounds.
|
||||
|
||||
Args:
|
||||
subject: The subject of the triple
|
||||
predicate: The predicate/relationship
|
||||
object: The object/value
|
||||
valid_from: When this fact becomes valid (ISO 8601). Defaults to now.
|
||||
valid_until: When this fact expires (ISO 8601). None means forever valid.
|
||||
|
||||
Returns:
|
||||
The stored TemporalTriple
|
||||
"""
|
||||
if valid_from is None:
|
||||
valid_from = self._now()
|
||||
|
||||
# Check if there's an existing fact for this subject-predicate
|
||||
existing = self._get_current_fact(subject, predicate)
|
||||
|
||||
triple = TemporalTriple(
|
||||
id=self._generate_id(),
|
||||
subject=subject,
|
||||
predicate=predicate,
|
||||
object=object,
|
||||
valid_from=valid_from,
|
||||
valid_until=valid_until,
|
||||
timestamp=self._now()
|
||||
)
|
||||
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
# If there's an existing fact, mark it as superseded
|
||||
if existing:
|
||||
existing.valid_until = valid_from
|
||||
existing.superseded_by = triple.id
|
||||
self._update_triple(conn, existing)
|
||||
triple.version = existing.version + 1
|
||||
|
||||
# Insert the new fact
|
||||
self._insert_triple(conn, triple)
|
||||
conn.commit()
|
||||
|
||||
logger.info(f"Stored temporal fact: {subject} {predicate} {object} (valid from {valid_from})")
|
||||
return triple
|
||||
|
||||
def _get_current_fact(self, subject: str, predicate: str) -> Optional[TemporalTriple]:
|
||||
"""Get the current (most recent, still valid) fact for a subject-predicate pair."""
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
cursor = conn.execute(
|
||||
"""
|
||||
SELECT * FROM temporal_triples
|
||||
WHERE subject = ? AND predicate = ? AND valid_until IS NULL
|
||||
ORDER BY timestamp DESC LIMIT 1
|
||||
""",
|
||||
(subject, predicate)
|
||||
)
|
||||
row = cursor.fetchone()
|
||||
if row:
|
||||
return self._row_to_triple(row)
|
||||
return None
|
||||
|
||||
def _insert_triple(self, conn: sqlite3.Connection, triple: TemporalTriple):
|
||||
"""Insert a triple into the database."""
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO temporal_triples
|
||||
(id, subject, predicate, object, valid_from, valid_until, timestamp, version, superseded_by)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
(
|
||||
triple.id, triple.subject, triple.predicate, triple.object,
|
||||
triple.valid_from, triple.valid_until, triple.timestamp,
|
||||
triple.version, triple.superseded_by
|
||||
)
|
||||
)
|
||||
|
||||
def _update_triple(self, conn: sqlite3.Connection, triple: TemporalTriple):
|
||||
"""Update an existing triple."""
|
||||
conn.execute(
|
||||
"""
|
||||
UPDATE temporal_triples
|
||||
SET valid_until = ?, superseded_by = ?
|
||||
WHERE id = ?
|
||||
""",
|
||||
(triple.valid_until, triple.superseded_by, triple.id)
|
||||
)
|
||||
|
||||
def _row_to_triple(self, row: sqlite3.Row) -> TemporalTriple:
|
||||
"""Convert a database row to a TemporalTriple."""
|
||||
return TemporalTriple(
|
||||
id=row[0],
|
||||
subject=row[1],
|
||||
predicate=row[2],
|
||||
object=row[3],
|
||||
valid_from=row[4],
|
||||
valid_until=row[5],
|
||||
timestamp=row[6],
|
||||
version=row[7],
|
||||
superseded_by=row[8]
|
||||
)
|
||||
|
||||
def query_at_time(
|
||||
self,
|
||||
timestamp: str,
|
||||
subject: Optional[str] = None,
|
||||
predicate: Optional[str] = None
|
||||
) -> List[TemporalTriple]:
|
||||
"""Query facts that were valid at a specific point in time.
|
||||
|
||||
Args:
|
||||
timestamp: The point in time to query (ISO 8601)
|
||||
subject: Optional subject filter
|
||||
predicate: Optional predicate filter
|
||||
|
||||
Returns:
|
||||
List of TemporalTriple objects valid at that time
|
||||
"""
|
||||
query = """
|
||||
SELECT * FROM temporal_triples
|
||||
WHERE valid_from <= ?
|
||||
AND (valid_until IS NULL OR valid_until > ?)
|
||||
"""
|
||||
params = [timestamp, timestamp]
|
||||
|
||||
if subject:
|
||||
query += " AND subject = ?"
|
||||
params.append(subject)
|
||||
if predicate:
|
||||
query += " AND predicate = ?"
|
||||
params.append(predicate)
|
||||
|
||||
query += " ORDER BY timestamp DESC"
|
||||
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
conn.row_factory = sqlite3.Row
|
||||
cursor = conn.execute(query, params)
|
||||
return [self._row_to_triple(row) for row in cursor.fetchall()]
|
||||
|
||||
def query_temporal(
|
||||
self,
|
||||
operator: TemporalOperator,
|
||||
timestamp: str,
|
||||
subject: Optional[str] = None,
|
||||
predicate: Optional[str] = None
|
||||
) -> List[TemporalTriple]:
|
||||
"""Query using temporal operators.
|
||||
|
||||
Args:
|
||||
operator: TemporalOperator (BEFORE, AFTER, DURING, OVERLAPS, AT)
|
||||
timestamp: Reference timestamp (ISO 8601)
|
||||
subject: Optional subject filter
|
||||
predicate: Optional predicate filter
|
||||
|
||||
Returns:
|
||||
List of matching TemporalTriple objects
|
||||
"""
|
||||
base_query = "SELECT * FROM temporal_triples WHERE 1=1"
|
||||
params = []
|
||||
|
||||
if subject:
|
||||
base_query += " AND subject = ?"
|
||||
params.append(subject)
|
||||
if predicate:
|
||||
base_query += " AND predicate = ?"
|
||||
params.append(predicate)
|
||||
|
||||
if operator == TemporalOperator.BEFORE:
|
||||
base_query += " AND valid_from < ?"
|
||||
params.append(timestamp)
|
||||
elif operator == TemporalOperator.AFTER:
|
||||
base_query += " AND valid_from > ?"
|
||||
params.append(timestamp)
|
||||
elif operator == TemporalOperator.DURING:
|
||||
base_query += " AND valid_from <= ? AND (valid_until IS NULL OR valid_until > ?)"
|
||||
params.extend([timestamp, timestamp])
|
||||
elif operator == TemporalOperator.OVERLAPS:
|
||||
# Facts that overlap with a time point (same as DURING)
|
||||
base_query += " AND valid_from <= ? AND (valid_until IS NULL OR valid_until > ?)"
|
||||
params.extend([timestamp, timestamp])
|
||||
elif operator == TemporalOperator.AT:
|
||||
# Exact match for valid_at query
|
||||
return self.query_at_time(timestamp, subject, predicate)
|
||||
|
||||
base_query += " ORDER BY timestamp DESC"
|
||||
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
conn.row_factory = sqlite3.Row
|
||||
cursor = conn.execute(base_query, params)
|
||||
return [self._row_to_triple(row) for row in cursor.fetchall()]
|
||||
|
||||
def get_fact_history(
|
||||
self,
|
||||
subject: str,
|
||||
predicate: str
|
||||
) -> List[TemporalTriple]:
|
||||
"""Get the complete version history of a fact.
|
||||
|
||||
Args:
|
||||
subject: The subject to query
|
||||
predicate: The predicate to query
|
||||
|
||||
Returns:
|
||||
List of all versions of the fact, ordered by timestamp
|
||||
"""
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
conn.row_factory = sqlite3.Row
|
||||
cursor = conn.execute(
|
||||
"""
|
||||
SELECT * FROM temporal_triples
|
||||
WHERE subject = ? AND predicate = ?
|
||||
ORDER BY timestamp ASC
|
||||
""",
|
||||
(subject, predicate)
|
||||
)
|
||||
return [self._row_to_triple(row) for row in cursor.fetchall()]
|
||||
|
||||
def get_all_facts_for_entity(
|
||||
self,
|
||||
subject: str,
|
||||
at_time: Optional[str] = None
|
||||
) -> List[TemporalTriple]:
|
||||
"""Get all facts about an entity, optionally at a specific time.
|
||||
|
||||
Args:
|
||||
subject: The entity to query
|
||||
at_time: Optional timestamp to query at
|
||||
|
||||
Returns:
|
||||
List of TemporalTriple objects
|
||||
"""
|
||||
if at_time:
|
||||
return self.query_at_time(at_time, subject=subject)
|
||||
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
conn.row_factory = sqlite3.Row
|
||||
cursor = conn.execute(
|
||||
"""
|
||||
SELECT * FROM temporal_triples
|
||||
WHERE subject = ?
|
||||
ORDER BY timestamp DESC
|
||||
""",
|
||||
(subject,)
|
||||
)
|
||||
return [self._row_to_triple(row) for row in cursor.fetchall()]
|
||||
|
||||
def get_entity_changes(
|
||||
self,
|
||||
subject: str,
|
||||
start_time: str,
|
||||
end_time: str
|
||||
) -> List[TemporalTriple]:
|
||||
"""Get all facts that changed for an entity during a time range.
|
||||
|
||||
Args:
|
||||
subject: The entity to query
|
||||
start_time: Start of time range (ISO 8601)
|
||||
end_time: End of time range (ISO 8601)
|
||||
|
||||
Returns:
|
||||
List of TemporalTriple objects that changed in the range
|
||||
"""
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
conn.row_factory = sqlite3.Row
|
||||
cursor = conn.execute(
|
||||
"""
|
||||
SELECT * FROM temporal_triples
|
||||
WHERE subject = ?
|
||||
AND ((valid_from >= ? AND valid_from <= ?)
|
||||
OR (valid_until >= ? AND valid_until <= ?))
|
||||
ORDER BY timestamp ASC
|
||||
""",
|
||||
(subject, start_time, end_time, start_time, end_time)
|
||||
)
|
||||
return [self._row_to_triple(row) for row in cursor.fetchall()]
|
||||
|
||||
def close(self):
|
||||
"""Close the database connection (no-op for SQLite with context managers)."""
|
||||
pass
|
||||
|
||||
def export_to_json(self) -> str:
|
||||
"""Export all triples to JSON format."""
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
conn.row_factory = sqlite3.Row
|
||||
cursor = conn.execute("SELECT * FROM temporal_triples ORDER BY timestamp DESC")
|
||||
triples = [self._row_to_triple(row).to_dict() for row in cursor.fetchall()]
|
||||
return json.dumps(triples, indent=2)
|
||||
|
||||
def import_from_json(self, json_data: str):
|
||||
"""Import triples from JSON format."""
|
||||
triples = json.loads(json_data)
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
for triple_dict in triples:
|
||||
triple = TemporalTriple.from_dict(triple_dict)
|
||||
self._insert_triple(conn, triple)
|
||||
conn.commit()
|
||||
434
patches/agent/temporal_reasoning.py
Normal file
434
patches/agent/temporal_reasoning.py
Normal file
@@ -0,0 +1,434 @@
|
||||
"""Temporal Reasoning Engine for Hermes Agent.
|
||||
|
||||
Enables Timmy to reason about past and future states, generate historical
|
||||
summaries, and perform temporal inference over the evolving knowledge graph.
|
||||
|
||||
Queries supported:
|
||||
- "What was Timmy's view on sovereignty before March 2026?"
|
||||
- "When did we first learn about MLX integration?"
|
||||
- "How has the codebase changed since the security audit?"
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import List, Dict, Any, Optional, Tuple
|
||||
from datetime import datetime, timedelta
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
|
||||
from agent.temporal_knowledge_graph import (
|
||||
TemporalTripleStore, TemporalTriple, TemporalOperator
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ChangeType(Enum):
|
||||
"""Types of changes in the knowledge graph."""
|
||||
ADDED = "added"
|
||||
REMOVED = "removed"
|
||||
MODIFIED = "modified"
|
||||
SUPERSEDED = "superseded"
|
||||
|
||||
|
||||
@dataclass
|
||||
class FactChange:
|
||||
"""Represents a change in a fact over time."""
|
||||
change_type: ChangeType
|
||||
subject: str
|
||||
predicate: str
|
||||
old_value: Optional[str]
|
||||
new_value: Optional[str]
|
||||
timestamp: str
|
||||
version: int
|
||||
|
||||
|
||||
@dataclass
|
||||
class HistoricalSummary:
|
||||
"""Summary of how an entity or concept evolved over time."""
|
||||
entity: str
|
||||
start_time: str
|
||||
end_time: str
|
||||
total_changes: int
|
||||
key_facts: List[Dict[str, Any]]
|
||||
evolution_timeline: List[FactChange]
|
||||
current_state: List[Dict[str, Any]]
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"entity": self.entity,
|
||||
"start_time": self.start_time,
|
||||
"end_time": self.end_time,
|
||||
"total_changes": self.total_changes,
|
||||
"key_facts": self.key_facts,
|
||||
"evolution_timeline": [
|
||||
{
|
||||
"change_type": c.change_type.value,
|
||||
"subject": c.subject,
|
||||
"predicate": c.predicate,
|
||||
"old_value": c.old_value,
|
||||
"new_value": c.new_value,
|
||||
"timestamp": c.timestamp,
|
||||
"version": c.version
|
||||
}
|
||||
for c in self.evolution_timeline
|
||||
],
|
||||
"current_state": self.current_state
|
||||
}
|
||||
|
||||
|
||||
class TemporalReasoner:
|
||||
"""Reasoning engine for temporal knowledge graphs."""
|
||||
|
||||
def __init__(self, store: Optional[TemporalTripleStore] = None):
|
||||
"""Initialize the temporal reasoner.
|
||||
|
||||
Args:
|
||||
store: Optional TemporalTripleStore instance. Creates new if None.
|
||||
"""
|
||||
self.store = store or TemporalTripleStore()
|
||||
|
||||
def what_did_we_believe(
|
||||
self,
|
||||
subject: str,
|
||||
before_time: str
|
||||
) -> List[TemporalTriple]:
|
||||
"""Query: "What did we believe about X before Y happened?"
|
||||
|
||||
Args:
|
||||
subject: The entity to query about
|
||||
before_time: The cutoff time (ISO 8601)
|
||||
|
||||
Returns:
|
||||
List of facts believed before the given time
|
||||
"""
|
||||
# Get facts that were valid just before the given time
|
||||
return self.store.query_temporal(
|
||||
TemporalOperator.BEFORE,
|
||||
before_time,
|
||||
subject=subject
|
||||
)
|
||||
|
||||
def when_did_we_learn(
|
||||
self,
|
||||
subject: str,
|
||||
predicate: Optional[str] = None,
|
||||
object: Optional[str] = None
|
||||
) -> Optional[str]:
|
||||
"""Query: "When did we first learn about X?"
|
||||
|
||||
Args:
|
||||
subject: The subject to search for
|
||||
predicate: Optional predicate filter
|
||||
object: Optional object filter
|
||||
|
||||
Returns:
|
||||
Timestamp of first knowledge, or None if never learned
|
||||
"""
|
||||
history = self.store.get_fact_history(subject, predicate or "")
|
||||
|
||||
# Filter by object if specified
|
||||
if object:
|
||||
history = [h for h in history if h.object == object]
|
||||
|
||||
if history:
|
||||
# Return the earliest timestamp
|
||||
earliest = min(history, key=lambda x: x.timestamp)
|
||||
return earliest.timestamp
|
||||
return None
|
||||
|
||||
def how_has_it_changed(
|
||||
self,
|
||||
subject: str,
|
||||
since_time: str
|
||||
) -> List[FactChange]:
|
||||
"""Query: "How has X changed since Y?"
|
||||
|
||||
Args:
|
||||
subject: The entity to analyze
|
||||
since_time: The starting time (ISO 8601)
|
||||
|
||||
Returns:
|
||||
List of changes since the given time
|
||||
"""
|
||||
now = datetime.now().isoformat()
|
||||
changes = self.store.get_entity_changes(subject, since_time, now)
|
||||
|
||||
fact_changes = []
|
||||
for i, triple in enumerate(changes):
|
||||
# Determine change type
|
||||
if i == 0:
|
||||
change_type = ChangeType.ADDED
|
||||
old_value = None
|
||||
else:
|
||||
prev = changes[i - 1]
|
||||
if triple.object != prev.object:
|
||||
change_type = ChangeType.MODIFIED
|
||||
old_value = prev.object
|
||||
else:
|
||||
change_type = ChangeType.SUPERSEDED
|
||||
old_value = prev.object
|
||||
|
||||
fact_changes.append(FactChange(
|
||||
change_type=change_type,
|
||||
subject=triple.subject,
|
||||
predicate=triple.predicate,
|
||||
old_value=old_value,
|
||||
new_value=triple.object,
|
||||
timestamp=triple.timestamp,
|
||||
version=triple.version
|
||||
))
|
||||
|
||||
return fact_changes
|
||||
|
||||
def generate_temporal_summary(
|
||||
self,
|
||||
entity: str,
|
||||
start_time: str,
|
||||
end_time: str
|
||||
) -> HistoricalSummary:
|
||||
"""Generate a historical summary of an entity's evolution.
|
||||
|
||||
Args:
|
||||
entity: The entity to summarize
|
||||
start_time: Start of the time range (ISO 8601)
|
||||
end_time: End of the time range (ISO 8601)
|
||||
|
||||
Returns:
|
||||
HistoricalSummary containing the entity's evolution
|
||||
"""
|
||||
# Get all facts for the entity in the time range
|
||||
initial_state = self.store.query_at_time(start_time, subject=entity)
|
||||
final_state = self.store.query_at_time(end_time, subject=entity)
|
||||
changes = self.store.get_entity_changes(entity, start_time, end_time)
|
||||
|
||||
# Build evolution timeline
|
||||
evolution_timeline = []
|
||||
seen_predicates = set()
|
||||
|
||||
for triple in changes:
|
||||
if triple.predicate not in seen_predicates:
|
||||
seen_predicates.add(triple.predicate)
|
||||
evolution_timeline.append(FactChange(
|
||||
change_type=ChangeType.ADDED,
|
||||
subject=triple.subject,
|
||||
predicate=triple.predicate,
|
||||
old_value=None,
|
||||
new_value=triple.object,
|
||||
timestamp=triple.timestamp,
|
||||
version=triple.version
|
||||
))
|
||||
else:
|
||||
# Find previous value
|
||||
prev = [t for t in changes
|
||||
if t.predicate == triple.predicate
|
||||
and t.timestamp < triple.timestamp]
|
||||
old_value = prev[-1].object if prev else None
|
||||
|
||||
evolution_timeline.append(FactChange(
|
||||
change_type=ChangeType.MODIFIED,
|
||||
subject=triple.subject,
|
||||
predicate=triple.predicate,
|
||||
old_value=old_value,
|
||||
new_value=triple.object,
|
||||
timestamp=triple.timestamp,
|
||||
version=triple.version
|
||||
))
|
||||
|
||||
# Extract key facts (predicates that changed most)
|
||||
key_facts = []
|
||||
predicate_changes = {}
|
||||
for change in evolution_timeline:
|
||||
predicate_changes[change.predicate] = (
|
||||
predicate_changes.get(change.predicate, 0) + 1
|
||||
)
|
||||
|
||||
top_predicates = sorted(
|
||||
predicate_changes.items(),
|
||||
key=lambda x: x[1],
|
||||
reverse=True
|
||||
)[:5]
|
||||
|
||||
for pred, count in top_predicates:
|
||||
current = [t for t in final_state if t.predicate == pred]
|
||||
if current:
|
||||
key_facts.append({
|
||||
"predicate": pred,
|
||||
"current_value": current[0].object,
|
||||
"changes": count
|
||||
})
|
||||
|
||||
# Build current state
|
||||
current_state = [
|
||||
{
|
||||
"predicate": t.predicate,
|
||||
"object": t.object,
|
||||
"valid_from": t.valid_from,
|
||||
"valid_until": t.valid_until
|
||||
}
|
||||
for t in final_state
|
||||
]
|
||||
|
||||
return HistoricalSummary(
|
||||
entity=entity,
|
||||
start_time=start_time,
|
||||
end_time=end_time,
|
||||
total_changes=len(evolution_timeline),
|
||||
key_facts=key_facts,
|
||||
evolution_timeline=evolution_timeline,
|
||||
current_state=current_state
|
||||
)
|
||||
|
||||
def infer_temporal_relationship(
|
||||
self,
|
||||
fact_a: TemporalTriple,
|
||||
fact_b: TemporalTriple
|
||||
) -> Optional[str]:
|
||||
"""Infer temporal relationship between two facts.
|
||||
|
||||
Args:
|
||||
fact_a: First fact
|
||||
fact_b: Second fact
|
||||
|
||||
Returns:
|
||||
Description of temporal relationship, or None
|
||||
"""
|
||||
a_start = datetime.fromisoformat(fact_a.valid_from)
|
||||
a_end = datetime.fromisoformat(fact_a.valid_until) if fact_a.valid_until else None
|
||||
b_start = datetime.fromisoformat(fact_b.valid_from)
|
||||
b_end = datetime.fromisoformat(fact_b.valid_until) if fact_b.valid_until else None
|
||||
|
||||
# Check if A happened before B
|
||||
if a_end and a_end <= b_start:
|
||||
return "A happened before B"
|
||||
|
||||
# Check if B happened before A
|
||||
if b_end and b_end <= a_start:
|
||||
return "B happened before A"
|
||||
|
||||
# Check if they overlap
|
||||
if a_end and b_end:
|
||||
if a_start <= b_end and b_start <= a_end:
|
||||
return "A and B overlap in time"
|
||||
|
||||
# Check if one supersedes the other
|
||||
if fact_a.superseded_by == fact_b.id:
|
||||
return "B supersedes A"
|
||||
if fact_b.superseded_by == fact_a.id:
|
||||
return "A supersedes B"
|
||||
|
||||
return "A and B are temporally unrelated"
|
||||
|
||||
def get_worldview_at_time(
|
||||
self,
|
||||
timestamp: str,
|
||||
subjects: Optional[List[str]] = None
|
||||
) -> Dict[str, List[Dict[str, Any]]]:
|
||||
"""Get Timmy's complete worldview at a specific point in time.
|
||||
|
||||
Args:
|
||||
timestamp: The point in time (ISO 8601)
|
||||
subjects: Optional list of subjects to include. If None, includes all.
|
||||
|
||||
Returns:
|
||||
Dictionary mapping subjects to their facts at that time
|
||||
"""
|
||||
worldview = {}
|
||||
|
||||
if subjects:
|
||||
for subject in subjects:
|
||||
facts = self.store.query_at_time(timestamp, subject=subject)
|
||||
if facts:
|
||||
worldview[subject] = [
|
||||
{
|
||||
"predicate": f.predicate,
|
||||
"object": f.object,
|
||||
"version": f.version
|
||||
}
|
||||
for f in facts
|
||||
]
|
||||
else:
|
||||
# Get all facts at that time
|
||||
all_facts = self.store.query_at_time(timestamp)
|
||||
for fact in all_facts:
|
||||
if fact.subject not in worldview:
|
||||
worldview[fact.subject] = []
|
||||
worldview[fact.subject].append({
|
||||
"predicate": fact.predicate,
|
||||
"object": fact.object,
|
||||
"version": fact.version
|
||||
})
|
||||
|
||||
return worldview
|
||||
|
||||
def find_knowledge_gaps(
|
||||
self,
|
||||
subject: str,
|
||||
expected_predicates: List[str]
|
||||
) -> List[str]:
|
||||
"""Find predicates that are missing or have expired for a subject.
|
||||
|
||||
Args:
|
||||
subject: The entity to check
|
||||
expected_predicates: List of predicates that should exist
|
||||
|
||||
Returns:
|
||||
List of missing predicate names
|
||||
"""
|
||||
now = datetime.now().isoformat()
|
||||
current_facts = self.store.query_at_time(now, subject=subject)
|
||||
current_predicates = {f.predicate for f in current_facts}
|
||||
|
||||
return [
|
||||
pred for pred in expected_predicates
|
||||
if pred not in current_predicates
|
||||
]
|
||||
|
||||
def export_reasoning_report(
|
||||
self,
|
||||
entity: str,
|
||||
start_time: str,
|
||||
end_time: str
|
||||
) -> str:
|
||||
"""Generate a human-readable reasoning report.
|
||||
|
||||
Args:
|
||||
entity: The entity to report on
|
||||
start_time: Start of the time range
|
||||
end_time: End of the time range
|
||||
|
||||
Returns:
|
||||
Formatted report string
|
||||
"""
|
||||
summary = self.generate_temporal_summary(entity, start_time, end_time)
|
||||
|
||||
report = f"""
|
||||
# Temporal Reasoning Report: {entity}
|
||||
|
||||
## Time Range
|
||||
- From: {start_time}
|
||||
- To: {end_time}
|
||||
|
||||
## Summary
|
||||
- Total Changes: {summary.total_changes}
|
||||
- Key Facts Tracked: {len(summary.key_facts)}
|
||||
|
||||
## Key Facts
|
||||
"""
|
||||
for fact in summary.key_facts:
|
||||
report += f"- **{fact['predicate']}**: {fact['current_value']} ({fact['changes']} changes)\n"
|
||||
|
||||
report += "\n## Evolution Timeline\n"
|
||||
for change in summary.evolution_timeline[:10]: # Show first 10
|
||||
report += f"- [{change.timestamp}] {change.change_type.value}: {change.predicate}\n"
|
||||
if change.old_value:
|
||||
report += f" - Changed from: {change.old_value}\n"
|
||||
report += f" - Changed to: {change.new_value}\n"
|
||||
|
||||
if len(summary.evolution_timeline) > 10:
|
||||
report += f"\n... and {len(summary.evolution_timeline) - 10} more changes\n"
|
||||
|
||||
report += "\n## Current State\n"
|
||||
for state in summary.current_state:
|
||||
report += f"- {state['predicate']}: {state['object']}\n"
|
||||
|
||||
return report
|
||||
@@ -1,506 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Generate 1,000 Problem→Solution training pairs for Frontend & Creative code patterns.
|
||||
|
||||
Part of timmy-config#595: Code Patterns: Frontend & Creative — 1K Problem→Solution Pairs.
|
||||
|
||||
Domains covered:
|
||||
- Three.js: scenes, geometry, materials, lighting, camera, animation
|
||||
- HTML/CSS/JS: DOM manipulation, events, styling, responsive design
|
||||
- Playground UI: sovereign-first interactive components
|
||||
- Gallery: image grids, lightboxes, masonry layouts
|
||||
- Games: canvas rendering, game loops, simple mechanics
|
||||
|
||||
Usage:
|
||||
python scripts/generate_code_patterns_frontend_creative.py
|
||||
python scripts/generate_code_patterns_frontend_creative.py --output /path/to/output.jsonl
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import random
|
||||
from pathlib import Path
|
||||
|
||||
random.seed(595)
|
||||
|
||||
|
||||
# ============================================================
|
||||
# Three.js Templates
|
||||
# ============================================================
|
||||
|
||||
THREEJS_TEMPLATES = [
|
||||
# Scene setup
|
||||
{
|
||||
"problem": "Set up a Three.js scene with a renderer, camera, and animation loop.",
|
||||
"solution": "function initThreeJS(container: HTMLElement) {\n const scene = new THREE.Scene();\n scene.background = new THREE.Color(0x0a0a0a);\n\n const camera = new THREE.PerspectiveCamera(\n 75,\n container.clientWidth / container.clientHeight,\n 0.1,\n 1000\n );\n camera.position.z = 5;\n\n const renderer = new THREE.WebGLRenderer({ antialias: true });\n renderer.setSize(container.clientWidth, container.clientHeight);\n renderer.setPixelRatio(window.devicePixelRatio);\n container.appendChild(renderer.domElement);\n\n function animate() {\n requestAnimationFrame(animate);\n renderer.render(scene, camera);\n }\n animate();\n\n return { scene, camera, renderer };\n}",
|
||||
"imports": "import * as THREE from 'three';",
|
||||
"domain": "threejs-scene",
|
||||
},
|
||||
# Geometry — sphere with wireframe
|
||||
{
|
||||
"problem": "Create a Three.js sphere with custom segment counts and a wireframe overlay.",
|
||||
"solution": "function createSphereWithWireframe(radius = 1, segments = 32) {\n const sphereGeom = new THREE.SphereGeometry(radius, segments, segments);\n const sphereMat = new THREE.MeshStandardMaterial({\n color: 0x4a90d9,\n roughness: 0.3,\n metalness: 0.7,\n });\n const sphere = new THREE.Mesh(sphereGeom, sphereMat);\n\n const wireframe = new THREE.LineSegments(\n new THREE.WireframeGeometry(sphereGeom),\n new THREE.LineBasicMaterial({ color: 0xffffff, opacity: 0.3, transparent: true })\n );\n sphere.add(wireframe);\n\n return sphere;\n}",
|
||||
"imports": "import * as THREE from 'three';",
|
||||
"domain": "threejs-geometry",
|
||||
},
|
||||
# Materials — PBR
|
||||
{
|
||||
"problem": "Apply a physically-based material with environment mapping to a Three.js object.",
|
||||
"solution": "function createReflectiveMaterial(envMap: THREE.CubeTexture) {\n return new THREE.MeshStandardMaterial({\n color: 0xffffff,\n metalness: 1.0,\n roughness: 0.1,\n envMap: envMap,\n envMapIntensity: 1.0,\n });\n}\n\n// Usage\nconst material = createReflectiveMaterial(cubeTexture);\nconst mesh = new THREE.Mesh(geometry, material);",
|
||||
"imports": "import * as THREE from 'three';",
|
||||
"domain": "threejs-materials",
|
||||
},
|
||||
# --- Lighting ---
|
||||
{
|
||||
"problem": "Create a Three.js lighting setup with ambient, directional, and point lights.",
|
||||
"solution": "function setupLighting(scene: THREE.Scene) {\n const ambient = new THREE.AmbientLight(0x404040, 0.5);\n scene.add(ambient);\n\n const directional = new THREE.DirectionalLight(0xffffff, 1.0);\n directional.position.set(5, 10, 7);\n directional.castShadow = true;\n directional.shadow.mapSize.width = 2048;\n directional.shadow.mapSize.height = 2048;\n scene.add(directional);\n\n const point = new THREE.PointLight(0xff9000, 0.8, 20);\n point.position.set(-3, 2, 3);\n scene.add(point);\n\n return { ambient, directional, point };\n}",
|
||||
"imports": "import * as THREE from 'three';",
|
||||
"domain": "threejs-lighting",
|
||||
},
|
||||
# --- Camera OrbitControls ---
|
||||
{
|
||||
"problem": "Implement OrbitControls camera with constrained polar angles and smooth damping.",
|
||||
"solution": "function setupOrbitControls(camera: THREE.PerspectiveCamera, domElement: HTMLElement) {\n const controls = new THREE.OrbitControls(camera, domElement);\n controls.enableDamping = true;\n controls.dampingFactor = 0.05;\n controls.minDistance = 2;\n controls.maxDistance = 20;\n controls.maxPolarAngle = Math.PI / 2;\n controls.minPolarAngle = Math.PI / 6;\n controls.enablePan = false;\n return controls;\n}",
|
||||
"imports": "import { OrbitControls } from 'three/examples/jsm/controls/OrbitControls.js';",
|
||||
"domain": "threejs-camera",
|
||||
},
|
||||
# --- Delta-time rotation ---
|
||||
{
|
||||
"problem": "Create a smooth Three.js rotation animation using delta time.",
|
||||
"solution": "class RotatingObject {\n mesh: THREE.Mesh;\n speed: number;\n\n constructor(mesh: THREE.Mesh, rotationsPerSecond = 0.5) {\n this.mesh = mesh;\n this.speed = rotationsPerSecond * Math.PI * 2;\n }\n\n update(deltaSec: number) {\n this.mesh.rotation.y += this.speed * deltaSec;\n }\n}\n\n// In render loop:\nconst rotor = new RotatingObject(sphere, 0.25);\nlet last = performance.now();\nfunction animate(time: number) {\n const delta = (time - last) / 1000;\n last = time;\n rotor.update(delta);\n renderer.render(scene, camera);\n requestAnimationFrame(animate);\n}",
|
||||
"imports": "import * as THREE from 'three';",
|
||||
"domain": "threejs-animation",
|
||||
},
|
||||
# --- Texture loading async ---
|
||||
{
|
||||
"problem": "Load a Three.js texture asynchronously with proper error handling.",
|
||||
"solution": "async function loadTexture(url: string): Promise<THREE.Texture> {\n const loader = new THREE.TextureLoader();\n try {\n return await new Promise<THREE.Texture>((resolve, reject) => {\n loader.load(url, resolve, undefined, reject);\n });\n } catch (err) {\n console.error('Texture load failed:', url, err);\n throw err;\n }\n}",
|
||||
"imports": "import * as THREE from 'three';",
|
||||
"domain": "threejs-textures",
|
||||
},
|
||||
# --- Rounded box ---
|
||||
{
|
||||
"problem": "Create a rounded-box Three.js geometry using RoundedBoxGeometry.",
|
||||
"solution": "function createRoundedBox(width = 1, height = 1, depth = 1, segments = 2, radius = 0.1) {\n const geom = new THREE.RoundedBoxGeometry(width, height, depth, segments, radius);\n const mat = new THREE.MeshStandardMaterial({ color: 0x2ecc71 });\n return new THREE.Mesh(geom, mat);\n}",
|
||||
"imports": "import { RoundedBoxGeometry } from 'three/examples/jsm/geometries/RoundedBoxGeometry.js';",
|
||||
"domain": "threejs-geometry",
|
||||
},
|
||||
# --- Fog ---
|
||||
{
|
||||
"problem": "Add depth fog to a Three.js scene for atmospheric perspective.",
|
||||
"solution": "function addFog(scene: THREE.Scene, color = 0x0a0a0a, near = 10, far = 50) {\n scene.fog = new THREE.Fog(color, near, far);\n scene.background = new THREE.Color(color);\n}",
|
||||
"imports": "import * as THREE from 'three';",
|
||||
"domain": "threejs-scene",
|
||||
},
|
||||
# --- ShaderMaterial ---
|
||||
{
|
||||
"problem": "Create a Three.js ShaderMaterial with uniform updates in the render loop.",
|
||||
"solution": "function createGlowShader() {\n return new THREE.ShaderMaterial({\n uniforms: {\n uTime: { value: 0 },\n uColor: { value: new THREE.Color(0x00ffff) },\n },\n vertexShader: `\n varying vec2 vUv;\n void main() {\n vUv = uv;\n gl_Position = projectionMatrix * modelViewMatrix * vec4(position, 1.0);\n }\n `,\n fragmentShader: `\n uniform float uTime;\n uniform vec3 uColor;\n varying vec2 vUv;\n void main() {\n float pulse = 0.5 + 0.5 * sin(uTime * 2.0);\n gl_FragColor = vec4(uColor * pulse, 1.0);\n }\n `,\n transparent: true,\n });\n}",
|
||||
"imports": "import * as THREE from 'three';",
|
||||
"domain": "threejs-materials",
|
||||
},
|
||||
# --- Group hierarchy ---
|
||||
{
|
||||
"problem": "Organize Three.js objects into a hierarchical group with local transforms.",
|
||||
"solution": "function createVehicleGroup() {\n const chassis = new THREE.Mesh(\n new THREE.BoxGeometry(2, 0.5, 4),\n new THREE.MeshStandardMaterial({ color: 0x333333 })\n );\n\n const wheels = new THREE.Group();\n const positions = [[-1, -0.3, -1.2], [1, -0.3, -1.2], [-1, -0.3, 1.2], [1, -0.3, 1.2]];\n positions.forEach(([x, y, z]) => {\n const wheel = new THREE.Mesh(\n new THREE.CylinderGeometry(0.3, 0.3, 0.2, 16),\n new THREE.MeshStandardMaterial({ color: 0x111111 })\n );\n wheel.rotation.z = Math.PI / 2;\n wheel.position.set(x, y, z);\n wheels.add(wheel);\n });\n\n const group = new THREE.Group();\n group.add(chassis);\n group.add(wheels);\n return group;\n}",
|
||||
"imports": "import * as THREE from 'three';",
|
||||
"domain": "threejs-scene",
|
||||
},
|
||||
# --- Raycasting ---
|
||||
{
|
||||
"problem": "Implement Three.js raycaster click picking with object metadata.",
|
||||
"solution": "function setupRaycaster(camera: THREE.Camera, dom: HTMLElement) {\n const raycaster = new THREE.Raycaster();\n const mouse = new THREE.Vector2();\n\n dom.addEventListener('click', (e) => {\n const rect = dom.getBoundingClientRect();\n mouse.x = ((e.clientX - rect.left) / rect.width) * 2 - 1;\n mouse.y = -((e.clientY - rect.top) / rect.height) * 2 + 1;\n\n raycaster.setFromCamera(mouse, camera);\n const intersects = raycaster.intersectObjects(scene.children, true);\n if (intersects.length > 0) {\n const hit = intersects[0].object;\n console.log('Clicked:', hit.userData.name || hit.uuid);\n }\n });\n\n return raycaster;\n}",
|
||||
"imports": "import * as THREE from 'three';",
|
||||
"domain": "threejs-interaction",
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
# ============================================================
|
||||
# HTML/CSS/JS Templates
|
||||
# ============================================================
|
||||
|
||||
HTML_CSS_JS_TEMPLATES = [
|
||||
# --- DOM element creation ---
|
||||
{
|
||||
"problem": "Create a DOM element with multiple classes and attributes in vanilla JavaScript.",
|
||||
"solution": "function createElement(tag: string, classes: string[] = [], attrs: Record<string, string> = {}, children: Node[] = []) {\n const el = document.createElement(tag);\n el.classList.add(...classes);\n for (const [key, value] of Object.entries(attrs)) {\n el.setAttribute(key, value);\n }\n for (const child of children) {\n el.appendChild(child);\n }\n return el;\n}\n\n// Usage\nconst button = createElement('button', ['btn', 'btn-primary'], { 'aria-label': 'Submit' }, [\n document.createTextNode('Submit')\n]);",
|
||||
"imports": "",
|
||||
"domain": "html-dom",
|
||||
},
|
||||
# --- Event delegation ---
|
||||
{
|
||||
"problem": "Implement event delegation for dynamic button clicks with proper type checking.",
|
||||
"solution": "function setupEventDelegation(container: HTMLElement) {\n container.addEventListener('click', (e) => {\n const target = e.target as HTMLElement;\n if (!target.matches('button[data-action]')) return;\n\n const action = target.getAttribute('data-action');\n switch (action) {\n case 'save':\n handleSave();\n break;\n case 'delete':\n handleDelete();\n break;\n default:\n console.warn('Unknown action:', action);\n }\n });\n}",
|
||||
"imports": "",
|
||||
"domain": "html-dom",
|
||||
},
|
||||
# --- Form validation ---
|
||||
{
|
||||
"problem": "Validate a form submission with HTML5 constraints and custom checks.",
|
||||
"solution": "function validateForm(form: HTMLFormElement): { isValid: boolean; errors: string[] } {\n const errors: string[] = [];\n const email = form.elements.namedItem('email') as HTMLInputElement;\n const password = form.elements.namedItem('password') as HTMLInputElement;\n\n if (!email.validity.valid) {\n errors.push('Please enter a valid email address.');\n }\n if (password.value.length < 8) {\n errors.push('Password must be at least 8 characters.');\n }\n if (password.value !== (form.elements.namedItem('confirm') as HTMLInputElement).value) {\n errors.push('Passwords do not match.');\n }\n\n return { isValid: errors.length === 0, errors };\n}",
|
||||
"imports": "",
|
||||
"domain": "html-forms",
|
||||
},
|
||||
# --- CSS Grid ---
|
||||
{
|
||||
"problem": "Create a responsive CSS grid layout with auto-fill and gap.",
|
||||
"solution": "const style = document.createElement('style');\nstyle.textContent = `\n .card-grid {\n display: grid;\n grid-template-columns: repeat(auto-fill, minmax(280px, 1fr));\n gap: 1.5rem;\n padding: 1rem;\n }\n .card {\n background: var(--card-bg);\n border-radius: 8px;\n box-shadow: 0 2px 8px rgba(0,0,0,0.1);\n }\n @media (max-width: 600px) {\n .card-grid { grid-template-columns: 1fr; }\n }\n`;\ndocument.head.appendChild(style);",
|
||||
"imports": "",
|
||||
"domain": "css-layout",
|
||||
},
|
||||
# --- CSS custom properties ---
|
||||
{
|
||||
"problem": "Set and read CSS custom properties (CSS variables) via JavaScript.",
|
||||
"solution": "function setThemeColor(root: HTMLElement, name: string, value: string) {\n root.style.setProperty(`--theme-${name}`, value);\n}\n\nfunction getComputedColor(root: HTMLElement, name: string): string {\n return getComputedStyle(root).getPropertyValue(`--theme-${name}`).trim();\n}\n\n// Initialize theme\nsetThemeColor(document.documentElement, 'primary', '#4a90d9');\nsetThemeColor(document.documentElement, 'accent', '#ff6b6b');",
|
||||
"imports": "",
|
||||
"domain": "css-variables",
|
||||
},
|
||||
# --- Intersection Observer ---
|
||||
{
|
||||
"problem": "Use IntersectionObserver to lazy-load images when they enter the viewport.",
|
||||
"solution": "function setupLazyLoading(container: HTMLElement) {\n const images = container.querySelectorAll('img[data-src]');\n const observer = new IntersectionObserver((entries) => {\n entries.forEach(entry => {\n if (entry.isIntersecting) {\n const img = entry.target as HTMLImageElement;\n img.src = img.dataset.src!;\n img.removeAttribute('data-src');\n observer.unobserve(img);\n }\n });\n }, { rootMargin: '50px' });\n\n images.forEach(img => observer.observe(img));\n}",
|
||||
"imports": "",
|
||||
"domain": "html-performance",
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
# ============================================================
|
||||
# Playground UI Templates
|
||||
# ============================================================
|
||||
|
||||
PLAYGROUND_UI_TEMPLATES = [
|
||||
# --- Sovereignty badge ---
|
||||
{
|
||||
"problem": "Render a sovereignty badge displaying local-first status with tooltip.",
|
||||
"solution": "function SovereigntyBadge({ runningLocal }: { runningLocal: boolean }) {\n const badge = document.createElement('span');\n badge.className = 'sovereignty-badge';\n badge.innerHTML = runningLocal\n ? '\\ud83c\\uddf5\\ud83c\\uddf1\\u200d\\ud83c\\udfa8\\ufe0f Local'\n : '\\ud83d\\udd12 Cloud';\n badge.title = runningLocal\n ? 'This agent runs entirely on your machine'\n : 'This agent uses external inference';\n return badge;\n}",
|
||||
"imports": "",
|
||||
"domain": "playground-ui",
|
||||
},
|
||||
# --- Token counter ---
|
||||
{
|
||||
"problem": "Build a token budget display showing used/total with a visual progress bar.",
|
||||
"solution": "function TokenBudgetDisplay({ used, total }: { used: number; total: number }) {\n const pct = Math.min((used / total) * 100, 100);\n const bar = document.createElement('div');\n bar.className = 'token-budget-bar';\n bar.innerHTML = `\n <div class=\"track\">\n <div class=\"fill\" style=\"width: ${pct}%; background: ${pct > 90 ? '#f44336' : '#4caf50'}\"></div>\n </div>\n <span class=\"label\">${used.toLocaleString()} / ${total.toLocaleString()} tokens</span>\n `;\n return bar;\n}",
|
||||
"imports": "",
|
||||
"domain": "playground-ui",
|
||||
},
|
||||
# --- Approval gate ---
|
||||
{
|
||||
"problem": "Create an approval gate component for dangerous commands with tiered risk colors.",
|
||||
"solution": "function ApprovalGate({ risk, onApprove, onDeny }: {\n risk: 'low' | 'medium' | 'high';\n onApprove: () => void;\n onDeny: () => void;\n}) {\n const colors = { low: '#4caf50', medium: '#ff9800', high: '#f44336' };\n const panel = document.createElement('div');\n panel.className = 'approval-gate';\n panel.style.borderColor = colors[risk];\n panel.innerHTML = `\n <p>This action is <strong>${risk} risk</strong>. Continue?</p>\n <button data-action=\"approve\">Yes, proceed</button>\n <button data-action=\"deny\">No, cancel</button>\n `;\n panel.querySelector('[data-action=\"approve\"]')!.addEventListener('click', onApprove);\n panel.querySelector('[data-action=\"deny\"]')!.addEventListener('click', onDeny);\n return panel;\n}",
|
||||
"imports": "",
|
||||
"domain": "playground-ui",
|
||||
},
|
||||
# --- Skill card ---
|
||||
{
|
||||
"problem": "Render a skill card with metadata, status indicator, and toggle switch.",
|
||||
"solution": "function SkillCard({ skill, enabled, onToggle }: {\n skill: { name: string; description: string; category: string };\n enabled: boolean;\n onToggle: (name: string) => void;\n}) {\n const card = document.createElement('article');\n card.className = 'skill-card';\n card.innerHTML = `\n <header>\n <h3>${skill.name}</h3>\n <label class=\"toggle\">\n <input type=\"checkbox\" ${enabled ? 'checked' : ''}>\n <span class=\"slider\"></span>\n </label>\n </header>\n <p>${skill.description}</p>\n <footer>Category: ${skill.category}</footer>\n `;\n card.querySelector('input')!.addEventListener('change', () => onToggle(skill.name));\n return card;\n}",
|
||||
"imports": "",
|
||||
"domain": "playground-ui",
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
# ============================================================
|
||||
# Gallery Templates
|
||||
# ============================================================
|
||||
|
||||
GALLERY_TEMPLATES = [
|
||||
# --- Masonry grid ---
|
||||
{
|
||||
"problem": "Implement a responsive masonry image grid using CSS columns.",
|
||||
"solution": "function createMasonryGallery(images: { src: string; alt: string }[], columns = 3) {\n const container = document.createElement('div');\n container.className = 'masonry-gallery';\n container.style.columnCount = String(columns);\n container.style.gap = '1rem';\n\n images.forEach(img => {\n const figure = document.createElement('figure');\n figure.innerHTML = `<img src=\"${img.src}\" alt=\"${img.alt}\" loading=\"lazy\">`;\n container.appendChild(figure);\n });\n\n // Responsive breakpoints\n const mq = window.matchMedia('(max-width: 768px)');\n mq.addEventListener('change', (e) => {\n container.style.columnCount = e.matches ? '2' : String(columns);\n });\n\n return container;\n}",
|
||||
"imports": "",
|
||||
"domain": "gallery-layout",
|
||||
},
|
||||
# --- Lightbox modal ---
|
||||
{
|
||||
"problem": "Build a modal lightbox for full-screen image viewing with keyboard navigation.",
|
||||
"solution": "class Lightbox {\n private overlay!: HTMLElement;\n private img!: HTMLImageElement;\n\n constructor() {\n this.overlay = document.createElement('div');\n this.overlay.className = 'lightbox-overlay';\n this.overlay.style.cssText = 'position:fixed;inset:0;background:rgba(0,0,0,0.9);display:flex;align-items:center;justify-content:center;z-index:9999';\n this.img = document.createElement('img');\n this.overlay.appendChild(this.img);\n document.body.appendChild(this.overlay);\n\n this.overlay.addEventListener('click', () => this.close());\n document.addEventListener('keydown', (e) => e.key === 'Escape' && this.close());\n }\n\n open(src: string, alt: string) {\n this.img.src = src;\n this.img.alt = alt;\n this.overlay.style.display = 'flex';\n }\n\n close() {\n this.overlay.style.display = 'none';\n }\n}",
|
||||
"imports": "",
|
||||
"domain": "gallery-interaction",
|
||||
},
|
||||
# --- Infinite scroll ---
|
||||
{
|
||||
"problem": "Implement infinite scroll loading with IntersectionObserver and abort handling.",
|
||||
"solution": "async function setupInfiniteScroll(container: HTMLElement, loadPage: (page: number) => Promise<void>) {\n let page = 1;\n let loading = false;\n let done = false;\n\n const sentinel = document.createElement('div');\n sentinel.className = 'scroll-sentinel';\n container.appendChild(sentinel);\n\n const observer = new IntersectionObserver(async (entries) => {\n if (entries[0].isIntersecting && !loading && !done) {\n loading = true;\n try {\n await loadPage(++page);\n } catch (err) {\n console.error('Failed to load page:', err);\n done = true;\n }\n loading = false;\n }\n }, { rootMargin: '200px' });\n\n observer.observe(sentinel);\n}",
|
||||
"imports": "",
|
||||
"domain": "gallery-performance",
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
# ============================================================
|
||||
# Game Templates
|
||||
# ============================================================
|
||||
|
||||
GAME_TEMPLATES = [
|
||||
# --- Game loop ---
|
||||
{
|
||||
"problem": "Create a fixed-timestep game loop with accumulator pattern.",
|
||||
"solution": "class GameLoop {\n private lastTime = 0;\n private accumulator = 0;\n private readonly step = 1 / 60; // 60 Hz fixed step\n\n constructor(private readonly update: (dt: number) => void) {}\n\n start() {\n const frame = (time: number) => {\n const delta = (time - this.lastTime) / 1000;\n this.lastTime = time;\n this.accumulator += delta;\n\n while (this.accumulator >= this.step) {\n this.update(this.step);\n this.accumulator -= this.step;\n }\n\n requestAnimationFrame(frame);\n };\n requestAnimationFrame(frame);\n }\n}",
|
||||
"imports": "",
|
||||
"domain": "game-architecture",
|
||||
},
|
||||
# --- Canvas setup ---
|
||||
{
|
||||
"problem": "Set up an HTML5 canvas with high-DPI scaling and clearing.",
|
||||
"solution": "function setupCanvas(canvas: HTMLCanvasElement, width = 800, height = 600) {\n const dpr = window.devicePixelRatio || 1;\n canvas.width = width * dpr;\n canvas.height = height * dpr;\n canvas.style.width = `${width}px`;\n canvas.style.height = `${height}px`;\n\n const ctx = canvas.getContext('2d')!;\n ctx.scale(dpr, dpr);\n\n return {\n clear() { ctx.clearRect(0, 0, width, height); },\n ctx,\n width,\n height,\n };\n}",
|
||||
"imports": "",
|
||||
"domain": "game-rendering",
|
||||
},
|
||||
# --- Sprite animation ---
|
||||
{
|
||||
"problem": "Animate a sprite sheet with frame-based playback and loop support.",
|
||||
"solution": "class SpriteAnimator {\n private frame = 0;\n private lastTick = 0;\n\n constructor(\n private readonly image: HTMLImageElement,\n private readonly frameWidth: number,\n private readonly frameCount: number,\n private readonly fps: number = 12,\n private readonly loop: boolean = true,\n ) {}\n\n update(now: number) {\n const interval = 1000 / this.fps;\n if (now - this.lastTick >= interval) {\n this.lastTick = now;\n this.frame++;\n if (this.frame >= this.frameCount) {\n this.frame = this.loop ? 0 : this.frameCount - 1;\n }\n }\n }\n\n draw(ctx: CanvasRenderingContext2D, x: number, y: number) {\n ctx.drawImage(\n this.image,\n this.frame * this.frameWidth, 0,\n this.frameWidth, this.image.height,\n x, y,\n this.frameWidth, this.image.height\n );\n }\n}",
|
||||
"imports": "",
|
||||
"domain": "game-assets",
|
||||
},
|
||||
# --- AABB collision ---
|
||||
{
|
||||
"problem": "Detect AABB (axis-aligned bounding box) collision between two rectangles.",
|
||||
"solution": "function aabbCollision(\n a: { x: number; y: number; w: number; h: number },\n b: { x: number; y: number; w: number; h: number }\n): boolean {\n return a.x < b.x + b.w &&\n a.x + a.w > b.x &&\n a.y < b.y + b.h &&\n a.y + a.h > b.y;\n}\n\n// Usage for game entities\nif (aabbCollision(player, enemy)) {\n handlePlayerHit();\n}",
|
||||
"imports": "",
|
||||
"domain": "game-physics",
|
||||
},
|
||||
# --- Input handling ---
|
||||
{
|
||||
"problem": "Capture keyboard input state with smooth handling for game controls.",
|
||||
"solution": "class InputState {\n private keys = new Set<string>();\n\n constructor() {\n window.addEventListener('keydown', (e) => this.keys.add(e.code));\n window.addEventListener('keyup', (e) => this.keys.delete(e.code));\n }\n\n isPressed(code: string): boolean {\n return this.keys.has(code);\n }\n\n hasAny(codes: string[]): boolean {\n return codes.some(c => this.keys.has(c));\n }\n}\n\n// In game loop:\nconst input = new InputState();\nif (input.isPressed('ArrowUp')) player.y -= speed * dt;",
|
||||
"imports": "",
|
||||
"domain": "game-input",
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
# ============================================================
|
||||
# Extra HTML/CSS/JS Templates
|
||||
# ============================================================
|
||||
|
||||
HTML_CSS_JS_TEMPLATES_EXTRA = [
|
||||
# Debounce utility
|
||||
{
|
||||
"problem": "Write a debounce function that delays invoking a callback until after wait milliseconds.",
|
||||
"solution": "function debounce<T extends (...args: any[]) => void>(\n fn: T,\n wait: number\n): (...args: Parameters<T>) => void {\n let timeoutId: ReturnType<typeof setTimeout> | null = null;\n return (...args: Parameters<T>) => {\n if (timeoutId) clearTimeout(timeoutId);\n timeoutId = setTimeout(() => fn(...args), wait);\n };\n}",
|
||||
"imports": "",
|
||||
"domain": "html-utilities",
|
||||
},
|
||||
# Throttle utility
|
||||
{
|
||||
"problem": "Implement a throttle function ensuring a callback runs at most once per interval.",
|
||||
"solution": "function throttle<T extends (...args: any[]) => void>(\n fn: T,\n interval: number\n): (...args: Parameters<T>) => void {\n let last = 0;\n return (...args: Parameters<T>) => {\n const now = Date.now();\n if (now - last >= interval) {\n last = now;\n fn(...args);\n }\n };\n}",
|
||||
"imports": "",
|
||||
"domain": "html-utilities",
|
||||
},
|
||||
# LocalStorage wrapper with TTL
|
||||
{
|
||||
"problem": "Wrap localStorage with JSON serialization and TTL expiration.",
|
||||
"solution": "class StorageWithTTL {\n set(key: string, value: any, ttlMs = 0) {\n const item = { value, expiry: ttlMs ? Date.now() + ttlMs : null };\n localStorage.setItem(key, JSON.stringify(item));\n }\n\n get<T>(key: string): T | null {\n const raw = localStorage.getItem(key);\n if (!raw) return null;\n const { value, expiry } = JSON.parse(raw);\n if (expiry && Date.now() > expiry) {\n localStorage.removeItem(key);\n return null;\n }\n return value as T;\n }\n}",
|
||||
"imports": "",
|
||||
"domain": "html-storage",
|
||||
},
|
||||
# Viewport meta
|
||||
{
|
||||
"problem": "Generate a responsive viewport meta tag for mobile-first web apps.",
|
||||
"solution": "const viewport = document.querySelector('meta[name=\"viewport\"]') ||\n document.createElement('meta');\nviewport.name = 'viewport';\nviewport.content = 'width=device-width, initial-scale=1.0, maximum-scale=5.0, user-scalable=yes, viewport-fit=cover';\ndocument.head.appendChild(viewport);",
|
||||
"imports": "",
|
||||
"domain": "html-meta",
|
||||
},
|
||||
# Dynamic CSS variables
|
||||
{
|
||||
"problem": "Create and inject a dynamic stylesheet with CSS custom property overrides.",
|
||||
"solution": "function injectDynamicStyles(overrides: Record<string, string>) {\n const style = document.createElement('style');\n let css = ':root {\\n';\n for (const [prop, val] of Object.entries(overrides)) {\n css += ` --${prop}: ${val};\\n`;\n }\n css += '}';\n style.textContent = css;\n document.head.appendChild(style);\n}",
|
||||
"imports": "",
|
||||
"domain": "css-variables",
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
# ============================================================
|
||||
# Extra Playground UI Templates
|
||||
# ============================================================
|
||||
|
||||
PLAYGROUND_UI_TEMPLATES_EXTRA = [
|
||||
# Circuit/tier badge
|
||||
{
|
||||
"problem": "Render a circuit health badge showing approval-tier status with color-coded indicator.",
|
||||
"solution": "function CircuitBadge({ tier }: { tier: number }) {\n const colors = ['#f44336', '#ff9800', '#4caf50', '#2196f3', '#9c27b0'];\n const labels = ['BLOCKED', 'RESTRICTED', 'LIMITED', 'APPROVED', 'ELEVATED'];\n const color = colors[Math.min(tier, 4)];\n const label = labels[Math.min(tier, 4)];\n\n const badge = document.createElement('span');\n badge.className = 'circuit-badge';\n badge.style.backgroundColor = color;\n badge.textContent = label;\n badge.title = `Approval tier ${tier} — ${label.toLowerCase()} command set`;\n return badge;\n}",
|
||||
"imports": "",
|
||||
"domain": "playground-ui",
|
||||
},
|
||||
# Memory usage bar
|
||||
{
|
||||
"problem": "Display a horizontal memory usage bar with gradient warning zones.",
|
||||
"solution": "function MemoryBar({ used, total }: { used: number; total: number }) {\n const pct = (used / total) * 100;\n const bar = document.createElement('div');\n bar.className = 'memory-bar';\n let color = '#4caf50';\n if (pct > 80) color = '#ff9800';\n if (pct > 95) color = '#f44336';\n\n bar.innerHTML = `\n <div class=\"track\" style=\"background: #e0e0e0; height: 8px; border-radius: 4px; overflow: hidden;\">\n <div style=\"width: ${pct}%; height: 100%; background: ${color}; transition: width 0.3s;\"></div>\n </div>\n <span>${(used/1024/1024).toFixed(1)} MB / ${(total/1024/1024).toFixed(1)} MB</span>\n `;\n return bar;\n}",
|
||||
"imports": "",
|
||||
"domain": "playground-ui",
|
||||
},
|
||||
# Tool status dot
|
||||
{
|
||||
"problem": "Show a tool availability status dot with tooltip for the toolset panel.",
|
||||
"solution": "function ToolStatus({ name, ok }: { name: string; ok: boolean }) {\n const dot = document.createElement('span');\n dot.className = 'tool-status-dot';\n dot.style.backgroundColor = ok ? '#4caf50' : '#f44336';\n dot.title = `${name}: ${ok ? 'Available' : 'Disabled / missing API key'}`;\n return dot;\n}",
|
||||
"imports": "",
|
||||
"domain": "playground-ui",
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
# ============================================================
|
||||
# Extra Gallery Templates
|
||||
# ============================================================
|
||||
|
||||
GALLERY_TEMPLATES_EXTRA = [
|
||||
# Grid + shared lightbox
|
||||
{
|
||||
"problem": "Build an image gallery grid that opens a shared lightbox on thumbnail click.",
|
||||
"solution": "let currentLightbox: HTMLDivElement | null = null;\n\nfunction buildGallery(images: { full: string; thumb: string; alt: string }[]) {\n const grid = document.createElement('div');\n grid.className = 'gallery-grid';\n grid.style.cssText = 'display:grid;grid-template-columns:repeat(auto-fill,minmax(120px,1fr));gap:0.5rem';\n\n images.forEach((img, idx) => {\n const thumb = document.createElement('img');\n thumb.src = img.thumb;\n thumb.alt = img.alt;\n thumb.style.cssText = 'cursor:pointer;width:100%;height:auto;object-fit:cover;border-radius:4px';\n thumb.addEventListener('click', () => openLightbox(idx));\n grid.appendChild(thumb);\n });\n\n return grid;\n}\n\nfunction openLightbox(index: number) {\n if (currentLightbox) currentLightbox.remove();\n currentLightbox = document.createElement('div');\n currentLightbox.className = 'lightbox';\n currentLightbox.style.cssText = 'position:fixed;inset:0;background:rgba(0,0,0,0.95);display:flex;align-items:center;justify-content:center;z-index:10000;cursor:pointer';\n const img = document.createElement('img');\n img.src = images[index].full;\n img.style.maxWidth = '90vw';\n img.style.maxHeight = '90vh';\n currentLightbox.appendChild(img);\n currentLightbox.addEventListener('click', () => { currentLightbox?.remove(); currentLightbox = null; });\n document.body.appendChild(currentLightbox);\n}",
|
||||
"imports": "",
|
||||
"domain": "gallery-interaction",
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
# ============================================================
|
||||
# Extra Game Templates
|
||||
# ============================================================
|
||||
|
||||
GAME_TEMPLATES_EXTRA = [
|
||||
# Particle system with typed array
|
||||
{
|
||||
"problem": "Create a simple particle system for explosions using a typed array buffer.",
|
||||
"solution": "class ParticleSystem {\n private particles = new Float32Array(1000 * 4); // x, y, vx, vy per particle\n private count = 0;\n private readonly max = 1000;\n\n emit(x: number, y: number, velocity = 200) {\n if (this.count >= this.max) return;\n const i = this.count * 4;\n this.particles[i] = x;\n this.particles[i + 1] = y;\n const angle = Math.random() * Math.PI * 2;\n const speed = Math.random() * velocity;\n this.particles[i + 2] = Math.cos(angle) * speed;\n this.particles[i + 3] = Math.sin(angle) * speed;\n this.count++;\n }\n\n update(dt: number) {\n for (let i = 0; i < this.count * 4; i += 4) {\n this.particles[i] += this.particles[i + 2] * dt;\n this.particles[i + 1] += this.particles[i + 3] * dt;\n this.particles[i + 3] += 500 * dt; // gravity\n }\n }\n\n draw(ctx: CanvasRenderingContext2D) {\n ctx.fillStyle = '#ff6600';\n for (let i = 0; i < this.count * 4; i += 4) {\n ctx.fillRect(this.particles[i], this.particles[i + 1], 3, 3);\n }\n }\n}",
|
||||
"imports": "",
|
||||
"domain": "game-physics",
|
||||
},
|
||||
# State machine
|
||||
{
|
||||
"problem": "Implement a finite state machine for a game character with transitions.",
|
||||
"solution": "type State = 'idle' | 'walk' | 'run' | 'jump' | 'attack';\n\nclass StateMachine {\n private state: State = 'idle';\n private handlers: Record<State, (event: string) => void>;\n\n constructor(handlers: Partial<Record<State, (event: string) => void>>) {\n this.handlers = handlers as Record<State, (event: string) => void>;\n }\n\n transition(to: State) {\n console.log(`State: ${this.state} -> ${to}`);\n this.state = to;\n }\n\n dispatch(event: string) {\n const handler = this.handlers[this.state];\n if (handler) handler(event);\n }\n\n getState(): State {\n return this.state;\n }\n}\n\n// Usage\nconst sm = new StateMachine({\n idle: (e) => { if (e === 'move') sm.transition('walk'); },\n walk: (e) => { if (e === 'sprint') sm.transition('run'); if (e === 'jump') sm.transition('jump'); },\n run: (e) => { if (e === 'stop') sm.transition('idle'); },\n});",
|
||||
"imports": "",
|
||||
"domain": "game-architecture",
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
# ============================================================
|
||||
# Combined
|
||||
# ============================================================
|
||||
|
||||
ALL_TEMPLATES = (
|
||||
THREEJS_TEMPLATES
|
||||
+ HTML_CSS_JS_TEMPLATES
|
||||
+ HTML_CSS_JS_TEMPLATES_EXTRA
|
||||
+ PLAYGROUND_UI_TEMPLATES
|
||||
+ PLAYGROUND_UI_TEMPLATES_EXTRA
|
||||
+ GALLERY_TEMPLATES
|
||||
+ GALLERY_TEMPLATES_EXTRA
|
||||
+ GAME_TEMPLATES
|
||||
+ GAME_TEMPLATES_EXTRA
|
||||
)
|
||||
|
||||
_VARIANT_PREFIXES = [
|
||||
"Write code to",
|
||||
"Implement",
|
||||
"Build",
|
||||
"Create",
|
||||
"How would you",
|
||||
"Using the API, write code that",
|
||||
"Construct a function that",
|
||||
"Develop",
|
||||
"Write JavaScript that",
|
||||
"Create HTML/CSS for",
|
||||
"Design a Three.js",
|
||||
]
|
||||
|
||||
_VARIANT_SUFFIXES = [
|
||||
" including error handling.",
|
||||
" with full docstrings.",
|
||||
" with JSDoc annotations.",
|
||||
" using modern best practices.",
|
||||
" that handles edge cases.",
|
||||
" with TypeScript types.",
|
||||
" that is performant.",
|
||||
" with clear variable names.",
|
||||
" and include example usage.",
|
||||
" with proper cleanup.",
|
||||
" that is accessible (a11y).",
|
||||
" with keyboard navigation support.",
|
||||
]
|
||||
|
||||
|
||||
def vary_problem(base: str, idx: int) -> str:
|
||||
prefix = _VARIANT_PREFIXES[idx % len(_VARIANT_PREFIXES)]
|
||||
suffix = _VARIANT_SUFFIXES[idx % len(_VARIANT_SUFFIXES)]
|
||||
cleaned = base
|
||||
for article in ("Create a ", "Build a ", "Implement a ", "Write a ", "Develop a ", "Write JavaScript that ", "Create HTML/CSS for ", "Design a Three.js "):
|
||||
if cleaned.lower().startswith(article):
|
||||
cleaned = cleaned[len(article):]
|
||||
break
|
||||
cleaned = cleaned[0].lower() + cleaned[1:] if cleaned else ""
|
||||
return f"{prefix} {cleaned}{suffix}"
|
||||
|
||||
|
||||
def vary_solution(base: str, idx: int) -> str:
|
||||
var_names = ["data", "result", "value", "entry", "item", "node", "entity", "output", "obj", "element"]
|
||||
v = var_names[idx % len(var_names)]
|
||||
sol = base
|
||||
if idx % 3 == 0:
|
||||
for original in ["result", "data", "value", "output", "entry", "item", "obj", "element"]:
|
||||
if original in sol:
|
||||
sol = sol.replace(original, v)
|
||||
break
|
||||
if idx % 5 == 0:
|
||||
sol = f"// Variation {idx}\\n" + sol
|
||||
elif idx % 7 == 0:
|
||||
sol = f"# Generated variation {idx}\\n" + sol
|
||||
return sol
|
||||
|
||||
|
||||
def generate_pairs(count: int = 1000) -> list[dict]:
|
||||
pairs = []
|
||||
template_cycle = list(ALL_TEMPLATES)
|
||||
random.shuffle(template_cycle)
|
||||
|
||||
for i in range(count):
|
||||
template = template_cycle[i % len(template_cycle)]
|
||||
problem = vary_problem(template["problem"], i)
|
||||
solution = vary_solution(template["solution"], i)
|
||||
pair = {
|
||||
"problem": problem,
|
||||
"solution": solution,
|
||||
"imports": template["imports"],
|
||||
"domain": template["domain"],
|
||||
"id": f"frontend-creative-{i:04d}",
|
||||
}
|
||||
pairs.append(pair)
|
||||
|
||||
return pairs
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Generate Frontend & Creative code pattern training pairs")
|
||||
parser.add_argument("--output", "-o", default="training-data/code-patterns-frontend-&-creative.jsonl",
|
||||
help="Output JSONL path")
|
||||
parser.add_argument("--count", "-n", type=int, default=1000,
|
||||
help="Number of pairs to generate")
|
||||
args = parser.parse_args()
|
||||
|
||||
out_path = Path(args.output)
|
||||
out_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
pairs = generate_pairs(args.count)
|
||||
with open(out_path, "w", encoding="utf-8") as f:
|
||||
for pair in pairs:
|
||||
f.write(json.dumps(pair, ensure_ascii=False) + "\n")
|
||||
|
||||
domains = {p["domain"] for p in pairs}
|
||||
print(f"Generated {len(pairs)} code pattern pairs → {out_path}")
|
||||
print(f" Size: {out_path.stat().st_size / 1024:.1f} KB")
|
||||
print(f" Domains ({len(domains)}): {sorted(domains)}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user