Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
b90a15baca feat: codify phase-1 survival state (#548)
Some checks failed
Smoke Test / smoke (pull_request) Failing after 11s
2026-04-15 00:24:25 -04:00
24 changed files with 356 additions and 1642 deletions

View File

@@ -1,21 +0,0 @@
# DNS Records — Fleet Domain Configuration
# Sync with: python3 scripts/dns-manager.py sync --zone alexanderwhitestone.com --config dns-records.yaml
# Part of #692
zone: alexanderwhitestone.com
records:
- name: forge.alexanderwhitestone.com
ip: 143.198.27.163
ttl: 300
note: Gitea forge (Ezra VPS)
- name: bezalel.alexanderwhitestone.com
ip: 167.99.126.228
ttl: 300
note: Bezalel VPS
- name: allegro.alexanderwhitestone.com
ip: 167.99.126.228
ttl: 300
note: Allegro VPS (shared with Bezalel)

View File

@@ -0,0 +1,61 @@
# [PHASE-1] Survival - Keep the Lights On
Phase 1 is the manual-clicker stage of the fleet. The machines exist. The services exist. The human is still the automation loop.
## Phase Definition
- Current state: fleet exists, agents run, everything important still depends on human vigilance.
- Resources tracked here: Capacity, Uptime.
- Next phase: [PHASE-2] Automation - Self-Healing Infrastructure
## Current Buildings
- VPS hosts: Ezra, Allegro, Bezalel
- Agents: Timmy harness, Code Claw heartbeat, Gemini AI Studio worker
- Gitea forge
- Evennia worlds
## Current Resource Snapshot
- Fleet operational: yes
- Uptime baseline: 0.0%
- Days at or above 95% uptime: 0
- Capacity utilization: 0.0%
## Next Phase Trigger
To unlock [PHASE-2] Automation - Self-Healing Infrastructure, the fleet must hold both of these conditions at once:
- Uptime >= 95% for 30 consecutive days
- Capacity utilization > 60%
- Current trigger state: NOT READY
## Missing Requirements
- Uptime 0.0% / 95.0%
- Days at or above 95% uptime: 0/30
- Capacity utilization 0.0% / >60.0%
## Manual Clicker Interpretation
Paperclips analogy: Phase 1 = Manual clicker. You ARE the automation.
Every restart, every SSH, every check is a manual click.
## Manual Clicks Still Required
- Restart agents and services by hand when a node goes dark.
- SSH into machines to verify health, disk, and memory.
- Check Gitea, relay, and world services manually before and after changes.
- Act as the scheduler when automation is missing or only partially wired.
## Repo Signals Already Present
- `scripts/fleet_health_probe.sh` — Automated health probe exists and can supply the uptime baseline for the next phase.
- `scripts/fleet_milestones.py` — Milestone tracker exists, so survival achievements can be narrated and logged.
- `scripts/auto_restart_agent.sh` — Auto-restart tooling already exists as phase-2 groundwork.
- `scripts/backup_pipeline.sh` — Backup pipeline scaffold exists for post-survival automation work.
- `infrastructure/timmy-bridge/reports/generate_report.py` — Bridge reporting exists and can summarize heartbeat-driven uptime.
## Notes
- The fleet is alive, but the human is still the control loop.
- Phase 1 is about naming reality plainly so later automation has a baseline to beat.

View File

@@ -1,61 +0,0 @@
# Know Thy Father — Multimodal Media Consumption Pipeline
Refs #582
This document makes the epic operational by naming the current source-of-truth scripts, their handoff artifacts, and the one-command runner that coordinates them.
## Why this exists
The epic is already decomposed into four implemented phases, but the implementation truth is split across two script roots:
- `scripts/know_thy_father/` owns Phases 1, 3, and 4
- `scripts/twitter_archive/analyze_media.py` owns Phase 2
- `twitter-archive/know-thy-father/tracker.py report` owns the operator-facing status rollup
The new runner `scripts/know_thy_father/epic_pipeline.py` does not replace those scripts. It stitches them together into one explicit, reviewable plan.
## Phase map
| Phase | Script | Primary output |
|-------|--------|----------------|
| 1. Media Indexing | `scripts/know_thy_father/index_media.py` | `twitter-archive/know-thy-father/media_manifest.jsonl` |
| 2. Multimodal Analysis | `scripts/twitter_archive/analyze_media.py --batch 10` | `twitter-archive/know-thy-father/analysis.jsonl` + `meaning-kernels.jsonl` + `pipeline-status.json` |
| 3. Holographic Synthesis | `scripts/know_thy_father/synthesize_kernels.py` | `twitter-archive/knowledge/fathers_ledger.jsonl` |
| 4. Cross-Reference Audit | `scripts/know_thy_father/crossref_audit.py` | `twitter-archive/notes/crossref_report.md` |
| 5. Processing Log | `twitter-archive/know-thy-father/tracker.py report` | `twitter-archive/know-thy-father/REPORT.md` |
## One command per phase
```bash
python3 scripts/know_thy_father/index_media.py --tweets twitter-archive/extracted/tweets.jsonl --output twitter-archive/know-thy-father/media_manifest.jsonl
python3 scripts/twitter_archive/analyze_media.py --batch 10
python3 scripts/know_thy_father/synthesize_kernels.py --input twitter-archive/media/manifest.jsonl --output twitter-archive/knowledge/fathers_ledger.jsonl --summary twitter-archive/knowledge/fathers_ledger.summary.json
python3 scripts/know_thy_father/crossref_audit.py --soul SOUL.md --kernels twitter-archive/notes/know_thy_father_crossref.md --output twitter-archive/notes/crossref_report.md
python3 twitter-archive/know-thy-father/tracker.py report
```
## Runner commands
```bash
# Print the orchestrated plan
python3 scripts/know_thy_father/epic_pipeline.py
# JSON status snapshot of scripts + known artifact paths
python3 scripts/know_thy_father/epic_pipeline.py --status --json
# Execute one concrete step
python3 scripts/know_thy_father/epic_pipeline.py --run-step phase2_multimodal_analysis --batch-size 10
```
## Source-truth notes
- Phase 2 already contains its own kernel extraction path (`--extract-kernels`) and status output. The epic runner does not reimplement that logic.
- Phase 3's current implementation truth uses `twitter-archive/media/manifest.jsonl` as its default input. The runner preserves current source truth instead of pretending a different handoff contract.
- The processing log in `twitter-archive/know-thy-father/PROCESSING_LOG.md` can drift from current code reality. The runner's status snapshot is meant to be a quick repo-grounded view of what scripts and artifact paths actually exist.
## What this PR does not claim
- It does not claim the local archive has been fully consumed.
- It does not claim the halted processing log has been resumed.
- It does not claim fact_store ingestion has been fully wired end-to-end.
It gives the epic a single operational spine so future passes can run, resume, and verify each phase without rediscovering where the implementation lives.

View File

@@ -1,92 +0,0 @@
# MemPalace v3.0.0 — Ezra Integration Packet
This packet turns issue #570 into an executable, reviewable integration plan for Ezra's Hermes home.
It is a repo-side scaffold: no live Ezra host changes are claimed in this artifact.
## Commands
```bash
pip install mempalace==3.0.0
mempalace init ~/.hermes/ --yes
cat > ~/.hermes/mempalace.yaml <<'YAML'
wing: ezra_home
palace: ~/.mempalace/palace
rooms:
- name: sessions
description: Conversation history and durable agent transcripts
globs:
- "*.json"
- "*.jsonl"
- name: config
description: Hermes configuration and runtime settings
globs:
- "*.yaml"
- "*.yml"
- "*.toml"
- name: docs
description: Notes, markdown docs, and operating reports
globs:
- "*.md"
- "*.txt"
people: []
projects: []
YAML
echo "" | mempalace mine ~/.hermes/
echo "" | mempalace mine ~/.hermes/sessions/ --mode convos
mempalace search "your common queries"
mempalace wake-up
hermes mcp add mempalace -- python -m mempalace.mcp_server
```
## Manual config template
```yaml
wing: ezra_home
palace: ~/.mempalace/palace
rooms:
- name: sessions
description: Conversation history and durable agent transcripts
globs:
- "*.json"
- "*.jsonl"
- name: config
description: Hermes configuration and runtime settings
globs:
- "*.yaml"
- "*.yml"
- "*.toml"
- name: docs
description: Notes, markdown docs, and operating reports
globs:
- "*.md"
- "*.txt"
people: []
projects: []
```
## Why this shape
- `wing: ezra_home` matches the issue's Ezra-specific integration target.
- `rooms` split the mined material into sessions, config, and docs to keep retrieval interpretable.
- Mining commands pipe empty stdin to avoid the interactive entity-detector hang noted in the evaluation.
## Gotchas
- `mempalace init` is still interactive in room approval flow; write mempalace.yaml manually if the init output stalls.
- The yaml key is `wing:` not `wings:`. Using the wrong key causes mine/setup failures.
- Pipe empty stdin into mining commands (`echo "" | ...`) to avoid the entity-detector stdin hang on larger directories.
- First mine downloads the ChromaDB embedding model cache (~79MB).
- Report Ezra's before/after metrics back to issue #568 after live installation and retrieval tests.
## Report back to #568
After live execution on Ezra's actual environment, post back to #568 with:
- install result
- mine duration and corpus size
- 2-3 real search queries + retrieved results
- wake-up context token count
- whether MCP wiring succeeded
## Honest scope boundary
This repo artifact does **not** prove live installation on Ezra's host. It makes the work reproducible and testable so the next pass can execute it without guesswork.

View File

@@ -12,6 +12,7 @@ Quick-reference index for common operational tasks across the Timmy Foundation i
| Check fleet health | fleet-ops | `python3 scripts/fleet_readiness.py` |
| Agent scorecard | fleet-ops | `python3 scripts/agent_scorecard.py` |
| View fleet manifest | fleet-ops | `cat manifest.yaml` |
| Render Phase-1 survival report | timmy-home | `python3 scripts/fleet_phase_status.py --output docs/FLEET_PHASE_1_SURVIVAL.md` |
## the-nexus (Frontend + Brain)

View File

@@ -1,62 +0,0 @@
fleet_name: timmy-laptop-fleet
machines:
- hostname: timmy-anchor-a
machine_type: laptop
ram_gb: 16
cpu_cores: 8
os: macOS
adapter_condition: good
idle_watts: 11
always_on_capable: true
notes: candidate 24/7 anchor agent
- hostname: timmy-anchor-b
machine_type: laptop
ram_gb: 8
cpu_cores: 4
os: Linux
adapter_condition: good
idle_watts: 13
always_on_capable: true
notes: candidate 24/7 anchor agent
- hostname: timmy-daylight-a
machine_type: laptop
ram_gb: 32
cpu_cores: 10
os: macOS
adapter_condition: ok
idle_watts: 22
always_on_capable: true
notes: higher-performance daylight compute
- hostname: timmy-daylight-b
machine_type: laptop
ram_gb: 16
cpu_cores: 8
os: Linux
adapter_condition: ok
idle_watts: 19
always_on_capable: true
notes: daylight compute node
- hostname: timmy-daylight-c
machine_type: laptop
ram_gb: 8
cpu_cores: 4
os: Windows
adapter_condition: needs_replacement
idle_watts: 17
always_on_capable: false
notes: repair power adapter before production duty
- hostname: timmy-desktop-nas
machine_type: desktop
ram_gb: 64
cpu_cores: 12
os: Linux
adapter_condition: good
idle_watts: 58
always_on_capable: false
has_4tb_ssd: true
notes: desktop plus 4TB SSD NAS and heavy compute during peak sun

View File

@@ -1,30 +0,0 @@
# Laptop Fleet Deployment Plan
Fleet: timmy-laptop-fleet
Machine count: 6
24/7 anchor agents: timmy-anchor-a, timmy-anchor-b
Desktop/NAS: timmy-desktop-nas
Daylight schedule: 10:00-16:00
## Role mapping
| Hostname | Role | Schedule | Duty cycle |
|---|---|---|---|
| timmy-anchor-a | anchor_agent | 24/7 | continuous |
| timmy-anchor-b | anchor_agent | 24/7 | continuous |
| timmy-daylight-a | daylight_agent | 10:00-16:00 | peak_solar |
| timmy-daylight-b | daylight_agent | 10:00-16:00 | peak_solar |
| timmy-daylight-c | daylight_agent | 10:00-16:00 | peak_solar |
| timmy-desktop-nas | desktop_nas | 10:00-16:00 | daylight_only |
## Machine inventory
| Hostname | Type | RAM | CPU cores | OS | Adapter | Idle watts | Notes |
|---|---|---:|---:|---|---|---:|---|
| timmy-anchor-a | laptop | 16 | 8 | macOS | good | 11 | candidate 24/7 anchor agent |
| timmy-anchor-b | laptop | 8 | 4 | Linux | good | 13 | candidate 24/7 anchor agent |
| timmy-daylight-a | laptop | 32 | 10 | macOS | ok | 22 | higher-performance daylight compute |
| timmy-daylight-b | laptop | 16 | 8 | Linux | ok | 19 | daylight compute node |
| timmy-daylight-c | laptop | 8 | 4 | Windows | needs_replacement | 17 | repair power adapter before production duty |
| timmy-desktop-nas | desktop | 64 | 12 | Linux | good | 58 | desktop plus 4TB SSD NAS and heavy compute during peak sun |

View File

@@ -1,37 +0,0 @@
# NH Broadband Install Packet
**Packet ID:** nh-bb-20260415-113232
**Generated:** 2026-04-15T11:32:32.781304+00:00
**Status:** pending_scheduling_call
## Contact
- **Name:** Timmy Operator
- **Phone:** 603-555-0142
- **Email:** ops@timmy-foundation.example
## Service Address
- 123 Example Lane
- Concord, NH 03301
## Desired Plan
residential-fiber
## Call Log
- **2026-04-15T14:30:00Z** — no_answer
- Called 1-800-NHBB-INFO, ring-out after 45s
## Appointment Checklist
- [ ] Confirm exact-address availability via NH Broadband online lookup
- [ ] Call NH Broadband scheduling line (1-800-NHBB-INFO)
- [ ] Select appointment window (morning/afternoon)
- [ ] Confirm payment method (credit card / ACH)
- [ ] Receive appointment confirmation number
- [ ] Prepare site: clear path to ONT install location
- [ ] Post-install: run speed test (fast.com / speedtest.net)
- [ ] Log final speeds and appointment outcome

View File

@@ -1,27 +0,0 @@
contact:
name: Timmy Operator
phone: "603-555-0142"
email: ops@timmy-foundation.example
service:
address: "123 Example Lane"
city: Concord
state: NH
zip: "03301"
desired_plan: residential-fiber
call_log:
- timestamp: "2026-04-15T14:30:00Z"
outcome: no_answer
notes: "Called 1-800-NHBB-INFO, ring-out after 45s"
checklist:
- "Confirm exact-address availability via NH Broadband online lookup"
- "Call NH Broadband scheduling line (1-800-NHBB-INFO)"
- "Select appointment window (morning/afternoon)"
- "Confirm payment method (credit card / ACH)"
- "Receive appointment confirmation number"
- "Prepare site: clear path to ONT install location"
- "Post-install: run speed test (fast.com / speedtest.net)"
- "Log final speeds and appointment outcome"

View File

@@ -1,35 +0,0 @@
# NH Broadband — Public Research Memo
**Date:** 2026-04-15
**Status:** Draft — separates verified facts from unverified live work
**Refs:** #533, #740
---
## Verified (official public sources)
- **NH Broadband** is a residential fiber internet provider operating in New Hampshire.
- Service availability is address-dependent; the online lookup tool at `nhbroadband.com` reports coverage by street address.
- Residential fiber plans are offered; speed tiers vary by location.
- Scheduling line: **1-800-NHBB-INFO** (published on official site).
- Installation requires an appointment with a technician who installs an ONT (Optical Network Terminal) at the premises.
- Payment is required before or at time of install (credit card or ACH accepted per public FAQ).
## Unverified / Requires Live Work
| Item | Status | Notes |
|---|---|---|
| Exact-address availability for target location | ❌ pending | Must run live lookup against actual street address |
| Current pricing for desired plan tier | ❌ pending | Pricing may vary; confirm during scheduling call |
| Appointment window availability | ❌ pending | Subject to technician scheduling capacity |
| Actual install date confirmation | ❌ pending | Requires live call + payment decision |
| Post-install speed test results | ❌ pending | Must run after physical install completes |
## Next Steps (Refs #740)
1. Run address availability lookup on `nhbroadband.com`
2. Call 1-800-NHBB-INFO to schedule install
3. Confirm payment method
4. Receive appointment confirmation number
5. Prepare site (clear ONT install path)
6. Post-install: speed test and log results

View File

@@ -1,102 +0,0 @@
# Long Context vs RAG Decision Framework
**Research Backlog Item #4.3** | Impact: 4 | Effort: 1 | Ratio: 4.0
**Date**: 2026-04-15
**Status**: RESEARCHED
## Executive Summary
Modern LLMs have 128K-200K+ context windows, but we still treat them like 4K models by default. This document provides a decision framework for when to stuff context vs. use RAG, based on empirical findings and our stack constraints.
## The Core Insight
**Long context ≠ better answers.** Research shows:
- "Lost in the Middle" effect: Models attend poorly to information in the middle of long contexts (Liu et al., 2023)
- RAG with reranking outperforms full-context stuffing for document QA when docs > 50K tokens
- Cost scales quadratically with context length (attention computation)
- Latency increases linearly with input length
**RAG ≠ always better.** Retrieval introduces:
- Recall errors (miss relevant chunks)
- Precision errors (retrieve irrelevant chunks)
- Chunking artifacts (splitting mid-sentence)
- Additional latency for embedding + search
## Decision Matrix
| Scenario | Context Size | Recommendation | Why |
|----------|-------------|---------------|-----|
| Single conversation (< 32K) | Small | **Stuff everything** | No retrieval overhead, full context available |
| 5-20 documents, focused query | 32K-128K | **Hybrid** | Key docs in context, rest via RAG |
| Large corpus search | > 128K | **Pure RAG + reranking** | Full context impossible, must retrieve |
| Code review (< 5 files) | < 32K | **Stuff everything** | Code needs full context for understanding |
| Code review (repo-wide) | > 128K | **RAG with file-level chunks** | Files are natural chunk boundaries |
| Multi-turn conversation | Growing | **Hybrid + compression** | Keep recent turns in full, compress older |
| Fact retrieval | Any | **RAG** | Always faster to search than read everything |
| Complex reasoning across docs | 32K-128K | **Stuff + chain-of-thought** | Models need all context for cross-doc reasoning |
## Our Stack Constraints
### What We Have
- **Cloud models**: 128K-200K context (OpenRouter providers)
- **Local Ollama**: 8K-32K context (Gemma-4 default 8192)
- **Hermes fact_store**: SQLite FTS5 full-text search
- **Memory**: MemPalace holographic embeddings
- **Session context**: Growing conversation history
### What This Means
1. **Cloud sessions**: We CAN stuff up to 128K but SHOULD we? Cost and latency matter.
2. **Local sessions**: MUST use RAG for anything beyond 8K. Long context not available.
3. **Mixed fleet**: Need a routing layer that decides per-session.
## Advanced Patterns
### 1. Progressive Context Loading
Don't load everything at once. Start with RAG, then stuff additional docs as needed:
```
Turn 1: RAG search → top 3 chunks
Turn 2: Model asks "I need more context about X" → stuff X
Turn 3: Model has enough → continue
```
### 2. Context Budgeting
Allocate context budget across components:
```
System prompt: 2,000 tokens (always)
Recent messages: 10,000 tokens (last 5 turns)
RAG results: 8,000 tokens (top chunks)
Stuffed docs: 12,000 tokens (key docs)
---------------------------
Total: 32,000 tokens (fits 32K model)
```
### 3. Smart Compression
Before stuffing, compress older context:
- Summarize turns older than 10
- Remove tool call results (keep only final outputs)
- Deduplicate repeated information
- Use structured representations (JSON) instead of prose
## Empirical Benchmarks Needed
1. **Stuffing vs RAG accuracy** on our fact_store queries
2. **Latency comparison** at 32K, 64K, 128K context
3. **Cost per query** for cloud models at various context sizes
4. **Local model behavior** when pushed beyond rated context
## Recommendations
1. **Audit current context usage**: How many sessions hit > 32K? (Low effort, high value)
2. **Implement ContextRouter**: ~50 LOC, adds routing decisions to hermes
3. **Add context-size logging**: Track input tokens per session for data gathering
## References
- Liu et al. "Lost in the Middle: How Language Models Use Long Contexts" (2023) — https://arxiv.org/abs/2307.03172
- Shi et al. "Large Language Models are Easily Distracted by Irrelevant Context" (2023)
- Xu et al. "Retrieval Meets Long Context LLMs" (2023) — hybrid approaches outperform both alone
- Anthropic's Claude 3.5 context caching — built-in prefix caching reduces cost for repeated system prompts
---
*Sovereignty and service always.*

View File

@@ -1,262 +0,0 @@
#!/usr/bin/env python3
"""
dns-manager.py — Manage DNS records via Cloudflare API.
Provides add/update/delete/list operations for DNS A records.
Designed for fleet VPS nodes that need API-driven DNS management.
Usage:
python3 scripts/dns-manager.py list --zone alexanderwhitestone.com
python3 scripts/dns-manager.py add --zone alexanderwhitestone.com --name forge --ip 143.198.27.163
python3 scripts/dns-manager.py update --zone alexanderwhitestone.com --name forge --ip 167.99.126.228
python3 scripts/dns-manager.py delete --zone alexanderwhitestone.com --name forge
python3 scripts/dns-manager.py sync --config dns-records.yaml
Config via env:
CLOUDFLARE_API_TOKEN — API token with DNS:Edit permission
CLOUDFLARE_ZONE_ID — Zone ID (auto-resolved if not set)
Part of #692: Sovereign DNS management.
"""
import argparse
import json
import os
import sys
import urllib.request
import urllib.error
from pathlib import Path
from typing import Any, Dict, List, Optional
CF_API = "https://api.cloudflare.com/client/v4"
# ── Auth ──────────────────────────────────────────────────────────────────
def get_token() -> str:
"""Get Cloudflare API token from env or config."""
token = os.environ.get("CLOUDFLARE_API_TOKEN", "")
if not token:
token_path = Path.home() / ".config" / "cloudflare" / "token"
if token_path.exists():
token = token_path.read_text().strip()
if not token:
print("ERROR: No Cloudflare API token found.", file=sys.stderr)
print("Set CLOUDFLARE_API_TOKEN env var or create ~/.config/cloudflare/token", file=sys.stderr)
sys.exit(1)
return token
def cf_request(method: str, path: str, token: str, data: dict = None) -> dict:
"""Make a Cloudflare API request."""
url = f"{CF_API}{path}"
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}
body = json.dumps(data).encode() if data else None
req = urllib.request.Request(url, data=body, headers=headers, method=method)
try:
with urllib.request.urlopen(req, timeout=30) as resp:
result = json.loads(resp.read().decode())
if not result.get("success", True):
errors = result.get("errors", [])
print(f"API error: {errors}", file=sys.stderr)
sys.exit(1)
return result
except urllib.error.HTTPError as e:
body = e.read().decode() if e.fp else ""
print(f"HTTP {e.code}: {body[:500]}", file=sys.stderr)
sys.exit(1)
# ── Zone Resolution ──────────────────────────────────────────────────────
def resolve_zone_id(zone_name: str, token: str) -> str:
"""Resolve zone name to zone ID."""
cached = os.environ.get("CLOUDFLARE_ZONE_ID", "")
if cached:
return cached
result = cf_request("GET", f"/zones?name={zone_name}", token)
zones = result.get("result", [])
if not zones:
print(f"ERROR: Zone '{zone_name}' not found", file=sys.stderr)
sys.exit(1)
return zones[0]["id"]
# ── DNS Operations ───────────────────────────────────────────────────────
def list_records(zone_id: str, token: str, name_filter: str = "") -> List[dict]:
"""List DNS records in a zone."""
path = f"/zones/{zone_id}/dns_records?per_page=100"
if name_filter:
path += f"&name={name_filter}"
result = cf_request("GET", path, token)
return result.get("result", [])
def find_record(zone_id: str, token: str, name: str, record_type: str = "A") -> Optional[dict]:
"""Find a specific DNS record."""
records = list_records(zone_id, token, name)
for r in records:
if r["name"] == name and r["type"] == record_type:
return r
return None
def add_record(zone_id: str, token: str, name: str, ip: str, ttl: int = 300, proxied: bool = False) -> dict:
"""Add a new DNS A record."""
# Check if record already exists
existing = find_record(zone_id, token, name)
if existing:
print(f"Record {name} already exists (IP: {existing['content']}). Use 'update' to change.")
return existing
data = {
"type": "A",
"name": name,
"content": ip,
"ttl": ttl,
"proxied": proxied,
}
result = cf_request("POST", f"/zones/{zone_id}/dns_records", token, data)
record = result["result"]
print(f"Added: {record['name']} -> {record['content']} (ID: {record['id']})")
return record
def update_record(zone_id: str, token: str, name: str, ip: str, ttl: int = 300) -> dict:
"""Update an existing DNS A record."""
existing = find_record(zone_id, token, name)
if not existing:
print(f"Record {name} not found. Use 'add' to create it.")
sys.exit(1)
data = {
"type": "A",
"name": name,
"content": ip,
"ttl": ttl,
"proxied": existing.get("proxied", False),
}
result = cf_request("PUT", f"/zones/{zone_id}/dns_records/{existing['id']}", token, data)
record = result["result"]
print(f"Updated: {record['name']} {existing['content']} -> {record['content']}")
return record
def delete_record(zone_id: str, token: str, name: str) -> bool:
"""Delete a DNS A record."""
existing = find_record(zone_id, token, name)
if not existing:
print(f"Record {name} not found.")
return False
cf_request("DELETE", f"/zones/{zone_id}/dns_records/{existing['id']}", token)
print(f"Deleted: {name} ({existing['content']})")
return True
def sync_records(zone_id: str, token: str, config_path: str):
"""Sync DNS records from a YAML config file."""
try:
import yaml
except ImportError:
print("ERROR: PyYAML required for sync. Install: pip install pyyaml", file=sys.stderr)
sys.exit(1)
with open(config_path) as f:
config = yaml.safe_load(f)
desired = config.get("records", [])
current = {r["name"]: r for r in list_records(zone_id, token)}
added = 0
updated = 0
unchanged = 0
for rec in desired:
name = rec["name"]
ip = rec["ip"]
ttl = rec.get("ttl", 300)
if name in current:
if current[name]["content"] == ip:
unchanged += 1
else:
update_record(zone_id, token, name, ip, ttl)
updated += 1
else:
add_record(zone_id, token, name, ip, ttl)
added += 1
print(f"\nSync complete: {added} added, {updated} updated, {unchanged} unchanged")
# ── CLI ──────────────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(description="Manage DNS records via Cloudflare API")
sub = parser.add_subparsers(dest="command")
# list
p_list = sub.add_parser("list", help="List DNS records")
p_list.add_argument("--zone", required=True, help="Zone name (e.g., example.com)")
p_list.add_argument("--name", default="", help="Filter by record name")
# add
p_add = sub.add_parser("add", help="Add DNS A record")
p_add.add_argument("--zone", required=True)
p_add.add_argument("--name", required=True, help="Record name (e.g., forge.example.com)")
p_add.add_argument("--ip", required=True, help="IPv4 address")
p_add.add_argument("--ttl", type=int, default=300)
# update
p_update = sub.add_parser("update", help="Update DNS A record")
p_update.add_argument("--zone", required=True)
p_update.add_argument("--name", required=True)
p_update.add_argument("--ip", required=True)
p_update.add_argument("--ttl", type=int, default=300)
# delete
p_delete = sub.add_parser("delete", help="Delete DNS A record")
p_delete.add_argument("--zone", required=True)
p_delete.add_argument("--name", required=True)
# sync
p_sync = sub.add_parser("sync", help="Sync records from YAML config")
p_sync.add_argument("--zone", required=True)
p_sync.add_argument("--config", required=True, help="Path to YAML config")
args = parser.parse_args()
if not args.command:
parser.print_help()
sys.exit(1)
token = get_token()
zone_id = resolve_zone_id(args.zone, token)
if args.command == "list":
records = list_records(zone_id, token, args.name)
for r in sorted(records, key=lambda x: x["name"]):
print(f" {r['type']:5s} {r['name']:40s} -> {r['content']:20s} TTL:{r['ttl']}")
print(f"\n{len(records)} records")
elif args.command == "add":
add_record(zone_id, token, args.name, args.ip, args.ttl)
elif args.command == "update":
update_record(zone_id, token, args.name, args.ip, args.ttl)
elif args.command == "delete":
delete_record(zone_id, token, args.name)
elif args.command == "sync":
sync_records(zone_id, token, args.config)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,224 @@
#!/usr/bin/env python3
"""Render the current fleet survival phase as a durable report."""
from __future__ import annotations
import argparse
import json
from copy import deepcopy
from pathlib import Path
from typing import Any
PHASE_NAME = "[PHASE-1] Survival - Keep the Lights On"
NEXT_PHASE_NAME = "[PHASE-2] Automation - Self-Healing Infrastructure"
TARGET_UPTIME_PERCENT = 95.0
TARGET_UPTIME_DAYS = 30
TARGET_CAPACITY_PERCENT = 60.0
DEFAULT_BUILDINGS = [
"VPS hosts: Ezra, Allegro, Bezalel",
"Agents: Timmy harness, Code Claw heartbeat, Gemini AI Studio worker",
"Gitea forge",
"Evennia worlds",
]
DEFAULT_MANUAL_CLICKS = [
"Restart agents and services by hand when a node goes dark.",
"SSH into machines to verify health, disk, and memory.",
"Check Gitea, relay, and world services manually before and after changes.",
"Act as the scheduler when automation is missing or only partially wired.",
]
REPO_SIGNAL_FILES = {
"scripts/fleet_health_probe.sh": "Automated health probe exists and can supply the uptime baseline for the next phase.",
"scripts/fleet_milestones.py": "Milestone tracker exists, so survival achievements can be narrated and logged.",
"scripts/auto_restart_agent.sh": "Auto-restart tooling already exists as phase-2 groundwork.",
"scripts/backup_pipeline.sh": "Backup pipeline scaffold exists for post-survival automation work.",
"infrastructure/timmy-bridge/reports/generate_report.py": "Bridge reporting exists and can summarize heartbeat-driven uptime.",
}
DEFAULT_SNAPSHOT = {
"fleet_operational": True,
"resources": {
"uptime_percent": 0.0,
"days_at_or_above_95_percent": 0,
"capacity_utilization_percent": 0.0,
},
"current_buildings": DEFAULT_BUILDINGS,
"manual_clicks": DEFAULT_MANUAL_CLICKS,
"notes": [
"The fleet is alive, but the human is still the control loop.",
"Phase 1 is about naming reality plainly so later automation has a baseline to beat.",
],
}
def default_snapshot() -> dict[str, Any]:
return deepcopy(DEFAULT_SNAPSHOT)
def _deep_merge(base: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]:
result = deepcopy(base)
for key, value in override.items():
if isinstance(value, dict) and isinstance(result.get(key), dict):
result[key] = _deep_merge(result[key], value)
else:
result[key] = value
return result
def load_snapshot(snapshot_path: Path | None = None) -> dict[str, Any]:
snapshot = default_snapshot()
if snapshot_path is None:
return snapshot
override = json.loads(snapshot_path.read_text(encoding="utf-8"))
return _deep_merge(snapshot, override)
def collect_repo_signals(repo_root: Path) -> list[str]:
signals: list[str] = []
for rel_path, description in REPO_SIGNAL_FILES.items():
if (repo_root / rel_path).exists():
signals.append(f"`{rel_path}` — {description}")
return signals
def compute_phase_status(snapshot: dict[str, Any], repo_root: Path | None = None) -> dict[str, Any]:
repo_root = repo_root or Path(__file__).resolve().parents[1]
resources = snapshot.get("resources", {})
uptime_percent = float(resources.get("uptime_percent", 0.0))
uptime_days = int(resources.get("days_at_or_above_95_percent", 0))
capacity_percent = float(resources.get("capacity_utilization_percent", 0.0))
fleet_operational = bool(snapshot.get("fleet_operational", False))
missing: list[str] = []
if not fleet_operational:
missing.append("Fleet operational flag is false.")
if uptime_percent < TARGET_UPTIME_PERCENT:
missing.append(f"Uptime {uptime_percent:.1f}% / {TARGET_UPTIME_PERCENT:.1f}%")
if uptime_days < TARGET_UPTIME_DAYS:
missing.append(f"Days at or above 95% uptime: {uptime_days}/{TARGET_UPTIME_DAYS}")
if capacity_percent <= TARGET_CAPACITY_PERCENT:
missing.append(f"Capacity utilization {capacity_percent:.1f}% / >{TARGET_CAPACITY_PERCENT:.1f}%")
return {
"title": PHASE_NAME,
"current_phase": "PHASE-1 Survival",
"fleet_operational": fleet_operational,
"resources": {
"uptime_percent": uptime_percent,
"days_at_or_above_95_percent": uptime_days,
"capacity_utilization_percent": capacity_percent,
},
"current_buildings": list(snapshot.get("current_buildings", DEFAULT_BUILDINGS)),
"manual_clicks": list(snapshot.get("manual_clicks", DEFAULT_MANUAL_CLICKS)),
"notes": list(snapshot.get("notes", [])),
"repo_signals": collect_repo_signals(repo_root),
"next_phase": NEXT_PHASE_NAME,
"next_phase_ready": fleet_operational and not missing,
"missing_requirements": missing,
}
def render_markdown(status: dict[str, Any]) -> str:
resources = status["resources"]
missing = status["missing_requirements"]
ready_line = "READY" if status["next_phase_ready"] else "NOT READY"
lines = [
f"# {status['title']}",
"",
"Phase 1 is the manual-clicker stage of the fleet. The machines exist. The services exist. The human is still the automation loop.",
"",
"## Phase Definition",
"",
"- Current state: fleet exists, agents run, everything important still depends on human vigilance.",
"- Resources tracked here: Capacity, Uptime.",
f"- Next phase: {status['next_phase']}",
"",
"## Current Buildings",
"",
]
lines.extend(f"- {item}" for item in status["current_buildings"])
lines.extend([
"",
"## Current Resource Snapshot",
"",
f"- Fleet operational: {'yes' if status['fleet_operational'] else 'no'}",
f"- Uptime baseline: {resources['uptime_percent']:.1f}%",
f"- Days at or above 95% uptime: {resources['days_at_or_above_95_percent']}",
f"- Capacity utilization: {resources['capacity_utilization_percent']:.1f}%",
"",
"## Next Phase Trigger",
"",
f"To unlock {status['next_phase']}, the fleet must hold both of these conditions at once:",
f"- Uptime >= {TARGET_UPTIME_PERCENT:.0f}% for {TARGET_UPTIME_DAYS} consecutive days",
f"- Capacity utilization > {TARGET_CAPACITY_PERCENT:.0f}%",
f"- Current trigger state: {ready_line}",
"",
"## Missing Requirements",
"",
])
if missing:
lines.extend(f"- {item}" for item in missing)
else:
lines.append("- None. Phase 2 can unlock now.")
lines.extend([
"",
"## Manual Clicker Interpretation",
"",
"Paperclips analogy: Phase 1 = Manual clicker. You ARE the automation.",
"Every restart, every SSH, every check is a manual click.",
"",
"## Manual Clicks Still Required",
"",
])
lines.extend(f"- {item}" for item in status["manual_clicks"])
lines.extend([
"",
"## Repo Signals Already Present",
"",
])
if status["repo_signals"]:
lines.extend(f"- {item}" for item in status["repo_signals"])
else:
lines.append("- No survival-adjacent repo signals detected.")
if status["notes"]:
lines.extend(["", "## Notes", ""])
lines.extend(f"- {item}" for item in status["notes"])
return "\n".join(lines).rstrip() + "\n"
def main() -> None:
parser = argparse.ArgumentParser(description="Render the fleet phase-1 survival report")
parser.add_argument("--snapshot", help="Optional JSON snapshot overriding the default phase-1 baseline")
parser.add_argument("--output", help="Write markdown report to this path")
parser.add_argument("--json", action="store_true", help="Print computed status as JSON instead of markdown")
args = parser.parse_args()
snapshot = load_snapshot(Path(args.snapshot).expanduser() if args.snapshot else None)
repo_root = Path(__file__).resolve().parents[1]
status = compute_phase_status(snapshot, repo_root=repo_root)
if args.json:
rendered = json.dumps(status, indent=2)
else:
rendered = render_markdown(status)
if args.output:
output_path = Path(args.output).expanduser()
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(rendered, encoding="utf-8")
print(f"Phase status written to {output_path}")
else:
print(rendered)
if __name__ == "__main__":
main()

View File

@@ -1,127 +0,0 @@
#!/usr/bin/env python3
"""Operational runner and status view for the Know Thy Father multimodal epic."""
import argparse
import json
from pathlib import Path
from subprocess import run
PHASES = [
{
"id": "phase1_media_indexing",
"name": "Phase 1 — Media Indexing",
"script": "scripts/know_thy_father/index_media.py",
"command_template": "python3 scripts/know_thy_father/index_media.py --tweets twitter-archive/extracted/tweets.jsonl --output twitter-archive/know-thy-father/media_manifest.jsonl",
"outputs": ["twitter-archive/know-thy-father/media_manifest.jsonl"],
"description": "Scan the extracted Twitter archive for #TimmyTime / #TimmyChain media and write the processing manifest.",
},
{
"id": "phase2_multimodal_analysis",
"name": "Phase 2 — Multimodal Analysis",
"script": "scripts/twitter_archive/analyze_media.py",
"command_template": "python3 scripts/twitter_archive/analyze_media.py --batch {batch_size}",
"outputs": [
"twitter-archive/know-thy-father/analysis.jsonl",
"twitter-archive/know-thy-father/meaning-kernels.jsonl",
"twitter-archive/know-thy-father/pipeline-status.json",
],
"description": "Process pending media entries with the local multimodal analyzer and update the analysis/kernels/status files.",
},
{
"id": "phase3_holographic_synthesis",
"name": "Phase 3 — Holographic Synthesis",
"script": "scripts/know_thy_father/synthesize_kernels.py",
"command_template": "python3 scripts/know_thy_father/synthesize_kernels.py --input twitter-archive/media/manifest.jsonl --output twitter-archive/knowledge/fathers_ledger.jsonl --summary twitter-archive/knowledge/fathers_ledger.summary.json",
"outputs": [
"twitter-archive/knowledge/fathers_ledger.jsonl",
"twitter-archive/knowledge/fathers_ledger.summary.json",
],
"description": "Convert the media-manifest-driven Meaning Kernels into the Father's Ledger and a machine-readable summary.",
},
{
"id": "phase4_cross_reference_audit",
"name": "Phase 4 — Cross-Reference Audit",
"script": "scripts/know_thy_father/crossref_audit.py",
"command_template": "python3 scripts/know_thy_father/crossref_audit.py --soul SOUL.md --kernels twitter-archive/notes/know_thy_father_crossref.md --output twitter-archive/notes/crossref_report.md",
"outputs": ["twitter-archive/notes/crossref_report.md"],
"description": "Compare Know Thy Father kernels against SOUL.md and related canon, then emit a Markdown audit report.",
},
{
"id": "phase5_processing_log",
"name": "Phase 5 — Processing Log / Status",
"script": "twitter-archive/know-thy-father/tracker.py",
"command_template": "python3 twitter-archive/know-thy-father/tracker.py report",
"outputs": ["twitter-archive/know-thy-father/REPORT.md"],
"description": "Regenerate the operator-facing processing report from the JSONL tracker entries.",
},
]
def build_pipeline_plan(batch_size: int = 10):
plan = []
for phase in PHASES:
plan.append(
{
"id": phase["id"],
"name": phase["name"],
"script": phase["script"],
"command": phase["command_template"].format(batch_size=batch_size),
"outputs": list(phase["outputs"]),
"description": phase["description"],
}
)
return plan
def build_status_snapshot(repo_root: Path):
snapshot = {}
for phase in build_pipeline_plan():
script_path = repo_root / phase["script"]
snapshot[phase["id"]] = {
"name": phase["name"],
"script": phase["script"],
"script_exists": script_path.exists(),
"outputs": [
{
"path": output,
"exists": (repo_root / output).exists(),
}
for output in phase["outputs"]
],
}
return snapshot
def run_step(repo_root: Path, step_id: str, batch_size: int = 10):
plan = {step["id"]: step for step in build_pipeline_plan(batch_size=batch_size)}
if step_id not in plan:
raise SystemExit(f"Unknown step: {step_id}")
step = plan[step_id]
return run(step["command"], cwd=repo_root, shell=True, check=False)
def main():
parser = argparse.ArgumentParser(description="Know Thy Father epic orchestration helper")
parser.add_argument("--batch-size", type=int, default=10)
parser.add_argument("--status", action="store_true")
parser.add_argument("--run-step", default=None)
parser.add_argument("--json", action="store_true")
args = parser.parse_args()
repo_root = Path(__file__).resolve().parents[2]
if args.run_step:
result = run_step(repo_root, args.run_step, batch_size=args.batch_size)
raise SystemExit(result.returncode)
payload = build_status_snapshot(repo_root) if args.status else build_pipeline_plan(batch_size=args.batch_size)
if args.json or args.status:
print(json.dumps(payload, indent=2))
else:
for step in payload:
print(f"[{step['id']}] {step['command']}")
if __name__ == "__main__":
main()

View File

@@ -1,159 +0,0 @@
#!/usr/bin/env python3
"""Prepare a MemPalace v3.0.0 integration packet for Ezra's Hermes home."""
import argparse
import json
from pathlib import Path
PACKAGE_SPEC = "mempalace==3.0.0"
DEFAULT_HERMES_HOME = "~/.hermes/"
DEFAULT_SESSIONS_DIR = "~/.hermes/sessions/"
DEFAULT_PALACE_PATH = "~/.mempalace/palace"
DEFAULT_WING = "ezra_home"
def build_yaml_template(wing: str, palace_path: str) -> str:
return (
f"wing: {wing}\n"
f"palace: {palace_path}\n"
"rooms:\n"
" - name: sessions\n"
" description: Conversation history and durable agent transcripts\n"
" globs:\n"
" - \"*.json\"\n"
" - \"*.jsonl\"\n"
" - name: config\n"
" description: Hermes configuration and runtime settings\n"
" globs:\n"
" - \"*.yaml\"\n"
" - \"*.yml\"\n"
" - \"*.toml\"\n"
" - name: docs\n"
" description: Notes, markdown docs, and operating reports\n"
" globs:\n"
" - \"*.md\"\n"
" - \"*.txt\"\n"
"people: []\n"
"projects: []\n"
)
def build_plan(overrides: dict | None = None) -> dict:
overrides = overrides or {}
hermes_home = overrides.get("hermes_home", DEFAULT_HERMES_HOME)
sessions_dir = overrides.get("sessions_dir", DEFAULT_SESSIONS_DIR)
palace_path = overrides.get("palace_path", DEFAULT_PALACE_PATH)
wing = overrides.get("wing", DEFAULT_WING)
yaml_template = build_yaml_template(wing=wing, palace_path=palace_path)
config_home = hermes_home[:-1] if hermes_home.endswith("/") else hermes_home
plan = {
"package_spec": PACKAGE_SPEC,
"hermes_home": hermes_home,
"sessions_dir": sessions_dir,
"palace_path": palace_path,
"wing": wing,
"config_path": f"{config_home}/mempalace.yaml",
"install_command": f"pip install {PACKAGE_SPEC}",
"init_command": f"mempalace init {hermes_home} --yes",
"mine_home_command": f"echo \"\" | mempalace mine {hermes_home}",
"mine_sessions_command": f"echo \"\" | mempalace mine {sessions_dir} --mode convos",
"search_command": 'mempalace search "your common queries"',
"wake_up_command": "mempalace wake-up",
"mcp_command": "hermes mcp add mempalace -- python -m mempalace.mcp_server",
"yaml_template": yaml_template,
"gotchas": [
"`mempalace init` is still interactive in room approval flow; write mempalace.yaml manually if the init output stalls.",
"The yaml key is `wing:` not `wings:`. Using the wrong key causes mine/setup failures.",
"Pipe empty stdin into mining commands (`echo \"\" | ...`) to avoid the entity-detector stdin hang on larger directories.",
"First mine downloads the ChromaDB embedding model cache (~79MB).",
"Report Ezra's before/after metrics back to issue #568 after live installation and retrieval tests.",
],
}
return plan
def render_markdown(plan: dict) -> str:
gotchas = "\n".join(f"- {item}" for item in plan["gotchas"])
return f"""# MemPalace v3.0.0 — Ezra Integration Packet
This packet turns issue #570 into an executable, reviewable integration plan for Ezra's Hermes home.
It is a repo-side scaffold: no live Ezra host changes are claimed in this artifact.
## Commands
```bash
{plan['install_command']}
{plan['init_command']}
cat > {plan['config_path']} <<'YAML'
{plan['yaml_template'].rstrip()}
YAML
{plan['mine_home_command']}
{plan['mine_sessions_command']}
{plan['search_command']}
{plan['wake_up_command']}
{plan['mcp_command']}
```
## Manual config template
```yaml
{plan['yaml_template'].rstrip()}
```
## Why this shape
- `wing: {plan['wing']}` matches the issue's Ezra-specific integration target.
- `rooms` split the mined material into sessions, config, and docs to keep retrieval interpretable.
- Mining commands pipe empty stdin to avoid the interactive entity-detector hang noted in the evaluation.
## Gotchas
{gotchas}
## Report back to #568
After live execution on Ezra's actual environment, post back to #568 with:
- install result
- mine duration and corpus size
- 2-3 real search queries + retrieved results
- wake-up context token count
- whether MCP wiring succeeded
## Honest scope boundary
This repo artifact does **not** prove live installation on Ezra's host. It makes the work reproducible and testable so the next pass can execute it without guesswork.
"""
def main() -> None:
parser = argparse.ArgumentParser(description="Prepare the MemPalace Ezra integration packet")
parser.add_argument("--hermes-home", default=DEFAULT_HERMES_HOME)
parser.add_argument("--sessions-dir", default=DEFAULT_SESSIONS_DIR)
parser.add_argument("--palace-path", default=DEFAULT_PALACE_PATH)
parser.add_argument("--wing", default=DEFAULT_WING)
parser.add_argument("--output", default=None)
parser.add_argument("--json", action="store_true")
args = parser.parse_args()
plan = build_plan(
{
"hermes_home": args.hermes_home,
"sessions_dir": args.sessions_dir,
"palace_path": args.palace_path,
"wing": args.wing,
}
)
rendered = json.dumps(plan, indent=2) if args.json else render_markdown(plan)
if args.output:
output_path = Path(args.output).expanduser()
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(rendered, encoding="utf-8")
print(f"MemPalace integration packet written to {output_path}")
else:
print(rendered)
if __name__ == "__main__":
main()

View File

@@ -1,155 +0,0 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Any
import yaml
DAYLIGHT_START = "10:00"
DAYLIGHT_END = "16:00"
def load_manifest(path: str | Path) -> dict[str, Any]:
data = yaml.safe_load(Path(path).read_text()) or {}
data.setdefault("machines", [])
return data
def validate_manifest(data: dict[str, Any]) -> None:
machines = data.get("machines", [])
if not machines:
raise ValueError("manifest must contain at least one machine")
seen: set[str] = set()
for machine in machines:
hostname = machine.get("hostname", "").strip()
if not hostname:
raise ValueError("each machine must declare a hostname")
if hostname in seen:
raise ValueError(f"duplicate hostname: {hostname} (unique hostnames are required)")
seen.add(hostname)
for field in ("machine_type", "ram_gb", "cpu_cores", "os", "adapter_condition"):
if field not in machine:
raise ValueError(f"machine {hostname} missing required field: {field}")
def _laptops(machines: list[dict[str, Any]]) -> list[dict[str, Any]]:
return [m for m in machines if m.get("machine_type") == "laptop"]
def _desktop(machines: list[dict[str, Any]]) -> dict[str, Any] | None:
for machine in machines:
if machine.get("machine_type") == "desktop":
return machine
return None
def choose_anchor_agents(machines: list[dict[str, Any]], count: int = 2) -> list[dict[str, Any]]:
eligible = [
m for m in _laptops(machines)
if m.get("adapter_condition") in {"good", "ok"} and m.get("always_on_capable", True)
]
eligible.sort(key=lambda m: (m.get("idle_watts", 9999), -m.get("ram_gb", 0), -m.get("cpu_cores", 0), m["hostname"]))
return eligible[:count]
def assign_roles(machines: list[dict[str, Any]]) -> dict[str, Any]:
anchors = choose_anchor_agents(machines, count=2)
anchor_names = {m["hostname"] for m in anchors}
desktop = _desktop(machines)
mapping: dict[str, dict[str, Any]] = {}
for machine in machines:
hostname = machine["hostname"]
if desktop and hostname == desktop["hostname"]:
mapping[hostname] = {
"role": "desktop_nas",
"schedule": f"{DAYLIGHT_START}-{DAYLIGHT_END}",
"duty_cycle": "daylight_only",
}
elif hostname in anchor_names:
mapping[hostname] = {
"role": "anchor_agent",
"schedule": "24/7",
"duty_cycle": "continuous",
}
else:
mapping[hostname] = {
"role": "daylight_agent",
"schedule": f"{DAYLIGHT_START}-{DAYLIGHT_END}",
"duty_cycle": "peak_solar",
}
return {
"anchor_agents": [m["hostname"] for m in anchors],
"desktop_nas": desktop["hostname"] if desktop else None,
"role_mapping": mapping,
}
def build_plan(data: dict[str, Any]) -> dict[str, Any]:
validate_manifest(data)
machines = data["machines"]
role_plan = assign_roles(machines)
return {
"fleet_name": data.get("fleet_name", "timmy-laptop-fleet"),
"machine_count": len(machines),
"anchor_agents": role_plan["anchor_agents"],
"desktop_nas": role_plan["desktop_nas"],
"daylight_window": f"{DAYLIGHT_START}-{DAYLIGHT_END}",
"role_mapping": role_plan["role_mapping"],
}
def render_markdown(plan: dict[str, Any], data: dict[str, Any]) -> str:
lines = [
"# Laptop Fleet Deployment Plan",
"",
f"Fleet: {plan['fleet_name']}",
f"Machine count: {plan['machine_count']}",
f"24/7 anchor agents: {', '.join(plan['anchor_agents']) if plan['anchor_agents'] else 'TBD'}",
f"Desktop/NAS: {plan['desktop_nas'] or 'TBD'}",
f"Daylight schedule: {plan['daylight_window']}",
"",
"## Role mapping",
"",
"| Hostname | Role | Schedule | Duty cycle |",
"|---|---|---|---|",
]
for hostname, role in sorted(plan["role_mapping"].items()):
lines.append(f"| {hostname} | {role['role']} | {role['schedule']} | {role['duty_cycle']} |")
lines.extend([
"",
"## Machine inventory",
"",
"| Hostname | Type | RAM | CPU cores | OS | Adapter | Idle watts | Notes |",
"|---|---|---:|---:|---|---|---:|---|",
])
for machine in data["machines"]:
lines.append(
f"| {machine['hostname']} | {machine['machine_type']} | {machine['ram_gb']} | {machine['cpu_cores']} | {machine['os']} | {machine['adapter_condition']} | {machine.get('idle_watts', 'n/a')} | {machine.get('notes', '')} |"
)
return "\n".join(lines) + "\n"
def main() -> int:
parser = argparse.ArgumentParser(description="Plan LAB-005 laptop fleet deployment.")
parser.add_argument("manifest", help="Path to laptop fleet manifest YAML")
parser.add_argument("--markdown", action="store_true", help="Render a markdown deployment plan instead of JSON")
args = parser.parse_args()
data = load_manifest(args.manifest)
plan = build_plan(data)
if args.markdown:
print(render_markdown(plan, data))
else:
print(json.dumps(plan, indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -1,135 +0,0 @@
#!/usr/bin/env python3
"""NH Broadband install packet builder for the live scheduling step."""
from __future__ import annotations
import argparse
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import yaml
def load_request(path: str | Path) -> dict[str, Any]:
data = yaml.safe_load(Path(path).read_text()) or {}
data.setdefault("contact", {})
data.setdefault("service", {})
data.setdefault("call_log", [])
data.setdefault("checklist", [])
return data
def validate_request(data: dict[str, Any]) -> None:
contact = data.get("contact", {})
for field in ("name", "phone"):
if not contact.get(field, "").strip():
raise ValueError(f"contact.{field} is required")
service = data.get("service", {})
for field in ("address", "city", "state"):
if not service.get(field, "").strip():
raise ValueError(f"service.{field} is required")
if not data.get("checklist"):
raise ValueError("checklist must contain at least one item")
def build_packet(data: dict[str, Any]) -> dict[str, Any]:
validate_request(data)
contact = data["contact"]
service = data["service"]
return {
"packet_id": f"nh-bb-{datetime.now(timezone.utc).strftime('%Y%m%d-%H%M%S')}",
"generated_utc": datetime.now(timezone.utc).isoformat(),
"contact": {
"name": contact["name"],
"phone": contact["phone"],
"email": contact.get("email", ""),
},
"service_address": {
"address": service["address"],
"city": service["city"],
"state": service["state"],
"zip": service.get("zip", ""),
},
"desired_plan": data.get("desired_plan", "residential-fiber"),
"call_log": data.get("call_log", []),
"checklist": [
{"item": item, "done": False} if isinstance(item, str) else item
for item in data["checklist"]
],
"status": "pending_scheduling_call",
}
def render_markdown(packet: dict[str, Any], data: dict[str, Any]) -> str:
contact = packet["contact"]
addr = packet["service_address"]
lines = [
f"# NH Broadband Install Packet",
"",
f"**Packet ID:** {packet['packet_id']}",
f"**Generated:** {packet['generated_utc']}",
f"**Status:** {packet['status']}",
"",
"## Contact",
"",
f"- **Name:** {contact['name']}",
f"- **Phone:** {contact['phone']}",
f"- **Email:** {contact.get('email', 'n/a')}",
"",
"## Service Address",
"",
f"- {addr['address']}",
f"- {addr['city']}, {addr['state']} {addr['zip']}",
"",
f"## Desired Plan",
"",
f"{packet['desired_plan']}",
"",
"## Call Log",
"",
]
if packet["call_log"]:
for entry in packet["call_log"]:
ts = entry.get("timestamp", "n/a")
outcome = entry.get("outcome", "n/a")
notes = entry.get("notes", "")
lines.append(f"- **{ts}** — {outcome}")
if notes:
lines.append(f" - {notes}")
else:
lines.append("_No calls logged yet._")
lines.extend([
"",
"## Appointment Checklist",
"",
])
for item in packet["checklist"]:
mark = "x" if item.get("done") else " "
lines.append(f"- [{mark}] {item['item']}")
lines.append("")
return "\n".join(lines)
def main() -> int:
parser = argparse.ArgumentParser(description="Build NH Broadband install packet.")
parser.add_argument("request", help="Path to install request YAML")
parser.add_argument("--markdown", action="store_true", help="Render markdown instead of JSON")
args = parser.parse_args()
data = load_request(args.request)
packet = build_packet(data)
if args.markdown:
print(render_markdown(packet, data))
else:
print(json.dumps(packet, indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,67 @@
from __future__ import annotations
import importlib.util
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
SCRIPT_PATH = ROOT / "scripts" / "fleet_phase_status.py"
DOC_PATH = ROOT / "docs" / "FLEET_PHASE_1_SURVIVAL.md"
def _load_module(path: Path, name: str):
assert path.exists(), f"missing {path.relative_to(ROOT)}"
spec = importlib.util.spec_from_file_location(name, path)
assert spec and spec.loader
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
def test_compute_phase_status_tracks_survival_gate_requirements() -> None:
mod = _load_module(SCRIPT_PATH, "fleet_phase_status")
status = mod.compute_phase_status(
{
"fleet_operational": True,
"resources": {
"uptime_percent": 94.5,
"days_at_or_above_95_percent": 12,
"capacity_utilization_percent": 45.0,
},
}
)
assert status["current_phase"] == "PHASE-1 Survival"
assert status["next_phase_ready"] is False
assert any("94.5% / 95.0%" in item for item in status["missing_requirements"])
assert any("12/30" in item for item in status["missing_requirements"])
assert any("45.0% / >60.0%" in item for item in status["missing_requirements"])
def test_render_markdown_preserves_phase_buildings_and_manual_clicker_language() -> None:
mod = _load_module(SCRIPT_PATH, "fleet_phase_status")
status = mod.compute_phase_status(mod.default_snapshot())
report = mod.render_markdown(status)
for snippet in (
"# [PHASE-1] Survival - Keep the Lights On",
"VPS hosts: Ezra, Allegro, Bezalel",
"Timmy harness",
"Gitea forge",
"Evennia worlds",
"Every restart, every SSH, every check is a manual click.",
):
assert snippet in report
def test_repo_contains_generated_phase_1_doc() -> None:
assert DOC_PATH.exists(), "missing committed phase-1 survival doc"
text = DOC_PATH.read_text(encoding="utf-8")
for snippet in (
"# [PHASE-1] Survival - Keep the Lights On",
"## Current Buildings",
"## Next Phase Trigger",
"## Manual Clicker Interpretation",
):
assert snippet in text

View File

@@ -1,76 +0,0 @@
from pathlib import Path
import importlib.util
import unittest
ROOT = Path(__file__).resolve().parent.parent
SCRIPT_PATH = ROOT / "scripts" / "know_thy_father" / "epic_pipeline.py"
DOC_PATH = ROOT / "docs" / "KNOW_THY_FATHER_MULTIMODAL_PIPELINE.md"
def load_module(path: Path, name: str):
assert path.exists(), f"missing {path.relative_to(ROOT)}"
spec = importlib.util.spec_from_file_location(name, path)
assert spec and spec.loader
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
class TestKnowThyFatherEpicPipeline(unittest.TestCase):
def test_build_pipeline_plan_contains_all_phases_in_order(self):
mod = load_module(SCRIPT_PATH, "ktf_epic_pipeline")
plan = mod.build_pipeline_plan(batch_size=10)
self.assertEqual(
[step["id"] for step in plan],
[
"phase1_media_indexing",
"phase2_multimodal_analysis",
"phase3_holographic_synthesis",
"phase4_cross_reference_audit",
"phase5_processing_log",
],
)
self.assertIn("scripts/know_thy_father/index_media.py", plan[0]["command"])
self.assertIn("scripts/twitter_archive/analyze_media.py --batch 10", plan[1]["command"])
self.assertIn("scripts/know_thy_father/synthesize_kernels.py", plan[2]["command"])
self.assertIn("scripts/know_thy_father/crossref_audit.py", plan[3]["command"])
self.assertIn("twitter-archive/know-thy-father/tracker.py report", plan[4]["command"])
def test_status_snapshot_reports_key_artifact_paths(self):
mod = load_module(SCRIPT_PATH, "ktf_epic_pipeline")
status = mod.build_status_snapshot(ROOT)
self.assertIn("phase1_media_indexing", status)
self.assertIn("phase2_multimodal_analysis", status)
self.assertIn("phase3_holographic_synthesis", status)
self.assertIn("phase4_cross_reference_audit", status)
self.assertIn("phase5_processing_log", status)
self.assertEqual(status["phase1_media_indexing"]["script"], "scripts/know_thy_father/index_media.py")
self.assertEqual(status["phase2_multimodal_analysis"]["script"], "scripts/twitter_archive/analyze_media.py")
self.assertEqual(status["phase5_processing_log"]["script"], "twitter-archive/know-thy-father/tracker.py")
self.assertTrue(status["phase1_media_indexing"]["script_exists"])
self.assertTrue(status["phase2_multimodal_analysis"]["script_exists"])
self.assertTrue(status["phase3_holographic_synthesis"]["script_exists"])
self.assertTrue(status["phase4_cross_reference_audit"]["script_exists"])
self.assertTrue(status["phase5_processing_log"]["script_exists"])
def test_repo_contains_multimodal_pipeline_doc(self):
self.assertTrue(DOC_PATH.exists(), "missing committed Know Thy Father pipeline doc")
text = DOC_PATH.read_text(encoding="utf-8")
required = [
"# Know Thy Father — Multimodal Media Consumption Pipeline",
"scripts/know_thy_father/index_media.py",
"scripts/twitter_archive/analyze_media.py --batch 10",
"scripts/know_thy_father/synthesize_kernels.py",
"scripts/know_thy_father/crossref_audit.py",
"twitter-archive/know-thy-father/tracker.py report",
"Refs #582",
]
for snippet in required:
self.assertIn(snippet, text)
if __name__ == "__main__":
unittest.main()

View File

@@ -1,52 +0,0 @@
from pathlib import Path
import yaml
from scripts.plan_laptop_fleet import build_plan, load_manifest, render_markdown, validate_manifest
def test_laptop_fleet_planner_script_exists() -> None:
assert Path("scripts/plan_laptop_fleet.py").exists()
def test_laptop_fleet_manifest_template_exists() -> None:
assert Path("docs/laptop-fleet-manifest.example.yaml").exists()
def test_build_plan_selects_two_lowest_idle_watt_laptops_as_anchors() -> None:
data = load_manifest("docs/laptop-fleet-manifest.example.yaml")
plan = build_plan(data)
assert plan["anchor_agents"] == ["timmy-anchor-a", "timmy-anchor-b"]
assert plan["desktop_nas"] == "timmy-desktop-nas"
assert plan["role_mapping"]["timmy-daylight-a"]["schedule"] == "10:00-16:00"
def test_validate_manifest_requires_unique_hostnames() -> None:
data = {
"machines": [
{"hostname": "dup", "machine_type": "laptop", "ram_gb": 8, "cpu_cores": 4, "os": "Linux", "adapter_condition": "good"},
{"hostname": "dup", "machine_type": "laptop", "ram_gb": 16, "cpu_cores": 8, "os": "Linux", "adapter_condition": "good"},
]
}
try:
validate_manifest(data)
except ValueError as exc:
assert "duplicate hostname" in str(exc)
assert "unique hostnames" in str(exc)
else:
raise AssertionError("validate_manifest should reject duplicate hostname")
def test_markdown_contains_anchor_agents_and_daylight_schedule() -> None:
data = load_manifest("docs/laptop-fleet-manifest.example.yaml")
plan = build_plan(data)
content = render_markdown(plan, data)
assert "24/7 anchor agents: timmy-anchor-a, timmy-anchor-b" in content
assert "Daylight schedule: 10:00-16:00" in content
assert "desktop_nas" in content
def test_manifest_template_is_valid_yaml() -> None:
data = yaml.safe_load(Path("docs/laptop-fleet-manifest.example.yaml").read_text())
assert data["fleet_name"] == "timmy-laptop-fleet"
assert len(data["machines"]) == 6

View File

@@ -1,68 +0,0 @@
from pathlib import Path
import importlib.util
import unittest
ROOT = Path(__file__).resolve().parent.parent
SCRIPT_PATH = ROOT / "scripts" / "mempalace_ezra_integration.py"
DOC_PATH = ROOT / "docs" / "MEMPALACE_EZRA_INTEGRATION.md"
def load_module(path: Path, name: str):
assert path.exists(), f"missing {path.relative_to(ROOT)}"
spec = importlib.util.spec_from_file_location(name, path)
assert spec and spec.loader
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
class TestMempalaceEzraIntegration(unittest.TestCase):
def test_build_plan_contains_issue_required_steps_and_gotchas(self):
mod = load_module(SCRIPT_PATH, "mempalace_ezra_integration")
plan = mod.build_plan({})
self.assertEqual(plan["package_spec"], "mempalace==3.0.0")
self.assertIn("pip install mempalace==3.0.0", plan["install_command"])
self.assertEqual(plan["wing"], "ezra_home")
self.assertIn('echo "" | mempalace mine ~/.hermes/', plan["mine_home_command"])
self.assertIn('--mode convos', plan["mine_sessions_command"])
self.assertIn('mempalace wake-up', plan["wake_up_command"])
self.assertIn('hermes mcp add mempalace -- python -m mempalace.mcp_server', plan["mcp_command"])
self.assertIn('wing:', plan["yaml_template"])
self.assertTrue(any('stdin' in item.lower() for item in plan["gotchas"]))
self.assertTrue(any('wing:' in item for item in plan["gotchas"]))
def test_build_plan_accepts_path_and_wing_overrides(self):
mod = load_module(SCRIPT_PATH, "mempalace_ezra_integration")
plan = mod.build_plan(
{
"hermes_home": "/root/wizards/ezra/home",
"sessions_dir": "/root/wizards/ezra/home/sessions",
"wing": "ezra_archive",
}
)
self.assertEqual(plan["wing"], "ezra_archive")
self.assertIn('/root/wizards/ezra/home', plan["mine_home_command"])
self.assertIn('/root/wizards/ezra/home/sessions', plan["mine_sessions_command"])
self.assertIn('wing: ezra_archive', plan["yaml_template"])
def test_repo_contains_mem_palace_ezra_doc(self):
self.assertTrue(DOC_PATH.exists(), "missing committed MemPalace Ezra integration doc")
text = DOC_PATH.read_text(encoding="utf-8")
required = [
"# MemPalace v3.0.0 — Ezra Integration Packet",
"pip install mempalace==3.0.0",
'echo "" | mempalace mine ~/.hermes/',
"mempalace mine ~/.hermes/sessions/ --mode convos",
"mempalace wake-up",
"hermes mcp add mempalace -- python -m mempalace.mcp_server",
"Report back to #568",
]
for snippet in required:
self.assertIn(snippet, text)
if __name__ == "__main__":
unittest.main()

View File

@@ -1,105 +0,0 @@
from pathlib import Path
import yaml
from scripts.plan_nh_broadband_install import (
build_packet,
load_request,
render_markdown,
validate_request,
)
def test_script_exists() -> None:
assert Path("scripts/plan_nh_broadband_install.py").exists()
def test_example_request_exists() -> None:
assert Path("docs/nh-broadband-install-request.example.yaml").exists()
def test_example_packet_exists() -> None:
assert Path("docs/nh-broadband-install-packet.example.md").exists()
def test_research_memo_exists() -> None:
assert Path("reports/operations/2026-04-15-nh-broadband-public-research.md").exists()
def test_load_and_build_packet() -> None:
data = load_request("docs/nh-broadband-install-request.example.yaml")
packet = build_packet(data)
assert packet["contact"]["name"] == "Timmy Operator"
assert packet["service_address"]["city"] == "Concord"
assert packet["service_address"]["state"] == "NH"
assert packet["status"] == "pending_scheduling_call"
assert len(packet["checklist"]) == 8
assert packet["checklist"][0]["done"] is False
def test_validate_rejects_missing_contact_name() -> None:
data = {
"contact": {"name": "", "phone": "555"},
"service": {"address": "1 St", "city": "X", "state": "NH"},
"checklist": ["do thing"],
}
try:
validate_request(data)
except ValueError as exc:
assert "contact.name" in str(exc)
else:
raise AssertionError("should reject empty contact name")
def test_validate_rejects_missing_service_address() -> None:
data = {
"contact": {"name": "A", "phone": "555"},
"service": {"address": "", "city": "X", "state": "NH"},
"checklist": ["do thing"],
}
try:
validate_request(data)
except ValueError as exc:
assert "service.address" in str(exc)
else:
raise AssertionError("should reject empty service address")
def test_validate_rejects_empty_checklist() -> None:
data = {
"contact": {"name": "A", "phone": "555"},
"service": {"address": "1 St", "city": "X", "state": "NH"},
"checklist": [],
}
try:
validate_request(data)
except ValueError as exc:
assert "checklist" in str(exc)
else:
raise AssertionError("should reject empty checklist")
def test_render_markdown_contains_key_sections() -> None:
data = load_request("docs/nh-broadband-install-request.example.yaml")
packet = build_packet(data)
md = render_markdown(packet, data)
assert "# NH Broadband Install Packet" in md
assert "## Contact" in md
assert "## Service Address" in md
assert "## Call Log" in md
assert "## Appointment Checklist" in md
assert "Concord" in md
assert "NH" in md
def test_render_markdown_shows_checklist_items() -> None:
data = load_request("docs/nh-broadband-install-request.example.yaml")
packet = build_packet(data)
md = render_markdown(packet, data)
assert "- [ ] Confirm exact-address availability" in md
def test_example_yaml_is_valid() -> None:
data = yaml.safe_load(Path("docs/nh-broadband-install-request.example.yaml").read_text())
assert data["contact"]["name"] == "Timmy Operator"
assert len(data["checklist"]) == 8

View File

@@ -17,24 +17,8 @@ from typing import Dict, Any, Optional, List
from pathlib import Path
from dataclasses import dataclass
from enum import Enum
import importlib.util
def _load_local(module_name: str, filename: str):
"""Import a module from an explicit file path, bypassing sys.path resolution."""
spec = importlib.util.spec_from_file_location(
module_name,
str(Path(__file__).parent / filename),
)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod
_harness = _load_local("v2_harness", "harness.py")
UniWizardHarness = _harness.UniWizardHarness
House = _harness.House
ExecutionResult = _harness.ExecutionResult
from harness import UniWizardHarness, House, ExecutionResult
class TaskType(Enum):

View File

@@ -8,30 +8,13 @@ import time
import sys
import argparse
import os
import importlib.util
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional
def _load_local(module_name: str, filename: str):
"""Import a module from an explicit file path, bypassing sys.path resolution.
Prevents namespace collisions when multiple directories contain modules
with the same name (e.g. uni-wizard/harness.py vs uni-wizard/v2/harness.py).
"""
spec = importlib.util.spec_from_file_location(
module_name,
str(Path(__file__).parent / filename),
)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod
_harness = _load_local("v2_harness", "harness.py")
UniWizardHarness = _harness.UniWizardHarness
House = _harness.House
ExecutionResult = _harness.ExecutionResult
sys.path.insert(0, str(Path(__file__).parent))
from harness import UniWizardHarness, House, ExecutionResult
from router import HouseRouter, TaskType
from author_whitelist import AuthorWhitelist