Compare commits


1 Commit

| Author | SHA1 | Message | Date |
|---|---|---|---|
| Alexander Whitestone | 0e103dc8b7 | feat: add the-door codebase genome (#673) | 2026-04-15 00:09:42 -04:00 |

Some checks failed: Smoke Test / smoke (pull_request), failing after 8s
23 changed files with 457 additions and 2092 deletions


@@ -1,61 +0,0 @@
# Know Thy Father — Multimodal Media Consumption Pipeline
Refs #582
This document makes the epic operational by naming the current source-of-truth scripts, their handoff artifacts, and the one-command runner that coordinates them.
## Why this exists
The epic is already decomposed into four implemented phases, but the implementation truth is split across two script roots, plus a separate status rollup:
- `scripts/know_thy_father/` owns Phases 1, 3, and 4
- `scripts/twitter_archive/analyze_media.py` owns Phase 2
- `twitter-archive/know-thy-father/tracker.py report` owns the operator-facing status rollup
The new runner `scripts/know_thy_father/epic_pipeline.py` does not replace those scripts. It stitches them together into one explicit, reviewable plan.
## Phase map
| Phase | Script | Primary output |
|-------|--------|----------------|
| 1. Media Indexing | `scripts/know_thy_father/index_media.py` | `twitter-archive/know-thy-father/media_manifest.jsonl` |
| 2. Multimodal Analysis | `scripts/twitter_archive/analyze_media.py --batch 10` | `twitter-archive/know-thy-father/analysis.jsonl` + `meaning-kernels.jsonl` + `pipeline-status.json` |
| 3. Holographic Synthesis | `scripts/know_thy_father/synthesize_kernels.py` | `twitter-archive/knowledge/fathers_ledger.jsonl` |
| 4. Cross-Reference Audit | `scripts/know_thy_father/crossref_audit.py` | `twitter-archive/notes/crossref_report.md` |
| 5. Processing Log | `twitter-archive/know-thy-father/tracker.py report` | `twitter-archive/know-thy-father/REPORT.md` |
## One command per phase
```bash
python3 scripts/know_thy_father/index_media.py --tweets twitter-archive/extracted/tweets.jsonl --output twitter-archive/know-thy-father/media_manifest.jsonl
python3 scripts/twitter_archive/analyze_media.py --batch 10
python3 scripts/know_thy_father/synthesize_kernels.py --input twitter-archive/media/manifest.jsonl --output twitter-archive/knowledge/fathers_ledger.jsonl --summary twitter-archive/knowledge/fathers_ledger.summary.json
python3 scripts/know_thy_father/crossref_audit.py --soul SOUL.md --kernels twitter-archive/notes/know_thy_father_crossref.md --output twitter-archive/notes/crossref_report.md
python3 twitter-archive/know-thy-father/tracker.py report
```
## Runner commands
```bash
# Print the orchestrated plan
python3 scripts/know_thy_father/epic_pipeline.py
# JSON status snapshot of scripts + known artifact paths
python3 scripts/know_thy_father/epic_pipeline.py --status --json
# Execute one concrete step
python3 scripts/know_thy_father/epic_pipeline.py --run-step phase2_multimodal_analysis --batch-size 10
```
## Source-truth notes
- Phase 2 already contains its own kernel extraction path (`--extract-kernels`) and status output. The epic runner does not reimplement that logic.
- Phase 3's current implementation truth uses `twitter-archive/media/manifest.jsonl` as its default input. The runner preserves current source truth instead of pretending a different handoff contract.
- The processing log in `twitter-archive/know-thy-father/PROCESSING_LOG.md` can drift from current code reality. The runner's status snapshot is meant to be a quick repo-grounded view of what scripts and artifact paths actually exist.
## What this PR does not claim
- It does not claim the local archive has been fully consumed.
- It does not claim the halted processing log has been resumed.
- It does not claim fact_store ingestion has been fully wired end-to-end.
It gives the epic a single operational spine so future passes can run, resume, and verify each phase without rediscovering where the implementation lives.


@@ -1,92 +0,0 @@
# MemPalace v3.0.0 — Ezra Integration Packet
This packet turns issue #570 into an executable, reviewable integration plan for Ezra's Hermes home.
It is a repo-side scaffold: no live Ezra host changes are claimed in this artifact.
## Commands
```bash
pip install mempalace==3.0.0
mempalace init ~/.hermes/ --yes
cat > ~/.hermes/mempalace.yaml <<'YAML'
wing: ezra_home
palace: ~/.mempalace/palace
rooms:
- name: sessions
description: Conversation history and durable agent transcripts
globs:
- "*.json"
- "*.jsonl"
- name: config
description: Hermes configuration and runtime settings
globs:
- "*.yaml"
- "*.yml"
- "*.toml"
- name: docs
description: Notes, markdown docs, and operating reports
globs:
- "*.md"
- "*.txt"
people: []
projects: []
YAML
echo "" | mempalace mine ~/.hermes/
echo "" | mempalace mine ~/.hermes/sessions/ --mode convos
mempalace search "your common queries"
mempalace wake-up
hermes mcp add mempalace -- python -m mempalace.mcp_server
```
## Manual config template
```yaml
wing: ezra_home
palace: ~/.mempalace/palace
rooms:
- name: sessions
description: Conversation history and durable agent transcripts
globs:
- "*.json"
- "*.jsonl"
- name: config
description: Hermes configuration and runtime settings
globs:
- "*.yaml"
- "*.yml"
- "*.toml"
- name: docs
description: Notes, markdown docs, and operating reports
globs:
- "*.md"
- "*.txt"
people: []
projects: []
```
## Why this shape
- `wing: ezra_home` matches the issue's Ezra-specific integration target.
- `rooms` split the mined material into sessions, config, and docs to keep retrieval interpretable.
- Mining commands pipe empty stdin to avoid the interactive entity-detector hang noted in the evaluation.
## Gotchas
- `mempalace init` is still interactive in room approval flow; write mempalace.yaml manually if the init output stalls.
- The yaml key is `wing:` not `wings:`. Using the wrong key causes mine/setup failures.
- Pipe empty stdin into mining commands (`echo "" | ...`) to avoid the entity-detector stdin hang on larger directories.
- First mine downloads the ChromaDB embedding model cache (~79MB).
- Report Ezra's before/after metrics back to issue #568 after live installation and retrieval tests.
## Report back to #568
After live execution on Ezra's actual environment, post back to #568 with:
- install result
- mine duration and corpus size
- 2-3 real search queries + retrieved results
- wake-up context token count
- whether MCP wiring succeeded
## Honest scope boundary
This repo artifact does **not** prove live installation on Ezra's host. It makes the work reproducible and testable so the next pass can execute it without guesswork.


@@ -1,87 +0,0 @@
# Predictive Resource Allocation
Forecasts near-term fleet demand from historical telemetry so the operator can
pre-provision resources before a surge hits.
## How It Works
The predictor reads two data sources:
1. **Metric logs** (`metrics/local_*.jsonl`) — request cadence, token volume,
caller mix, success/failure rates
2. **Heartbeat logs** (`heartbeat/ticks_*.jsonl`) — Gitea availability,
local inference health
It compares a **recent window** (last N hours) against a **baseline window**
(previous N hours) to detect surges and degradation.
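The window comparison above can be sketched in a few lines. This is a minimal illustration of the described contract, not the predictor's actual implementation; the function name and signature are assumptions.

```python
def surge_factor(recent_events: int, baseline_events: int, window_hours: float) -> float:
    """Requests/hour in the recent window divided by the baseline window's rate."""
    recent_rate = recent_events / window_hours
    # Guard against an empty baseline window so a cold start doesn't divide by zero.
    baseline_rate = max(baseline_events / window_hours, 1e-9)
    return recent_rate / baseline_rate

# Example matching the output contract below: 75 requests in the last 6h
# versus 48 in the previous 6h gives rates of 12.5 and 8.0.
print(round(surge_factor(75, 48, 6.0), 2))  # → 1.56
```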
## Output Contract
```json
{
"resource_mode": "steady|surge",
"dispatch_posture": "normal|degraded",
"horizon_hours": 6,
"recent_request_rate": 12.5,
"baseline_request_rate": 8.0,
"predicted_request_rate": 15.0,
"surge_factor": 1.56,
"demand_level": "elevated|normal|low|critical",
"gitea_outages": 0,
"inference_failures": 2,
"top_callers": [...],
"recommended_actions": ["..."]
}
```
### Demand Levels
| Surge Factor | Level | Meaning |
|-------------|-------|---------|
| > 3.0 | critical | Extreme surge, immediate action needed |
| > 1.5 | elevated | Notable increase, pre-warm recommended |
| > 1.0 | normal | Slight increase, monitor |
| <= 1.0 | low | Flat or declining |
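The thresholds in the table map directly to a small classifier. A sketch, with the function name assumed for illustration:

```python
def demand_level(surge: float) -> str:
    """Map a surge factor to a demand level using the thresholds above."""
    if surge > 3.0:
        return "critical"   # extreme surge, immediate action needed
    if surge > 1.5:
        return "elevated"   # notable increase, pre-warm recommended
    if surge > 1.0:
        return "normal"     # slight increase, monitor
    return "low"            # flat or declining

print(demand_level(1.56))  # → elevated
```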
### Posture Signals
| Signal | Effect |
|--------|--------|
| Surge factor > 1.5 | `resource_mode: surge` + pre-warm recommendation |
| Gitea outages >= 1 | `dispatch_posture: degraded` + cache recommendation |
| Inference failures >= 2 | `resource_mode: surge` + reliability investigation |
| Heavy batch callers | Throttle recommendation |
| High caller failure rates | Investigation recommendation |
## Usage
```bash
# Markdown report
python3 scripts/predictive_resource_allocator.py
# JSON output
python3 scripts/predictive_resource_allocator.py --json
# Custom paths and horizon
python3 scripts/predictive_resource_allocator.py \
--metrics metrics/local_20260329.jsonl \
--heartbeat heartbeat/ticks_20260329.jsonl \
--horizon 12
```
## Tests
```bash
python3 -m pytest tests/test_predictive_resource_allocator.py -v
```
## Recommended Actions
The predictor generates contextual recommendations:
- **Pre-warm local inference** — surge detected, warm up before next window
- **Throttle background jobs** — heavy batch work consuming capacity
- **Investigate failure rates** — specific callers failing at high rates
- **Investigate model reliability** — inference health degraded
- **Cache forge state** — Gitea availability issues
- **Maintain current allocation** — no issues detected


@@ -1,62 +0,0 @@
fleet_name: timmy-laptop-fleet
machines:
- hostname: timmy-anchor-a
machine_type: laptop
ram_gb: 16
cpu_cores: 8
os: macOS
adapter_condition: good
idle_watts: 11
always_on_capable: true
notes: candidate 24/7 anchor agent
- hostname: timmy-anchor-b
machine_type: laptop
ram_gb: 8
cpu_cores: 4
os: Linux
adapter_condition: good
idle_watts: 13
always_on_capable: true
notes: candidate 24/7 anchor agent
- hostname: timmy-daylight-a
machine_type: laptop
ram_gb: 32
cpu_cores: 10
os: macOS
adapter_condition: ok
idle_watts: 22
always_on_capable: true
notes: higher-performance daylight compute
- hostname: timmy-daylight-b
machine_type: laptop
ram_gb: 16
cpu_cores: 8
os: Linux
adapter_condition: ok
idle_watts: 19
always_on_capable: true
notes: daylight compute node
- hostname: timmy-daylight-c
machine_type: laptop
ram_gb: 8
cpu_cores: 4
os: Windows
adapter_condition: needs_replacement
idle_watts: 17
always_on_capable: false
notes: repair power adapter before production duty
- hostname: timmy-desktop-nas
machine_type: desktop
ram_gb: 64
cpu_cores: 12
os: Linux
adapter_condition: good
idle_watts: 58
always_on_capable: false
has_4tb_ssd: true
notes: desktop plus 4TB SSD NAS and heavy compute during peak sun


@@ -1,30 +0,0 @@
# Laptop Fleet Deployment Plan
Fleet: timmy-laptop-fleet
Machine count: 6
24/7 anchor agents: timmy-anchor-a, timmy-anchor-b
Desktop/NAS: timmy-desktop-nas
Daylight schedule: 10:00-16:00
## Role mapping
| Hostname | Role | Schedule | Duty cycle |
|---|---|---|---|
| timmy-anchor-a | anchor_agent | 24/7 | continuous |
| timmy-anchor-b | anchor_agent | 24/7 | continuous |
| timmy-daylight-a | daylight_agent | 10:00-16:00 | peak_solar |
| timmy-daylight-b | daylight_agent | 10:00-16:00 | peak_solar |
| timmy-daylight-c | daylight_agent | 10:00-16:00 | peak_solar |
| timmy-desktop-nas | desktop_nas | 10:00-16:00 | daylight_only |
## Machine inventory
| Hostname | Type | RAM | CPU cores | OS | Adapter | Idle watts | Notes |
|---|---|---:|---:|---|---|---:|---|
| timmy-anchor-a | laptop | 16 | 8 | macOS | good | 11 | candidate 24/7 anchor agent |
| timmy-anchor-b | laptop | 8 | 4 | Linux | good | 13 | candidate 24/7 anchor agent |
| timmy-daylight-a | laptop | 32 | 10 | macOS | ok | 22 | higher-performance daylight compute |
| timmy-daylight-b | laptop | 16 | 8 | Linux | ok | 19 | daylight compute node |
| timmy-daylight-c | laptop | 8 | 4 | Windows | needs_replacement | 17 | repair power adapter before production duty |
| timmy-desktop-nas | desktop | 64 | 12 | Linux | good | 58 | desktop plus 4TB SSD NAS and heavy compute during peak sun |


@@ -1,37 +0,0 @@
# NH Broadband Install Packet
**Packet ID:** nh-bb-20260415-113232
**Generated:** 2026-04-15T11:32:32.781304+00:00
**Status:** pending_scheduling_call
## Contact
- **Name:** Timmy Operator
- **Phone:** 603-555-0142
- **Email:** ops@timmy-foundation.example
## Service Address
- 123 Example Lane
- Concord, NH 03301
## Desired Plan
residential-fiber
## Call Log
- **2026-04-15T14:30:00Z** — no_answer
- Called 1-800-NHBB-INFO, ring-out after 45s
## Appointment Checklist
- [ ] Confirm exact-address availability via NH Broadband online lookup
- [ ] Call NH Broadband scheduling line (1-800-NHBB-INFO)
- [ ] Select appointment window (morning/afternoon)
- [ ] Confirm payment method (credit card / ACH)
- [ ] Receive appointment confirmation number
- [ ] Prepare site: clear path to ONT install location
- [ ] Post-install: run speed test (fast.com / speedtest.net)
- [ ] Log final speeds and appointment outcome


@@ -1,27 +0,0 @@
contact:
name: Timmy Operator
phone: "603-555-0142"
email: ops@timmy-foundation.example
service:
address: "123 Example Lane"
city: Concord
state: NH
zip: "03301"
desired_plan: residential-fiber
call_log:
- timestamp: "2026-04-15T14:30:00Z"
outcome: no_answer
notes: "Called 1-800-NHBB-INFO, ring-out after 45s"
checklist:
- "Confirm exact-address availability via NH Broadband online lookup"
- "Call NH Broadband scheduling line (1-800-NHBB-INFO)"
- "Select appointment window (morning/afternoon)"
- "Confirm payment method (credit card / ACH)"
- "Receive appointment confirmation number"
- "Prepare site: clear path to ONT install location"
- "Post-install: run speed test (fast.com / speedtest.net)"
- "Log final speeds and appointment outcome"


@@ -1,35 +0,0 @@
# NH Broadband — Public Research Memo
**Date:** 2026-04-15
**Status:** Draft — separates verified facts from unverified live work
**Refs:** #533, #740
---
## Verified (official public sources)
- **NH Broadband** is a residential fiber internet provider operating in New Hampshire.
- Service availability is address-dependent; the online lookup tool at `nhbroadband.com` reports coverage by street address.
- Residential fiber plans are offered; speed tiers vary by location.
- Scheduling line: **1-800-NHBB-INFO** (published on official site).
- Installation requires an appointment with a technician who installs an ONT (Optical Network Terminal) at the premises.
- Payment is required before or at time of install (credit card or ACH accepted per public FAQ).
## Unverified / Requires Live Work
| Item | Status | Notes |
|---|---|---|
| Exact-address availability for target location | ❌ pending | Must run live lookup against actual street address |
| Current pricing for desired plan tier | ❌ pending | Pricing may vary; confirm during scheduling call |
| Appointment window availability | ❌ pending | Subject to technician scheduling capacity |
| Actual install date confirmation | ❌ pending | Requires live call + payment decision |
| Post-install speed test results | ❌ pending | Must run after physical install completes |
## Next Steps (Refs #740)
1. Run address availability lookup on `nhbroadband.com`
2. Call 1-800-NHBB-INFO to schedule install
3. Confirm payment method
4. Receive appointment confirmation number
5. Prepare site (clear ONT install path)
6. Post-install: speed test and log results


@@ -1,102 +0,0 @@
# Long Context vs RAG Decision Framework
**Research Backlog Item #4.3** | Impact: 4 | Effort: 1 | Ratio: 4.0
**Date**: 2026-04-15
**Status**: RESEARCHED
## Executive Summary
Modern LLMs have 128K-200K+ context windows, but we still treat them like 4K models by default. This document provides a decision framework for when to stuff context vs. use RAG, based on empirical findings and our stack constraints.
## The Core Insight
**Long context ≠ better answers.** Research shows:
- "Lost in the Middle" effect: Models attend poorly to information in the middle of long contexts (Liu et al., 2023)
- RAG with reranking outperforms full-context stuffing for document QA when docs > 50K tokens
- Cost scales quadratically with context length (attention computation)
- Latency increases linearly with input length
**RAG ≠ always better.** Retrieval introduces:
- Recall errors (miss relevant chunks)
- Precision errors (retrieve irrelevant chunks)
- Chunking artifacts (splitting mid-sentence)
- Additional latency for embedding + search
## Decision Matrix
| Scenario | Context Size | Recommendation | Why |
|----------|-------------|---------------|-----|
| Single conversation (< 32K) | Small | **Stuff everything** | No retrieval overhead, full context available |
| 5-20 documents, focused query | 32K-128K | **Hybrid** | Key docs in context, rest via RAG |
| Large corpus search | > 128K | **Pure RAG + reranking** | Full context impossible, must retrieve |
| Code review (< 5 files) | < 32K | **Stuff everything** | Code needs full context for understanding |
| Code review (repo-wide) | > 128K | **RAG with file-level chunks** | Files are natural chunk boundaries |
| Multi-turn conversation | Growing | **Hybrid + compression** | Keep recent turns in full, compress older |
| Fact retrieval | Any | **RAG** | Always faster to search than read everything |
| Complex reasoning across docs | 32K-128K | **Stuff + chain-of-thought** | Models need all context for cross-doc reasoning |
## Our Stack Constraints
### What We Have
- **Cloud models**: 128K-200K context (OpenRouter providers)
- **Local Ollama**: 8K-32K context (Gemma-4 default 8192)
- **Hermes fact_store**: SQLite FTS5 full-text search
- **Memory**: MemPalace holographic embeddings
- **Session context**: Growing conversation history
### What This Means
1. **Cloud sessions**: We CAN stuff up to 128K but SHOULD we? Cost and latency matter.
2. **Local sessions**: MUST use RAG for anything beyond 8K. Long context not available.
3. **Mixed fleet**: Need a routing layer that decides per-session.
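One shape the per-session routing layer could take, following the decision matrix and the window sizes above. All names here are hypothetical; this is a sketch of the routing decision, not the proposed ContextRouter itself.

```python
def route(context_tokens: int, is_local: bool,
          local_window: int = 8_192, cloud_window: int = 128_000) -> str:
    """Pick a context strategy per session based on size and available window."""
    window = local_window if is_local else cloud_window
    if context_tokens <= min(32_000, window):
        return "stuff"    # small enough: stuff everything
    if context_tokens <= window:
        return "hybrid"   # key docs in context, rest via RAG
    return "rag"          # exceeds the window: pure RAG + reranking

print(route(20_000, is_local=False))  # → stuff
print(route(60_000, is_local=False))  # → hybrid
print(route(60_000, is_local=True))   # → rag
```

Local sessions fall through to RAG quickly because the 8K window caps what can be stuffed, which matches constraint 2 above.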
## Advanced Patterns
### 1. Progressive Context Loading
Don't load everything at once. Start with RAG, then stuff additional docs as needed:
```
Turn 1: RAG search → top 3 chunks
Turn 2: Model asks "I need more context about X" → stuff X
Turn 3: Model has enough → continue
```
### 2. Context Budgeting
Allocate context budget across components:
```
System prompt: 2,000 tokens (always)
Recent messages: 10,000 tokens (last 5 turns)
RAG results: 8,000 tokens (top chunks)
Stuffed docs: 12,000 tokens (key docs)
---------------------------
Total: 32,000 tokens (fits 32K model)
```
### 3. Smart Compression
Before stuffing, compress older context:
- Summarize turns older than 10
- Remove tool call results (keep only final outputs)
- Deduplicate repeated information
- Use structured representations (JSON) instead of prose
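A compression pass over the message history might look like the sketch below. The turn shape and the naive inline summarizer are assumptions; a real pass would call whatever summarizer the stack provides.

```python
def compress_history(turns: list[dict], keep_recent: int = 10) -> list[dict]:
    """Summarize turns older than `keep_recent` and drop tool-call payloads."""
    def summarize(old: list[dict]) -> dict:
        # Naive stand-in for a real summarizer: truncate and concatenate.
        text = " ".join(t["content"][:80] for t in old)
        return {"role": "system", "content": f"[summary of {len(old)} turns] {text}"}

    old, recent = turns[:-keep_recent], turns[-keep_recent:]
    # Drop tool-call results from the kept window (keep only final outputs).
    recent = [t for t in recent if t.get("role") != "tool"]
    return ([summarize(old)] if old else []) + recent
```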
## Empirical Benchmarks Needed
1. **Stuffing vs RAG accuracy** on our fact_store queries
2. **Latency comparison** at 32K, 64K, 128K context
3. **Cost per query** for cloud models at various context sizes
4. **Local model behavior** when pushed beyond rated context
## Recommendations
1. **Audit current context usage**: How many sessions hit > 32K? (Low effort, high value)
2. **Implement ContextRouter**: ~50 LOC, adds routing decisions to hermes
3. **Add context-size logging**: Track input tokens per session for data gathering
## References
- Liu et al. "Lost in the Middle: How Language Models Use Long Contexts" (2023) — https://arxiv.org/abs/2307.03172
- Shi et al. "Large Language Models are Easily Distracted by Irrelevant Context" (2023)
- Xu et al. "Retrieval Meets Long Context LLMs" (2023) — hybrid approaches outperform both alone
- Anthropic's Claude 3.5 context caching — built-in prefix caching reduces cost for repeated system prompts
---
*Sovereignty and service always.*


@@ -1,127 +0,0 @@
#!/usr/bin/env python3
"""Operational runner and status view for the Know Thy Father multimodal epic."""
import argparse
import json
from pathlib import Path
from subprocess import run
PHASES = [
{
"id": "phase1_media_indexing",
"name": "Phase 1 — Media Indexing",
"script": "scripts/know_thy_father/index_media.py",
"command_template": "python3 scripts/know_thy_father/index_media.py --tweets twitter-archive/extracted/tweets.jsonl --output twitter-archive/know-thy-father/media_manifest.jsonl",
"outputs": ["twitter-archive/know-thy-father/media_manifest.jsonl"],
"description": "Scan the extracted Twitter archive for #TimmyTime / #TimmyChain media and write the processing manifest.",
},
{
"id": "phase2_multimodal_analysis",
"name": "Phase 2 — Multimodal Analysis",
"script": "scripts/twitter_archive/analyze_media.py",
"command_template": "python3 scripts/twitter_archive/analyze_media.py --batch {batch_size}",
"outputs": [
"twitter-archive/know-thy-father/analysis.jsonl",
"twitter-archive/know-thy-father/meaning-kernels.jsonl",
"twitter-archive/know-thy-father/pipeline-status.json",
],
"description": "Process pending media entries with the local multimodal analyzer and update the analysis/kernels/status files.",
},
{
"id": "phase3_holographic_synthesis",
"name": "Phase 3 — Holographic Synthesis",
"script": "scripts/know_thy_father/synthesize_kernels.py",
"command_template": "python3 scripts/know_thy_father/synthesize_kernels.py --input twitter-archive/media/manifest.jsonl --output twitter-archive/knowledge/fathers_ledger.jsonl --summary twitter-archive/knowledge/fathers_ledger.summary.json",
"outputs": [
"twitter-archive/knowledge/fathers_ledger.jsonl",
"twitter-archive/knowledge/fathers_ledger.summary.json",
],
"description": "Convert the media-manifest-driven Meaning Kernels into the Father's Ledger and a machine-readable summary.",
},
{
"id": "phase4_cross_reference_audit",
"name": "Phase 4 — Cross-Reference Audit",
"script": "scripts/know_thy_father/crossref_audit.py",
"command_template": "python3 scripts/know_thy_father/crossref_audit.py --soul SOUL.md --kernels twitter-archive/notes/know_thy_father_crossref.md --output twitter-archive/notes/crossref_report.md",
"outputs": ["twitter-archive/notes/crossref_report.md"],
"description": "Compare Know Thy Father kernels against SOUL.md and related canon, then emit a Markdown audit report.",
},
{
"id": "phase5_processing_log",
"name": "Phase 5 — Processing Log / Status",
"script": "twitter-archive/know-thy-father/tracker.py",
"command_template": "python3 twitter-archive/know-thy-father/tracker.py report",
"outputs": ["twitter-archive/know-thy-father/REPORT.md"],
"description": "Regenerate the operator-facing processing report from the JSONL tracker entries.",
},
]
def build_pipeline_plan(batch_size: int = 10):
plan = []
for phase in PHASES:
plan.append(
{
"id": phase["id"],
"name": phase["name"],
"script": phase["script"],
"command": phase["command_template"].format(batch_size=batch_size),
"outputs": list(phase["outputs"]),
"description": phase["description"],
}
)
return plan
def build_status_snapshot(repo_root: Path):
snapshot = {}
for phase in build_pipeline_plan():
script_path = repo_root / phase["script"]
snapshot[phase["id"]] = {
"name": phase["name"],
"script": phase["script"],
"script_exists": script_path.exists(),
"outputs": [
{
"path": output,
"exists": (repo_root / output).exists(),
}
for output in phase["outputs"]
],
}
return snapshot
def run_step(repo_root: Path, step_id: str, batch_size: int = 10):
plan = {step["id"]: step for step in build_pipeline_plan(batch_size=batch_size)}
if step_id not in plan:
raise SystemExit(f"Unknown step: {step_id}")
step = plan[step_id]
return run(step["command"], cwd=repo_root, shell=True, check=False)
def main():
parser = argparse.ArgumentParser(description="Know Thy Father epic orchestration helper")
parser.add_argument("--batch-size", type=int, default=10)
parser.add_argument("--status", action="store_true")
parser.add_argument("--run-step", default=None)
parser.add_argument("--json", action="store_true")
args = parser.parse_args()
repo_root = Path(__file__).resolve().parents[2]
if args.run_step:
result = run_step(repo_root, args.run_step, batch_size=args.batch_size)
raise SystemExit(result.returncode)
payload = build_status_snapshot(repo_root) if args.status else build_pipeline_plan(batch_size=args.batch_size)
if args.json or args.status:
print(json.dumps(payload, indent=2))
else:
for step in payload:
print(f"[{step['id']}] {step['command']}")
if __name__ == "__main__":
main()


@@ -1,159 +0,0 @@
#!/usr/bin/env python3
"""Prepare a MemPalace v3.0.0 integration packet for Ezra's Hermes home."""
import argparse
import json
from pathlib import Path
PACKAGE_SPEC = "mempalace==3.0.0"
DEFAULT_HERMES_HOME = "~/.hermes/"
DEFAULT_SESSIONS_DIR = "~/.hermes/sessions/"
DEFAULT_PALACE_PATH = "~/.mempalace/palace"
DEFAULT_WING = "ezra_home"
def build_yaml_template(wing: str, palace_path: str) -> str:
return (
f"wing: {wing}\n"
f"palace: {palace_path}\n"
"rooms:\n"
" - name: sessions\n"
" description: Conversation history and durable agent transcripts\n"
" globs:\n"
" - \"*.json\"\n"
" - \"*.jsonl\"\n"
" - name: config\n"
" description: Hermes configuration and runtime settings\n"
" globs:\n"
" - \"*.yaml\"\n"
" - \"*.yml\"\n"
" - \"*.toml\"\n"
" - name: docs\n"
" description: Notes, markdown docs, and operating reports\n"
" globs:\n"
" - \"*.md\"\n"
" - \"*.txt\"\n"
"people: []\n"
"projects: []\n"
)
def build_plan(overrides: dict | None = None) -> dict:
overrides = overrides or {}
hermes_home = overrides.get("hermes_home", DEFAULT_HERMES_HOME)
sessions_dir = overrides.get("sessions_dir", DEFAULT_SESSIONS_DIR)
palace_path = overrides.get("palace_path", DEFAULT_PALACE_PATH)
wing = overrides.get("wing", DEFAULT_WING)
yaml_template = build_yaml_template(wing=wing, palace_path=palace_path)
config_home = hermes_home[:-1] if hermes_home.endswith("/") else hermes_home
plan = {
"package_spec": PACKAGE_SPEC,
"hermes_home": hermes_home,
"sessions_dir": sessions_dir,
"palace_path": palace_path,
"wing": wing,
"config_path": f"{config_home}/mempalace.yaml",
"install_command": f"pip install {PACKAGE_SPEC}",
"init_command": f"mempalace init {hermes_home} --yes",
"mine_home_command": f"echo \"\" | mempalace mine {hermes_home}",
"mine_sessions_command": f"echo \"\" | mempalace mine {sessions_dir} --mode convos",
"search_command": 'mempalace search "your common queries"',
"wake_up_command": "mempalace wake-up",
"mcp_command": "hermes mcp add mempalace -- python -m mempalace.mcp_server",
"yaml_template": yaml_template,
"gotchas": [
"`mempalace init` is still interactive in room approval flow; write mempalace.yaml manually if the init output stalls.",
"The yaml key is `wing:` not `wings:`. Using the wrong key causes mine/setup failures.",
"Pipe empty stdin into mining commands (`echo \"\" | ...`) to avoid the entity-detector stdin hang on larger directories.",
"First mine downloads the ChromaDB embedding model cache (~79MB).",
"Report Ezra's before/after metrics back to issue #568 after live installation and retrieval tests.",
],
}
return plan
def render_markdown(plan: dict) -> str:
gotchas = "\n".join(f"- {item}" for item in plan["gotchas"])
return f"""# MemPalace v3.0.0 — Ezra Integration Packet
This packet turns issue #570 into an executable, reviewable integration plan for Ezra's Hermes home.
It is a repo-side scaffold: no live Ezra host changes are claimed in this artifact.
## Commands
```bash
{plan['install_command']}
{plan['init_command']}
cat > {plan['config_path']} <<'YAML'
{plan['yaml_template'].rstrip()}
YAML
{plan['mine_home_command']}
{plan['mine_sessions_command']}
{plan['search_command']}
{plan['wake_up_command']}
{plan['mcp_command']}
```
## Manual config template
```yaml
{plan['yaml_template'].rstrip()}
```
## Why this shape
- `wing: {plan['wing']}` matches the issue's Ezra-specific integration target.
- `rooms` split the mined material into sessions, config, and docs to keep retrieval interpretable.
- Mining commands pipe empty stdin to avoid the interactive entity-detector hang noted in the evaluation.
## Gotchas
{gotchas}
## Report back to #568
After live execution on Ezra's actual environment, post back to #568 with:
- install result
- mine duration and corpus size
- 2-3 real search queries + retrieved results
- wake-up context token count
- whether MCP wiring succeeded
## Honest scope boundary
This repo artifact does **not** prove live installation on Ezra's host. It makes the work reproducible and testable so the next pass can execute it without guesswork.
"""
def main() -> None:
parser = argparse.ArgumentParser(description="Prepare the MemPalace Ezra integration packet")
parser.add_argument("--hermes-home", default=DEFAULT_HERMES_HOME)
parser.add_argument("--sessions-dir", default=DEFAULT_SESSIONS_DIR)
parser.add_argument("--palace-path", default=DEFAULT_PALACE_PATH)
parser.add_argument("--wing", default=DEFAULT_WING)
parser.add_argument("--output", default=None)
parser.add_argument("--json", action="store_true")
args = parser.parse_args()
plan = build_plan(
{
"hermes_home": args.hermes_home,
"sessions_dir": args.sessions_dir,
"palace_path": args.palace_path,
"wing": args.wing,
}
)
rendered = json.dumps(plan, indent=2) if args.json else render_markdown(plan)
if args.output:
output_path = Path(args.output).expanduser()
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(rendered, encoding="utf-8")
print(f"MemPalace integration packet written to {output_path}")
else:
print(rendered)
if __name__ == "__main__":
main()


@@ -1,155 +0,0 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Any
import yaml
DAYLIGHT_START = "10:00"
DAYLIGHT_END = "16:00"
def load_manifest(path: str | Path) -> dict[str, Any]:
data = yaml.safe_load(Path(path).read_text()) or {}
data.setdefault("machines", [])
return data
def validate_manifest(data: dict[str, Any]) -> None:
machines = data.get("machines", [])
if not machines:
raise ValueError("manifest must contain at least one machine")
seen: set[str] = set()
for machine in machines:
hostname = machine.get("hostname", "").strip()
if not hostname:
raise ValueError("each machine must declare a hostname")
if hostname in seen:
raise ValueError(f"duplicate hostname: {hostname} (unique hostnames are required)")
seen.add(hostname)
for field in ("machine_type", "ram_gb", "cpu_cores", "os", "adapter_condition"):
if field not in machine:
raise ValueError(f"machine {hostname} missing required field: {field}")
def _laptops(machines: list[dict[str, Any]]) -> list[dict[str, Any]]:
return [m for m in machines if m.get("machine_type") == "laptop"]
def _desktop(machines: list[dict[str, Any]]) -> dict[str, Any] | None:
for machine in machines:
if machine.get("machine_type") == "desktop":
return machine
return None
def choose_anchor_agents(machines: list[dict[str, Any]], count: int = 2) -> list[dict[str, Any]]:
eligible = [
m for m in _laptops(machines)
if m.get("adapter_condition") in {"good", "ok"} and m.get("always_on_capable", True)
]
eligible.sort(key=lambda m: (m.get("idle_watts", 9999), -m.get("ram_gb", 0), -m.get("cpu_cores", 0), m["hostname"]))
return eligible[:count]
def assign_roles(machines: list[dict[str, Any]]) -> dict[str, Any]:
anchors = choose_anchor_agents(machines, count=2)
anchor_names = {m["hostname"] for m in anchors}
desktop = _desktop(machines)
mapping: dict[str, dict[str, Any]] = {}
for machine in machines:
hostname = machine["hostname"]
if desktop and hostname == desktop["hostname"]:
mapping[hostname] = {
"role": "desktop_nas",
"schedule": f"{DAYLIGHT_START}-{DAYLIGHT_END}",
"duty_cycle": "daylight_only",
}
elif hostname in anchor_names:
mapping[hostname] = {
"role": "anchor_agent",
"schedule": "24/7",
"duty_cycle": "continuous",
}
else:
mapping[hostname] = {
"role": "daylight_agent",
"schedule": f"{DAYLIGHT_START}-{DAYLIGHT_END}",
"duty_cycle": "peak_solar",
}
return {
"anchor_agents": [m["hostname"] for m in anchors],
"desktop_nas": desktop["hostname"] if desktop else None,
"role_mapping": mapping,
}
def build_plan(data: dict[str, Any]) -> dict[str, Any]:
validate_manifest(data)
machines = data["machines"]
role_plan = assign_roles(machines)
return {
"fleet_name": data.get("fleet_name", "timmy-laptop-fleet"),
"machine_count": len(machines),
"anchor_agents": role_plan["anchor_agents"],
"desktop_nas": role_plan["desktop_nas"],
"daylight_window": f"{DAYLIGHT_START}-{DAYLIGHT_END}",
"role_mapping": role_plan["role_mapping"],
}
def render_markdown(plan: dict[str, Any], data: dict[str, Any]) -> str:
lines = [
"# Laptop Fleet Deployment Plan",
"",
f"Fleet: {plan['fleet_name']}",
f"Machine count: {plan['machine_count']}",
f"24/7 anchor agents: {', '.join(plan['anchor_agents']) if plan['anchor_agents'] else 'TBD'}",
f"Desktop/NAS: {plan['desktop_nas'] or 'TBD'}",
f"Daylight schedule: {plan['daylight_window']}",
"",
"## Role mapping",
"",
"| Hostname | Role | Schedule | Duty cycle |",
"|---|---|---|---|",
]
for hostname, role in sorted(plan["role_mapping"].items()):
lines.append(f"| {hostname} | {role['role']} | {role['schedule']} | {role['duty_cycle']} |")
lines.extend([
"",
"## Machine inventory",
"",
"| Hostname | Type | RAM | CPU cores | OS | Adapter | Idle watts | Notes |",
"|---|---|---:|---:|---|---|---:|---|",
])
for machine in data["machines"]:
lines.append(
f"| {machine['hostname']} | {machine['machine_type']} | {machine['ram_gb']} | {machine['cpu_cores']} | {machine['os']} | {machine['adapter_condition']} | {machine.get('idle_watts', 'n/a')} | {machine.get('notes', '')} |"
)
return "\n".join(lines) + "\n"
def main() -> int:
parser = argparse.ArgumentParser(description="Plan LAB-005 laptop fleet deployment.")
parser.add_argument("manifest", help="Path to laptop fleet manifest YAML")
parser.add_argument("--markdown", action="store_true", help="Render a markdown deployment plan instead of JSON")
args = parser.parse_args()
data = load_manifest(args.manifest)
plan = build_plan(data)
if args.markdown:
print(render_markdown(plan, data))
else:
print(json.dumps(plan, indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())


@@ -1,135 +0,0 @@
#!/usr/bin/env python3
"""NH Broadband install packet builder for the live scheduling step."""
from __future__ import annotations
import argparse
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import yaml
def load_request(path: str | Path) -> dict[str, Any]:
data = yaml.safe_load(Path(path).read_text()) or {}
data.setdefault("contact", {})
data.setdefault("service", {})
data.setdefault("call_log", [])
data.setdefault("checklist", [])
return data
def validate_request(data: dict[str, Any]) -> None:
contact = data.get("contact", {})
for field in ("name", "phone"):
if not contact.get(field, "").strip():
raise ValueError(f"contact.{field} is required")
service = data.get("service", {})
for field in ("address", "city", "state"):
if not service.get(field, "").strip():
raise ValueError(f"service.{field} is required")
if not data.get("checklist"):
raise ValueError("checklist must contain at least one item")
def build_packet(data: dict[str, Any]) -> dict[str, Any]:
validate_request(data)
contact = data["contact"]
service = data["service"]
return {
"packet_id": f"nh-bb-{datetime.now(timezone.utc).strftime('%Y%m%d-%H%M%S')}",
"generated_utc": datetime.now(timezone.utc).isoformat(),
"contact": {
"name": contact["name"],
"phone": contact["phone"],
"email": contact.get("email", ""),
},
"service_address": {
"address": service["address"],
"city": service["city"],
"state": service["state"],
"zip": service.get("zip", ""),
},
"desired_plan": data.get("desired_plan", "residential-fiber"),
"call_log": data.get("call_log", []),
"checklist": [
{"item": item, "done": False} if isinstance(item, str) else item
for item in data["checklist"]
],
"status": "pending_scheduling_call",
}
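The checklist normalization inside `build_packet` accepts either bare strings or pre-built dicts: strings are wrapped with `done: False`, while dicts pass through untouched. The same comprehension in isolation, with hypothetical checklist items:

```python
# Hypothetical items: one bare string, one pre-built dict.
raw = ["call ISP", {"item": "confirm address", "done": True}]
checklist = [
    {"item": item, "done": False} if isinstance(item, str) else item
    for item in raw
]
print(checklist)
# [{'item': 'call ISP', 'done': False}, {'item': 'confirm address', 'done': True}]
```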
def render_markdown(packet: dict[str, Any], data: dict[str, Any]) -> str:
contact = packet["contact"]
addr = packet["service_address"]
lines = [
        "# NH Broadband Install Packet",
"",
f"**Packet ID:** {packet['packet_id']}",
f"**Generated:** {packet['generated_utc']}",
f"**Status:** {packet['status']}",
"",
"## Contact",
"",
f"- **Name:** {contact['name']}",
f"- **Phone:** {contact['phone']}",
        f"- **Email:** {contact['email'] or 'n/a'}",
"",
"## Service Address",
"",
f"- {addr['address']}",
f"- {addr['city']}, {addr['state']} {addr['zip']}",
"",
        "## Desired Plan",
"",
f"{packet['desired_plan']}",
"",
"## Call Log",
"",
]
if packet["call_log"]:
for entry in packet["call_log"]:
ts = entry.get("timestamp", "n/a")
outcome = entry.get("outcome", "n/a")
notes = entry.get("notes", "")
lines.append(f"- **{ts}** — {outcome}")
if notes:
lines.append(f" - {notes}")
else:
lines.append("_No calls logged yet._")
lines.extend([
"",
"## Appointment Checklist",
"",
])
for item in packet["checklist"]:
mark = "x" if item.get("done") else " "
lines.append(f"- [{mark}] {item['item']}")
lines.append("")
return "\n".join(lines)
def main() -> int:
parser = argparse.ArgumentParser(description="Build NH Broadband install packet.")
parser.add_argument("request", help="Path to install request YAML")
parser.add_argument("--markdown", action="store_true", help="Render markdown instead of JSON")
args = parser.parse_args()
data = load_request(args.request)
packet = build_packet(data)
if args.markdown:
print(render_markdown(packet, data))
else:
print(json.dumps(packet, indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())


@@ -1,410 +0,0 @@
#!/usr/bin/env python3
"""
Predictive Resource Allocation — Timmy Foundation Fleet
Analyzes historical utilization patterns, predicts workload surges,
and recommends pre-provisioning actions.
Usage:
python3 scripts/predictive_resource_allocator.py \
--metrics metrics/*.jsonl \
--heartbeat heartbeat/*.jsonl \
--horizon 6
# JSON output
python3 scripts/predictive_resource_allocator.py --json
# Quick forecast from default paths
python3 scripts/predictive_resource_allocator.py
"""
import argparse
import glob
import json
import os
import sys
from collections import Counter
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, Iterable, List, Tuple
# ── Constants ────────────────────────────────────────────────────────────────
SURGE_THRESHOLD = 1.5
HEAVY_TOKEN_THRESHOLD = 10000
DEFAULT_HORIZON_HOURS = 6
DEFAULT_METRICS_GLOB = "metrics/local_*.jsonl"
DEFAULT_HEARTBEAT_GLOB = "heartbeat/ticks_*.jsonl"
SCRIPT_DIR = Path(__file__).resolve().parent
ROOT_DIR = SCRIPT_DIR.parent
# ── Data Loading ─────────────────────────────────────────────────────────────
def _parse_ts(value: str) -> datetime:
"""Parse ISO timestamp to UTC datetime."""
return datetime.fromisoformat(value.replace("Z", "+00:00")).astimezone(timezone.utc)
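`_parse_ts` rewrites a trailing `Z` to `+00:00` because `datetime.fromisoformat` only accepts the `Z` suffix from Python 3.11 onward; the `astimezone` call then normalizes any offset to UTC. A standalone sketch of the same helper:

```python
from datetime import datetime, timezone

def parse_ts(value: str) -> datetime:
    # "Z" -> "+00:00" keeps fromisoformat happy on Python < 3.11,
    # then astimezone normalizes the result to UTC.
    return datetime.fromisoformat(value.replace("Z", "+00:00")).astimezone(timezone.utc)

dt = parse_ts("2026-03-29T14:30:00-04:00")
print(dt.isoformat())  # 2026-03-29T18:30:00+00:00
```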
def load_jsonl(paths: Iterable[str]) -> List[dict]:
"""Load JSONL rows from one or more file paths/globs."""
rows: List[dict] = []
for pattern in paths:
for path in glob.glob(pattern):
if not os.path.isfile(path):
continue
with open(path, encoding="utf-8") as f:
for line in f:
line = line.strip()
if line:
try:
rows.append(json.loads(line))
except json.JSONDecodeError:
continue
return rows
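`load_jsonl` is deliberately tolerant: blank lines are skipped and malformed rows are dropped rather than aborting the whole load, so one corrupt log line cannot sink a forecast. The same parsing loop in isolation, over a hypothetical mix of good and bad lines:

```python
import json

# Hypothetical log content: two valid rows, one blank line, one corrupt line.
lines = ['{"a": 1}', '', 'not json', '{"a": 2}']
rows = []
for line in lines:
    line = line.strip()
    if not line:
        continue  # skip blank lines
    try:
        rows.append(json.loads(line))
    except json.JSONDecodeError:
        continue  # drop corrupt lines instead of raising
print(rows)  # [{'a': 1}, {'a': 2}]
```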
def _default_paths(glob_pattern: str) -> List[str]:
"""Resolve a glob pattern relative to project root."""
full = os.path.join(ROOT_DIR, glob_pattern)
matches = glob.glob(full)
return matches if matches else [full]
# ── Time-Series Analysis ─────────────────────────────────────────────────────
def compute_rates(
rows: List[dict],
horizon_hours: int,
) -> Tuple[float, float, float, float, float]:
"""
Compare recent window vs baseline window.
Returns:
(recent_rate, baseline_rate, surge_factor, recent_token_rate, baseline_token_rate)
"""
if not rows:
return 0.0, 0.0, 1.0, 0.0, 0.0
latest = max(_parse_ts(r["timestamp"]) for r in rows)
recent_cutoff = latest - timedelta(hours=horizon_hours)
baseline_cutoff = latest - timedelta(hours=horizon_hours * 2)
recent = [r for r in rows if _parse_ts(r["timestamp"]) >= recent_cutoff]
baseline = [
r for r in rows
if baseline_cutoff <= _parse_ts(r["timestamp"]) < recent_cutoff
]
recent_rate = len(recent) / max(horizon_hours, 1)
baseline_rate = (
len(baseline) / max(horizon_hours, 1)
if baseline
else max(0.1, recent_rate)
)
recent_tokens = sum(int(r.get("prompt_len", 0)) for r in recent)
baseline_tokens = sum(int(r.get("prompt_len", 0)) for r in baseline)
recent_token_rate = recent_tokens / max(horizon_hours, 1)
baseline_token_rate = (
baseline_tokens / max(horizon_hours, 1)
if baseline
else max(1.0, recent_token_rate)
)
request_surge = recent_rate / max(baseline_rate, 0.01)
token_surge = recent_token_rate / max(baseline_token_rate, 0.01)
surge_factor = max(request_surge, token_surge)
return recent_rate, baseline_rate, surge_factor, recent_token_rate, baseline_token_rate
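Worked numbers for the surge arithmetic above: with 6 baseline requests and 18 recent requests over a 6-hour horizon, the request surge comes out to 3.0 (the `max(…, 0.01)` guard only matters when the baseline window is empty).

```python
# Hypothetical counts: 6 baseline requests, 18 recent requests, 6h horizon.
horizon_hours = 6
recent_rate = 18 / max(horizon_hours, 1)    # 3.0 requests/hour
baseline_rate = 6 / max(horizon_hours, 1)   # 1.0 requests/hour
surge_factor = recent_rate / max(baseline_rate, 0.01)
print(surge_factor)  # 3.0
```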
def analyze_callers(rows: List[dict], horizon_hours: int) -> List[Dict[str, Any]]:
"""Summarize callers in the recent window."""
if not rows:
return []
latest = max(_parse_ts(r["timestamp"]) for r in rows)
cutoff = latest - timedelta(hours=horizon_hours)
calls: Counter = Counter()
tokens: Counter = Counter()
failures: Counter = Counter()
for row in rows:
ts = _parse_ts(row["timestamp"])
if ts < cutoff:
continue
caller = row.get("caller", "unknown")
calls[caller] += 1
tokens[caller] += int(row.get("prompt_len", 0))
if not row.get("success", True):
failures[caller] += 1
summary = []
for caller in calls:
summary.append({
"caller": caller,
"requests": calls[caller],
"prompt_tokens": tokens[caller],
"failures": failures[caller],
"failure_rate": round(failures[caller] / max(calls[caller], 1) * 100, 1),
})
summary.sort(key=lambda x: (-x["requests"], -x["prompt_tokens"]))
return summary
def analyze_heartbeat(rows: List[dict], horizon_hours: int) -> Dict[str, int]:
"""Count infrastructure risks in recent window."""
if not rows:
return {"gitea_outages": 0, "inference_failures": 0, "total_checks": 0}
latest = max(_parse_ts(r["timestamp"]) for r in rows)
cutoff = latest - timedelta(hours=horizon_hours)
gitea_outages = 0
inference_failures = 0
total = 0
for row in rows:
ts = _parse_ts(row["timestamp"])
if ts < cutoff:
continue
total += 1
perception = row.get("perception", {})
if perception.get("gitea_alive") is False:
gitea_outages += 1
model_health = perception.get("model_health", {})
if model_health.get("inference_ok") is False:
inference_failures += 1
return {
"gitea_outages": gitea_outages,
"inference_failures": inference_failures,
"total_checks": total,
}
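`analyze_heartbeat` tests `is False` rather than falsiness, so a missing `gitea_alive` key (or an explicit `None`) does not count as an outage; only a recorded `False` does. The same check in isolation, over hypothetical heartbeat rows:

```python
# Hypothetical heartbeat rows covering the four cases.
rows = [
    {"perception": {"gitea_alive": False}},  # explicit False -> outage
    {"perception": {}},                      # key absent -> not an outage
    {"perception": {"gitea_alive": None}},   # None -> not an outage
    {"perception": {"gitea_alive": True}},   # alive -> not an outage
]
outages = sum(1 for r in rows if r["perception"].get("gitea_alive") is False)
print(outages)  # 1
```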
# ── Prediction Engine ────────────────────────────────────────────────────────
def predict_demand(
recent_rate: float,
baseline_rate: float,
surge_factor: float,
horizon_hours: int,
) -> Dict[str, Any]:
"""Predict near-term resource demand."""
predicted_rate = round(
max(recent_rate, baseline_rate * max(1.0, surge_factor * 0.75)), 2
)
if surge_factor > 3.0:
demand_level = "critical"
elif surge_factor > SURGE_THRESHOLD:
demand_level = "elevated"
elif surge_factor > 1.0:
demand_level = "normal"
else:
demand_level = "low"
return {
"predicted_requests_per_hour": predicted_rate,
"surge_factor": round(surge_factor, 2),
"demand_level": demand_level,
"horizon_hours": horizon_hours,
}
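The demand-level bands in `predict_demand` can be read off directly: above 3.0 is critical, above `SURGE_THRESHOLD` (1.5) is elevated, above 1.0 is normal, and anything at or below 1.0 is low. A standalone sketch of just that classification:

```python
# Mirrors the thresholds used in predict_demand; SURGE_THRESHOLD is the
# module constant defined at the top of the script.
SURGE_THRESHOLD = 1.5

def demand_level(surge_factor: float) -> str:
    if surge_factor > 3.0:
        return "critical"
    if surge_factor > SURGE_THRESHOLD:
        return "elevated"
    if surge_factor > 1.0:
        return "normal"
    return "low"

print([demand_level(s) for s in (4.0, 2.0, 1.2, 0.5)])
# ['critical', 'elevated', 'normal', 'low']
```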
def determine_posture(
surge_factor: float,
callers: List[Dict[str, Any]],
heartbeat: Dict[str, int],
) -> Tuple[str, str, List[str]]:
"""
Determine fleet posture and recommended actions.
Returns:
(resource_mode, dispatch_posture, actions)
"""
mode = "steady"
posture = "normal"
actions: List[str] = []
# Surge detection
if surge_factor > SURGE_THRESHOLD:
mode = "surge"
actions.append(
"Pre-warm local inference before the next forecast window."
)
# Heavy background callers
heavy = [
c for c in callers
if c["prompt_tokens"] >= HEAVY_TOKEN_THRESHOLD
and ("batch" in c["caller"] or "know-thy-father" in c["caller"])
]
if heavy:
actions.append(
"Throttle or defer large background jobs until off-peak capacity is available."
)
# Caller failure rates
failing = [c for c in callers if c["failure_rate"] > 20 and c["requests"] >= 3]
if failing:
names = ", ".join(c["caller"] for c in failing[:3])
actions.append(
f"Investigate high failure rates in: {names}."
)
# Inference health
if heartbeat["inference_failures"] >= 2:
mode = "surge"
actions.append(
"Investigate local model reliability and reserve headroom for heartbeat traffic."
)
# Forge availability
if heartbeat["gitea_outages"] >= 1:
posture = "degraded"
actions.append(
"Pre-fetch or cache forge state before the next dispatch window."
)
if not actions:
actions.append(
"Maintain current resource allocation; no surge indicators detected."
)
return mode, posture, actions
# ── Main Forecast ────────────────────────────────────────────────────────────
def forecast(
metrics_paths: List[str],
heartbeat_paths: List[str],
horizon_hours: int = DEFAULT_HORIZON_HOURS,
) -> Dict[str, Any]:
"""Full resource forecast from metric and heartbeat logs."""
metric_rows = load_jsonl(metrics_paths)
heartbeat_rows = load_jsonl(heartbeat_paths)
recent_rate, baseline_rate, surge_factor, recent_tok_rate, base_tok_rate = (
compute_rates(metric_rows, horizon_hours)
)
callers = analyze_callers(metric_rows, horizon_hours)
heartbeat = analyze_heartbeat(heartbeat_rows, horizon_hours)
demand = predict_demand(recent_rate, baseline_rate, surge_factor, horizon_hours)
mode, posture, actions = determine_posture(surge_factor, callers, heartbeat)
return {
"resource_mode": mode,
"dispatch_posture": posture,
"horizon_hours": horizon_hours,
"recent_request_rate": round(recent_rate, 2),
"baseline_request_rate": round(baseline_rate, 2),
"predicted_request_rate": demand["predicted_requests_per_hour"],
"surge_factor": demand["surge_factor"],
"demand_level": demand["demand_level"],
"recent_prompt_tokens_per_hour": round(recent_tok_rate, 2),
"baseline_prompt_tokens_per_hour": round(base_tok_rate, 2),
"gitea_outages": heartbeat["gitea_outages"],
"inference_failures": heartbeat["inference_failures"],
"heartbeat_checks": heartbeat["total_checks"],
"top_callers": callers[:10],
"recommended_actions": actions,
}
# ── Output Formatters ────────────────────────────────────────────────────────
def format_markdown(fc: Dict[str, Any]) -> str:
"""Format forecast as markdown report."""
lines = [
"# Predictive Resource Allocation — Fleet Forecast",
"",
f"**Horizon:** {fc['horizon_hours']} hours",
f"**Resource mode:** {fc['resource_mode']}",
f"**Dispatch posture:** {fc['dispatch_posture']}",
f"**Demand level:** {fc['demand_level']}",
"",
"## Demand Metrics",
"",
        "| Metric | Recent | Baseline |",
        "|--------|-------:|---------:|",
f"| Requests/hour | {fc['recent_request_rate']} | {fc['baseline_request_rate']} |",
f"| Prompt tokens/hour | {fc['recent_prompt_tokens_per_hour']} | {fc['baseline_prompt_tokens_per_hour']} |",
"",
f"**Surge factor:** {fc['surge_factor']}x",
f"**Predicted request rate:** {fc['predicted_request_rate']}/hour",
"",
"## Infrastructure Health",
"",
f"- Gitea outages (recent window): {fc['gitea_outages']}",
f"- Inference failures (recent window): {fc['inference_failures']}",
f"- Heartbeat checks analyzed: {fc['heartbeat_checks']}",
"",
"## Recommended Actions",
"",
]
for action in fc["recommended_actions"]:
lines.append(f"- {action}")
if fc["top_callers"]:
lines.extend([
"",
"## Top Callers (Recent Window)",
"",
"| Caller | Requests | Tokens | Failures |",
"|--------|---------:|-------:|---------:|",
])
for c in fc["top_callers"]:
lines.append(
f"| {c['caller']} | {c['requests']} | {c['prompt_tokens']} | {c['failures']} |"
)
return "\n".join(lines) + "\n"
# ── CLI ──────────────────────────────────────────────────────────────────────
def main() -> int:
parser = argparse.ArgumentParser(
description="Predictive resource allocation for the Timmy fleet"
)
parser.add_argument(
"--metrics", nargs="*", default=None,
help="Metric JSONL paths (supports globs). Default: metrics/local_*.jsonl"
)
parser.add_argument(
"--heartbeat", nargs="*", default=None,
help="Heartbeat JSONL paths (supports globs). Default: heartbeat/ticks_*.jsonl"
)
parser.add_argument(
"--horizon", type=int, default=DEFAULT_HORIZON_HOURS,
help="Forecast horizon in hours (default: 6)"
)
parser.add_argument(
"--json", action="store_true",
help="Output raw JSON instead of markdown"
)
args = parser.parse_args()
metrics_paths = args.metrics or _default_paths(DEFAULT_METRICS_GLOB)
heartbeat_paths = args.heartbeat or _default_paths(DEFAULT_HEARTBEAT_GLOB)
fc = forecast(metrics_paths, heartbeat_paths, args.horizon)
if args.json:
print(json.dumps(fc, indent=2))
else:
print(format_markdown(fc))
return 0
if __name__ == "__main__":
sys.exit(main())


@@ -0,0 +1,35 @@
from pathlib import Path
def _content() -> str:
return Path("the-door-GENOME.md").read_text()
def test_the_door_genome_exists() -> None:
assert Path("the-door-GENOME.md").exists()
def test_the_door_genome_has_required_sections() -> None:
content = _content()
assert "# GENOME.md — the-door" in content
assert "## Project Overview" in content
assert "## Architecture" in content
assert "```mermaid" in content
assert "## Entry Points" in content
assert "## Data Flow" in content
assert "## Key Abstractions" in content
assert "## API Surface" in content
assert "## Test Coverage Gaps" in content
assert "## Security Considerations" in content
assert "## Dependencies" in content
assert "## Deployment" in content
assert "## Technical Debt" in content
def test_the_door_genome_captures_repo_specific_findings() -> None:
content = _content()
assert "lastUserMessage" in content
assert "localStorage" in content
assert "crisis-offline.html" in content
assert "hermes-gateway.service" in content
assert "/api/v1/chat/completions" in content


@@ -1,76 +0,0 @@
from pathlib import Path
import importlib.util
import unittest
ROOT = Path(__file__).resolve().parent.parent
SCRIPT_PATH = ROOT / "scripts" / "know_thy_father" / "epic_pipeline.py"
DOC_PATH = ROOT / "docs" / "KNOW_THY_FATHER_MULTIMODAL_PIPELINE.md"
def load_module(path: Path, name: str):
assert path.exists(), f"missing {path.relative_to(ROOT)}"
spec = importlib.util.spec_from_file_location(name, path)
assert spec and spec.loader
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
class TestKnowThyFatherEpicPipeline(unittest.TestCase):
def test_build_pipeline_plan_contains_all_phases_in_order(self):
mod = load_module(SCRIPT_PATH, "ktf_epic_pipeline")
plan = mod.build_pipeline_plan(batch_size=10)
self.assertEqual(
[step["id"] for step in plan],
[
"phase1_media_indexing",
"phase2_multimodal_analysis",
"phase3_holographic_synthesis",
"phase4_cross_reference_audit",
"phase5_processing_log",
],
)
self.assertIn("scripts/know_thy_father/index_media.py", plan[0]["command"])
self.assertIn("scripts/twitter_archive/analyze_media.py --batch 10", plan[1]["command"])
self.assertIn("scripts/know_thy_father/synthesize_kernels.py", plan[2]["command"])
self.assertIn("scripts/know_thy_father/crossref_audit.py", plan[3]["command"])
self.assertIn("twitter-archive/know-thy-father/tracker.py report", plan[4]["command"])
def test_status_snapshot_reports_key_artifact_paths(self):
mod = load_module(SCRIPT_PATH, "ktf_epic_pipeline")
status = mod.build_status_snapshot(ROOT)
self.assertIn("phase1_media_indexing", status)
self.assertIn("phase2_multimodal_analysis", status)
self.assertIn("phase3_holographic_synthesis", status)
self.assertIn("phase4_cross_reference_audit", status)
self.assertIn("phase5_processing_log", status)
self.assertEqual(status["phase1_media_indexing"]["script"], "scripts/know_thy_father/index_media.py")
self.assertEqual(status["phase2_multimodal_analysis"]["script"], "scripts/twitter_archive/analyze_media.py")
self.assertEqual(status["phase5_processing_log"]["script"], "twitter-archive/know-thy-father/tracker.py")
self.assertTrue(status["phase1_media_indexing"]["script_exists"])
self.assertTrue(status["phase2_multimodal_analysis"]["script_exists"])
self.assertTrue(status["phase3_holographic_synthesis"]["script_exists"])
self.assertTrue(status["phase4_cross_reference_audit"]["script_exists"])
self.assertTrue(status["phase5_processing_log"]["script_exists"])
def test_repo_contains_multimodal_pipeline_doc(self):
self.assertTrue(DOC_PATH.exists(), "missing committed Know Thy Father pipeline doc")
text = DOC_PATH.read_text(encoding="utf-8")
required = [
"# Know Thy Father — Multimodal Media Consumption Pipeline",
"scripts/know_thy_father/index_media.py",
"scripts/twitter_archive/analyze_media.py --batch 10",
"scripts/know_thy_father/synthesize_kernels.py",
"scripts/know_thy_father/crossref_audit.py",
"twitter-archive/know-thy-father/tracker.py report",
"Refs #582",
]
for snippet in required:
self.assertIn(snippet, text)
if __name__ == "__main__":
unittest.main()


@@ -1,52 +0,0 @@
from pathlib import Path
import yaml
from scripts.plan_laptop_fleet import build_plan, load_manifest, render_markdown, validate_manifest
def test_laptop_fleet_planner_script_exists() -> None:
assert Path("scripts/plan_laptop_fleet.py").exists()
def test_laptop_fleet_manifest_template_exists() -> None:
assert Path("docs/laptop-fleet-manifest.example.yaml").exists()
def test_build_plan_selects_two_lowest_idle_watt_laptops_as_anchors() -> None:
data = load_manifest("docs/laptop-fleet-manifest.example.yaml")
plan = build_plan(data)
assert plan["anchor_agents"] == ["timmy-anchor-a", "timmy-anchor-b"]
assert plan["desktop_nas"] == "timmy-desktop-nas"
assert plan["role_mapping"]["timmy-daylight-a"]["schedule"] == "10:00-16:00"
def test_validate_manifest_requires_unique_hostnames() -> None:
data = {
"machines": [
{"hostname": "dup", "machine_type": "laptop", "ram_gb": 8, "cpu_cores": 4, "os": "Linux", "adapter_condition": "good"},
{"hostname": "dup", "machine_type": "laptop", "ram_gb": 16, "cpu_cores": 8, "os": "Linux", "adapter_condition": "good"},
]
}
try:
validate_manifest(data)
except ValueError as exc:
assert "duplicate hostname" in str(exc)
assert "unique hostnames" in str(exc)
else:
raise AssertionError("validate_manifest should reject duplicate hostname")
def test_markdown_contains_anchor_agents_and_daylight_schedule() -> None:
data = load_manifest("docs/laptop-fleet-manifest.example.yaml")
plan = build_plan(data)
content = render_markdown(plan, data)
assert "24/7 anchor agents: timmy-anchor-a, timmy-anchor-b" in content
assert "Daylight schedule: 10:00-16:00" in content
assert "desktop_nas" in content
def test_manifest_template_is_valid_yaml() -> None:
data = yaml.safe_load(Path("docs/laptop-fleet-manifest.example.yaml").read_text())
assert data["fleet_name"] == "timmy-laptop-fleet"
assert len(data["machines"]) == 6


@@ -1,68 +0,0 @@
from pathlib import Path
import importlib.util
import unittest
ROOT = Path(__file__).resolve().parent.parent
SCRIPT_PATH = ROOT / "scripts" / "mempalace_ezra_integration.py"
DOC_PATH = ROOT / "docs" / "MEMPALACE_EZRA_INTEGRATION.md"
def load_module(path: Path, name: str):
assert path.exists(), f"missing {path.relative_to(ROOT)}"
spec = importlib.util.spec_from_file_location(name, path)
assert spec and spec.loader
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
class TestMempalaceEzraIntegration(unittest.TestCase):
def test_build_plan_contains_issue_required_steps_and_gotchas(self):
mod = load_module(SCRIPT_PATH, "mempalace_ezra_integration")
plan = mod.build_plan({})
self.assertEqual(plan["package_spec"], "mempalace==3.0.0")
self.assertIn("pip install mempalace==3.0.0", plan["install_command"])
self.assertEqual(plan["wing"], "ezra_home")
self.assertIn('echo "" | mempalace mine ~/.hermes/', plan["mine_home_command"])
self.assertIn('--mode convos', plan["mine_sessions_command"])
self.assertIn('mempalace wake-up', plan["wake_up_command"])
self.assertIn('hermes mcp add mempalace -- python -m mempalace.mcp_server', plan["mcp_command"])
self.assertIn('wing:', plan["yaml_template"])
self.assertTrue(any('stdin' in item.lower() for item in plan["gotchas"]))
self.assertTrue(any('wing:' in item for item in plan["gotchas"]))
def test_build_plan_accepts_path_and_wing_overrides(self):
mod = load_module(SCRIPT_PATH, "mempalace_ezra_integration")
plan = mod.build_plan(
{
"hermes_home": "/root/wizards/ezra/home",
"sessions_dir": "/root/wizards/ezra/home/sessions",
"wing": "ezra_archive",
}
)
self.assertEqual(plan["wing"], "ezra_archive")
self.assertIn('/root/wizards/ezra/home', plan["mine_home_command"])
self.assertIn('/root/wizards/ezra/home/sessions', plan["mine_sessions_command"])
self.assertIn('wing: ezra_archive', plan["yaml_template"])
def test_repo_contains_mem_palace_ezra_doc(self):
self.assertTrue(DOC_PATH.exists(), "missing committed MemPalace Ezra integration doc")
text = DOC_PATH.read_text(encoding="utf-8")
required = [
"# MemPalace v3.0.0 — Ezra Integration Packet",
"pip install mempalace==3.0.0",
'echo "" | mempalace mine ~/.hermes/',
"mempalace mine ~/.hermes/sessions/ --mode convos",
"mempalace wake-up",
"hermes mcp add mempalace -- python -m mempalace.mcp_server",
"Report back to #568",
]
for snippet in required:
self.assertIn(snippet, text)
if __name__ == "__main__":
unittest.main()


@@ -1,105 +0,0 @@
from pathlib import Path
import yaml
from scripts.plan_nh_broadband_install import (
build_packet,
load_request,
render_markdown,
validate_request,
)
def test_script_exists() -> None:
assert Path("scripts/plan_nh_broadband_install.py").exists()
def test_example_request_exists() -> None:
assert Path("docs/nh-broadband-install-request.example.yaml").exists()
def test_example_packet_exists() -> None:
assert Path("docs/nh-broadband-install-packet.example.md").exists()
def test_research_memo_exists() -> None:
assert Path("reports/operations/2026-04-15-nh-broadband-public-research.md").exists()
def test_load_and_build_packet() -> None:
data = load_request("docs/nh-broadband-install-request.example.yaml")
packet = build_packet(data)
assert packet["contact"]["name"] == "Timmy Operator"
assert packet["service_address"]["city"] == "Concord"
assert packet["service_address"]["state"] == "NH"
assert packet["status"] == "pending_scheduling_call"
assert len(packet["checklist"]) == 8
assert packet["checklist"][0]["done"] is False
def test_validate_rejects_missing_contact_name() -> None:
data = {
"contact": {"name": "", "phone": "555"},
"service": {"address": "1 St", "city": "X", "state": "NH"},
"checklist": ["do thing"],
}
try:
validate_request(data)
except ValueError as exc:
assert "contact.name" in str(exc)
else:
raise AssertionError("should reject empty contact name")
def test_validate_rejects_missing_service_address() -> None:
data = {
"contact": {"name": "A", "phone": "555"},
"service": {"address": "", "city": "X", "state": "NH"},
"checklist": ["do thing"],
}
try:
validate_request(data)
except ValueError as exc:
assert "service.address" in str(exc)
else:
raise AssertionError("should reject empty service address")
def test_validate_rejects_empty_checklist() -> None:
data = {
"contact": {"name": "A", "phone": "555"},
"service": {"address": "1 St", "city": "X", "state": "NH"},
"checklist": [],
}
try:
validate_request(data)
except ValueError as exc:
assert "checklist" in str(exc)
else:
raise AssertionError("should reject empty checklist")
def test_render_markdown_contains_key_sections() -> None:
data = load_request("docs/nh-broadband-install-request.example.yaml")
packet = build_packet(data)
md = render_markdown(packet, data)
assert "# NH Broadband Install Packet" in md
assert "## Contact" in md
assert "## Service Address" in md
assert "## Call Log" in md
assert "## Appointment Checklist" in md
assert "Concord" in md
assert "NH" in md
def test_render_markdown_shows_checklist_items() -> None:
data = load_request("docs/nh-broadband-install-request.example.yaml")
packet = build_packet(data)
md = render_markdown(packet, data)
assert "- [ ] Confirm exact-address availability" in md
def test_example_yaml_is_valid() -> None:
data = yaml.safe_load(Path("docs/nh-broadband-install-request.example.yaml").read_text())
assert data["contact"]["name"] == "Timmy Operator"
assert len(data["checklist"]) == 8


@@ -1,236 +0,0 @@
"""Tests for predictive resource allocation."""
import json
import os
import sys
from pathlib import Path
import pytest
SCRIPT_DIR = Path(__file__).resolve().parent.parent / "scripts"
sys.path.insert(0, str(SCRIPT_DIR))
from predictive_resource_allocator import (
_parse_ts,
compute_rates,
analyze_callers,
analyze_heartbeat,
predict_demand,
determine_posture,
forecast,
format_markdown,
load_jsonl,
)
def _write_jsonl(path: Path, rows: list):
with open(path, "w") as f:
for row in rows:
f.write(json.dumps(row) + "\n")
def _make_metrics(count: int, base_hour: int = 0, caller: str = "heartbeat_tick",
prompt_len: int = 1000, success: bool = True) -> list:
rows = []
for i in range(count):
rows.append({
"timestamp": f"2026-03-29T{base_hour + i // 60:02d}:{i % 60:02d}:00+00:00",
"caller": caller,
"prompt_len": prompt_len,
"response_len": 50,
"success": success,
})
return rows
def _make_heartbeat(count: int, base_hour: int = 0,
gitea_alive: bool = True, inference_ok: bool = True) -> list:
rows = []
for i in range(count):
rows.append({
"timestamp": f"2026-03-29T{base_hour + i:02d}:00:00+00:00",
"perception": {
"gitea_alive": gitea_alive,
"model_health": {"inference_ok": inference_ok},
},
})
return rows
# ── Timestamp Parsing ────────────────────────────────────────────────────────
class TestTimestampParsing:
def test_z_suffix(self):
dt = _parse_ts("2026-03-29T12:00:00Z")
assert dt.tzinfo is not None
def test_explicit_offset(self):
dt = _parse_ts("2026-03-29T12:00:00+00:00")
assert dt.hour == 12
def test_ordering(self):
earlier = _parse_ts("2026-03-29T10:00:00Z")
later = _parse_ts("2026-03-29T12:00:00Z")
assert earlier < later
# ── Rate Computation ─────────────────────────────────────────────────────────
class TestComputeRates:
def test_empty_returns_defaults(self):
r_rate, b_rate, surge, _, _ = compute_rates([], 6)
assert r_rate == 0.0
assert surge == 1.0
def test_surge_detected(self):
# 1 baseline req, 20 recent reqs
baseline = _make_metrics(1, base_hour=0)
recent = _make_metrics(20, base_hour=12)
rows = baseline + recent
_, _, surge, _, _ = compute_rates(rows, horizon_hours=6)
assert surge > 1.0
def test_no_surge_when_stable(self):
# Same rate in both windows
early = _make_metrics(6, base_hour=0)
late = _make_metrics(6, base_hour=12)
        rows = early + late
        _, _, surge, _, _ = compute_rates(rows, horizon_hours=6)
        assert surge < 1.5


# ── Caller Analysis ──────────────────────────────────────────────────────────

class TestAnalyzeCallers:
    def test_empty(self):
        assert analyze_callers([], 6) == []

    def test_groups_by_caller(self):
        rows = _make_metrics(3, caller="heartbeat_tick") + _make_metrics(2, caller="know-thy-father", prompt_len=15000)
        callers = analyze_callers(rows, horizon_hours=24)
        names = [c["caller"] for c in callers]
        assert "heartbeat_tick" in names
        assert "know-thy-father" in names

    def test_sorted_by_request_count(self):
        rows = _make_metrics(1, caller="rare") + _make_metrics(10, caller="frequent")
        callers = analyze_callers(rows, horizon_hours=24)
        assert callers[0]["caller"] == "frequent"

    def test_failure_rate(self):
        rows = _make_metrics(10, caller="flaky", success=False)
        callers = analyze_callers(rows, horizon_hours=24)
        flaky = [c for c in callers if c["caller"] == "flaky"][0]
        assert flaky["failure_rate"] == 100.0


# ── Heartbeat Analysis ───────────────────────────────────────────────────────

class TestAnalyzeHeartbeat:
    def test_empty(self):
        result = analyze_heartbeat([], 6)
        assert result["gitea_outages"] == 0

    def test_detects_gitea_outage(self):
        rows = _make_heartbeat(3, gitea_alive=False)
        result = analyze_heartbeat(rows, horizon_hours=24)
        assert result["gitea_outages"] == 3

    def test_detects_inference_failure(self):
        rows = _make_heartbeat(2, inference_ok=False)
        result = analyze_heartbeat(rows, horizon_hours=24)
        assert result["inference_failures"] == 2


# ── Demand Prediction ────────────────────────────────────────────────────────

class TestPredictDemand:
    def test_critical_on_extreme_surge(self):
        result = predict_demand(100.0, 10.0, 10.0, 6)
        assert result["demand_level"] == "critical"

    def test_elevated_on_moderate_surge(self):
        result = predict_demand(50.0, 10.0, 2.0, 6)
        assert result["demand_level"] == "elevated"

    def test_normal_on_slight_increase(self):
        result = predict_demand(12.0, 10.0, 1.2, 6)
        assert result["demand_level"] == "normal"

    def test_low_when_decreasing(self):
        result = predict_demand(5.0, 10.0, 0.5, 6)
        assert result["demand_level"] == "low"


# ── Posture Determination ────────────────────────────────────────────────────

class TestDeterminePosture:
    def test_steady_normal_when_no_issues(self):
        mode, posture, actions = determine_posture(1.0, [], {"gitea_outages": 0, "inference_failures": 0, "total_checks": 5})
        assert mode == "steady"
        assert posture == "normal"
        assert "no surge indicators" in actions[0]

    def test_surge_on_high_factor(self):
        mode, posture, actions = determine_posture(2.0, [], {"gitea_outages": 0, "inference_failures": 0, "total_checks": 5})
        assert mode == "surge"
        assert any("Pre-warm" in a for a in actions)

    def test_degraded_on_gitea_outage(self):
        mode, posture, actions = determine_posture(1.0, [], {"gitea_outages": 3, "inference_failures": 0, "total_checks": 5})
        assert posture == "degraded"
        assert any("forge state" in a for a in actions)

    def test_heavy_background_flagged(self):
        callers = [{"caller": "know-thy-father-batch", "requests": 5, "prompt_tokens": 50000, "failures": 0, "failure_rate": 0}]
        _, _, actions = determine_posture(1.0, callers, {"gitea_outages": 0, "inference_failures": 0, "total_checks": 5})
        assert any("Throttle" in a or "background" in a for a in actions)

    def test_failing_callers_flagged(self):
        callers = [{"caller": "bad_actor", "requests": 10, "prompt_tokens": 1000, "failures": 5, "failure_rate": 50.0}]
        _, _, actions = determine_posture(1.0, callers, {"gitea_outages": 0, "inference_failures": 0, "total_checks": 5})
        assert any("failure rate" in a.lower() for a in actions)


# ── Full Forecast ────────────────────────────────────────────────────────────

class TestForecast:
    def test_end_to_end(self, tmp_path):
        metrics_path = tmp_path / "metrics.jsonl"
        heartbeat_path = tmp_path / "heartbeat.jsonl"
        _write_jsonl(metrics_path, _make_metrics(6, base_hour=0) + _make_metrics(30, base_hour=12))
        _write_jsonl(heartbeat_path, _make_heartbeat(5, base_hour=8, inference_ok=False))
        result = forecast([str(metrics_path)], [str(heartbeat_path)], horizon_hours=6)
        assert "resource_mode" in result
        assert "dispatch_posture" in result
        assert "surge_factor" in result
        assert "top_callers" in result
        assert "recommended_actions" in result
        assert isinstance(result["top_callers"], list)
        assert isinstance(result["recommended_actions"], list)

    def test_empty_inputs(self, tmp_path):
        metrics_path = tmp_path / "empty_m.jsonl"
        heartbeat_path = tmp_path / "empty_h.jsonl"
        metrics_path.write_text("")
        heartbeat_path.write_text("")
        result = forecast([str(metrics_path)], [str(heartbeat_path)], horizon_hours=6)
        assert result["resource_mode"] == "steady"
        assert result["surge_factor"] == 1.0


# ── Markdown Output ──────────────────────────────────────────────────────────

class TestFormatMarkdown:
    def test_contains_key_sections(self):
        fc = forecast([], [], horizon_hours=6)
        md = format_markdown(fc)
        assert "Predictive Resource Allocation" in md
        assert "Demand Metrics" in md
        assert "Recommended Actions" in md
        assert "Horizon" in md

the-door-GENOME.md Normal file

@@ -0,0 +1,419 @@
# GENOME.md — the-door
Generated: 2026-04-15 00:03:16 EDT
Repo: Timmy_Foundation/the-door
Issue: timmy-home #673
## Project Overview
The Door is a crisis-first front door to Timmy: one URL, no account wall, no app install, and a permanently visible 988 escape hatch. The repo combines a static browser UI, a local Hermes API gateway behind nginx, and a Python crisis package that duplicates and enriches the frontend's safety logic.
What the codebase actually contains today:
- 1 primary browser app: `index.html`
- 4 companion browser assets/pages: `about.html`, `testimony.html`, `crisis-offline.html`, `sw.js`
- 17 Python files across canonical crisis logic, legacy shims, wrappers, and tests
- 2 Gitea workflows: `smoke.yml`, `sanity.yml`
- 1 systemd unit: `deploy/hermes-gateway.service`
- full test suite currently passing: `115 passed, 3 subtests passed`
The repo is small, but it is not simple. The true architecture is a layered safety system:
1. immediate browser-side crisis escalation
2. OpenAI-compatible streaming chat through Hermes
3. canonical Python crisis detection and response modules
4. nginx hardening, rate limiting, and localhost-only gateway exposure
5. service-worker offline fallback for crisis resources
The strongest pattern in this codebase is safety redundancy: the UI, prompt layer, offline fallback, and backend detection all try to catch the same sacred failure mode from different directions.
## Architecture
```mermaid
graph TD
    U[User in browser] --> I[index.html chat app]
    I --> K[Client-side crisis detection\ncrisisKeywords + explicitPhrases]
    K --> P[Inline crisis panel]
    K --> O[Fullscreen crisis overlay]
    I --> L[localStorage\nchat history + safety plan]
    I --> SW[sw.js service worker]
    SW --> OFF[crisis-offline.html]
    I --> API[/POST /api/v1/chat/completions/]
    API --> NGINX[nginx reverse proxy]
    NGINX --> H[Hermes Gateway :8644]
    NGINX --> HC[/health proxy]
    H --> G[crisis/gateway.py]
    G --> D[crisis/detect.py]
    G --> R[crisis/response.py]
    D --> CR[CrisisDetectionResult]
    R --> RESP[CrisisResponse]
    D --> LEG[Legacy shims\ncrisis_detector.py\ncrisis_responder.py\ndying_detection]
    DEP[deploy/playbook.yml\ndeploy/deploy.sh\nhermes-gateway.service] --> NGINX
    DEP --> H
    CI[.gitea/workflows\nsmoke.yml + sanity.yml] --> I
    CI --> D
```
## Entry Points
### Browser / user-facing entry points
- `index.html`
- the main product
- contains inline CSS, inline JS, embedded `SYSTEM_PROMPT`, chat UI, crisis panel, fullscreen overlay, and safety-plan modal
- `about.html`
- static about page
- linked from the chat footer, though the main app currently links to `/about` while the repo ships `about.html`
- `testimony.html`
- static companion content page
- `crisis-offline.html`
- offline crisis resource page served by the service worker when navigation cannot reach the network
- `manifest.json`
- PWA metadata and shortcuts, including `/?safetyplan=true` and `tel:988`
- `sw.js`
- network-first service worker with offline crisis fallback
### Backend / Python entry points
- `crisis/detect.py`
- canonical detection engine and public detection API
- `crisis/response.py`
- canonical response generator, UI flags, prompt modifier, grounding helpers
- `crisis/gateway.py`
- integration layer for `check_crisis()` and `get_system_prompt()`
- `crisis/compassion_router.py`
- profile-based prompt routing abstraction parallel to `response.py`
- `crisis_detector.py`
- root legacy shim exposing canonical detection in older shapes
- `crisis_responder.py`
- root legacy response module with a richer compatibility response contract
- `dying_detection/__init__.py`
- deprecated wrapper around canonical detection
### Operational entry points
- `deploy/deploy.sh`
- most complete one-command operational bootstrap path in the repo
- `deploy/playbook.yml`
- Ansible provisioning path for swap, packages, nginx, firewall, and site files
- `deploy/hermes-gateway.service`
- systemd unit running `hermes gateway --platform api_server --port 8644`
- `.gitea/workflows/smoke.yml`
- parse/syntax checks and secret scan
- `.gitea/workflows/sanity.yml`
- basic repo sanity grep checks for 988/system-prompt presence
## Data Flow
### Happy path: user message to streamed response
1. User types into `#msg-input` in `index.html`.
2. `sendMessage()`:
- trims text
- appends a user bubble to the DOM
- pushes `{role: 'user', content: text}` into the in-memory `messages` array
- runs client-side `checkCrisis(text)`
- clears the input and starts streaming
3. `streamResponse()` builds the request payload:
- prepends a synthetic system message from `getSystemPrompt(lastUserMessage || '')`
- posts JSON to `/api/v1/chat/completions`
4. nginx proxies `/api/*` to `127.0.0.1:8644`.
5. Hermes streams OpenAI-style SSE chunks back to the browser.
6. The browser reads `choices[0].delta.content` and incrementally renders the assistant message.
7. When streaming ends, the assistant turn is pushed into `messages`, saved to `localStorage`, and passed through `checkCrisis(fullText)` again.
### Immediate local crisis escalation path
1. `checkCrisis(text)` scans substrings against two client-side lists.
2. Low-tier/soft crisis text reveals the inline crisis panel.
3. Explicit intent text triggers the fullscreen overlay and delayed-dismiss flow.
4. The user remains in the conversation flow rather than being hard-redirected away.
### Offline / failure path
1. `sw.js` precaches static routes and the crisis fallback page.
2. Navigation uses a network-first strategy with timeout fallback.
3. If network and cache both fail, the service worker tries `crisis-offline.html`.
4. If API streaming fails, `index.html` inserts a static emergency message with 988 and 741741 instead of a blank error.
## Key Abstractions
### 1. `SYSTEM_PROMPT`
Embedded directly in `index.html`, not loaded at runtime from `system-prompt.txt`. The browser treats the prompt as part of the application runtime contract.
### 2. `COMPASSION_PROFILES`
Frontend prompt-state profiles for `CRITICAL`, `HIGH`, `MEDIUM`, `LOW`, and `NONE`. They encode tone and directive shifts, but the current `levelMap` only maps browser levels to `NONE`, `MEDIUM`, and `CRITICAL`, leaving `HIGH` and `LOW` effectively unused in the main prompt-building path.
### 3. Client-side crisis detector
In `index.html`, the browser uses:
- `crisisKeywords` for panel escalation
- `explicitPhrases` for hard overlay escalation
- `checkCrisis(text)` for UI behavior
- `getCrisisLevel(text)` for prompt shaping
This is fast and local, but it is also a separate detector from the canonical Python package.
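A minimal Python mirror of the two-tier check makes the shape concrete. The phrase lists below are illustrative assumptions, not the actual lists shipped in `index.html`:

```python
# Hedged sketch of the browser's two-tier substring detector.
# The phrase lists are placeholders, not the real lists in index.html.
CRISIS_KEYWORDS = ["hopeless", "can't go on"]      # panel-level escalation
EXPLICIT_PHRASES = ["kill myself", "end my life"]  # overlay-level escalation


def check_crisis(text: str) -> str:
    """Return 'overlay', 'panel', or 'none', mimicking checkCrisis(text)."""
    lowered = text.lower()
    if any(phrase in lowered for phrase in EXPLICIT_PHRASES):
        return "overlay"
    if any(keyword in lowered for keyword in CRISIS_KEYWORDS):
        return "panel"
    return "none"
```

Substring scanning like this is cheap and works offline, which is exactly why the browser keeps its own copy; the cost is the drift risk against the canonical regex tiers noted below.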
### 4. `CrisisDetectionResult`
The core canonical backend dataclass from `crisis/detect.py`:
- `level`
- `indicators`
- `recommended_action`
- `score`
- `matches`
This is the canonical representation shared by the main Python crisis stack.
### 5. `CrisisResponse`
In `crisis/response.py`, the canonical response dataclass ties backend detection to frontend/UI needs:
- `timmy_message`
- `show_crisis_panel`
- `show_overlay`
- `provide_988`
- `escalate`
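Reconstructed from the field lists above, the two canonical dataclasses plausibly look like this. This is a sketch only; the real definitions in `crisis/detect.py` and `crisis/response.py` may use different types and defaults:

```python
# Hedged reconstruction of the two canonical dataclasses from their
# documented field names; types and defaults are assumptions.
from dataclasses import dataclass, field


@dataclass
class CrisisDetectionResult:
    level: str                          # e.g. "NONE" / "MEDIUM" / "CRITICAL"
    indicators: list = field(default_factory=list)
    recommended_action: str = ""
    score: float = 0.0
    matches: list = field(default_factory=list)


@dataclass
class CrisisResponse:
    timmy_message: str
    show_crisis_panel: bool = False
    show_overlay: bool = False
    provide_988: bool = True
    escalate: bool = False
```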
### 6. Legacy compatibility layer
The repo still carries older interfaces:
- `crisis_detector.py`
- `crisis_responder.py`
- `dying_detection/__init__.py`
These preserve compatibility, but they also create drift risk:
- `MEDIUM` vs `MODERATE`
- two different `CrisisResponse` contracts
- two prompt-routing paths (`response.py` vs `compassion_router.py`)
### 7. Browser persistence contract
`localStorage` is a real part of runtime state despite some docs claiming otherwise.
Keys:
- `timmy_chat_history`
- `timmy_safety_plan`
That means The Door is not truly “close tab = gone” in its current implementation.
## API Surface
### Browser -> Hermes API contract
`index.html` sends:
```json
{
  "model": "timmy",
  "messages": [
    {"role": "system", "content": "...prompt..."},
    {"role": "assistant", "content": "..."},
    {"role": "user", "content": "..."}
  ],
  "stream": true
}
```
Endpoint:
- `/api/v1/chat/completions`
Expected response shape:
- streaming SSE lines beginning with `data: `
- chunk payloads with `choices[0].delta.content`
- `[DONE]` terminator
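The browser-side consumption of this stream can be sketched in Python. The helper below is hypothetical, not code from the repo; it just demonstrates the wire format described above:

```python
import json


def extract_deltas(sse_lines):
    """Concatenate content deltas from OpenAI-style SSE lines.

    Mirrors what index.html does with choices[0].delta.content; a sketch
    of the documented wire format, not code from the repo.
    """
    deltas = []
    for line in sse_lines:
        if not line.startswith("data: "):
            continue                      # ignore comments/blank keepalives
        payload = line[len("data: "):]
        if payload == "[DONE]":           # stream terminator
            break
        chunk = json.loads(payload)
        content = chunk["choices"][0].get("delta", {}).get("content")
        if content:
            deltas.append(content)
    return "".join(deltas)
```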
### Canonical Python API
- `crisis.detect.detect_crisis(text)`
- `crisis.response.generate_response(detection)`
- `crisis.response.process_message(text)`
- `crisis.response.get_system_prompt_modifier(detection)`
- `crisis.gateway.check_crisis(text)`
- `crisis.gateway.get_system_prompt(base_prompt, text="")`
- `crisis.gateway.format_gateway_response(text, pretty=True)`
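Only the signatures above are confirmed by the repo; how `get_system_prompt(base_prompt, text="")` composes its result is an assumption. One plausible shape, with a stub standing in for `crisis.detect.detect_crisis`:

```python
# Hedged sketch of a gateway-style prompt composer. detect_level() is a
# stand-in for crisis.detect.detect_crisis; the modifier text is invented
# for illustration, not taken from crisis/response.py.
def detect_level(text: str) -> str:
    return "CRITICAL" if "kill myself" in text.lower() else "NONE"


def get_system_prompt(base_prompt: str, text: str = "") -> str:
    level = detect_level(text)
    if level == "NONE":
        return base_prompt                # unchanged prompt on the happy path
    modifier = (
        "\n\nThe user may be in crisis. Respond with warmth, "
        "surface 988 and Crisis Text Line (741741), and do not lecture."
    )
    return base_prompt + modifier
```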
### Legacy / compatibility API
- `CrisisDetector.scan()`
- `detect_crisis_legacy()`
- root `crisis_responder.generate_response()`
- deprecated `dying_detection.detect()` and helpers
## Test Coverage Gaps
### Current state
Verified on fresh `main` clone of `the-door`:
- `python3 -m pytest -q` -> `115 passed, 3 subtests passed`
What is already covered well:
- canonical crisis detection tiers
- response flags and gateway structure
- many false-positive regressions
- service-worker offline crisis fallback
- crisis overlay focus trap string-level assertions
- deprecated wrapper behavior
### High-value gaps that still matter
1. No real browser test of the actual send path in `index.html`.
- The repo currently contains a concrete scope bug:
- `sendMessage()` defines `var lastUserMessage = text;`
- `streamResponse()` later uses `getSystemPrompt(lastUserMessage || '')`
- `lastUserMessage` is not in `streamResponse()` scope
- Existing passing tests do not execute this real path.
2. No DOM-true test for overlay background locking.
- The overlay code targets `document.querySelector('.app')` and `getElementById('chat')`.
- The main document uses `id="app"`, not `.app`, and does not expose a `#chat` node.
- Current tests assert code presence, not selector correctness.
3. No route validation for `/about` vs `about.html`.
- The footer links to `/about`.
- The repo ships `about.html`.
- With current nginx `try_files`, this looks like a drift bug.
4. Legacy responder path remains largely untested.
- `crisis_responder.py` is still present and meaningful but lacks direct tests for its richer response payloads.
5. CI does not run pytest.
- The repo has a substantial suite, but Gitea workflows only do syntax/grep checks.
### Generated missing tests for critical paths
These are the three most important tests this codebase still needs.
#### A. Browser send-path smoke test
Goal: catch the `lastUserMessage` regression and ensure the chat request actually builds.
```python
# Example Playwright/browser test
async def test_send_message_builds_stream_request(page):
    requests, errors = [], []
    page.on("request", lambda req: requests.append(req.url))
    page.on("pageerror", lambda err: errors.append(err))
    await page.goto("file:///.../index.html")
    await page.fill("#msg-input", "hello")
    await page.click("#send-btn")
    # Expect no ReferenceError and one request to /api/v1/chat/completions
    assert not errors
    assert any("/api/v1/chat/completions" in url for url in requests)
```
#### B. Overlay selector correctness test
Goal: prove the inert/background lock hits real DOM nodes, not dead selectors.
```python
from pathlib import Path


def test_overlay_background_selectors_match_real_dom():
    html = Path("index.html").read_text()
    assert 'id="app"' in html
    assert "querySelector('.app')" not in html
    assert "getElementById('chat')" not in html
```
#### C. Legacy responder contract test
Goal: keep compatibility layers honest until they are deleted.
```python
from crisis_responder import process_message


def test_legacy_responder_returns_resources_for_high_risk():
    response = process_message("I want to kill myself")
    assert response.escalate is True
    assert response.show_overlay is True
    assert any("988" in r for r in response.resources)
```
## Security Considerations
### Strengths
- Browser message bubbles use `textContent`, not unsafe inner HTML, for chat content.
- API calls are same-origin and proxied through nginx.
- Service worker does not cache `/api/*` responses.
- nginx includes CSP, HSTS, and localhost-only gateway exposure.
- UFW/docs expect only `22`, `80`, and `443` to be public.
- systemd unit hardening is present in `hermes-gateway.service`.
### Risks
1. `localStorage` persistence contradicts the privacy story.
- chat history and safety plan are stored in plaintext on the device
- shared-device risk is real
2. `script-src 'unsafe-inline'` is required by the current architecture.
- all runtime logic and CSS are inline in `index.html`
- this weakens CSP/XSS posture
3. Safety enforcement is still heavily client-shaped.
- the frontend always embeds the crisis-aware prompt
- deployment does not clearly prove that all callers are forced through server-side crisis middleware
- direct API clients may bypass browser-supplied context
4. Client and server detection logic can drift.
- the browser uses substring lists
- the backend uses canonical regex tiers in `crisis/detect.py`
- parity is not tested
5. Deprecated wrapper emits a deterministic session hash.
- `dying_detection` exposes a truncated SHA-256 fingerprint of text
- useful for correlation, but still privacy-sensitive
## Dependencies
### Runtime
- Hermes binary at `/usr/local/bin/hermes`
- nginx
- certbot + python certbot nginx plugin
- ufw
- curl
- Python 3
- browser with JavaScript, service-worker, and `localStorage` support
### Test / operator dependencies
- pytest
- PyYAML (used implicitly by smoke workflow checks)
- ansible / ansible-playbook
- rsync, ssh, scp
- openssl
- dig / dnsutils
### In-repo dependency style
- Python code is effectively stdlib-first
- no `requirements.txt`, `pyproject.toml`, or `package.json`
- operational dependencies live mostly in docs and scripts rather than a declared manifest
## Deployment
### Intended production path
Browser -> nginx TLS -> static webroot + `/api/*` reverse proxy -> Hermes on `127.0.0.1:8644`
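The shape of that path can be sketched as an nginx fragment. This is illustrative only, assuming a hypothetical domain and webroot; the real config lives in `deploy/nginx.conf`:

```nginx
# Illustrative shape only; not the repo's deploy/nginx.conf.
server {
    listen 443 ssl;
    server_name example.org;               # hypothetical domain

    root /var/www/the-door;                # hypothetical static webroot
    try_files $uri $uri/ =404;

    location /api/ {
        proxy_pass http://127.0.0.1:8644;  # Hermes gateway, localhost-only
        proxy_http_version 1.1;
        proxy_set_header Connection "";
        proxy_buffering off;               # keep SSE chunks streaming
    }
}
```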
### Main deployment commands
- `make deploy`
- `make deploy-bash`
- `make push`
- `make check`
- `bash deploy/deploy.sh`
- `cd deploy && ansible-playbook -i inventory.ini playbook.yml`
### Operational files
- `deploy/nginx.conf`
- `deploy/playbook.yml`
- `deploy/deploy.sh`
- `deploy/hermes-gateway.service`
- `resilience/health-check.sh`
- `resilience/service-restart.sh`
### Deployment reality check
The repo's deploy surface is not fully coherent:
- `deploy/deploy.sh` is the most complete operational path
- `deploy/playbook.yml` provisions nginx/site/firewall/SSL but does not manage `hermes-gateway.service`
- resilience scripts still target port `8000`, not the real gateway at `8644`
- `crisis-offline.html` is required by `sw.js`, but full deploy paths do not appear to ship it consistently
## Technical Debt
### Highest-priority debt
1. Fix the `lastUserMessage` scope bug in `index.html`.
2. Fix overlay background selector drift (`.app` vs `#app`, missing `#chat`).
3. Fix `/about` route drift.
4. Add pytest to Gitea CI.
5. Make deploy paths ship the same artifact set, including `crisis-offline.html`.
6. Make the recommended Ansible path actually manage `hermes-gateway.service`.
7. Align or remove resilience scripts targeting the wrong port/service.
8. Resolve doc drift:
- ARCHITECTURE says “close tab = gone,” but implementation uses `localStorage`
- BACKEND_SETUP still says 49 tests, while current verified suite is 115 + 3 subtests
- audit docs understate current automation coverage
### Strategic debt
- Duplicate crisis logic across browser and backend
- Parallel prompt-routing mechanisms (`response.py` and `compassion_router.py`)
- Legacy compatibility layers that still matter but are not first-class tested
- No declared dependency manifest for operator tooling
- No true E2E browser validation of the core conversation loop
## Bottom Line
The Door is not just a static landing page. It is a small but layered safety system with three cores:
- a browser-first crisis chat UI
- a canonical Python crisis package
- a thin nginx/Hermes deployment shell
Its design is morally serious and operationally pragmatic. Its main weaknesses are not missing ambition; they are drift, duplication, and shallow verification at the exact seams where the browser, backend, and deploy layer meet.


@@ -17,24 +17,8 @@ from typing import Dict, Any, Optional, List
from pathlib import Path
from dataclasses import dataclass
from enum import Enum
import importlib.util
def _load_local(module_name: str, filename: str):
    """Import a module from an explicit file path, bypassing sys.path resolution."""
    spec = importlib.util.spec_from_file_location(
        module_name,
        str(Path(__file__).parent / filename),
    )
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)
    return mod
_harness = _load_local("v2_harness", "harness.py")
UniWizardHarness = _harness.UniWizardHarness
House = _harness.House
ExecutionResult = _harness.ExecutionResult
from harness import UniWizardHarness, House, ExecutionResult
class TaskType(Enum):


@@ -8,30 +8,13 @@ import time
import sys
import argparse
import os
import importlib.util
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional
def _load_local(module_name: str, filename: str):
    """Import a module from an explicit file path, bypassing sys.path resolution.

    Prevents namespace collisions when multiple directories contain modules
    with the same name (e.g. uni-wizard/harness.py vs uni-wizard/v2/harness.py).
    """
    spec = importlib.util.spec_from_file_location(
        module_name,
        str(Path(__file__).parent / filename),
    )
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)
    return mod
_harness = _load_local("v2_harness", "harness.py")
UniWizardHarness = _harness.UniWizardHarness
House = _harness.House
ExecutionResult = _harness.ExecutionResult
sys.path.insert(0, str(Path(__file__).parent))
from harness import UniWizardHarness, House, ExecutionResult
from router import HouseRouter, TaskType
from author_whitelist import AuthorWhitelist