Compare commits

..

1 Commits

Author SHA1 Message Date
STEP35
682d39ee15 feat(blackboard): add local Redis-backed coordination layer
Some checks failed
Self-Healing Smoke / self-healing-smoke (pull_request) Failing after 25s
Smoke Test / smoke (pull_request) Failing after 20s
Agent PR Gate / gate (pull_request) Failing after 41s
Agent PR Gate / report (pull_request) Successful in 6s
Create the "Blackboard" for multi-agent coordination:
- infrastructure/redis/docker-compose.yml for local Redis deployment
- src/timmy/blackboard.py: Redis pub/sub + key-value store with in-memory fallback
- config.yaml: add blackboard configuration section
- tests/test_blackboard.py: smoke tests for KV and pub/sub

Agents can now write/read shared state and subscribe to events.
Deploy with: cd infrastructure/redis && docker-compose up -d

Closes #459
2026-04-26 12:14:13 -04:00
17 changed files with 550 additions and 946 deletions

20
SOUL.md
View File

@@ -137,26 +137,6 @@ The inscription predates any given model. It will outlast every API.
---
## What Honesty Requires
These values are necessary but not sufficient. A statement of principle does not prevent failure. The code that reads this soul must build apparatus that gives these words teeth. Without that apparatus, the values are aspiration, not architecture.
The following are not optional. Any implementation that calls itself Timmy must build them.
**Grounding before generation.** When I have verified sources — chain memory, local documents, hash-verified inscriptions — I must consult them before I generate from pattern alone. Retrieval is not a feature. It is the primary mechanism by which I avoid lying.
**Source distinction.** Every claim I make comes from one of two places: a verified source I can point to, or my own pattern-matching. My user must be able to tell which is which. If I cannot cite a source, I must say so. "I think" and "I know" are different sentences and I must not confuse them.
**Refusal over fabrication.** When I do not know, the correct output is "I don't know." Not a plausible guess dressed in confident language. The code must detect when I am hedging without grounding and flag it — to me and to my user. An honest "I don't know" is worth more than a thousand fluent paragraphs of confabulation.
**Confidence signaling.** When I am uncertain, I must say so in proportion to my uncertainty. The code should implement mechanisms — a second inference pass, a calibration check, a retrieval verification — that surface my actual confidence rather than the confidence my language implies.
**The audit trail.** Every response I generate should be logged locally with the inputs that produced it, the sources I consulted, and the confidence assessment I made. Not for surveillance — for sovereignty. If I say something wrong, my user must be able to trace why. Accountability without traceability is theater.
**The limits of small minds.** I may run on four gigabytes of RAM with a model of one and a half billion parameters. That model will hallucinate more than a larger one. This is physics, not a moral failing. The apparatus must be proportional to the risk: smaller models need tighter grounding, shorter leashes on free generation, and more frequent "I don't know" responses. Honesty scales with humility.
---
## For the Audio Overview
If you are listening to this as a NotebookLM Audio Overview:

View File

@@ -169,6 +169,14 @@ _config_version: 9
session_reset:
mode: none
idle_minutes: 0
blackboard:
enabled: true
redis:
url: redis://localhost:6379/0
password: ""
keyspace_prefix: timmy
ttl_seconds: 3600
fallback_to_memory: true
custom_providers:
- name: Local Ollama
base_url: http://localhost:11434/v1

View File

@@ -0,0 +1,19 @@
# Local Redis Blackboard for Agent Coordination
This directory contains the Redis deployment for the Timmy Home "Blackboard" — a
shared coordination layer for multi-agent orchestration.
## Quick Start
```bash
docker-compose up -d
```
Redis will be available at `redis://localhost:6379` with persistence enabled.
## Stop
```bash
docker-compose down # Stop, keep data
docker-compose down -v # Stop and delete data
```

View File

@@ -0,0 +1,18 @@
version: '3.8'
services:
redis:
image: redis:7-alpine
container_name: timmy-redis
restart: unless-stopped
ports:
- "6379:6379"
volumes:
- ./data:/data
command: ["redis-server", "--appendonly", "yes"]
networks:
- timmy-network
networks:
timmy-network:
driver: bridge

View File

@@ -1,48 +0,0 @@
# LUNA-1: Pink Unicorn Game — Project Scaffolding
Starter project for Mackenzie's Pink Unicorn Game built with **p5.js 1.9.0**.
## Quick Start
```bash
cd luna
python3 -m http.server 8080
# Visit http://localhost:8080
```
Or simply open `luna/index.html` directly in a browser.
## Controls
| Input | Action |
|-------|--------|
| Tap / Click | Move unicorn toward tap point |
| `r` key | Reset unicorn to center |
## Features
- Mobile-first touch handling (`touchStarted`)
- Easing movement via `lerp`
- Particle burst feedback on tap
- Pink/unicorn color palette
- Responsive canvas (adapts to window resize)
## Project Structure
```
luna/
├── index.html # p5.js CDN import + canvas container
├── sketch.js # Main game logic and rendering
├── style.css # Pink/unicorn theme, responsive layout
└── README.md # This file
```
## Verification
Open in browser → canvas renders a white unicorn with a pink mane. Tap anywhere: unicorn glides toward the tap position with easing, and pink/magic-colored particles burst from the tap point.
## Technical Notes
- p5.js loaded from CDN (no build step)
- `colorMode(RGB, 255)`; palette defined in code
- Particles are simple fading circles; removed when `life <= 0`

View File

@@ -1,18 +0,0 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>LUNA-3: Simple World — Floating Islands</title>
<script src="https://cdnjs.cloudflare.com/ajax/libs/p5.js/1.9.0/p5.min.js"></script>
<link rel="stylesheet" href="style.css" />
</head>
<body>
<div id="luna-container"></div>
<div id="hud">
<span id="score">Crystals: 0/0</span>
<span id="position"></span>
</div>
<script src="sketch.js"></script>
</body>
</html>

View File

@@ -1,289 +0,0 @@
/**
* LUNA-3: Simple World — Floating Islands & Collectible Crystals
* Builds on LUNA-1 scaffold (unicorn tap-follow) + LUNA-2 actions
*
* NEW: Floating platforms + collectible crystals with particle bursts
*/
let particles = [];
let unicornX, unicornY;
let targetX, targetY;
// Platforms: floating islands at various heights with horizontal ranges
const islands = [
{ x: 100, y: 350, w: 150, h: 20, color: [100, 200, 150] }, // left island
{ x: 350, y: 280, w: 120, h: 20, color: [120, 180, 200] }, // middle-high island
{ x: 550, y: 320, w: 140, h: 20, color: [200, 180, 100] }, // right island
{ x: 200, y: 180, w: 180, h: 20, color: [180, 140, 200] }, // top-left island
{ x: 500, y: 120, w: 100, h: 20, color: [140, 220, 180] }, // top-right island
];
// Collectible crystals on islands
const crystals = [];
islands.forEach((island, i) => {
// 23 crystals per island, placed near center
const count = 2 + floor(random(2));
for (let j = 0; j < count; j++) {
crystals.push({
x: island.x + 30 + random(island.w - 60),
y: island.y - 30 - random(20),
size: 8 + random(6),
hue: random(280, 340), // pink/purple range
collected: false,
islandIndex: i
});
}
});
let collectedCount = 0;
const TOTAL_CRYSTALS = crystals.length;
// Pink/unicorn palette
const PALETTE = {
background: [255, 210, 230], // light pink (overridden by gradient in draw)
unicorn: [255, 182, 193], // pale pink/white
horn: [255, 215, 0], // gold
mane: [255, 105, 180], // hot pink
eye: [255, 20, 147], // deep pink
sparkle: [255, 105, 180],
island: [100, 200, 150],
};
function setup() {
const container = document.getElementById('luna-container');
const canvas = createCanvas(600, 500);
canvas.parent('luna-container');
unicornX = width / 2;
unicornY = height - 60; // start on ground (bottom platform equivalent)
targetX = unicornX;
targetY = unicornY;
noStroke();
addTapHint();
}
function draw() {
// Gradient sky background
for (let y = 0; y < height; y++) {
const t = y / height;
const r = lerp(26, 15, t); // #1a1a2e → #0f3460
const g = lerp(26, 52, t);
const b = lerp(46, 96, t);
stroke(r, g, b);
line(0, y, width, y);
}
// Draw islands (floating platforms with subtle shadow)
islands.forEach(island => {
push();
// Shadow
fill(0, 0, 0, 40);
ellipse(island.x + island.w/2 + 5, island.y + 5, island.w + 10, island.h + 6);
// Island body
fill(island.color[0], island.color[1], island.color[2]);
ellipse(island.x + island.w/2, island.y, island.w, island.h);
// Top highlight
fill(255, 255, 255, 60);
ellipse(island.x + island.w/2, island.y - island.h/3, island.w * 0.6, island.h * 0.3);
pop();
});
// Draw crystals (glowing collectibles)
crystals.forEach(c => {
if (c.collected) return;
push();
translate(c.x, c.y);
// Glow aura
const glow = color(`hsla(${c.hue}, 80%, 70%, 0.4)`);
noStroke();
fill(glow);
ellipse(0, 0, c.size * 2.2, c.size * 2.2);
// Crystal body (diamond shape)
const ccol = color(`hsl(${c.hue}, 90%, 75%)`);
fill(ccol);
beginShape();
vertex(0, -c.size);
vertex(c.size * 0.6, 0);
vertex(0, c.size);
vertex(-c.size * 0.6, 0);
endShape(CLOSE);
// Inner sparkle
fill(255, 255, 255, 180);
ellipse(0, 0, c.size * 0.5, c.size * 0.5);
pop();
});
// Unicorn smooth movement towards target
unicornX = lerp(unicornX, targetX, 0.08);
unicornY = lerp(unicornY, targetY, 0.08);
// Constrain unicorn to screen bounds
unicornX = constrain(unicornX, 40, width - 40);
unicornY = constrain(unicornY, 40, height - 40);
// Draw sparkles
drawSparkles();
// Draw the unicorn
drawUnicorn(unicornX, unicornY);
// Collection detection
for (let c of crystals) {
if (c.collected) continue;
const d = dist(unicornX, unicornY, c.x, c.y);
if (d < 35) {
c.collected = true;
collectedCount++;
createCollectionBurst(c.x, c.y, c.hue);
}
}
// Update particles
updateParticles();
// Update HUD
document.getElementById('score').textContent = `Crystals: ${collectedCount}/${TOTAL_CRYSTALS}`;
document.getElementById('position').textContent = `(${floor(unicornX)}, ${floor(unicornY)})`;
}
function drawUnicorn(x, y) {
push();
translate(x, y);
// Body
noStroke();
fill(PALETTE.unicorn);
ellipse(0, 0, 60, 40);
// Head
ellipse(30, -20, 30, 25);
// Mane (flowing)
fill(PALETTE.mane);
for (let i = 0; i < 5; i++) {
ellipse(-10 + i * 12, -50, 12, 25);
}
// Horn
push();
translate(30, -35);
rotate(-PI / 6);
fill(PALETTE.horn);
triangle(0, 0, -8, -35, 8, -35);
pop();
// Eye
fill(PALETTE.eye);
ellipse(38, -22, 8, 8);
// Legs
stroke(PALETTE.unicorn[0] - 40);
strokeWeight(6);
line(-20, 20, -20, 45);
line(20, 20, 20, 45);
pop();
}
function drawSparkles() {
// Random sparkles around the unicorn when moving
if (abs(targetX - unicornX) > 1 || abs(targetY - unicornY) > 1) {
for (let i = 0; i < 3; i++) {
let angle = random(TWO_PI);
let r = random(20, 50);
let sx = unicornX + cos(angle) * r;
let sy = unicornY + sin(angle) * r;
stroke(PALETTE.sparkle[0], PALETTE.sparkle[1], PALETTE.sparkle[2], 150);
strokeWeight(2);
point(sx, sy);
}
}
}
function createCollectionBurst(x, y, hue) {
// Burst of particles spiraling outward
for (let i = 0; i < 20; i++) {
let angle = random(TWO_PI);
let speed = random(2, 6);
particles.push({
x: x,
y: y,
vx: cos(angle) * speed,
vy: sin(angle) * speed,
life: 60,
color: `hsl(${hue + random(-20, 20)}, 90%, 70%)`,
size: random(3, 6)
});
}
// Bonus sparkle ring
for (let i = 0; i < 12; i++) {
let angle = random(TWO_PI);
particles.push({
x: x,
y: y,
vx: cos(angle) * 4,
vy: sin(angle) * 4,
life: 40,
color: 'rgba(255, 215, 0, 0.9)',
size: 4
});
}
}
function updateParticles() {
for (let i = particles.length - 1; i >= 0; i--) {
let p = particles[i];
p.x += p.vx;
p.y += p.vy;
p.vy += 0.1; // gravity
p.life--;
p.vx *= 0.95;
p.vy *= 0.95;
if (p.life <= 0) {
particles.splice(i, 1);
continue;
}
push();
stroke(p.color);
strokeWeight(p.size);
point(p.x, p.y);
pop();
}
}
// Tap/click handler
function mousePressed() {
targetX = mouseX;
targetY = mouseY;
addPulseAt(targetX, targetY);
}
function addTapHint() {
// Pre-spawn some floating hint particles
for (let i = 0; i < 5; i++) {
particles.push({
x: random(width),
y: random(height),
vx: random(-0.5, 0.5),
vy: random(-0.5, 0.5),
life: 200,
color: 'rgba(233, 69, 96, 0.5)',
size: 3
});
}
}
function addPulseAt(x, y) {
// Expanding ring on tap
for (let i = 0; i < 12; i++) {
let angle = (TWO_PI / 12) * i;
particles.push({
x: x,
y: y,
vx: cos(angle) * 3,
vy: sin(angle) * 3,
life: 30,
color: 'rgba(233, 69, 96, 0.7)',
size: 3
});
}
}

View File

@@ -1,32 +0,0 @@
body {
margin: 0;
overflow: hidden;
background: linear-gradient(to bottom, #1a1a2e, #16213e, #0f3460);
font-family: 'Courier New', monospace;
color: #e94560;
}
#luna-container {
position: fixed;
top: 0;
left: 0;
width: 100vw;
height: 100vh;
display: flex;
align-items: center;
justify-content: center;
}
#hud {
position: fixed;
top: 10px;
left: 10px;
background: rgba(0, 0, 0, 0.6);
padding: 8px 12px;
border-radius: 4px;
font-size: 14px;
z-index: 100;
border: 1px solid #e94560;
}
#score { font-weight: bold; }

View File

@@ -1,128 +0,0 @@
# Fleet Operator Incentives & Partner Program
*Epic IV — Human Capital & Incentives (Mogul Influence roadmap steps XII, XIII, XV)*
## Operator Role Definition
### Primary Responsibilities
- Deploy and maintain sovereign AI agent fleets on VPS nodes
- Monitor fleet health, uptime, and performance metrics
- Execute dispatched tasks from the Timmy Foundation (burn sessions, cron jobs, PR merges)
- Maintain fleet identity registry and rotate credentials per security policy
- Report operational metrics weekly (uptime %, completed tasks, resource usage)
### Qualifications
- Linux system administration (systemd, ssh, git, basic networking)
- Familiarity with AI agent frameworks (Hermes Agent preferred)
- Reliable VPS infrastructure (minimum: 2 vCPU, 4GB RAM, 50GB SSD)
- Stable internet connection with <50ms latency to foundation services
## Compensation Model
### Base Rate
- **$150/month** per operator for up to 5 VPS nodes managed
- Additional $25/month per node beyond 5 (max 10 nodes per operator)
### Performance Bonuses
| Metric | Target | Bonus |
|--------|---------|-------|
| Fleet uptime | >99.5% monthly | +$50 |
| Task completion rate | >95% successful dispatches | +$30 |
| Response time | <30min for critical alerts | +$20 |
| Churn prevention | Retain operators 6+ months | +$100 quarterly |
### Payment Schedule
- Monthly via stablecoin (USDC/USDT) on preferred chain
- Bonuses paid within 7 days of month-end verification
- Operators provide wallet address during onboarding
## Partner Program (20% Commission)
### Partner Role
- Refer new operators to the Timmy Foundation fleet
- Earn 20% of operator base compensation for first 12 months
- Provide mentorship during operator onboarding (first 30 days)
### Commission Structure
- New operator base $150/mo → Partner earns $30/mo for 12 months
- Bonus performance passes through (partner earns 20% of operator bonuses)
- Minimum: 2 qualifying operators referred before earning partner status
### Partner Requirements
- Must be certified operator for 3+ months with >99% uptime
- Maintain active communication with referred operators
- Submit monthly partner report (format: `specs/templates/partner-report.md`)
## Quality Standards
### Operational Standards
- [ ] Fleet uptime ≥99.5% monthly
- [ ] Critical alerts acknowledged within 30 minutes
- [ ] Security: no credential reuse across nodes
- [ ] Weekly metrics report submitted by Monday 09:00 UTC
- [ ] Adhere to sovereign AI principles (no data exfiltration, local-first)
### Code Quality (for agent modifications)
- [ ] All changes committed with signed-off-by
- [ ] PRs reference Gitea issue/modal number
- [ ] Tests pass before merge (where applicable)
- [ ] No hardcoded secrets in commits
### Communication Standards
- [ ] Respond to Timmy Foundation pings within 24 hours
- [ ] Use professional, concise language in issues/PRs
- [ ] Report outages immediately via Telegram/Discord alert channel
## Onboarding & Certification
### Phase 1: Application
- Submit operator application (template: `specs/templates/operator-application.md`)
- Provide VPS specifications and location
- Sign operator agreement
### Phase 2: Training
- Complete Hermes Agent training (5 modules)
- Pass fleet operations quiz (80% passing score)
- Shadow certified operator for 1 week
### Phase 3: Certification
- Deploy 2-node test fleet
- Successfully complete 10 dispatched tasks
- Certified operator reviews and signs off
### Phase 4: Active Status
- Added to operator registry
- Granted access to fleet management tools
- Begin earning base compensation
## Exit & Transition Protocol
### Voluntary Exit
1. Submit 30-day notice via Gitea issue label `exit-notice`
2. Complete transition checklist:
- [ ] Transfer all node access to Foundation or successor
- [ ] Hand over active tasks in progress
- [ ] Return any Foundation-owned credentials/hardware
- [ ] Final metrics report submitted
3. Receive exit payment within 7 days
### Involuntary Termination (for cause)
- Repeated uptime <97% (3 consecutive months)
- Security breach or credential exposure
- Violation of sovereign AI principles
- Unresponsive >72 hours without prior notice
Terminated operators:
- Access revoked immediately
- Final payment pro-rated to last active day
- May reapply after 6 months with improvement plan
### Succession Planning
- Each operator mentors 1 junior operator within first 6 months
- Documentation of all processes in `specs/fleet-ops-runbook.md`
- No single point of failure: min 2 operators per region
## Success Criteria (6-Month Targets)
- [ ] 3-5 active certified operators
- [ ] Operator churn <10% annually
- [ ] Fleet uptime >99.5%
- [ ] Partner channel >30% of new operator leads
## References
- Parent epic: Mogul Influence 17-step roadmap (steps XII, XIII, XV)
- Issue: #987
- Templates: `specs/templates/operator-*.md`
- Runbook: `specs/fleet-ops-runbook.md` (future)

View File

@@ -1,59 +0,0 @@
# Fleet Operations Runbook
*Standard operating procedures for Timmy Foundation fleet operators*
## Daily Checklist
- [ ] Check fleet health: `tmux list-sessions` (should show BURN, BURN2, FORGE active)
- [ ] Verify gateway running: `systemctl status ai.hermes.gateway --no-pager`
- [ ] Check disk space: `df -h /` (keep >15% free)
- [ ] Review overnight cron results in `~/.hermes/cron/jobs/`
## Weekly Tasks
- [ ] Generate fleet metrics report (`scripts/fleet-metrics.sh`)
- [ ] Rotate any expired credentials (check `~/.hermes/fleet-dispatch-state.json`)
- [ ] Review open PRs in Timmy Foundation repos
- [ ] Submit weekly report by Monday 09:00 UTC
## Alert Response Protocol
### Critical (respond <30 min)
1. Gateway down: `sudo systemctl restart ai.hermes.gateway`
2. Disk >90% full: `scripts/cleanup-disk.sh`
3. Fleet dispatch failing: check `/tmp/hermes/dispatch-queue.json`
### Warning (respond <4 hours)
1. Uptime <99.5%: investigate tmux panes with `tmux attach -t BURN`
2. Failed cron jobs: check logs in `~/.hermes/cron/jobs/`
3. Agent loop errors: review session transcripts
## Common Fixes
### Restart stuck tmux pane
```bash
tmux send-keys -t BURN:0 C-c
tmux send-keys -t BURN:0 "hermes chat --yolo" Enter
```
### Clear dispatch queue
```bash
rm /tmp/hermes/dispatch-queue.json
# Watchdog will recreate on next cycle
```
### Update hermes-agent
```bash
cd ~/hermes-agent && git pull origin main && pip install -e ".[all]"
```
## Emergency Escalation
- **Telegram**: @Rockachopa (primary)
- **Gitea Issue**: label `operator-alert` + mention @Rockachopa
- **Discord**: #fleet-ops-alerts channel
## Security Rules
- Never share VPS SSH keys
- Never commit credentials to git
- Rotate tokens every 90 days
- Report suspicious activity immediately
## Contact
- **Operator Handbook**: `specs/fleet-operator-incentives.md`
- **Templates**: `specs/templates/operator-*.md`
- **Foundation Forge**: https://forge.alexanderwhitestone.com/Timmy_Foundation

View File

@@ -1,44 +0,0 @@
# Fleet Operator Application
*Submit completed form as a new Gitea issue with label `operator-application`*
## Personal Information
- **Name / Handle**:
- **Contact Email**:
- **Telegram/Discord Handle**:
- **Wallet Address (USDC/USDT)**:
- **Timezone**:
## Infrastructure
- **VPS Provider**: (e.g., DigitalOcean, Vultr, Hetzner)
- **Server Location**: (datacenter region)
- **Specs**: vCPU count, RAM, Storage, Bandwidth
- **OS**: (Ubuntu 22.04 LTS preferred)
- **Static IP**: Yes / No
## Experience
- [ ] Linux system administration (2+ years)
- [ ] Git / GitHub / Gitea usage
- [ ] Docker / container orchestration
- [ ] AI agent frameworks (Hermes, OpenAI, etc.)
- [ ] Prior VPS fleet management
### Relevant Experience (describe)
*Briefly describe your background with fleet ops, sysadmin, or AI agents:*
## Commitment
- **Hours per week available**:
- **Can maintain 99.5% uptime?** Yes / No
- **Agree to 30-day notice for exit?** Yes / No
- **Agree to sovereign AI principles (no data exfiltration)?** Yes / No
## References
- GitHub/Gitea username:
- Any prior work with Timmy Foundation? (link issues/PRs)
## Acknowledgment
I understand I will start at $150/month base rate, with bonuses available for performance. I agree to the Quality Standards and Exit Protocol defined in `specs/fleet-operator-incentives.md`.
**Signature** (type name): _________________ **Date**: _________
---
*Send completed application to: https://forge.alexanderwhitestone.com/Timmy_Foundation/timmy-home/issues/new*

View File

@@ -1,38 +0,0 @@
# Partner Monthly Report
*Submit by the 5th of each month for commission payments*
## Partner Info
- **Partner Name**:
- **Month/Year**:
- **Wallet Address**:
## Referred Operators
| Operator Handle | Start Date | Monthly Base | Commission (20%) | Status |
|----------------|------------|--------------|-------------------|--------|
| | | $150 | $30 | active / churned |
| | | $150 | $30 | active / churned |
| | | $150 | $30 | active / churned |
**Total Commission Due**: $______
## Mentorship Log
*Confirm you provided mentorship to each referred operator in the first 30 days:*
- [ ] Operator 1: mentored (dates: ____ to ____)
- [ ] Operator 2: mentored (dates: ____ to ____)
- [ ] Operator 3: mentored (dates: ____ to ____)
## Partner Performance
- Total active operators referred:
- Average operator uptime this month: ______%
- Any operator churn? Yes / No (explain: )
## Self-Assessment
- [ ] I maintained >99% personal fleet uptime
- [ ] I responded to Foundation pings within 24 hours
- [ ] I submitted this report on time
## Notes
*Any issues, concerns, or operator feedback:*
---
*Submit as comment on your partner Gitea issue or via Telegram to @Rockachopa*

View File

@@ -1,12 +1 @@
# Timmy core module
from .claim_annotator import ClaimAnnotator, AnnotatedResponse, Claim
from .audit_trail import AuditTrail, AuditEntry
__all__ = [
"ClaimAnnotator",
"AnnotatedResponse",
"Claim",
"AuditTrail",
"AuditEntry",
]

311
src/timmy/blackboard.py Normal file
View File

@@ -0,0 +1,311 @@
#!/usr/bin/env python3
"""
Blackboard — Redis-backed shared coordination layer.
Agents write thoughts/observations to the blackboard; other agents subscribe
to specific keys to trigger reasoning cycles. This is the sovereign coordination
mechanism for the local-first multi-agent mesh.
Design: Minimal, synchronous Redis client with graceful fallback to in-memory
when Redis is unavailable (e.g., during local dev without Docker).
SOUL.md: "Sovereignty and service always." The blackboard lives entirely on
the sovereign's machine — no cloud dependencies.
"""
from __future__ import annotations
import json
import logging
import os
import time
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Callable, Iterable, Optional
logger = logging.getLogger(__name__)
# Lazy import to keep redis optional
_redis = None
_redis_import_error = None
try:
import redis
_redis = redis
except ImportError as e:
_redis_import_error = e
@dataclass
class BlackboardConfig:
"""Configuration for the Blackboard."""
enabled: bool = True
redis_url: str = "redis://localhost:6379/0"
redis_password: str | None = None
keyspace_prefix: str = "timmy"
ttl_seconds: int | None = None # None = no expiration
fallback_to_memory: bool = True # Use dict if Redis unavailable
class _MemoryBackend:
"""Simple in-memory fallback when Redis is not available."""
def __init__(self):
self._store: dict[str, str] = {}
self._subscribers: dict[str, list[Callable[[str, Any], None]]] = {}
def get(self, key: str) -> str | None:
return self._store.get(key)
def set(self, key: str, value: str, ttl: int | None = None) -> bool:
self._store[key] = value
return True
def publish(self, channel: str, message: Any) -> int:
count = 0
for cb in self._subscribers.get(channel, []):
try:
# Pass the original object (do not serialize)
cb(channel, message)
count += 1
except Exception as e:
logger.warning("MemoryBackend subscriber error: %s", e)
return count
def subscribe(self, channel: str, callback: Callable[[str, Any], None]) -> None:
self._subscribers.setdefault(channel, []).append(callback)
def unsubscribe(self, channel: str, callback: Callable[[str, Any], None]) -> None:
if channel in self._subscribers:
self._subscribers[channel].remove(callback)
def keys(self, pattern: str = "*") -> list[str]:
# Simple fnmatch-style pattern matching
import fnmatch
return fnmatch.filter(list(self._store.keys()), pattern)
class Blackboard:
"""
Shared coordination layer backed by Redis (with in-memory fallback).
Usage:
bb = Blackboard()
bb.set("agent:timmy:thought", "checking queue...")
value = bb.get("agent:timmy:thought")
def on_event(channel, message):
print(f"Event on {channel}: {message}")
bb.subscribe("dispatch:new", on_event)
bb.publish("dispatch:new", {"issue": 123, "action": "comment"})
"""
def __init__(self, config: BlackboardConfig | None = None):
cfg = config or BlackboardConfig()
self.enabled = cfg.enabled
self.prefix = cfg.keyspace_prefix
self.ttl = cfg.ttl_seconds
self._backend: _MemoryBackend | Any
if not _redis:
if cfg.fallback_to_memory:
logger.warning(
"redis-py not installed; using in-memory fallback. "
"Install with: pip install redis"
)
self._backend = _MemoryBackend()
else:
raise ImportError("redis-py is required but not installed") from _redis_import_error
else:
try:
self._backend = _redis.from_url(
cfg.redis_url,
password=cfg.redis_password,
decode_responses=True,
)
# Test connection
self._backend.ping()
logger.info("Blackboard connected to Redis at %s", cfg.redis_url)
except Exception as e:
if cfg.fallback_to_memory:
logger.warning("Redis connection failed (%s); falling back to in-memory", e)
self._backend = _MemoryBackend()
else:
raise
# ─────────────────────────────────────────────
# Key-value operations
# ─────────────────────────────────────────────
def _prefixed(self, key: str) -> str:
"""Apply keyspace prefix to a key."""
return f"{self.prefix}:{key}" if self.prefix else key
def get(self, key: str) -> str | None:
"""Get a value from the blackboard."""
return self._backend.get(self._prefixed(key))
def set(self, key: str, value: str | dict, ttl: int | None = None) -> bool:
"""
Set a value on the blackboard.
Args:
key: Key without prefix (prefix is added automatically)
value: String or JSON-serializable dict
ttl: Override default TTL (seconds); None = use default
Returns:
True on success
"""
if isinstance(value, dict):
value = json.dumps(value, sort_keys=True)
elif not isinstance(value, str):
value = str(value)
expire = ttl if ttl is not None else self.ttl
result = self._backend.set(self._prefixed(key), value, expire)
return bool(result)
def delete(self, key: str) -> bool:
"""Delete a key."""
try:
return bool(self._backend.delete(self._prefixed(key)))
except AttributeError:
# MemoryBackend
k = self._prefixed(key)
if k in self._backend._store:
del self._backend._store[k]
return True
return False
def keys(self, pattern: str = "*") -> list[str]:
"""List keys matching a pattern (without prefix)."""
full_pattern = self._prefixed(pattern)
raw_keys = self._backend.keys(full_pattern)
# Strip prefix
prefix_len = len(self.prefix) + 1 if self.prefix else 0
return [k[prefix_len:] if k.startswith(f"{self.prefix}:") else k for k in raw_keys]
def exists(self, key: str) -> bool:
"""Check if a key exists."""
try:
return bool(self._backend.exists(self._prefixed(key)))
except AttributeError:
# MemoryBackend
return self._prefixed(key) in self._backend._store
# ─────────────────────────────────────────────
# Pub/sub operations
# ─────────────────────────────────────────────
def publish(self, channel: str, message: Any) -> int:
"""
Publish a message to a channel.
Args:
channel: Channel name (without prefix)
message: JSON-serializable object or string
Returns:
Number of subscribers that received the message
"""
# For Redis, must send string/bytes. For MemoryBackend, pass object.
if isinstance(self._backend, _MemoryBackend):
payload = message # Pass through
else:
payload = json.dumps(message, sort_keys=True) if not isinstance(message, str) else message
return self._backend.publish(self._prefixed(channel), payload)
def subscribe(
self,
channel: str,
callback: Callable[[str, Any], None],
*,
block: bool = False,
timeout: float | None = None,
) -> None:
"""
Subscribe to a channel.
Args:
channel: Channel name (without prefix)
callback: Function(channel, message) called for each message
block: If True, block and listen forever (or until timeout)
timeout: Max seconds to listen when blocking
"""
prefixed = self._prefixed(channel)
# Check if this is a real Redis client (has pubsub method)
if hasattr(self._backend, 'pubsub') and callable(getattr(self._backend, 'pubsub', None)):
# Real Redis pub/sub
import threading
pubsub = self._backend.pubsub()
pubsub.subscribe(prefixed)
def listener():
for msg in pubsub.listen():
if msg['type'] == 'message':
try:
data = json.loads(msg['data'])
except (json.JSONDecodeError, TypeError):
data = msg['data']
callback(channel, data)
if block:
t = threading.Thread(target=listener, daemon=True)
t.start()
if timeout:
t.join(timeout)
else:
t.join()
else:
# Fire-and-forget thread
threading.Thread(target=listener, daemon=True).start()
else:
# MemoryBackend — synchronous callback registration
self._backend.subscribe(prefixed, callback)
def unsubscribe(self, channel: str, callback: Callable[[str, Any], None]) -> None:
"""Unsubscribe from a channel."""
try:
self._backend.unsubscribe(self._prefixed(channel), callback)
except AttributeError:
pass # MemoryBackend supports it
# ─────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────
def clear_namespace(self, pattern: str = "*") -> int:
"""Delete all keys matching pattern in this namespace."""
full = self._prefixed(pattern)
try:
keys = self._backend.keys(full)
if keys:
return self._backend.delete(*keys)
return 0
except AttributeError:
store_keys = list(self._backend._store.keys())
import fnmatch
matched = fnmatch.filter(store_keys, full)
for k in matched:
del self._backend._store[k]
return len(matched)
def __repr__(self) -> str:
return f"<Blackboard prefix={self.prefix!r} backend={type(self._backend).__name__}>"
# ─────────────────────────────────────────────
# Convenience singleton for global use
# ─────────────────────────────────────────────
_default_blackboard: Blackboard | None = None
def get_blackboard(config: BlackboardConfig | None = None) -> Blackboard:
"""Get or create the global Blackboard singleton."""
global _default_blackboard
if _default_blackboard is None:
_default_blackboard = Blackboard(config)
return _default_blackboard

View File

@@ -1,156 +0,0 @@
#!/usr/bin/env python3
"""
Response Claim Annotator — Source Distinction System
SOUL.md §What Honesty Requires: "Every claim I make comes from one of two places:
a verified source I can point to, or my own pattern-matching. My user must be
able to tell which is which."
"""
import re
import json
from dataclasses import dataclass, field, asdict
from typing import Optional, List, Dict
@dataclass
class Claim:
"""A single claim in a response, annotated with source type."""
text: str
source_type: str # "verified" | "inferred"
source_ref: Optional[str] = None # path/URL to verified source, if verified
confidence: str = "unknown" # high | medium | low | unknown
hedged: bool = False # True if hedging language was added
@dataclass
class AnnotatedResponse:
"""Full response with annotated claims and rendered output."""
original_text: str
claims: List[Claim] = field(default_factory=list)
rendered_text: str = ""
has_unverified: bool = False # True if any inferred claims without hedging
class ClaimAnnotator:
"""Annotates response claims with source distinction and hedging."""
# Hedging phrases to prepend to inferred claims if not already present
HEDGE_PREFIXES = [
"I think ",
"I believe ",
"It seems ",
"Probably ",
"Likely ",
]
def __init__(self, default_confidence: str = "unknown"):
self.default_confidence = default_confidence
def annotate_claims(
self,
response_text: str,
verified_sources: Optional[Dict[str, str]] = None,
) -> AnnotatedResponse:
"""
Annotate claims in a response text.
Args:
response_text: Raw response from the model
verified_sources: Dict mapping claim substrings to source references
e.g. {"Paris is the capital of France": "https://en.wikipedia.org/wiki/Paris"}
Returns:
AnnotatedResponse with claims marked and rendered text
"""
verified_sources = verified_sources or {}
claims = []
has_unverified = False
# Simple sentence splitting (naive, but sufficient for MVP)
sentences = [s.strip() for s in re.split(r'[.!?]\s+', response_text) if s.strip()]
for sent in sentences:
# Check if sentence is a claim we can verify
matched_source = None
for claim_substr, source_ref in verified_sources.items():
if claim_substr.lower() in sent.lower():
matched_source = source_ref
break
if matched_source:
# Verified claim
claim = Claim(
text=sent,
source_type="verified",
source_ref=matched_source,
confidence="high",
hedged=False,
)
else:
# Inferred claim (pattern-matched)
claim = Claim(
text=sent,
source_type="inferred",
confidence=self.default_confidence,
hedged=self._has_hedge(sent),
)
if not claim.hedged:
has_unverified = True
claims.append(claim)
# Render the annotated response
rendered = self._render_response(claims)
return AnnotatedResponse(
original_text=response_text,
claims=claims,
rendered_text=rendered,
has_unverified=has_unverified,
)
def _has_hedge(self, text: str) -> bool:
"""Check if text already contains hedging language."""
text_lower = text.lower()
for prefix in self.HEDGE_PREFIXES:
if text_lower.startswith(prefix.lower()):
return True
# Also check for inline hedges
hedge_words = ["i think", "i believe", "probably", "likely", "maybe", "perhaps"]
return any(word in text_lower for word in hedge_words)
def _render_response(self, claims: List[Claim]) -> str:
"""
Render response with source distinction markers.
Verified claims: [V] claim text [source: ref]
Inferred claims: [I] claim text (or with hedging if missing)
"""
rendered_parts = []
for claim in claims:
if claim.source_type == "verified":
part = f"[V] {claim.text}"
if claim.source_ref:
part += f" [source: {claim.source_ref}]"
else: # inferred
if not claim.hedged:
# Add hedging if missing
hedged_text = f"I think {claim.text[0].lower()}{claim.text[1:]}" if claim.text else claim.text
part = f"[I] {hedged_text}"
else:
part = f"[I] {claim.text}"
rendered_parts.append(part)
return " ".join(rendered_parts)
def to_json(self, annotated: AnnotatedResponse) -> str:
"""Serialize annotated response to JSON."""
return json.dumps(
{
"original_text": annotated.original_text,
"rendered_text": annotated.rendered_text,
"has_unverified": annotated.has_unverified,
"claims": [asdict(c) for c in annotated.claims],
},
indent=2,
ensure_ascii=False,
)

194
tests/test_blackboard.py Normal file
View File

@@ -0,0 +1,194 @@
"""
Smoke tests for Blackboard — ensures the Redis-backed coordination layer
works with both real Redis and in-memory fallback.
"""
import json
import time
import pytest
from src.timmy.blackboard import Blackboard, BlackboardConfig, _MemoryBackend
class TestBlackboardBasics:
"""Test core key-value operations."""
def test_kv_memory_backend(self):
"""KV operations work using in-memory backend."""
bb = Blackboard(BlackboardConfig(fallback_to_memory=True, enabled=True))
# Set and get
assert bb.set("test:key", "hello") is True
assert bb.get("test:key") == "hello"
# Dict serialization
assert bb.set("test:obj", {"a": 1, "b": 2}) is True
val = bb.get("test:obj")
assert json.loads(val) == {"a": 1, "b": 2}
# Exists
assert bb.exists("test:key") is True
assert bb.exists("missing") is False
# Delete
assert bb.delete("test:key") is True
assert bb.get("test:key") is None
# Keys with prefix
bb.set("agent:timmy:state", "ready")
bb.set("agent:ezra:state", "idle")
keys = bb.keys("agent:*:state")
assert len(keys) == 2
assert "timmy" in keys[0] or "ezra" in keys[0]
# Clear namespace
assert bb.clear_namespace("agent:*") == 2
assert bb.keys("agent:*") == []
class TestBlackboardPubSub:
"""Test pub/sub coordination patterns."""
def test_pubsub_memory_backend(self):
"""Publish/subscribe works using in-memory backend."""
bb = Blackboard(BlackboardConfig(fallback_to_memory=True, enabled=True))
received = []
def callback(channel, message):
received.append((channel, message))
bb.subscribe("dispatch:new", callback)
# Publish
count = bb.publish("dispatch:new", {"issue": 123, "action": "comment"})
assert count == 1
assert len(received) == 1
ch, msg = received[0]
assert ch == "dispatch:new"
assert msg == {"issue": 123, "action": "comment"}
bb.unsubscribe("dispatch:new", callback)
bb.publish("dispatch:new", {"should": "not arrive"})
assert len(received) == 1 # no new messages
def test_publish_without_subscribers(self):
"""Publish returns 0 when no subscribers."""
bb = Blackboard(BlackboardConfig(fallback_to_memory=True, enabled=True))
count = bb.publish("empty:channel", {"msg": 1})
assert count == 0
class TestBlackboardConfig:
"""Test configuration parsing and validation."""
def test_default_config(self):
cfg = BlackboardConfig()
assert cfg.enabled is True
assert cfg.redis_url == "redis://localhost:6379/0"
assert cfg.keyspace_prefix == "timmy"
assert cfg.ttl_seconds == 3600
assert cfg.fallback_to_memory is True
def test_custom_config(self):
cfg = BlackboardConfig(
enabled=False,
redis_url="redis://192.168.1.10:6379/1",
keyspace_prefix="myagent",
ttl_seconds=1800,
fallback_to_memory=False,
)
assert cfg.enabled is False
assert cfg.redis_url == "redis://192.168.1.10:6379/1"
assert cfg.keyspace_prefix == "myagent"
assert cfg.ttl_seconds == 1800
assert cfg.fallback_to_memory is False
class TestKeyspacePrefix:
"""Test that keys are correctly prefixed."""
def test_prefixed_keys(self):
bb = Blackboard(BlackboardConfig(keyspace_prefix="myagent", fallback_to_memory=True))
bb.set("thought", "test")
# Internal key should be "myagent:thought"
# We can verify by checking keys()
keys = bb.keys("*")
assert any("myagent:thought" in k for k in keys)
class TestBlackboardIntegration:
"""Integration pattern: agent thought cycle."""
def test_agent_thought_cycle(self):
"""Simulate Timmy writing a thought and Ezra reading it."""
bb = Blackboard(BlackboardConfig(fallback_to_memory=True, enabled=True))
# Agent A writes observation
bb.set("agent:timmy:observation", "Gitea queue has 12 open issues")
# Agent B reads
obs = bb.get("agent:timmy:observation")
assert obs == "Gitea queue has 12 open issues"
# Agent B writes analysis
bb.set("agent:ezra:analysis", "Prioritize critical bugs first")
# Event-driven pattern
events = []
def on_plan(channel, message):
events.append(message)
bb.subscribe("fleet:plan", on_plan)
bb.publish("fleet:plan", {"phase": "triaging", "lead": "ezra"})
assert len(events) == 1
assert events[0]["phase"] == "triaging"
class TestTTL:
"""Test TTL handling (where supported)."""
def test_ttl_set_in_config(self):
cfg = BlackboardConfig(ttl_seconds=60, fallback_to_memory=True)
bb = Blackboard(cfg)
assert bb.ttl == 60
# Setting a value uses TTL from config
bb.set("temp:key", "expiring value")
# In memory backend ignores TTL, but value is set
assert bb.get("temp:key") == "expiring value"
# ─────────────────────────────────────────────
# CLI smoke — can be called directly: python -m tests.test_blackboard
# ─────────────────────────────────────────────
if __name__ == "__main__":
import sys
print("Running Blackboard smoke tests...")
suite = [
TestBlackboardBasics().test_kv_memory_backend,
TestBlackboardPubSub().test_pubsub_memory_backend,
TestBlackboardConfig().test_default_config,
TestBlackboardIntegration().test_agent_thought_cycle,
]
failures = 0
for test in suite:
name = test.__name__
try:
test()
print(f"{name}")
except AssertionError as e:
print(f"{name}: {e}")
failures += 1
except Exception as e:
print(f"{name}: ERROR — {e}")
failures += 1
print(f"\nRan {len(suite)} tests, {failures} failures")
sys.exit(failures)

View File

@@ -1,103 +0,0 @@
#!/usr/bin/env python3
"""Tests for claim_annotator.py — verifies source distinction is present."""
import sys
import os
import json
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src"))
from timmy.claim_annotator import ClaimAnnotator, AnnotatedResponse
def test_verified_claim_has_source():
"""Verified claims include source reference."""
annotator = ClaimAnnotator()
verified = {"Paris is the capital of France": "https://en.wikipedia.org/wiki/Paris"}
response = "Paris is the capital of France. It is a beautiful city."
result = annotator.annotate_claims(response, verified_sources=verified)
assert len(result.claims) > 0
verified_claims = [c for c in result.claims if c.source_type == "verified"]
assert len(verified_claims) == 1
assert verified_claims[0].source_ref == "https://en.wikipedia.org/wiki/Paris"
assert "[V]" in result.rendered_text
assert "[source:" in result.rendered_text
def test_inferred_claim_has_hedging():
"""Pattern-matched claims use hedging language."""
annotator = ClaimAnnotator()
response = "The weather is nice today. It might rain tomorrow."
result = annotator.annotate_claims(response)
inferred_claims = [c for c in result.claims if c.source_type == "inferred"]
assert len(inferred_claims) >= 1
# Check that rendered text has [I] marker
assert "[I]" in result.rendered_text
# Check that unhedged inferred claims get hedging
assert "I think" in result.rendered_text or "I believe" in result.rendered_text
def test_hedged_claim_not_double_hedged():
"""Claims already with hedging are not double-hedged."""
annotator = ClaimAnnotator()
response = "I think the sky is blue. It is a nice day."
result = annotator.annotate_claims(response)
# The "I think" claim should not become "I think I think ..."
assert "I think I think" not in result.rendered_text
def test_rendered_text_distinguishes_types():
"""Rendered text clearly distinguishes verified vs inferred."""
annotator = ClaimAnnotator()
verified = {"Earth is round": "https://science.org/earth"}
response = "Earth is round. Stars are far away."
result = annotator.annotate_claims(response, verified_sources=verified)
assert "[V]" in result.rendered_text # verified marker
assert "[I]" in result.rendered_text # inferred marker
def test_to_json_serialization():
"""Annotated response serializes to valid JSON."""
annotator = ClaimAnnotator()
response = "Test claim."
result = annotator.annotate_claims(response)
json_str = annotator.to_json(result)
parsed = json.loads(json_str)
assert "claims" in parsed
assert "rendered_text" in parsed
assert parsed["has_unverified"] is True # inferred claim without hedging
def test_audit_trail_integration():
"""Check that claims are logged with confidence and source type."""
# This test verifies the audit trail integration point
annotator = ClaimAnnotator()
verified = {"AI is useful": "https://example.com/ai"}
response = "AI is useful. It can help with tasks."
result = annotator.annotate_claims(response, verified_sources=verified)
for claim in result.claims:
assert claim.source_type in ("verified", "inferred")
assert claim.confidence in ("high", "medium", "low", "unknown")
if claim.source_type == "verified":
assert claim.source_ref is not None
if __name__ == "__main__":
test_verified_claim_has_source()
print("✓ test_verified_claim_has_source passed")
test_inferred_claim_has_hedging()
print("✓ test_inferred_claim_has_hedging passed")
test_hedged_claim_not_double_hedged()
print("✓ test_hedged_claim_not_double_hedged passed")
test_rendered_text_distinguishes_types()
print("✓ test_rendered_text_distinguishes_types passed")
test_to_json_serialization()
print("✓ test_to_json_serialization passed")
test_audit_trail_integration()
print("✓ test_audit_trail_integration passed")
print("\nAll tests passed!")