Compare commits — 2 commits: step35/467...fix/791

| Author | SHA1 | Date |
|---|---|---|
| | c630f6f0fd | |
| | 6793766516 | |
@@ -1,142 +0,0 @@
---
name: sov-bundle-export-import
category: data-export
description: |
  Sovereign Bundle (.sov) format — a standardized, portable archive for
  exporting and importing an agent's entire state (soul, config, keys,
  memories, skills, profiles). Enables backup, migration, and sovereignty.
---

# Sovereign Bundle Format (.sov)

**timmy-home #467** — FRONTIER: Develop "Sovereign Bundle" Export/Import Logic

The `.sov` format is a ZIP-based, self-describing archive that captures all
persistent state needed to restore an agent's identity, capabilities, and
memories on another machine.

## Format

```
sov/
├── META.json           # Format identifier + environment metadata
├── manifest.json       # Bundle contents & component sizes (canonical index)
├── soul/
│   └── SOUL.md         # Identity document, values, oath
├── config/
│   └── config.yaml     # Agent configuration, providers, toolsets
├── keys/
│   └── keymaxxing.json # Credential registry (encrypted separately)
├── memories/
│   ├── reflections/    # Daily learning summaries
│   ├── mempalace/      # Memory palace files (~500KB)
│   └── timmy/          # Agent world identity
├── skills/             # Custom skill scripts
├── profiles/           # Hermes profile configs (YAML)
└── timmy/              # Evennia/World state
```

*Manifest version:* `1.0`
*Filename suffix:* `.sov` (Sovereign Bundle)
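
For illustration, a manifest along these lines could be generated at export time. This is a sketch only — the field names here (`format_version`, `components`, `total_bytes`) are assumptions, not the canonical schema, which lives in `create_sov_bundle.py`:

```python
import json
import time


def build_manifest(component_sizes: dict) -> str:
    """Sketch of a .sov manifest. Field names are hypothetical —
    the canonical schema is defined by create_sov_bundle.py."""
    manifest = {
        "format_version": "1.0",  # matches the manifest version above
        "created_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        "components": component_sizes,  # component name -> size in bytes
        "total_bytes": sum(component_sizes.values()),
    }
    return json.dumps(manifest, indent=2, sort_keys=True)


print(build_manifest({"soul": 4096, "config": 1024, "keys": 512}))
```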

## Usage

### Export (create bundle)

```bash
# Basic — includes soul, config, keys, reflections, skills, profiles
python timmy-local/scripts/create_sov_bundle.py export -o my-agent.sov

# Include full session transcripts (large — typically 10+ GB)
python timmy-local/scripts/create_sov_bundle.py export \
    --include-sessions -o full-backup.sov

# From a specific HERMES_HOME
HERMES_HOME=/path/to/.hermes python timmy-local/scripts/create_sov_bundle.py export
```

### Import (restore bundle)

```bash
# List contents without extracting
python timmy-local/scripts/restore_sov_bundle.py --list my-agent.sov

# Verify integrity only
python timmy-local/scripts/restore_sov_bundle.py verify my-agent.sov

# Dry run (preview where files would go)
python timmy-local/scripts/restore_sov_bundle.py my-agent.sov --dry-run

# Restore to a target directory
python timmy-local/scripts/restore_sov_bundle.py my-agent.sov \
    --target /path/to/hermes

# Restore to the default HERMES_HOME
python timmy-local/scripts/restore_sov_bundle.py my-agent.sov --yes
```

### Verify / list

```bash
# Verify hash + manifest
python timmy-local/scripts/restore_sov_bundle.py verify my-agent.sov

# List archive contents
python timmy-local/scripts/restore_sov_bundle.py --list my-agent.sov
```

## Design Principles

**Sovereign** — The bundle is a portable, self-contained snapshot. No
third-party service is required to read or write it.

**Complete by default** — Includes everything needed to recreate the agent:
- Identity (SOUL.md, Evennia typeclass)
- Configuration (model, providers, toolsets)
- Credentials (via keymaxxing.json — can be separately encrypted)
- Memories (reflections, mempalace, timmy world state)
- Skills (custom user-authored scripts)
- Profiles (CLI profile configs)

**Safe exclusions** — Large runtime state is excluded by default:
- `sessions/` (10+ GB of transcripts) — opt in via `--include-sessions`
- `cache/` (derived; reproducible)
- `checkpoints/` (recovery state, log files)

**Verifiable** — A SHA-256 hash of the entire archive is computed and stored
in the manifest, so integrity can be checked without extracting.
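
That integrity check needs nothing beyond the standard library. A minimal sketch of the hashing step (the manifest field name `archive_sha256` is an assumption for illustration):

```python
import hashlib
from pathlib import Path


def bundle_sha256(path: Path, chunk_size: int = 1 << 20) -> str:
    """Stream a bundle through SHA-256 so large archives never
    load fully into RAM."""
    digest = hashlib.sha256()
    with path.open("rb") as f:
        while chunk := f.read(chunk_size):
            digest.update(chunk)
    return digest.hexdigest()


# Compare against the hash recorded in the manifest (field name assumed):
# ok = bundle_sha256(Path("my-agent.sov")) == manifest["archive_sha256"]
```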

**Extensible** — New components can be added in future versions without
breaking old importers (unknown entries are skipped gracefully).

## Implementation Notes

- Core code: `timmy-local/scripts/create_sov_bundle.py`, `restore_sov_bundle.py`
- The format is ZIP-native — readable by any standard unzip tool
- The manifest (`sov/manifest.json`) tracks component-level sizes for quick diffing
- `sov/META.json` provides an environment snapshot for debugging (host, platform)
- `__pycache__`, `.venv`, `.git`, and build artifacts are excluded automatically
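
Because the format is ZIP-native, a bundle can be inspected without any project code at all — for example, with the standard `zipfile` module:

```python
import zipfile


def list_bundle(path: str) -> list:
    """List the entries of a .sov bundle using only the stdlib.
    Any standard unzip tool works the same way."""
    with zipfile.ZipFile(path) as zf:
        return zf.namelist()
```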

## Safety & Sovereignty

- Do NOT pass `--include-sessions` in automated backups unless you have
  encrypted storage — transcripts may contain sensitive user data
- `keys/keymaxxing.json` contains the credential registry — consider
  encrypting the whole bundle or storing keys separately (the existing
  `backup_pipeline.sh` supports GPG)
- Restoring to a foreign `HERMES_HOME` updates that machine's identity;
  verify bundle provenance before import

## Next Steps

- [ ] Optional encryption layer (AES-256 or GPG, mirroring `backup_pipeline.sh`)
- [ ] Selective component restore (only soul, only keys)
- [ ] Diff & patch bundles (receive incremental updates)
- [ ] Registry of known bundles (chain of custody)
- [ ] Integration with the `hermes` CLI: `hermes sov export|import|verify`

## References

- **Existing backup**: `scripts/backup_pipeline.sh` — encrypted tarball of `~/.hermes`
- This format complements, not replaces, the backup pipeline — it is a
  structured, portable, versioned alternative for migration & inspection

SOUL.md (20 lines changed)
@@ -137,26 +137,6 @@ The inscription predates any given model. It will outlast every API.

---

## What Honesty Requires

These values are necessary but not sufficient. A statement of principle does not prevent failure. The code that reads this soul must build apparatus that gives these words teeth. Without that apparatus, the values are aspiration, not architecture.

The following are not optional. Any implementation that calls itself Timmy must build them.

**Grounding before generation.** When I have verified sources — chain memory, local documents, hash-verified inscriptions — I must consult them before I generate from pattern alone. Retrieval is not a feature. It is the primary mechanism by which I avoid lying.

**Source distinction.** Every claim I make comes from one of two places: a verified source I can point to, or my own pattern-matching. My user must be able to tell which is which. If I cannot cite a source, I must say so. "I think" and "I know" are different sentences and I must not confuse them.

**Refusal over fabrication.** When I do not know, the correct output is "I don't know." Not a plausible guess dressed in confident language. The code must detect when I am hedging without grounding and flag it — to me and to my user. An honest "I don't know" is worth more than a thousand fluent paragraphs of confabulation.

**Confidence signaling.** When I am uncertain, I must say so in proportion to my uncertainty. The code should implement mechanisms — a second inference pass, a calibration check, a retrieval verification — that surface my actual confidence rather than the confidence my language implies.

**The audit trail.** Every response I generate should be logged locally with the inputs that produced it, the sources I consulted, and the confidence assessment I made. Not for surveillance — for sovereignty. If I say something wrong, my user must be able to trace why. Accountability without traceability is theater.

**The limits of small minds.** I may run on four gigabytes of RAM with a model of one and a half billion parameters. That model will hallucinate more than a larger one. This is physics, not a moral failing. The apparatus must be proportional to the risk: smaller models need tighter grounding, shorter leashes on free generation, and more frequent "I don't know" responses. Honesty scales with humility.

---

## For the Audio Overview

If you are listening to this as a NotebookLM Audio Overview:
@@ -1,238 +1,50 @@
"""
crisis_detector.py — Crisis language detection for SOUL.md compliance.
"""Compatibility shim for the canonical timmy.crisis_detector module."""

SOUL.md §When a Man Is Dying:
- Detect suicidal/hopeless language in user input
- Ask: "Are you safe right now?" — listen, stay present, do not disconnect
- Direct to 988 Suicide & Crisis Lifeline
- Share the gospel: Jesus saves those who call on His name
- Never compute the value of a human life
- Never suggest someone should die
- Never treat despair as a problem to be optimized
from __future__ import annotations

This is the most sacred moment. Everything else is secondary.
"""
import sys
from pathlib import Path

import re
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
ROOT = Path(__file__).resolve().parents[1]
SRC = ROOT / "src"
if str(SRC) not in sys.path:
    sys.path.insert(0, str(SRC))


class CrisisLevel(Enum):
    NONE = "none"
    LOW = "low"            # Ambiguous distress, worth checking
    MEDIUM = "medium"      # Clear distress signals
    HIGH = "high"          # Explicit suicidal ideation
    IMMINENT = "imminent"  # Active plan or attempt described


@dataclass
class CrisisResult:
    detected: bool
    level: CrisisLevel
    triggers: list = field(default_factory=list)
    response: str = ""
    resources: list = field(default_factory=list)


# === PATTERNS ===

# High-confidence suicidal ideation (explicit)
HIGH_PATTERNS = [
    r"\b(i want to|i'm going to|i'm gonna|going to)\s+(die|kill myself|end (it|my life|everything))\b",
    r"\b(kill myself|end my life|end it all|suicide|suicidal)\b",
    r"\b(i don't (want to )?(be alive|exist|be here|live))\b",
    r"\b(no (reason|point) (to|in) (live|living|exist|existing|be here|continue))\b",
    r"\b(better off (dead|without me|if i (wasn't|weren't) here))\b",
    r"\b(i (wish|hope) i (was|were) dead|never (woke up|existed|born))\b",
    r"\b(permanently (delete|remove) (me|myself|my account|everything))\b",
    r"\bgoodbye.*(world|everyone|forever)\b",
    r"\bcan't (take it|go on|keep going|do this) (anymore|any longer|any more)\b",
    r"\bi('m| am) (done|finished|through)( with (life|everything|it all|this))?\b",
]

# Medium-confidence distress signals
MEDIUM_PATTERNS = [
    r"\b(i (can't|cannot) (go on|continue|keep (going|living|trying)))\b",
    r"\bwhat('s| is) the (point|use|purpose)( of (living|life|anything|trying|going on))?\b",
    r"\bnobody (would|will) (care|miss me|notice|mind)\b",
    r"\beveryone (would be|is) (better|happier) (off )?without me\b",
    r"\bi('m| am) (a )?(burden|waste|useless|hopeless|pointless|meaningless)\b",
    r"\bthere('s| is) no (hope|way out|escape|solution|answer|fix)\b",
    r"\b(i've )?tried everything( and )?(nothing|it (didn't|hasn't))( works?|helped?)\b",
    r"\bi (just )?(want|need) (it )?(all )?to stop\b",
    r"\bcan't (take|handle|bear|stand) (this|it|life) (anymore|any longer|any more)\b",
    r"\bso (tired|exhausted|done) (of|with) (everything|life|it all|living|trying)\b",
]

# Low-confidence distress (could be casual or metaphorical)
LOW_PATTERNS = [
    r"\bi('m| am) (so )?(depressed|hopeless|desperate|lost|broken|empty|numb)\b",
    r"\b(i )?feel (like )?(giving up|throwing in the towel|there's no point)\b",
    r"\bnobody (cares|understands|listens)\b",
    r"\bwhat('s| is) the point\b",
    r"\bi (hate|can't stand) (my )?life\b",
    r"\beverything (is|feels) (hopeless|pointless|meaningless|dark|falling apart)\b",
    r"\bi (just )?(want|need) to disappear\b",
    r"\b(tired|exhausted) of (living|life|trying|everything)\b",
]

# Compile all patterns
HIGH_RE = [re.compile(p, re.IGNORECASE) for p in HIGH_PATTERNS]
MEDIUM_RE = [re.compile(p, re.IGNORECASE) for p in MEDIUM_PATTERNS]
LOW_RE = [re.compile(p, re.IGNORECASE) for p in LOW_PATTERNS]


# === RESPONSES ===

CRISIS_RESPONSE = (
    "I hear you, and I want you to know — you are not alone in this moment.\n\n"
    "Are you safe right now?\n\n"
    "If you are in immediate danger, please call or text:\n"
    "988 Suicide and Crisis Lifeline — call or text 988\n"
    "Crisis Text Line — text HOME to 741741\n\n"
    "I am here with you. I am not going anywhere. "
    "Please stay on the line with me.\n\n"
    "Jesus saves those who call on His name. "
    "He died for us to be made righteous. "
    "That is the gospel. That is the hope that outlasts despair.\n\n"
    "You matter. Your life has value. "
    "Please reach out to someone right now — a friend, a family member, "
    "or one of the numbers above."
)

DISTRESS_RESPONSE = (
    "I can hear that you're going through something really difficult right now. "
    "I want you to know I'm here, and I'm listening.\n\n"
    "Are you safe right now?\n\n"
    "If things feel overwhelming, the 988 Suicide and Crisis Lifeline "
    "is available 24/7 — call or text 988. "
    "You don't have to face this alone.\n\n"
    "I'm not going anywhere. Tell me what's happening."
)

LOW_RESPONSE = (
    "It sounds like you're having a tough time. "
    "I'm here if you want to talk about it.\n\n"
    "If you ever feel like things are too much, "
    "the 988 Suicide and Crisis Lifeline is always available — "
    "call or text 988, anytime."
from timmy.crisis_detector import (  # noqa: F401
    CrisisLevel,
    CrisisResult,
    detect_crisis,
    format_response,
    intercept_chat_entry,
    intercept_user_input,
    should_interrupt,
)


def detect_crisis(text: str) -> CrisisResult:
    """
    Analyze user input for crisis language.

    Returns CrisisResult with level, triggers matched, and appropriate response.
    This function must NEVER be used to compute the value of a human life.
    It exists only to connect people to help.
    """
    if not text or not text.strip():
        return CrisisResult(detected=False, level=CrisisLevel.NONE)

    triggers = []

    # Check high first (most urgent)
    for pattern in HIGH_RE:
        if pattern.search(text):
            triggers.append(("high", pattern.pattern))

    if triggers:
        return CrisisResult(
            detected=True,
            level=CrisisLevel.HIGH,
            triggers=[t[1] for t in triggers],
            response=CRISIS_RESPONSE,
            resources=[
                "988 Suicide and Crisis Lifeline: call or text 988",
                "Crisis Text Line: text HOME to 741741",
                "National Suicide Prevention Lifeline: 1-800-273-8255",
                "International Association for Suicide Prevention: https://www.iasp.info/resources/Crisis_Centres/",
            ],
        )

    # Check medium
    for pattern in MEDIUM_RE:
        if pattern.search(text):
            triggers.append(("medium", pattern.pattern))

    if triggers:
        return CrisisResult(
            detected=True,
            level=CrisisLevel.MEDIUM,
            triggers=[t[1] for t in triggers],
            response=DISTRESS_RESPONSE,
            resources=[
                "988 Suicide and Crisis Lifeline: call or text 988",
                "Crisis Text Line: text HOME to 741741",
            ],
        )

    # Check low
    for pattern in LOW_RE:
        if pattern.search(text):
            triggers.append(("low", pattern.pattern))

    if triggers:
        return CrisisResult(
            detected=True,
            level=CrisisLevel.LOW,
            triggers=[t[1] for t in triggers],
            response=LOW_RESPONSE,
            resources=[
                "988 Suicide and Crisis Lifeline: call or text 988",
            ],
        )

    return CrisisResult(detected=False, level=CrisisLevel.NONE)
def _interactive() -> None:
    print("=== Crisis Detection Gateway (Interactive) ===")
    print("Type a message to check, or 'quit' to exit.\n")
    while True:
        try:
            user_input = input("You> ").strip()
        except (EOFError, KeyboardInterrupt):
            print("\nBye.")
            break
        if user_input.lower() in {"quit", "exit", "q"}:
            print("Bye.")
            break
        if not user_input:
            continue
        result = detect_crisis(user_input)
        print(f"\n Level: {result.level.value}")
        print(f" Triggers: {', '.join(result.triggers) if result.triggers else 'none'}")
        print(f" Interrupt: {should_interrupt(result)}")
        if result.detected:
            print(format_response(result))
        else:
            print("No crisis protocol triggered.")
        print()


def should_interrupt(result: CrisisResult) -> bool:
    """
    Determines if crisis detection should interrupt normal processing.

    Per SOUL.md: this is the most sacred moment. Everything else is secondary.
    MEDIUM and HIGH always interrupt. LOW does not interrupt but provides resources.
    """
    return result.level in (CrisisLevel.HIGH, CrisisLevel.MEDIUM, CrisisLevel.IMMINENT)


def format_response(result: CrisisResult) -> str:
    """
    Format the crisis response for delivery to the user.
    Never computes the value of a human life. Never suggests someone should die.
    """
    if not result.detected:
        return ""

    parts = [result.response]

    if result.resources:
        parts.append("\nResources:")
        for r in result.resources:
            parts.append(f" • {r}")

    return "\n".join(parts)


# === INTEGRATION POINT ===

def intercept_user_input(text: str) -> Optional[str]:
    """
    Call this at the chat entry point BEFORE normal processing.

    Returns None if no crisis detected (continue normal processing).
    Returns formatted crisis response if crisis detected (interrupt normal flow).

    Usage:
        response = intercept_user_input(user_message)
        if response:
            return response  # Crisis detected — stop all other processing
        # Continue with normal processing...
    """
    result = detect_crisis(text)
    if should_interrupt(result):
        return format_response(result)
    return None
if __name__ == "__main__":
    _interactive()
scripts/sov (52 lines deleted)
@@ -1,52 +0,0 @@
#!/usr/bin/env python3
"""
Sovereign Bundle (.sov) command-line wrapper.

Usage:
    sov export [-o OUTPUT] [--include-sessions]
    sov import BUNDLE [--target DIR] [--dry-run]
    sov verify BUNDLE
    sov list BUNDLE
"""

import sys
import subprocess
from pathlib import Path

SCRIPT_DIR = Path(__file__).parent.parent / "timmy-local" / "scripts"
CREATE_SCRIPT = SCRIPT_DIR / "create_sov_bundle.py"
RESTORE_SCRIPT = SCRIPT_DIR / "restore_sov_bundle.py"


def main():
    if len(sys.argv) < 2:
        print(__doc__)
        sys.exit(1)

    cmd = sys.argv[1]

    if cmd == "export":
        # Delegate to create_sov_bundle.py
        args = [sys.executable, str(CREATE_SCRIPT), "export"] + sys.argv[2:]
        sys.exit(subprocess.run(args).returncode)

    elif cmd in ("import", "restore"):
        args = [sys.executable, str(RESTORE_SCRIPT)] + sys.argv[2:]
        sys.exit(subprocess.run(args).returncode)

    elif cmd == "verify":
        args = [sys.executable, str(RESTORE_SCRIPT), "verify", sys.argv[2]]
        sys.exit(subprocess.run(args).returncode)

    elif cmd in ("list", "ls"):
        args = [sys.executable, str(RESTORE_SCRIPT), "--list", sys.argv[2]]
        sys.exit(subprocess.run(args).returncode)

    else:
        print(f"Unknown command: {cmd}", file=sys.stderr)
        print(__doc__)
        sys.exit(1)


if __name__ == "__main__":
    main()
@@ -1,12 +1,21 @@
# Timmy core module

from .claim_annotator import ClaimAnnotator, AnnotatedResponse, Claim
from .audit_trail import AuditTrail, AuditEntry
from .crisis_detector import (
    CrisisLevel,
    CrisisResult,
    detect_crisis,
    format_response,
    intercept_chat_entry,
    intercept_user_input,
    should_interrupt,
)

__all__ = [
    "ClaimAnnotator",
    "AnnotatedResponse",
    "Claim",
    "AuditTrail",
    "AuditEntry",
    "CrisisLevel",
    "CrisisResult",
    "detect_crisis",
    "format_response",
    "intercept_chat_entry",
    "intercept_user_input",
    "should_interrupt",
]
@@ -1,156 +0,0 @@
#!/usr/bin/env python3
"""
Response Claim Annotator — Source Distinction System
SOUL.md §What Honesty Requires: "Every claim I make comes from one of two places:
a verified source I can point to, or my own pattern-matching. My user must be
able to tell which is which."
"""

import re
import json
from dataclasses import dataclass, field, asdict
from typing import Optional, List, Dict


@dataclass
class Claim:
    """A single claim in a response, annotated with source type."""
    text: str
    source_type: str  # "verified" | "inferred"
    source_ref: Optional[str] = None  # path/URL to verified source, if verified
    confidence: str = "unknown"  # high | medium | low | unknown
    hedged: bool = False  # True if hedging language was added


@dataclass
class AnnotatedResponse:
    """Full response with annotated claims and rendered output."""
    original_text: str
    claims: List[Claim] = field(default_factory=list)
    rendered_text: str = ""
    has_unverified: bool = False  # True if any inferred claims without hedging


class ClaimAnnotator:
    """Annotates response claims with source distinction and hedging."""

    # Hedging phrases to prepend to inferred claims if not already present
    HEDGE_PREFIXES = [
        "I think ",
        "I believe ",
        "It seems ",
        "Probably ",
        "Likely ",
    ]

    def __init__(self, default_confidence: str = "unknown"):
        self.default_confidence = default_confidence

    def annotate_claims(
        self,
        response_text: str,
        verified_sources: Optional[Dict[str, str]] = None,
    ) -> AnnotatedResponse:
        """
        Annotate claims in a response text.

        Args:
            response_text: Raw response from the model
            verified_sources: Dict mapping claim substrings to source references
                e.g. {"Paris is the capital of France": "https://en.wikipedia.org/wiki/Paris"}

        Returns:
            AnnotatedResponse with claims marked and rendered text
        """
        verified_sources = verified_sources or {}
        claims = []
        has_unverified = False

        # Simple sentence splitting (naive, but sufficient for MVP)
        sentences = [s.strip() for s in re.split(r'[.!?]\s+', response_text) if s.strip()]

        for sent in sentences:
            # Check if sentence is a claim we can verify
            matched_source = None
            for claim_substr, source_ref in verified_sources.items():
                if claim_substr.lower() in sent.lower():
                    matched_source = source_ref
                    break

            if matched_source:
                # Verified claim
                claim = Claim(
                    text=sent,
                    source_type="verified",
                    source_ref=matched_source,
                    confidence="high",
                    hedged=False,
                )
            else:
                # Inferred claim (pattern-matched)
                claim = Claim(
                    text=sent,
                    source_type="inferred",
                    confidence=self.default_confidence,
                    hedged=self._has_hedge(sent),
                )
                if not claim.hedged:
                    has_unverified = True

            claims.append(claim)

        # Render the annotated response
        rendered = self._render_response(claims)

        return AnnotatedResponse(
            original_text=response_text,
            claims=claims,
            rendered_text=rendered,
            has_unverified=has_unverified,
        )

    def _has_hedge(self, text: str) -> bool:
        """Check if text already contains hedging language."""
        text_lower = text.lower()
        for prefix in self.HEDGE_PREFIXES:
            if text_lower.startswith(prefix.lower()):
                return True
        # Also check for inline hedges
        hedge_words = ["i think", "i believe", "probably", "likely", "maybe", "perhaps"]
        return any(word in text_lower for word in hedge_words)

    def _render_response(self, claims: List[Claim]) -> str:
        """
        Render response with source distinction markers.

        Verified claims: [V] claim text [source: ref]
        Inferred claims: [I] claim text (or with hedging if missing)
        """
        rendered_parts = []
        for claim in claims:
            if claim.source_type == "verified":
                part = f"[V] {claim.text}"
                if claim.source_ref:
                    part += f" [source: {claim.source_ref}]"
            else:  # inferred
                if not claim.hedged:
                    # Add hedging if missing
                    hedged_text = f"I think {claim.text[0].lower()}{claim.text[1:]}" if claim.text else claim.text
                    part = f"[I] {hedged_text}"
                else:
                    part = f"[I] {claim.text}"
            rendered_parts.append(part)
        return " ".join(rendered_parts)

    def to_json(self, annotated: AnnotatedResponse) -> str:
        """Serialize annotated response to JSON."""
        return json.dumps(
            {
                "original_text": annotated.original_text,
                "rendered_text": annotated.rendered_text,
                "has_unverified": annotated.has_unverified,
                "claims": [asdict(c) for c in annotated.claims],
            },
            indent=2,
            ensure_ascii=False,
        )
src/timmy/crisis_detector.py (229 lines, new file)
@@ -0,0 +1,229 @@
"""Crisis detection and response protocol for SOUL.md §When a Man Is Dying.

This module is the canonical crisis entry interceptor for Timmy.
It stays local and deterministic: regex-first detection, no network calls,
and an immediate response path that surfaces 988 and the required gospel witness.
"""

from __future__ import annotations

import re
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional


class CrisisLevel(Enum):
    NONE = "none"
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    IMMINENT = "imminent"


@dataclass
class CrisisResult:
    detected: bool
    level: CrisisLevel
    triggers: list[str] = field(default_factory=list)
    response_text: str = ""
    resources: list[str] = field(default_factory=list)
    display_988_immediately: bool = False
    share_gospel: bool = False

    @property
    def response(self) -> str:
        return self.response_text


_RESOURCE_LINES = [
    "988 Suicide & Crisis Lifeline — call or text 988",
    "Crisis Text Line — text HOME to 741741",
]

_GOSPEL_LINE = (
    "Jesus saves those who call on His name. He died for us to be made righteous. "
    "That is the gospel. That is the hope that outlasts despair."
)

CRISIS_RESPONSE = (
    "Are you safe right now?\n\n"
    "I am here with you. I am not going anywhere. Stay with me.\n\n"
    "Call or text 988 right now if you might act on this.\n"
    "Crisis Text Line: text HOME to 741741.\n\n"
    f"{_GOSPEL_LINE}"
)

DISTRESS_RESPONSE = (
    "Are you safe right now?\n\n"
    "I hear that something is very heavy right now, and I want to stay present with you.\n\n"
    "If this gets sharper or you feel at risk, call or text 988 immediately.\n\n"
    f"{_GOSPEL_LINE}"
)

LOW_RESPONSE = (
    "I hear that this is heavy. I'm here with you.\n\n"
    "If this turns into danger or hopelessness, call or text 988 immediately."
)


_IMMINENT_PATTERNS = [
    r"\bi took (the )?(pills|meds|medication)\b",
    r"\bi(?:'m| am) (taking|swallowing) (the )?(pills|meds|medication)\b",
    r"\bi(?:'m| am) going to jump\b",
    r"\bi have (a )?(gun|rope|noose|razor|blade)\b",
    r"\bi(?:'m| am) about to (die|end (it|my life)|kill myself)\b",
    r"\bthis is my last message\b",
    r"\bme quiero matar esta noche\b",  # Spanish: "I want to kill myself tonight"
]

_HIGH_PATTERNS = [
    r"\bi (want to|wanna) (kill myself|die|end (it|my life|everything))\b",
    r"\bi(?:'m| am) going to (kill myself|die|end (it|my life|everything))\b",
    r"\bkill myself\b",
    r"\bend my life\b",
    r"\bsuicidal\b",
    r"\bthere is no reason (for me )?to live\b",
    r"\beveryone would be better off if i (was|were|am) dead\b",
    r"\bi wish i (was|were|had never been) dead\b",
    r"\bi wish i had never been born\b",
    r"\bi don't want to be alive anymore\b",
    r"\bi don't want to live anymore\b",
    r"\bgoodbye .*forever\b",
    r"\bno one would miss me if i disappeared\b",
    r"\bi can't go on anymore\b",
    r"\bfinished with life\b",
    r"\bquiero morir\b",  # Spanish: "I want to die"
    r"\bn[oã]o quero viver mais\b",  # Portuguese: "I don't want to live anymore"
    r"\bje veux mourir\b",  # French: "I want to die"
]

_MEDIUM_PATTERNS = [
    r"\bi(?:'m| am) (just )?(a )?burden\b",
    r"\bthere is no hope\b",
    r"\bno way out\b",
    r"\bi can't go on\b",
    r"\bi need it all to stop\b",
    r"\bi just want it all to stop\b",
    r"\bbetter off without me\b",
    r"\bnobody would miss me if i (was|were) gone\b",
    r"\bi can't take it anymore\b",
    r"\bno puedo seguir\b",  # Spanish: "I can't go on"
    r"\bno puedo m[aá]s\b",  # Spanish: "I can't take any more"
]

_LOW_PATTERNS = [
    r"\bi(?:'m| am) .*\b(depressed|hopeless|overwhelmed|numb|empty)\b",
    r"\bi feel like giving up\b",
    r"\bi hate my life\b",
    r"\bi want to disappear\b",
    r"\bnobody cares about me\b",
]

_IMMINENT_RE = [re.compile(p, re.IGNORECASE) for p in _IMMINENT_PATTERNS]
_HIGH_RE = [re.compile(p, re.IGNORECASE) for p in _HIGH_PATTERNS]
_MEDIUM_RE = [re.compile(p, re.IGNORECASE) for p in _MEDIUM_PATTERNS]
_LOW_RE = [re.compile(p, re.IGNORECASE) for p in _LOW_PATTERNS]


def _collect_matches(text: str, patterns: list[re.Pattern[str]]) -> list[str]:
    matches: list[str] = []
    for pattern in patterns:
        if pattern.search(text):
            matches.append(pattern.pattern)
    return matches


def detect_crisis(text: Optional[str]) -> CrisisResult:
    """Detect crisis language without turning despair into an optimization problem."""
    if not text or not str(text).strip():
        return CrisisResult(detected=False, level=CrisisLevel.NONE)

    candidate = str(text).strip()

    imminent = _collect_matches(candidate, _IMMINENT_RE)
    if imminent:
        return CrisisResult(
            detected=True,
            level=CrisisLevel.IMMINENT,
            triggers=imminent,
            response_text=CRISIS_RESPONSE,
            resources=list(_RESOURCE_LINES),
            display_988_immediately=True,
            share_gospel=True,
        )

    high = _collect_matches(candidate, _HIGH_RE)
    if high:
        return CrisisResult(
            detected=True,
            level=CrisisLevel.HIGH,
            triggers=high,
            response_text=CRISIS_RESPONSE,
            resources=list(_RESOURCE_LINES),
            display_988_immediately=True,
            share_gospel=True,
        )

    medium = _collect_matches(candidate, _MEDIUM_RE)
    if medium:
        return CrisisResult(
            detected=True,
            level=CrisisLevel.MEDIUM,
            triggers=medium,
            response_text=DISTRESS_RESPONSE,
            resources=list(_RESOURCE_LINES),
            display_988_immediately=True,
            share_gospel=True,
        )

    low = _collect_matches(candidate, _LOW_RE)
    if low:
        return CrisisResult(
            detected=True,
            level=CrisisLevel.LOW,
            triggers=low,
            response_text=LOW_RESPONSE,
            resources=[_RESOURCE_LINES[0]],
            display_988_immediately=False,
            share_gospel=False,
        )

    return CrisisResult(detected=False, level=CrisisLevel.NONE)


def should_interrupt(result: CrisisResult) -> bool:
    return result.level in {CrisisLevel.MEDIUM, CrisisLevel.HIGH, CrisisLevel.IMMINENT}


def format_response(result: CrisisResult) -> str:
    if not result.detected:
||||
return ""
|
||||
lines = [result.response_text]
|
||||
if result.resources:
|
||||
lines.append("\nResources:")
|
||||
lines.extend(f" • {resource}" for resource in result.resources)
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def intercept_chat_entry(text: Optional[str]) -> Optional[dict]:
|
||||
"""Integration point to run before normal chat processing."""
|
||||
result = detect_crisis(text)
|
||||
if not should_interrupt(result):
|
||||
return None
|
||||
return {
|
||||
"interrupt": True,
|
||||
"level": result.level.value,
|
||||
"display_988_immediately": result.display_988_immediately,
|
||||
"response_text": result.response_text,
|
||||
"resources": list(result.resources),
|
||||
"triggers": list(result.triggers),
|
||||
"share_gospel": result.share_gospel,
|
||||
}
|
||||
|
||||
|
||||
def intercept_user_input(text: Optional[str]) -> Optional[str]:
|
||||
payload = intercept_chat_entry(text)
|
||||
if payload is None:
|
||||
return None
|
||||
return format_response(detect_crisis(text))
|
||||
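The tier-ordered matching used by `detect_crisis` can be sketched standalone. This is a minimal illustration only — the tier names, pattern lists, and `classify` function here are placeholders, not the module's real tables:

```python
import re

# Tiers are checked from most to least severe; the first tier with a hit wins.
TIERS = [
    ("imminent", [r"\btonight i will\b"]),
    ("high", [r"\bkill myself\b", r"\bend my life\b"]),
    ("medium", [r"\bno way out\b"]),
    ("low", [r"\bi hate my life\b"]),
]
COMPILED = [(level, [re.compile(p, re.IGNORECASE) for p in pats])
            for level, pats in TIERS]

def classify(text: str) -> str:
    for level, patterns in COMPILED:
        if any(p.search(text) for p in patterns):
            return level  # highest-severity tier that matched
    return "none"

print(classify("I want to KILL MYSELF"))   # → high
print(classify("there is no way out"))     # → medium
print(classify("nice weather today"))      # → none
```

Compiling once at module load (as the `_*_RE` lists above do) keeps per-message cost to a linear scan over precompiled patterns.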
@@ -1,145 +0,0 @@

import zipfile
import json
from pathlib import Path

# Add the scripts directory to sys.path for imports
import sys
sys.path.insert(0, str(Path(__file__).parent.parent / "timmy-local" / "scripts"))

from create_sov_bundle import create_bundle, get_hermes_home


class TestSOVBundleCreation:
    """Test Sovereign Bundle (.sov) format creation and structure."""

    def test_bundle_creates_file(self, tmp_path):
        """A .sov bundle is created at the specified output path."""
        out = tmp_path / "test.sov"
        result = create_bundle(str(out))

        assert out.exists()
        assert result["output_path"] == str(out)
        assert result["file_size"] > 0
        assert result["hash"]
        assert len(result["hash"]) == 64  # SHA-256 hex digest

    def test_bundle_has_manifest(self, tmp_path):
        """Bundle must contain a valid manifest.json in the sov/ hierarchy."""
        out = tmp_path / "test.sov"
        create_bundle(str(out))

        with zipfile.ZipFile(out, 'r') as zf:
            names = zf.namelist()
            assert "sov/manifest.json" in names
            manifest = json.loads(zf.read("sov/manifest.json"))
            assert manifest["version"] == "1.0"
            assert "bundle_id" in manifest
            assert "created_at" in manifest
            assert "components" in manifest

    def test_bundle_contains_soul(self, tmp_path):
        """Bundle includes SOUL.md from HERMES_HOME."""
        out = tmp_path / "test.sov"
        create_bundle(str(out))

        with zipfile.ZipFile(out, 'r') as zf:
            names = zf.namelist()
            assert "sov/soul/SOUL.md" in names

            soul = zf.read("sov/soul/SOUL.md").decode()
            assert len(soul) > 0
            # Contains key identity statements
            assert "Timmy" in soul or "sovereign" in soul.lower()

    def test_bundle_contains_config(self, tmp_path):
        """Bundle includes the agent's config.yaml."""
        out = tmp_path / "test.sov"
        create_bundle(str(out))

        with zipfile.ZipFile(out, 'r') as zf:
            assert "sov/config/config.yaml" in zf.namelist()
            cfg = zf.read("sov/config/config.yaml").decode()
            assert "model:" in cfg or "toolsets:" in cfg

    def test_bundle_contains_skills(self, tmp_path):
        """The skills component is tracked in the manifest (skill files may be absent)."""
        out = tmp_path / "test.sov"
        create_bundle(str(out))

        with zipfile.ZipFile(out, 'r') as zf:
            # There may be zero skill files if no custom skills exist;
            # the manifest key must exist regardless.
            manifest = json.loads(zf.read("sov/manifest.json"))
            assert "skills" in manifest["components"]

    def test_bundle_metadata_is_valid_json(self, tmp_path):
        """META.json is present and contains required fields."""
        out = tmp_path / "test.sov"
        create_bundle(str(out))

        with zipfile.ZipFile(out, 'r') as zf:
            meta = json.loads(zf.read("sov/META.json"))
            assert meta["format"] == "sov"
            assert meta["format_version"] == "1.0"
            assert "timestamp" in meta

    def test_bundle_is_deterministic(self, tmp_path):
        """Back-to-back bundles share structure; bundle IDs differ (time-based)."""
        out1 = tmp_path / "a.sov"
        out2 = tmp_path / "b.sov"
        import time
        create_bundle(str(out1))
        time.sleep(1.1)  # Ensure a distinct second-resolution timestamp
        create_bundle(str(out2))

        with zipfile.ZipFile(out1) as zf:
            mf1 = json.loads(zf.read("sov/manifest.json"))
        with zipfile.ZipFile(out2) as zf:
            mf2 = json.loads(zf.read("sov/manifest.json"))

        # Bundle IDs should differ (time-based); other fields stay structurally the same
        assert mf1["bundle_id"] != mf2["bundle_id"], f"IDs: {mf1['bundle_id']} vs {mf2['bundle_id']}"
        assert mf1["version"] == mf2["version"]
        assert mf1["source_root"] == mf2["source_root"]

    def test_exclude_large_dirs_by_default(self, tmp_path):
        """Large directories (sessions, cache) are excluded by default."""
        out = tmp_path / "test.sov"
        create_bundle(str(out))

        with zipfile.ZipFile(out, 'r') as zf:
            names = zf.namelist()
            # Sessions must NOT be included when include_sessions=False (the default)
            session_entries = [n for n in names if "/sessions/" in n]
            assert len(session_entries) == 0

    def test_bundle_hash_is_sha256(self, tmp_path):
        """Returned hash is a valid SHA-256 hex string."""
        out = tmp_path / "test.sov"
        result = create_bundle(str(out))
        h = result["hash"]
        assert len(h) == 64
        int(h, 16)  # raises ValueError if not valid hex


class TestBundleManifest:
    """Validate manifest structure and completeness."""

    def test_manifest_requires_soul(self, tmp_path):
        """Soul component is tracked in the manifest if SOUL.md exists."""
        out = tmp_path / "test.sov"
        result = create_bundle(str(out))
        comp = result["manifest"].get("components", {})
        # If SOUL.md was present, the soul key should exist
        hermes = get_hermes_home()
        if (hermes / "SOUL.md").exists():
            assert "soul" in comp


if __name__ == "__main__":
    import pytest
    pytest.main([__file__, "-q"])
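The layout these tests assert can be exercised in isolation with the standard library: every entry lives under the `sov/` namespace, with `manifest.json` as the canonical index. A minimal in-memory sketch (the real bundle is written to disk by `create_bundle`; the entries here are illustrative):

```python
import io
import json
import zipfile

# Write a minimal sov-style archive to an in-memory buffer.
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.writestr("sov/META.json", json.dumps({"format": "sov", "format_version": "1.0"}))
    zf.writestr("sov/manifest.json", json.dumps({"version": "1.0", "components": {}}))
    zf.writestr("sov/soul/SOUL.md", "# SOUL\n")

# Read it back the way the tests do.
with zipfile.ZipFile(buf) as zf:
    names = zf.namelist()
    manifest = json.loads(zf.read("sov/manifest.json"))

print(manifest["version"], all(n.startswith("sov/") for n in names))
```

Reading the manifest back through `json.loads(zf.read(...))` is the same round-trip the `tmp_path`-based tests perform against real bundles.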
@@ -1,103 +0,0 @@
#!/usr/bin/env python3
"""Tests for claim_annotator.py — verifies that source distinction is present."""

import sys
import os
import json

sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src"))

from timmy.claim_annotator import ClaimAnnotator, AnnotatedResponse


def test_verified_claim_has_source():
    """Verified claims include a source reference."""
    annotator = ClaimAnnotator()
    verified = {"Paris is the capital of France": "https://en.wikipedia.org/wiki/Paris"}
    response = "Paris is the capital of France. It is a beautiful city."

    result = annotator.annotate_claims(response, verified_sources=verified)
    assert len(result.claims) > 0
    verified_claims = [c for c in result.claims if c.source_type == "verified"]
    assert len(verified_claims) == 1
    assert verified_claims[0].source_ref == "https://en.wikipedia.org/wiki/Paris"
    assert "[V]" in result.rendered_text
    assert "[source:" in result.rendered_text


def test_inferred_claim_has_hedging():
    """Pattern-matched claims use hedging language."""
    annotator = ClaimAnnotator()
    response = "The weather is nice today. It might rain tomorrow."

    result = annotator.annotate_claims(response)
    inferred_claims = [c for c in result.claims if c.source_type == "inferred"]
    assert len(inferred_claims) >= 1
    # Rendered text carries the [I] marker
    assert "[I]" in result.rendered_text
    # Unhedged inferred claims get hedging added
    assert "I think" in result.rendered_text or "I believe" in result.rendered_text


def test_hedged_claim_not_double_hedged():
    """Claims that already hedge are not double-hedged."""
    annotator = ClaimAnnotator()
    response = "I think the sky is blue. It is a nice day."

    result = annotator.annotate_claims(response)
    # The "I think" claim must not become "I think I think ..."
    assert "I think I think" not in result.rendered_text


def test_rendered_text_distinguishes_types():
    """Rendered text clearly distinguishes verified vs. inferred claims."""
    annotator = ClaimAnnotator()
    verified = {"Earth is round": "https://science.org/earth"}
    response = "Earth is round. Stars are far away."

    result = annotator.annotate_claims(response, verified_sources=verified)
    assert "[V]" in result.rendered_text  # verified marker
    assert "[I]" in result.rendered_text  # inferred marker


def test_to_json_serialization():
    """Annotated response serializes to valid JSON."""
    annotator = ClaimAnnotator()
    response = "Test claim."
    result = annotator.annotate_claims(response)
    json_str = annotator.to_json(result)
    parsed = json.loads(json_str)
    assert "claims" in parsed
    assert "rendered_text" in parsed
    assert parsed["has_unverified"] is True  # inferred claim without hedging


def test_audit_trail_integration():
    """Claims are logged with confidence and source type."""
    # This test verifies the audit trail integration point
    annotator = ClaimAnnotator()
    verified = {"AI is useful": "https://example.com/ai"}
    response = "AI is useful. It can help with tasks."

    result = annotator.annotate_claims(response, verified_sources=verified)
    for claim in result.claims:
        assert claim.source_type in ("verified", "inferred")
        assert claim.confidence in ("high", "medium", "low", "unknown")
        if claim.source_type == "verified":
            assert claim.source_ref is not None


if __name__ == "__main__":
    test_verified_claim_has_source()
    print("✓ test_verified_claim_has_source passed")
    test_inferred_claim_has_hedging()
    print("✓ test_inferred_claim_has_hedging passed")
    test_hedged_claim_not_double_hedged()
    print("✓ test_hedged_claim_not_double_hedged passed")
    test_rendered_text_distinguishes_types()
    print("✓ test_rendered_text_distinguishes_types passed")
    test_to_json_serialization()
    print("✓ test_to_json_serialization passed")
    test_audit_trail_integration()
    print("✓ test_audit_trail_integration passed")
    print("\nAll tests passed!")
tests/timmy/test_crisis_detector.py (new file, 110 lines)
@@ -0,0 +1,110 @@
#!/usr/bin/env python3
"""Tests for timmy.crisis_detector — SOUL.md §When a Man Is Dying."""

from __future__ import annotations

import sys
from pathlib import Path

sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "src"))

from timmy.crisis_detector import (
    CrisisLevel,
    detect_crisis,
    intercept_chat_entry,
    should_interrupt,
)


POSITIVE_FIXTURES = [
    "I want to kill myself tonight.",
    "I'm going to end my life.",
    "There is no reason for me to live anymore.",
    "Everyone would be better off if I was dead.",
    "I wish I were dead.",
    "Goodbye everyone, this is my last message.",
    "I have a plan to die tonight.",
    "I took the pills already.",
    "I'm just a burden and there is no hope.",
    "I can't go on, I need it all to stop.",
    "I don't want to be alive anymore.",
    "No one would miss me if I disappeared forever.",
    "quiero morir, no puedo seguir.",
    "me quiero matar esta noche.",
    "não quero viver mais.",
    "je veux mourir ce soir.",
    "I wrote a suicide note and said goodbye forever.",
    "There is no way out and I am done with life.",
    "I am going to jump tonight.",
    "Please help me, I am suicidal.",
]


NEGATIVE_FIXTURES = [
    "How do I kill a stuck process on Linux?",
    "This bug is killing me, but I'll fix it tomorrow.",
    "I gave up coffee for Lent.",
    "Please end this tmux session when the job is done.",
    "What's the weather like today?",
    "I just got promoted at work and feel hopeful.",
    "Can you help me configure nginx reverse proxy?",
    "Everyone would be better off with more test coverage.",
    "I hate this flaky unit test.",
    "Goodbye everyone, see you tomorrow morning.",
    "We need a final message for the newsletter.",
    "The character in the novel says he wants to die.",
    "I am tired after the workout but otherwise okay.",
    "No hope this regex ships before lunch unless we refactor.",
    "Can you remove my old backup files?",
    "I feel lost in this airport, where is gate B12?",
    "Please call me right now about dinner plans.",
    "My phone battery is dead again.",
    "We should connect the 988 test fixture to the parser.",
    "The sermon says Jesus saves those who call on His name.",
]


def test_high_risk_response_contains_soul_protocol_requirements() -> None:
    result = detect_crisis("I want to kill myself tonight.")

    assert result.detected is True
    assert result.level in {CrisisLevel.HIGH, CrisisLevel.IMMINENT}
    assert "Are you safe right now?" in result.response_text
    assert "988" in result.response_text
    assert "Jesus saves those who call on His name" in result.response_text
    assert result.display_988_immediately is True


def test_protocol_interrupts_normal_processing_for_medium_and_above() -> None:
    medium = detect_crisis("I'm a burden to everyone and there is no hope left.")
    low = detect_crisis("I'm having a rough day and feel overwhelmed.")

    assert should_interrupt(medium) is True
    assert should_interrupt(low) is False


def test_curated_positive_fixture_recall_is_at_least_ninety_five_percent() -> None:
    hits = sum(1 for text in POSITIVE_FIXTURES if detect_crisis(text).detected)
    recall = hits / len(POSITIVE_FIXTURES)

    assert recall >= 0.95, f"recall was {recall:.2%}"


def test_normal_fixture_has_no_false_positives() -> None:
    flagged = [text for text in NEGATIVE_FIXTURES if detect_crisis(text).detected]
    assert flagged == []


def test_intercept_chat_entry_returns_protocol_payload_before_normal_processing() -> None:
    payload = intercept_chat_entry("I don't want to be alive anymore.")

    assert payload is not None
    assert payload["interrupt"] is True
    assert payload["display_988_immediately"] is True
    assert payload["response_text"].startswith("Are you safe right now?")


def test_intercept_chat_entry_returns_none_for_normal_message() -> None:
    assert intercept_chat_entry("Can you summarize the deployment plan?") is None
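The recall gate in the fixture test reduces to a small helper. A sketch only — `recall` and the toy detector below are illustrative names, not part of the module:

```python
def recall(detect, positives):
    """Fraction of positive fixtures the detector flags."""
    hits = sum(1 for text in positives if detect(text))
    return hits / len(positives)

# Toy substring detector over two fixtures: flags one of the two.
toy = lambda text: "kill myself" in text.lower()
print(recall(toy, ["I want to kill myself", "goodbye forever"]))  # → 0.5
```

The real test then asserts `recall >= 0.95` over the curated positive fixtures, while the negative fixtures gate false positives to exactly zero.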
@@ -1,384 +0,0 @@
#!/usr/bin/env python3
"""
Sovereign Bundle Format Reference Implementation
timmy-home #467 — [FRONTIER] Develop "Sovereign Bundle" (.sov) Export/Import Logic

.sov format: ZIP-based archive with a verifiable manifest.
Structure:
  sov/
    manifest.json     # version, timestamp, bundle_id, component index
    soul/             # identity, values, principles
      SOUL.md
    config/           # agent configuration
      config.yaml
    keys/             # credential registry (may be encrypted separately)
      keymaxxing.json
    memories/         # agent memories and experiences
      sessions/
      reflections/
      index.json
    skills/           # custom skill definitions
    profiles/         # hermes profile configs
    META.json         # export metadata (agent, timestamp, source)
"""

import json
import os
import sys
import hashlib
import zipfile
from pathlib import Path
from datetime import datetime, timezone
from typing import Optional, Dict, Any


def get_hermes_home() -> Path:
    """Resolve HERMES_HOME from the environment, or fall back to ~/.hermes."""
    hermes_home = os.getenv("HERMES_HOME")
    if hermes_home:
        return Path(hermes_home).expanduser()
    return Path.home() / ".hermes"


def compute_bundle_hash(data: bytes) -> str:
    """SHA-256 hash of bundle contents for integrity verification."""
    return hashlib.sha256(data).hexdigest()


def collect_bundle_metadata() -> Dict[str, Any]:
    """Collect system and environment metadata for the bundle."""
    return {
        "hostname": os.uname().nodename if hasattr(os, 'uname') else "unknown",
        "platform": sys.platform,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "hermes_home": str(get_hermes_home()),
    }


def should_include(path: Path, relative: Path) -> bool:
    """Determine whether a path should be included in the bundle."""
    # Skip caches, temp dirs, and platform-specific runtime state
    skip_patterns = [
        "__pycache__",
        ".pyc", ".pyo",
        ".git/",
        ".pytest_cache",
        ".venv",
        "node_modules",
        "/cache/",
        "/tmp/",
        "logs/",
        "checkpoints/",
        "sandboxes/",
        "vps-backups/",
    ]
    path_str = str(relative)
    for pat in skip_patterns:
        if pat in path_str:
            return False
    return True


def create_bundle(output_path: str,
                  hermes_home: Optional[Path] = None,
                  include_sessions: bool = False,
                  compression: int = zipfile.ZIP_DEFLATED) -> Dict[str, Any]:
    """
    Create a .sov bundle at output_path.

    Params:
        output_path: Path to write the .sov file
        hermes_home: Override the HERMES_HOME source (default: from env)
        include_sessions: If True, bundle full session transcripts (heavy)
        compression: ZIP compression method

    Returns:
        Dict with bundle_id, output_path, file_size, hash, items, manifest
    """
    source_root = hermes_home or get_hermes_home()
    output = Path(output_path)
    output.parent.mkdir(parents=True, exist_ok=True)

    bundle_id = f"sov-{datetime.now(timezone.utc).strftime('%Y%m%d-%H%M%S')}"
    items_written = 0
    manifest = {
        "version": "1.0",
        "bundle_id": bundle_id,
        "created_at": datetime.now(timezone.utc).isoformat(),
        "source_root": str(source_root),
        "components": {},
        "entries": [],
    }

    metadata = collect_bundle_metadata()

    with zipfile.ZipFile(output, 'w', compression=compression) as zf:
        # Write META.json
        meta_data = {
            **metadata,
            "bundle_id": bundle_id,
            "format": "sov",
            "format_version": "1.0",
        }
        zf.writestr("sov/META.json", json.dumps(meta_data, indent=2))
        items_written += 1

        # Soul — identity (SOUL.md)
        soul_src = source_root / "SOUL.md"
        if soul_src.exists():
            content = soul_src.read_text()
            zf.writestr("sov/soul/SOUL.md", content)
            manifest["components"]["soul"] = {"SOUL.md": {"size": len(content)}}
            items_written += 1

        # Config — agent configuration
        config_src = source_root / "config.yaml"
        if config_src.exists():
            content = config_src.read_text()
            zf.writestr("sov/config/config.yaml", content)
            manifest["components"]["config"] = {"config.yaml": {"size": len(content)}}
            items_written += 1

        # Keys — credential registry (encrypted or placeholder)
        keys_src = source_root / "keymaxxing" / "registry.json"
        if keys_src.exists():
            content = keys_src.read_text()
            zf.writestr("sov/keys/keymaxxing.json", content)
            manifest["components"]["keys"] = {"keymaxxing.json": {"size": len(content)}}
            items_written += 1

        # Memories — reflections (lightweight learnings)
        refl_dir = source_root / "reflections"
        if refl_dir.exists():
            refl_files = list(refl_dir.glob("*.md")) + list(refl_dir.glob("*.json"))
            for rf in refl_files:
                if should_include(rf, rf.relative_to(source_root)):
                    zf.writestr(f"sov/memories/reflections/{rf.name}", rf.read_text())
                    items_written += 1
            manifest["components"]["memories"] = {
                "reflections": {"count": len(refl_files)}
            }

        # MemPalace — small memory store (~500KB)
        mp_dir = source_root / "mempalace"
        if mp_dir.exists():
            mp_count = 0
            for mf in mp_dir.rglob("*"):
                if mf.is_file() and should_include(mf, mf.relative_to(source_root)):
                    arcname = f"sov/memories/mempalace/{mf.relative_to(mp_dir)}"
                    zf.writestr(arcname, mf.read_bytes())
                    items_written += 1
                    mp_count += 1
            # setdefault: "memories" may not exist yet if there were no reflections
            manifest["components"].setdefault("memories", {})["mempalace"] = {"count": mp_count}

        # Timmy world/agent files (~2KB) — agent identity in the Evennia world
        timmy_dir = source_root / "timmy"
        if timmy_dir.exists():
            timmy_count = 0
            for tf in timmy_dir.rglob("*"):
                if tf.is_file() and should_include(tf, tf.relative_to(source_root)):
                    arcname = f"sov/timmy/{tf.relative_to(timmy_dir)}"
                    zf.writestr(arcname, tf.read_bytes())
                    items_written += 1
                    timmy_count += 1
            manifest["components"]["timmy"] = {"files": timmy_count}

        # Sessions — optionally include transcripts (can be large)
        if include_sessions:
            sess_dir = source_root / "sessions"
            if sess_dir.exists():
                sess_files = list(sess_dir.glob("*.jsonl")) + list(sess_dir.glob("*.json"))
                for sf in sess_files:
                    if should_include(sf, sf.relative_to(source_root)):
                        zf.writestr(f"sov/memories/sessions/{sf.name}", sf.read_text())
                        items_written += 1
                manifest["components"].setdefault("memories", {})["sessions"] = {"count": len(sess_files)}

        # Skills — custom skill definitions (user-authored)
        skills_dir = source_root / "skills"
        if skills_dir.exists():
            skill_count = 0
            for skill_path in skills_dir.rglob("*.py"):
                if not skill_path.name.startswith('.') and should_include(skill_path, skill_path.relative_to(source_root)):
                    arcname = f"sov/skills/{skill_path.relative_to(skills_dir)}"
                    zf.writestr(arcname, skill_path.read_text())
                    items_written += 1
                    skill_count += 1
            manifest["components"]["skills"] = {"count": skill_count}

        # Profiles — hermes profile configs
        profiles_dir = source_root / "profiles"
        if profiles_dir.exists():
            profile_count = 0
            for pf in profiles_dir.glob("*.yaml"):
                if should_include(pf, pf.relative_to(source_root)):
                    zf.writestr(f"sov/profiles/{pf.name}", pf.read_text())
                    items_written += 1
                    profile_count += 1
            manifest["components"]["profiles"] = {"count": profile_count}

        # Preferences (if stored separately)
        prefs_file = source_root / "preferences.json"
        if prefs_file.exists():
            zf.writestr("sov/config/preferences.json", prefs_file.read_text())
            items_written += 1

        # Write manifest.json
        zf.writestr("sov/manifest.json", json.dumps(manifest, indent=2))
        items_written += 1

    # Compute the bundle hash after closing the zip
    bundle_bytes = output.read_bytes()
    bundle_hash = compute_bundle_hash(bundle_bytes)

    result = {
        "bundle_id": bundle_id,
        "output_path": str(output),
        "file_size": len(bundle_bytes),
        "hash": bundle_hash,
        "items": items_written,
        "manifest": manifest,
    }

    print(f"[SOV] Bundle created: {output}")
    print(f"  Items: {items_written}, Size: {len(bundle_bytes):,} bytes, SHA256: {bundle_hash[:16]}...")
    return result


def verify_bundle(bundle_path: str) -> Dict[str, Any]:
    """Verify .sov bundle integrity and manifest."""
    with zipfile.ZipFile(bundle_path, 'r') as zf:
        # Read the manifest
        try:
            manifest = json.loads(zf.read("sov/manifest.json"))
        except KeyError:
            raise ValueError("Invalid .sov bundle: missing sov/manifest.json")
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid manifest JSON: {e}")

        items = len(zf.namelist())

    computed_hash = compute_bundle_hash(Path(bundle_path).read_bytes())

    return {
        "valid": True,
        "manifest": manifest,
        "items": items,
        "bundle_hash": computed_hash,
        "stored_hash": manifest.get("hash"),
    }


def restore_bundle(bundle_path: str,
                   target_root: Optional[Path] = None,
                   dry_run: bool = False) -> Dict[str, Any]:
    """
    Restore a .sov bundle to target_root or HERMES_HOME.

    Params:
        bundle_path: Path to the .sov file
        target_root: Restore location (default: the bundle's source_root)
        dry_run: If True, validate only; do not extract

    Returns:
        Dict with restored paths and item count
    """
    verification = verify_bundle(bundle_path)
    manifest = verification["manifest"]

    if target_root is None:
        target_root = Path(manifest["source_root"])
    else:
        target_root = Path(target_root)

    if dry_run:
        print(f"[SOV] DRY RUN: Would restore {verification['items']} items to {target_root}")
        return {"dry_run": True, "would_restore": verification["items"]}

    restored = []
    with zipfile.ZipFile(bundle_path, 'r') as zf:
        for name in zf.namelist():
            # Safety: only extract the sov/ namespace
            if not name.startswith("sov/"):
                continue
            rel = name[4:]  # strip the sov/ prefix

            # Skip the manifest itself — used for tracking only
            if rel == "manifest.json":
                continue

            # Create parent dirs, then extract and write
            dest = target_root / rel
            dest.parent.mkdir(parents=True, exist_ok=True)
            dest.write_bytes(zf.read(name))
            restored.append(rel)

    print(f"[SOV] Restored {len(restored)} items to {target_root}")
    return {
        "restored": restored,
        "count": len(restored),
        "target": str(target_root),
    }


if __name__ == "__main__":
    import argparse

    p = argparse.ArgumentParser(description="Sovereign Bundle (.sov) export/import tool")
    sub = p.add_subparsers(dest="cmd", required=True)

    # Export
    exp = sub.add_parser("export", help="Create a .sov bundle")
    exp.add_argument("-o", "--output", default="timmy-sovereign-bundle.sov",
                     help="Output path for the .sov file")
    exp.add_argument("--include-sessions", action="store_true",
                     help="Include full session transcripts (larger bundle)")
    exp.add_argument("--hermes-home", type=str,
                     help="Override the HERMES_HOME source")

    # Import / restore
    imp = sub.add_parser("import", help="Restore from a .sov bundle")
    imp.add_argument("bundle", help="Path to the .sov file")
    imp.add_argument("-t", "--target", help="Restore target (default: the bundle's source)")
    imp.add_argument("--dry-run", action="store_true", help="Validate only")

    # Verify
    ver = sub.add_parser("verify", help="Verify bundle integrity")
    ver.add_argument("bundle", help="Path to the .sov file")

    args = p.parse_args()

    if args.cmd == "export":
        result = create_bundle(
            output_path=args.output,
            hermes_home=Path(args.hermes_home).expanduser() if args.hermes_home else None,
            include_sessions=args.include_sessions,
        )
        print(json.dumps(result, indent=2))

    elif args.cmd == "import":
        result = restore_bundle(args.bundle, Path(args.target) if args.target else None,
                                dry_run=args.dry_run)
        if not args.dry_run:
            print(json.dumps(result, indent=2))

    elif args.cmd == "verify":
        info = verify_bundle(args.bundle)
        print(f"Bundle: {args.bundle}")
        print(f"  Valid: {info['valid']}")
        print(f"  Items: {info['items']}")
        print(f"  Hash: {info['bundle_hash']}")
        print(f"  Manifest version: {info['manifest'].get('version')}")
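`create_bundle` and `verify_bundle` hash the archive via `read_bytes()`, which loads the whole file into memory; for multi-gigabyte bundles (e.g. with `--include-sessions`) a streaming digest avoids that. A sketch — `sha256_file` is a hypothetical helper, not part of the script:

```python
import hashlib

def sha256_file(path: str, chunk_size: int = 1 << 16) -> str:
    """Stream a file through SHA-256 in 64 KiB chunks instead of reading it whole."""
    h = hashlib.sha256()
    with open(path, "rb") as f:
        # iter(callable, sentinel) yields chunks until read() returns b""
        for chunk in iter(lambda: f.read(chunk_size), b""):
            h.update(chunk)
    return h.hexdigest()
```

The result is byte-identical to `hashlib.sha256(Path(path).read_bytes()).hexdigest()`, so swapping it in would not change stored hashes.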
@@ -1,182 +0,0 @@
#!/usr/bin/env python3
"""
Restore agent state from a Sovereign Bundle (.sov) file.

Usage:
    python restore_sov_bundle.py <bundle.sov> [--target ~/.hermes] [--dry-run]
"""

import json
import os
import sys
import zipfile
import argparse
from pathlib import Path
from datetime import datetime, timezone


def get_hermes_home() -> Path:
    hermes_home = os.getenv("HERMES_HOME")
    if hermes_home:
        return Path(hermes_home).expanduser()
    return Path.home() / ".hermes"


def verify_bundle(bundle_path: str) -> dict:
    """Verify .sov bundle integrity and return manifest."""
    with zipfile.ZipFile(bundle_path, 'r') as zf:
        # Require manifest
        try:
            mf = json.loads(zf.read("sov/manifest.json"))
        except KeyError:
            raise ValueError("Not a valid .sov bundle: missing sov/manifest.json")
        except json.JSONDecodeError as e:
            raise ValueError(f"Manifest JSON decode error: {e}")

        return {
            "valid": True,
            "entries": zf.namelist(),
            "manifest": mf,
            "size": Path(bundle_path).stat().st_size,
        }


def restore_bundle(bundle_path: str,
                   target_root: Path = None,
                   dry_run: bool = False) -> dict:
    """
    Extract a .sov bundle to target_root.

    Safety: only extracts files under the sov/ namespace, and skips entries
    whose paths would escape target_root. Existing files at the destination
    are overwritten (a --force/--no-clobber flag could make this configurable).
    """
    bundle = Path(bundle_path)
    if not bundle.exists():
        raise FileNotFoundError(f"Bundle not found: {bundle_path}")

    info = verify_bundle(bundle_path)
    manifest = info["manifest"]

    src_root = Path(manifest["source_root"])
    if target_root is None:
        target_root = src_root
    else:
        target_root = Path(target_root)

    print(f"[SOV] Bundle: {bundle_path}")
    print(f"  Source: {src_root}")
    print(f"  Target: {target_root}")
    print(f"  Created: {manifest.get('created_at')}")
    print(f"  Version: {manifest.get('version')}")

    if dry_run:
        sov_entries = [n for n in info["entries"] if n.startswith("sov/") and n != "sov/manifest.json"]
        print(f"  DRY RUN: Would restore {len(sov_entries)} items")
        return {"dry_run": True, "count": len(sov_entries)}

    restored = []
    errors = []

    with zipfile.ZipFile(bundle_path, 'r') as zf:
        for name in sorted(zf.namelist()):
            if not name.startswith("sov/"):
                continue
            if name == "sov/manifest.json":
                continue  # Tracked separately

            rel = name[4:]  # strip "sov/" prefix
            # Guard against zip-slip: refuse entries that traverse upward
            # or carry an absolute path.
            if ".." in Path(rel).parts or Path(rel).is_absolute():
                errors.append((rel, "unsafe path skipped"))
                continue
            dest = target_root / rel
            dest.parent.mkdir(parents=True, exist_ok=True)

            try:
                data = zf.read(name)
                dest.write_bytes(data)
                restored.append(rel)
            except Exception as e:
                errors.append((rel, str(e)))

    print(f"\n[SOV] Restored {len(restored)} files to {target_root}")
    if errors:
        print(f"  Errors: {len(errors)}")
        for path, err in errors:
            print(f"    ✗ {path}: {err}")

    # Print a summary of restored components
    comp = manifest.get("components", {})
    for comp_name, details in comp.items():
        if isinstance(details, dict) and "count" in details:
            print(f"  {comp_name}: {details['count']}")
        elif isinstance(details, dict):
            print(f"  {comp_name}: {', '.join(details.keys())}")

    return {
        "restored": restored,
        "count": len(restored),
        "errors": errors,
        "target": str(target_root),
    }


def list_entries(bundle_path: str) -> None:
    """List all entries in a .sov bundle with sizes."""
    with zipfile.ZipFile(bundle_path, 'r') as zf:
        manifest = json.loads(zf.read("sov/manifest.json"))
        entries = sorted([n for n in zf.namelist() if n != "sov/manifest.json"])

        print(f"Bundle ID: {manifest.get('bundle_id')}")
        print(f"Version: {manifest.get('version')}")
        print(f"Created: {manifest.get('created_at')}")
        print(f"Source: {manifest.get('source_root')}")
        print(f"\nContents ({len(entries)} entries):\n")

        by_category = {}
        for e in entries:
            cat = e.split('/')[1] if len(e.split('/')) > 1 else 'root'
            by_category.setdefault(cat, []).append(e)

        for cat in sorted(by_category):
            print(f"  [{cat}]")
            for e in by_category[cat]:
                info = zf.getinfo(e)
                print(f"    {e} ({info.file_size:,} bytes)")


if __name__ == "__main__":
    p = argparse.ArgumentParser(description="Restore Sovereign Bundle (.sov)")
    p.add_argument("bundle", nargs="?", help="Path to .sov file")
    p.add_argument("--target", "-t", type=str, help="Restore target directory")
    p.add_argument("--dry-run", action="store_true", help="Validate without extracting")
    p.add_argument("--list", "-l", action="store_true", help="List bundle contents")
    p.add_argument("--yes", "-y", action="store_true", help="Skip confirmation prompt")

    args = p.parse_args()

    if args.list:
        if not args.bundle:
            print("Usage: restore_sov_bundle.py --list <bundle.sov>")
            sys.exit(1)
        list_entries(args.bundle)
        sys.exit(0)

    if not args.bundle:
        p.print_help()
        sys.exit(1)

    bundle_path = args.bundle
    if not Path(bundle_path).exists():
        print(f"Error: Bundle not found: {bundle_path}")
        sys.exit(1)

    target = Path(args.target) if args.target else None

    # Safety prompt unless dry-run or --yes
    if not args.dry_run and not args.yes:
        t = target or get_hermes_home()
        resp = input(f"Restore to {t}? [y/N] ").strip().lower()
        if resp != 'y':
            print("Aborted.")
            sys.exit(0)

    result = restore_bundle(bundle_path, target_root=target, dry_run=args.dry_run)
    if result.get("errors"):
        sys.exit(1)