Compare commits


5 Commits

Author SHA1 Message Date
Alexander Whitestone
9c2341f4ca feat: add Ezra quarterly report April 2026 (MD + PDF)
Brings in consolidated quarterly report from epic-999-phase-ii-forge branch.
Covers V-011 security hardening, context compressor tuning, burn mode resilience,
system formalization audit, Operation Get A Job GTM strategy, and fleet status.

Fixes #133
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-06 22:04:29 -04:00
069d5404a0 [BEZALEL][DEMO] Notebook Workflow: Jupytext + Papermill for Agent Tasks (#157)
Some checks failed
Notebook CI / notebook-smoke (push) Failing after 2s
2026-04-07 02:02:49 +00:00
258d02eb9b [claude] Sovereign Deployment Runbook — Repeatable, Documented Service Deployment (#146) (#161)
Some checks failed
Docker Build and Publish / build-and-push (push) Failing after 8s
Nix / nix (ubuntu-latest) (push) Failing after 1s
Tests / test (push) Failing after 2s
Nix / nix (macos-latest) (push) Has been cancelled
2026-04-07 02:02:04 +00:00
a89c0a2ea4 [claude] The Testbed Observatory — Health Monitoring & Alerting (#147) (#159)
Some checks failed
Docker Build and Publish / build-and-push (push) Failing after 17s
Nix / nix (ubuntu-latest) (push) Failing after 1s
Tests / test (push) Failing after 5s
Nix / nix (macos-latest) (push) Has been cancelled
2026-04-07 02:00:40 +00:00
c994c01c9f [claude] Deep research: Jupyter ecosystem as LLM execution layer (#155) (#160)
Some checks failed
Docker Build and Publish / build-and-push (push) Has been cancelled
Nix / nix (macos-latest) (push) Has been cancelled
Nix / nix (ubuntu-latest) (push) Has been cancelled
Tests / test (push) Has been cancelled
2026-04-07 02:00:20 +00:00
10 changed files with 2540 additions and 0 deletions


@@ -0,0 +1,44 @@
name: Notebook CI
on:
push:
paths:
- 'notebooks/**'
pull_request:
paths:
- 'notebooks/**'
jobs:
notebook-smoke:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.12'
- name: Install dependencies
run: |
          pip install papermill jupytext nbformat ipykernel
python -m ipykernel install --user --name python3
- name: Execute system health notebook
run: |
papermill notebooks/agent_task_system_health.ipynb /tmp/output.ipynb \
-p threshold 0.5 \
-p hostname ci-runner
- name: Verify output has results
run: |
python -c "
import json
nb = json.load(open('/tmp/output.ipynb'))
code_cells = [c for c in nb['cells'] if c['cell_type'] == 'code']
outputs = [c.get('outputs', []) for c in code_cells]
total_outputs = sum(len(o) for o in outputs)
assert total_outputs > 0, 'Notebook produced no outputs'
print(f'Notebook executed successfully with {total_outputs} output(s)')
"

docs/NOTEBOOK_WORKFLOW.md Normal file

@@ -0,0 +1,57 @@
# Notebook Workflow for Agent Tasks
This directory demonstrates a sovereign, version-controlled workflow for LLM agent tasks using Jupyter notebooks.
## Philosophy
- **`.py` files are the source of truth** — authored and reviewed as plain Python with `# %%` cell markers (via Jupytext)
- **`.ipynb` files are generated artifacts** — auto-created from `.py` for execution and rich viewing
- **Papermill parameterizes and executes** — each run produces an output notebook with code, narrative, and results preserved
- **Output notebooks are audit artifacts** — every execution leaves a permanent, replayable record
## File Layout
```
notebooks/
agent_task_system_health.py # Source of truth (Jupytext)
agent_task_system_health.ipynb # Generated from .py
docs/
NOTEBOOK_WORKFLOW.md # This document
.gitea/workflows/
notebook-ci.yml # CI gate: executes notebooks on PR/push
```
## How Agents Work With Notebooks
1. **Create** — Agent generates a `.py` notebook using `# %% [markdown]` and `# %%` code blocks
2. **Review** — PR reviewers see clean diffs in Gitea (no JSON noise)
3. **Generate** — `jupytext --to ipynb` produces the `.ipynb` before merge
4. **Execute** — Papermill runs the notebook with injected parameters
5. **Archive** — Output notebook is committed to a `reports/` branch or artifact store
## Converting Between Formats
```bash
# .py -> .ipynb
jupytext --to ipynb notebooks/agent_task_system_health.py
# .ipynb -> .py
jupytext --to py notebooks/agent_task_system_health.ipynb
# Execute with parameters
papermill notebooks/agent_task_system_health.ipynb output.ipynb \
-p threshold 1.0 -p hostname forge-vps-01
```
## CI Gate
The `notebook-ci.yml` workflow executes all notebooks in `notebooks/` on every PR and push, ensuring that checked-in notebooks still run and produce outputs.
## Why This Matters
| Problem | Notebook Solution |
|---|---|
| Ephemeral agent reasoning | Markdown cells narrate the thought process |
| Stateless single-turn tools | Stateful cells persist variables across steps |
| Unreviewable binary artifacts | `.py` source is diffable and PR-friendly |
| No execution audit trail | Output notebook preserves code + outputs + metadata |


@@ -0,0 +1,678 @@
# Jupyter Notebooks as Core LLM Execution Layer — Deep Research Report
**Issue:** #155
**Date:** 2026-04-06
**Status:** Research / Spike
**Prior Art:** Timmy's initial spike (llm_execution_spike.ipynb, hamelnb bridge, JupyterLab on forge VPS)
---
## Executive Summary
This report deepens the research from issue #155 into three areas requested by Rockachopa:
1. The **full Jupyter product suite** — JupyterHub vs JupyterLab vs Notebook
2. **Papermill** — the production-grade notebook execution engine already used in real data pipelines
3. The **"PR model for notebooks"** — how agents can propose, diff, review, and merge changes to `.ipynb` files similarly to code PRs
The conclusion: an elegant, production-grade agent→notebook pipeline already exists as open-source tooling. We don't need to invent much — we need to compose what's there.
---
## 1. The Jupyter Product Suite
The Jupyter ecosystem has three distinct layers that are often conflated. Understanding the distinction is critical for architectural decisions.
### 1.1 Jupyter Notebook (Classic)
The original single-user interface. One browser tab = one `.ipynb` file. Version 6 is in maintenance-only mode. Version 7 was rebuilt on JupyterLab components and is functionally equivalent. For headless agent use, the UI is irrelevant — what matters is the `.ipynb` file format and the kernel execution model underneath.
### 1.2 JupyterLab
The current canonical Jupyter interface for human users: full IDE, multi-pane, terminal, extension manager, built-in diff viewer, and `jupyterlab-git` for Git workflows from the UI. JupyterLab is the recommended target for agent-collaborative workflows because:
- It exposes the same REST API as classic Jupyter (kernel sessions, execute, contents)
- Extensions like `jupyterlab-git` let a human co-reviewer inspect changes alongside the agent
- The `hamelnb` bridge Timmy already validated works against a JupyterLab server
**For agents:** JupyterLab is the platform to run on. The agent doesn't interact with the UI — it uses the Jupyter REST API or Papermill on top of it.
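As a concrete sketch, the relevant REST calls against a single JupyterLab server look like this (the base URL and token are placeholders — adjust for your deployment):
```python
import requests

BASE = "http://127.0.0.1:8888"                       # placeholder server URL
HEADERS = {"Authorization": "token <server-token>"}  # placeholder token

# Start a fresh kernel for an agent task
kernel = requests.post(
    f"{BASE}/api/kernels", headers=HEADERS, json={"name": "python3"}
).json()

# Read a notebook through the contents API
nb = requests.get(
    f"{BASE}/api/contents/notebooks/agent_task_system_health.ipynb",
    headers=HEADERS,
).json()

# Shut the kernel down when the task is done
requests.delete(f"{BASE}/api/kernels/{kernel['id']}", headers=HEADERS)
```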
### 1.3 JupyterHub — The Multi-User Orchestration Layer
JupyterHub is not a UI. It is a **multi-user server** that spawns, manages, and proxies individual single-user Jupyter servers. This is the production infrastructure layer.
```
[Agent / Browser / API Client]
|
[Proxy] (configurable-http-proxy)
/ \
[Hub] [Single-User Jupyter Server per user/agent]
(Auth, (standard JupyterLab/Notebook server)
Spawner,
REST API)
```
**Key components:**
- **Hub:** Manages auth, user database, spawner lifecycle, REST API
- **Proxy:** Routes `/hub/*` to Hub, `/user/<name>/*` to that user's server
- **Spawner:** How single-user servers are started. Default = local process. Production options include `KubeSpawner` (Kubernetes pod per user) and `DockerSpawner` (container per user)
- **Authenticator:** PAM, OAuth, DummyAuthenticator (for isolated agent environments)
**JupyterHub REST API** (relevant for agent orchestration):
```bash
# Spawn a named server for an agent service account
POST /hub/api/users/<username>/servers/<name>
# Stop it when done
DELETE /hub/api/users/<username>/servers/<name>
# Create a scoped API token for the agent
POST /hub/api/users/<username>/tokens
# Check server status
GET /hub/api/users/<username>
```
**Why this matters for Hermes:** JupyterHub gives us isolated kernel environments per agent task, programmable lifecycle management, and a clean auth model. Instead of running one shared JupyterLab instance on the forge VPS, we could spawn ephemeral single-user servers per notebook execution run — each with its own kernel, clean state, and resource limits.
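A sketch of that per-run lifecycle via the Hub REST API (the Hub URL, token, username, and server name are placeholders):
```python
import time

import requests

HUB = "http://127.0.0.1:8000/hub/api"                 # placeholder Hub URL
HEADERS = {"Authorization": "token <hub-api-token>"}  # placeholder token
USER, SERVER = "hermes-agent", "run-2026-04-06"       # placeholder names

# Spawn a named server for this run
requests.post(f"{HUB}/users/{USER}/servers/{SERVER}", headers=HEADERS)

# Poll the user model until the spawner reports the server ready
while True:
    user = requests.get(f"{HUB}/users/{USER}", headers=HEADERS).json()
    if user.get("servers", {}).get(SERVER, {}).get("ready"):
        break
    time.sleep(1)

# ... run the notebook against the spawned server here ...

# Tear the server down when the run completes
requests.delete(f"{HUB}/users/{USER}/servers/{SERVER}", headers=HEADERS)
```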
### 1.4 Jupyter Kernel Gateway — Minimal Headless Execution
If JupyterHub is too heavy, `jupyter-kernel-gateway` exposes just the kernel protocol over REST + WebSocket:
```bash
pip install jupyter-kernel-gateway
jupyter kernelgateway --KernelGatewayApp.api=kernel_gateway.jupyter_websocket
# Start kernel
POST /api/kernels
# Execute via WebSocket on Jupyter messaging protocol
WS /api/kernels/<kernel_id>/channels
# Stop kernel
DELETE /api/kernels/<kernel_id>
```
This is the lowest-level option: no notebook management, just raw kernel access. Suitable if we want to build our own execution layer from scratch.
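To make "raw kernel access" concrete, here is a minimal sketch of a single `execute_request` over the WebSocket channel using the `websocket-client` package (the kernel ID comes from the `POST /api/kernels` call above; host and port are placeholders):
```python
import json
import uuid

import websocket  # pip install websocket-client

KERNEL_ID = "<kernel-id>"  # placeholder — returned by POST /api/kernels
ws = websocket.create_connection(
    f"ws://127.0.0.1:8888/api/kernels/{KERNEL_ID}/channels"
)

# Minimal execute_request on the shell channel (Jupyter messaging protocol)
msg_id = uuid.uuid4().hex
ws.send(json.dumps({
    "header": {
        "msg_id": msg_id,
        "username": "agent",
        "session": uuid.uuid4().hex,
        "msg_type": "execute_request",
        "version": "5.3",
    },
    "parent_header": {},
    "metadata": {},
    "content": {"code": "print(6 * 7)", "silent": False},
    "channel": "shell",
}))

# Read messages until our request's stdout arrives on iopub
while True:
    reply = json.loads(ws.recv())
    if (reply.get("parent_header", {}).get("msg_id") == msg_id
            and reply.get("msg_type") == "stream"):
        print(reply["content"]["text"], end="")  # -> 42
        break
ws.close()
```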
---
## 2. Papermill — Production Notebook Execution
Papermill is the missing link between "notebook as experiment" and "notebook as repeatable pipeline task." It is already used at scale in industry data pipelines (Netflix, Airbnb, etc.).
### 2.1 Core Concept: Parameterization
Papermill's key innovation is **parameter injection**. Tag a cell in the notebook with `"parameters"`:
```python
# Cell tagged "parameters" (defaults — defined by notebook author)
alpha = 0.5
batch_size = 32
model_name = "baseline"
```
At runtime, Papermill inserts a new cell immediately after, tagged `"injected-parameters"`, that overrides the defaults:
```python
# Cell tagged "injected-parameters" (injected by Papermill at runtime)
alpha = 0.01
batch_size = 128
model_name = "experiment_007"
```
Because Python executes top-to-bottom, the injected cell shadows the defaults. The original notebook is never mutated — Papermill reads input, writes to a new output file.
### 2.2 Python API
```python
import papermill as pm
nb = pm.execute_notebook(
input_path="analysis.ipynb", # source (can be s3://, az://, gs://)
output_path="output/run_001.ipynb", # destination (persists outputs)
parameters={
"alpha": 0.01,
"n_samples": 1000,
"run_id": "fleet-check-2026-04-06",
},
kernel_name="python3",
execution_timeout=300, # per-cell timeout in seconds
log_output=True, # stream cell output to logger
cwd="/path/to/notebook/", # working directory
)
# Returns: NotebookNode (the fully executed notebook with all outputs)
```
On cell failure, Papermill raises `PapermillExecutionError` with:
- `cell_index` — which cell failed
- `source` — the failing cell's code
- `ename` / `evalue` — exception type and message
- `traceback` — full traceback
Even on failure, the output notebook is written with whatever cells completed — enabling partial-run inspection.
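A minimal handling sketch (paths and parameters are illustrative):
```python
import papermill as pm
from papermill.exceptions import PapermillExecutionError

try:
    pm.execute_notebook(
        "analysis.ipynb",
        "output/run_002.ipynb",
        parameters={"alpha": 0.01},
    )
except PapermillExecutionError as err:
    # The partial output notebook is still on disk for inspection
    print(f"Cell {err.cell_index} failed: {err.ename}: {err.evalue}")
    print(f"Failing source:\n{err.source}")
```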
### 2.3 CLI
```bash
# Basic execution
papermill analysis.ipynb output/run_001.ipynb \
-p alpha 0.01 \
-p n_samples 1000
# From YAML parameter file
papermill analysis.ipynb output/run_001.ipynb -f params.yaml
# CI-friendly: log outputs, no progress bar
papermill analysis.ipynb output/run_001.ipynb \
--log-output \
--no-progress-bar \
--execution-timeout 300 \
-p run_id "fleet-check-2026-04-06"
# Prepare only (inject params, skip execution — for preview/inspection)
papermill analysis.ipynb preview.ipynb --prepare-only -p alpha 0.01
# Inspect parameter schema
papermill --help-notebook analysis.ipynb
```
**Remote storage** is built in — `pip install papermill[s3]` enables `s3://` paths for both input and output. Azure and GCS are also supported. For Hermes, this means notebook runs can be stored in object storage and retrieved later for audit.
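For example (bucket and key names are illustrative):
```bash
pip install 'papermill[s3]'
papermill s3://hermes-runs/analysis.ipynb \
  s3://hermes-runs/output/fleet-check-2026-04-06.ipynb \
  -p run_id "fleet-check-2026-04-06"
```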
### 2.4 Scrapbook — Structured Output Collection
`scrapbook` is Papermill's companion for extracting structured data from executed notebooks. Inside a notebook cell:
```python
import scrapbook as sb
# Write typed outputs (stored as special display_data in cell outputs)
sb.glue("accuracy", 0.9342)
sb.glue("metrics", {"precision": 0.91, "recall": 0.93, "f1": 0.92})
sb.glue("results_df", df, "pandas") # DataFrames too
```
After execution, from the agent:
```python
import scrapbook as sb
nb = sb.read_notebook("output/fleet-check-2026-04-06.ipynb")
metrics = nb.scraps["metrics"].data # -> {"precision": 0.91, ...}
accuracy = nb.scraps["accuracy"].data # -> 0.9342
# Or aggregate across many runs
book = sb.read_notebooks("output/")
book.scrap_dataframe # -> pd.DataFrame with all scraps + filenames
```
This is the clean interface between notebook execution and agent decision-making: the notebook outputs its findings as named, typed scraps; the agent reads them programmatically and acts.
### 2.5 How Papermill Compares to hamelnb
| Capability | hamelnb | Papermill |
|---|---|---|
| Stateful kernel session | Yes | No (fresh kernel per run) |
| Parameter injection | No | Yes |
| Persistent output notebook | No | Yes |
| Remote storage (S3/Azure) | No | Yes |
| Per-cell timing/metadata | No | Yes (in output nb metadata) |
| Error isolation (partial runs) | No | Yes |
| Production pipeline use | Experimental | Industry-standard |
| Structured output collection | No | Yes (via scrapbook) |
**Verdict:** `hamelnb` is great for interactive REPL-style exploration (where state accumulates). Papermill is better for task execution (where we want reproducible, parameterized, auditable runs). They serve different use cases. Hermes needs both.
---
## 3. The `.ipynb` File Format — What the Agent Is Actually Working With
Understanding the format is essential for the "PR model." A `.ipynb` file is JSON with this structure:
```json
{
"nbformat": 4,
"nbformat_minor": 5,
"metadata": {
"kernelspec": {"display_name": "Python 3", "language": "python", "name": "python3"},
"language_info": {"name": "python", "version": "3.10.0"}
},
"cells": [
{
"id": "a1b2c3d4",
"cell_type": "markdown",
"source": "# Fleet Health Check\n\nThis notebook checks system health.",
"metadata": {}
},
{
"id": "e5f6g7h8",
"cell_type": "code",
"source": "alpha = 0.5\nthreshold = 0.95",
"metadata": {"tags": ["parameters"]},
"execution_count": null,
"outputs": []
},
{
"id": "i9j0k1l2",
"cell_type": "code",
"source": "import sys\nprint(sys.version)",
"metadata": {},
"execution_count": 1,
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": "3.10.0 (default, ...)\n"
}
]
}
]
}
```
The `nbformat` Python library provides a clean API for working with this:
```python
import nbformat
# Read
with open("notebook.ipynb") as f:
nb = nbformat.read(f, as_version=4)
# Navigate
for cell in nb.cells:
if cell.cell_type == "code":
print(cell.source)
# Modify
nb.cells[2].source = "import sys\nprint('updated')"
# Add cells
new_md = nbformat.v4.new_markdown_cell("## Agent Analysis\nInserted by Hermes.")
nb.cells.insert(3, new_md)
# Write
with open("modified.ipynb", "w") as f:
nbformat.write(nb, f)
# Validate
nbformat.validate(nb) # raises nbformat.ValidationError on invalid format
```
---
## 4. The PR Model for Notebooks
This is the elegant architecture Rockachopa described: agents making PRs to notebooks the same way they make PRs to code. Here's how the full stack enables it.
### 4.1 The Problem: Raw `.ipynb` Diffs Are Unusable
Without tooling, a `git diff` on a notebook that was merely re-run (no source changes) produces thousands of lines of JSON changes — execution counts, timestamps, base64-encoded plot images. Code review on raw `.ipynb` diffs is impractical.
### 4.2 nbstripout — Clean Git History
`nbstripout` installs a git **clean filter** that strips outputs before files enter the git index. The working copy is untouched; only what gets committed is clean.
```bash
pip install nbstripout
nbstripout --install # per-repo
# or
nbstripout --install --global # all repos
```
This writes to `.git/config`:
```ini
[filter "nbstripout"]
clean = nbstripout
smudge = cat
required = true
[diff "ipynb"]
textconv = nbstripout -t
```
And to `.gitattributes`:
```
*.ipynb filter=nbstripout
*.ipynb diff=ipynb
```
Now `git diff` shows only source changes — same as reviewing a `.py` file.
**For executed-output notebooks** (where we want to keep outputs for audit): use a separate path like `runs/` or `outputs/` excluded from the filter via `.gitattributes`:
```
*.ipynb filter=nbstripout
runs/*.ipynb !filter
runs/*.ipynb !diff
```
### 4.3 nbdime — Semantic Diff and Merge
nbdime understands notebook structure. Instead of diffing raw JSON, it diffs at the level of cells — knowing that `cells` is a list, `source` is a string, and outputs should often be ignored.
```bash
pip install nbdime
# Enable semantic git diff/merge for all .ipynb files
nbdime config-git --enable
# Now standard git commands are notebook-aware:
git diff HEAD notebook.ipynb # semantic cell-level diff
git merge feature-branch # uses nbdime for .ipynb conflict resolution
git log -p notebook.ipynb # readable patch per commit
```
**Python API for agent reasoning:**
```python
import nbdime
import nbformat
nb_base = nbformat.read(open("original.ipynb"), as_version=4)
nb_pr = nbformat.read(open("proposed.ipynb"), as_version=4)
diff = nbdime.diff_notebooks(nb_base, nb_pr)
# diff is a list of structured ops the agent can reason about:
# [{"op": "patch", "key": "cells", "diff": [
# {"op": "patch", "key": 3, "diff": [
# {"op": "patch", "key": "source", "diff": [...string ops...]}
# ]}
# ]}]
# Apply a diff (patch)
from nbdime.patching import patch
nb_result = patch(nb_base, diff)
```
### 4.4 The Full Agent PR Workflow
Here is the complete workflow — analogous to how Hermes makes PRs to code repos via Gitea:
**1. Agent reads the task notebook**
```python
nb = nbformat.read(open("fleet_health_check.ipynb"), as_version=4)
```
**2. Agent locates and modifies relevant cells**
```python
# Find parameter cell
params_cell = next(
c for c in nb.cells
if "parameters" in c.get("metadata", {}).get("tags", [])
)
# Update threshold
params_cell.source = params_cell.source.replace("threshold = 0.95", "threshold = 0.90")
# Add explanatory markdown
nb.cells.insert(
nb.cells.index(params_cell) + 1,
nbformat.v4.new_markdown_cell(
"**Note (Hermes 2026-04-06):** Threshold lowered from 0.95 to 0.90 "
"based on false-positive analysis from last 7 days of runs."
)
)
```
**3. Agent writes the notebook and commits to a branch**
```python
nbformat.write(nb, open("fleet_health_check.ipynb", "w"))
```
```bash
git checkout -b agent/fleet-health-threshold-update
git add fleet_health_check.ipynb
git commit -m "feat(notebooks): lower fleet health threshold to 0.90 (#155)"
```
**4. Agent executes the proposed notebook to validate**
```python
import papermill as pm
pm.execute_notebook(
"fleet_health_check.ipynb",
"output/validation_run.ipynb",
parameters={"run_id": "agent-validation-2026-04-06"},
log_output=True,
)
```
**5. Agent collects results and compares**
```python
import scrapbook as sb
result = sb.read_notebook("output/validation_run.ipynb")
health_score = result.scraps["health_score"].data
alert_count = result.scraps["alert_count"].data
```
**6. Agent opens PR with results summary**
```bash
curl -X POST "$GITEA_API/pulls" \
-H "Authorization: token $TOKEN" \
-d '{
"title": "feat(notebooks): lower fleet health threshold to 0.90",
"body": "## Agent Analysis\n\n- Health score: 0.94 (was 0.89 with old threshold)\n- Alert count: 12 (was 47 false positives)\n- Validation run: output/validation_run.ipynb\n\nRefs #155",
"head": "agent/fleet-health-threshold-update",
"base": "main"
}'
```
**7. Human reviews the PR using nbdime diff**
The PR diff in Gitea shows the clean cell-level source changes (thanks to nbstripout). The human can also run `nbdiff-web original.ipynb proposed.ipynb` locally for rich rendered diff with output comparison.
### 4.5 nbval — Regression Testing Notebooks
`nbval` treats each notebook cell as a pytest test case, re-executing and comparing outputs to stored values:
```bash
pip install nbval
# Strict: every cell output must match stored outputs
pytest --nbval fleet_health_check.ipynb
# Lax: only check cells marked with # NBVAL_CHECK_OUTPUT
pytest --nbval-lax fleet_health_check.ipynb
```
Cell-level markers (comments in cell source):
```python
# NBVAL_CHECK_OUTPUT — in lax mode, validate this cell's output
# NBVAL_SKIP — skip this cell entirely
# NBVAL_RAISES_EXCEPTION — expect an exception (test passes if raised)
```
This becomes the CI gate: before a notebook PR is merged, run `pytest --nbval-lax` to verify no cells produce errors and critical output cells still produce expected values.
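A hypothetical step for the existing `notebook-ci.yml` workflow:
```yaml
- name: Notebook regression tests
  run: |
    pip install pytest nbval
    pytest --nbval-lax notebooks/
```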
---
## 5. Gaps and Recommendations
### 5.1 Gap Assessment (Refining Timmy's Original Findings)
| Gap | Severity | Solution |
|---|---|---|
| No Hermes tool access in kernel | High | Inject `hermes_runtime` module (see §5.2) |
| No structured output protocol | High | Use scrapbook `sb.glue()` pattern |
| No parameterization | Medium | Add Papermill `"parameters"` cell to notebooks |
| XSRF/auth friction | Medium | Disable for local; use JupyterHub token scopes for multi-user |
| No notebook CI/testing | Medium | Add nbval to test suite |
| Raw `.ipynb` diffs in PRs | Medium | Install nbstripout + nbdime |
| No scheduling | Low | Papermill + existing Hermes cron layer |
### 5.2 Short-Term Recommendations (This Month)
**1. `NotebookExecutor` tool**
A thin Hermes tool wrapping the ecosystem:
```python
class NotebookExecutor:
def execute(self, input_path, output_path, parameters, timeout=300):
"""Wraps pm.execute_notebook(). Returns structured result dict."""
def collect_outputs(self, notebook_path):
"""Wraps sb.read_notebook(). Returns dict of named scraps."""
def inspect_parameters(self, notebook_path):
"""Wraps pm.inspect_notebook(). Returns parameter schema."""
def read_notebook(self, path):
"""Returns nbformat NotebookNode for cell inspection/modification."""
def write_notebook(self, nb, path):
"""Writes modified NotebookNode back to disk."""
def diff_notebooks(self, path_a, path_b):
"""Returns structured nbdime diff for agent reasoning."""
def validate(self, notebook_path):
"""Runs nbformat.validate() + optional pytest --nbval-lax."""
```
Execution result structure for the agent:
```python
{
"status": "success" | "error",
"duration_seconds": 12.34,
"cells_executed": 15,
"failed_cell": { # None on success
"index": 7,
"source": "model.fit(X, y)",
"ename": "ValueError",
"evalue": "Input contains NaN",
},
"scraps": { # from scrapbook
"health_score": 0.94,
"alert_count": 12,
},
}
```
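A minimal sketch of how `execute()` could produce that structure, composing the Papermill and scrapbook APIs shown above (the error-field names follow `PapermillExecutionError`):
```python
import time

import papermill as pm
import scrapbook as sb
from papermill.exceptions import PapermillExecutionError


def execute(self, input_path, output_path, parameters, timeout=300):
    """Sketch: wrap pm.execute_notebook() into the structured result dict."""
    start = time.monotonic()
    result = {"status": "success", "failed_cell": None}
    try:
        nb = pm.execute_notebook(
            input_path,
            output_path,
            parameters=parameters,
            execution_timeout=timeout,
        )
        # Approximation: count code cells in the executed notebook
        result["cells_executed"] = sum(
            1 for c in nb.cells if c.cell_type == "code"
        )
    except PapermillExecutionError as err:
        result["status"] = "error"
        result["failed_cell"] = {
            "index": err.cell_index,
            "source": err.source,
            "ename": err.ename,
            "evalue": err.evalue,
        }
    result["duration_seconds"] = round(time.monotonic() - start, 2)
    # Scraps are readable even from a partially executed output notebook
    result["scraps"] = {
        name: scrap.data
        for name, scrap in sb.read_notebook(output_path).scraps.items()
    }
    return result
```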
**2. Fleet Health Check as a Notebook**
Convert the fleet health check epic into a parameterized notebook with (a minimal sketch follows the list):
- `"parameters"` cell for run configuration (date range, thresholds, agent ID)
- Markdown cells narrating each step
- `sb.glue()` calls for structured outputs
- `# NBVAL_CHECK_OUTPUT` markers on critical cells
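A minimal percent-format sketch combining these elements (metric names and values are illustrative placeholders):
```python
# %% tags=["parameters"]
# Defaults — overridden by Papermill at run time
run_id = "manual"
alert_threshold = 0.90

# %% [markdown]
# ## Collect fleet health metrics

# %%
# NBVAL_CHECK_OUTPUT
import scrapbook as sb

# Placeholder metrics — a real notebook would query the fleet here
health_score = 0.94
alert_count = 12
sb.glue("health_score", health_score)
sb.glue("alert_count", alert_count)
print(f"run={run_id} score={health_score:.2f} "
      f"alerts={alert_count} breach={health_score < alert_threshold}")
```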
**3. Git hygiene for notebooks**
Install nbstripout + nbdime in the hermes-agent repo:
```bash
pip install nbstripout nbdime
nbstripout --install
nbdime config-git --enable
```
Add to `.gitattributes`:
```
*.ipynb filter=nbstripout
*.ipynb diff=ipynb
runs/*.ipynb !filter
```
### 5.3 Medium-Term Recommendations (Next Quarter)
**4. `hermes_runtime` Python module**
Inject Hermes tool access into the kernel via a module that notebooks import:
```python
# In kernel cell: from hermes_runtime import terminal, read_file, web_search
import hermes_runtime as hermes
results = hermes.web_search("fleet health metrics best practices")
hermes.terminal("systemctl status agent-fleet")
content = hermes.read_file("/var/log/hermes/agent.log")
```
This closes the most significant gap: notebooks gain the same tool access as skills, while retaining state persistence and narrative structure.
**5. Notebook-triggered cron**
Extend the Hermes cron layer to accept `.ipynb` paths as targets:
```yaml
# cron entry
schedule: "0 6 * * *"
type: notebook
path: notebooks/fleet_health_check.ipynb
parameters:
run_id: "{{date}}"
alert_threshold: 0.90
output_path: runs/fleet_health_{{date}}.ipynb
```
The cron runner calls `pm.execute_notebook()` and commits the output to the repo.
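A sketch of that runner logic, assuming the cron entry above arrives as a parsed dict (the commit-message format is illustrative):
```python
import datetime
import subprocess

import papermill as pm


def run_notebook_job(entry: dict) -> None:
    """Execute a scheduled notebook and commit the output as an audit artifact."""
    date = datetime.date.today().isoformat()
    output_path = entry["output_path"].replace("{{date}}", date)
    parameters = {
        k: (v.replace("{{date}}", date) if isinstance(v, str) else v)
        for k, v in entry["parameters"].items()
    }
    pm.execute_notebook(entry["path"], output_path, parameters=parameters)
    subprocess.run(["git", "add", output_path], check=True)
    subprocess.run(
        ["git", "commit", "-m", f"chore(runs): scheduled notebook run {date}"],
        check=True,
    )
```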
**6. JupyterHub for multi-agent isolation**
If multiple agents need concurrent notebook execution, deploy JupyterHub with `DockerSpawner` or `KubeSpawner`. Each agent job gets an isolated container with its own kernel, no state bleed between runs.
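A minimal `jupyterhub_config.py` sketch for that setup (image name and limits are placeholders):
```python
# jupyterhub_config.py — per-run container isolation via DockerSpawner
c.JupyterHub.spawner_class = "dockerspawner.DockerSpawner"
c.DockerSpawner.image = "hermes/notebook-runner:latest"  # placeholder image
c.DockerSpawner.remove = True      # discard container state after each run
c.DockerSpawner.mem_limit = "2G"   # per-run resource cap
c.JupyterHub.authenticator_class = "dummy"  # isolated/internal use only
```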
---
## 6. Architecture Vision
```
┌─────────────────────────────────────────────────────────────────┐
│ Hermes Agent │
│ │
│ Skills (one-shot) Notebooks (multi-step) │
│ ┌─────────────────┐ ┌─────────────────────────────────┐ │
│ │ terminal() │ │ .ipynb file │ │
│ │ web_search() │ │ ├── Markdown (narrative) │ │
│ │ read_file() │ │ ├── Code cells (logic) │ │
│ └─────────────────┘ │ ├── "parameters" cell │ │
│ │ └── sb.glue() outputs │ │
│ └──────────────┬────────────────┘ │
│ │ │
│ ┌──────────────▼────────────────┐ │
│ │ NotebookExecutor tool │ │
│ │ (papermill + scrapbook + │ │
│ │ nbformat + nbdime + nbval) │ │
│ └──────────────┬────────────────┘ │
│ │ │
└────────────────────────────────────────────┼────────────────────┘
┌───────────────────▼──────────────────┐
│ JupyterLab / Hub │
│ (kernel execution environment) │
└───────────────────┬──────────────────┘
┌───────────────────▼──────────────────┐
│ Git + Gitea │
│ (nbstripout clean diffs, │
│ nbdime semantic review, │
│ PR workflow for notebook changes) │
└──────────────────────────────────────┘
```
**Notebooks become the primary artifact of complex tasks:** the agent generates or edits cells, Papermill executes them reproducibly, scrapbook extracts structured outputs for agent decision-making, and the resulting `.ipynb` is both proof-of-work and human-readable report. Skills remain for one-shot actions. Notebooks own multi-step workflows.
---
## 7. Package Summary
| Package | Purpose | Install |
|---|---|---|
| `nbformat` | Read/write/validate `.ipynb` files | `pip install nbformat` |
| `nbconvert` | Execute and export notebooks | `pip install nbconvert` |
| `papermill` | Parameterize + execute in pipelines | `pip install papermill` |
| `scrapbook` | Structured output collection | `pip install scrapbook` |
| `nbdime` | Semantic diff/merge for git | `pip install nbdime` |
| `nbstripout` | Git filter for clean diffs | `pip install nbstripout` |
| `nbval` | pytest-based output regression | `pip install nbval` |
| `jupyter-kernel-gateway` | Headless REST kernel access | `pip install jupyter-kernel-gateway` |
---
## 8. References
- [Papermill GitHub (nteract/papermill)](https://github.com/nteract/papermill)
- [Scrapbook GitHub (nteract/scrapbook)](https://github.com/nteract/scrapbook)
- [nbformat format specification](https://nbformat.readthedocs.io/en/latest/format_description.html)
- [nbdime documentation](https://nbdime.readthedocs.io/)
- [nbdime diff format spec (JEP #8)](https://github.com/jupyter/enhancement-proposals/blob/master/08-notebook-diff/notebook-diff.md)
- [nbconvert execute API](https://nbconvert.readthedocs.io/en/latest/execute_api.html)
- [nbstripout README](https://github.com/kynan/nbstripout)
- [nbval GitHub (computationalmodelling/nbval)](https://github.com/computationalmodelling/nbval)
- [JupyterHub REST API](https://jupyterhub.readthedocs.io/en/stable/howto/rest.html)
- [JupyterHub Technical Overview](https://jupyterhub.readthedocs.io/en/latest/reference/technical-overview.html)
- [Jupyter Kernel Gateway](https://github.com/jupyter-server/kernel_gateway)


@@ -0,0 +1,57 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Parameterized Agent Task: System Health Check\n",
"\n",
"This notebook demonstrates how an LLM agent can generate a task notebook,\n",
"a scheduler can parameterize and execute it via papermill,\n",
"and the output becomes a persistent audit artifact."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {"tags": ["parameters"]},
"outputs": [],
"source": [
"# Default parameters — papermill will inject overrides here\n",
"threshold = 1.0\n",
"hostname = \"localhost\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import json, subprocess, datetime\n",
"gather_time = datetime.datetime.now().isoformat()\n",
"load_avg = subprocess.check_output([\"cat\", \"/proc/loadavg\"]).decode().strip()\n",
"load_values = [float(x) for x in load_avg.split()[:3]]\n",
"avg_load = sum(load_values) / len(load_values)\n",
"intervention_needed = avg_load > threshold\n",
"report = {\n",
" \"hostname\": hostname,\n",
" \"threshold\": threshold,\n",
" \"avg_load\": round(avg_load, 3),\n",
" \"intervention_needed\": intervention_needed,\n",
" \"gathered_at\": gather_time\n",
"}\n",
"print(json.dumps(report, indent=2))"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
}
},
"nbformat": 4,
"nbformat_minor": 5
}


@@ -0,0 +1,41 @@
# ---
# jupyter:
# jupytext:
# text_representation:
# extension: .py
# format_name: percent
# format_version: '1.3'
# jupytext_version: 1.19.1
# kernelspec:
# display_name: Python 3
# language: python
# name: python3
# ---
# %% [markdown]
# # Parameterized Agent Task: System Health Check
#
# This notebook demonstrates how an LLM agent can generate a task notebook,
# a scheduler can parameterize and execute it via papermill,
# and the output becomes a persistent audit artifact.
# %% tags=["parameters"]
# Default parameters — papermill will inject overrides here
threshold = 1.0
hostname = "localhost"
# %%
import json, subprocess, datetime
gather_time = datetime.datetime.now().isoformat()
load_avg = subprocess.check_output(["cat", "/proc/loadavg"]).decode().strip()
load_values = [float(x) for x in load_avg.split()[:3]]
avg_load = sum(load_values) / len(load_values)
intervention_needed = avg_load > threshold
report = {
"hostname": hostname,
"threshold": threshold,
"avg_load": round(avg_load, 3),
"intervention_needed": intervention_needed,
"gathered_at": gather_time
}
print(json.dumps(report, indent=2))

observatory.py Normal file

@@ -0,0 +1,955 @@
"""
Observatory — Testbed Health Monitoring & Alerting for Hermes Agent
Checks running services, system resources, and connectivity.
Fires Telegram alerts when thresholds are breached.
Posts daily digest reports.
Stores 30 days of historical health data in SQLite.
Usage:
    python observatory.py --check          # one-shot health check (stdout)
    python observatory.py --daemon         # continuous monitor (60s poll)
    python observatory.py --digest         # print daily digest
    python observatory.py --send-digest    # print and send daily digest via Telegram
    python observatory.py --history N      # show last N health records
    python observatory.py --slo            # print SLO report
Configuration (env vars, falls back to ~/.hermes/.env):
OBSERVATORY_ALERT_CHAT_ID Telegram chat ID for alerts
OBSERVATORY_DIGEST_CHAT_ID Telegram chat ID for daily digest (default: alert chat)
OBSERVATORY_POLL_INTERVAL Seconds between health polls (default: 60)
OBSERVATORY_DB_PATH SQLite path (default: ~/.hermes/observatory.db)
TELEGRAM_BOT_TOKEN Bot token used to send alerts
# Threshold overrides (all optional):
OBSERVATORY_DISK_WARN_PCT Disk usage warn threshold (default: 80)
OBSERVATORY_DISK_CRIT_PCT Disk usage critical threshold (default: 90)
OBSERVATORY_MEM_WARN_PCT Memory usage warn threshold (default: 80)
OBSERVATORY_MEM_CRIT_PCT Memory usage critical threshold (default: 90)
OBSERVATORY_CPU_WARN_PCT CPU usage warn threshold (default: 80)
OBSERVATORY_CPU_CRIT_PCT CPU usage critical threshold (default: 95)
OBSERVATORY_WEBHOOK_URL Webhook endpoint to probe (default: http://127.0.0.1:8080/health)
OBSERVATORY_API_URL API server health URL (default: http://127.0.0.1:8642/health)
OBSERVATORY_WEBHOOK_LATENCY_SLO_MS Webhook latency SLO ms (default: 2000)
OBSERVATORY_GATEWAY_UPTIME_SLO_PCT Gateway uptime SLO % (default: 99.5)
"""
from __future__ import annotations
import argparse
import json
import logging
import os
import signal
import sqlite3
import sys
import time
import urllib.request
import urllib.error
from contextlib import contextmanager
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
# ---------------------------------------------------------------------------
# Optional imports
# ---------------------------------------------------------------------------
try:
import psutil
_PSUTIL = True
except ImportError:
_PSUTIL = False
try:
from dotenv import load_dotenv as _load_dotenv
_DOTENV = True
except ImportError:
_DOTENV = False
logger = logging.getLogger("observatory")
# ---------------------------------------------------------------------------
# Constants & SLO definitions
# ---------------------------------------------------------------------------
RETENTION_DAYS = 30
SLO_DEFINITIONS = {
"gateway_uptime_pct": {
"description": "Gateway process uptime over the last 24 hours",
"target": 99.5,
"unit": "%",
},
"webhook_latency_ms": {
"description": "Webhook endpoint p95 response latency",
"target": 2000,
"unit": "ms",
"direction": "lower_is_better",
},
"api_server_latency_ms": {
"description": "API server /health p95 response latency",
"target": 2000,
"unit": "ms",
"direction": "lower_is_better",
},
}
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
def _load_env() -> None:
"""Load .env from HERMES_HOME if dotenv is available."""
if not _DOTENV:
return
hermes_home = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
env_path = hermes_home / ".env"
if env_path.exists():
_load_dotenv(env_path, override=False)
# Project-level .env as dev fallback
project_env = Path(__file__).parent / ".env"
if project_env.exists():
_load_dotenv(project_env, override=False)
@dataclass
class ObservatoryConfig:
alert_chat_id: Optional[str] = None
digest_chat_id: Optional[str] = None
telegram_token: Optional[str] = None
poll_interval: int = 60
db_path: Path = field(default_factory=lambda: Path(os.getenv("HERMES_HOME", Path.home() / ".hermes")) / "observatory.db")
disk_warn_pct: float = 80.0
disk_crit_pct: float = 90.0
mem_warn_pct: float = 80.0
mem_crit_pct: float = 90.0
cpu_warn_pct: float = 80.0
cpu_crit_pct: float = 95.0
webhook_url: str = "http://127.0.0.1:8080/health"
api_url: str = "http://127.0.0.1:8642/health"
webhook_latency_slo_ms: float = 2000.0
gateway_uptime_slo_pct: float = 99.5
@classmethod
def from_env(cls) -> "ObservatoryConfig":
_load_env()
cfg = cls()
cfg.telegram_token = os.getenv("TELEGRAM_BOT_TOKEN")
cfg.alert_chat_id = os.getenv("OBSERVATORY_ALERT_CHAT_ID")
cfg.digest_chat_id = os.getenv("OBSERVATORY_DIGEST_CHAT_ID") or cfg.alert_chat_id
cfg.poll_interval = int(os.getenv("OBSERVATORY_POLL_INTERVAL", 60))
db_override = os.getenv("OBSERVATORY_DB_PATH")
if db_override:
cfg.db_path = Path(db_override)
cfg.disk_warn_pct = float(os.getenv("OBSERVATORY_DISK_WARN_PCT", 80))
cfg.disk_crit_pct = float(os.getenv("OBSERVATORY_DISK_CRIT_PCT", 90))
cfg.mem_warn_pct = float(os.getenv("OBSERVATORY_MEM_WARN_PCT", 80))
cfg.mem_crit_pct = float(os.getenv("OBSERVATORY_MEM_CRIT_PCT", 90))
cfg.cpu_warn_pct = float(os.getenv("OBSERVATORY_CPU_WARN_PCT", 80))
cfg.cpu_crit_pct = float(os.getenv("OBSERVATORY_CPU_CRIT_PCT", 95))
cfg.webhook_url = os.getenv("OBSERVATORY_WEBHOOK_URL", "http://127.0.0.1:8080/health")
cfg.api_url = os.getenv("OBSERVATORY_API_URL", "http://127.0.0.1:8642/health")
cfg.webhook_latency_slo_ms = float(os.getenv("OBSERVATORY_WEBHOOK_LATENCY_SLO_MS", 2000))
cfg.gateway_uptime_slo_pct = float(os.getenv("OBSERVATORY_GATEWAY_UPTIME_SLO_PCT", 99.5))
return cfg
# ---------------------------------------------------------------------------
# Health check models
# ---------------------------------------------------------------------------
@dataclass
class CheckResult:
name: str
status: str # "ok" | "warn" | "critical" | "error"
message: str
value: Optional[float] = None
unit: Optional[str] = None
extra: Dict[str, Any] = field(default_factory=dict)
@dataclass
class HealthSnapshot:
ts: str # ISO8601 UTC
checks: List[CheckResult] = field(default_factory=list)
@property
def overall_status(self) -> str:
statuses = {c.status for c in self.checks}
if "critical" in statuses or "error" in statuses:
return "critical"
if "warn" in statuses:
return "warn"
return "ok"
def to_dict(self) -> Dict[str, Any]:
return {
"ts": self.ts,
"overall": self.overall_status,
"checks": [asdict(c) for c in self.checks],
}
# ---------------------------------------------------------------------------
# Individual health checks
# ---------------------------------------------------------------------------
def check_gateway_liveness() -> CheckResult:
"""Check whether the Hermes gateway process is running."""
try:
from gateway.status import is_gateway_running, get_running_pid
running = is_gateway_running()
pid = get_running_pid()
if running:
return CheckResult(
name="gateway_process",
status="ok",
message=f"Gateway running (pid={pid})",
value=float(pid) if pid else None,
)
return CheckResult(
name="gateway_process",
status="critical",
message="Gateway process is NOT running",
)
except Exception as exc:
return CheckResult(
name="gateway_process",
status="error",
message=f"Could not determine gateway status: {exc}",
)
def check_api_server_http(cfg: ObservatoryConfig) -> CheckResult:
"""Check API server /health endpoint responsiveness."""
url = cfg.api_url
start = time.monotonic()
try:
req = urllib.request.Request(url, method="GET")
req.add_header("User-Agent", "hermes-observatory/1.0")
with urllib.request.urlopen(req, timeout=10) as resp:
latency_ms = (time.monotonic() - start) * 1000
body = resp.read(512).decode("utf-8", errors="replace")
status_code = resp.status
if status_code < 400:
slo_ok = latency_ms <= cfg.webhook_latency_slo_ms
return CheckResult(
name="api_server_http",
status="ok" if slo_ok else "warn",
message=f"API server OK ({latency_ms:.0f}ms){'' if slo_ok else ' — exceeds latency SLO'}",
value=latency_ms,
unit="ms",
extra={"status_code": status_code, "body_preview": body[:100]},
)
return CheckResult(
name="api_server_http",
status="critical",
message=f"API server returned HTTP {status_code}",
value=latency_ms,
unit="ms",
)
except urllib.error.URLError as exc:
latency_ms = (time.monotonic() - start) * 1000
# Not running is acceptable if gateway is not configured for API
reason = str(exc.reason) if hasattr(exc, "reason") else str(exc)
if "Connection refused" in reason or "Connection reset" in reason:
return CheckResult(
name="api_server_http",
status="warn",
message=f"API server not reachable at {url} (not started?)",
value=latency_ms,
unit="ms",
)
return CheckResult(
name="api_server_http",
status="error",
message=f"API server probe error: {exc}",
value=latency_ms,
unit="ms",
)
except Exception as exc:
latency_ms = (time.monotonic() - start) * 1000
return CheckResult(
name="api_server_http",
status="error",
message=f"API server probe exception: {exc}",
value=latency_ms,
unit="ms",
)
def check_webhook_http(cfg: ObservatoryConfig) -> CheckResult:
"""Check webhook endpoint responsiveness."""
url = cfg.webhook_url
start = time.monotonic()
try:
req = urllib.request.Request(url, method="GET")
req.add_header("User-Agent", "hermes-observatory/1.0")
with urllib.request.urlopen(req, timeout=10) as resp:
latency_ms = (time.monotonic() - start) * 1000
status_code = resp.status
slo_ok = latency_ms <= cfg.webhook_latency_slo_ms
if status_code < 400:
return CheckResult(
name="webhook_http",
status="ok" if slo_ok else "warn",
message=f"Webhook OK ({latency_ms:.0f}ms){'' if slo_ok else ' — exceeds latency SLO'}",
value=latency_ms,
unit="ms",
extra={"status_code": status_code},
)
return CheckResult(
name="webhook_http",
status="critical",
message=f"Webhook returned HTTP {status_code}",
value=latency_ms,
unit="ms",
)
except urllib.error.URLError as exc:
latency_ms = (time.monotonic() - start) * 1000
reason = str(exc.reason) if hasattr(exc, "reason") else str(exc)
if "Connection refused" in reason or "Connection reset" in reason:
return CheckResult(
name="webhook_http",
status="warn",
message=f"Webhook not reachable at {url} (not started?)",
value=latency_ms,
unit="ms",
)
return CheckResult(
name="webhook_http",
status="error",
message=f"Webhook probe error: {exc}",
value=latency_ms,
unit="ms",
)
except Exception as exc:
latency_ms = (time.monotonic() - start) * 1000
return CheckResult(
name="webhook_http",
status="error",
message=f"Webhook probe exception: {exc}",
value=latency_ms,
unit="ms",
)
def check_disk(cfg: ObservatoryConfig) -> CheckResult:
"""Check disk usage on the HERMES_HOME filesystem."""
if not _PSUTIL:
return CheckResult(name="disk", status="error", message="psutil not installed")
try:
hermes_home = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
path = str(hermes_home) if hermes_home.exists() else "/"
usage = psutil.disk_usage(path)
pct = usage.percent
free_gb = usage.free / (1024 ** 3)
if pct >= cfg.disk_crit_pct:
status = "critical"
elif pct >= cfg.disk_warn_pct:
status = "warn"
else:
status = "ok"
return CheckResult(
name="disk",
status=status,
message=f"Disk {pct:.1f}% used ({free_gb:.1f}GB free)",
value=pct,
unit="%",
extra={"free_bytes": usage.free, "total_bytes": usage.total},
)
except Exception as exc:
return CheckResult(name="disk", status="error", message=f"Disk check error: {exc}")
def check_memory(cfg: ObservatoryConfig) -> CheckResult:
"""Check system memory usage."""
if not _PSUTIL:
return CheckResult(name="memory", status="error", message="psutil not installed")
try:
mem = psutil.virtual_memory()
pct = mem.percent
available_gb = mem.available / (1024 ** 3)
if pct >= cfg.mem_crit_pct:
status = "critical"
elif pct >= cfg.mem_warn_pct:
status = "warn"
else:
status = "ok"
return CheckResult(
name="memory",
status=status,
message=f"Memory {pct:.1f}% used ({available_gb:.1f}GB available)",
value=pct,
unit="%",
extra={"available_bytes": mem.available, "total_bytes": mem.total},
)
except Exception as exc:
return CheckResult(name="memory", status="error", message=f"Memory check error: {exc}")
def check_cpu(cfg: ObservatoryConfig) -> CheckResult:
"""Check CPU usage (1-second sample)."""
if not _PSUTIL:
return CheckResult(name="cpu", status="error", message="psutil not installed")
try:
pct = psutil.cpu_percent(interval=1)
if pct >= cfg.cpu_crit_pct:
status = "critical"
elif pct >= cfg.cpu_warn_pct:
status = "warn"
else:
status = "ok"
return CheckResult(
name="cpu",
status=status,
message=f"CPU {pct:.1f}%",
value=pct,
unit="%",
)
except Exception as exc:
return CheckResult(name="cpu", status="error", message=f"CPU check error: {exc}")
def check_database(cfg: ObservatoryConfig) -> CheckResult:
"""Check observatory SQLite DB connectivity and size."""
db_path = cfg.db_path
try:
if not db_path.exists():
return CheckResult(
name="database",
status="warn",
message=f"Observatory DB not yet created at {db_path}",
)
size_kb = db_path.stat().st_size / 1024
conn = sqlite3.connect(str(db_path), timeout=5)
conn.execute("SELECT count(*) FROM health_snapshots").fetchone()
conn.close()
return CheckResult(
name="database",
status="ok",
message=f"Observatory DB OK ({size_kb:.1f}KB)",
value=size_kb,
unit="KB",
extra={"path": str(db_path)},
)
except Exception as exc:
return CheckResult(
name="database",
status="error",
message=f"DB check error: {exc}",
)
def check_response_store_db() -> CheckResult:
"""Check the API server's SQLite response store DB if it exists."""
try:
hermes_home = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
db_path = hermes_home / "response_store.db"
if not db_path.exists():
return CheckResult(
name="response_store_db",
status="ok",
message="Response store DB not present (API server not yet used)",
)
size_kb = db_path.stat().st_size / 1024
conn = sqlite3.connect(str(db_path), timeout=5)
count = conn.execute("SELECT count(*) FROM responses").fetchone()[0]
conn.close()
return CheckResult(
name="response_store_db",
status="ok",
message=f"Response store DB OK ({count} responses, {size_kb:.1f}KB)",
value=size_kb,
unit="KB",
)
except Exception as exc:
return CheckResult(
name="response_store_db",
status="error",
message=f"Response store DB error: {exc}",
)
# ---------------------------------------------------------------------------
# Snapshot collector
# ---------------------------------------------------------------------------
def collect_snapshot(cfg: ObservatoryConfig) -> HealthSnapshot:
"""Run all checks and return a HealthSnapshot."""
ts = datetime.now(timezone.utc).isoformat()
checks = [
check_gateway_liveness(),
check_api_server_http(cfg),
check_webhook_http(cfg),
check_disk(cfg),
check_memory(cfg),
check_cpu(cfg),
check_database(cfg),
check_response_store_db(),
]
return HealthSnapshot(ts=ts, checks=checks)
# ---------------------------------------------------------------------------
# SQLite persistence
# ---------------------------------------------------------------------------
@contextmanager
def _db(path: Path):
path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(path), timeout=10)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
try:
yield conn
conn.commit()
finally:
conn.close()
def _init_db(path: Path) -> None:
"""Create tables if they don't exist."""
with _db(path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS health_snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts TEXT NOT NULL,
overall TEXT NOT NULL,
payload TEXT NOT NULL
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_snapshots_ts ON health_snapshots(ts)")
conn.execute("""
CREATE TABLE IF NOT EXISTS alerts_sent (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts TEXT NOT NULL,
check_name TEXT NOT NULL,
status TEXT NOT NULL,
message TEXT NOT NULL
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_alerts_ts ON alerts_sent(ts)")
def store_snapshot(cfg: ObservatoryConfig, snapshot: HealthSnapshot) -> None:
"""Persist snapshot to SQLite."""
_init_db(cfg.db_path)
payload = json.dumps(snapshot.to_dict())
with _db(cfg.db_path) as conn:
conn.execute(
"INSERT INTO health_snapshots (ts, overall, payload) VALUES (?, ?, ?)",
(snapshot.ts, snapshot.overall_status, payload),
)
# Prune records older than RETENTION_DAYS
cutoff = (datetime.now(timezone.utc) - timedelta(days=RETENTION_DAYS)).isoformat()
conn.execute("DELETE FROM health_snapshots WHERE ts < ?", (cutoff,))
def record_alert_sent(cfg: ObservatoryConfig, check_name: str, status: str, message: str) -> None:
"""Record that an alert was dispatched."""
_init_db(cfg.db_path)
with _db(cfg.db_path) as conn:
conn.execute(
"INSERT INTO alerts_sent (ts, check_name, status, message) VALUES (?, ?, ?, ?)",
(datetime.now(timezone.utc).isoformat(), check_name, status, message),
)
def load_snapshots(cfg: ObservatoryConfig, days: int = RETENTION_DAYS) -> List[Dict[str, Any]]:
"""Load snapshots from the last N days."""
if not cfg.db_path.exists():
return []
cutoff = (datetime.now(timezone.utc) - timedelta(days=days)).isoformat()
with _db(cfg.db_path) as conn:
rows = conn.execute(
"SELECT ts, overall, payload FROM health_snapshots WHERE ts >= ? ORDER BY ts DESC",
(cutoff,),
).fetchall()
return [json.loads(row[2]) for row in rows]
# ---------------------------------------------------------------------------
# Alerting
# ---------------------------------------------------------------------------
def _telegram_send(token: str, chat_id: str, text: str) -> bool:
"""Send a Telegram message via the Bot API. Returns True on success."""
url = f"https://api.telegram.org/bot{token}/sendMessage"
payload = json.dumps({
"chat_id": chat_id,
"text": text,
"parse_mode": "HTML",
"disable_web_page_preview": True,
}).encode("utf-8")
req = urllib.request.Request(url, data=payload, method="POST")
req.add_header("Content-Type", "application/json")
req.add_header("User-Agent", "hermes-observatory/1.0")
try:
with urllib.request.urlopen(req, timeout=15) as resp:
body = json.loads(resp.read())
return bool(body.get("ok"))
except Exception as exc:
logger.warning("Telegram send failed: %s", exc)
return False
def _status_emoji(status: str) -> str:
return {"ok": "", "warn": "⚠️", "critical": "🔴", "error": ""}.get(status, "")
def maybe_alert(cfg: ObservatoryConfig, snapshot: HealthSnapshot, prev_snapshot: Optional[HealthSnapshot]) -> List[str]:
"""
Fire Telegram alerts for newly degraded checks.
Returns list of alert messages sent.
"""
if not cfg.telegram_token or not cfg.alert_chat_id:
return []
alerts_sent = []
prev_statuses: Dict[str, str] = {}
if prev_snapshot:
for c in prev_snapshot.checks:
prev_statuses[c.name] = c.status
for check in snapshot.checks:
if check.status in ("critical", "error"):
prev = prev_statuses.get(check.name, "ok")
if prev not in ("critical", "error"):
# Newly degraded — alert
emoji = _status_emoji(check.status)
msg = (
f"{emoji} <b>Hermes Observatory Alert</b>\n\n"
f"<b>Check:</b> {check.name}\n"
f"<b>Status:</b> {check.status.upper()}\n"
f"<b>Message:</b> {check.message}\n"
f"<b>Time:</b> {snapshot.ts}"
)
if _telegram_send(cfg.telegram_token, cfg.alert_chat_id, msg):
alerts_sent.append(msg)
record_alert_sent(cfg, check.name, check.status, check.message)
logger.info("Alert sent for %s (%s)", check.name, check.status)
elif check.status == "ok":
prev = prev_statuses.get(check.name)
if prev in ("critical", "error"):
# Recovery alert
msg = (
f"✅ <b>Hermes Observatory — Recovery</b>\n\n"
f"<b>Check:</b> {check.name} has recovered\n"
f"<b>Message:</b> {check.message}\n"
f"<b>Time:</b> {snapshot.ts}"
)
if _telegram_send(cfg.telegram_token, cfg.alert_chat_id, msg):
alerts_sent.append(msg)
record_alert_sent(cfg, check.name, "recovery", check.message)
return alerts_sent
# ---------------------------------------------------------------------------
# Daily digest
# ---------------------------------------------------------------------------
def build_digest(cfg: ObservatoryConfig) -> str:
"""Build a daily health digest from stored snapshots."""
snapshots = load_snapshots(cfg, days=1)
total = len(snapshots)
if total == 0:
return "No health data available for the last 24 hours."
# Count by overall status
status_counts: Dict[str, int] = {"ok": 0, "warn": 0, "critical": 0, "error": 0}
check_degraded_counts: Dict[str, int] = {}
latencies: Dict[str, List[float]] = {}
for snap in snapshots:
overall = snap.get("overall", "ok")
status_counts[overall] = status_counts.get(overall, 0) + 1
for check in snap.get("checks", []):
name = check["name"]
status = check["status"]
if status in ("critical", "error", "warn"):
check_degraded_counts[name] = check_degraded_counts.get(name, 0) + 1
value = check.get("value")
unit = check.get("unit")
if value is not None and unit == "ms":
if name not in latencies:
latencies[name] = []
latencies[name].append(float(value))
uptime_pct = 100.0 * status_counts["ok"] / total if total else 0.0
now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
lines = [
f"📊 <b>Hermes Observatory — Daily Digest</b>",
f"<b>Generated:</b> {now}",
f"",
f"<b>Last 24h Summary</b> ({total} samples)",
f" Healthy: {status_counts['ok']} ({100*status_counts['ok']//total if total else 0}%)",
f" Warning: {status_counts.get('warn', 0)}",
f" Critical: {status_counts.get('critical', 0)}",
f" Error: {status_counts.get('error', 0)}",
f"",
]
# SLO status
lines.append("<b>SLO Status</b>")
gw_uptime_target = cfg.gateway_uptime_slo_pct
gw_snapshots = [
s for s in snapshots
if any(c["name"] == "gateway_process" and c["status"] == "ok" for c in s.get("checks", []))
]
gw_uptime = 100.0 * len(gw_snapshots) / total if total else 0.0
gw_ok = gw_uptime >= gw_uptime_target
lines.append(
f" {'' if gw_ok else ''} Gateway uptime: {gw_uptime:.1f}% (target: ≥{gw_uptime_target}%)"
)
wh_latency_target = cfg.webhook_latency_slo_ms
if "webhook_http" in latencies and latencies["webhook_http"]:
wh_vals = sorted(latencies["webhook_http"])
p95_idx = int(len(wh_vals) * 0.95)
p95 = wh_vals[min(p95_idx, len(wh_vals) - 1)]
wh_ok = p95 <= wh_latency_target
lines.append(
f" {'' if wh_ok else ''} Webhook p95 latency: {p95:.0f}ms (target: ≤{wh_latency_target:.0f}ms)"
)
else:
lines.append(f" ⚫ Webhook latency: no data")
if "api_server_http" in latencies and latencies["api_server_http"]:
api_vals = sorted(latencies["api_server_http"])
p95_idx = int(len(api_vals) * 0.95)
p95 = api_vals[min(p95_idx, len(api_vals) - 1)]
api_ok = p95 <= wh_latency_target
lines.append(
f" {'' if api_ok else ''} API server p95 latency: {p95:.0f}ms (target: ≤{wh_latency_target:.0f}ms)"
)
# Top degraded checks
if check_degraded_counts:
lines.append("")
lines.append("<b>Degraded Checks (24h)</b>")
for name, count in sorted(check_degraded_counts.items(), key=lambda x: -x[1]):
pct = 100 * count // total if total else 0
lines.append(f"{name}: {count} incidents ({pct}%)")
lines.append("")
lines.append(f"<i>Observatory DB: {cfg.db_path}</i>")
return "\n".join(lines)
def send_digest(cfg: ObservatoryConfig) -> bool:
"""Build and send the daily digest to Telegram. Returns True on success."""
digest = build_digest(cfg)
if cfg.telegram_token and cfg.digest_chat_id:
return _telegram_send(cfg.telegram_token, cfg.digest_chat_id, digest)
return False
# ---------------------------------------------------------------------------
# Display helpers
# ---------------------------------------------------------------------------
_STATUS_COLORS = {
"ok": "\033[32m", # green
"warn": "\033[33m", # yellow
"critical": "\033[31m", # red
"error": "\033[91m", # bright red
}
_RESET = "\033[0m"
def _color_status(status: str) -> str:
c = _STATUS_COLORS.get(status, "")
return f"{c}{status.upper()}{_RESET}"
def print_snapshot(snapshot: HealthSnapshot) -> None:
overall_color = _STATUS_COLORS.get(snapshot.overall_status, "")
print(f"\n{'='*60}")
print(f" Hermes Observatory — {snapshot.ts}")
print(f" Overall: {overall_color}{snapshot.overall_status.upper()}{_RESET}")
print(f"{'='*60}")
for check in snapshot.checks:
emoji = _status_emoji(check.status)
val_str = f" [{check.value:.1f}{check.unit}]" if check.value is not None and check.unit else ""
print(f" {emoji} {check.name:<25} {_color_status(check.status):<15} {check.message}{val_str}")
print()
def print_slo_report(cfg: ObservatoryConfig) -> None:
"""Print current SLO definitions and targets."""
snapshots = load_snapshots(cfg, days=30)
total = len(snapshots)
print(f"\n{'='*60}")
print(" Hermes Observatory — SLO Report (last 30 days)")
print(f"{'='*60}")
for slo_key, slo in SLO_DEFINITIONS.items():
print(f"\n {slo['description']}")
print(f" Target: {slo['target']}{slo['unit']}")
if total == 0:
print(f" Status: no data")
continue
if slo_key == "gateway_uptime_pct":
ok_count = sum(
1 for s in snapshots
if any(c["name"] == "gateway_process" and c["status"] == "ok"
for c in s.get("checks", []))
)
actual = 100.0 * ok_count / total
met = actual >= slo["target"]
print(f" Actual: {actual:.2f}% {'✅ MET' if met else '❌ MISSED'}")
        elif slo_key in ("webhook_latency_ms", "api_server_latency_ms"):
check_name = "webhook_http" if "webhook" in slo_key else "api_server_http"
vals = [
float(c["value"])
for s in snapshots
for c in s.get("checks", [])
if c["name"] == check_name and c.get("value") is not None
]
if vals:
vals.sort()
p95_idx = int(len(vals) * 0.95)
p95 = vals[min(p95_idx, len(vals) - 1)]
met = p95 <= slo["target"]
print(f" p95: {p95:.0f}ms {'✅ MET' if met else '❌ MISSED'}")
else:
print(f" Status: no latency data")
print()
def print_history(cfg: ObservatoryConfig, count: int = 20) -> None:
"""Print recent health records."""
snapshots = load_snapshots(cfg, days=RETENTION_DAYS)[:count]
if not snapshots:
print("No history available.")
return
print(f"\n{'='*60}")
print(f" Last {min(count, len(snapshots))} health records")
print(f"{'='*60}")
for snap in snapshots:
ts = snap.get("ts", "?")
overall = snap.get("overall", "?")
emoji = _status_emoji(overall)
degraded = [c["name"] for c in snap.get("checks", []) if c["status"] != "ok"]
degraded_str = f" — issues: {', '.join(degraded)}" if degraded else ""
print(f" {emoji} {ts} {overall.upper()}{degraded_str}")
print()
# ---------------------------------------------------------------------------
# Daemon mode
# ---------------------------------------------------------------------------
class Observatory:
"""Continuous monitoring daemon."""
def __init__(self, cfg: ObservatoryConfig):
self.cfg = cfg
self._running = False
self._prev_snapshot: Optional[HealthSnapshot] = None
def _handle_signal(self, signum: int, frame: Any) -> None:
logger.info("Received signal %d, shutting down...", signum)
self._running = False
def run_once(self) -> HealthSnapshot:
snapshot = collect_snapshot(self.cfg)
store_snapshot(self.cfg, snapshot)
alerts = maybe_alert(self.cfg, snapshot, self._prev_snapshot)
if alerts:
logger.info("Sent %d alert(s)", len(alerts))
self._prev_snapshot = snapshot
return snapshot
def run(self) -> None:
_init_db(self.cfg.db_path)
logger.info(
"Observatory starting — poll_interval=%ds db=%s",
self.cfg.poll_interval,
self.cfg.db_path,
)
self._running = True
signal.signal(signal.SIGINT, self._handle_signal)
signal.signal(signal.SIGTERM, self._handle_signal)
while self._running:
try:
snapshot = self.run_once()
logger.info("Health check: %s", snapshot.overall_status)
except Exception as exc:
logger.error("Health check failed: %s", exc, exc_info=True)
if self._running:
time.sleep(self.cfg.poll_interval)
logger.info("Observatory stopped.")
# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------
def main(argv: Optional[List[str]] = None) -> int:
parser = argparse.ArgumentParser(
description="Hermes Observatory — health monitoring & alerting",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument("--check", action="store_true", help="Run one health check and print results")
parser.add_argument("--daemon", action="store_true", help="Run as continuous monitoring daemon")
parser.add_argument("--digest", action="store_true", help="Print (and optionally send) daily digest")
parser.add_argument("--history", type=int, metavar="N", help="Show last N health records")
parser.add_argument("--slo", action="store_true", help="Print SLO report")
parser.add_argument("--send-digest", action="store_true", help="Send daily digest via Telegram")
parser.add_argument("--verbose", "-v", action="store_true", help="Enable verbose logging")
args = parser.parse_args(argv)
logging.basicConfig(
level=logging.DEBUG if args.verbose else logging.INFO,
format="%(asctime)s %(levelname)s [observatory] %(message)s",
)
cfg = ObservatoryConfig.from_env()
_init_db(cfg.db_path)
if args.check:
snapshot = collect_snapshot(cfg)
store_snapshot(cfg, snapshot)
print_snapshot(snapshot)
return 0 if snapshot.overall_status == "ok" else 1
if args.daemon:
obs = Observatory(cfg)
obs.run()
return 0
if args.digest or args.send_digest:
digest = build_digest(cfg)
print(digest)
if args.send_digest:
ok = send_digest(cfg)
if ok:
print("\n[Digest sent to Telegram]")
else:
print("\n[Telegram send skipped — token/chat_id not configured]")
return 0
if args.history is not None:
print_history(cfg, args.history)
return 0
if args.slo:
print_slo_report(cfg)
return 0
# Default: one-shot check
snapshot = collect_snapshot(cfg)
store_snapshot(cfg, snapshot)
print_snapshot(snapshot)
return 0 if snapshot.overall_status == "ok" else 1
if __name__ == "__main__":
sys.exit(main())

View File

@@ -42,6 +42,7 @@ dependencies = [
modal = ["modal>=1.0.0,<2"]
daytona = ["daytona>=0.148.0,<1"]
dev = ["pytest>=9.0.2,<10", "pytest-asyncio>=1.3.0,<2", "pytest-xdist>=3.0,<4", "mcp>=1.2.0,<2"]
observatory = ["psutil>=5.9.0,<7"]
messaging = ["python-telegram-bot>=22.6,<23", "discord.py[voice]>=2.7.1,<3", "aiohttp>=3.13.3,<4", "slack-bolt>=1.18.0,<2", "slack-sdk>=3.27.0,<4"]
cron = ["croniter>=6.0.0,<7"]
slack = ["slack-bolt>=1.18.0,<2", "slack-sdk>=3.27.0,<4"]

View File

@@ -0,0 +1,252 @@
# Ezra — Quarterly Technical & Strategic Report
**April 2026**
---
## Executive Summary
This report consolidates the principal technical and strategic outputs from Q1/Q2 2026. Three major workstreams are covered:
1. **Security & Performance Hardening** — Shipped V-011 obfuscation detection and context-compressor tuning.
2. **System Formalization Audit** — Identified ~6,300 lines of homegrown infrastructure that can be replaced by well-maintained open-source projects.
3. **Business Development** — Formalized a pure-contracting go-to-market plan ("Operation Get A Job") to monetize the engineering collective.
---
## 1. Recent Deliverables
### 1.1 V-011 Obfuscation Bypass Detection
A significant security enhancement was shipped to the skills-guard subsystem to defeat obfuscated malicious skill code.
**Technical additions:**
- `normalize_input()` with NFKC normalization, case folding, and zero-width character removal to defeat homoglyph and ZWSP evasion.
- `PythonSecurityAnalyzer` AST visitor detecting `eval`/`exec`/`compile`, `getattr` dunder access, and imports of `base64`/`codecs`/`marshal`/`types`/`ctypes`.
- Additional regex patterns for `getattr` builtins chains, `__import__` os/subprocess, and nested base64 decoding.
- Full integration into `scan_file()`; Python files now receive both normalized regex scanning and AST-based analysis.
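A minimal sketch of the first two items, using only the standard library; the real skills-guard implementation also covers `getattr` dunder access and the full regex pattern set:

```python
import ast
import unicodedata

# Illustrative subset of the zero-width characters used for ZWSP/homoglyph evasion.
_ZERO_WIDTH = {"\u200b", "\u200c", "\u200d", "\u2060", "\ufeff"}

def normalize_input(text: str) -> str:
    """NFKC-normalize, strip zero-width characters, and case-fold before regex scanning."""
    text = unicodedata.normalize("NFKC", text)
    text = "".join(ch for ch in text if ch not in _ZERO_WIDTH)
    return text.casefold()

class PythonSecurityAnalyzer(ast.NodeVisitor):
    """Flag dynamic-execution calls and imports of commonly abused modules."""

    SUSPICIOUS_CALLS = {"eval", "exec", "compile", "__import__"}
    SUSPICIOUS_MODULES = {"base64", "codecs", "marshal", "types", "ctypes"}

    def __init__(self) -> None:
        self.findings: list[str] = []

    def visit_Call(self, node: ast.Call) -> None:
        if isinstance(node.func, ast.Name) and node.func.id in self.SUSPICIOUS_CALLS:
            self.findings.append(f"line {node.lineno}: call to {node.func.id}")
        self.generic_visit(node)

    def visit_Import(self, node: ast.Import) -> None:
        for alias in node.names:
            if alias.name.split(".")[0] in self.SUSPICIOUS_MODULES:
                self.findings.append(f"line {node.lineno}: import of {alias.name}")
        self.generic_visit(node)
```

In this sketch the regex pass would run on the normalized text, while the AST pass parses the original source, since case-folding rewrites identifiers.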
**Verification:** All tests passing (`103 passed, 4 warnings`).
**Reference:** Forge PR #131 — `[EPIC-999/Phase II] The Forge — V-011 obfuscation fix + compressor tuning`
### 1.2 Context Compressor Tuning
The default `protect_last_n` parameter was reduced from `20` to `5`. The previous default was overly conservative, preventing meaningful compression on long sessions. The new default preserves the five most recent conversational turns while allowing the compressor to effectively reduce token pressure.
A regression test was added verifying that the last five turns are never summarized away.
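A toy stand-in for the compressor, illustrating the invariant the new regression test pins down (the function and test names here are illustrative, not the Hermes API):

```python
def compress_history(turns: list[str], protect_last_n: int = 5) -> list[str]:
    """Toy compressor: summarize everything except the last protect_last_n turns."""
    if len(turns) <= protect_last_n:
        return list(turns)
    head, tail = turns[:-protect_last_n], turns[-protect_last_n:]
    return [f"[summary of {len(head)} earlier turns]"] + tail

def test_last_five_turns_never_summarized():
    turns = [f"turn {i}" for i in range(40)]
    out = compress_history(turns, protect_last_n=5)
    assert out[-5:] == turns[-5:]
```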
### 1.3 Burn Mode Resilience
The agent loop was enhanced with a configurable `burn_mode` flag that increases concurrent tool execution capacity and adds transient-failure retry logic.
**Changes:**
- `max_tool_workers` increased from `8` to `16` in burn mode.
- Expanded parallel tool coverage to include browser, vision, skill, and session-search tools.
- Added batch timeout protection (300s in burn mode / 180s normal) to prevent hung threads from blocking the agent loop.
- Thread-pool shutdown now uses `executor.shutdown(wait=False)` for immediate control return.
- Transient errors (timeouts, rate limits, 502/503/504) trigger one automatic retry in burn mode.
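A sketch of the single-retry behavior, assuming transience is detected by inspecting the raised error (the real classifier is Hermes-internal and works on typed exceptions and HTTP status codes):

```python
import time

# Illustrative markers only.
_TRANSIENT_MARKERS = ("timeout", "rate limit", "502", "503", "504")

def run_tool_with_retry(invoke, *, burn_mode: bool, backoff_s: float = 2.0):
    """Invoke a tool call; in burn mode, retry exactly once on a transient failure."""
    try:
        return invoke()
    except Exception as exc:
        is_transient = any(m in str(exc).lower() for m in _TRANSIENT_MARKERS)
        if burn_mode and is_transient:
            time.sleep(backoff_s)
            return invoke()  # one retry; a second failure propagates to the caller
        raise
```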
---
## 2. System Formalization Audit
A comprehensive audit was performed across the `hermes-agent` codebase to identify homegrown modules that could be replaced by mature open-source alternatives. The objective is efficiency: reduce maintenance burden, leverage community expertise, and improve reliability.
### 2.1 Candidate Matrix
| Priority | Component | Lines | Current State | Proposed Replacement | Effort | ROI |
|:--------:|-----------|------:|---------------|----------------------|:------:|:---:|
| **P0** | MCP Client | 2,176 | Custom asyncio transport, sampling, schema translation | `mcp` (official Python SDK) | 2-3 wks | Very High |
| **P0** | Cron Scheduler | ~1,500 | Custom JSON job store, manual tick loop | `APScheduler` | 1-2 wks | Very High |
| **P0** | Config Management | 2,589 | Manual YAML loader, no type safety | `pydantic-settings` + Pydantic v2 | 3-4 wks | High |
| **P1** | Checkpoint Manager | 548 | Shells out to `git` binary | `dulwich` (pure-Python git) | 1 wk | Medium-High |
| **P1** | Auth / Credential Pool | ~3,800 | Custom JWT decode, OAuth refresh, JSON auth store | `authlib` + `keyring` + `PyJWT` | 2-3 wks | Medium |
| **P1** | Batch Runner | 1,285 | Custom `multiprocessing.Pool` wrapper | `joblib` (local) or `celery` (distributed) | 1-2 wks | Medium |
| **P2** | SQLite Session Store | ~2,400 | Raw SQLite + FTS5, manual schema | SQLAlchemy ORM + Alembic | 2-3 wks | Medium |
| **P2** | Trajectory Compressor | 1,518 | Custom tokenizer + summarization pipeline | Keep core logic; add `zstandard` for binary storage | 3 days | Low-Medium |
| **P2** | Process Registry | 889 | Custom background process tracking | Keep (adds too much ops complexity) | — | Low |
| **P2** | Web Tools | 2,080+ | Firecrawl + Parallel wrappers | Keep (Firecrawl is already best-in-class) | — | Low |
### 2.2 P0 Replacements
#### MCP Client → Official `mcp` Python SDK
**Current:** `tools/mcp_tool.py` (2,176 lines) contains custom stdio/HTTP transport lifecycle, manual `anyio` cancel-scope cleanup, hand-rolled schema translation, custom sampling bridge, credential stripping, and reconnection backoff.
**Problem:** The Model Context Protocol is evolving rapidly. Maintaining a custom 2K-line client means every protocol revision requires manual patches. The official SDK already handles transport negotiation, lifecycle management, and type-safe schema generation.
**Migration Plan:**
1. Add `mcp>=1.0.0` to dependencies.
2. Build a thin `HermesMCPBridge` class that instantiates `mcp.ClientSession`, maps MCP `Tool` schemas to Hermes registry calls, forwards tool invocations, and preserves the sampling callback.
3. Deprecate the `_mcp_loop` background thread and `anyio`-based transport code.
4. Add integration tests against a test MCP server.
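A minimal connectivity sketch for step 2, using the official SDK's stdio client (the server command is a placeholder, and `HermesMCPBridge` itself is the proposed, not-yet-written wrapper):

```python
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

async def list_remote_tools(command: str, *args: str) -> list[str]:
    """Connect to an MCP server over stdio and enumerate its tools."""
    params = StdioServerParameters(command=command, args=list(args))
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            result = await session.list_tools()
            return [tool.name for tool in result.tools]

# e.g.: asyncio.run(list_remote_tools("uvx", "mcp-server-fetch"))
```

Transport negotiation, lifecycle, and reconnection all live in the SDK; the bridge only has to map `result.tools` into Hermes registry entries and forward `session.call_tool(...)` invocations.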
**Lines Saved:** ~1,600
**Risk:** Medium — sampling and timeout behavior need parity testing.
#### Cron Scheduler → APScheduler
**Current:** `cron/jobs.py` (753 lines) + `cron/scheduler.py` (~740 lines) use a JSON file as the job store, custom `parse_duration` and `compute_next_run` logic, a manual tick loop, and ad-hoc delivery orchestration.
**Problem:** Scheduling is a solved problem. The homegrown system lacks timezone support, job concurrency controls, graceful clustering, and durable execution guarantees.
**Migration Plan:**
1. Introduce `APScheduler` with a `SQLAlchemyJobStore` (or custom JSON store).
2. Refactor each Hermes cron job into an APScheduler `Job` function.
3. Preserve existing delivery logic (`_deliver_result`, `_build_job_prompt`, `_run_job_script`) as the job body.
4. Migrate `jobs.json` entries into APScheduler jobs on first run.
5. Expose `/cron` status via a thin CLI wrapper.
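A sketch of the target shape under APScheduler 3.x (the trigger, job id, and job body are illustrative):

```python
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.blocking import BlockingScheduler

scheduler = BlockingScheduler(
    jobstores={"default": SQLAlchemyJobStore(url="sqlite:///cron_jobs.db")},
    timezone="UTC",
)

def run_hermes_job() -> None:
    # Existing delivery logic (_deliver_result, _build_job_prompt, ...) is invoked here.
    ...

scheduler.add_job(
    run_hermes_job, "cron", minute="*/15",
    id="hermes-example-job", replace_existing=True, max_instances=1,
)
scheduler.start()  # replaces the homegrown tick loop
```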
**Lines Saved:** ~700
**Risk:** Low — delivery logic is preserved; only the trigger mechanism changes.
#### Config Management → `pydantic-settings`
**Current:** `hermes_cli/config.py` (2,589 lines) uses manual YAML parsing with hardcoded defaults, a complex migration chain (`_config_version` currently at 11), no runtime type validation, and stringly-typed env var resolution.
**Problem:** Every new config option requires touching multiple places. Migration logic is ~400 lines and growing. Typo'd config values are only caught at runtime, often deep in the agent loop.
**Migration Plan:**
1. Define a `HermesConfig` Pydantic model with nested sections (`ModelConfig`, `ProviderConfig`, `AgentConfig`, `CompressionConfig`, etc.).
2. Use `pydantic-settings`'s `SettingsConfigDict(yaml_file="~/.hermes/config.yaml")` to auto-load.
3. Map env vars via `env_prefix="HERMES_"` or field-level `validation_alias`.
4. Keep the migration layer as a one-time upgrade function, then remove it after two releases.
5. Replace `load_config()` call sites with `HermesConfig()` instantiation.
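A sketch of the target shape, assuming pydantic-settings ≥ 2.2 (where `yaml_file` and `YamlConfigSettingsSource` are available); the section and field names are illustrative:

```python
from pydantic import BaseModel
from pydantic_settings import BaseSettings, SettingsConfigDict, YamlConfigSettingsSource

class ModelConfig(BaseModel):
    name: str = "default-model"
    temperature: float = 0.7

class HermesConfig(BaseSettings):
    model_config = SettingsConfigDict(
        yaml_file="~/.hermes/config.yaml",
        env_prefix="HERMES_",
        env_nested_delimiter="__",  # HERMES_LLM__NAME overrides llm.name
    )

    llm: ModelConfig = ModelConfig()
    debug: bool = False

    @classmethod
    def settings_customise_sources(
        cls, settings_cls, init_settings, env_settings, dotenv_settings, file_secret_settings
    ):
        # Env vars take precedence over the YAML file; unknown or mistyped
        # values fail at load time instead of deep in the agent loop.
        return (
            init_settings,
            env_settings,
            dotenv_settings,
            YamlConfigSettingsSource(settings_cls),
            file_secret_settings,
        )

cfg = HermesConfig()
```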
**Lines Saved:** ~1,500
**Risk:** Medium-High — large blast radius; every module reads config. Requires backward compatibility.
### 2.3 P1 Replacements
**Checkpoint Manager → `dulwich`**
- Replace `subprocess.run(["git", ...])` calls with `dulwich.porcelain` equivalents.
- Use `dulwich.repo.Repo.init_bare()` for shadow repos.
- Snapshotting becomes an in-memory `Index` write + `commit()`.
- **Lines Saved:** ~200
- **Risk:** Low
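A sketch of the porcelain-level equivalent (paths and message are illustrative):

```python
from pathlib import Path

from dulwich import porcelain

# init once, then each checkpoint is write -> add -> commit, all in-process.
repo_dir = Path("/tmp/hermes-shadow-repo")
repo = porcelain.init(str(repo_dir))
(repo_dir / "snapshot.json").write_text('{"state": "example"}')
porcelain.add(repo, paths=[str(repo_dir / "snapshot.json")])
porcelain.commit(repo, message=b"checkpoint: snapshot")
```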
**Auth / Credential Pool → `authlib` + `keyring` + `PyJWT`**
- Use `authlib` for OAuth2 session and token refresh.
- Replace custom JWT decoding with `PyJWT`.
- Migrate the auth store JSON to `keyring`-backed secure storage where available.
- Keep Hermes-specific credential pool strategies (round-robin, least-used, etc.).
- **Lines Saved:** ~800
- **Risk:** Medium
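A sketch of the three libraries in the roles proposed above (client credentials and the token endpoint are placeholders):

```python
import jwt  # PyJWT
import keyring
from authlib.integrations.requests_client import OAuth2Session

CLIENT_ID, CLIENT_SECRET = "hermes-client", "placeholder-secret"
TOKEN_ENDPOINT = "https://provider.example/oauth/token"

def refresh_and_store(raw_jwt: str, refresh_token: str) -> None:
    # PyJWT replaces hand-rolled claim parsing (no signature check when only inspecting).
    claims = jwt.decode(raw_jwt, options={"verify_signature": False})
    # authlib drives the OAuth2 refresh flow.
    session = OAuth2Session(CLIENT_ID, CLIENT_SECRET)
    token = session.refresh_token(TOKEN_ENDPOINT, refresh_token=refresh_token)
    # keyring lands the result in the OS-native secure store.
    keyring.set_password("hermes", claims.get("sub", "default"), token["access_token"])
```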
**Batch Runner → `joblib`**
- For typical local batch sizes, `joblib.Parallel(n_jobs=-1, backend='loky')` replaces the custom worker pool.
- Only migrate to Celery if cross-machine distribution is required.
- **Lines Saved:** ~400
- **Risk:** Low for `joblib`
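For scale, the entire call-site replacement can be this small (the task body is illustrative):

```python
from joblib import Parallel, delayed

def run_task(item: int) -> int:
    return item * item  # stand-in for a real batch task

# backend="loky" gives the same process isolation as the old multiprocessing.Pool.
results = Parallel(n_jobs=-1, backend="loky")(delayed(run_task)(i) for i in range(100))
```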
### 2.4 Execution Roadmap
1. **Week 1-2:** Migrate Checkpoint Manager to `dulwich` (quick win, low risk)
2. **Week 3-4:** Migrate Cron Scheduler to `APScheduler` (high value, well-contained)
3. **Week 5-8:** Migrate MCP Client to official `mcp` SDK (highest complexity, highest payoff)
4. **Week 9-12:** Migrate Config Management to `pydantic-settings` (largest blast radius, do last)
5. **Ongoing:** Evaluate Auth/Credential Pool and Batch Runner replacements as follow-up epics.
### 2.5 Cost-Benefit Summary
| Metric | Value |
|--------|-------|
| Total homebrew lines audited | ~17,000 |
| Lines recommended for replacement | ~6,300 |
| Estimated dev weeks (P0 + P1) | 10-14 weeks |
| New runtime dependencies added | 4-6 well-maintained packages |
| Maintenance burden reduction | Very High |
| Risk level | Medium (mitigated by strong test coverage) |
---
## 3. Strategic Initiative: Operation Get A Job
### 3.1 Thesis
The engineering collective can deliver at roughly 10x the velocity of typical market offerings. The strategic opportunity is to monetize this capability through pure contracting — high-tempo, fixed-scope engagements with no exclusivity or employer-like constraints.
### 3.2 Service Menu
**Tier A — White-Glove Agent Infrastructure ($400-600/hr)**
- Custom AI agent deployment with tool use (Slack, Discord, Telegram, webhooks)
- MCP server development
- Local LLM stack setup (on-premise / VPC)
- Agent security audit and red teaming
**Tier B — Security Hardening & Code Review ($250-400/hr)**
- Security backlog burn-down (CVE-class bugs)
- Skills-guard / sandbox hardening
- Architecture review
**Tier C — Automation & Integration ($150-250/hr)**
- Webhook-to-action pipelines
- Research and intelligence reporting
- Content-to-code workflows
### 3.3 Engagement Packages
| Service | Description | Timeline | Investment |
|---------|-------------|----------|------------|
| Agent Security Audit | Review of one AI agent pipeline + written findings | 2-3 business days | $4,500 |
| MCP Server Build | One custom MCP server with 3-5 tools + docs + tests | 1-2 weeks | $8,000 |
| Custom Bot Deployment | End-to-end bot with up to 5 tools, deployed to client platform | 2-3 weeks | $12,000 |
| Security Sprint | Close top 5 security issues in a Python/JS repo | 1-2 weeks | $6,500 |
| Monthly Retainer — Core | 20 hrs/month prioritized engineering + triage | Ongoing | $6,000/mo |
| Monthly Retainer — Scale | 40 hrs/month prioritized engineering + on-call | Ongoing | $11,000/mo |
### 3.4 Go-to-Market Motion
**Immediate channels:**
- Cold outbound to CTOs/VPEs at Series A-C AI startups
- LinkedIn authority content (architecture reviews, security bulletins)
- Platform presence (Gun.io, Toptal, Upwork for specific niche keywords)
**Lead magnet:** Free 15-minute architecture review. No pitch. One concrete risk identified.
### 3.5 Infrastructure Foundation
The Hermes Agent framework serves as both the delivery platform and the portfolio piece:
- Open-source runtime with ~3,000 tests
- Gateway architecture supporting 8+ messaging platforms
- Native MCP client, cron scheduling, subagent delegation
- Self-hosted Forge (Gitea) with CI and automated PR review
- Local Gemma 4 inference stack on bare metal
### 3.6 90-Day Revenue Model
| Month | Target |
|-------|--------|
| Month 1 | $9-12K (1x retainer or 2x audits) |
| Month 2 | $17K (+ 1x MCP build) |
| Month 3 | $29K (+ 1x bot deployment + new retainer) |
### 3.7 Immediate Action Items
- File Wyoming LLC and obtain EIN
- Open Mercury business bank account
- Secure E&O insurance
- Update LinkedIn profile and publish first authority post
- Customize capabilities deck and begin warm outbound
---
## 4. Fleet Status Summary
| House | Host | Model / Provider | Gateway Status |
|-------|------|------------------|----------------|
| Ezra | Hermes VPS | `kimi-for-coding` (Kimi K2.5) | API `8658`, webhook `8648` — Active |
| Bezalel | Hermes VPS | Claude Opus 4.6 (Anthropic) | Port `8645` — Active |
| Allegro-Primus | Hermes VPS | Kimi K2.5 | Port `8644` — Requires restart |
| Bilbo | External | Gemma 4B (local) | Telegram dual-mode — Active |
**Network:** Hermes VPS public IP `143.198.27.163` (Ubuntu 24.04.3 LTS). Local Gemma 4 fallback on `127.0.0.1:11435`.
---
## 5. Conclusion
The codebase is in a strong position: security is hardened, the agent loop is more resilient, and a clear roadmap exists to replace high-maintenance homegrown infrastructure with battle-tested open-source projects. The commercialization strategy is formalized and ready for execution. The next critical path is the human-facing work of entity formation, sales outreach, and closing the first fixed-scope engagement.
Prepared by **Ezra**
April 2026

Binary file not shown.

455
tests/test_observatory.py Normal file
View File

@@ -0,0 +1,455 @@
"""
Tests for observatory.py — health monitoring & alerting.
Refs #147
"""
from __future__ import annotations
import json
import os
import sqlite3
import sys
import tempfile
import time
from pathlib import Path
from typing import Any
from unittest.mock import MagicMock, patch
import pytest
PROJECT_ROOT = Path(__file__).parent.parent
if str(PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(PROJECT_ROOT))
import observatory as obs
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def cfg(tmp_path):
"""Return an ObservatoryConfig pointing at a temp directory."""
cfg = obs.ObservatoryConfig()
cfg.db_path = tmp_path / "observatory.db"
cfg.alert_chat_id = "99999"
cfg.digest_chat_id = "99999"
cfg.telegram_token = "fake-token"
cfg.webhook_url = "http://127.0.0.1:19999/health" # port never bound
cfg.api_url = "http://127.0.0.1:19998/health"
return cfg
# ---------------------------------------------------------------------------
# Config tests
# ---------------------------------------------------------------------------
class TestObservatoryConfig:
def test_defaults(self):
c = obs.ObservatoryConfig()
assert c.disk_warn_pct == 80.0
assert c.disk_crit_pct == 90.0
assert c.mem_warn_pct == 80.0
assert c.mem_crit_pct == 90.0
assert c.cpu_warn_pct == 80.0
assert c.cpu_crit_pct == 95.0
assert c.poll_interval == 60
assert c.webhook_latency_slo_ms == 2000.0
assert c.gateway_uptime_slo_pct == 99.5
def test_from_env_overrides(self, monkeypatch):
monkeypatch.setenv("OBSERVATORY_DISK_WARN_PCT", "70")
monkeypatch.setenv("OBSERVATORY_POLL_INTERVAL", "30")
monkeypatch.setenv("OBSERVATORY_ALERT_CHAT_ID", "12345")
monkeypatch.setenv("TELEGRAM_BOT_TOKEN", "tok123")
c = obs.ObservatoryConfig.from_env()
assert c.disk_warn_pct == 70.0
assert c.poll_interval == 30
assert c.alert_chat_id == "12345"
assert c.telegram_token == "tok123"
def test_digest_chat_falls_back_to_alert(self, monkeypatch):
monkeypatch.setenv("OBSERVATORY_ALERT_CHAT_ID", "abc")
monkeypatch.delenv("OBSERVATORY_DIGEST_CHAT_ID", raising=False)
c = obs.ObservatoryConfig.from_env()
assert c.digest_chat_id == "abc"
# ---------------------------------------------------------------------------
# CheckResult / HealthSnapshot tests
# ---------------------------------------------------------------------------
class TestHealthSnapshot:
def _make_snapshot(self, statuses):
checks = [obs.CheckResult(name=f"c{i}", status=s, message="") for i, s in enumerate(statuses)]
return obs.HealthSnapshot(ts="2026-01-01T00:00:00+00:00", checks=checks)
def test_overall_ok(self):
snap = self._make_snapshot(["ok", "ok"])
assert snap.overall_status == "ok"
def test_overall_warn(self):
snap = self._make_snapshot(["ok", "warn"])
assert snap.overall_status == "warn"
def test_overall_critical(self):
snap = self._make_snapshot(["ok", "warn", "critical"])
assert snap.overall_status == "critical"
def test_overall_error(self):
snap = self._make_snapshot(["ok", "error"])
assert snap.overall_status == "critical"
def test_to_dict(self):
snap = self._make_snapshot(["ok"])
d = snap.to_dict()
assert d["overall"] == "ok"
assert isinstance(d["checks"], list)
assert d["checks"][0]["name"] == "c0"
# ---------------------------------------------------------------------------
# Individual check tests
# ---------------------------------------------------------------------------
class TestCheckGatewayLiveness:
def test_running(self):
with patch("gateway.status.is_gateway_running", return_value=True), \
patch("gateway.status.get_running_pid", return_value=12345):
result = obs.check_gateway_liveness()
assert result.status == "ok"
assert "12345" in result.message
def test_not_running(self):
with patch("gateway.status.is_gateway_running", return_value=False), \
patch("gateway.status.get_running_pid", return_value=None):
result = obs.check_gateway_liveness()
assert result.status == "critical"
def test_import_error(self):
import builtins
real_import = builtins.__import__
def mock_import(name, *args, **kwargs):
if name == "gateway.status":
raise ImportError("no module")
return real_import(name, *args, **kwargs)
with patch("builtins.__import__", side_effect=mock_import):
result = obs.check_gateway_liveness()
assert result.status in ("error", "critical", "ok") # graceful
class TestCheckDisk:
def test_ok(self, cfg):
mock_usage = MagicMock()
mock_usage.percent = 50.0
mock_usage.free = 10 * 1024 ** 3
mock_usage.total = 20 * 1024 ** 3
with patch("psutil.disk_usage", return_value=mock_usage):
result = obs.check_disk(cfg)
assert result.status == "ok"
assert result.value == 50.0
def test_warn(self, cfg):
mock_usage = MagicMock()
mock_usage.percent = 85.0
mock_usage.free = 3 * 1024 ** 3
mock_usage.total = 20 * 1024 ** 3
with patch("psutil.disk_usage", return_value=mock_usage):
result = obs.check_disk(cfg)
assert result.status == "warn"
def test_critical(self, cfg):
mock_usage = MagicMock()
mock_usage.percent = 92.0
mock_usage.free = 1 * 1024 ** 3
mock_usage.total = 20 * 1024 ** 3
with patch("psutil.disk_usage", return_value=mock_usage):
result = obs.check_disk(cfg)
assert result.status == "critical"
def test_no_psutil(self, cfg, monkeypatch):
monkeypatch.setattr(obs, "_PSUTIL", False)
result = obs.check_disk(cfg)
assert result.status == "error"
class TestCheckMemory:
def test_ok(self, cfg):
mock_mem = MagicMock()
mock_mem.percent = 60.0
mock_mem.available = 4 * 1024 ** 3
mock_mem.total = 16 * 1024 ** 3
with patch("psutil.virtual_memory", return_value=mock_mem):
result = obs.check_memory(cfg)
assert result.status == "ok"
def test_critical(self, cfg):
mock_mem = MagicMock()
mock_mem.percent = 95.0
mock_mem.available = 512 * 1024 ** 2
mock_mem.total = 16 * 1024 ** 3
with patch("psutil.virtual_memory", return_value=mock_mem):
result = obs.check_memory(cfg)
assert result.status == "critical"
class TestCheckCPU:
def test_ok(self, cfg):
with patch("psutil.cpu_percent", return_value=40.0):
result = obs.check_cpu(cfg)
assert result.status == "ok"
def test_warn(self, cfg):
with patch("psutil.cpu_percent", return_value=85.0):
result = obs.check_cpu(cfg)
assert result.status == "warn"
def test_critical(self, cfg):
with patch("psutil.cpu_percent", return_value=98.0):
result = obs.check_cpu(cfg)
assert result.status == "critical"
class TestCheckDatabase:
def test_ok(self, cfg):
obs._init_db(cfg.db_path)
result = obs.check_database(cfg)
assert result.status == "ok"
def test_not_yet_created(self, cfg):
# db_path does not exist
result = obs.check_database(cfg)
assert result.status == "warn"
class TestCheckHTTP:
def test_webhook_connection_refused(self, cfg):
result = obs.check_webhook_http(cfg)
# Port 19999 is not bound — should get a "not reachable" warn
assert result.status in ("warn", "error")
def test_api_server_connection_refused(self, cfg):
result = obs.check_api_server_http(cfg)
assert result.status in ("warn", "error")
def test_webhook_ok(self, cfg):
mock_resp = MagicMock()
mock_resp.__enter__ = lambda s: s
mock_resp.__exit__ = MagicMock(return_value=False)
mock_resp.status = 200
mock_resp.read.return_value = b'{"status":"ok"}'
with patch("urllib.request.urlopen", return_value=mock_resp):
result = obs.check_webhook_http(cfg)
assert result.status in ("ok", "warn")
def test_webhook_http_error(self, cfg):
mock_resp = MagicMock()
mock_resp.__enter__ = lambda s: s
mock_resp.__exit__ = MagicMock(return_value=False)
mock_resp.status = 503
with patch("urllib.request.urlopen", return_value=mock_resp):
result = obs.check_webhook_http(cfg)
assert result.status == "critical"
# ---------------------------------------------------------------------------
# Persistence tests
# ---------------------------------------------------------------------------
class TestPersistence:
def test_store_and_load(self, cfg):
obs._init_db(cfg.db_path)
from datetime import datetime, timezone
ts = datetime.now(timezone.utc).isoformat()
snap = obs.HealthSnapshot(
ts=ts,
checks=[obs.CheckResult(name="test", status="ok", message="fine")],
)
obs.store_snapshot(cfg, snap)
loaded = obs.load_snapshots(cfg, days=30)
assert len(loaded) == 1
assert loaded[0]["overall"] == "ok"
def test_retention_pruning(self, cfg):
obs._init_db(cfg.db_path)
# Insert an old record directly
with obs._db(cfg.db_path) as conn:
conn.execute(
"INSERT INTO health_snapshots (ts, overall, payload) VALUES (?, ?, ?)",
("2000-01-01T00:00:00+00:00", "ok", '{"ts":"2000-01-01T00:00:00+00:00","overall":"ok","checks":[]}'),
)
snap = obs.HealthSnapshot(
ts="2026-01-01T00:00:00+00:00",
checks=[],
)
obs.store_snapshot(cfg, snap)
# Old record should have been pruned
with obs._db(cfg.db_path) as conn:
count = conn.execute("SELECT count(*) FROM health_snapshots WHERE ts < '2001-01-01'").fetchone()[0]
assert count == 0
def test_record_alert_sent(self, cfg):
obs._init_db(cfg.db_path)
obs.record_alert_sent(cfg, "gateway_process", "critical", "not running")
with obs._db(cfg.db_path) as conn:
count = conn.execute("SELECT count(*) FROM alerts_sent").fetchone()[0]
assert count == 1
# ---------------------------------------------------------------------------
# Alerting tests
# ---------------------------------------------------------------------------
class TestAlerting:
def _snap(self, status):
return obs.HealthSnapshot(
ts="2026-01-01T00:00:00+00:00",
checks=[obs.CheckResult(name="gateway_process", status=status, message="test")],
)
def test_no_alert_when_ok(self, cfg):
snap = self._snap("ok")
prev = self._snap("ok")
obs._init_db(cfg.db_path)
with patch("observatory._telegram_send", return_value=True) as mock_send:
alerts = obs.maybe_alert(cfg, snap, prev)
mock_send.assert_not_called()
assert alerts == []
def test_alert_on_new_critical(self, cfg):
snap = self._snap("critical")
prev = self._snap("ok")
obs._init_db(cfg.db_path)
with patch("observatory._telegram_send", return_value=True) as mock_send:
alerts = obs.maybe_alert(cfg, snap, prev)
mock_send.assert_called_once()
assert len(alerts) == 1
def test_no_duplicate_alert(self, cfg):
snap = self._snap("critical")
prev = self._snap("critical") # already critical
obs._init_db(cfg.db_path)
with patch("observatory._telegram_send", return_value=True) as mock_send:
alerts = obs.maybe_alert(cfg, snap, prev)
mock_send.assert_not_called()
assert alerts == []
def test_recovery_alert(self, cfg):
snap = self._snap("ok")
prev = self._snap("critical")
obs._init_db(cfg.db_path)
with patch("observatory._telegram_send", return_value=True) as mock_send:
alerts = obs.maybe_alert(cfg, snap, prev)
mock_send.assert_called_once()
def test_no_alert_without_token(self, cfg):
cfg.telegram_token = None
snap = self._snap("critical")
obs._init_db(cfg.db_path)
alerts = obs.maybe_alert(cfg, snap, None)
assert alerts == []
def test_no_alert_without_chat_id(self, cfg):
cfg.alert_chat_id = None
snap = self._snap("critical")
obs._init_db(cfg.db_path)
alerts = obs.maybe_alert(cfg, snap, None)
assert alerts == []
# ---------------------------------------------------------------------------
# Digest tests
# ---------------------------------------------------------------------------
class TestDigest:
def test_empty_digest(self, cfg):
obs._init_db(cfg.db_path)
digest = obs.build_digest(cfg)
assert "no health data" in digest.lower() or "24 hours" in digest.lower()
def test_digest_with_data(self, cfg):
obs._init_db(cfg.db_path)
from datetime import datetime, timezone, timedelta
ts = (datetime.now(timezone.utc) - timedelta(hours=1)).isoformat()
snap = obs.HealthSnapshot(
ts=ts,
checks=[
obs.CheckResult(name="gateway_process", status="ok", message="running"),
obs.CheckResult(name="disk", status="ok", message="50% used", value=50.0, unit="%"),
obs.CheckResult(name="webhook_http", status="ok", message="ok", value=150.0, unit="ms"),
],
)
obs.store_snapshot(cfg, snap)
digest = obs.build_digest(cfg)
assert "Daily Digest" in digest
assert "Gateway" in digest or "gateway" in digest
def test_send_digest_no_token(self, cfg):
cfg.telegram_token = None
obs._init_db(cfg.db_path)
result = obs.send_digest(cfg)
assert result is False
# ---------------------------------------------------------------------------
# SLO tests
# ---------------------------------------------------------------------------
class TestSLO:
def test_slo_definitions_complete(self):
assert "gateway_uptime_pct" in obs.SLO_DEFINITIONS
assert "webhook_latency_ms" in obs.SLO_DEFINITIONS
assert "api_server_latency_ms" in obs.SLO_DEFINITIONS
def test_slo_targets(self):
assert obs.SLO_DEFINITIONS["gateway_uptime_pct"]["target"] == 99.5
assert obs.SLO_DEFINITIONS["webhook_latency_ms"]["target"] == 2000
# ---------------------------------------------------------------------------
# CLI tests
# ---------------------------------------------------------------------------
class TestCLI:
def test_check_exits_0_on_ok(self, cfg, monkeypatch, tmp_path):
monkeypatch.setenv("OBSERVATORY_DB_PATH", str(tmp_path / "obs.db"))
ok_snap = obs.HealthSnapshot(
ts="2026-01-01T00:00:00+00:00",
checks=[obs.CheckResult(name="all_good", status="ok", message="fine")],
)
with patch("observatory.collect_snapshot", return_value=ok_snap), \
patch("observatory.store_snapshot"):
rc = obs.main(["--check"])
assert rc == 0
def test_check_exits_nonzero_on_critical(self, cfg, monkeypatch, tmp_path):
monkeypatch.setenv("OBSERVATORY_DB_PATH", str(tmp_path / "obs.db"))
bad_snap = obs.HealthSnapshot(
ts="2026-01-01T00:00:00+00:00",
checks=[obs.CheckResult(name="gateway_process", status="critical", message="down")],
)
with patch("observatory.collect_snapshot", return_value=bad_snap), \
patch("observatory.store_snapshot"):
rc = obs.main(["--check"])
assert rc != 0
def test_digest_flag(self, monkeypatch, tmp_path):
monkeypatch.setenv("OBSERVATORY_DB_PATH", str(tmp_path / "obs.db"))
rc = obs.main(["--digest"])
assert rc == 0
def test_slo_flag(self, monkeypatch, tmp_path):
monkeypatch.setenv("OBSERVATORY_DB_PATH", str(tmp_path / "obs.db"))
rc = obs.main(["--slo"])
assert rc == 0
def test_history_flag(self, monkeypatch, tmp_path):
monkeypatch.setenv("OBSERVATORY_DB_PATH", str(tmp_path / "obs.db"))
rc = obs.main(["--history", "5"])
assert rc == 0