Files
timmy-telemetry/exporter/exporter.py
perplexity 1a800d0c7b Fix: volume mount tilde expansion, PR counting bug, startup diagnostics
- docker-compose: use ${HOME}/.timmy instead of ~/.timmy (Docker doesn't expand tilde)
- exporter: start HTTP server BEFORE first collection (so /metrics is always available)
- exporter: add startup diagnostics (data dir check, Gitea reachability)
- exporter: fix agent PR counting that would crash on _value access
2026-03-27 01:26:48 +00:00

548 lines
19 KiB
Python

"""Timmy Telemetry Exporter — Prometheus metrics from sovereign infrastructure.
Reads existing data sources on Hermes and exposes them as /metrics:
- Gitea API → commit velocity, PR throughput, issue burn
- Heartbeat ticks → uptime, perception health
- Local inference JSONL → model call rates, latency proxy
- Model health JSON → Ollama status
- Sovereignty score → computed composite
Runs as a sidecar container, volumes mounted read-only.
"""
import json
import glob
import os
import time
import threading
from datetime import datetime, timezone, timedelta
from pathlib import Path
import requests
from prometheus_client import (
start_http_server, Gauge, Counter, Info, Summary, Enum,
REGISTRY, CollectorRegistry,
)
# ── Config ───────────────────────────────────────────────────────────
# Gitea base URL; defaults to the Hermes VPS instance.
GITEA_URL = os.environ.get("GITEA_URL", "http://143.198.27.163:3000")
# API token; empty string means requests go out unauthenticated (public data only).
GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")
# Data roots — mounted read-only into the sidecar container (see module docstring).
TIMMY_DATA = Path(os.environ.get("TIMMY_DATA", "/data/timmy"))
HERMES_DATA = Path(os.environ.get("HERMES_DATA", "/data/hermes"))
# Repositories polled for commit / issue / PR metrics.
REPOS = [
    "Timmy_Foundation/the-nexus",
    "Timmy_Foundation/timmy-config",
]
SCRAPE_INTERVAL = 30  # seconds between background metric refreshes
# ── Prometheus Metrics ───────────────────────────────────────────────
# == TOP 5 COMMON SENSE METRICS ==
# 1. Commit velocity (commits in last 24h per repo)
commits_24h = Gauge(
    "timmy_commits_24h",
    "Commits in the last 24 hours",
    ["repo"],
)
# 2. Open issues / open PRs per repo
open_issues = Gauge(
    "timmy_open_issues",
    "Open issue count",
    ["repo"],
)
open_prs = Gauge(
    "timmy_open_prs",
    "Open PR count",
    ["repo"],
)
# 3. Heartbeat health (ticks in last hour, consecutive healthy ticks,
#    age of newest tick — 9999 is used as a "no data" sentinel by collectors)
heartbeat_ticks_1h = Gauge(
    "timmy_heartbeat_ticks_1h",
    "Heartbeat ticks recorded in the last hour",
)
heartbeat_consecutive_healthy = Gauge(
    "timmy_heartbeat_consecutive_healthy",
    "Consecutive heartbeat ticks with severity=ok",
)
heartbeat_last_tick_age_seconds = Gauge(
    "timmy_heartbeat_last_tick_age_seconds",
    "Seconds since last heartbeat tick",
)
# 4. Local inference stats per model (calls today, success rate, avg response size)
inference_calls_today = Gauge(
    "timmy_inference_calls_today",
    "Local model inference calls today",
    ["model"],
)
inference_success_rate = Gauge(
    "timmy_inference_success_rate",
    "Local inference success rate (0-1) today",
    ["model"],
)
inference_avg_response_len = Gauge(
    "timmy_inference_avg_response_len",
    "Average response length (chars) today",
    ["model"],
)
# 5. Model health (read from model_health.json written elsewhere on Hermes)
ollama_up = Gauge(
    "timmy_ollama_up",
    "Whether Ollama process is running (1=yes, 0=no)",
)
ollama_api_up = Gauge(
    "timmy_ollama_api_up",
    "Whether Ollama API is responding (1=yes, 0=no)",
)
ollama_inference_ok = Gauge(
    "timmy_ollama_inference_ok",
    "Whether local inference smoke test passed (1=yes, 0=no)",
)
models_loaded_count = Gauge(
    "timmy_models_loaded",
    "Number of models loaded in Ollama",
)
# == SOVEREIGNTY SCORE ==
# Composite plus per-dimension breakdown; see compute_sovereignty_score().
sovereignty_score = Gauge(
    "timmy_sovereignty_score",
    "Composite sovereignty index (0-100)",
)
sovereignty_dimension = Gauge(
    "timmy_sovereignty_dimension",
    "Individual sovereignty dimension score (0-100)",
    ["dimension"],
)
# == BONUS: Gitea agent activity ==
agent_commits_24h = Gauge(
    "timmy_agent_commits_24h",
    "Commits by agent in last 24h",
    ["agent"],
)
agent_prs_open = Gauge(
    "timmy_agent_prs_open",
    "Open PRs by agent",
    ["agent"],
)
dpo_pairs_staged = Gauge(
    "timmy_dpo_pairs_staged",
    "DPO training pair files exported",
)
alerts_today = Gauge(
    "timmy_alerts_today",
    "Heartbeat alerts logged today",
)
gitea_up_gauge = Gauge(
    "timmy_gitea_up",
    "Whether Gitea API is reachable (1=yes, 0=no)",
)
# ── Data Collection Functions ────────────────────────────────────────
def gitea_get(path, params=None):
    """GET a Gitea API endpoint and return the decoded JSON body.

    Returns None on any failure (network error, non-2xx status, bad
    JSON) — callers treat None as "Gitea unreachable".  The token
    header is only attached when GITEA_TOKEN is configured.
    """
    auth_headers = {"Authorization": f"token {GITEA_TOKEN}"} if GITEA_TOKEN else {}
    url = f"{GITEA_URL}/api/v1{path}"
    try:
        response = requests.get(
            url,
            headers=auth_headers,
            params=params or {},
            timeout=10,
        )
        response.raise_for_status()
        return response.json()
    except Exception:
        # Best-effort by design: any failure is reported as None.
        return None
def collect_gitea_metrics():
    """Pull commit velocity, issue/PR counts, and agent activity from Gitea.

    Returns True if the commits endpoint responded for at least one repo
    (used as the Gitea-reachability signal), False otherwise.

    NOTE: all counts are capped at 50 by the API page size — TODO:
    paginate (or read the X-Total-Count header) for exact totals.
    """
    since_24h = (datetime.now(timezone.utc) - timedelta(hours=24)).isoformat()
    agents = ["claude", "gemini", "kimi", "grok", "perplexity", "Timmy", "Rockachopa"]
    agent_commit_counts = {a: 0 for a in agents}
    agent_pr_counts = {a: 0 for a in agents}
    gitea_alive = False
    for repo in REPOS:
        # Commits in the last 24h; substring match on author name credits agents.
        commits = gitea_get(f"/repos/{repo}/commits", {"since": since_24h, "limit": 50})
        if commits is not None:
            gitea_alive = True
            commits_24h.labels(repo=repo).set(len(commits))
            for c in commits:
                author = c.get("commit", {}).get("author", {}).get("name", "")
                for agent in agents:
                    if agent.lower() in author.lower():
                        agent_commit_counts[agent] += 1
        else:
            commits_24h.labels(repo=repo).set(0)
        # Open issues.  Fix: previously probed with limit=1 and then
        # refetched with limit=50 — the probe result was used only as an
        # existence check, wasting an API call.  One fetch does both.
        issues = gitea_get(f"/repos/{repo}/issues", {"state": "open", "type": "issues", "limit": 50})
        if issues is not None:
            open_issues.labels(repo=repo).set(len(issues))
        # Open PRs; exact (case-insensitive) login match credits agents.
        prs = gitea_get(f"/repos/{repo}/pulls", {"state": "open", "limit": 50})
        if prs is not None:
            open_prs.labels(repo=repo).set(len(prs))
            for pr in prs:
                author = pr.get("user", {}).get("login", "")
                for agent in agents:
                    if agent.lower() == author.lower():
                        # dict is pre-initialized, so a plain += is safe
                        agent_pr_counts[agent] += 1
    for agent, count in agent_commit_counts.items():
        agent_commits_24h.labels(agent=agent).set(count)
    for agent, count in agent_pr_counts.items():
        agent_prs_open.labels(agent=agent).set(count)
    gitea_up_gauge.set(1 if gitea_alive else 0)
    return gitea_alive
def collect_heartbeat_metrics():
    """Read today's heartbeat tick JSONL and update the heartbeat gauges.

    Missing file or no parseable ticks zeroes the gauges and reports the
    9999-second "stale" sentinel for tick age.
    """
    today = datetime.now(timezone.utc).strftime("%Y%m%d")
    tick_file = TIMMY_DATA / "heartbeat" / f"ticks_{today}.jsonl"
    if not tick_file.exists():
        heartbeat_ticks_1h.set(0)
        heartbeat_consecutive_healthy.set(0)
        heartbeat_last_tick_age_seconds.set(9999)
        alerts_today.set(0)
        return
    ticks = []
    alert_count = 0
    for raw in tick_file.read_text().strip().split("\n"):
        if not raw.strip():
            continue
        try:
            tick = json.loads(raw)
        except json.JSONDecodeError:
            continue
        ticks.append(tick)
        # NOTE(review): every decision action is counted as an "alert" —
        # confirm that matches the alerts_today gauge's intent.
        alert_count += len(tick.get("decision", {}).get("actions", []))
    alerts_today.set(alert_count)
    if not ticks:
        heartbeat_ticks_1h.set(0)
        heartbeat_consecutive_healthy.set(0)
        heartbeat_last_tick_age_seconds.set(9999)
        return
    # Ticks within the last hour (timestamps are ISO-8601, "Z" normalized).
    cutoff = datetime.now(timezone.utc) - timedelta(hours=1)
    recent = 0
    for tick in ticks:
        try:
            stamp = datetime.fromisoformat(tick["timestamp"].replace("Z", "+00:00"))
        except (KeyError, ValueError):
            continue
        if stamp > cutoff:
            recent += 1
    heartbeat_ticks_1h.set(recent)
    # Streak of severity == "ok" ticks, counted from the newest backwards.
    streak = 0
    for tick in reversed(ticks):
        if tick.get("decision", {}).get("severity", "unknown") != "ok":
            break
        streak += 1
    heartbeat_consecutive_healthy.set(streak)
    # Age of the newest tick; fall back to the stale sentinel if unusable.
    try:
        newest = datetime.fromisoformat(ticks[-1]["timestamp"].replace("Z", "+00:00"))
        heartbeat_last_tick_age_seconds.set(
            (datetime.now(timezone.utc) - newest).total_seconds()
        )
    except (KeyError, ValueError):
        heartbeat_last_tick_age_seconds.set(9999)
def collect_inference_metrics():
    """Read today's local-inference JSONL and update per-model gauges.

    Exposes calls, success rate, and average response length per model.

    Fix: response lengths are now accumulated only for SUCCESSFUL calls.
    Previously every record's response_len was summed but the average
    divided by the success count, so failed calls skewed
    timmy_inference_avg_response_len upward or downward.
    """
    today = datetime.now(timezone.utc).strftime("%Y%m%d")
    metrics_file = TIMMY_DATA / "metrics" / f"local_{today}.jsonl"
    if not metrics_file.exists():
        return
    # model -> {"calls", "successes", "total_resp_len"}
    model_stats = {}
    for line in metrics_file.read_text().strip().split("\n"):
        if not line.strip():
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue
        stats = model_stats.setdefault(
            record.get("model", "unknown"),
            {"calls": 0, "successes": 0, "total_resp_len": 0},
        )
        stats["calls"] += 1
        if record.get("success"):
            stats["successes"] += 1
            # Only successful responses count toward the average length.
            stats["total_resp_len"] += record.get("response_len", 0)
    for model, stats in model_stats.items():
        inference_calls_today.labels(model=model).set(stats["calls"])
        rate = stats["successes"] / stats["calls"] if stats["calls"] > 0 else 0
        inference_success_rate.labels(model=model).set(round(rate, 3))
        avg_len = stats["total_resp_len"] / stats["successes"] if stats["successes"] > 0 else 0
        inference_avg_response_len.labels(model=model).set(round(avg_len))
def collect_model_health():
    """Read Ollama status from model_health.json and update the gauges.

    A missing or unparseable file zeroes every model-health gauge.
    """
    def _all_down():
        # Single place to report "no health data available".
        ollama_up.set(0)
        ollama_api_up.set(0)
        ollama_inference_ok.set(0)
        models_loaded_count.set(0)

    health_file = HERMES_DATA / "model_health.json"
    if not health_file.exists():
        _all_down()
        return
    try:
        health = json.loads(health_file.read_text())
        ollama_up.set(1 if health.get("ollama_running") else 0)
        ollama_api_up.set(1 if health.get("api_responding") else 0)
        ollama_inference_ok.set(1 if health.get("inference_ok") else 0)
        models_loaded_count.set(len(health.get("models_loaded", [])))
    except (json.JSONDecodeError, KeyError):
        _all_down()
def collect_dpo_metrics():
    """Report how many DPO training-pair JSON files have been staged."""
    pair_dir = TIMMY_DATA / "training-data" / "dpo-pairs"
    # A missing directory simply means zero pairs staged.
    staged = sum(1 for _ in pair_dir.glob("*.json")) if pair_dir.exists() else 0
    dpo_pairs_staged.set(staged)
# ── Sovereignty Score ────────────────────────────────────────────────
def compute_sovereignty_score():
    """Compute composite sovereignty score from available data.

    Dimensions (each 0-100):
    1. Inference sovereignty — % of calls going to local models
    2. Data sovereignty — all data stored locally (Gitea self-hosted, SQLite, JSONL)
    3. Compute sovereignty — local CPU/GPU utilization vs cloud
    4. Infrastructure sovereignty — self-hosted services vs SaaS
    5. Identity sovereignty — Nostr/self-managed keys vs platform accounts
    6. Financial sovereignty — Lightning/Cashu vs fiat payment rails
    7. Tool sovereignty — self-hosted tools vs cloud SaaS
    For v1, we compute what we CAN measure and estimate the rest.

    Returns (scores_dict, composite) and sets the corresponding gauges.
    """
    scores = {}

    # 1. INFERENCE SOVEREIGNTY — count today's successful local calls.
    today = datetime.now(timezone.utc).strftime("%Y%m%d")
    metrics_file = TIMMY_DATA / "metrics" / f"local_{today}.jsonl"
    local_calls = 0
    if metrics_file.exists():
        for raw in metrics_file.read_text().strip().split("\n"):
            if not raw.strip():
                continue
            try:
                if json.loads(raw).get("success"):
                    local_calls += 1
            except json.JSONDecodeError:
                pass
    # Cloud calls aren't logged yet (Anthropic/Gemini/Groq/Perplexity),
    # so assume ~30% cloud overhead for now; should trend toward 100 as
    # local models take over.
    if local_calls > 0:
        estimated_total = max(local_calls * 1.3, 1)
        scores["inference"] = min(100, round((local_calls / estimated_total) * 100))
    else:
        scores["inference"] = 0

    # 2. DATA SOVEREIGNTY — Gitea+SQLite+JSONL all local; cloud chat
    # sessions are the remaining gap.
    scores["data"] = 75

    # 3. COMPUTE SOVEREIGNTY — base 40 for Ollama + local models, +30
    # when the inference smoke test passes. Cloud agents still cap this.
    ollama_running = False
    health_file = HERMES_DATA / "model_health.json"
    if health_file.exists():
        try:
            ollama_running = json.loads(health_file.read_text()).get("inference_ok", False)
        except Exception:
            pass
    scores["compute"] = 40 + (30 if ollama_running else 0)

    # 4. INFRASTRUCTURE SOVEREIGNTY — everything self-managed except the
    # rented DigitalOcean VPS hosting Gitea.
    scores["infrastructure"] = 70

    # 5. IDENTITY SOVEREIGNTY — self-hosted Gitea accounts, Nostr not
    # live yet (issue #13).
    scores["identity"] = 30

    # 6. FINANCIAL SOVEREIGNTY — Lightning/Cashu are tickets (issues
    # #554, #555), not deployed.
    scores["financial"] = 10

    # 7. TOOL SOVEREIGNTY — local: hermes-agent, huey, gitea, ollama,
    # prometheus/grafana; cloud: aider (gemini), opencode (grok), docker hub.
    local_tools = 5
    cloud_tools = 3
    total_tools = local_tools + cloud_tools
    scores["tools"] = round((local_tools / total_tools) * 100) if total_tools > 0 else 0

    # Publish per-dimension gauges.
    for dim, value in scores.items():
        sovereignty_dimension.labels(dimension=dim).set(value)

    # Composite: weighted average (inference weighted highest — it's the
    # core work).
    weights = {
        "inference": 0.25,
        "data": 0.15,
        "compute": 0.20,
        "infrastructure": 0.15,
        "identity": 0.05,
        "financial": 0.05,
        "tools": 0.15,
    }
    composite = sum(scores.get(dim, 0) * w for dim, w in weights.items())
    sovereignty_score.set(round(composite))
    return scores, round(composite)
# ── Main Collection Loop ─────────────────────────────────────────────
def collect_all():
    """Run every collector, isolating failures so one bad data source
    cannot block the others from updating their metrics."""
    collectors = [
        ("Gitea", collect_gitea_metrics),
        ("Heartbeat", collect_heartbeat_metrics),
        ("Inference", collect_inference_metrics),
        ("Model health", collect_model_health),
        ("DPO", collect_dpo_metrics),
    ]
    for label, collector in collectors:
        try:
            collector()
        except Exception as e:
            print(f"[WARN] {label} collection failed: {e}")
    # Sovereignty runs last and logs its composite on success.
    try:
        scores, composite = compute_sovereignty_score()
        print(f"[INFO] Sovereignty score: {composite} | {scores}")
    except Exception as e:
        print(f"[WARN] Sovereignty computation failed: {e}")
def collection_loop():
    """Daemon-thread body: refresh all metrics forever, pausing
    SCRAPE_INTERVAL seconds between passes."""
    while True:
        collect_all()
        time.sleep(SCRAPE_INTERVAL)
if __name__ == "__main__":
    print("🔭 Timmy Telemetry Exporter starting on :9101")
    print(f" Gitea: {GITEA_URL}")
    print(f" Timmy data: {TIMMY_DATA}")
    print(f" Hermes data: {HERMES_DATA}")
    # Startup diagnostics: show which mounted data dirs are actually visible
    # inside the container (catches bad volume mounts early).
    print("\n📂 Data directory check:")
    for label, d in [("TIMMY_DATA", TIMMY_DATA), ("HERMES_DATA", HERMES_DATA)]:
        exists = d.exists()
        print(f" {label} ({d}): {'EXISTS' if exists else 'MISSING'}")
        if exists:
            # Show at most 10 entries to keep startup logs short.
            for entry in list(d.iterdir())[:10]:
                print(f" {'dir ' if entry.is_dir() else 'file'} {entry.name}")
    # Gitea reachability probe.  Fix: only send the Authorization header
    # when a token is configured — matches gitea_get(); an empty
    # "token " header can be rejected by some deployments.
    print(f"\n🌐 Gitea check:")
    probe_headers = {"Authorization": f"token {GITEA_TOKEN}"} if GITEA_TOKEN else {}
    try:
        r = requests.get(f"{GITEA_URL}/api/v1/repos/Timmy_Foundation/the-nexus",
                         headers=probe_headers,
                         timeout=5)
        print(f" Gitea API: {r.status_code} ({'OK' if r.status_code == 200 else 'FAIL'})")
    except Exception as e:
        print(f" Gitea API: UNREACHABLE ({e})")
    # Start the Prometheus HTTP server FIRST so /metrics is available even
    # if the initial collection hangs or crashes.
    start_http_server(9101)
    print("\n🟢 Exporter ready — http://localhost:9101/metrics")
    # Initial synchronous collection, then hand off to the daemon thread.
    collect_all()
    t = threading.Thread(target=collection_loop, daemon=True)
    t.start()
    # Keep the main thread alive; all work happens on the daemon thread.
    while True:
        time.sleep(3600)