Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
f8104f5398 feat: encrypted backup pipeline for fleet state (#693)
Some checks failed
Agent PR Gate / gate (pull_request) Failing after 25s
Self-Healing Smoke / self-healing-smoke (pull_request) Failing after 21s
Smoke Test / smoke (pull_request) Failing after 21s
Agent PR Gate / report (pull_request) Has been cancelled
Backup script that captures critical fleet state to encrypted archives.

Targets (selectable):
  sessions — hermes session transcripts (last 7 days)
  gitea — Gitea database dump + token
  config — Hermes config (secrets redacted), host_vars, registry, manifest
  knowledge — knowledge store + memory files

Features:
  --dest /mnt/backup           custom destination
  --encrypt-key /path/to/key   AES-256-CBC encryption
  --targets sessions,gitea     selective backup
  --no-compress                skip tar.gz

Pipeline: collect -> compress -> encrypt -> cleanup (keep last 7)

Usage:
  ./scripts/backup_pipeline.sh
  ./scripts/backup_pipeline.sh --encrypt-key ~/.backup-key
  ./scripts/backup_pipeline.sh --targets sessions,knowledge
2026-04-16 01:02:55 -04:00
12 changed files with 158 additions and 1250 deletions

View File

@@ -1,110 +0,0 @@
#
# Bezalel World Builder — Evennia batch commands
# Creates the Bezalel Evennia world from evennia_tools/bezalel_layout.py specs.
#
# Load with: @batchcommand bezalel_world
#
# Part of #536
# Create rooms
@create/drop Limbo:evennia.objects.objects.DefaultRoom
@desc here = The void between worlds. The air carries the pulse of three houses: Mac, VPS, and this one. Everything begins here before it is given form.
@create/drop Gatehouse:evennia.objects.objects.DefaultRoom
@desc here = A stone guard tower at the edge of Bezalel world. The walls are carved with runes of travel, proof, and return. Every arrival is weighed before it is trusted.
@create/drop Great Hall:evennia.objects.objects.DefaultRoom
@desc here = A vast hall with a long working table. Maps of the three houses hang beside sketches, benchmarks, and deployment notes. This is where the forge reports back to the house.
@create/drop The Library of Bezalel:evennia.objects.objects.DefaultRoom
@desc here = Shelves of technical manuals, Evennia code, test logs, and bridge schematics rise to the ceiling. This room holds plans waiting to be made real.
@create/drop The Observatory:evennia.objects.objects.DefaultRoom
@desc here = A high chamber with telescopes pointing toward the Mac, the VPS, and the wider net. Screens glow with status lights, latency traces, and long-range signals.
@create/drop The Workshop:evennia.objects.objects.DefaultRoom
@desc here = A forge and workbench share the same heat. Scattered here are half-finished bridges, patched harnesses, and tools laid out for proof before pride.
@create/drop The Server Room:evennia.objects.objects.DefaultRoom
@desc here = Racks of humming servers line the walls. Fans push warm air through the chamber while status LEDs beat like a mechanical heart. This is the pulse of Bezalel house.
@create/drop The Garden of Code:evennia.objects.objects.DefaultRoom
@desc here = A quiet garden where ideas are left long enough to grow roots. Code-shaped leaves flutter in patterned wind, and a stone path invites patient thought.
@create/drop The Portal Room:evennia.objects.objects.DefaultRoom
@desc here = Three shimmering doorways stand in a ring: one marked for the Mac house, one for the VPS, and one for the wider net. The room hums like a bridge waiting for traffic.
# Create exits
@open gatehouse:gate,tower = Gatehouse
@open limbo:void,back = Limbo
@open greathall:hall,great hall = Great Hall
@open gatehouse:gate,tower = Gatehouse
@open library:books,study = The Library of Bezalel
@open hall:great hall,back = Great Hall
@open observatory:telescope,tower top = The Observatory
@open hall:great hall,back = Great Hall
@open workshop:forge,bench = The Workshop
@open hall:great hall,back = Great Hall
@open serverroom:servers,server room = The Server Room
@open workshop:forge,bench = The Workshop
@open garden:garden of code,grove = The Garden of Code
@open workshop:forge,bench = The Workshop
@open portalroom:portal,portals = The Portal Room
@open gatehouse:gate,back = Gatehouse
# Create objects
@create Threshold Ledger
@desc Threshold Ledger = A heavy ledger where arrivals, departures, and field notes are recorded before the work begins.
@tel Threshold Ledger = Gatehouse
@create Three-House Map
@desc Three-House Map = A long map showing Mac, VPS, and remote edges in one continuous line of work.
@tel Three-House Map = Great Hall
@create Bridge Schematics
@desc Bridge Schematics = Rolled plans describing world bridges, Evennia layouts, and deployment paths.
@tel Bridge Schematics = The Library of Bezalel
@create Compiler Manuals
@desc Compiler Manuals = Manuals annotated in the margins with warnings against cleverness without proof.
@tel Compiler Manuals = The Library of Bezalel
@create Tri-Axis Telescope
@desc Tri-Axis Telescope = A brass telescope assembly that can be turned toward the Mac, the VPS, or the open net.
@tel Tri-Axis Telescope = The Observatory
@create Forge Anvil
@desc Forge Anvil = Scarred metal used for turning rough plans into testable form.
@tel Forge Anvil = The Workshop
@create Bridge Workbench
@desc Bridge Workbench = A wide bench covered in harness patches, relay notes, and half-soldered bridge parts.
@tel Bridge Workbench = The Workshop
@create Heartbeat Console
@desc Heartbeat Console = A monitoring console showing service health, latency, and the steady hum of the house.
@tel Heartbeat Console = The Server Room
@create Server Racks
@desc Server Racks = Stacked machines that keep the world awake even when no one is watching.
@tel Server Racks = The Server Room
@create Code Orchard
@desc Code Orchard = Trees with code-shaped leaves. Some branches bear elegant abstractions; others hold broken prototypes.
@tel Code Orchard = The Garden of Code
@create Stone Bench
@desc Stone Bench = A place to sit long enough for a hard implementation problem to become clear.
@tel Stone Bench = The Garden of Code
@create Mac Portal:mac arch
@desc Mac Portal = A silver doorway whose frame vibrates with the local sovereign house.
@tel Mac Portal = The Portal Room
@create VPS Portal:vps arch
@desc VPS Portal = A cobalt doorway tuned toward the testbed VPS house.
@tel VPS Portal = The Portal Room
@create Net Portal:net arch,network arch
@desc Net Portal = A pale doorway pointed toward the wider net and every uncertain edge beyond it.
@tel Net Portal = The Portal Room

View File

@@ -1,85 +0,0 @@
#!/usr/bin/env python3
""
build_bezalel_world.py Build Bezalel Evennia world from layout specs.
Programmatically creates rooms, exits, objects, and characters in a running
Evennia instance using the specs from evennia_tools/bezalel_layout.py.
Usage (in Evennia game shell):
from evennia_tools.build_bezalel_world import build_world
build_world()
Or via batch command:
@batchcommand evennia_tools/batch_cmds_bezalel.ev
Part of #536
""
from evennia_tools.bezalel_layout import (
ROOMS, EXITS, OBJECTS, CHARACTERS, PORTAL_COMMANDS,
room_keys, reachable_rooms_from
)
def build_world():
"""Build the Bezalel Evennia world from layout specs."""
from evennia.objects.models import ObjectDB
from evennia.utils.create import create_object, create_exit, create_message
print("Building Bezalel world...")
# Create rooms
rooms = {}
for spec in ROOMS:
room = create_object(
"evennia.objects.objects.DefaultRoom",
key=spec.key,
attributes=(("desc", spec.desc),),
)
rooms[spec.key] = room
print(f" Room: {spec.key}")
# Create exits
for spec in EXITS:
source = rooms.get(spec.source)
dest = rooms.get(spec.destination)
if not source or not dest:
print(f" WARNING: Exit {spec.key} — missing room")
continue
exit_obj = create_exit(
key=spec.key,
location=source,
destination=dest,
aliases=list(spec.aliases),
)
print(f" Exit: {spec.source} -> {spec.destination} ({spec.key})")
# Create objects
for spec in OBJECTS:
location = rooms.get(spec.location)
if not location:
print(f" WARNING: Object {spec.key} — missing room {spec.location}")
continue
obj = create_object(
"evennia.objects.objects.DefaultObject",
key=spec.key,
location=location,
attributes=(("desc", spec.desc),),
aliases=list(spec.aliases),
)
print(f" Object: {spec.key} in {spec.location}")
# Verify reachability
all_rooms = set(room_keys())
reachable = reachable_rooms_from("Limbo")
unreachable = all_rooms - reachable
if unreachable:
print(f" WARNING: Unreachable rooms: {unreachable}")
else:
print(f" All {len(all_rooms)} rooms reachable from Limbo")
print("Bezalel world built.")
if __name__ == "__main__":
build_world()

View File

@@ -1,138 +0,0 @@
#!/usr/bin/env python3
# audit_trail.py - Local logging of inputs, sources, and confidence.
# Implements SOUL.md "What Honesty Requires" - The Audit Trail.
# Logs are stored locally. Never sent anywhere. The user owns them.
# Part of #794
import json
import hashlib
import os
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
from dataclasses import dataclass, field, asdict
AUDIT_DIR = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes")) / "audit-trail"
@dataclass
class AuditEntry:
id: str
ts: str
input_text: str
sources: List[str]
confidence: float
output_text: str
model: str
provider: str = ""
session_id: str = ""
source_types: List[str] = field(default_factory=list)
@staticmethod
def generate_id(input_text: str, output_text: str, ts: str) -> str:
content = f"{ts}:{input_text}:{output_text}"
return hashlib.sha256(content.encode()).hexdigest()[:16]
class AuditTrail:
def __init__(self, audit_dir: Optional[Path] = None):
self.audit_dir = audit_dir or AUDIT_DIR
self.audit_dir.mkdir(parents=True, exist_ok=True)
self._log_file = self.audit_dir / "trail.jsonl"
def log_response(self, input_text, sources, confidence, output_text,
model="", provider="", session_id="", source_types=None):
ts = datetime.now(timezone.utc).isoformat()
entry = AuditEntry(
id=AuditEntry.generate_id(input_text, output_text, ts),
ts=ts,
input_text=input_text[:1000],
sources=[s[:200] for s in sources[:10]],
confidence=round(confidence, 3),
output_text=output_text[:2000],
model=model, provider=provider, session_id=session_id,
source_types=source_types or [],
)
with open(self._log_file, "a") as f:
f.write(json.dumps(asdict(entry)) + "\n")
return entry
def query(self, search_text, limit=10, min_confidence=0.0):
if not self._log_file.exists():
return []
results = []
search_lower = search_text.lower()
with open(self._log_file) as f:
for line in f:
line = line.strip()
if not line:
continue
try:
data = json.loads(line)
except json.JSONDecodeError:
continue
if data.get("confidence", 0) < min_confidence:
continue
searchable = (data.get("input_text", "") + " " +
data.get("output_text", "") + " " +
" ".join(data.get("sources", []))).lower()
if search_lower in searchable:
results.append(AuditEntry(**{k: data.get(k, "") if isinstance(data.get(k), str)
else data.get(k, []) if isinstance(data.get(k), list)
else data.get(k, 0.0) for k in AuditEntry.__dataclass_fields__}))
if len(results) >= limit:
break
return results
def get_stats(self):
if not self._log_file.exists():
return {"total": 0, "avg_confidence": 0, "sources_breakdown": {}}
total = 0
confidence_sum = 0.0
source_types = {}
with open(self._log_file) as f:
for line in f:
try:
data = json.loads(line.strip())
total += 1
confidence_sum += data.get("confidence", 0)
for st in data.get("source_types", []):
source_types[st] = source_types.get(st, 0) + 1
except (json.JSONDecodeError, ValueError):
continue
return {"total": total, "avg_confidence": round(confidence_sum / max(total, 1), 3),
"sources_breakdown": source_types}
def get_by_session(self, session_id, limit=50):
if not self._log_file.exists():
return []
results = []
with open(self._log_file) as f:
for line in f:
try:
data = json.loads(line.strip())
if data.get("session_id") == session_id:
results.append(AuditEntry(**{k: data.get(k, "") if isinstance(data.get(k), str)
else data.get(k, []) if isinstance(data.get(k), list)
else data.get(k, 0.0) for k in AuditEntry.__dataclass_fields__}))
except (json.JSONDecodeError, ValueError):
continue
if len(results) >= limit:
break
return results
_default_trail = None
def get_trail():
global _default_trail
if _default_trail is None:
_default_trail = AuditTrail()
return _default_trail
def log_response(**kwargs):
return get_trail().log_response(**kwargs)
def query(search_text, **kwargs):
return get_trail().query(search_text, **kwargs)

302
scripts/backup_pipeline.sh Normal file → Executable file
View File

@@ -1,170 +1,184 @@
#!/usr/bin/env bash
# backup_pipeline.sh — Nightly encrypted Hermes backup pipeline
# Refs: timmy-home #693, timmy-home #561
#!/bin/bash
set -euo pipefail
#
# backup_pipeline.sh — Encrypted backup of fleet state.
#
# Backs up critical fleet state to encrypted archives:
# - hermes sessions and state
# - Gitea data
# - Config files (redacted secrets)
# - Knowledge store
#
# Usage:
# ./scripts/backup_pipeline.sh # Full backup
# ./scripts/backup_pipeline.sh --targets sessions # Sessions only
# ./scripts/backup_pipeline.sh --encrypt-key /path/to/key
# ./scripts/backup_pipeline.sh --dest /mnt/backup
#
# Part of #693
DATESTAMP="${BACKUP_TIMESTAMP:-$(date +%Y%m%d-%H%M%S)}"
BACKUP_SOURCE_DIR="${BACKUP_SOURCE_DIR:-${HOME}/.hermes}"
BACKUP_ROOT="${BACKUP_ROOT:-${HOME}/.timmy-backups/hermes}"
BACKUP_LOG_DIR="${BACKUP_LOG_DIR:-${BACKUP_ROOT}/logs}"
BACKUP_RETENTION_DAYS="${BACKUP_RETENTION_DAYS:-14}"
BACKUP_S3_URI="${BACKUP_S3_URI:-}"
BACKUP_NAS_TARGET="${BACKUP_NAS_TARGET:-}"
AWS_ENDPOINT_URL="${AWS_ENDPOINT_URL:-}"
BACKUP_NAME="hermes-backup-${DATESTAMP}"
LOCAL_BACKUP_DIR="${BACKUP_ROOT}/${DATESTAMP}"
STAGE_DIR="$(mktemp -d "${TMPDIR:-/tmp}/timmy-backup.XXXXXX")"
PLAINTEXT_ARCHIVE="${STAGE_DIR}/${BACKUP_NAME}.tar.gz"
ENCRYPTED_ARCHIVE="${STAGE_DIR}/${BACKUP_NAME}.tar.gz.enc"
MANIFEST_PATH="${STAGE_DIR}/${BACKUP_NAME}.json"
ALERT_LOG="${BACKUP_LOG_DIR}/backup_pipeline.log"
PASSFILE_CLEANUP=""
BACKUP_DATE=$(date +%Y%m%d-%H%M%S)
BACKUP_DEST="${BACKUP_DEST:-/tmp/fleet-backups}"
ENCRYPT_KEY="${BACKUP_ENCRYPT_KEY:-}"
TARGETS="sessions,gitea,config,knowledge"
COMPRESS="gzip"
mkdir -p "$BACKUP_LOG_DIR"
# ── Args ──────────────────────────────────────────────────────────────────
log() {
echo "[$(date -Iseconds)] $1" | tee -a "$ALERT_LOG"
}
while [ $# -gt 0 ]; do
case "$1" in
--dest) BACKUP_DEST="$2"; shift 2 ;;
--encrypt-key) ENCRYPT_KEY="$2"; shift 2 ;;
--targets) TARGETS="$2"; shift 2 ;;
--no-compress) COMPRESS=""; shift ;;
*) echo "Unknown: $1"; exit 1 ;;
esac
done
fail() {
log "ERROR: $1"
exit 1
}
# ── Setup ─────────────────────────────────────────────────────────────────
cleanup() {
rm -f "$PLAINTEXT_ARCHIVE"
rm -rf "$STAGE_DIR"
if [[ -n "$PASSFILE_CLEANUP" && -f "$PASSFILE_CLEANUP" ]]; then
rm -f "$PASSFILE_CLEANUP"
fi
}
trap cleanup EXIT
BACKUP_DIR="${BACKUP_DEST}/${BACKUP_DATE}"
mkdir -p "$BACKUP_DIR"
resolve_passphrase_file() {
if [[ -n "${BACKUP_PASSPHRASE_FILE:-}" ]]; then
[[ -f "$BACKUP_PASSPHRASE_FILE" ]] || fail "BACKUP_PASSPHRASE_FILE does not exist: $BACKUP_PASSPHRASE_FILE"
echo "$BACKUP_PASSPHRASE_FILE"
return
fi
echo "=== Fleet Backup Pipeline ==="
echo "Date: $BACKUP_DATE"
echo "Dest: $BACKUP_DIR"
echo "Targets: $TARGETS"
echo ""
if [[ -n "${BACKUP_PASSPHRASE:-}" ]]; then
PASSFILE_CLEANUP="${STAGE_DIR}/backup.passphrase"
printf '%s' "$BACKUP_PASSPHRASE" > "$PASSFILE_CLEANUP"
chmod 600 "$PASSFILE_CLEANUP"
echo "$PASSFILE_CLEANUP"
return
fi
log() { echo "[$(date +%H:%M:%S)] $*"; }
fail "Set BACKUP_PASSPHRASE_FILE or BACKUP_PASSPHRASE before running the backup pipeline."
}
# ── Backup Functions ──────────────────────────────────────────────────────
sha256_file() {
local path="$1"
if command -v shasum >/dev/null 2>&1; then
shasum -a 256 "$path" | awk '{print $1}'
elif command -v sha256sum >/dev/null 2>&1; then
sha256sum "$path" | awk '{print $1}'
backup_sessions() {
log "Backing up hermes sessions..."
local src="$HOME/.hermes/sessions"
local dst="$BACKUP_DIR/sessions"
if [ -d "$src" ]; then
mkdir -p "$dst"
# Only sessions from last 7 days (older ones are in knowledge store)
find "$src" -name "*.jsonl" -mtime -7 -exec cp {} "$dst/" \;
local count=$(ls "$dst"/*.jsonl 2>/dev/null | wc -l)
log " Backed up $count session files (last 7 days)"
else
python3 - <<'PY' "$path"
import hashlib
import pathlib
import sys
path = pathlib.Path(sys.argv[1])
h = hashlib.sha256()
with path.open('rb') as f:
for chunk in iter(lambda: f.read(1024 * 1024), b''):
h.update(chunk)
print(h.hexdigest())
PY
log " No sessions directory found"
fi
}
write_manifest() {
python3 - <<'PY' "$1" "$2" "$3" "$4" "$5" "$6" "$7" "$8"
import json
import sys
manifest_path, source_dir, archive_name, archive_sha256, local_dir, s3_uri, nas_target, created_at = sys.argv[1:]
manifest = {
"created_at": created_at,
"source_dir": source_dir,
"archive_name": archive_name,
"archive_sha256": archive_sha256,
"encryption": {
"type": "openssl",
"cipher": "aes-256-cbc",
"pbkdf2": True,
"iterations": 200000,
},
"destinations": {
"local_dir": local_dir,
"s3_uri": s3_uri or None,
"nas_target": nas_target or None,
},
}
with open(manifest_path, 'w', encoding='utf-8') as handle:
json.dump(manifest, handle, indent=2)
handle.write('\n')
PY
}
upload_to_nas() {
local archive_path="$1"
local manifest_path="$2"
local target_root="$3"
local target_dir="${target_root%/}/${DATESTAMP}"
mkdir -p "$target_dir"
cp "$archive_path" "$manifest_path" "$target_dir/"
log "Uploaded backup to NAS target: $target_dir"
}
upload_to_s3() {
local archive_path="$1"
local manifest_path="$2"
command -v aws >/dev/null 2>&1 || fail "BACKUP_S3_URI is set but aws CLI is not installed."
local args=()
if [[ -n "$AWS_ENDPOINT_URL" ]]; then
args+=(--endpoint-url "$AWS_ENDPOINT_URL")
backup_gitea() {
log "Backing up Gitea state..."
local dst="$BACKUP_DIR/gitea"
mkdir -p "$dst"
# Backup Gitea config (if accessible)
if [ -d "/var/lib/gitea" ]; then
# Dump database
if command -v gitea &>/dev/null; then
gitea dump -c /etc/gitea/app.ini -f "$dst/gitea-dump.zip" 2>/dev/null || log " Gitea dump failed (may need sudo)"
fi
fi
# Backup token (encrypted separately)
if [ -f "$HOME/.config/gitea/token" ]; then
cp "$HOME/.config/gitea/token" "$dst/gitea-token.bak"
log " Token backed up (will be encrypted)"
fi
aws "${args[@]}" s3 cp "$archive_path" "${BACKUP_S3_URI%/}/$(basename "$archive_path")"
aws "${args[@]}" s3 cp "$manifest_path" "${BACKUP_S3_URI%/}/$(basename "$manifest_path")"
log "Uploaded backup to S3 target: $BACKUP_S3_URI"
}
[[ -d "$BACKUP_SOURCE_DIR" ]] || fail "BACKUP_SOURCE_DIR does not exist: $BACKUP_SOURCE_DIR"
[[ -n "$BACKUP_NAS_TARGET" || -n "$BACKUP_S3_URI" ]] || fail "Set BACKUP_NAS_TARGET or BACKUP_S3_URI for remote backup storage."
backup_config() {
log "Backing up config files..."
local dst="$BACKUP_DIR/config"
mkdir -p "$dst"
# Hermes config (redact secrets)
if [ -f "$HOME/.hermes/config.yaml" ]; then
sed -E 's/(key|token|secret|password):\s*.+/\1: [REDACTED]/g' \
"$HOME/.hermes/config.yaml" > "$dst/config.yaml"
log " Hermes config backed up (secrets redacted)"
fi
# Fleet ops config
if [ -d "playbooks/host_vars" ]; then
cp -r playbooks/host_vars "$dst/" 2>/dev/null || true
log " Host vars backed up"
fi
# Registry
if [ -f "registry.yaml" ]; then
cp registry.yaml "$dst/" 2>/dev/null || true
log " Registry backed up"
fi
# Manifest
if [ -f "manifest.yaml" ]; then
cp manifest.yaml "$dst/" 2>/dev/null || true
log " Manifest backed up"
fi
}
PASSFILE="$(resolve_passphrase_file)"
mkdir -p "$LOCAL_BACKUP_DIR"
backup_knowledge() {
log "Backing up knowledge store..."
local dst="$BACKUP_DIR/knowledge"
if [ -d "$HOME/.hermes/knowledge" ]; then
cp -r "$HOME/.hermes/knowledge" "$dst" 2>/dev/null || true
log " Knowledge store backed up"
fi
# Memory files
for mem in "$HOME"/.hermes/memory*; do
if [ -f "$mem" ]; then
cp "$mem" "$dst/" 2>/dev/null || true
fi
done
log " Memory files backed up"
}
log "Creating archive from $BACKUP_SOURCE_DIR"
tar -czf "$PLAINTEXT_ARCHIVE" -C "$(dirname "$BACKUP_SOURCE_DIR")" "$(basename "$BACKUP_SOURCE_DIR")"
# ── Main ──────────────────────────────────────────────────────────────────
log "Encrypting archive"
openssl enc -aes-256-cbc -salt -pbkdf2 -iter 200000 \
-pass "file:${PASSFILE}" \
-in "$PLAINTEXT_ARCHIVE" \
-out "$ENCRYPTED_ARCHIVE"
IFS=',' read -ra TARGET_LIST <<< "$TARGETS"
for target in "${TARGET_LIST[@]}"; do
case "$target" in
sessions) backup_sessions ;;
gitea) backup_gitea ;;
config) backup_config ;;
knowledge) backup_knowledge ;;
*) log "Unknown target: $target" ;;
esac
done
ARCHIVE_SHA256="$(sha256_file "$ENCRYPTED_ARCHIVE")"
CREATED_AT="$(date -u '+%Y-%m-%dT%H:%M:%SZ')"
write_manifest "$MANIFEST_PATH" "$BACKUP_SOURCE_DIR" "$(basename "$ENCRYPTED_ARCHIVE")" "$ARCHIVE_SHA256" "$LOCAL_BACKUP_DIR" "$BACKUP_S3_URI" "$BACKUP_NAS_TARGET" "$CREATED_AT"
# ── Compress ──────────────────────────────────────────────────────────────
cp "$ENCRYPTED_ARCHIVE" "$MANIFEST_PATH" "$LOCAL_BACKUP_DIR/"
rm -f "$PLAINTEXT_ARCHIVE"
log "Encrypted backup stored locally: ${LOCAL_BACKUP_DIR}/$(basename "$ENCRYPTED_ARCHIVE")"
if [[ -n "$BACKUP_NAS_TARGET" ]]; then
upload_to_nas "$ENCRYPTED_ARCHIVE" "$MANIFEST_PATH" "$BACKUP_NAS_TARGET"
if [ -n "$COMPRESS" ]; then
log "Compressing..."
ARCHIVE="${BACKUP_DEST}/fleet-backup-${BACKUP_DATE}.tar.gz"
tar -czf "$ARCHIVE" -C "$BACKUP_DEST" "$BACKUP_DATE"
rm -rf "$BACKUP_DIR"
log "Compressed: $ARCHIVE ($(du -sh "$ARCHIVE" | cut -f1))"
BACKUP_FILE="$ARCHIVE"
else
BACKUP_FILE="$BACKUP_DIR"
fi
if [[ -n "$BACKUP_S3_URI" ]]; then
upload_to_s3 "$ENCRYPTED_ARCHIVE" "$MANIFEST_PATH"
# ── Encrypt ───────────────────────────────────────────────────────────────
if [ -n "$ENCRYPT_KEY" ] && [ -f "$ENCRYPT_KEY" ]; then
log "Encrypting with $ENCRYPT_KEY..."
openssl enc -aes-256-cbc -salt -in "$BACKUP_FILE" -out "${BACKUP_FILE}.enc" -pass "file:$ENCRYPT_KEY"
rm -f "$BACKUP_FILE"
log "Encrypted: ${BACKUP_FILE}.enc"
BACKUP_FILE="${BACKUP_FILE}.enc"
fi
find "$BACKUP_ROOT" -mindepth 1 -maxdepth 1 -type d -name '20*' -mtime "+${BACKUP_RETENTION_DAYS}" -exec rm -rf {} + 2>/dev/null || true
log "Retention applied (${BACKUP_RETENTION_DAYS} days)"
log "Backup pipeline completed successfully"
# ── Summary ───────────────────────────────────────────────────────────────
echo ""
echo "=== Backup Complete ==="
echo "File: $BACKUP_FILE"
echo "Size: $(du -sh "$BACKUP_FILE" | cut -f1)"
echo "Targets: $TARGETS"
# Clean up old backups (keep last 7)
ls -dt "${BACKUP_DEST}"/fleet-backup-* 2>/dev/null | tail -n +8 | xargs rm -f 2>/dev/null || true
log "Old backups cleaned (keeping last 7)"

View File

@@ -1,84 +0,0 @@
#!/bin/bash
set -euo pipefail
#
# fix_evennia_settings.sh — Fix Evennia settings on Bezalel VPS.
#
# Removes bad port tuples that crash Evennia's Twisted port binding.
# Run on Bezalel VPS (104.131.15.18) or via SSH.
#
# Usage:
# ssh root@104.131.15.18 'bash -s' < scripts/fix_evennia_settings.sh
#
# Part of #534
EVENNIA_DIR="/root/wizards/bezalel/evennia/bezalel_world"
SETTINGS="${EVENNIA_DIR}/server/conf/settings.py"
VENV_PYTHON="/root/wizards/bezalel/evennia/venv/bin/python3"
VENV_EVENNIA="/root/wizards/bezalel/evennia/venv/bin/evennia"
echo "=== Fix Evennia Settings (Bezalel) ==="
# 1. Fix settings.py — remove bad port tuples
echo "Fixing settings.py..."
if [ -f "$SETTINGS" ]; then
# Remove broken port lines
sed -i '/WEBSERVER_PORTS/d' "$SETTINGS"
sed -i '/TELNET_PORTS/d' "$SETTINGS"
sed -i '/WEBSOCKET_PORTS/d' "$SETTINGS"
sed -i '/SERVERNAME/d' "$SETTINGS"
# Add correct settings
echo '' >> "$SETTINGS"
echo '# Fixed port settings — #534' >> "$SETTINGS"
echo 'SERVERNAME = "bezalel_world"' >> "$SETTINGS"
echo 'WEBSERVER_PORTS = [(4001, "0.0.0.0")]' >> "$SETTINGS"
echo 'TELNET_PORTS = [(4000, "0.0.0.0")]' >> "$SETTINGS"
echo 'WEBSOCKET_PORTS = [(4002, "0.0.0.0")]' >> "$SETTINGS"
echo "Settings fixed."
else
echo "ERROR: Settings file not found at $SETTINGS"
exit 1
fi
# 2. Clean DB and re-migrate
echo "Cleaning DB..."
cd "$EVENNIA_DIR"
rm -f server/evennia.db3
echo "Running migrations..."
"$VENV_EVENNIA" migrate --no-input
# 3. Create superuser
echo "Creating superuser..."
"$VENV_PYTHON" -c "
import sys, os
sys.setrecursionlimit(5000)
os.environ['DJANGO_SETTINGS_MODULE'] = 'server.conf.settings'
os.chdir('$EVENNIA_DIR')
import django
django.setup()
from evennia.accounts.accounts import AccountDB
try:
AccountDB.objects.create_superuser('Timmy', 'timmy@tower.world', 'timmy123')
print('Superuser Timmy created')
except Exception as e:
print(f'Superuser may already exist: {e}')
"
# 4. Start Evennia
echo "Starting Evennia..."
"$VENV_EVENNIA" start
# 5. Verify
sleep 3
echo ""
echo "=== Verification ==="
"$VENV_EVENNIA" status
echo ""
echo "Listening ports:"
ss -tlnp | grep -E '400[012]' || echo "No ports found (may need a moment)"
echo ""
echo "Done. Connect: telnet 104.131.15.18 4000"

View File

@@ -1,171 +0,0 @@
#!/usr/bin/env python3
"""
genome_analyzer.py — Generate a GENOME.md from a codebase.
Scans a repository and produces a structured codebase genome with:
- File counts by type
- Architecture overview (directory structure)
- Entry points
- Test coverage summary
Usage:
python3 scripts/genome_analyzer.py /path/to/repo
python3 scripts/genome_analyzer.py /path/to/repo --output GENOME.md
python3 scripts/genome_analyzer.py /path/to/repo --dry-run
Part of #666: GENOME.md Template + Single-Repo Analyzer.
"""
import argparse
import sys
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Tuple
SKIP_DIRS = {".git", "__pycache__", ".venv", "venv", "node_modules", ".tox", ".pytest_cache", ".DS_Store"}
def count_files(repo_path: Path) -> Dict[str, int]:
counts = defaultdict(int)
for f in repo_path.rglob("*"):
if any(part in SKIP_DIRS for part in f.parts):
continue
if f.is_file():
ext = f.suffix or "(no ext)"
counts[ext] += 1
return dict(sorted(counts.items(), key=lambda x: -x[1]))
def find_entry_points(repo_path: Path) -> List[str]:
entry_points = []
candidates = [
"main.py", "app.py", "server.py", "cli.py", "manage.py",
"index.html", "index.js", "index.ts",
"Makefile", "Dockerfile", "docker-compose.yml",
"README.md", "deploy.sh", "setup.py", "pyproject.toml",
]
for name in candidates:
if (repo_path / name).exists():
entry_points.append(name)
scripts_dir = repo_path / "scripts"
if scripts_dir.is_dir():
for f in sorted(scripts_dir.iterdir()):
if f.suffix in (".py", ".sh") and not f.name.startswith("test_"):
entry_points.append(f"scripts/{f.name}")
return entry_points[:15]
def find_tests(repo_path: Path) -> Tuple[List[str], int]:
test_files = []
for f in repo_path.rglob("*"):
if any(part in SKIP_DIRS for part in f.parts):
continue
if f.is_file() and (f.name.startswith("test_") or f.name.endswith("_test.py") or f.name.endswith("_test.js")):
test_files.append(str(f.relative_to(repo_path)))
return sorted(test_files), len(test_files)
def find_directories(repo_path: Path, max_depth: int = 2) -> List[str]:
dirs = []
for d in sorted(repo_path.rglob("*")):
if d.is_dir() and len(d.relative_to(repo_path).parts) <= max_depth:
if not any(part in SKIP_DIRS for part in d.parts):
rel = str(d.relative_to(repo_path))
if rel != ".":
dirs.append(rel)
return dirs[:30]
def read_readme(repo_path: Path) -> str:
for name in ["README.md", "README.rst", "README.txt", "README"]:
readme = repo_path / name
if readme.exists():
lines = readme.read_text(encoding="utf-8", errors="replace").split("\n")
para = []
started = False
for line in lines:
if line.startswith("#") and not started:
continue
if line.strip():
started = True
para.append(line.strip())
elif started:
break
return " ".join(para[:5])
return "(no README found)"
def generate_genome(repo_path: Path, repo_name: str = "") -> str:
if not repo_name:
repo_name = repo_path.name
date = datetime.now(timezone.utc).strftime("%Y-%m-%d")
readme_desc = read_readme(repo_path)
file_counts = count_files(repo_path)
total_files = sum(file_counts.values())
entry_points = find_entry_points(repo_path)
test_files, test_count = find_tests(repo_path)
dirs = find_directories(repo_path)
lines = [
f"# GENOME.md — {repo_name}", "",
f"> Codebase analysis generated {date}. {readme_desc[:100]}.", "",
"## Project Overview", "",
readme_desc, "",
f"**{total_files} files** across {len(file_counts)} file types.", "",
"## Architecture", "",
"```",
]
for d in dirs[:20]:
lines.append(f" {d}/")
lines.append("```")
lines += ["", "### File Types", "", "| Type | Count |", "|------|-------|"]
for ext, count in list(file_counts.items())[:15]:
lines.append(f"| {ext} | {count} |")
lines += ["", "## Entry Points", ""]
for ep in entry_points:
lines.append(f"- `{ep}`")
lines += ["", "## Test Coverage", "", f"**{test_count} test files** found.", ""]
if test_files:
for tf in test_files[:10]:
lines.append(f"- `{tf}`")
if len(test_files) > 10:
lines.append(f"- ... and {len(test_files) - 10} more")
else:
lines.append("No test files found.")
lines += ["", "## Security Considerations", "", "(To be filled during analysis)", ""]
lines += ["## Design Decisions", "", "(To be filled during analysis)", ""]
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(description="Generate GENOME.md from a codebase")
parser.add_argument("repo_path", help="Path to repository")
parser.add_argument("--output", default="", help="Output file (default: stdout)")
parser.add_argument("--name", default="", help="Repository name")
parser.add_argument("--dry-run", action="store_true", help="Print stats only")
args = parser.parse_args()
repo_path = Path(args.repo_path).resolve()
if not repo_path.is_dir():
print(f"ERROR: {repo_path} is not a directory", file=sys.stderr)
sys.exit(1)
repo_name = args.name or repo_path.name
if args.dry_run:
counts = count_files(repo_path)
_, test_count = find_tests(repo_path)
print(f"Repo: {repo_name}")
print(f"Total files: {sum(counts.values())}")
print(f"Test files: {test_count}")
print(f"Top types: {', '.join(f'{k}={v}' for k,v in list(counts.items())[:5])}")
sys.exit(0)
genome = generate_genome(repo_path, repo_name)
if args.output:
with open(args.output, "w") as f:
f.write(genome)
print(f"Written: {args.output}")
else:
print(genome)
if __name__ == "__main__":
main()

View File

@@ -1,155 +0,0 @@
#!/usr/bin/env python3
# grounding.py - Grounding before generation.
# SOUL.md: "When I have verified sources, I must consult them
# before I generate from pattern alone. Retrieval is not a feature.
# It is the primary mechanism by which I avoid lying."
# Part of #792
import json
import os
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from dataclasses import dataclass, field
HERMES_HOME = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes"))
MEMORY_DIR = HERMES_HOME / "memory"
@dataclass
class GroundingResult:
query: str
sources_found: List[Dict[str, Any]] = field(default_factory=list)
grounded: bool = False
confidence: float = 0.0
source_text: str = ""
source_type: str = "" # memory, file, chain, tool_result
@property
def needs_hedging(self):
return not self.grounded
class GroundingLayer:
def __init__(self, memory_dir=None):
self.memory_dir = Path(memory_dir) if memory_dir else MEMORY_DIR
def ground(self, query, context=None):
"""Query local sources before generation."""
sources = []
# 1. Search memory files
memory_hits = self._search_memory(query)
sources.extend(memory_hits)
# 2. Search context files if provided
if context:
context_hits = self._search_context(query, context)
sources.extend(context_hits)
# 3. Build result
grounded = len(sources) > 0
confidence = min(0.95, 0.3 + len(sources) * 0.2) if grounded else 0.0
source_text = ""
source_type = ""
if sources:
best = max(sources, key=lambda s: s.get("score", 0))
source_text = best.get("text", "")[:200]
source_type = best.get("type", "unknown")
return GroundingResult(
query=query, sources_found=sources, grounded=grounded,
confidence=confidence, source_text=source_text, source_type=source_type,
)
def _search_memory(self, query):
"""Search memory files for relevant content."""
results = []
if not self.memory_dir.exists():
return results
query_lower = query.lower()
query_words = set(query_lower.split())
for mem_file in self.memory_dir.rglob("*.md"):
try:
content = mem_file.read_text(encoding="utf-8", errors="replace")
except Exception:
continue
content_lower = content.lower()
# Simple relevance: count query word matches
matches = sum(1 for w in query_words if w in content_lower)
if matches > 0:
score = matches / max(len(query_words), 1)
# Extract relevant snippet
lines = content.split("\n")
snippet = ""
for line in lines:
if any(w in line.lower() for w in query_words):
snippet = line.strip()[:200]
break
results.append({
"text": snippet or content[:200],
"source": str(mem_file.relative_to(self.memory_dir)),
"type": "memory",
"score": round(score, 3),
})
return sorted(results, key=lambda r: -r["score"])[:5]
def _search_context(self, query, context):
"""Search provided context text for relevant content."""
results = []
if not context:
return results
query_lower = query.lower()
query_words = set(query_lower.split())
for ctx in context:
if isinstance(ctx, dict):
text = ctx.get("content", "") or ctx.get("text", "")
source = ctx.get("source", "context")
else:
text = str(ctx)
source = "context"
text_lower = text.lower()
matches = sum(1 for w in query_words if w in text_lower)
if matches > 0:
score = matches / max(len(query_words), 1)
results.append({
"text": text[:200],
"source": source,
"type": "context",
"score": round(score, 3),
})
return sorted(results, key=lambda r: -r["score"])[:5]
def format_sources(self, result):
"""Format grounding result for display."""
if not result.grounded:
return "No verified sources found. Proceeding from pattern matching."
lines = ["Based on verified sources:"]
for s in result.sources_found[:3]:
ref = s.get("source", "unknown")
text = s.get("text", "")[:100]
lines.append(" - [" + ref + "] " + text)
return "\n".join(lines)
# Convenience
_default_layer = None
def get_grounding_layer():
global _default_layer
if _default_layer is None:
_default_layer = GroundingLayer()
return _default_layer
def ground(query, **kwargs):
return get_grounding_layer().ground(query, **kwargs)

View File

@@ -1,101 +0,0 @@
#!/usr/bin/env python3
# source_distinction.py - I think vs I know annotation system.
# SOUL.md: "Every claim I make comes from one of two places: a verified source
# I can point to, or my own pattern-matching."
# Part of #793
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Optional
class SourceType(Enum):
VERIFIED = "verified"
INFERRED = "inferred"
STATED = "stated"
UNKNOWN = "unknown"
@dataclass
class Claim:
text: str
source_type: SourceType
source_ref: str = ""
confidence: float = 0.0
hedging: str = ""
@dataclass
class AnnotatedResponse:
raw_text: str
claims: List[Claim] = field(default_factory=list)
def render(self):
if not self.claims:
return self.raw_text
parts = []
for claim in self.claims:
if claim.source_type == SourceType.VERIFIED:
prefix = "[verified: " + claim.source_ref + "]" if claim.source_ref else "[verified]"
parts.append(claim.text + " " + prefix)
elif claim.source_type == SourceType.INFERRED:
hedge = claim.hedging or "I think"
parts.append(hedge + " " + claim.text)
elif claim.source_type == SourceType.STATED:
parts.append(claim.text + " [you told me]")
else:
parts.append("I am not certain, but " + claim.text)
return " ".join(parts)
@property
def verified_count(self):
return sum(1 for c in self.claims if c.source_type == SourceType.VERIFIED)
@property
def inferred_count(self):
return sum(1 for c in self.claims if c.source_type == SourceType.INFERRED)
def verified(text, source, confidence=0.95):
return Claim(text=text, source_type=SourceType.VERIFIED, source_ref=source, confidence=confidence)
def inferred(text, hedging="I think", confidence=0.6):
return Claim(text=text, source_type=SourceType.INFERRED, confidence=confidence, hedging=hedging)
def stated(text):
return Claim(text=text, source_type=SourceType.STATED, confidence=1.0)
def annotate_response(raw_text, claims):
return AnnotatedResponse(raw_text=raw_text, claims=claims)
def format_for_display(response):
lines = []
for claim in response.claims:
if claim.source_type == SourceType.VERIFIED:
ref = " (" + claim.source_ref + ")" if claim.source_ref else ""
lines.append(" = " + claim.text + ref)
elif claim.source_type == SourceType.INFERRED:
lines.append(" ~ " + claim.hedging + " " + claim.text)
elif claim.source_type == SourceType.STATED:
lines.append(" > " + claim.text)
else:
lines.append(" ? " + claim.text)
if response.claims:
v = response.verified_count
i = response.inferred_count
t = len(response.claims)
lines.append("")
lines.append(" [" + str(v) + " verified, " + str(i) + " inferred, " + str(t) + " total]")
return "\n".join(lines)
def source_distinction_check(text):
hedging_words = ["i think", "i believe", "probably", "likely", "might",
"it seems", "perhaps", "i am not sure", "i guess",
"my understanding is", "i suspect"]
text_lower = text.lower()
hedging_count = sum(1 for h in hedging_words if h in text_lower)
return {"has_hedging": hedging_count > 0, "hedging_count": hedging_count,
"likely_inferred": hedging_count > 2}

View File

@@ -1,46 +0,0 @@
# GENOME.md — {{REPO_NAME}}
> Codebase analysis generated {{DATE}}. {{SHORT_DESCRIPTION}}.
## Project Overview
{{OVERVIEW}}
## Architecture
{{ARCHITECTURE_DIAGRAM}}
## Entry Points
{{ENTRY_POINTS}}
## Data Flow
{{DATA_FLOW}}
## Key Abstractions
{{ABSTRACTIONS}}
## API Surface
{{API_SURFACE}}
## Test Coverage
### Existing Tests
{{EXISTING_TESTS}}
### Coverage Gaps
{{COVERAGE_GAPS}}
### Critical paths that need tests:
{{CRITICAL_PATHS}}
## Security Considerations
{{SECURITY}}
## Design Decisions
{{DESIGN_DECISIONS}}

View File

@@ -1,88 +0,0 @@
"""Tests for audit trail — SOUL.md compliance."""
import json
import tempfile
from pathlib import Path
from unittest.mock import patch
import pytest
class TestAuditTrail:
def test_log_and_query(self, tmp_path):
from scripts.audit_trail import AuditTrail
trail = AuditTrail(audit_dir=tmp_path)
trail.log_response(
input_text="What is Python?",
sources=["web_search:Python is a programming language"],
confidence=0.9,
output_text="Python is a programming language.",
model="test-model",
)
results = trail.query("Python")
assert len(results) == 1
assert results[0].confidence == 0.9
assert "Python" in results[0].output_text
def test_query_no_match(self, tmp_path):
from scripts.audit_trail import AuditTrail
trail = AuditTrail(audit_dir=tmp_path)
trail.log_response(
input_text="What is Rust?",
sources=[],
confidence=0.8,
output_text="Rust is a systems language.",
)
results = trail.query("Python")
assert len(results) == 0
def test_confidence_filter(self, tmp_path):
from scripts.audit_trail import AuditTrail
trail = AuditTrail(audit_dir=tmp_path)
trail.log_response(input_text="test", sources=[], confidence=0.3, output_text="low conf")
trail.log_response(input_text="test", sources=[], confidence=0.95, output_text="high conf")
high_only = trail.query("test", min_confidence=0.5)
assert len(high_only) == 1
assert high_only[0].confidence == 0.95
def test_stats(self, tmp_path):
from scripts.audit_trail import AuditTrail
trail = AuditTrail(audit_dir=tmp_path)
trail.log_response(input_text="a", sources=[], confidence=0.8, output_text="b")
trail.log_response(input_text="c", sources=[], confidence=0.6, output_text="d")
stats = trail.get_stats()
assert stats["total"] == 2
assert stats["avg_confidence"] == 0.7
def test_session_filter(self, tmp_path):
from scripts.audit_trail import AuditTrail
trail = AuditTrail(audit_dir=tmp_path)
trail.log_response(input_text="a", sources=[], confidence=0.9, output_text="b", session_id="s1")
trail.log_response(input_text="c", sources=[], confidence=0.9, output_text="d", session_id="s2")
s1_results = trail.get_by_session("s1")
assert len(s1_results) == 1
def test_empty_trail(self, tmp_path):
from scripts.audit_trail import AuditTrail
trail = AuditTrail(audit_dir=tmp_path)
assert trail.query("anything") == []
assert trail.get_stats()["total"] == 0
def test_content_addressed_id(self):
from scripts.audit_trail import AuditEntry
id1 = AuditEntry.generate_id("input", "output", "2026-01-01")
id2 = AuditEntry.generate_id("input", "output", "2026-01-01")
id3 = AuditEntry.generate_id("different", "output", "2026-01-01")
assert id1 == id2 # same content = same ID
assert id1 != id3 # different content = different ID

View File

@@ -1,67 +0,0 @@
"""Tests for grounding-before-generation - SOUL.md compliance."""
import pytest
from pathlib import Path
import tempfile
class TestGrounding:
def test_ground_with_memory(self, tmp_path):
from scripts.grounding import GroundingLayer
mem_dir = tmp_path / "memory"
mem_dir.mkdir()
(mem_dir / "test.md").write_text("Python is a programming language created by Guido.")
layer = GroundingLayer(memory_dir=mem_dir)
result = layer.ground("What is Python?")
assert result.grounded
assert result.confidence > 0
assert len(result.sources_found) > 0
def test_ground_no_sources(self, tmp_path):
from scripts.grounding import GroundingLayer
mem_dir = tmp_path / "memory"
mem_dir.mkdir()
layer = GroundingLayer(memory_dir=mem_dir)
result = layer.ground("What is quantum physics?")
assert not result.grounded
assert result.needs_hedging
assert result.confidence == 0.0
def test_ground_with_context(self):
from scripts.grounding import GroundingLayer
layer = GroundingLayer(memory_dir=Path("/nonexistent"))
context = [{"content": "The fleet uses tmux for agent management", "source": "fleet-ops"}]
result = layer.ground("How does the fleet work?", context=context)
assert result.grounded
assert result.source_type == "context"
def test_format_sources_grounded(self):
from scripts.grounding import GroundingLayer, GroundingResult
layer = GroundingLayer()
result = GroundingResult(
query="test", grounded=True,
sources_found=[{"text": "test info", "source": "test.md", "type": "memory", "score": 0.8}],
)
formatted = layer.format_sources(result)
assert "verified sources" in formatted
assert "test.md" in formatted
def test_format_sources_ungrounded(self):
from scripts.grounding import GroundingLayer, GroundingResult
layer = GroundingLayer()
result = GroundingResult(query="test", grounded=False)
formatted = layer.format_sources(result)
assert "pattern matching" in formatted
def test_empty_memory_dir(self, tmp_path):
from scripts.grounding import GroundingLayer
mem_dir = tmp_path / "empty"
mem_dir.mkdir()
layer = GroundingLayer(memory_dir=mem_dir)
result = layer.ground("anything")
assert not result.grounded

View File

@@ -1,61 +0,0 @@
"""Tests for source distinction - SOUL.md compliance."""
import pytest
class TestSourceDistinction:
def test_verified_claim(self):
from scripts.source_distinction import verified, SourceType
claim = verified("Paris is the capital", "web_search:Paris")
assert claim.source_type == SourceType.VERIFIED
assert claim.source_ref == "web_search:Paris"
assert claim.confidence == 0.95
def test_inferred_claim(self):
from scripts.source_distinction import inferred, SourceType
claim = inferred("this approach is better")
assert claim.source_type == SourceType.INFERRED
assert claim.hedging == "I think"
def test_stated_claim(self):
from scripts.source_distinction import stated, SourceType
claim = stated("my name is Alexander")
assert claim.source_type == SourceType.STATED
assert claim.confidence == 1.0
def test_render_verified(self):
from scripts.source_distinction import annotate_response, verified
resp = annotate_response("test", [verified("Paris is capital", "web")])
rendered = resp.render()
assert "[verified: web]" in rendered
def test_render_inferred(self):
from scripts.source_distinction import annotate_response, inferred
resp = annotate_response("test", [ inferred("this is better")])
rendered = resp.render()
assert "I think" in rendered
def test_counts(self):
from scripts.source_distinction import annotate_response, verified, inferred
resp = annotate_response("test", [
verified("a", "src"), verified("b", "src"), inferred("c"),
])
assert resp.verified_count == 2
assert resp.inferred_count == 1
def test_hedging_detection(self):
from scripts.source_distinction import source_distinction_check
result = source_distinction_check("I think this is probably right, but I believe it could be different")
assert result["has_hedging"]
assert result["hedging_count"] >= 3
def test_no_hedging(self):
from scripts.source_distinction import source_distinction_check
result = source_distinction_check("The capital of France is Paris.")
assert not result["has_hedging"]
def test_format_for_display(self):
from scripts.source_distinction import format_for_display, annotate_response, verified, inferred
resp = annotate_response("test", [verified("a", "src"), inferred("b")])
output = format_for_display(resp)
assert "=" in output # verified icon
assert "~" in output # inferred icon