Compare commits

..

1 Commits

Author SHA1 Message Date
STEP35
682d39ee15 feat(blackboard): add local Redis-backed coordination layer
Some checks failed
Self-Healing Smoke / self-healing-smoke (pull_request) Failing after 25s
Smoke Test / smoke (pull_request) Failing after 20s
Agent PR Gate / gate (pull_request) Failing after 41s
Agent PR Gate / report (pull_request) Successful in 6s
Create the "Blackboard" for multi-agent coordination:
- infrastructure/redis/docker-compose.yml for local Redis deployment
- src/timmy/blackboard.py: Redis pub/sub + key-value store with in-memory fallback
- config.yaml: add blackboard configuration section
- tests/test_blackboard.py: smoke tests for KV and pub/sub

Agents can now write/read shared state and subscribe to events.
Deploy with: cd infrastructure/redis && docker-compose up -d

Closes #459
2026-04-26 12:14:13 -04:00
9 changed files with 550 additions and 670 deletions

View File

@@ -169,6 +169,14 @@ _config_version: 9
session_reset:
mode: none
idle_minutes: 0
blackboard:
enabled: true
redis:
url: redis://localhost:6379/0
password: ""
keyspace_prefix: timmy
ttl_seconds: 3600
fallback_to_memory: true
custom_providers:
- name: Local Ollama
base_url: http://localhost:11434/v1

View File

@@ -1,18 +0,0 @@
{
"owner": "Timmy_Foundation",
"repos": [
"timmy-home",
"timmy-config",
"fleet-ops",
"the-beacon",
"the-door",
"the-nexus"
],
"lookback_days": 14,
"alert": {
"recent_days": 7,
"baseline_days": 7,
"minimum_baseline_closed": 4,
"drop_ratio": 0.6
}
}

View File

@@ -1,70 +0,0 @@
# Burn-down Velocity Tracking
Refs #519.
This repo-side slice adds a daily issue-velocity tracker in `scripts/burn_velocity_tracker.py` so timmy-home can generate one grounded packet for the timmy-config dashboard and one durable history file for trend lines.
## What it emits
Daily run outputs:
- `~/.timmy/burn-velocity/latest.json` — machine-readable payload for the timmy-config dashboard
- `~/.timmy/burn-velocity/latest.md` — operator-facing markdown summary
- `~/.timmy/burn-velocity/history.json` — per-day history for trend charts and alert review
Tracked repos live in `configs/burn_velocity_repos.json`.
## Cron command
```bash
cd ~/timmy-home && \
python3 scripts/burn_velocity_tracker.py \
--config configs/burn_velocity_repos.json \
--output-json ~/.timmy/burn-velocity/latest.json \
--output-md ~/.timmy/burn-velocity/latest.md \
--history-file ~/.timmy/burn-velocity/history.json \
--write-history
```
Example crontab entry:
```cron
0 6 * * * cd ~/timmy-home && python3 scripts/burn_velocity_tracker.py --config configs/burn_velocity_repos.json --output-json ~/.timmy/burn-velocity/latest.json --output-md ~/.timmy/burn-velocity/latest.md --history-file ~/.timmy/burn-velocity/history.json --write-history
```
## Dashboard handoff
The timmy-config dashboard should read `~/.timmy/burn-velocity/latest.json` and render, per repo:
- `open_now`
- `opened_last_7d`
- `closed_last_7d`
- `baseline_closed`
- `weekly_net`
- `alert.status`
- `alert.kind`
- `alert.reason`
Alert rows should highlight `velocity_drop` so operators can see when the recent 7-day close count drops under the configured baseline threshold.
## Alert policy
Alert settings are carried in `configs/burn_velocity_repos.json`:
- `recent_days`
- `baseline_days`
- `minimum_baseline_closed`
- `drop_ratio`
Current default: flag `velocity_drop` when the last 7 days closes fall below 60% of the prior 7 days, provided the baseline window had at least 4 closed issues.
## Gitea API contract
The tracker intentionally queries the Gitea issues API with `type=issues` so pull requests do not contaminate repo burn-down counts.
Live collection shape:
- open backlog uses `/repos/{owner}/{repo}/issues?state=open&type=issues`
- recent event scan uses `/repos/{owner}/{repo}/issues?state=all&type=issues&since=...`
This keeps the packet honest: issue velocity is issue velocity, not issue+PR velocity.
## Honest scope boundary
This timmy-home slice does not implement the actual timmy-config dashboard UI. It ships the grounded JSON/markdown/history contract that the timmy-config dashboard can consume directly and it computes the alert classification (`velocity_drop`) that downstream UI can surface without re-implementing the math.

View File

@@ -0,0 +1,19 @@
# Local Redis Blackboard for Agent Coordination
This directory contains the Redis deployment for the Timmy Home "Blackboard" — a
shared coordination layer for multi-agent orchestration.
## Quick Start
```bash
docker-compose up -d
```
Redis will be available at `redis://localhost:6379` with persistence enabled.
## Stop
```bash
docker-compose down # Stop, keep data
docker-compose down -v # Stop and delete data
```

View File

@@ -0,0 +1,18 @@
version: '3.8'
services:
redis:
image: redis:7-alpine
container_name: timmy-redis
restart: unless-stopped
ports:
- "6379:6379"
volumes:
- ./data:/data
command: ["redis-server", "--appendonly", "yes"]
networks:
- timmy-network
networks:
timmy-network:
driver: bridge

View File

@@ -1,406 +0,0 @@
#!/usr/bin/env python3
"""Burn-down velocity tracker for Timmy Foundation issue throughput.
Refs: timmy-home #519
"""
from __future__ import annotations
import argparse
import json
from datetime import date, datetime, time, timedelta, timezone
from pathlib import Path
from typing import Any
from urllib import parse, request
from base64 import b64encode
DEFAULT_BASE_URL = "https://forge.alexanderwhitestone.com/api/v1"
DEFAULT_OWNER = "Timmy_Foundation"
DEFAULT_TOKEN_FILE = Path.home() / ".config" / "gitea" / "token"
DEFAULT_CONFIG_FILE = Path(__file__).resolve().parent.parent / "configs" / "burn_velocity_repos.json"
DEFAULT_OUTPUT_DIR = Path.home() / ".timmy" / "burn-velocity"
DEFAULT_OUTPUT_JSON = DEFAULT_OUTPUT_DIR / "latest.json"
DEFAULT_OUTPUT_MD = DEFAULT_OUTPUT_DIR / "latest.md"
DEFAULT_HISTORY_FILE = DEFAULT_OUTPUT_DIR / "history.json"
DEFAULT_CONFIG = {
"owner": DEFAULT_OWNER,
"repos": ["timmy-home", "timmy-config", "fleet-ops", "the-beacon", "the-door", "the-nexus"],
"lookback_days": 14,
"alert": {
"recent_days": 7,
"baseline_days": 7,
"minimum_baseline_closed": 4,
"drop_ratio": 0.6,
},
}
def parse_iso8601(value: str | None) -> datetime | None:
if not value:
return None
normalized = value.replace("Z", "+00:00")
parsed = datetime.fromisoformat(normalized)
if parsed.tzinfo is None:
return parsed.replace(tzinfo=timezone.utc)
return parsed.astimezone(timezone.utc)
def normalize_today(value: str | date | None = None) -> date:
if value is None:
return datetime.now(timezone.utc).date()
if isinstance(value, date):
return value
return date.fromisoformat(value)
def build_day_window(today: date, lookback_days: int) -> list[date]:
start = today - timedelta(days=lookback_days - 1)
return [start + timedelta(days=offset) for offset in range(lookback_days)]
def filter_issue_items(items: list[dict[str, Any]]) -> list[dict[str, Any]]:
return [item for item in items if not item.get("pull_request")]
def build_daily_series(items: list[dict[str, Any]], today: date, lookback_days: int) -> list[dict[str, int | str]]:
days = build_day_window(today, lookback_days)
counts = {day.isoformat(): {"opened": 0, "closed": 0} for day in days}
start_day = days[0]
for item in filter_issue_items(items):
created_at = parse_iso8601(item.get("created_at"))
if created_at is not None:
created_day = created_at.date()
if start_day <= created_day <= today:
counts[created_day.isoformat()]["opened"] += 1
closed_at = parse_iso8601(item.get("closed_at"))
if closed_at is not None:
closed_day = closed_at.date()
if start_day <= closed_day <= today:
counts[closed_day.isoformat()]["closed"] += 1
return [
{
"date": day.isoformat(),
"opened": counts[day.isoformat()]["opened"],
"closed": counts[day.isoformat()]["closed"],
}
for day in days
]
def summarize_velocity_alert(
*, recent_closed: int, baseline_closed: int, open_now: int, config: dict[str, Any]
) -> dict[str, Any]:
minimum_baseline = int(config.get("minimum_baseline_closed", 4))
drop_ratio = float(config.get("drop_ratio", 0.6))
if baseline_closed >= minimum_baseline and recent_closed < baseline_closed * drop_ratio:
return {
"status": "drop",
"kind": "velocity_drop",
"recent_closed": recent_closed,
"baseline_closed": baseline_closed,
"reason": (
f"velocity_drop: closed {recent_closed} in the last {config.get('recent_days', 7)}d "
f"vs {baseline_closed} in the prior {config.get('baseline_days', 7)}d"
),
}
if open_now > 0 and baseline_closed >= minimum_baseline and recent_closed == 0:
return {
"status": "drop",
"kind": "velocity_drop",
"recent_closed": recent_closed,
"baseline_closed": baseline_closed,
"reason": "velocity_drop: no issues closed in the recent window while backlog is still open",
}
return {
"status": "ok",
"kind": "none",
"recent_closed": recent_closed,
"baseline_closed": baseline_closed,
"reason": "velocity stable",
}
def _sum_window(daily: list[dict[str, int | str]], field: str, days: int) -> int:
if days <= 0:
return 0
return sum(int(item[field]) for item in daily[-days:])
def _sum_baseline_window(daily: list[dict[str, int | str]], recent_days: int, baseline_days: int) -> int:
if baseline_days <= 0:
return 0
if recent_days <= 0:
return sum(int(item["closed"]) for item in daily[-baseline_days:])
baseline_slice = daily[-(recent_days + baseline_days) : -recent_days]
return sum(int(item["closed"]) for item in baseline_slice)
def build_velocity_report(config: dict[str, Any], snapshot: dict[str, Any], today: str | date | None = None) -> dict[str, Any]:
report_day = normalize_today(today)
generated_at = snapshot.get("generated_at") or datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
owner = config.get("owner", DEFAULT_OWNER)
repos = list(config.get("repos") or sorted((snapshot.get("repos") or {}).keys()))
lookback_days = int(config.get("lookback_days", 14))
alert_config = dict(DEFAULT_CONFIG["alert"])
alert_config.update(config.get("alert") or {})
recent_days = int(alert_config.get("recent_days", 7))
baseline_days = int(alert_config.get("baseline_days", 7))
repo_reports: list[dict[str, Any]] = []
total_open_now = 0
total_closed_last_7d = 0
repos_with_alerts: list[str] = []
for repo_name in repos:
repo_snapshot = (snapshot.get("repos") or {}).get(repo_name, {})
open_issues = filter_issue_items(list(repo_snapshot.get("open_issues") or []))
recent_issues = filter_issue_items(list(repo_snapshot.get("recent_issues") or []))
daily = build_daily_series(recent_issues, report_day, lookback_days)
open_now = len(open_issues)
opened_last_7d = _sum_window(daily, "opened", recent_days)
closed_last_7d = _sum_window(daily, "closed", recent_days)
baseline_closed = _sum_baseline_window(daily, recent_days, baseline_days)
weekly_net = opened_last_7d - closed_last_7d
alert = summarize_velocity_alert(
recent_closed=closed_last_7d,
baseline_closed=baseline_closed,
open_now=open_now,
config=alert_config,
)
repo_report = {
"repo": repo_name,
"open_now": open_now,
"opened_last_7d": opened_last_7d,
"closed_last_7d": closed_last_7d,
"baseline_closed": baseline_closed,
"weekly_net": weekly_net,
"daily": daily,
"alert": alert,
}
repo_reports.append(repo_report)
total_open_now += open_now
total_closed_last_7d += closed_last_7d
if alert["status"] != "ok":
repos_with_alerts.append(repo_name)
return {
"owner": owner,
"generated_at": generated_at,
"generated_day": report_day.isoformat(),
"lookback_days": lookback_days,
"dashboard_contract_version": 1,
"repos": repo_reports,
"summary": {
"total_open_now": total_open_now,
"total_closed_last_7d": total_closed_last_7d,
"repos_with_alerts": repos_with_alerts,
},
}
def render_markdown(report: dict[str, Any]) -> str:
lines = [
"# Burn-down Velocity Tracking",
"",
f"Generated: {report['generated_at']}",
f"Owner: {report['owner']}",
f"Lookback days: {report['lookback_days']}",
"",
"## Per-repo velocity",
"",
"| Repo | Open now | Opened 7d | Closed 7d | Previous 7d | Alert |",
"| --- | ---: | ---: | ---: | ---: | --- |",
]
for repo in report["repos"]:
alert_label = repo["alert"]["kind"] if repo["alert"]["status"] != "ok" else "ok"
lines.append(
f"| {repo['repo']} | {repo['open_now']} | {repo['opened_last_7d']} | {repo['closed_last_7d']} | {repo['baseline_closed']} | {alert_label} |"
)
lines.extend(
[
"",
"## Dashboard handoff for timmy-config",
"",
"The timmy-config dashboard should consume `~/.timmy/burn-velocity/latest.json` and render, for each repo:",
"- `open_now`",
"- `opened_last_7d`",
"- `closed_last_7d`",
"- `baseline_closed`",
"- `alert.status` / `alert.kind` / `alert.reason`",
"",
"Cron should also persist `~/.timmy/burn-velocity/history.json` so timmy-config can plot the daily trend line instead of only the latest snapshot.",
"",
"## Alerts",
"",
]
)
alerts = [repo for repo in report["repos"] if repo["alert"]["status"] != "ok"]
if not alerts:
lines.append("- none")
else:
for repo in alerts:
lines.append(f"- {repo['repo']}: {repo['alert']['reason']}")
return "\n".join(lines) + "\n"
def update_history(history_path: Path, report: dict[str, Any]) -> dict[str, Any]:
if history_path.exists():
history = json.loads(history_path.read_text(encoding="utf-8"))
else:
history = {"days": []}
entry = {
"date": report["generated_day"],
"generated_at": report["generated_at"],
"summary": report["summary"],
"repos": report["repos"],
}
retained = [item for item in history.get("days", []) if item.get("date") != report["generated_day"]]
retained.append(entry)
retained.sort(key=lambda item: item["date"])
history["days"] = retained
history_path.parent.mkdir(parents=True, exist_ok=True)
history_path.write_text(json.dumps(history, indent=2), encoding="utf-8")
return history
class GiteaClient:
def __init__(self, token: str, owner: str = DEFAULT_OWNER, base_url: str = DEFAULT_BASE_URL):
self.token = token
self.owner = owner
self.base_url = base_url.rstrip("/")
def _headers(self) -> list[dict[str, str]]:
return [
{"Authorization": f"token {self.token}", "Accept": "application/json"},
{
"Authorization": "Basic " + b64encode(f"{self.token}:".encode()).decode(),
"Accept": "application/json",
},
]
def _request_json(self, url: str) -> list[dict[str, Any]]:
last_error: Exception | None = None
for headers in self._headers():
try:
req = request.Request(url, headers=headers)
with request.urlopen(req, timeout=30) as response:
return json.loads(response.read().decode())
except Exception as exc: # pragma: no cover - exercised only on live API failure
last_error = exc
if last_error is None: # pragma: no cover - defensive
raise RuntimeError("request failed without an exception")
raise last_error
def list_issues(self, repo: str, *, state: str, since: str | None = None) -> list[dict[str, Any]]:
issues: list[dict[str, Any]] = []
page = 1
while True:
query = {"state": state, "type": "issues", "limit": 100, "page": page}
if since:
query["since"] = since
url = f"{self.base_url}/repos/{self.owner}/{repo}/issues?{parse.urlencode(query)}"
batch = self._request_json(url)
if not batch:
break
issues.extend(filter_issue_items(batch))
page += 1
return issues
def load_json(path: Path, default: Any) -> Any:
if not path.exists():
return default
return json.loads(path.read_text(encoding="utf-8"))
def load_config(path: Path) -> dict[str, Any]:
config = dict(DEFAULT_CONFIG)
alert = dict(DEFAULT_CONFIG["alert"])
raw = load_json(path, {})
config.update(raw)
alert.update(raw.get("alert") or {})
config["alert"] = alert
return config
def collect_live_snapshot(
config: dict[str, Any], *, today: str | date | None = None, token_file: Path = DEFAULT_TOKEN_FILE, base_url: str = DEFAULT_BASE_URL
) -> dict[str, Any]:
token = token_file.read_text(encoding="utf-8").strip()
report_day = normalize_today(today)
since_day = report_day - timedelta(days=int(config.get("lookback_days", 14)) - 1)
since_timestamp = datetime.combine(since_day, time.min, tzinfo=timezone.utc).isoformat().replace("+00:00", "Z")
client = GiteaClient(token=token, owner=config.get("owner", DEFAULT_OWNER), base_url=base_url)
repos = list(config.get("repos") or [])
repo_payload = {}
for repo in repos:
repo_payload[repo] = {
"open_issues": client.list_issues(repo, state="open"),
"recent_issues": client.list_issues(repo, state="all", since=since_timestamp),
}
return {
"generated_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
"repos": repo_payload,
}
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Track per-repo issue burn-down velocity and emit timmy-config dashboard payloads.")
parser.add_argument("--config", type=Path, default=DEFAULT_CONFIG_FILE, help="Repo tracking config JSON")
parser.add_argument("--snapshot-file", type=Path, help="Use a pre-fetched snapshot JSON instead of calling Gitea")
parser.add_argument("--token-file", type=Path, default=DEFAULT_TOKEN_FILE, help="Gitea token file for live collection")
parser.add_argument("--base-url", default=DEFAULT_BASE_URL, help="Gitea API base URL")
parser.add_argument("--today", help="Override report date (YYYY-MM-DD)")
parser.add_argument("--output-json", type=Path, default=DEFAULT_OUTPUT_JSON, help="Path for latest JSON payload")
parser.add_argument("--output-md", type=Path, default=DEFAULT_OUTPUT_MD, help="Path for latest markdown summary")
parser.add_argument("--history-file", type=Path, default=DEFAULT_HISTORY_FILE, help="Path for persisted daily history JSON")
parser.add_argument("--write-history", action="store_true", help="Update the daily history file after generating the report")
parser.add_argument("--json", action="store_true", help="Print JSON instead of markdown to stdout")
return parser.parse_args()
def main() -> None:
args = parse_args()
config = load_config(args.config)
if args.snapshot_file:
snapshot = load_json(args.snapshot_file, {"repos": {}})
else:
snapshot = collect_live_snapshot(config, today=args.today, token_file=args.token_file, base_url=args.base_url)
report = build_velocity_report(config, snapshot, today=args.today)
args.output_json.parent.mkdir(parents=True, exist_ok=True)
args.output_md.parent.mkdir(parents=True, exist_ok=True)
args.output_json.write_text(json.dumps(report, indent=2), encoding="utf-8")
args.output_md.write_text(render_markdown(report), encoding="utf-8")
if args.write_history:
update_history(args.history_file, report)
if args.json:
print(json.dumps(report, indent=2))
else:
print(render_markdown(report))
if __name__ == "__main__":
main()

311
src/timmy/blackboard.py Normal file
View File

@@ -0,0 +1,311 @@
#!/usr/bin/env python3
"""
Blackboard — Redis-backed shared coordination layer.
Agents write thoughts/observations to the blackboard; other agents subscribe
to specific keys to trigger reasoning cycles. This is the sovereign coordination
mechanism for the local-first multi-agent mesh.
Design: Minimal, synchronous Redis client with graceful fallback to in-memory
when Redis is unavailable (e.g., during local dev without Docker).
SOUL.md: "Sovereignty and service always." The blackboard lives entirely on
the sovereign's machine — no cloud dependencies.
"""
from __future__ import annotations
import json
import logging
import os
import time
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Callable, Iterable, Optional
logger = logging.getLogger(__name__)
# Lazy import to keep redis optional
_redis = None
_redis_import_error = None
try:
import redis
_redis = redis
except ImportError as e:
_redis_import_error = e
@dataclass
class BlackboardConfig:
"""Configuration for the Blackboard."""
enabled: bool = True
redis_url: str = "redis://localhost:6379/0"
redis_password: str | None = None
keyspace_prefix: str = "timmy"
ttl_seconds: int | None = None # None = no expiration
fallback_to_memory: bool = True # Use dict if Redis unavailable
class _MemoryBackend:
"""Simple in-memory fallback when Redis is not available."""
def __init__(self):
self._store: dict[str, str] = {}
self._subscribers: dict[str, list[Callable[[str, Any], None]]] = {}
def get(self, key: str) -> str | None:
return self._store.get(key)
def set(self, key: str, value: str, ttl: int | None = None) -> bool:
self._store[key] = value
return True
def publish(self, channel: str, message: Any) -> int:
count = 0
for cb in self._subscribers.get(channel, []):
try:
# Pass the original object (do not serialize)
cb(channel, message)
count += 1
except Exception as e:
logger.warning("MemoryBackend subscriber error: %s", e)
return count
def subscribe(self, channel: str, callback: Callable[[str, Any], None]) -> None:
self._subscribers.setdefault(channel, []).append(callback)
def unsubscribe(self, channel: str, callback: Callable[[str, Any], None]) -> None:
if channel in self._subscribers:
self._subscribers[channel].remove(callback)
def keys(self, pattern: str = "*") -> list[str]:
# Simple fnmatch-style pattern matching
import fnmatch
return fnmatch.filter(list(self._store.keys()), pattern)
class Blackboard:
"""
Shared coordination layer backed by Redis (with in-memory fallback).
Usage:
bb = Blackboard()
bb.set("agent:timmy:thought", "checking queue...")
value = bb.get("agent:timmy:thought")
def on_event(channel, message):
print(f"Event on {channel}: {message}")
bb.subscribe("dispatch:new", on_event)
bb.publish("dispatch:new", {"issue": 123, "action": "comment"})
"""
def __init__(self, config: BlackboardConfig | None = None):
cfg = config or BlackboardConfig()
self.enabled = cfg.enabled
self.prefix = cfg.keyspace_prefix
self.ttl = cfg.ttl_seconds
self._backend: _MemoryBackend | Any
if not _redis:
if cfg.fallback_to_memory:
logger.warning(
"redis-py not installed; using in-memory fallback. "
"Install with: pip install redis"
)
self._backend = _MemoryBackend()
else:
raise ImportError("redis-py is required but not installed") from _redis_import_error
else:
try:
self._backend = _redis.from_url(
cfg.redis_url,
password=cfg.redis_password,
decode_responses=True,
)
# Test connection
self._backend.ping()
logger.info("Blackboard connected to Redis at %s", cfg.redis_url)
except Exception as e:
if cfg.fallback_to_memory:
logger.warning("Redis connection failed (%s); falling back to in-memory", e)
self._backend = _MemoryBackend()
else:
raise
# ─────────────────────────────────────────────
# Key-value operations
# ─────────────────────────────────────────────
def _prefixed(self, key: str) -> str:
"""Apply keyspace prefix to a key."""
return f"{self.prefix}:{key}" if self.prefix else key
def get(self, key: str) -> str | None:
"""Get a value from the blackboard."""
return self._backend.get(self._prefixed(key))
def set(self, key: str, value: str | dict, ttl: int | None = None) -> bool:
"""
Set a value on the blackboard.
Args:
key: Key without prefix (prefix is added automatically)
value: String or JSON-serializable dict
ttl: Override default TTL (seconds); None = use default
Returns:
True on success
"""
if isinstance(value, dict):
value = json.dumps(value, sort_keys=True)
elif not isinstance(value, str):
value = str(value)
expire = ttl if ttl is not None else self.ttl
result = self._backend.set(self._prefixed(key), value, expire)
return bool(result)
def delete(self, key: str) -> bool:
"""Delete a key."""
try:
return bool(self._backend.delete(self._prefixed(key)))
except AttributeError:
# MemoryBackend
k = self._prefixed(key)
if k in self._backend._store:
del self._backend._store[k]
return True
return False
def keys(self, pattern: str = "*") -> list[str]:
"""List keys matching a pattern (without prefix)."""
full_pattern = self._prefixed(pattern)
raw_keys = self._backend.keys(full_pattern)
# Strip prefix
prefix_len = len(self.prefix) + 1 if self.prefix else 0
return [k[prefix_len:] if k.startswith(f"{self.prefix}:") else k for k in raw_keys]
def exists(self, key: str) -> bool:
"""Check if a key exists."""
try:
return bool(self._backend.exists(self._prefixed(key)))
except AttributeError:
# MemoryBackend
return self._prefixed(key) in self._backend._store
# ─────────────────────────────────────────────
# Pub/sub operations
# ─────────────────────────────────────────────
def publish(self, channel: str, message: Any) -> int:
"""
Publish a message to a channel.
Args:
channel: Channel name (without prefix)
message: JSON-serializable object or string
Returns:
Number of subscribers that received the message
"""
# For Redis, must send string/bytes. For MemoryBackend, pass object.
if isinstance(self._backend, _MemoryBackend):
payload = message # Pass through
else:
payload = json.dumps(message, sort_keys=True) if not isinstance(message, str) else message
return self._backend.publish(self._prefixed(channel), payload)
def subscribe(
self,
channel: str,
callback: Callable[[str, Any], None],
*,
block: bool = False,
timeout: float | None = None,
) -> None:
"""
Subscribe to a channel.
Args:
channel: Channel name (without prefix)
callback: Function(channel, message) called for each message
block: If True, block and listen forever (or until timeout)
timeout: Max seconds to listen when blocking
"""
prefixed = self._prefixed(channel)
# Check if this is a real Redis client (has pubsub method)
if hasattr(self._backend, 'pubsub') and callable(getattr(self._backend, 'pubsub', None)):
# Real Redis pub/sub
import threading
pubsub = self._backend.pubsub()
pubsub.subscribe(prefixed)
def listener():
for msg in pubsub.listen():
if msg['type'] == 'message':
try:
data = json.loads(msg['data'])
except (json.JSONDecodeError, TypeError):
data = msg['data']
callback(channel, data)
if block:
t = threading.Thread(target=listener, daemon=True)
t.start()
if timeout:
t.join(timeout)
else:
t.join()
else:
# Fire-and-forget thread
threading.Thread(target=listener, daemon=True).start()
else:
# MemoryBackend — synchronous callback registration
self._backend.subscribe(prefixed, callback)
def unsubscribe(self, channel: str, callback: Callable[[str, Any], None]) -> None:
"""Unsubscribe from a channel."""
try:
self._backend.unsubscribe(self._prefixed(channel), callback)
except AttributeError:
pass # MemoryBackend supports it
# ─────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────
def clear_namespace(self, pattern: str = "*") -> int:
"""Delete all keys matching pattern in this namespace."""
full = self._prefixed(pattern)
try:
keys = self._backend.keys(full)
if keys:
return self._backend.delete(*keys)
return 0
except AttributeError:
store_keys = list(self._backend._store.keys())
import fnmatch
matched = fnmatch.filter(store_keys, full)
for k in matched:
del self._backend._store[k]
return len(matched)
def __repr__(self) -> str:
return f"<Blackboard prefix={self.prefix!r} backend={type(self._backend).__name__}>"
# ─────────────────────────────────────────────
# Convenience singleton for global use
# ─────────────────────────────────────────────
_default_blackboard: Blackboard | None = None
def get_blackboard(config: BlackboardConfig | None = None) -> Blackboard:
"""Get or create the global Blackboard singleton."""
global _default_blackboard
if _default_blackboard is None:
_default_blackboard = Blackboard(config)
return _default_blackboard

194
tests/test_blackboard.py Normal file
View File

@@ -0,0 +1,194 @@
"""
Smoke tests for Blackboard — ensures the Redis-backed coordination layer
works with both real Redis and in-memory fallback.
"""
import json
import time
import pytest
from src.timmy.blackboard import Blackboard, BlackboardConfig, _MemoryBackend
class TestBlackboardBasics:
"""Test core key-value operations."""
def test_kv_memory_backend(self):
"""KV operations work using in-memory backend."""
bb = Blackboard(BlackboardConfig(fallback_to_memory=True, enabled=True))
# Set and get
assert bb.set("test:key", "hello") is True
assert bb.get("test:key") == "hello"
# Dict serialization
assert bb.set("test:obj", {"a": 1, "b": 2}) is True
val = bb.get("test:obj")
assert json.loads(val) == {"a": 1, "b": 2}
# Exists
assert bb.exists("test:key") is True
assert bb.exists("missing") is False
# Delete
assert bb.delete("test:key") is True
assert bb.get("test:key") is None
# Keys with prefix
bb.set("agent:timmy:state", "ready")
bb.set("agent:ezra:state", "idle")
keys = bb.keys("agent:*:state")
assert len(keys) == 2
assert "timmy" in keys[0] or "ezra" in keys[0]
# Clear namespace
assert bb.clear_namespace("agent:*") == 2
assert bb.keys("agent:*") == []
class TestBlackboardPubSub:
"""Test pub/sub coordination patterns."""
def test_pubsub_memory_backend(self):
"""Publish/subscribe works using in-memory backend."""
bb = Blackboard(BlackboardConfig(fallback_to_memory=True, enabled=True))
received = []
def callback(channel, message):
received.append((channel, message))
bb.subscribe("dispatch:new", callback)
# Publish
count = bb.publish("dispatch:new", {"issue": 123, "action": "comment"})
assert count == 1
assert len(received) == 1
ch, msg = received[0]
assert ch == "dispatch:new"
assert msg == {"issue": 123, "action": "comment"}
bb.unsubscribe("dispatch:new", callback)
bb.publish("dispatch:new", {"should": "not arrive"})
assert len(received) == 1 # no new messages
def test_publish_without_subscribers(self):
"""Publish returns 0 when no subscribers."""
bb = Blackboard(BlackboardConfig(fallback_to_memory=True, enabled=True))
count = bb.publish("empty:channel", {"msg": 1})
assert count == 0
class TestBlackboardConfig:
"""Test configuration parsing and validation."""
def test_default_config(self):
cfg = BlackboardConfig()
assert cfg.enabled is True
assert cfg.redis_url == "redis://localhost:6379/0"
assert cfg.keyspace_prefix == "timmy"
assert cfg.ttl_seconds == 3600
assert cfg.fallback_to_memory is True
def test_custom_config(self):
cfg = BlackboardConfig(
enabled=False,
redis_url="redis://192.168.1.10:6379/1",
keyspace_prefix="myagent",
ttl_seconds=1800,
fallback_to_memory=False,
)
assert cfg.enabled is False
assert cfg.redis_url == "redis://192.168.1.10:6379/1"
assert cfg.keyspace_prefix == "myagent"
assert cfg.ttl_seconds == 1800
assert cfg.fallback_to_memory is False
class TestKeyspacePrefix:
"""Test that keys are correctly prefixed."""
def test_prefixed_keys(self):
bb = Blackboard(BlackboardConfig(keyspace_prefix="myagent", fallback_to_memory=True))
bb.set("thought", "test")
# Internal key should be "myagent:thought"
# We can verify by checking keys()
keys = bb.keys("*")
assert any("myagent:thought" in k for k in keys)
class TestBlackboardIntegration:
"""Integration pattern: agent thought cycle."""
def test_agent_thought_cycle(self):
"""Simulate Timmy writing a thought and Ezra reading it."""
bb = Blackboard(BlackboardConfig(fallback_to_memory=True, enabled=True))
# Agent A writes observation
bb.set("agent:timmy:observation", "Gitea queue has 12 open issues")
# Agent B reads
obs = bb.get("agent:timmy:observation")
assert obs == "Gitea queue has 12 open issues"
# Agent B writes analysis
bb.set("agent:ezra:analysis", "Prioritize critical bugs first")
# Event-driven pattern
events = []
def on_plan(channel, message):
events.append(message)
bb.subscribe("fleet:plan", on_plan)
bb.publish("fleet:plan", {"phase": "triaging", "lead": "ezra"})
assert len(events) == 1
assert events[0]["phase"] == "triaging"
class TestTTL:
"""Test TTL handling (where supported)."""
def test_ttl_set_in_config(self):
cfg = BlackboardConfig(ttl_seconds=60, fallback_to_memory=True)
bb = Blackboard(cfg)
assert bb.ttl == 60
# Setting a value uses TTL from config
bb.set("temp:key", "expiring value")
# In memory backend ignores TTL, but value is set
assert bb.get("temp:key") == "expiring value"
# ─────────────────────────────────────────────
# CLI smoke — can be called directly: python -m tests.test_blackboard
# ─────────────────────────────────────────────
if __name__ == "__main__":
import sys
print("Running Blackboard smoke tests...")
suite = [
TestBlackboardBasics().test_kv_memory_backend,
TestBlackboardPubSub().test_pubsub_memory_backend,
TestBlackboardConfig().test_default_config,
TestBlackboardIntegration().test_agent_thought_cycle,
]
failures = 0
for test in suite:
name = test.__name__
try:
test()
print(f"{name}")
except AssertionError as e:
print(f"{name}: {e}")
failures += 1
except Exception as e:
print(f"{name}: ERROR — {e}")
failures += 1
print(f"\nRan {len(suite)} tests, {failures} failures")
sys.exit(failures)

View File

@@ -1,176 +0,0 @@
from __future__ import annotations
import json
import subprocess
import sys
from datetime import date
from pathlib import Path
from scripts.burn_velocity_tracker import build_velocity_report, render_markdown, update_history
ROOT = Path(__file__).resolve().parent.parent
DOC_PATH = ROOT / "docs" / "BURN_VELOCITY_TRACKING.md"
SNAPSHOT = {
"generated_at": "2026-04-22T12:00:00Z",
"repos": {
"timmy-home": {
"open_issues": [
{"number": 501, "state": "open", "created_at": "2026-04-20T09:00:00Z"},
{"number": 502, "state": "open", "created_at": "2026-04-22T07:00:00Z"},
],
"recent_issues": [
{"number": 401, "state": "closed", "created_at": "2026-04-21T09:00:00Z", "closed_at": "2026-04-22T05:30:00Z"},
{"number": 402, "state": "closed", "created_at": "2026-04-20T09:00:00Z", "closed_at": "2026-04-21T05:30:00Z"},
{"number": 403, "state": "closed", "created_at": "2026-04-19T09:00:00Z", "closed_at": "2026-04-20T05:30:00Z"},
{"number": 404, "state": "closed", "created_at": "2026-04-14T09:00:00Z", "closed_at": "2026-04-15T05:30:00Z"},
{"number": 405, "state": "closed", "created_at": "2026-04-13T09:00:00Z", "closed_at": "2026-04-14T05:30:00Z"},
{"number": 406, "state": "closed", "created_at": "2026-04-12T09:00:00Z", "closed_at": "2026-04-13T05:30:00Z"},
{"number": 407, "state": "closed", "created_at": "2026-04-11T09:00:00Z", "closed_at": "2026-04-12T05:30:00Z"},
{"number": 408, "state": "closed", "created_at": "2026-04-10T09:00:00Z", "closed_at": "2026-04-11T05:30:00Z"},
{"number": 409, "state": "closed", "created_at": "2026-04-09T09:00:00Z", "closed_at": "2026-04-10T05:30:00Z"},
{"number": 410, "state": "closed", "created_at": "2026-04-08T09:00:00Z", "closed_at": "2026-04-09T05:30:00Z"},
{"number": 411, "state": "closed", "created_at": "2026-04-07T09:00:00Z", "closed_at": "2026-04-08T05:30:00Z"},
{"number": 412, "state": "closed", "created_at": "2026-04-06T09:00:00Z", "closed_at": "2026-04-07T05:30:00Z"},
{"number": 413, "state": "closed", "created_at": "2026-04-05T09:00:00Z", "closed_at": "2026-04-06T05:30:00Z"},
{"number": 414, "state": "open", "created_at": "2026-04-22T08:45:00Z", "closed_at": None},
{"number": 415, "state": "open", "created_at": "2026-04-17T08:45:00Z", "closed_at": None},
],
},
"timmy-config": {
"open_issues": [
{"number": 601, "state": "open", "created_at": "2026-04-18T09:00:00Z"},
],
"recent_issues": [
{"number": 602, "state": "closed", "created_at": "2026-04-20T09:00:00Z", "closed_at": "2026-04-21T06:00:00Z"},
{"number": 603, "state": "open", "created_at": "2026-04-22T06:00:00Z", "closed_at": None},
],
},
},
}
CONFIG = {
"owner": "Timmy_Foundation",
"repos": ["timmy-home", "timmy-config"],
"lookback_days": 14,
"alert": {
"recent_days": 7,
"baseline_days": 7,
"minimum_baseline_closed": 4,
"drop_ratio": 0.6,
},
}
def test_build_velocity_report_counts_opened_closed_and_flags_drop_alert() -> None:
report = build_velocity_report(CONFIG, SNAPSHOT, today=date(2026, 4, 22))
assert report["generated_day"] == "2026-04-22"
assert report["summary"]["repos_with_alerts"] == ["timmy-home"]
assert report["summary"]["total_open_now"] == 3
home = report["repos"][0]
assert home["repo"] == "timmy-home"
assert home["open_now"] == 2
assert home["opened_last_7d"] == 5
assert home["closed_last_7d"] == 3
assert home["baseline_closed"] == 7
assert home["weekly_net"] == 2
assert home["alert"]["status"] == "drop"
assert home["alert"]["recent_closed"] == 3
assert home["daily"][-1] == {"date": "2026-04-22", "opened": 1, "closed": 1}
timmy_config = report["repos"][1]
assert timmy_config["repo"] == "timmy-config"
assert timmy_config["open_now"] == 1
assert timmy_config["closed_last_7d"] == 1
assert timmy_config["alert"]["status"] == "ok"
def test_render_markdown_includes_dashboard_handoff_and_alerts() -> None:
report = build_velocity_report(CONFIG, SNAPSHOT, today=date(2026, 4, 22))
rendered = render_markdown(report)
for snippet in (
"# Burn-down Velocity Tracking",
"## Per-repo velocity",
"timmy-home",
"timmy-config",
"## Dashboard handoff for timmy-config",
"velocity_drop",
"## Alerts",
):
assert snippet in rendered
def test_update_history_replaces_same_day_snapshot(tmp_path: Path) -> None:
history_path = tmp_path / "burn-velocity-history.json"
report = build_velocity_report(CONFIG, SNAPSHOT, today=date(2026, 4, 22))
update_history(history_path, report)
updated = json.loads(json.dumps(report))
updated["repos"][0]["open_now"] = 9
updated["summary"]["total_open_now"] = 10
update_history(history_path, updated)
history = json.loads(history_path.read_text(encoding="utf-8"))
assert [item["date"] for item in history["days"]] == ["2026-04-22"]
assert history["days"][0]["summary"]["total_open_now"] == 10
assert history["days"][0]["repos"][0]["open_now"] == 9
def test_cli_writes_json_markdown_and_history_from_snapshot(tmp_path: Path) -> None:
snapshot_path = tmp_path / "snapshot.json"
output_json = tmp_path / "latest.json"
output_md = tmp_path / "latest.md"
history_path = tmp_path / "history.json"
snapshot_path.write_text(json.dumps(SNAPSHOT), encoding="utf-8")
result = subprocess.run(
[
sys.executable,
"-m",
"scripts.burn_velocity_tracker",
"--snapshot-file",
str(snapshot_path),
"--today",
"2026-04-22",
"--output-json",
str(output_json),
"--output-md",
str(output_md),
"--history-file",
str(history_path),
"--write-history",
"--json",
],
check=True,
cwd=ROOT,
capture_output=True,
text=True,
)
payload = json.loads(result.stdout)
assert payload["summary"]["repos_with_alerts"] == ["timmy-home"]
assert output_json.exists()
assert output_md.exists()
assert history_path.exists()
assert "timmy-config" in output_md.read_text(encoding="utf-8")
def test_repo_contains_burn_velocity_tracking_doc() -> None:
text = DOC_PATH.read_text(encoding="utf-8")
required = [
"# Burn-down Velocity Tracking",
"python3 scripts/burn_velocity_tracker.py",
"configs/burn_velocity_repos.json",
"~/.timmy/burn-velocity/latest.json",
"timmy-config dashboard",
"type=issues",
"velocity_drop",
]
for snippet in required:
assert snippet in text