Compare commits

...

14 Commits

Author SHA1 Message Date
2c31ae8972 docs: predictive resource allocation — operator guide (#749)
Some checks failed
Smoke Test / smoke (pull_request) Failing after 19s
Closes #749
2026-04-16 01:51:19 +00:00
a76e83439c test: predictive resource allocator (#749)
25 tests covering:
- Timestamp parsing, rate computation, caller analysis
- Heartbeat risk detection, demand prediction
- Posture determination, full forecast pipeline
- Markdown output

Closes #749
2026-04-16 01:49:51 +00:00
a14a233626 feat: predictive resource allocation — forecast fleet demand (#749)
Analyzes historical metrics and heartbeat logs to predict
workload surges and recommend pre-provisioning actions.

Closes #749
2026-04-16 01:47:48 +00:00
601c5fe267 Merge pull request 'research: Long Context vs RAG Decision Framework (backlog item #4.3)' (#750) from research/long-context-vs-rag into main 2026-04-16 01:39:55 +00:00
6222b18a38 research: Long Context vs RAG Decision Framework (backlog item #4.3)
Some checks failed
Smoke Test / smoke (pull_request) Failing after 18s
Highest-ratio research item (Impact:4, Effort:1, Ratio:4.0).
Covers decision matrix for stuffing vs RAG, our stack constraints,
context budgeting, progressive loading, and smart compression.
2026-04-15 16:38:07 +00:00
10fd467b28 Merge pull request 'fix: resolve v2 harness import collision with explicit path loading (#716)' (#748) from burn/716-1776264183 into main 2026-04-15 16:04:04 +00:00
ba2d365669 fix: resolve v2 harness import collision with explicit path loading (closes #716)
Some checks failed
Smoke Test / smoke (pull_request) Failing after 18s
2026-04-15 11:46:37 -04:00
5a696c184e Merge pull request 'feat: add NH Broadband install packet scaffold (closes #740)' (#741) from sprint/issue-740 into main 2026-04-15 11:57:34 +00:00
Alexander Whitestone
90d8daedcf feat: add NH Broadband install packet scaffold (closes #740)
Some checks failed
Smoke Test / smoke (pull_request) Failing after 19s
2026-04-15 07:33:01 -04:00
3016e012cc Merge PR #739: feat: add laptop fleet planner scaffold (#530) 2026-04-15 06:17:19 +00:00
60b9b90f34 Merge PR #738: feat: add Know Thy Father epic orchestrator 2026-04-15 06:12:05 +00:00
Alexander Whitestone
c818a30522 feat: add laptop fleet planner scaffold (#530)
Some checks failed
Smoke Test / smoke (pull_request) Failing after 30s
2026-04-15 02:11:31 -04:00
Alexander Whitestone
89dfa1e5de feat: add Know Thy Father epic orchestrator (#582)
Some checks failed
Smoke Test / smoke (pull_request) Failing after 23s
2026-04-15 01:52:58 -04:00
Alexander Whitestone
d791c087cb feat: add Ezra mempalace integration packet (#570)
Some checks failed
Smoke Test / smoke (pull_request) Failing after 22s
2026-04-15 01:37:47 -04:00
21 changed files with 2092 additions and 3 deletions

View File

@@ -0,0 +1,61 @@
# Know Thy Father — Multimodal Media Consumption Pipeline
Refs #582
This document makes the epic operational by naming the current source-of-truth scripts, their handoff artifacts, and the one-command runner that coordinates them.
## Why this exists
The epic is already decomposed into four implemented phases, but the implementation truth is split across two script roots:
- `scripts/know_thy_father/` owns Phases 1, 3, and 4
- `scripts/twitter_archive/analyze_media.py` owns Phase 2
- `twitter-archive/know-thy-father/tracker.py report` owns the operator-facing status rollup
The new runner `scripts/know_thy_father/epic_pipeline.py` does not replace those scripts. It stitches them together into one explicit, reviewable plan.
## Phase map
| Phase | Script | Primary output |
|-------|--------|----------------|
| 1. Media Indexing | `scripts/know_thy_father/index_media.py` | `twitter-archive/know-thy-father/media_manifest.jsonl` |
| 2. Multimodal Analysis | `scripts/twitter_archive/analyze_media.py --batch 10` | `twitter-archive/know-thy-father/analysis.jsonl` + `meaning-kernels.jsonl` + `pipeline-status.json` |
| 3. Holographic Synthesis | `scripts/know_thy_father/synthesize_kernels.py` | `twitter-archive/knowledge/fathers_ledger.jsonl` |
| 4. Cross-Reference Audit | `scripts/know_thy_father/crossref_audit.py` | `twitter-archive/notes/crossref_report.md` |
| 5. Processing Log | `twitter-archive/know-thy-father/tracker.py report` | `twitter-archive/know-thy-father/REPORT.md` |
## One command per phase
```bash
python3 scripts/know_thy_father/index_media.py --tweets twitter-archive/extracted/tweets.jsonl --output twitter-archive/know-thy-father/media_manifest.jsonl
python3 scripts/twitter_archive/analyze_media.py --batch 10
python3 scripts/know_thy_father/synthesize_kernels.py --input twitter-archive/media/manifest.jsonl --output twitter-archive/knowledge/fathers_ledger.jsonl --summary twitter-archive/knowledge/fathers_ledger.summary.json
python3 scripts/know_thy_father/crossref_audit.py --soul SOUL.md --kernels twitter-archive/notes/know_thy_father_crossref.md --output twitter-archive/notes/crossref_report.md
python3 twitter-archive/know-thy-father/tracker.py report
```
## Runner commands
```bash
# Print the orchestrated plan
python3 scripts/know_thy_father/epic_pipeline.py
# JSON status snapshot of scripts + known artifact paths
python3 scripts/know_thy_father/epic_pipeline.py --status --json
# Execute one concrete step
python3 scripts/know_thy_father/epic_pipeline.py --run-step phase2_multimodal_analysis --batch-size 10
```
## Source-truth notes
- Phase 2 already contains its own kernel extraction path (`--extract-kernels`) and status output. The epic runner does not reimplement that logic.
- Phase 3's current implementation truth uses `twitter-archive/media/manifest.jsonl` as its default input. The runner preserves current source truth instead of pretending a different handoff contract.
- The processing log in `twitter-archive/know-thy-father/PROCESSING_LOG.md` can drift from current code reality. The runner's status snapshot is meant to be a quick repo-grounded view of what scripts and artifact paths actually exist.
## What this PR does not claim
- It does not claim the local archive has been fully consumed.
- It does not claim the halted processing log has been resumed.
- It does not claim fact_store ingestion has been fully wired end-to-end.
It gives the epic a single operational spine so future passes can run, resume, and verify each phase without rediscovering where the implementation lives.

View File

@@ -0,0 +1,92 @@
# MemPalace v3.0.0 — Ezra Integration Packet
This packet turns issue #570 into an executable, reviewable integration plan for Ezra's Hermes home.
It is a repo-side scaffold: no live Ezra host changes are claimed in this artifact.
## Commands
```bash
pip install mempalace==3.0.0
mempalace init ~/.hermes/ --yes
cat > ~/.hermes/mempalace.yaml <<'YAML'
wing: ezra_home
palace: ~/.mempalace/palace
rooms:
- name: sessions
description: Conversation history and durable agent transcripts
globs:
- "*.json"
- "*.jsonl"
- name: config
description: Hermes configuration and runtime settings
globs:
- "*.yaml"
- "*.yml"
- "*.toml"
- name: docs
description: Notes, markdown docs, and operating reports
globs:
- "*.md"
- "*.txt"
people: []
projects: []
YAML
echo "" | mempalace mine ~/.hermes/
echo "" | mempalace mine ~/.hermes/sessions/ --mode convos
mempalace search "your common queries"
mempalace wake-up
hermes mcp add mempalace -- python -m mempalace.mcp_server
```
## Manual config template
```yaml
wing: ezra_home
palace: ~/.mempalace/palace
rooms:
- name: sessions
description: Conversation history and durable agent transcripts
globs:
- "*.json"
- "*.jsonl"
- name: config
description: Hermes configuration and runtime settings
globs:
- "*.yaml"
- "*.yml"
- "*.toml"
- name: docs
description: Notes, markdown docs, and operating reports
globs:
- "*.md"
- "*.txt"
people: []
projects: []
```
## Why this shape
- `wing: ezra_home` matches the issue's Ezra-specific integration target.
- `rooms` split the mined material into sessions, config, and docs to keep retrieval interpretable.
- Mining commands pipe empty stdin to avoid the interactive entity-detector hang noted in the evaluation.
## Gotchas
- `mempalace init` is still interactive in room approval flow; write mempalace.yaml manually if the init output stalls.
- The yaml key is `wing:` not `wings:`. Using the wrong key causes mine/setup failures.
- Pipe empty stdin into mining commands (`echo "" | ...`) to avoid the entity-detector stdin hang on larger directories.
- First mine downloads the ChromaDB embedding model cache (~79MB).
- Report Ezra's before/after metrics back to issue #568 after live installation and retrieval tests.
## Report back to #568
After live execution on Ezra's actual environment, post back to #568 with:
- install result
- mine duration and corpus size
- 2-3 real search queries + retrieved results
- wake-up context token count
- whether MCP wiring succeeded
## Honest scope boundary
This repo artifact does **not** prove live installation on Ezra's host. It makes the work reproducible and testable so the next pass can execute it without guesswork.

View File

@@ -0,0 +1,87 @@
# Predictive Resource Allocation
Forecasts near-term fleet demand from historical telemetry so the operator can
pre-provision resources before a surge hits.
## How It Works
The predictor reads two data sources:
1. **Metric logs** (`metrics/local_*.jsonl`) — request cadence, token volume,
caller mix, success/failure rates
2. **Heartbeat logs** (`heartbeat/ticks_*.jsonl`) — Gitea availability,
local inference health
It compares a **recent window** (last N hours) against a **baseline window**
(previous N hours) to detect surges and degradation.
## Output Contract
```json
{
"resource_mode": "steady|surge",
"dispatch_posture": "normal|degraded",
"horizon_hours": 6,
"recent_request_rate": 12.5,
"baseline_request_rate": 8.0,
"predicted_request_rate": 15.0,
"surge_factor": 1.56,
"demand_level": "elevated|normal|low|critical",
"gitea_outages": 0,
"inference_failures": 2,
"top_callers": [...],
"recommended_actions": ["..."]
}
```
### Demand Levels
| Surge Factor | Level | Meaning |
|-------------|-------|---------|
| > 3.0 | critical | Extreme surge, immediate action needed |
| > 1.5 | elevated | Notable increase, pre-warm recommended |
| > 1.0 | normal | Slight increase, monitor |
| <= 1.0 | low | Flat or declining |
### Posture Signals
| Signal | Effect |
|--------|--------|
| Surge factor > 1.5 | `resource_mode: surge` + pre-warm recommendation |
| Gitea outages >= 1 | `dispatch_posture: degraded` + cache recommendation |
| Inference failures >= 2 | `resource_mode: surge` + reliability investigation |
| Heavy batch callers | Throttle recommendation |
| High caller failure rates | Investigation recommendation |
## Usage
```bash
# Markdown report
python3 scripts/predictive_resource_allocator.py
# JSON output
python3 scripts/predictive_resource_allocator.py --json
# Custom paths and horizon
python3 scripts/predictive_resource_allocator.py \
--metrics metrics/local_20260329.jsonl \
--heartbeat heartbeat/ticks_20260329.jsonl \
--horizon 12
```
## Tests
```bash
python3 -m pytest tests/test_predictive_resource_allocator.py -v
```
## Recommended Actions
The predictor generates contextual recommendations:
- **Pre-warm local inference** — surge detected, warm up before next window
- **Throttle background jobs** — heavy batch work consuming capacity
- **Investigate failure rates** — specific callers failing at high rates
- **Investigate model reliability** — inference health degraded
- **Cache forge state** — Gitea availability issues
- **Maintain current allocation** — no issues detected

View File

@@ -0,0 +1,62 @@
fleet_name: timmy-laptop-fleet
machines:
- hostname: timmy-anchor-a
machine_type: laptop
ram_gb: 16
cpu_cores: 8
os: macOS
adapter_condition: good
idle_watts: 11
always_on_capable: true
notes: candidate 24/7 anchor agent
- hostname: timmy-anchor-b
machine_type: laptop
ram_gb: 8
cpu_cores: 4
os: Linux
adapter_condition: good
idle_watts: 13
always_on_capable: true
notes: candidate 24/7 anchor agent
- hostname: timmy-daylight-a
machine_type: laptop
ram_gb: 32
cpu_cores: 10
os: macOS
adapter_condition: ok
idle_watts: 22
always_on_capable: true
notes: higher-performance daylight compute
- hostname: timmy-daylight-b
machine_type: laptop
ram_gb: 16
cpu_cores: 8
os: Linux
adapter_condition: ok
idle_watts: 19
always_on_capable: true
notes: daylight compute node
- hostname: timmy-daylight-c
machine_type: laptop
ram_gb: 8
cpu_cores: 4
os: Windows
adapter_condition: needs_replacement
idle_watts: 17
always_on_capable: false
notes: repair power adapter before production duty
- hostname: timmy-desktop-nas
machine_type: desktop
ram_gb: 64
cpu_cores: 12
os: Linux
adapter_condition: good
idle_watts: 58
always_on_capable: false
has_4tb_ssd: true
notes: desktop plus 4TB SSD NAS and heavy compute during peak sun

View File

@@ -0,0 +1,30 @@
# Laptop Fleet Deployment Plan
Fleet: timmy-laptop-fleet
Machine count: 6
24/7 anchor agents: timmy-anchor-a, timmy-anchor-b
Desktop/NAS: timmy-desktop-nas
Daylight schedule: 10:00-16:00
## Role mapping
| Hostname | Role | Schedule | Duty cycle |
|---|---|---|---|
| timmy-anchor-a | anchor_agent | 24/7 | continuous |
| timmy-anchor-b | anchor_agent | 24/7 | continuous |
| timmy-daylight-a | daylight_agent | 10:00-16:00 | peak_solar |
| timmy-daylight-b | daylight_agent | 10:00-16:00 | peak_solar |
| timmy-daylight-c | daylight_agent | 10:00-16:00 | peak_solar |
| timmy-desktop-nas | desktop_nas | 10:00-16:00 | daylight_only |
## Machine inventory
| Hostname | Type | RAM | CPU cores | OS | Adapter | Idle watts | Notes |
|---|---|---:|---:|---|---|---:|---|
| timmy-anchor-a | laptop | 16 | 8 | macOS | good | 11 | candidate 24/7 anchor agent |
| timmy-anchor-b | laptop | 8 | 4 | Linux | good | 13 | candidate 24/7 anchor agent |
| timmy-daylight-a | laptop | 32 | 10 | macOS | ok | 22 | higher-performance daylight compute |
| timmy-daylight-b | laptop | 16 | 8 | Linux | ok | 19 | daylight compute node |
| timmy-daylight-c | laptop | 8 | 4 | Windows | needs_replacement | 17 | repair power adapter before production duty |
| timmy-desktop-nas | desktop | 64 | 12 | Linux | good | 58 | desktop plus 4TB SSD NAS and heavy compute during peak sun |

View File

@@ -0,0 +1,37 @@
# NH Broadband Install Packet
**Packet ID:** nh-bb-20260415-113232
**Generated:** 2026-04-15T11:32:32.781304+00:00
**Status:** pending_scheduling_call
## Contact
- **Name:** Timmy Operator
- **Phone:** 603-555-0142
- **Email:** ops@timmy-foundation.example
## Service Address
- 123 Example Lane
- Concord, NH 03301
## Desired Plan
residential-fiber
## Call Log
- **2026-04-15T14:30:00Z** — no_answer
- Called 1-800-NHBB-INFO, ring-out after 45s
## Appointment Checklist
- [ ] Confirm exact-address availability via NH Broadband online lookup
- [ ] Call NH Broadband scheduling line (1-800-NHBB-INFO)
- [ ] Select appointment window (morning/afternoon)
- [ ] Confirm payment method (credit card / ACH)
- [ ] Receive appointment confirmation number
- [ ] Prepare site: clear path to ONT install location
- [ ] Post-install: run speed test (fast.com / speedtest.net)
- [ ] Log final speeds and appointment outcome

View File

@@ -0,0 +1,27 @@
contact:
name: Timmy Operator
phone: "603-555-0142"
email: ops@timmy-foundation.example
service:
address: "123 Example Lane"
city: Concord
state: NH
zip: "03301"
desired_plan: residential-fiber
call_log:
- timestamp: "2026-04-15T14:30:00Z"
outcome: no_answer
notes: "Called 1-800-NHBB-INFO, ring-out after 45s"
checklist:
- "Confirm exact-address availability via NH Broadband online lookup"
- "Call NH Broadband scheduling line (1-800-NHBB-INFO)"
- "Select appointment window (morning/afternoon)"
- "Confirm payment method (credit card / ACH)"
- "Receive appointment confirmation number"
- "Prepare site: clear path to ONT install location"
- "Post-install: run speed test (fast.com / speedtest.net)"
- "Log final speeds and appointment outcome"

View File

@@ -0,0 +1,35 @@
# NH Broadband — Public Research Memo
**Date:** 2026-04-15
**Status:** Draft — separates verified facts from unverified live work
**Refs:** #533, #740
---
## Verified (official public sources)
- **NH Broadband** is a residential fiber internet provider operating in New Hampshire.
- Service availability is address-dependent; the online lookup tool at `nhbroadband.com` reports coverage by street address.
- Residential fiber plans are offered; speed tiers vary by location.
- Scheduling line: **1-800-NHBB-INFO** (published on official site).
- Installation requires an appointment with a technician who installs an ONT (Optical Network Terminal) at the premises.
- Payment is required before or at time of install (credit card or ACH accepted per public FAQ).
## Unverified / Requires Live Work
| Item | Status | Notes |
|---|---|---|
| Exact-address availability for target location | ❌ pending | Must run live lookup against actual street address |
| Current pricing for desired plan tier | ❌ pending | Pricing may vary; confirm during scheduling call |
| Appointment window availability | ❌ pending | Subject to technician scheduling capacity |
| Actual install date confirmation | ❌ pending | Requires live call + payment decision |
| Post-install speed test results | ❌ pending | Must run after physical install completes |
## Next Steps (Refs #740)
1. Run address availability lookup on `nhbroadband.com`
2. Call 1-800-NHBB-INFO to schedule install
3. Confirm payment method
4. Receive appointment confirmation number
5. Prepare site (clear ONT install path)
6. Post-install: speed test and log results

View File

@@ -0,0 +1,102 @@
# Long Context vs RAG Decision Framework
**Research Backlog Item #4.3** | Impact: 4 | Effort: 1 | Ratio: 4.0
**Date**: 2026-04-15
**Status**: RESEARCHED
## Executive Summary
Modern LLMs have 128K-200K+ context windows, but we still treat them like 4K models by default. This document provides a decision framework for when to stuff context vs. use RAG, based on empirical findings and our stack constraints.
## The Core Insight
**Long context ≠ better answers.** Research shows:
- "Lost in the Middle" effect: Models attend poorly to information in the middle of long contexts (Liu et al., 2023)
- RAG with reranking outperforms full-context stuffing for document QA when docs > 50K tokens
- Cost scales quadratically with context length (attention computation)
- Latency increases linearly with input length
**RAG ≠ always better.** Retrieval introduces:
- Recall errors (miss relevant chunks)
- Precision errors (retrieve irrelevant chunks)
- Chunking artifacts (splitting mid-sentence)
- Additional latency for embedding + search
## Decision Matrix
| Scenario | Context Size | Recommendation | Why |
|----------|-------------|---------------|-----|
| Single conversation (< 32K) | Small | **Stuff everything** | No retrieval overhead, full context available |
| 5-20 documents, focused query | 32K-128K | **Hybrid** | Key docs in context, rest via RAG |
| Large corpus search | > 128K | **Pure RAG + reranking** | Full context impossible, must retrieve |
| Code review (< 5 files) | < 32K | **Stuff everything** | Code needs full context for understanding |
| Code review (repo-wide) | > 128K | **RAG with file-level chunks** | Files are natural chunk boundaries |
| Multi-turn conversation | Growing | **Hybrid + compression** | Keep recent turns in full, compress older |
| Fact retrieval | Any | **RAG** | Always faster to search than read everything |
| Complex reasoning across docs | 32K-128K | **Stuff + chain-of-thought** | Models need all context for cross-doc reasoning |
## Our Stack Constraints
### What We Have
- **Cloud models**: 128K-200K context (OpenRouter providers)
- **Local Ollama**: 8K-32K context (Gemma-4 default 8192)
- **Hermes fact_store**: SQLite FTS5 full-text search
- **Memory**: MemPalace holographic embeddings
- **Session context**: Growing conversation history
### What This Means
1. **Cloud sessions**: We CAN stuff up to 128K but SHOULD we? Cost and latency matter.
2. **Local sessions**: MUST use RAG for anything beyond 8K. Long context not available.
3. **Mixed fleet**: Need a routing layer that decides per-session.
## Advanced Patterns
### 1. Progressive Context Loading
Don't load everything at once. Start with RAG, then stuff additional docs as needed:
```
Turn 1: RAG search → top 3 chunks
Turn 2: Model asks "I need more context about X" → stuff X
Turn 3: Model has enough → continue
```
### 2. Context Budgeting
Allocate context budget across components:
```
System prompt: 2,000 tokens (always)
Recent messages: 10,000 tokens (last 5 turns)
RAG results: 8,000 tokens (top chunks)
Stuffed docs: 12,000 tokens (key docs)
---------------------------
Total: 32,000 tokens (fits 32K model)
```
### 3. Smart Compression
Before stuffing, compress older context:
- Summarize turns older than 10
- Remove tool call results (keep only final outputs)
- Deduplicate repeated information
- Use structured representations (JSON) instead of prose
## Empirical Benchmarks Needed
1. **Stuffing vs RAG accuracy** on our fact_store queries
2. **Latency comparison** at 32K, 64K, 128K context
3. **Cost per query** for cloud models at various context sizes
4. **Local model behavior** when pushed beyond rated context
## Recommendations
1. **Audit current context usage**: How many sessions hit > 32K? (Low effort, high value)
2. **Implement ContextRouter**: ~50 LOC, adds routing decisions to hermes
3. **Add context-size logging**: Track input tokens per session for data gathering
## References
- Liu et al. "Lost in the Middle: How Language Models Use Long Contexts" (2023) — https://arxiv.org/abs/2307.03172
- Shi et al. "Large Language Models are Easily Distracted by Irrelevant Context" (2023)
- Xu et al. "Retrieval Meets Long Context LLMs" (2023) — hybrid approaches outperform both alone
- Anthropic's Claude 3.5 context caching — built-in prefix caching reduces cost for repeated system prompts
---
*Sovereignty and service always.*

View File

@@ -0,0 +1,127 @@
#!/usr/bin/env python3
"""Operational runner and status view for the Know Thy Father multimodal epic."""
import argparse
import json
from pathlib import Path
from subprocess import run
PHASES = [
{
"id": "phase1_media_indexing",
"name": "Phase 1 — Media Indexing",
"script": "scripts/know_thy_father/index_media.py",
"command_template": "python3 scripts/know_thy_father/index_media.py --tweets twitter-archive/extracted/tweets.jsonl --output twitter-archive/know-thy-father/media_manifest.jsonl",
"outputs": ["twitter-archive/know-thy-father/media_manifest.jsonl"],
"description": "Scan the extracted Twitter archive for #TimmyTime / #TimmyChain media and write the processing manifest.",
},
{
"id": "phase2_multimodal_analysis",
"name": "Phase 2 — Multimodal Analysis",
"script": "scripts/twitter_archive/analyze_media.py",
"command_template": "python3 scripts/twitter_archive/analyze_media.py --batch {batch_size}",
"outputs": [
"twitter-archive/know-thy-father/analysis.jsonl",
"twitter-archive/know-thy-father/meaning-kernels.jsonl",
"twitter-archive/know-thy-father/pipeline-status.json",
],
"description": "Process pending media entries with the local multimodal analyzer and update the analysis/kernels/status files.",
},
{
"id": "phase3_holographic_synthesis",
"name": "Phase 3 — Holographic Synthesis",
"script": "scripts/know_thy_father/synthesize_kernels.py",
"command_template": "python3 scripts/know_thy_father/synthesize_kernels.py --input twitter-archive/media/manifest.jsonl --output twitter-archive/knowledge/fathers_ledger.jsonl --summary twitter-archive/knowledge/fathers_ledger.summary.json",
"outputs": [
"twitter-archive/knowledge/fathers_ledger.jsonl",
"twitter-archive/knowledge/fathers_ledger.summary.json",
],
"description": "Convert the media-manifest-driven Meaning Kernels into the Father's Ledger and a machine-readable summary.",
},
{
"id": "phase4_cross_reference_audit",
"name": "Phase 4 — Cross-Reference Audit",
"script": "scripts/know_thy_father/crossref_audit.py",
"command_template": "python3 scripts/know_thy_father/crossref_audit.py --soul SOUL.md --kernels twitter-archive/notes/know_thy_father_crossref.md --output twitter-archive/notes/crossref_report.md",
"outputs": ["twitter-archive/notes/crossref_report.md"],
"description": "Compare Know Thy Father kernels against SOUL.md and related canon, then emit a Markdown audit report.",
},
{
"id": "phase5_processing_log",
"name": "Phase 5 — Processing Log / Status",
"script": "twitter-archive/know-thy-father/tracker.py",
"command_template": "python3 twitter-archive/know-thy-father/tracker.py report",
"outputs": ["twitter-archive/know-thy-father/REPORT.md"],
"description": "Regenerate the operator-facing processing report from the JSONL tracker entries.",
},
]
def build_pipeline_plan(batch_size: int = 10):
plan = []
for phase in PHASES:
plan.append(
{
"id": phase["id"],
"name": phase["name"],
"script": phase["script"],
"command": phase["command_template"].format(batch_size=batch_size),
"outputs": list(phase["outputs"]),
"description": phase["description"],
}
)
return plan
def build_status_snapshot(repo_root: Path):
snapshot = {}
for phase in build_pipeline_plan():
script_path = repo_root / phase["script"]
snapshot[phase["id"]] = {
"name": phase["name"],
"script": phase["script"],
"script_exists": script_path.exists(),
"outputs": [
{
"path": output,
"exists": (repo_root / output).exists(),
}
for output in phase["outputs"]
],
}
return snapshot
def run_step(repo_root: Path, step_id: str, batch_size: int = 10):
plan = {step["id"]: step for step in build_pipeline_plan(batch_size=batch_size)}
if step_id not in plan:
raise SystemExit(f"Unknown step: {step_id}")
step = plan[step_id]
return run(step["command"], cwd=repo_root, shell=True, check=False)
def main():
parser = argparse.ArgumentParser(description="Know Thy Father epic orchestration helper")
parser.add_argument("--batch-size", type=int, default=10)
parser.add_argument("--status", action="store_true")
parser.add_argument("--run-step", default=None)
parser.add_argument("--json", action="store_true")
args = parser.parse_args()
repo_root = Path(__file__).resolve().parents[2]
if args.run_step:
result = run_step(repo_root, args.run_step, batch_size=args.batch_size)
raise SystemExit(result.returncode)
payload = build_status_snapshot(repo_root) if args.status else build_pipeline_plan(batch_size=args.batch_size)
if args.json or args.status:
print(json.dumps(payload, indent=2))
else:
for step in payload:
print(f"[{step['id']}] {step['command']}")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,159 @@
#!/usr/bin/env python3
"""Prepare a MemPalace v3.0.0 integration packet for Ezra's Hermes home."""
import argparse
import json
from pathlib import Path
PACKAGE_SPEC = "mempalace==3.0.0"
DEFAULT_HERMES_HOME = "~/.hermes/"
DEFAULT_SESSIONS_DIR = "~/.hermes/sessions/"
DEFAULT_PALACE_PATH = "~/.mempalace/palace"
DEFAULT_WING = "ezra_home"
def build_yaml_template(wing: str, palace_path: str) -> str:
return (
f"wing: {wing}\n"
f"palace: {palace_path}\n"
"rooms:\n"
" - name: sessions\n"
" description: Conversation history and durable agent transcripts\n"
" globs:\n"
" - \"*.json\"\n"
" - \"*.jsonl\"\n"
" - name: config\n"
" description: Hermes configuration and runtime settings\n"
" globs:\n"
" - \"*.yaml\"\n"
" - \"*.yml\"\n"
" - \"*.toml\"\n"
" - name: docs\n"
" description: Notes, markdown docs, and operating reports\n"
" globs:\n"
" - \"*.md\"\n"
" - \"*.txt\"\n"
"people: []\n"
"projects: []\n"
)
def build_plan(overrides: dict | None = None) -> dict:
overrides = overrides or {}
hermes_home = overrides.get("hermes_home", DEFAULT_HERMES_HOME)
sessions_dir = overrides.get("sessions_dir", DEFAULT_SESSIONS_DIR)
palace_path = overrides.get("palace_path", DEFAULT_PALACE_PATH)
wing = overrides.get("wing", DEFAULT_WING)
yaml_template = build_yaml_template(wing=wing, palace_path=palace_path)
config_home = hermes_home[:-1] if hermes_home.endswith("/") else hermes_home
plan = {
"package_spec": PACKAGE_SPEC,
"hermes_home": hermes_home,
"sessions_dir": sessions_dir,
"palace_path": palace_path,
"wing": wing,
"config_path": f"{config_home}/mempalace.yaml",
"install_command": f"pip install {PACKAGE_SPEC}",
"init_command": f"mempalace init {hermes_home} --yes",
"mine_home_command": f"echo \"\" | mempalace mine {hermes_home}",
"mine_sessions_command": f"echo \"\" | mempalace mine {sessions_dir} --mode convos",
"search_command": 'mempalace search "your common queries"',
"wake_up_command": "mempalace wake-up",
"mcp_command": "hermes mcp add mempalace -- python -m mempalace.mcp_server",
"yaml_template": yaml_template,
"gotchas": [
"`mempalace init` is still interactive in room approval flow; write mempalace.yaml manually if the init output stalls.",
"The yaml key is `wing:` not `wings:`. Using the wrong key causes mine/setup failures.",
"Pipe empty stdin into mining commands (`echo \"\" | ...`) to avoid the entity-detector stdin hang on larger directories.",
"First mine downloads the ChromaDB embedding model cache (~79MB).",
"Report Ezra's before/after metrics back to issue #568 after live installation and retrieval tests.",
],
}
return plan
def render_markdown(plan: dict) -> str:
gotchas = "\n".join(f"- {item}" for item in plan["gotchas"])
return f"""# MemPalace v3.0.0 — Ezra Integration Packet
This packet turns issue #570 into an executable, reviewable integration plan for Ezra's Hermes home.
It is a repo-side scaffold: no live Ezra host changes are claimed in this artifact.
## Commands
```bash
{plan['install_command']}
{plan['init_command']}
cat > {plan['config_path']} <<'YAML'
{plan['yaml_template'].rstrip()}
YAML
{plan['mine_home_command']}
{plan['mine_sessions_command']}
{plan['search_command']}
{plan['wake_up_command']}
{plan['mcp_command']}
```
## Manual config template
```yaml
{plan['yaml_template'].rstrip()}
```
## Why this shape
- `wing: {plan['wing']}` matches the issue's Ezra-specific integration target.
- `rooms` split the mined material into sessions, config, and docs to keep retrieval interpretable.
- Mining commands pipe empty stdin to avoid the interactive entity-detector hang noted in the evaluation.
## Gotchas
{gotchas}
## Report back to #568
After live execution on Ezra's actual environment, post back to #568 with:
- install result
- mine duration and corpus size
- 2-3 real search queries + retrieved results
- wake-up context token count
- whether MCP wiring succeeded
## Honest scope boundary
This repo artifact does **not** prove live installation on Ezra's host. It makes the work reproducible and testable so the next pass can execute it without guesswork.
"""
def main() -> None:
parser = argparse.ArgumentParser(description="Prepare the MemPalace Ezra integration packet")
parser.add_argument("--hermes-home", default=DEFAULT_HERMES_HOME)
parser.add_argument("--sessions-dir", default=DEFAULT_SESSIONS_DIR)
parser.add_argument("--palace-path", default=DEFAULT_PALACE_PATH)
parser.add_argument("--wing", default=DEFAULT_WING)
parser.add_argument("--output", default=None)
parser.add_argument("--json", action="store_true")
args = parser.parse_args()
plan = build_plan(
{
"hermes_home": args.hermes_home,
"sessions_dir": args.sessions_dir,
"palace_path": args.palace_path,
"wing": args.wing,
}
)
rendered = json.dumps(plan, indent=2) if args.json else render_markdown(plan)
if args.output:
output_path = Path(args.output).expanduser()
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(rendered, encoding="utf-8")
print(f"MemPalace integration packet written to {output_path}")
else:
print(rendered)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,155 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Any
import yaml
DAYLIGHT_START = "10:00"
DAYLIGHT_END = "16:00"
def load_manifest(path: str | Path) -> dict[str, Any]:
data = yaml.safe_load(Path(path).read_text()) or {}
data.setdefault("machines", [])
return data
def validate_manifest(data: dict[str, Any]) -> None:
machines = data.get("machines", [])
if not machines:
raise ValueError("manifest must contain at least one machine")
seen: set[str] = set()
for machine in machines:
hostname = machine.get("hostname", "").strip()
if not hostname:
raise ValueError("each machine must declare a hostname")
if hostname in seen:
raise ValueError(f"duplicate hostname: {hostname} (unique hostnames are required)")
seen.add(hostname)
for field in ("machine_type", "ram_gb", "cpu_cores", "os", "adapter_condition"):
if field not in machine:
raise ValueError(f"machine {hostname} missing required field: {field}")
def _laptops(machines: list[dict[str, Any]]) -> list[dict[str, Any]]:
return [m for m in machines if m.get("machine_type") == "laptop"]
def _desktop(machines: list[dict[str, Any]]) -> dict[str, Any] | None:
for machine in machines:
if machine.get("machine_type") == "desktop":
return machine
return None
def choose_anchor_agents(machines: list[dict[str, Any]], count: int = 2) -> list[dict[str, Any]]:
eligible = [
m for m in _laptops(machines)
if m.get("adapter_condition") in {"good", "ok"} and m.get("always_on_capable", True)
]
eligible.sort(key=lambda m: (m.get("idle_watts", 9999), -m.get("ram_gb", 0), -m.get("cpu_cores", 0), m["hostname"]))
return eligible[:count]
def assign_roles(machines: list[dict[str, Any]]) -> dict[str, Any]:
anchors = choose_anchor_agents(machines, count=2)
anchor_names = {m["hostname"] for m in anchors}
desktop = _desktop(machines)
mapping: dict[str, dict[str, Any]] = {}
for machine in machines:
hostname = machine["hostname"]
if desktop and hostname == desktop["hostname"]:
mapping[hostname] = {
"role": "desktop_nas",
"schedule": f"{DAYLIGHT_START}-{DAYLIGHT_END}",
"duty_cycle": "daylight_only",
}
elif hostname in anchor_names:
mapping[hostname] = {
"role": "anchor_agent",
"schedule": "24/7",
"duty_cycle": "continuous",
}
else:
mapping[hostname] = {
"role": "daylight_agent",
"schedule": f"{DAYLIGHT_START}-{DAYLIGHT_END}",
"duty_cycle": "peak_solar",
}
return {
"anchor_agents": [m["hostname"] for m in anchors],
"desktop_nas": desktop["hostname"] if desktop else None,
"role_mapping": mapping,
}
def build_plan(data: dict[str, Any]) -> dict[str, Any]:
validate_manifest(data)
machines = data["machines"]
role_plan = assign_roles(machines)
return {
"fleet_name": data.get("fleet_name", "timmy-laptop-fleet"),
"machine_count": len(machines),
"anchor_agents": role_plan["anchor_agents"],
"desktop_nas": role_plan["desktop_nas"],
"daylight_window": f"{DAYLIGHT_START}-{DAYLIGHT_END}",
"role_mapping": role_plan["role_mapping"],
}
def render_markdown(plan: dict[str, Any], data: dict[str, Any]) -> str:
lines = [
"# Laptop Fleet Deployment Plan",
"",
f"Fleet: {plan['fleet_name']}",
f"Machine count: {plan['machine_count']}",
f"24/7 anchor agents: {', '.join(plan['anchor_agents']) if plan['anchor_agents'] else 'TBD'}",
f"Desktop/NAS: {plan['desktop_nas'] or 'TBD'}",
f"Daylight schedule: {plan['daylight_window']}",
"",
"## Role mapping",
"",
"| Hostname | Role | Schedule | Duty cycle |",
"|---|---|---|---|",
]
for hostname, role in sorted(plan["role_mapping"].items()):
lines.append(f"| {hostname} | {role['role']} | {role['schedule']} | {role['duty_cycle']} |")
lines.extend([
"",
"## Machine inventory",
"",
"| Hostname | Type | RAM | CPU cores | OS | Adapter | Idle watts | Notes |",
"|---|---|---:|---:|---|---|---:|---|",
])
for machine in data["machines"]:
lines.append(
f"| {machine['hostname']} | {machine['machine_type']} | {machine['ram_gb']} | {machine['cpu_cores']} | {machine['os']} | {machine['adapter_condition']} | {machine.get('idle_watts', 'n/a')} | {machine.get('notes', '')} |"
)
return "\n".join(lines) + "\n"
def main() -> int:
parser = argparse.ArgumentParser(description="Plan LAB-005 laptop fleet deployment.")
parser.add_argument("manifest", help="Path to laptop fleet manifest YAML")
parser.add_argument("--markdown", action="store_true", help="Render a markdown deployment plan instead of JSON")
args = parser.parse_args()
data = load_manifest(args.manifest)
plan = build_plan(data)
if args.markdown:
print(render_markdown(plan, data))
else:
print(json.dumps(plan, indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,135 @@
#!/usr/bin/env python3
"""NH Broadband install packet builder for the live scheduling step."""
from __future__ import annotations
import argparse
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import yaml
def load_request(path: str | Path) -> dict[str, Any]:
data = yaml.safe_load(Path(path).read_text()) or {}
data.setdefault("contact", {})
data.setdefault("service", {})
data.setdefault("call_log", [])
data.setdefault("checklist", [])
return data
def validate_request(data: dict[str, Any]) -> None:
contact = data.get("contact", {})
for field in ("name", "phone"):
if not contact.get(field, "").strip():
raise ValueError(f"contact.{field} is required")
service = data.get("service", {})
for field in ("address", "city", "state"):
if not service.get(field, "").strip():
raise ValueError(f"service.{field} is required")
if not data.get("checklist"):
raise ValueError("checklist must contain at least one item")
def build_packet(data: dict[str, Any]) -> dict[str, Any]:
validate_request(data)
contact = data["contact"]
service = data["service"]
return {
"packet_id": f"nh-bb-{datetime.now(timezone.utc).strftime('%Y%m%d-%H%M%S')}",
"generated_utc": datetime.now(timezone.utc).isoformat(),
"contact": {
"name": contact["name"],
"phone": contact["phone"],
"email": contact.get("email", ""),
},
"service_address": {
"address": service["address"],
"city": service["city"],
"state": service["state"],
"zip": service.get("zip", ""),
},
"desired_plan": data.get("desired_plan", "residential-fiber"),
"call_log": data.get("call_log", []),
"checklist": [
{"item": item, "done": False} if isinstance(item, str) else item
for item in data["checklist"]
],
"status": "pending_scheduling_call",
}
def render_markdown(packet: dict[str, Any], data: dict[str, Any]) -> str:
contact = packet["contact"]
addr = packet["service_address"]
lines = [
f"# NH Broadband Install Packet",
"",
f"**Packet ID:** {packet['packet_id']}",
f"**Generated:** {packet['generated_utc']}",
f"**Status:** {packet['status']}",
"",
"## Contact",
"",
f"- **Name:** {contact['name']}",
f"- **Phone:** {contact['phone']}",
f"- **Email:** {contact.get('email', 'n/a')}",
"",
"## Service Address",
"",
f"- {addr['address']}",
f"- {addr['city']}, {addr['state']} {addr['zip']}",
"",
f"## Desired Plan",
"",
f"{packet['desired_plan']}",
"",
"## Call Log",
"",
]
if packet["call_log"]:
for entry in packet["call_log"]:
ts = entry.get("timestamp", "n/a")
outcome = entry.get("outcome", "n/a")
notes = entry.get("notes", "")
lines.append(f"- **{ts}** — {outcome}")
if notes:
lines.append(f" - {notes}")
else:
lines.append("_No calls logged yet._")
lines.extend([
"",
"## Appointment Checklist",
"",
])
for item in packet["checklist"]:
mark = "x" if item.get("done") else " "
lines.append(f"- [{mark}] {item['item']}")
lines.append("")
return "\n".join(lines)
def main() -> int:
parser = argparse.ArgumentParser(description="Build NH Broadband install packet.")
parser.add_argument("request", help="Path to install request YAML")
parser.add_argument("--markdown", action="store_true", help="Render markdown instead of JSON")
args = parser.parse_args()
data = load_request(args.request)
packet = build_packet(data)
if args.markdown:
print(render_markdown(packet, data))
else:
print(json.dumps(packet, indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,410 @@
#!/usr/bin/env python3
"""
Predictive Resource Allocation — Timmy Foundation Fleet
Analyzes historical utilization patterns, predicts workload surges,
and recommends pre-provisioning actions.
Usage:
python3 scripts/predictive_resource_allocator.py \
--metrics metrics/*.jsonl \
--heartbeat heartbeat/*.jsonl \
--horizon 6
# JSON output
python3 scripts/predictive_resource_allocator.py --json
# Quick forecast from default paths
python3 scripts/predictive_resource_allocator.py
"""
import argparse
import glob
import json
import os
import sys
from collections import Counter, defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple
# ── Constants ────────────────────────────────────────────────────────────────
SURGE_THRESHOLD = 1.5
HEAVY_TOKEN_THRESHOLD = 10000
DEFAULT_HORIZON_HOURS = 6
DEFAULT_METRICS_GLOB = "metrics/local_*.jsonl"
DEFAULT_HEARTBEAT_GLOB = "heartbeat/ticks_*.jsonl"
SCRIPT_DIR = Path(__file__).resolve().parent
ROOT_DIR = SCRIPT_DIR.parent
# ── Data Loading ─────────────────────────────────────────────────────────────
def _parse_ts(value: str) -> datetime:
"""Parse ISO timestamp to UTC datetime."""
return datetime.fromisoformat(value.replace("Z", "+00:00")).astimezone(timezone.utc)
def load_jsonl(paths: Iterable[str]) -> List[dict]:
"""Load JSONL rows from one or more file paths/globs."""
rows: List[dict] = []
for pattern in paths:
for path in glob.glob(pattern):
if not os.path.isfile(path):
continue
with open(path, encoding="utf-8") as f:
for line in f:
line = line.strip()
if line:
try:
rows.append(json.loads(line))
except json.JSONDecodeError:
continue
return rows
def _default_paths(glob_pattern: str) -> List[str]:
"""Resolve a glob pattern relative to project root."""
full = os.path.join(ROOT_DIR, glob_pattern)
matches = glob.glob(full)
return matches if matches else [full]
# ── Time-Series Analysis ─────────────────────────────────────────────────────
def compute_rates(
rows: List[dict],
horizon_hours: int,
) -> Tuple[float, float, float, float, float]:
"""
Compare recent window vs baseline window.
Returns:
(recent_rate, baseline_rate, surge_factor, recent_token_rate, baseline_token_rate)
"""
if not rows:
return 0.0, 0.0, 1.0, 0.0, 0.0
latest = max(_parse_ts(r["timestamp"]) for r in rows)
recent_cutoff = latest - timedelta(hours=horizon_hours)
baseline_cutoff = latest - timedelta(hours=horizon_hours * 2)
recent = [r for r in rows if _parse_ts(r["timestamp"]) >= recent_cutoff]
baseline = [
r for r in rows
if baseline_cutoff <= _parse_ts(r["timestamp"]) < recent_cutoff
]
recent_rate = len(recent) / max(horizon_hours, 1)
baseline_rate = (
len(baseline) / max(horizon_hours, 1)
if baseline
else max(0.1, recent_rate)
)
recent_tokens = sum(int(r.get("prompt_len", 0)) for r in recent)
baseline_tokens = sum(int(r.get("prompt_len", 0)) for r in baseline)
recent_token_rate = recent_tokens / max(horizon_hours, 1)
baseline_token_rate = (
baseline_tokens / max(horizon_hours, 1)
if baseline
else max(1.0, recent_token_rate)
)
request_surge = recent_rate / max(baseline_rate, 0.01)
token_surge = recent_token_rate / max(baseline_token_rate, 0.01)
surge_factor = max(request_surge, token_surge)
return recent_rate, baseline_rate, surge_factor, recent_token_rate, baseline_token_rate
def analyze_callers(rows: List[dict], horizon_hours: int) -> List[Dict[str, Any]]:
"""Summarize callers in the recent window."""
if not rows:
return []
latest = max(_parse_ts(r["timestamp"]) for r in rows)
cutoff = latest - timedelta(hours=horizon_hours)
calls: Counter = Counter()
tokens: Counter = Counter()
failures: Counter = Counter()
for row in rows:
ts = _parse_ts(row["timestamp"])
if ts < cutoff:
continue
caller = row.get("caller", "unknown")
calls[caller] += 1
tokens[caller] += int(row.get("prompt_len", 0))
if not row.get("success", True):
failures[caller] += 1
summary = []
for caller in calls:
summary.append({
"caller": caller,
"requests": calls[caller],
"prompt_tokens": tokens[caller],
"failures": failures[caller],
"failure_rate": round(failures[caller] / max(calls[caller], 1) * 100, 1),
})
summary.sort(key=lambda x: (-x["requests"], -x["prompt_tokens"]))
return summary
def analyze_heartbeat(rows: List[dict], horizon_hours: int) -> Dict[str, int]:
"""Count infrastructure risks in recent window."""
if not rows:
return {"gitea_outages": 0, "inference_failures": 0, "total_checks": 0}
latest = max(_parse_ts(r["timestamp"]) for r in rows)
cutoff = latest - timedelta(hours=horizon_hours)
gitea_outages = 0
inference_failures = 0
total = 0
for row in rows:
ts = _parse_ts(row["timestamp"])
if ts < cutoff:
continue
total += 1
perception = row.get("perception", {})
if perception.get("gitea_alive") is False:
gitea_outages += 1
model_health = perception.get("model_health", {})
if model_health.get("inference_ok") is False:
inference_failures += 1
return {
"gitea_outages": gitea_outages,
"inference_failures": inference_failures,
"total_checks": total,
}
# ── Prediction Engine ────────────────────────────────────────────────────────
def predict_demand(
recent_rate: float,
baseline_rate: float,
surge_factor: float,
horizon_hours: int,
) -> Dict[str, Any]:
"""Predict near-term resource demand."""
predicted_rate = round(
max(recent_rate, baseline_rate * max(1.0, surge_factor * 0.75)), 2
)
if surge_factor > 3.0:
demand_level = "critical"
elif surge_factor > SURGE_THRESHOLD:
demand_level = "elevated"
elif surge_factor > 1.0:
demand_level = "normal"
else:
demand_level = "low"
return {
"predicted_requests_per_hour": predicted_rate,
"surge_factor": round(surge_factor, 2),
"demand_level": demand_level,
"horizon_hours": horizon_hours,
}
def determine_posture(
surge_factor: float,
callers: List[Dict[str, Any]],
heartbeat: Dict[str, int],
) -> Tuple[str, str, List[str]]:
"""
Determine fleet posture and recommended actions.
Returns:
(resource_mode, dispatch_posture, actions)
"""
mode = "steady"
posture = "normal"
actions: List[str] = []
# Surge detection
if surge_factor > SURGE_THRESHOLD:
mode = "surge"
actions.append(
"Pre-warm local inference before the next forecast window."
)
# Heavy background callers
heavy = [
c for c in callers
if c["prompt_tokens"] >= HEAVY_TOKEN_THRESHOLD
and ("batch" in c["caller"] or "know-thy-father" in c["caller"])
]
if heavy:
actions.append(
"Throttle or defer large background jobs until off-peak capacity is available."
)
# Caller failure rates
failing = [c for c in callers if c["failure_rate"] > 20 and c["requests"] >= 3]
if failing:
names = ", ".join(c["caller"] for c in failing[:3])
actions.append(
f"Investigate high failure rates in: {names}."
)
# Inference health
if heartbeat["inference_failures"] >= 2:
mode = "surge"
actions.append(
"Investigate local model reliability and reserve headroom for heartbeat traffic."
)
# Forge availability
if heartbeat["gitea_outages"] >= 1:
posture = "degraded"
actions.append(
"Pre-fetch or cache forge state before the next dispatch window."
)
if not actions:
actions.append(
"Maintain current resource allocation; no surge indicators detected."
)
return mode, posture, actions
# ── Main Forecast ────────────────────────────────────────────────────────────
def forecast(
metrics_paths: List[str],
heartbeat_paths: List[str],
horizon_hours: int = DEFAULT_HORIZON_HOURS,
) -> Dict[str, Any]:
"""Full resource forecast from metric and heartbeat logs."""
metric_rows = load_jsonl(metrics_paths)
heartbeat_rows = load_jsonl(heartbeat_paths)
recent_rate, baseline_rate, surge_factor, recent_tok_rate, base_tok_rate = (
compute_rates(metric_rows, horizon_hours)
)
callers = analyze_callers(metric_rows, horizon_hours)
heartbeat = analyze_heartbeat(heartbeat_rows, horizon_hours)
demand = predict_demand(recent_rate, baseline_rate, surge_factor, horizon_hours)
mode, posture, actions = determine_posture(surge_factor, callers, heartbeat)
return {
"resource_mode": mode,
"dispatch_posture": posture,
"horizon_hours": horizon_hours,
"recent_request_rate": round(recent_rate, 2),
"baseline_request_rate": round(baseline_rate, 2),
"predicted_request_rate": demand["predicted_requests_per_hour"],
"surge_factor": demand["surge_factor"],
"demand_level": demand["demand_level"],
"recent_prompt_tokens_per_hour": round(recent_tok_rate, 2),
"baseline_prompt_tokens_per_hour": round(base_tok_rate, 2),
"gitea_outages": heartbeat["gitea_outages"],
"inference_failures": heartbeat["inference_failures"],
"heartbeat_checks": heartbeat["total_checks"],
"top_callers": callers[:10],
"recommended_actions": actions,
}
# ── Output Formatters ────────────────────────────────────────────────────────
def format_markdown(fc: Dict[str, Any]) -> str:
"""Format forecast as markdown report."""
lines = [
"# Predictive Resource Allocation — Fleet Forecast",
"",
f"**Horizon:** {fc['horizon_hours']} hours",
f"**Resource mode:** {fc['resource_mode']}",
f"**Dispatch posture:** {fc['dispatch_posture']}",
f"**Demand level:** {fc['demand_level']}",
"",
"## Demand Metrics",
"",
f"| Metric | Recent | Baseline |",
f"|--------|-------:|---------:|",
f"| Requests/hour | {fc['recent_request_rate']} | {fc['baseline_request_rate']} |",
f"| Prompt tokens/hour | {fc['recent_prompt_tokens_per_hour']} | {fc['baseline_prompt_tokens_per_hour']} |",
"",
f"**Surge factor:** {fc['surge_factor']}x",
f"**Predicted request rate:** {fc['predicted_request_rate']}/hour",
"",
"## Infrastructure Health",
"",
f"- Gitea outages (recent window): {fc['gitea_outages']}",
f"- Inference failures (recent window): {fc['inference_failures']}",
f"- Heartbeat checks analyzed: {fc['heartbeat_checks']}",
"",
"## Recommended Actions",
"",
]
for action in fc["recommended_actions"]:
lines.append(f"- {action}")
if fc["top_callers"]:
lines.extend([
"",
"## Top Callers (Recent Window)",
"",
"| Caller | Requests | Tokens | Failures |",
"|--------|---------:|-------:|---------:|",
])
for c in fc["top_callers"]:
lines.append(
f"| {c['caller']} | {c['requests']} | {c['prompt_tokens']} | {c['failures']} |"
)
return "\n".join(lines) + "\n"
# ── CLI ──────────────────────────────────────────────────────────────────────
def main() -> int:
parser = argparse.ArgumentParser(
description="Predictive resource allocation for the Timmy fleet"
)
parser.add_argument(
"--metrics", nargs="*", default=None,
help="Metric JSONL paths (supports globs). Default: metrics/local_*.jsonl"
)
parser.add_argument(
"--heartbeat", nargs="*", default=None,
help="Heartbeat JSONL paths (supports globs). Default: heartbeat/ticks_*.jsonl"
)
parser.add_argument(
"--horizon", type=int, default=DEFAULT_HORIZON_HOURS,
help="Forecast horizon in hours (default: 6)"
)
parser.add_argument(
"--json", action="store_true",
help="Output raw JSON instead of markdown"
)
args = parser.parse_args()
metrics_paths = args.metrics or _default_paths(DEFAULT_METRICS_GLOB)
heartbeat_paths = args.heartbeat or _default_paths(DEFAULT_HEARTBEAT_GLOB)
fc = forecast(metrics_paths, heartbeat_paths, args.horizon)
if args.json:
print(json.dumps(fc, indent=2))
else:
print(format_markdown(fc))
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,76 @@
from pathlib import Path
import importlib.util
import unittest
ROOT = Path(__file__).resolve().parent.parent
SCRIPT_PATH = ROOT / "scripts" / "know_thy_father" / "epic_pipeline.py"
DOC_PATH = ROOT / "docs" / "KNOW_THY_FATHER_MULTIMODAL_PIPELINE.md"
def load_module(path: Path, name: str):
assert path.exists(), f"missing {path.relative_to(ROOT)}"
spec = importlib.util.spec_from_file_location(name, path)
assert spec and spec.loader
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
class TestKnowThyFatherEpicPipeline(unittest.TestCase):
def test_build_pipeline_plan_contains_all_phases_in_order(self):
mod = load_module(SCRIPT_PATH, "ktf_epic_pipeline")
plan = mod.build_pipeline_plan(batch_size=10)
self.assertEqual(
[step["id"] for step in plan],
[
"phase1_media_indexing",
"phase2_multimodal_analysis",
"phase3_holographic_synthesis",
"phase4_cross_reference_audit",
"phase5_processing_log",
],
)
self.assertIn("scripts/know_thy_father/index_media.py", plan[0]["command"])
self.assertIn("scripts/twitter_archive/analyze_media.py --batch 10", plan[1]["command"])
self.assertIn("scripts/know_thy_father/synthesize_kernels.py", plan[2]["command"])
self.assertIn("scripts/know_thy_father/crossref_audit.py", plan[3]["command"])
self.assertIn("twitter-archive/know-thy-father/tracker.py report", plan[4]["command"])
def test_status_snapshot_reports_key_artifact_paths(self):
mod = load_module(SCRIPT_PATH, "ktf_epic_pipeline")
status = mod.build_status_snapshot(ROOT)
self.assertIn("phase1_media_indexing", status)
self.assertIn("phase2_multimodal_analysis", status)
self.assertIn("phase3_holographic_synthesis", status)
self.assertIn("phase4_cross_reference_audit", status)
self.assertIn("phase5_processing_log", status)
self.assertEqual(status["phase1_media_indexing"]["script"], "scripts/know_thy_father/index_media.py")
self.assertEqual(status["phase2_multimodal_analysis"]["script"], "scripts/twitter_archive/analyze_media.py")
self.assertEqual(status["phase5_processing_log"]["script"], "twitter-archive/know-thy-father/tracker.py")
self.assertTrue(status["phase1_media_indexing"]["script_exists"])
self.assertTrue(status["phase2_multimodal_analysis"]["script_exists"])
self.assertTrue(status["phase3_holographic_synthesis"]["script_exists"])
self.assertTrue(status["phase4_cross_reference_audit"]["script_exists"])
self.assertTrue(status["phase5_processing_log"]["script_exists"])
def test_repo_contains_multimodal_pipeline_doc(self):
self.assertTrue(DOC_PATH.exists(), "missing committed Know Thy Father pipeline doc")
text = DOC_PATH.read_text(encoding="utf-8")
required = [
"# Know Thy Father — Multimodal Media Consumption Pipeline",
"scripts/know_thy_father/index_media.py",
"scripts/twitter_archive/analyze_media.py --batch 10",
"scripts/know_thy_father/synthesize_kernels.py",
"scripts/know_thy_father/crossref_audit.py",
"twitter-archive/know-thy-father/tracker.py report",
"Refs #582",
]
for snippet in required:
self.assertIn(snippet, text)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,52 @@
from pathlib import Path
import yaml
from scripts.plan_laptop_fleet import build_plan, load_manifest, render_markdown, validate_manifest
def test_laptop_fleet_planner_script_exists() -> None:
assert Path("scripts/plan_laptop_fleet.py").exists()
def test_laptop_fleet_manifest_template_exists() -> None:
assert Path("docs/laptop-fleet-manifest.example.yaml").exists()
def test_build_plan_selects_two_lowest_idle_watt_laptops_as_anchors() -> None:
data = load_manifest("docs/laptop-fleet-manifest.example.yaml")
plan = build_plan(data)
assert plan["anchor_agents"] == ["timmy-anchor-a", "timmy-anchor-b"]
assert plan["desktop_nas"] == "timmy-desktop-nas"
assert plan["role_mapping"]["timmy-daylight-a"]["schedule"] == "10:00-16:00"
def test_validate_manifest_requires_unique_hostnames() -> None:
data = {
"machines": [
{"hostname": "dup", "machine_type": "laptop", "ram_gb": 8, "cpu_cores": 4, "os": "Linux", "adapter_condition": "good"},
{"hostname": "dup", "machine_type": "laptop", "ram_gb": 16, "cpu_cores": 8, "os": "Linux", "adapter_condition": "good"},
]
}
try:
validate_manifest(data)
except ValueError as exc:
assert "duplicate hostname" in str(exc)
assert "unique hostnames" in str(exc)
else:
raise AssertionError("validate_manifest should reject duplicate hostname")
def test_markdown_contains_anchor_agents_and_daylight_schedule() -> None:
data = load_manifest("docs/laptop-fleet-manifest.example.yaml")
plan = build_plan(data)
content = render_markdown(plan, data)
assert "24/7 anchor agents: timmy-anchor-a, timmy-anchor-b" in content
assert "Daylight schedule: 10:00-16:00" in content
assert "desktop_nas" in content
def test_manifest_template_is_valid_yaml() -> None:
data = yaml.safe_load(Path("docs/laptop-fleet-manifest.example.yaml").read_text())
assert data["fleet_name"] == "timmy-laptop-fleet"
assert len(data["machines"]) == 6

View File

@@ -0,0 +1,68 @@
from pathlib import Path
import importlib.util
import unittest
ROOT = Path(__file__).resolve().parent.parent
SCRIPT_PATH = ROOT / "scripts" / "mempalace_ezra_integration.py"
DOC_PATH = ROOT / "docs" / "MEMPALACE_EZRA_INTEGRATION.md"
def load_module(path: Path, name: str):
assert path.exists(), f"missing {path.relative_to(ROOT)}"
spec = importlib.util.spec_from_file_location(name, path)
assert spec and spec.loader
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
class TestMempalaceEzraIntegration(unittest.TestCase):
def test_build_plan_contains_issue_required_steps_and_gotchas(self):
mod = load_module(SCRIPT_PATH, "mempalace_ezra_integration")
plan = mod.build_plan({})
self.assertEqual(plan["package_spec"], "mempalace==3.0.0")
self.assertIn("pip install mempalace==3.0.0", plan["install_command"])
self.assertEqual(plan["wing"], "ezra_home")
self.assertIn('echo "" | mempalace mine ~/.hermes/', plan["mine_home_command"])
self.assertIn('--mode convos', plan["mine_sessions_command"])
self.assertIn('mempalace wake-up', plan["wake_up_command"])
self.assertIn('hermes mcp add mempalace -- python -m mempalace.mcp_server', plan["mcp_command"])
self.assertIn('wing:', plan["yaml_template"])
self.assertTrue(any('stdin' in item.lower() for item in plan["gotchas"]))
self.assertTrue(any('wing:' in item for item in plan["gotchas"]))
def test_build_plan_accepts_path_and_wing_overrides(self):
mod = load_module(SCRIPT_PATH, "mempalace_ezra_integration")
plan = mod.build_plan(
{
"hermes_home": "/root/wizards/ezra/home",
"sessions_dir": "/root/wizards/ezra/home/sessions",
"wing": "ezra_archive",
}
)
self.assertEqual(plan["wing"], "ezra_archive")
self.assertIn('/root/wizards/ezra/home', plan["mine_home_command"])
self.assertIn('/root/wizards/ezra/home/sessions', plan["mine_sessions_command"])
self.assertIn('wing: ezra_archive', plan["yaml_template"])
def test_repo_contains_mem_palace_ezra_doc(self):
self.assertTrue(DOC_PATH.exists(), "missing committed MemPalace Ezra integration doc")
text = DOC_PATH.read_text(encoding="utf-8")
required = [
"# MemPalace v3.0.0 — Ezra Integration Packet",
"pip install mempalace==3.0.0",
'echo "" | mempalace mine ~/.hermes/',
"mempalace mine ~/.hermes/sessions/ --mode convos",
"mempalace wake-up",
"hermes mcp add mempalace -- python -m mempalace.mcp_server",
"Report back to #568",
]
for snippet in required:
self.assertIn(snippet, text)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,105 @@
from pathlib import Path
import yaml
from scripts.plan_nh_broadband_install import (
build_packet,
load_request,
render_markdown,
validate_request,
)
def test_script_exists() -> None:
assert Path("scripts/plan_nh_broadband_install.py").exists()
def test_example_request_exists() -> None:
assert Path("docs/nh-broadband-install-request.example.yaml").exists()
def test_example_packet_exists() -> None:
assert Path("docs/nh-broadband-install-packet.example.md").exists()
def test_research_memo_exists() -> None:
assert Path("reports/operations/2026-04-15-nh-broadband-public-research.md").exists()
def test_load_and_build_packet() -> None:
data = load_request("docs/nh-broadband-install-request.example.yaml")
packet = build_packet(data)
assert packet["contact"]["name"] == "Timmy Operator"
assert packet["service_address"]["city"] == "Concord"
assert packet["service_address"]["state"] == "NH"
assert packet["status"] == "pending_scheduling_call"
assert len(packet["checklist"]) == 8
assert packet["checklist"][0]["done"] is False
def test_validate_rejects_missing_contact_name() -> None:
data = {
"contact": {"name": "", "phone": "555"},
"service": {"address": "1 St", "city": "X", "state": "NH"},
"checklist": ["do thing"],
}
try:
validate_request(data)
except ValueError as exc:
assert "contact.name" in str(exc)
else:
raise AssertionError("should reject empty contact name")
def test_validate_rejects_missing_service_address() -> None:
data = {
"contact": {"name": "A", "phone": "555"},
"service": {"address": "", "city": "X", "state": "NH"},
"checklist": ["do thing"],
}
try:
validate_request(data)
except ValueError as exc:
assert "service.address" in str(exc)
else:
raise AssertionError("should reject empty service address")
def test_validate_rejects_empty_checklist() -> None:
data = {
"contact": {"name": "A", "phone": "555"},
"service": {"address": "1 St", "city": "X", "state": "NH"},
"checklist": [],
}
try:
validate_request(data)
except ValueError as exc:
assert "checklist" in str(exc)
else:
raise AssertionError("should reject empty checklist")
def test_render_markdown_contains_key_sections() -> None:
data = load_request("docs/nh-broadband-install-request.example.yaml")
packet = build_packet(data)
md = render_markdown(packet, data)
assert "# NH Broadband Install Packet" in md
assert "## Contact" in md
assert "## Service Address" in md
assert "## Call Log" in md
assert "## Appointment Checklist" in md
assert "Concord" in md
assert "NH" in md
def test_render_markdown_shows_checklist_items() -> None:
data = load_request("docs/nh-broadband-install-request.example.yaml")
packet = build_packet(data)
md = render_markdown(packet, data)
assert "- [ ] Confirm exact-address availability" in md
def test_example_yaml_is_valid() -> None:
data = yaml.safe_load(Path("docs/nh-broadband-install-request.example.yaml").read_text())
assert data["contact"]["name"] == "Timmy Operator"
assert len(data["checklist"]) == 8

View File

@@ -0,0 +1,236 @@
"""Tests for predictive resource allocation."""
import json
import os
import sys
from pathlib import Path
import pytest
SCRIPT_DIR = Path(__file__).resolve().parent.parent / "scripts"
sys.path.insert(0, str(SCRIPT_DIR))
from predictive_resource_allocator import (
_parse_ts,
compute_rates,
analyze_callers,
analyze_heartbeat,
predict_demand,
determine_posture,
forecast,
format_markdown,
load_jsonl,
)
def _write_jsonl(path: Path, rows: list):
with open(path, "w") as f:
for row in rows:
f.write(json.dumps(row) + "\n")
def _make_metrics(count: int, base_hour: int = 0, caller: str = "heartbeat_tick",
prompt_len: int = 1000, success: bool = True) -> list:
rows = []
for i in range(count):
rows.append({
"timestamp": f"2026-03-29T{base_hour + i // 60:02d}:{i % 60:02d}:00+00:00",
"caller": caller,
"prompt_len": prompt_len,
"response_len": 50,
"success": success,
})
return rows
def _make_heartbeat(count: int, base_hour: int = 0,
gitea_alive: bool = True, inference_ok: bool = True) -> list:
rows = []
for i in range(count):
rows.append({
"timestamp": f"2026-03-29T{base_hour + i:02d}:00:00+00:00",
"perception": {
"gitea_alive": gitea_alive,
"model_health": {"inference_ok": inference_ok},
},
})
return rows
# ── Timestamp Parsing ────────────────────────────────────────────────────────
class TestTimestampParsing:
def test_z_suffix(self):
dt = _parse_ts("2026-03-29T12:00:00Z")
assert dt.tzinfo is not None
def test_explicit_offset(self):
dt = _parse_ts("2026-03-29T12:00:00+00:00")
assert dt.hour == 12
def test_ordering(self):
earlier = _parse_ts("2026-03-29T10:00:00Z")
later = _parse_ts("2026-03-29T12:00:00Z")
assert earlier < later
# ── Rate Computation ─────────────────────────────────────────────────────────
class TestComputeRates:
def test_empty_returns_defaults(self):
r_rate, b_rate, surge, _, _ = compute_rates([], 6)
assert r_rate == 0.0
assert surge == 1.0
def test_surge_detected(self):
# 1 baseline req, 20 recent reqs
baseline = _make_metrics(1, base_hour=0)
recent = _make_metrics(20, base_hour=12)
rows = baseline + recent
_, _, surge, _, _ = compute_rates(rows, horizon_hours=6)
assert surge > 1.0
def test_no_surge_when_stable(self):
# Same rate in both windows
early = _make_metrics(6, base_hour=0)
late = _make_metrics(6, base_hour=12)
rows = early + late
_, _, surge, _, _ = compute_rates(rows, horizon_hours=6)
assert surge < 1.5
# ── Caller Analysis ──────────────────────────────────────────────────────────
class TestAnalyzeCallers:
def test_empty(self):
assert analyze_callers([], 6) == []
def test_groups_by_caller(self):
rows = _make_metrics(3, caller="heartbeat_tick") + _make_metrics(2, caller="know-thy-father", prompt_len=15000)
callers = analyze_callers(rows, horizon_hours=24)
names = [c["caller"] for c in callers]
assert "heartbeat_tick" in names
assert "know-thy-father" in names
def test_sorted_by_request_count(self):
rows = _make_metrics(1, caller="rare") + _make_metrics(10, caller="frequent")
callers = analyze_callers(rows, horizon_hours=24)
assert callers[0]["caller"] == "frequent"
def test_failure_rate(self):
rows = _make_metrics(10, caller="flaky", success=False)
callers = analyze_callers(rows, horizon_hours=24)
flaky = [c for c in callers if c["caller"] == "flaky"][0]
assert flaky["failure_rate"] == 100.0
# ── Heartbeat Analysis ───────────────────────────────────────────────────────
class TestAnalyzeHeartbeat:
def test_empty(self):
result = analyze_heartbeat([], 6)
assert result["gitea_outages"] == 0
def test_detects_gitea_outage(self):
rows = _make_heartbeat(3, gitea_alive=False)
result = analyze_heartbeat(rows, horizon_hours=24)
assert result["gitea_outages"] == 3
def test_detects_inference_failure(self):
rows = _make_heartbeat(2, inference_ok=False)
result = analyze_heartbeat(rows, horizon_hours=24)
assert result["inference_failures"] == 2
# ── Demand Prediction ────────────────────────────────────────────────────────
class TestPredictDemand:
def test_critical_on_extreme_surge(self):
result = predict_demand(100.0, 10.0, 10.0, 6)
assert result["demand_level"] == "critical"
def test_elevated_on_moderate_surge(self):
result = predict_demand(50.0, 10.0, 2.0, 6)
assert result["demand_level"] == "elevated"
def test_normal_on_slight_increase(self):
result = predict_demand(12.0, 10.0, 1.2, 6)
assert result["demand_level"] == "normal"
def test_low_when_decreasing(self):
result = predict_demand(5.0, 10.0, 0.5, 6)
assert result["demand_level"] == "low"
# ── Posture Determination ────────────────────────────────────────────────────
class TestDeterminePosture:
def test_steady_normal_when_no_issues(self):
mode, posture, actions = determine_posture(1.0, [], {"gitea_outages": 0, "inference_failures": 0, "total_checks": 5})
assert mode == "steady"
assert posture == "normal"
assert "no surge indicators" in actions[0]
def test_surge_on_high_factor(self):
mode, posture, actions = determine_posture(2.0, [], {"gitea_outages": 0, "inference_failures": 0, "total_checks": 5})
assert mode == "surge"
assert any("Pre-warm" in a for a in actions)
def test_degraded_on_gitea_outage(self):
mode, posture, actions = determine_posture(1.0, [], {"gitea_outages": 3, "inference_failures": 0, "total_checks": 5})
assert posture == "degraded"
assert any("forge state" in a for a in actions)
def test_heavy_background_flagged(self):
callers = [{"caller": "know-thy-father-batch", "requests": 5, "prompt_tokens": 50000, "failures": 0, "failure_rate": 0}]
_, _, actions = determine_posture(1.0, callers, {"gitea_outages": 0, "inference_failures": 0, "total_checks": 5})
assert any("Throttle" in a or "background" in a for a in actions)
def test_failing_callers_flagged(self):
callers = [{"caller": "bad_actor", "requests": 10, "prompt_tokens": 1000, "failures": 5, "failure_rate": 50.0}]
_, _, actions = determine_posture(1.0, callers, {"gitea_outages": 0, "inference_failures": 0, "total_checks": 5})
assert any("failure rate" in a.lower() for a in actions)
# ── Full Forecast ────────────────────────────────────────────────────────────
class TestForecast:
def test_end_to_end(self, tmp_path):
metrics_path = tmp_path / "metrics.jsonl"
heartbeat_path = tmp_path / "heartbeat.jsonl"
_write_jsonl(metrics_path, _make_metrics(6, base_hour=0) + _make_metrics(30, base_hour=12))
_write_jsonl(heartbeat_path, _make_heartbeat(5, base_hour=8, inference_ok=False))
result = forecast([str(metrics_path)], [str(heartbeat_path)], horizon_hours=6)
assert "resource_mode" in result
assert "dispatch_posture" in result
assert "surge_factor" in result
assert "top_callers" in result
assert "recommended_actions" in result
assert isinstance(result["top_callers"], list)
assert isinstance(result["recommended_actions"], list)
def test_empty_inputs(self, tmp_path):
metrics_path = tmp_path / "empty_m.jsonl"
heartbeat_path = tmp_path / "empty_h.jsonl"
metrics_path.write_text("")
heartbeat_path.write_text("")
result = forecast([str(metrics_path)], [str(heartbeat_path)], horizon_hours=6)
assert result["resource_mode"] == "steady"
assert result["surge_factor"] == 1.0
# ── Markdown Output ──────────────────────────────────────────────────────────
class TestFormatMarkdown:
def test_contains_key_sections(self):
fc = forecast([], [], horizon_hours=6)
md = format_markdown(fc)
assert "Predictive Resource Allocation" in md
assert "Demand Metrics" in md
assert "Recommended Actions" in md
assert "Horizon" in md

View File

@@ -17,8 +17,24 @@ from typing import Dict, Any, Optional, List
from pathlib import Path
from dataclasses import dataclass
from enum import Enum
import importlib.util
from harness import UniWizardHarness, House, ExecutionResult
def _load_local(module_name: str, filename: str):
"""Import a module from an explicit file path, bypassing sys.path resolution."""
spec = importlib.util.spec_from_file_location(
module_name,
str(Path(__file__).parent / filename),
)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod
_harness = _load_local("v2_harness", "harness.py")
UniWizardHarness = _harness.UniWizardHarness
House = _harness.House
ExecutionResult = _harness.ExecutionResult
class TaskType(Enum):

View File

@@ -8,13 +8,30 @@ import time
import sys
import argparse
import os
import importlib.util
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional
sys.path.insert(0, str(Path(__file__).parent))
def _load_local(module_name: str, filename: str):
"""Import a module from an explicit file path, bypassing sys.path resolution.
Prevents namespace collisions when multiple directories contain modules
with the same name (e.g. uni-wizard/harness.py vs uni-wizard/v2/harness.py).
"""
spec = importlib.util.spec_from_file_location(
module_name,
str(Path(__file__).parent / filename),
)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod
_harness = _load_local("v2_harness", "harness.py")
UniWizardHarness = _harness.UniWizardHarness
House = _harness.House
ExecutionResult = _harness.ExecutionResult
from harness import UniWizardHarness, House, ExecutionResult
from router import HouseRouter, TaskType
from author_whitelist import AuthorWhitelist