Compare commits

..

2 Commits

Author SHA1 Message Date
Alexander Whitestone
477ec86467 feat: harden Bezalel tailscale bootstrap packet (#535)
Some checks failed
Agent PR Gate / gate (pull_request) Failing after 43s
Self-Healing Smoke / self-healing-smoke (pull_request) Failing after 30s
Smoke Test / smoke (pull_request) Failing after 28s
Agent PR Gate / report (pull_request) Successful in 7s
2026-04-22 00:08:33 -04:00
Alexander Whitestone
f83fdb7d55 test: cover hardened Bezalel Tailscale bootstrap packet (#535) 2026-04-22 00:07:32 -04:00
9 changed files with 188 additions and 556 deletions

View File

@@ -169,14 +169,6 @@ _config_version: 9
session_reset:
mode: none
idle_minutes: 0
blackboard:
enabled: true
redis:
url: redis://localhost:6379/0
password: ""
keyspace_prefix: timmy
ttl_seconds: 3600
fallback_to_memory: true
custom_providers:
- name: Local Ollama
base_url: http://localhost:11434/v1

View File

@@ -0,0 +1,96 @@
# Bezalel Tailscale Bootstrap
Refs #535
This is the repo-side operator packet for installing Tailscale on the Bezalel VPS and verifying the internal network path for federation work.
Important truth:
- issue #535 names `104.131.15.18`
- older Bezalel control-plane docs also mention `159.203.146.185`
- the current source of truth in this repo is `ansible/inventory/hosts.ini`, which currently resolves `bezalel` to `67.205.155.108`
Because of that drift, `scripts/bezalel_tailscale_bootstrap.py` now resolves the target host from `ansible/inventory/hosts.ini` by default instead of trusting a stale hardcoded IP.
## What the script does
`python3 scripts/bezalel_tailscale_bootstrap.py`
Safe by default:
- builds the remote bootstrap script
- writes it locally to `/tmp/bezalel_tailscale_bootstrap.sh`
- prints the SSH command needed to run it
- does **not** touch the VPS unless `--apply` is passed
When applied, the remote script does all of the issues repo-side bootstrap steps:
- installs Tailscale
- runs `tailscale up --ssh --hostname bezalel`
- appends the provided Mac SSH public key to `~/.ssh/authorized_keys`
- prints `tailscale status --json`
- pings the expected peer targets:
- Mac: `100.124.176.28`
- Ezra: `100.126.61.75`
## Required secrets / inputs
- Tailscale auth key
- Mac SSH public key
Provide them either directly or through files:
- `--auth-key` or `--auth-key-file`
- `--ssh-public-key` or `--ssh-public-key-file`
## Dry-run example
```bash
python3 scripts/bezalel_tailscale_bootstrap.py \
--auth-key-file ~/.config/tailscale/auth_key \
--ssh-public-key-file ~/.ssh/id_ed25519.pub \
--json
```
This prints:
- resolved host
- host source (`inventory:<path>` when pulled from `ansible/inventory/hosts.ini`)
- local script path
- SSH command to execute
- peer targets
## Apply example
```bash
python3 scripts/bezalel_tailscale_bootstrap.py \
--auth-key-file ~/.config/tailscale/auth_key \
--ssh-public-key-file ~/.ssh/id_ed25519.pub \
--apply \
--json
```
## Verifying success after apply
The script now parses the remote stdout into structured verification data:
- `verification.tailscale.self.tailscale_ips`
- `verification.tailscale.self.dns_name`
- `verification.peers`
- `verification.ping_ok`
A successful run should show:
- at least one Bezalel Tailscale IP under `tailscale_ips`
- `ping_ok.mac = 100.124.176.28`
- `ping_ok.ezra = 100.126.61.75`
## Expected remote install commands
```bash
curl -fsSL https://tailscale.com/install.sh | sh
tailscale up --ssh --hostname bezalel
install -d -m 700 ~/.ssh
touch ~/.ssh/authorized_keys && chmod 600 ~/.ssh/authorized_keys
tailscale status --json
```
## Why this PR does not claim live completion
This repo can safely ship the bootstrap script, host resolution logic, structured proof parsing, and operator packet.
It cannot honestly claim that Bezalel was actually joined to the tailnet unless a human/operator runs the script with a real auth key and real SSH access to the VPS.
That means the correct PR language for #535 is advancement, not pretend closure.

View File

@@ -14,6 +14,7 @@ Quick-reference index for common operational tasks across the Timmy Foundation i
| Agent scorecard | fleet-ops | `python3 scripts/agent_scorecard.py` |
| View fleet manifest | fleet-ops | `cat manifest.yaml` |
| Run nightly codebase genome pass | timmy-home | `python3 scripts/codebase_genome_nightly.py --dry-run` |
| Prepare Bezalel Tailscale bootstrap | timmy-home | `python3 scripts/bezalel_tailscale_bootstrap.py --auth-key-file <path> --ssh-public-key-file <path> --json` |
## the-nexus (Frontend + Brain)

View File

@@ -1,19 +0,0 @@
# Local Redis Blackboard for Agent Coordination
This directory contains the Redis deployment for the Timmy Home "Blackboard" — a
shared coordination layer for multi-agent orchestration.
## Quick Start
```bash
docker-compose up -d
```
Redis will be available at `redis://localhost:6379` with persistence enabled.
## Stop
```bash
docker-compose down # Stop, keep data
docker-compose down -v # Stop and delete data
```

View File

@@ -1,18 +0,0 @@
version: '3.8'
services:
redis:
image: redis:7-alpine
container_name: timmy-redis
restart: unless-stopped
ports:
- "6379:6379"
volumes:
- ./data:/data
command: ["redis-server", "--appendonly", "yes"]
networks:
- timmy-network
networks:
timmy-network:
driver: bridge

View File

@@ -16,11 +16,14 @@ import argparse
import json
import shlex
import subprocess
import re
from json import JSONDecoder
from pathlib import Path
from typing import Any
DEFAULT_HOST = "159.203.146.185"
DEFAULT_HOST = "67.205.155.108"
DEFAULT_HOSTNAME = "bezalel"
DEFAULT_INVENTORY_PATH = Path(__file__).resolve().parents[1] / "ansible" / "inventory" / "hosts.ini"
DEFAULT_PEERS = {
"mac": "100.124.176.28",
"ezra": "100.126.61.75",
@@ -66,6 +69,37 @@ def parse_tailscale_status(payload: dict[str, Any]) -> dict[str, Any]:
}
def resolve_host(host: str | None, inventory_path: Path = DEFAULT_INVENTORY_PATH, hostname: str = DEFAULT_HOSTNAME) -> tuple[str, str]:
if host:
return host, "explicit"
if inventory_path.exists():
pattern = re.compile(rf"^{re.escape(hostname)}\s+.*ansible_host=([^\s]+)")
for line in inventory_path.read_text().splitlines():
match = pattern.search(line.strip())
if match:
return match.group(1), f"inventory:{inventory_path}"
return DEFAULT_HOST, "default"
def parse_apply_output(stdout: str) -> dict[str, Any]:
result: dict[str, Any] = {"tailscale": None, "ping_ok": {}}
text = stdout or ""
start = text.find("{")
if start != -1:
try:
payload, _ = JSONDecoder().raw_decode(text[start:])
if isinstance(payload, dict):
result["tailscale"] = parse_tailscale_status(payload)
except Exception:
pass
for line in text.splitlines():
if line.startswith("PING_OK:"):
_, name, ip = line.split(":", 2)
result["ping_ok"][name] = ip
return result
def build_ssh_command(host: str, remote_script_path: str = "/tmp/bezalel_tailscale_bootstrap.sh") -> list[str]:
return ["ssh", host, f"bash {shlex.quote(remote_script_path)}"]
@@ -89,8 +123,9 @@ def parse_peer_args(items: list[str]) -> dict[str, str]:
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Prepare or execute Tailscale bootstrap for the Bezalel VPS.")
parser.add_argument("--host", default=DEFAULT_HOST)
parser.add_argument("--host")
parser.add_argument("--hostname", default=DEFAULT_HOSTNAME)
parser.add_argument("--inventory-path", type=Path, default=DEFAULT_INVENTORY_PATH)
parser.add_argument("--auth-key", help="Tailscale auth key")
parser.add_argument("--auth-key-file", type=Path, help="Path to file containing the Tailscale auth key")
parser.add_argument("--ssh-public-key", help="SSH public key to append to authorized_keys")
@@ -116,6 +151,7 @@ def main() -> None:
auth_key = _read_secret(args.auth_key, args.auth_key_file)
ssh_public_key = _read_secret(args.ssh_public_key, args.ssh_public_key_file)
peers = parse_peer_args(args.peer)
resolved_host, host_source = resolve_host(args.host, args.inventory_path, args.hostname)
if not auth_key:
raise SystemExit("Missing Tailscale auth key. Use --auth-key or --auth-key-file.")
@@ -126,28 +162,31 @@ def main() -> None:
write_script(args.script_out, script)
payload: dict[str, Any] = {
"host": args.host,
"host": resolved_host,
"host_source": host_source,
"hostname": args.hostname,
"inventory_path": str(args.inventory_path),
"script_out": str(args.script_out),
"remote_script_path": args.remote_script_path,
"ssh_command": build_ssh_command(args.host, args.remote_script_path),
"ssh_command": build_ssh_command(resolved_host, args.remote_script_path),
"peer_targets": peers,
"applied": False,
}
if args.apply:
result = run_remote(args.host, args.remote_script_path)
result = run_remote(resolved_host, args.remote_script_path)
payload["applied"] = True
payload["exit_code"] = result.returncode
payload["stdout"] = result.stdout
payload["stderr"] = result.stderr
payload["verification"] = parse_apply_output(result.stdout)
if args.json:
print(json.dumps(payload, indent=2))
return
print("--- Bezalel Tailscale Bootstrap ---")
print(f"Host: {args.host}")
print(f"Host: {resolved_host} ({host_source})")
print(f"Local script: {args.script_out}")
print("SSH command: " + " ".join(payload["ssh_command"]))
if args.apply:

View File

@@ -1,311 +0,0 @@
#!/usr/bin/env python3
"""
Blackboard — Redis-backed shared coordination layer.
Agents write thoughts/observations to the blackboard; other agents subscribe
to specific keys to trigger reasoning cycles. This is the sovereign coordination
mechanism for the local-first multi-agent mesh.
Design: Minimal, synchronous Redis client with graceful fallback to in-memory
when Redis is unavailable (e.g., during local dev without Docker).
SOUL.md: "Sovereignty and service always." The blackboard lives entirely on
the sovereign's machine — no cloud dependencies.
"""
from __future__ import annotations
import json
import logging
import os
import time
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Callable, Iterable, Optional
logger = logging.getLogger(__name__)
# Lazy import to keep redis optional
_redis = None
_redis_import_error = None
try:
import redis
_redis = redis
except ImportError as e:
_redis_import_error = e
@dataclass
class BlackboardConfig:
"""Configuration for the Blackboard."""
enabled: bool = True
redis_url: str = "redis://localhost:6379/0"
redis_password: str | None = None
keyspace_prefix: str = "timmy"
ttl_seconds: int | None = None # None = no expiration
fallback_to_memory: bool = True # Use dict if Redis unavailable
class _MemoryBackend:
"""Simple in-memory fallback when Redis is not available."""
def __init__(self):
self._store: dict[str, str] = {}
self._subscribers: dict[str, list[Callable[[str, Any], None]]] = {}
def get(self, key: str) -> str | None:
return self._store.get(key)
def set(self, key: str, value: str, ttl: int | None = None) -> bool:
self._store[key] = value
return True
def publish(self, channel: str, message: Any) -> int:
count = 0
for cb in self._subscribers.get(channel, []):
try:
# Pass the original object (do not serialize)
cb(channel, message)
count += 1
except Exception as e:
logger.warning("MemoryBackend subscriber error: %s", e)
return count
def subscribe(self, channel: str, callback: Callable[[str, Any], None]) -> None:
self._subscribers.setdefault(channel, []).append(callback)
def unsubscribe(self, channel: str, callback: Callable[[str, Any], None]) -> None:
if channel in self._subscribers:
self._subscribers[channel].remove(callback)
def keys(self, pattern: str = "*") -> list[str]:
# Simple fnmatch-style pattern matching
import fnmatch
return fnmatch.filter(list(self._store.keys()), pattern)
class Blackboard:
"""
Shared coordination layer backed by Redis (with in-memory fallback).
Usage:
bb = Blackboard()
bb.set("agent:timmy:thought", "checking queue...")
value = bb.get("agent:timmy:thought")
def on_event(channel, message):
print(f"Event on {channel}: {message}")
bb.subscribe("dispatch:new", on_event)
bb.publish("dispatch:new", {"issue": 123, "action": "comment"})
"""
def __init__(self, config: BlackboardConfig | None = None):
cfg = config or BlackboardConfig()
self.enabled = cfg.enabled
self.prefix = cfg.keyspace_prefix
self.ttl = cfg.ttl_seconds
self._backend: _MemoryBackend | Any
if not _redis:
if cfg.fallback_to_memory:
logger.warning(
"redis-py not installed; using in-memory fallback. "
"Install with: pip install redis"
)
self._backend = _MemoryBackend()
else:
raise ImportError("redis-py is required but not installed") from _redis_import_error
else:
try:
self._backend = _redis.from_url(
cfg.redis_url,
password=cfg.redis_password,
decode_responses=True,
)
# Test connection
self._backend.ping()
logger.info("Blackboard connected to Redis at %s", cfg.redis_url)
except Exception as e:
if cfg.fallback_to_memory:
logger.warning("Redis connection failed (%s); falling back to in-memory", e)
self._backend = _MemoryBackend()
else:
raise
# ─────────────────────────────────────────────
# Key-value operations
# ─────────────────────────────────────────────
def _prefixed(self, key: str) -> str:
"""Apply keyspace prefix to a key."""
return f"{self.prefix}:{key}" if self.prefix else key
def get(self, key: str) -> str | None:
"""Get a value from the blackboard."""
return self._backend.get(self._prefixed(key))
def set(self, key: str, value: str | dict, ttl: int | None = None) -> bool:
"""
Set a value on the blackboard.
Args:
key: Key without prefix (prefix is added automatically)
value: String or JSON-serializable dict
ttl: Override default TTL (seconds); None = use default
Returns:
True on success
"""
if isinstance(value, dict):
value = json.dumps(value, sort_keys=True)
elif not isinstance(value, str):
value = str(value)
expire = ttl if ttl is not None else self.ttl
result = self._backend.set(self._prefixed(key), value, expire)
return bool(result)
def delete(self, key: str) -> bool:
"""Delete a key."""
try:
return bool(self._backend.delete(self._prefixed(key)))
except AttributeError:
# MemoryBackend
k = self._prefixed(key)
if k in self._backend._store:
del self._backend._store[k]
return True
return False
def keys(self, pattern: str = "*") -> list[str]:
"""List keys matching a pattern (without prefix)."""
full_pattern = self._prefixed(pattern)
raw_keys = self._backend.keys(full_pattern)
# Strip prefix
prefix_len = len(self.prefix) + 1 if self.prefix else 0
return [k[prefix_len:] if k.startswith(f"{self.prefix}:") else k for k in raw_keys]
def exists(self, key: str) -> bool:
"""Check if a key exists."""
try:
return bool(self._backend.exists(self._prefixed(key)))
except AttributeError:
# MemoryBackend
return self._prefixed(key) in self._backend._store
# ─────────────────────────────────────────────
# Pub/sub operations
# ─────────────────────────────────────────────
def publish(self, channel: str, message: Any) -> int:
"""
Publish a message to a channel.
Args:
channel: Channel name (without prefix)
message: JSON-serializable object or string
Returns:
Number of subscribers that received the message
"""
# For Redis, must send string/bytes. For MemoryBackend, pass object.
if isinstance(self._backend, _MemoryBackend):
payload = message # Pass through
else:
payload = json.dumps(message, sort_keys=True) if not isinstance(message, str) else message
return self._backend.publish(self._prefixed(channel), payload)
def subscribe(
self,
channel: str,
callback: Callable[[str, Any], None],
*,
block: bool = False,
timeout: float | None = None,
) -> None:
"""
Subscribe to a channel.
Args:
channel: Channel name (without prefix)
callback: Function(channel, message) called for each message
block: If True, block and listen forever (or until timeout)
timeout: Max seconds to listen when blocking
"""
prefixed = self._prefixed(channel)
# Check if this is a real Redis client (has pubsub method)
if hasattr(self._backend, 'pubsub') and callable(getattr(self._backend, 'pubsub', None)):
# Real Redis pub/sub
import threading
pubsub = self._backend.pubsub()
pubsub.subscribe(prefixed)
def listener():
for msg in pubsub.listen():
if msg['type'] == 'message':
try:
data = json.loads(msg['data'])
except (json.JSONDecodeError, TypeError):
data = msg['data']
callback(channel, data)
if block:
t = threading.Thread(target=listener, daemon=True)
t.start()
if timeout:
t.join(timeout)
else:
t.join()
else:
# Fire-and-forget thread
threading.Thread(target=listener, daemon=True).start()
else:
# MemoryBackend — synchronous callback registration
self._backend.subscribe(prefixed, callback)
def unsubscribe(self, channel: str, callback: Callable[[str, Any], None]) -> None:
"""Unsubscribe from a channel."""
try:
self._backend.unsubscribe(self._prefixed(channel), callback)
except AttributeError:
pass # MemoryBackend supports it
# ─────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────
def clear_namespace(self, pattern: str = "*") -> int:
"""Delete all keys matching pattern in this namespace."""
full = self._prefixed(pattern)
try:
keys = self._backend.keys(full)
if keys:
return self._backend.delete(*keys)
return 0
except AttributeError:
store_keys = list(self._backend._store.keys())
import fnmatch
matched = fnmatch.filter(store_keys, full)
for k in matched:
del self._backend._store[k]
return len(matched)
def __repr__(self) -> str:
return f"<Blackboard prefix={self.prefix!r} backend={type(self._backend).__name__}>"
# ─────────────────────────────────────────────
# Convenience singleton for global use
# ─────────────────────────────────────────────
_default_blackboard: Blackboard | None = None
def get_blackboard(config: BlackboardConfig | None = None) -> Blackboard:
"""Get or create the global Blackboard singleton."""
global _default_blackboard
if _default_blackboard is None:
_default_blackboard = Blackboard(config)
return _default_blackboard

View File

@@ -2,9 +2,12 @@ from scripts.bezalel_tailscale_bootstrap import (
DEFAULT_PEERS,
build_remote_script,
build_ssh_command,
parse_apply_output,
parse_peer_args,
parse_tailscale_status,
resolve_host,
)
from pathlib import Path
def test_build_remote_script_contains_install_up_and_key_append():
@@ -78,3 +81,46 @@ def test_parse_peer_args_merges_overrides_into_defaults():
"ezra": "100.126.61.76",
"forge": "100.70.0.9",
}
def test_resolve_host_prefers_inventory_over_stale_default(tmp_path: Path):
inventory = tmp_path / "hosts.ini"
inventory.write_text(
"[fleet]\n"
"ezra ansible_host=143.198.27.163 ansible_user=root\n"
"bezalel ansible_host=67.205.155.108 ansible_user=root\n"
)
host, source = resolve_host(None, inventory)
assert host == "67.205.155.108"
assert source == f"inventory:{inventory}"
def test_parse_apply_output_extracts_status_and_ping_markers():
stdout = (
'{"Self": {"HostName": "bezalel", "DNSName": "bezalel.tailnet.ts.net", "TailscaleIPs": ["100.90.0.10"]}, '
'"Peer": {"node-1": {"HostName": "ezra", "TailscaleIPs": ["100.126.61.75"]}}}'
"\nPING_OK:mac:100.124.176.28\n"
"PING_OK:ezra:100.126.61.75\n"
)
result = parse_apply_output(stdout)
assert result["tailscale"]["self"]["tailscale_ips"] == ["100.90.0.10"]
assert result["ping_ok"] == {"mac": "100.124.176.28", "ezra": "100.126.61.75"}
def test_runbook_doc_exists_and_mentions_inventory_auth_and_peer_checks():
doc = Path("docs/BEZALEL_TAILSCALE_BOOTSTRAP.md")
assert doc.exists(), "missing docs/BEZALEL_TAILSCALE_BOOTSTRAP.md"
text = doc.read_text()
assert "ansible/inventory/hosts.ini" in text
assert "tailscale up" in text
assert "authorized_keys" in text
assert "100.124.176.28" in text
assert "100.126.61.75" in text
runbook = Path("docs/RUNBOOK_INDEX.md").read_text()
assert "Prepare Bezalel Tailscale bootstrap" in runbook
assert "scripts/bezalel_tailscale_bootstrap.py" in runbook

View File

@@ -1,194 +0,0 @@
"""
Smoke tests for Blackboard — ensures the Redis-backed coordination layer
works with both real Redis and in-memory fallback.
"""
import json
import time
import pytest
from src.timmy.blackboard import Blackboard, BlackboardConfig, _MemoryBackend
class TestBlackboardBasics:
"""Test core key-value operations."""
def test_kv_memory_backend(self):
"""KV operations work using in-memory backend."""
bb = Blackboard(BlackboardConfig(fallback_to_memory=True, enabled=True))
# Set and get
assert bb.set("test:key", "hello") is True
assert bb.get("test:key") == "hello"
# Dict serialization
assert bb.set("test:obj", {"a": 1, "b": 2}) is True
val = bb.get("test:obj")
assert json.loads(val) == {"a": 1, "b": 2}
# Exists
assert bb.exists("test:key") is True
assert bb.exists("missing") is False
# Delete
assert bb.delete("test:key") is True
assert bb.get("test:key") is None
# Keys with prefix
bb.set("agent:timmy:state", "ready")
bb.set("agent:ezra:state", "idle")
keys = bb.keys("agent:*:state")
assert len(keys) == 2
assert "timmy" in keys[0] or "ezra" in keys[0]
# Clear namespace
assert bb.clear_namespace("agent:*") == 2
assert bb.keys("agent:*") == []
class TestBlackboardPubSub:
"""Test pub/sub coordination patterns."""
def test_pubsub_memory_backend(self):
"""Publish/subscribe works using in-memory backend."""
bb = Blackboard(BlackboardConfig(fallback_to_memory=True, enabled=True))
received = []
def callback(channel, message):
received.append((channel, message))
bb.subscribe("dispatch:new", callback)
# Publish
count = bb.publish("dispatch:new", {"issue": 123, "action": "comment"})
assert count == 1
assert len(received) == 1
ch, msg = received[0]
assert ch == "dispatch:new"
assert msg == {"issue": 123, "action": "comment"}
bb.unsubscribe("dispatch:new", callback)
bb.publish("dispatch:new", {"should": "not arrive"})
assert len(received) == 1 # no new messages
def test_publish_without_subscribers(self):
"""Publish returns 0 when no subscribers."""
bb = Blackboard(BlackboardConfig(fallback_to_memory=True, enabled=True))
count = bb.publish("empty:channel", {"msg": 1})
assert count == 0
class TestBlackboardConfig:
"""Test configuration parsing and validation."""
def test_default_config(self):
cfg = BlackboardConfig()
assert cfg.enabled is True
assert cfg.redis_url == "redis://localhost:6379/0"
assert cfg.keyspace_prefix == "timmy"
assert cfg.ttl_seconds == 3600
assert cfg.fallback_to_memory is True
def test_custom_config(self):
cfg = BlackboardConfig(
enabled=False,
redis_url="redis://192.168.1.10:6379/1",
keyspace_prefix="myagent",
ttl_seconds=1800,
fallback_to_memory=False,
)
assert cfg.enabled is False
assert cfg.redis_url == "redis://192.168.1.10:6379/1"
assert cfg.keyspace_prefix == "myagent"
assert cfg.ttl_seconds == 1800
assert cfg.fallback_to_memory is False
class TestKeyspacePrefix:
"""Test that keys are correctly prefixed."""
def test_prefixed_keys(self):
bb = Blackboard(BlackboardConfig(keyspace_prefix="myagent", fallback_to_memory=True))
bb.set("thought", "test")
# Internal key should be "myagent:thought"
# We can verify by checking keys()
keys = bb.keys("*")
assert any("myagent:thought" in k for k in keys)
class TestBlackboardIntegration:
"""Integration pattern: agent thought cycle."""
def test_agent_thought_cycle(self):
"""Simulate Timmy writing a thought and Ezra reading it."""
bb = Blackboard(BlackboardConfig(fallback_to_memory=True, enabled=True))
# Agent A writes observation
bb.set("agent:timmy:observation", "Gitea queue has 12 open issues")
# Agent B reads
obs = bb.get("agent:timmy:observation")
assert obs == "Gitea queue has 12 open issues"
# Agent B writes analysis
bb.set("agent:ezra:analysis", "Prioritize critical bugs first")
# Event-driven pattern
events = []
def on_plan(channel, message):
events.append(message)
bb.subscribe("fleet:plan", on_plan)
bb.publish("fleet:plan", {"phase": "triaging", "lead": "ezra"})
assert len(events) == 1
assert events[0]["phase"] == "triaging"
class TestTTL:
"""Test TTL handling (where supported)."""
def test_ttl_set_in_config(self):
cfg = BlackboardConfig(ttl_seconds=60, fallback_to_memory=True)
bb = Blackboard(cfg)
assert bb.ttl == 60
# Setting a value uses TTL from config
bb.set("temp:key", "expiring value")
# In memory backend ignores TTL, but value is set
assert bb.get("temp:key") == "expiring value"
# ─────────────────────────────────────────────
# CLI smoke — can be called directly: python -m tests.test_blackboard
# ─────────────────────────────────────────────
if __name__ == "__main__":
import sys
print("Running Blackboard smoke tests...")
suite = [
TestBlackboardBasics().test_kv_memory_backend,
TestBlackboardPubSub().test_pubsub_memory_backend,
TestBlackboardConfig().test_default_config,
TestBlackboardIntegration().test_agent_thought_cycle,
]
failures = 0
for test in suite:
name = test.__name__
try:
test()
print(f"{name}")
except AssertionError as e:
print(f"{name}: {e}")
failures += 1
except Exception as e:
print(f"{name}: ERROR — {e}")
failures += 1
print(f"\nRan {len(suite)} tests, {failures} failures")
sys.exit(failures)