Files
timmy-config/tasks.py

2339 lines
84 KiB
Python

"""Timmy's scheduled work — orchestration, sovereignty, heartbeat."""
import glob
import html
import json
import os
import re
import socket
import subprocess
import sys
import urllib.parse
import urllib.request
from datetime import datetime, timedelta, timezone
from pathlib import Path
from orchestration import huey
from huey import crontab
from gitea_client import GiteaClient
# Root directories: the Hermes agent install and Timmy's private workspace.
HERMES_HOME = Path.home() / ".hermes"
TIMMY_HOME = Path.home() / ".timmy"
HERMES_AGENT_DIR = HERMES_HOME / "hermes-agent"
# Interpreter inside the Hermes venv; run_hermes_local falls back to
# sys.executable when this file does not exist.
HERMES_PYTHON = HERMES_AGENT_DIR / "venv" / "bin" / "python3"
# run_hermes_local appends per-call records to local_YYYYMMDD.jsonl here.
METRICS_DIR = TIMMY_HOME / "metrics"
# Gitea repositories as "owner/name".
# NOTE(review): consumers are outside this view.
REPOS = [
    "Timmy_Foundation/the-nexus",
    "Timmy_Foundation/timmy-config",
]
NET_LINE_LIMIT = 10  # NOTE(review): not used in this chunk — confirm meaning.
BRIEFING_DIR = TIMMY_HOME / "briefings" / "good-morning"
# Telegram delivery: the bot token is read from this file at send time.
TELEGRAM_BOT_TOKEN_FILE = Path.home() / ".config" / "telegram" / "special_bot"
TELEGRAM_CHAT_ID = "-1003664764329"
# ── Local Model Inference via Hermes Harness ─────────────────────────
HEARTBEAT_MODEL = "hermes4:14b"  # default model for run_hermes_local
FALLBACK_MODEL = "hermes3:8b"
LOCAL_PROVIDER_BASE_URL = "http://localhost:8081/v1"
LOCAL_PROVIDER_MODEL = HEARTBEAT_MODEL
# Shared decoder. Its raw_decode() lets extract_first_json_object parse a
# JSON object embedded in surrounding text.
JSON_DECODER = json.JSONDecoder()
def newest_file(directory, pattern):
    """Return the greatest path (by path ordering, not mtime) matching *pattern*.

    Args:
        directory: ``Path`` to search (non-recursive glob).
        pattern: Glob pattern, e.g. ``"batch_*.md"``.

    Returns:
        The lexicographically last matching ``Path``, or ``None`` if nothing matches.
    """
    return max(directory.glob(pattern), default=None)
def run_hermes_local(
    prompt,
    model=None,
    caller_tag=None,
    toolsets=None,
    system_prompt=None,
    disable_all_tools=False,
    skip_context_files=False,
    skip_memory=False,
    max_iterations=30,
    timeout=900,
):
    """Call a local model through the Hermes harness.

    Runs Hermes inside its own venv so task execution matches the same
    environment and provider routing as normal Hermes usage. When that
    venv's python is missing, it uses the current interpreter instead.
    Every call creates a Hermes session and appends one record to the daily
    local-inference metrics file.

    Args:
        prompt: User message for the agent.
        model: Model name; defaults to ``HEARTBEAT_MODEL``.
        caller_tag: Optional tag. It is prefixed to the prompt as ``[tag]``
            and recorded in metrics as the caller.
        toolsets: Accepted for call compatibility.
            NOTE(review): it is not forwarded to the agent.
        system_prompt: Optional ephemeral system prompt.
        disable_all_tools: Disable every toolset Hermes reports.
        skip_context_files: Forwarded to ``AIAgent``.
        skip_memory: Forwarded to ``AIAgent``.
        max_iterations: Forwarded to ``AIAgent``.
        timeout: Seconds to wait for the Hermes subprocess (default 900).

    Returns:
        ``{"response", "session_id", "raw_output"}`` on success. ``None``
        when the run fails, times out, or yields an empty response.
    """
    _model = model or HEARTBEAT_MODEL
    tagged = f"[{caller_tag}] {prompt}" if caller_tag else prompt
    try:
        # Executed by the Hermes interpreter. It prints exactly one JSON
        # object on stdout and exits non-zero when the agent raised.
        runner = """
import io
import json
import sys
from contextlib import redirect_stderr, redirect_stdout
from pathlib import Path

agent_dir = Path(sys.argv[1])
query = sys.argv[2]
model = sys.argv[3]
system_prompt = sys.argv[4] or None
disable_all_tools = sys.argv[5] == "1"
skip_context_files = sys.argv[6] == "1"
skip_memory = sys.argv[7] == "1"
max_iterations = int(sys.argv[8])

if str(agent_dir) not in sys.path:
    sys.path.insert(0, str(agent_dir))

from hermes_cli.runtime_provider import resolve_runtime_provider
from run_agent import AIAgent
from toolsets import get_all_toolsets

buf = io.StringIO()
err = io.StringIO()
payload = {}
exit_code = 0

try:
    runtime = resolve_runtime_provider()
    kwargs = {
        "model": model,
        "api_key": runtime.get("api_key"),
        "base_url": runtime.get("base_url"),
        "provider": runtime.get("provider"),
        "api_mode": runtime.get("api_mode"),
        "acp_command": runtime.get("command"),
        "acp_args": list(runtime.get("args") or []),
        "max_iterations": max_iterations,
        "quiet_mode": True,
        "ephemeral_system_prompt": system_prompt,
        "skip_context_files": skip_context_files,
        "skip_memory": skip_memory,
    }
    if disable_all_tools:
        kwargs["disabled_toolsets"] = sorted(get_all_toolsets().keys())
    agent = AIAgent(**kwargs)
    with redirect_stdout(buf), redirect_stderr(err):
        result = agent.run_conversation(query, sync_honcho=False)
    payload = {
        "response": result.get("final_response", ""),
        "session_id": getattr(agent, "session_id", None),
        "provider": runtime.get("provider"),
        "base_url": runtime.get("base_url"),
        "stdout": buf.getvalue(),
        "stderr": err.getvalue(),
    }
except Exception as exc:
    exit_code = 1
    payload = {
        "error": str(exc),
        "stdout": buf.getvalue(),
        "stderr": err.getvalue(),
    }

print(json.dumps(payload))
sys.exit(exit_code)
"""
        command = [
            str(HERMES_PYTHON) if HERMES_PYTHON.exists() else sys.executable,
            "-c",
            runner,
            str(HERMES_AGENT_DIR),
            tagged,
            _model,
            system_prompt or "",
            "1" if disable_all_tools else "0",
            "1" if skip_context_files else "0",
            "1" if skip_memory else "0",
            str(max_iterations),
        ]
        result = subprocess.run(
            command,
            cwd=str(HERMES_AGENT_DIR),
            capture_output=True,
            text=True,
            timeout=timeout,
        )
        try:
            payload = json.loads((result.stdout or "").strip() or "{}")
        except json.JSONDecodeError:
            if result.returncode == 0:
                raise
            # A crashed runner prints a traceback rather than JSON. Keep going
            # so the return-code check below reports the process stderr.
            payload = {}
        output = str(payload.get("response", "")).strip()
        stderr_output = str(payload.get("stderr", "")).strip()
        stdout_output = str(payload.get("stdout", "")).strip()
        if result.returncode != 0:
            raise RuntimeError(
                (
                    result.stderr
                    or str(payload.get("error", "")).strip()
                    or stderr_output
                    or stdout_output
                    or output
                    or "hermes run failed"
                ).strip()
            )
    except Exception as exc:
        if isinstance(exc, subprocess.TimeoutExpired):
            # str(TimeoutExpired) embeds the whole command line, including the
            # (possibly private) prompt, so record a short message instead.
            error_text = f"hermes run timed out after {exc.timeout} seconds"
        else:
            error_text = str(exc)
        _append_local_metrics(
            {
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "model": _model,
                "caller": caller_tag or "unknown",
                "error": error_text,
                "success": False,
            }
        )
        return None

    session_id = payload.get("session_id")
    response = output
    # Logged outside the try so a metrics problem can never discard a response.
    _append_local_metrics(
        {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "model": _model,
            "caller": caller_tag or "unknown",
            "prompt_len": len(prompt),
            "response_len": len(response),
            "session_id": session_id,
            "success": bool(response),
        }
    )
    if not response:
        return None
    return {
        "response": response,
        "session_id": session_id,
        "raw_output": json.dumps(payload, sort_keys=True),
    }


def _append_local_metrics(record):
    """Append *record* as one JSON line to today's local-inference metrics file.

    Metrics are diagnostic only. Filesystem errors are reported on stderr
    instead of propagating, so logging can never turn a finished model call
    into a failure.
    """
    try:
        METRICS_DIR.mkdir(parents=True, exist_ok=True)
        metrics_file = METRICS_DIR / f"local_{datetime.now().strftime('%Y%m%d')}.jsonl"
        with open(metrics_file, "a") as handle:
            handle.write(json.dumps(record) + "\n")
    except OSError as exc:
        print(f"run_hermes_local: could not write metrics: {exc}", file=sys.stderr)
def hermes_local(prompt, model=None, caller_tag=None, toolsets=None):
    """Run *prompt* through :func:`run_hermes_local` and keep only the text.

    Returns:
        The response string, or ``None`` when the underlying run failed or
        produced nothing.
    """
    outcome = run_hermes_local(
        prompt=prompt,
        model=model,
        caller_tag=caller_tag,
        toolsets=toolsets,
    )
    return outcome.get("response") if outcome else None
# Ephemeral system prompt passed by run_archive_hermes. It confines the
# model to the supplied message and asks for bare JSON when JSON is requested.
ARCHIVE_EPHEMERAL_SYSTEM_PROMPT = (
    "You are running a private archive-processing microtask for Timmy.\n"
    "Use only the supplied user message.\n"
    "Do not use tools, memory, Honcho, SOUL.md, AGENTS.md, or outside knowledge.\n"
    "Do not invent facts.\n"
    "If the prompt requests JSON, return only valid JSON."
)
def run_archive_hermes(prompt, caller_tag, model=None):
    """Run one isolated archive microtask through the local Hermes harness.

    Isolation settings: archive system prompt, all tools disabled, no
    context files, no memory, and at most 3 agent iterations.

    Returns:
        The :func:`run_hermes_local` result dict, or ``None`` on failure.
    """
    isolation = dict(
        system_prompt=ARCHIVE_EPHEMERAL_SYSTEM_PROMPT,
        disable_all_tools=True,
        skip_context_files=True,
        skip_memory=True,
        max_iterations=3,
    )
    return run_hermes_local(prompt=prompt, model=model, caller_tag=caller_tag, **isolation)
# ── Know Thy Father: Twitter Archive Ingestion ───────────────────────
# Private workspace layout for the archive-learning pipeline. Every *_DIR
# below is created by ensure_archive_layout().
ARCHIVE_DIR = TIMMY_HOME / "twitter-archive"
ARCHIVE_EXTRACTED_DIR = ARCHIVE_DIR / "extracted"
# Per-batch markdown notes (<batch_id>.md).
ARCHIVE_NOTES_DIR = ARCHIVE_DIR / "notes"
ARCHIVE_KNOWLEDGE_DIR = ARCHIVE_DIR / "knowledge"
# Per-batch candidate files (<batch_id>.json) written by know_thy_father.
ARCHIVE_CANDIDATES_DIR = ARCHIVE_KNOWLEDGE_DIR / "candidates"
ARCHIVE_PROFILE_FILE = ARCHIVE_KNOWLEDGE_DIR / "profile.json"
ARCHIVE_CHANGES_FILE = ARCHIVE_KNOWLEDGE_DIR / "changes.jsonl"  # NOTE(review): unused in this chunk
ARCHIVE_INSIGHTS_DIR = ARCHIVE_DIR / "insights"
ARCHIVE_TRAINING_DIR = ARCHIVE_DIR / "training"
ARCHIVE_TRAINING_EXAMPLES_DIR = ARCHIVE_TRAINING_DIR / "examples"
ARCHIVE_TRAINING_DPO_DIR = ARCHIVE_TRAINING_DIR / "dpo"
ARCHIVE_TRAINING_EVALS_DIR = ARCHIVE_TRAINING_DIR / "evals"
# Raw draft/critique model replies (<batch_id>_draft.txt, <batch_id>_critique.txt).
ARCHIVE_TRAINING_RUNS_DIR = ARCHIVE_TRAINING_DIR / "runs"
ARCHIVE_METRICS_DIR = ARCHIVE_DIR / "metrics"
# Resumable pipeline state; defaults come from archive_default_checkpoint().
ARCHIVE_CHECKPOINT = ARCHIVE_DIR / "checkpoint.json"
ARCHIVE_LOCK = ARCHIVE_DIR / ".lock"  # NOTE(review): unused in this chunk
ARCHIVE_PROGRESS_FILE = ARCHIVE_METRICS_DIR / "progress.json"
ARCHIVE_SOURCE_CONFIG = ARCHIVE_DIR / "source_config.json"  # NOTE(review): unused in this chunk
ARCHIVE_PIPELINE_CONFIG = ARCHIVE_DIR / "pipeline_config.json"
# Extraction outputs consumed by know_thy_father.
ARCHIVE_TWEETS_FILE = ARCHIVE_EXTRACTED_DIR / "tweets.jsonl"
ARCHIVE_RETWEETS_FILE = ARCHIVE_EXTRACTED_DIR / "retweets.jsonl"
ARCHIVE_MANIFEST_FILE = ARCHIVE_EXTRACTED_DIR / "manifest.json"
ARCHIVE_TRAIN_STATE_FILE = ARCHIVE_TRAINING_DIR / "last_train_state.json"
ARCHIVE_ACTIVE_MODEL_FILE = ARCHIVE_TRAINING_DIR / "active_model.json"  # NOTE(review): unused in this chunk
ARCHIVE_PROMOTION_STATE_FILE = ARCHIVE_TRAINING_DIR / "promotion_state.json"  # NOTE(review): unused in this chunk
# Tweets consumed per know_thy_father run.
ARCHIVE_BATCH_SIZE = 50
def ensure_archive_layout():
    """Create every directory of the private archive workspace (idempotent)."""
    required = [
        ARCHIVE_DIR,
        ARCHIVE_EXTRACTED_DIR,
        ARCHIVE_NOTES_DIR,
        ARCHIVE_KNOWLEDGE_DIR,
        ARCHIVE_CANDIDATES_DIR,
        ARCHIVE_INSIGHTS_DIR,
        ARCHIVE_TRAINING_DIR,
        ARCHIVE_TRAINING_EXAMPLES_DIR,
        ARCHIVE_TRAINING_DPO_DIR,
        ARCHIVE_TRAINING_EVALS_DIR,
        ARCHIVE_TRAINING_RUNS_DIR,
        ARCHIVE_METRICS_DIR,
    ]
    for directory in required:
        directory.mkdir(parents=True, exist_ok=True)
def read_json(path, default):
    """Load JSON from *path*, falling back to a fresh copy of *default*.

    The fallback is used when the file is missing, is not decodable text,
    or does not contain valid JSON. *default* is copied through a JSON
    round trip, so callers may mutate the result without touching their
    template. Note that tuples in *default* come back as lists.

    Args:
        path: ``Path`` of the JSON document.
        default: JSON-serializable fallback value.

    Returns:
        The decoded document, or a copy of *default*.
    """
    def fallback():
        return json.loads(json.dumps(default))

    try:
        raw = path.read_text()
    except (FileNotFoundError, UnicodeDecodeError):
        # EAFP instead of exists(): no race with concurrent deletes. Corrupt,
        # undecodable bytes are treated like an unreadable document.
        return fallback()
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        return fallback()
def write_json(path, payload):
    """Atomically write *payload* to *path* as indented, key-sorted JSON.

    The document goes to a hidden sibling temp file first and is then moved
    into place with ``os.replace``. A crash mid-write therefore cannot leave
    a truncated file. That matters because ``read_json`` silently falls back
    to defaults on unreadable JSON, which would, for example, reset the
    archive checkpoint.

    Args:
        path: Destination ``Path``; parent directories are created.
        payload: JSON-serializable value.

    Raises:
        TypeError: if *payload* is not JSON-serializable. The existing file
            is left untouched in that case.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    text = json.dumps(payload, indent=2, sort_keys=True) + "\n"
    tmp_path = path.with_name(f".{path.name}.{os.getpid()}.tmp")
    try:
        tmp_path.write_text(text)
        os.replace(tmp_path, path)
    except BaseException:
        try:
            tmp_path.unlink()
        except OSError:
            pass
        raise
def write_text(path, payload):
    """Write *payload* to *path* with trailing whitespace trimmed.

    Non-empty text ends with exactly one newline. Whitespace-only text
    produces an empty file. Parent directories are created.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    body = payload.rstrip()
    if body:
        body += "\n"
    path.write_text(body)
def load_jsonl(path):
    """Parse a JSON Lines file strictly.

    Blank lines are ignored. A malformed line raises ``json.JSONDecodeError``.

    Returns:
        The decoded rows, or ``[]`` if *path* does not exist.
    """
    if not path.exists():
        return []
    stripped = (line.strip() for line in path.read_text().splitlines())
    return [json.loads(line) for line in stripped if line]
def write_jsonl(path, rows):
    """Overwrite *path* with one key-sorted JSON document per row (JSON Lines)."""
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w") as handle:
        handle.writelines(json.dumps(row, sort_keys=True) + "\n" for row in rows)
def append_jsonl(path, rows):
    """Append *rows* to *path* as key-sorted JSON Lines.

    Does nothing (and creates no file) when *rows* is empty.
    """
    if not rows:
        return
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "a") as handle:
        handle.writelines(f"{json.dumps(row, sort_keys=True)}\n" for row in rows)
def latest_path(directory, pattern):
    """Return the greatest matching path by path ordering (not mtime).

    Zero-padded names such as ``batch_001`` therefore yield the newest
    batch. Returns ``None`` when nothing matches.
    """
    best = None
    for candidate in directory.glob(pattern):
        if best is None or candidate > best:
            best = candidate
    return best
def count_jsonl_rows(path):
    """Count non-blank lines in a JSON Lines file; 0 if the file is absent.

    Lines are not parsed, only counted.
    """
    if not path.exists():
        return 0
    total = 0
    with open(path) as handle:
        for raw in handle:
            if raw.strip():
                total += 1
    return total
def port_open(port):
    """Return True if a TCP connection to 127.0.0.1:*port* succeeds within 1 second."""
    with socket.socket() as probe:
        probe.settimeout(1)
        try:
            probe.connect(("127.0.0.1", port))
        except Exception:
            return False
        return True
def fetch_http_title(url):
    """Fetch *url* and return the text of its first ``<title>`` element.

    The body is decoded as UTF-8, dropping undecodable bytes. HTML character
    references in the title (e.g. ``&amp;``) are unescaped, and whitespace
    runs, including newlines in multi-line titles, collapse to single spaces.

    Returns:
        The title text. ``"NO TITLE"`` when the page has no title element.
        ``"ERROR: <exception>"`` when the request fails. Never raises.
    """
    try:
        with urllib.request.urlopen(url, timeout=5) as resp:
            raw = resp.read().decode("utf-8", "ignore")
        match = re.search(r"<title>(.*?)</title>", raw, re.IGNORECASE | re.DOTALL)
        if not match:
            return "NO TITLE"
        return " ".join(html.unescape(match.group(1)).split())
    except Exception as exc:
        return f"ERROR: {exc}"
def latest_files(root, limit=5):
    """List the most recently modified files anywhere under *root*.

    Args:
        root: Directory (str or Path) searched recursively.
        limit: Maximum number of entries to return.

    Returns:
        Dicts with ``path``, local-time ISO ``mtime`` and ``size``, newest
        first. Returns ``[]`` when *root* does not exist. Files that cannot
        be stat'ed are skipped.
    """
    base = Path(root)
    if not base.exists():
        return []
    entries = []
    for candidate in base.rglob("*"):
        if not candidate.is_file():
            continue
        try:
            info = candidate.stat()
        except OSError:
            continue
        entries.append((info.st_mtime, candidate, info.st_size))
    newest = sorted(entries, reverse=True)[:limit]
    return [
        {
            "path": str(file_path),
            "mtime": datetime.fromtimestamp(modified).isoformat(),
            "size": byte_count,
        }
        for modified, file_path, byte_count in newest
    ]
def read_jsonl_rows(path):
    """Parse a JSON Lines file leniently.

    Blank lines and lines that fail to decode are silently skipped.

    Args:
        path: str or Path of the file.

    Returns:
        The decoded rows, or ``[]`` when the file does not exist.
    """
    source = Path(path)
    if not source.exists():
        return []

    def _decoded(lines):
        for raw in lines:
            text = raw.strip()
            if not text:
                continue
            try:
                yield json.loads(text)
            except Exception:
                pass

    with open(source) as handle:
        return list(_decoded(handle))
def telegram_send_document(path, caption):
    """Upload the file at *path* to the configured Telegram chat via curl.

    Args:
        path: Local file to send as the document.
        caption: Caption text; sent literally, never interpreted by curl.

    Returns:
        Telegram's decoded JSON reply. Otherwise ``{"ok": False, "error": ...}``
        when the token file is missing, curl cannot be run or times out, or
        the reply is not JSON.
    """
    if not TELEGRAM_BOT_TOKEN_FILE.exists():
        return {"ok": False, "error": "token file missing"}
    token = TELEGRAM_BOT_TOKEN_FILE.read_text().strip()
    # NOTE(review): the token is part of the URL and so is visible in curl's
    # process arguments; consider feeding the URL via `curl -K -` on stdin.
    command = [
        "curl",
        "-s",
        "-X",
        "POST",
        f"https://api.telegram.org/bot{token}/sendDocument",
        # --form-string sends the value verbatim. With plain -F, a caption
        # beginning with "@" or "<" makes curl read a local file into the
        # field, and ";type=..." suffixes are interpreted.
        "--form-string",
        f"chat_id={TELEGRAM_CHAT_ID}",
        "--form-string",
        f"caption={caption}",
        "-F",
        f"document=@{path}",
    ]
    try:
        result = subprocess.run(command, capture_output=True, text=True, timeout=30)
    except subprocess.TimeoutExpired as exc:
        # str(exc) would include the command line, i.e. the bot token.
        return {"ok": False, "error": f"curl timed out after {exc.timeout} seconds"}
    except OSError as exc:
        return {"ok": False, "error": str(exc)}
    try:
        return json.loads(result.stdout.strip() or "{}")
    except Exception:
        return {"ok": False, "error": result.stdout.strip() or result.stderr.strip()}
def telegram_send_message(text, parse_mode="HTML"):
    """Send *text* to the configured Telegram chat via the Bot API.

    Args:
        text: Message body.
        parse_mode: Telegram parse mode (default ``"HTML"``).

    Returns:
        Telegram's decoded JSON reply. On failure, a dict with
        ``"ok": False``. For HTTP error replies that dict is Telegram's own
        JSON error body, which carries ``description``. Otherwise it is
        ``{"ok": False, "error": ...}``.
    """
    from urllib.error import HTTPError

    if not TELEGRAM_BOT_TOKEN_FILE.exists():
        return {"ok": False, "error": "token file missing"}
    token = TELEGRAM_BOT_TOKEN_FILE.read_text().strip()
    payload = urllib.parse.urlencode(
        {
            "chat_id": TELEGRAM_CHAT_ID,
            "text": text,
            "parse_mode": parse_mode,
            "disable_web_page_preview": "false",
        }
    ).encode()
    try:
        req = urllib.request.Request(
            f"https://api.telegram.org/bot{token}/sendMessage",
            data=payload,
        )
        with urllib.request.urlopen(req, timeout=20) as resp:
            return json.loads(resp.read().decode())
    except HTTPError as exc:
        # Telegram explains 4xx failures (e.g. "can't parse entities") in the
        # JSON body. Keep that explanation rather than only "HTTP Error 400".
        try:
            body = json.loads(exc.read().decode())
        except Exception:
            body = None
        if isinstance(body, dict):
            body["ok"] = False
            body.setdefault("error", str(exc))
            return body
        return {"ok": False, "error": str(exc)}
    except Exception as exc:
        return {"ok": False, "error": str(exc)}
def open_report_in_browser(path):
    """Open *path* with the macOS ``open`` command (10 s timeout).

    Returns:
        ``{"ok": True}`` on success, else ``{"ok": False, "error": <message>}``.
    """
    outcome = {"ok": True}
    try:
        subprocess.run(["open", str(path)], check=True, timeout=10)
    except Exception as exc:
        outcome = {"ok": False, "error": str(exc)}
    return outcome
def render_evening_html(title, subtitle, executive_summary, local_pulse, gitea_lines, research_lines, what_matters, look_first):
    """Render the report as a standalone, dark-themed HTML page.

    Args:
        title: Page title and main heading (escaped).
        subtitle: Line under the heading (escaped).
        executive_summary: Paragraph text (escaped).
        local_pulse: Iterable of strings, one list item each (escaped).
        gitea_lines: Iterable of strings inserted VERBATIM, not escaped.
            Presumably pre-rendered HTML such as ``<a>`` links.
            NOTE(review): callers must escape any untrusted text in these.
        research_lines: Iterable of strings (escaped).
        what_matters: Iterable of strings (escaped).
        look_first: Paragraph text (escaped).

    Returns:
        The complete HTML document as a string.

    NOTE(review): the kicker text reads "morning report" although the
    function is named render_evening_html — confirm which is intended.
    """
    return f"""<!doctype html>
<html lang=\"en\">
<head>
<meta charset=\"utf-8\">
<meta name=\"viewport\" content=\"width=device-width, initial-scale=1\">
<title>{html.escape(title)}</title>
<style>
:root {{ --bg:#07101b; --panel:#0d1b2a; --text:#ecf3ff; --muted:#9bb1c9; --accent:#5eead4; --link:#8ec5ff; }}
* {{ box-sizing:border-box; }}
body {{ margin:0; font-family:Inter,system-ui,-apple-system,sans-serif; background:radial-gradient(circle at top,#14253a 0%,#07101b 55%,#04080f 100%); color:var(--text); }}
.wrap {{ max-width:1100px; margin:0 auto; padding:48px 22px 80px; }}
.hero {{ background:linear-gradient(135deg, rgba(94,234,212,.14), rgba(124,58,237,.16)); border:1px solid rgba(142,197,255,.16); border-radius:24px; padding:34px 30px; box-shadow:0 20px 50px rgba(0,0,0,.25); }}
.kicker {{ text-transform:uppercase; letter-spacing:.16em; color:var(--accent); font-size:12px; font-weight:700; }}
h1 {{ margin:10px 0 8px; font-size:42px; line-height:1.05; }}
.subtitle {{ color:var(--muted); font-size:15px; }}
.grid {{ display:grid; grid-template-columns:repeat(auto-fit,minmax(280px,1fr)); gap:18px; margin-top:24px; }}
.card {{ background:rgba(13,27,42,.9); border:1px solid rgba(142,197,255,.12); border-radius:20px; padding:20px; }}
.card h2 {{ margin:0 0 12px; font-size:22px; }}
.card p, .card li {{ line-height:1.55; }}
.card ul {{ margin:0; padding-left:18px; }}
a {{ color:var(--link); text-decoration:none; }}
a:hover {{ text-decoration:underline; }}
.footer {{ margin-top:26px; color:var(--muted); font-size:14px; }}
</style>
</head>
<body>
<div class=\"wrap\">
<div class=\"hero\">
<div class=\"kicker\">timmy time · morning report</div>
<h1>{html.escape(title)}</h1>
<div class=\"subtitle\">{html.escape(subtitle)}</div>
</div>
<div class=\"grid\">
<div class=\"card\"><h2>Executive Summary</h2><p>{html.escape(executive_summary)}</p></div>
<div class=\"card\"><h2>Local Pulse</h2><ul>{''.join(f'<li>{html.escape(line)}</li>' for line in local_pulse)}</ul></div>
</div>
<div class=\"grid\">
<div class=\"card\"><h2>Gitea Pulse</h2><ul>{''.join(f'<li>{line}</li>' for line in gitea_lines)}</ul></div>
<div class=\"card\"><h2>Pertinent Research</h2><ul>{''.join(f'<li>{html.escape(line)}</li>' for line in research_lines)}</ul></div>
<div class=\"card\"><h2>What Matters Today</h2><ul>{''.join(f'<li>{html.escape(line)}</li>' for line in what_matters)}</ul></div>
</div>
<div class=\"card\" style=\"margin-top:18px\"><h2>Look Here First</h2><p>{html.escape(look_first)}</p></div>
<div class=\"footer\">Generated locally on the Mac for Alexander Whitestone. Sovereignty and service always.</div>
</div>
</body>
</html>"""
def archive_default_checkpoint():
    """Return a fresh checkpoint for an archive pass that has not started yet.

    Every call builds new containers, so callers may mutate the result freely.
    """
    return dict(
        data_source="tweets",
        batch_size=ARCHIVE_BATCH_SIZE,
        next_offset=0,
        batches_completed=0,
        phase="discovery",
        confidence="low",
        next_focus="look for recurring themes and recurring people",
        understanding_version=0,
        last_batch_id=None,
        last_batch_sessions={},
        last_profile_update=None,
        last_dpo_build=None,
        last_insight_file=None,
    )
def load_archive_checkpoint():
    """Return the stored checkpoint layered over the defaults.

    Keys missing from ``checkpoint.json`` keep their default values.
    """
    stored = read_json(ARCHIVE_CHECKPOINT, {})
    merged = archive_default_checkpoint()
    merged.update(stored)
    return merged
def load_pipeline_config():
    """Return the optional pipeline configuration; ``{}`` when absent or unreadable."""
    fallback = {}
    return read_json(ARCHIVE_PIPELINE_CONFIG, fallback)
def load_train_state():
    """Return the persisted training state.

    Falls back to a "never-run" state when the file is absent or unreadable.
    """
    never_trained = {
        "last_total_batches": 0,
        "last_total_pairs": 0,
        "last_candidate_id": None,
        "awaiting_eval": False,
        "last_run_status": "never-run",
        "last_run_at": None,
    }
    return read_json(ARCHIVE_TRAIN_STATE_FILE, never_trained)
def extract_first_json_object(text, decoder=None):
    """Return the first JSON object embedded anywhere in *text*.

    Model replies often wrap JSON in prose or Markdown code fences. Each
    ``{`` is tried as the start of an object with ``raw_decode``, which
    ignores trailing text, and the first one that decodes to a dict wins.

    The untouched text is scanned first, so code fences *inside* JSON
    string values (e.g. Markdown notes) survive intact. Only if that scan
    finds nothing is a second scan made with all ``` fence markers removed.

    Args:
        text: Text to search.
        decoder: Optional ``json.JSONDecoder``; defaults to the module-level
            ``JSON_DECODER``.

    Raises:
        ValueError: if no JSON object can be decoded.
    """
    decoder = decoder if decoder is not None else JSON_DECODER
    stripped = text.strip()
    unfenced = stripped.replace("```json", "").replace("```", "")
    candidates = [stripped] if unfenced == stripped else [stripped, unfenced]
    for candidate in candidates:
        start = candidate.find("{")
        while start != -1:
            try:
                payload, _ = decoder.raw_decode(candidate[start:])
            except json.JSONDecodeError:
                payload = None
            if isinstance(payload, dict):
                return payload
            start = candidate.find("{", start + 1)
    raise ValueError("No JSON object found")
def parse_json_output(stdout="", stderr=""):
    """Return the first JSON object found in *stdout*, then in *stderr*.

    Empty or whitespace-only streams are skipped.

    Returns:
        The decoded dict, or ``{}`` when neither stream contains one.
    """
    streams = [text for text in (stdout, stderr) if text and text.strip()]
    for text in streams:
        try:
            return extract_first_json_object(text)
        except ValueError:
            pass
    return {}
def run_timmy_home_module(module_name, args=None, timeout=120):
    """Run ``python -m <module_name> [args]`` from TIMMY_HOME and collect its result.

    The first JSON object printed on stdout, or failing that stderr, becomes
    the result payload. Without one, the raw stripped stdout and stderr are
    returned instead.

    Args:
        module_name: Dotted module path, e.g.
            ``"scripts.twitter_archive.extract_archive"``.
        args: Optional extra command-line arguments.
        timeout: Seconds before the subprocess is abandoned.

    Returns:
        The payload dict with ``returncode`` and ``status`` set. ``status``
        defaults to ``"ok"`` on exit 0. On a non-zero exit it is forced to
        ``"error"`` unless the module reported a more specific non-"ok"
        status. A timeout returns ``{"status": "error", "reason": ...,
        "returncode": None}``.
    """
    ensure_archive_layout()
    command = [sys.executable, "-m", module_name, *(args or [])]
    try:
        result = subprocess.run(
            command,
            cwd=str(TIMMY_HOME),
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return {
            "status": "error",
            "reason": f"{module_name} timed out after {timeout}s",
            "returncode": None,
        }
    payload = parse_json_output(result.stdout, result.stderr)
    if not payload:
        payload = {
            "stdout": result.stdout.strip(),
            "stderr": result.stderr.strip(),
        }
    payload["returncode"] = result.returncode
    if result.returncode != 0:
        # A failed process must never read as success, even if it printed a
        # JSON object claiming status "ok" before crashing.
        if payload.get("status") in (None, "ok"):
            payload["status"] = "error"
    else:
        payload.setdefault("status", "ok")
    return payload
def archive_counts():
    """Count candidate batch files and total DPO pair rows on disk."""
    batch_files = list(ARCHIVE_CANDIDATES_DIR.glob("batch_*.json"))
    pair_files = ARCHIVE_TRAINING_DPO_DIR.glob("pairs_*.jsonl")
    pair_total = sum(map(count_jsonl_rows, pair_files))
    return {
        "total_batches": len(batch_files),
        "total_pairs": pair_total,
    }
def archive_progress_snapshot():
    """Summarize archive pipeline progress and persist it to ARCHIVE_PROGRESS_FILE.

    The snapshot combines three sources:
    - checkpoint counters;
    - on-disk artifact counts: candidate batch files, durable profile
      claims, training-example rows, DPO pair files and rows;
    - the names of the newest note, DPO file and eval run.

    Returns:
        The snapshot dict that was written.
    """
    def _latest_name(directory, pattern):
        # Scan once, so a file deleted between two scans cannot yield None.name.
        newest = latest_path(directory, pattern)
        return newest.name if newest else None

    checkpoint = load_archive_checkpoint()
    profile = read_json(ARCHIVE_PROFILE_FILE, {"claims": []})
    durable_claims = [
        claim for claim in profile.get("claims", []) if claim.get("status") == "durable"
    ]
    # One listing of the DPO directory feeds the count, the row total and the
    # latest name, keeping those three fields mutually consistent.
    dpo_files = list(ARCHIVE_TRAINING_DPO_DIR.glob("pairs_*.jsonl"))
    snapshot = {
        "batches_completed": checkpoint.get("batches_completed", 0),
        "next_offset": checkpoint.get("next_offset", 0),
        "phase": checkpoint.get("phase", "discovery"),
        "candidate_batches": len(list(ARCHIVE_CANDIDATES_DIR.glob("batch_*.json"))),
        "durable_claims": len(durable_claims),
        "training_examples": sum(
            count_jsonl_rows(path) for path in ARCHIVE_TRAINING_EXAMPLES_DIR.glob("batch_*.jsonl")
        ),
        "dpo_pair_files": len(dpo_files),
        "dpo_pairs": sum(count_jsonl_rows(path) for path in dpo_files),
        "latest_dpo_file": max(dpo_files).name if dpo_files else None,
        "latest_note": _latest_name(ARCHIVE_NOTES_DIR, "batch_*.md"),
        "latest_eval": _latest_name(ARCHIVE_TRAINING_EVALS_DIR, "run_*.json"),
    }
    write_json(ARCHIVE_PROGRESS_FILE, snapshot)
    return snapshot
def archive_batch_id(batch_number):
    """Return the identifier for *batch_number*, zero-padded to 3 digits (``batch_007``).

    NOTE(review): from batch 1000 on, the width grows ("batch_1000"), which
    sorts before "batch_999". Lexicographic "latest" lookups would then pick
    the wrong file.
    """
    return "batch_{:03d}".format(batch_number)
def archive_profile_summary(profile):
    """Return a compact view of *profile* for prompts.

    The view holds the first 12 durable and first 8 provisional claims, in
    file order.
    """
    claims = profile.get("claims", [])

    def _with_status(status, cap):
        return [claim for claim in claims if claim.get("status") == status][:cap]

    return {
        "durable_claims": _with_status("durable", 12),
        "provisional_claims": _with_status("provisional", 8),
    }
def format_tweets_for_prompt(rows):
    """Render tweet rows as numbered blocks separated by blank lines.

    Each block has the form ``"N. tweet_id=<id> created_at=<ts>\\ntext=<full_text>"``.
    """
    return "\n\n".join(
        f"{number}. tweet_id={tweet.get('tweet_id')} created_at={tweet.get('created_at')}\n"
        f"text={tweet.get('full_text')}"
        for number, tweet in enumerate(rows, start=1)
    )
def normalize_candidate_entry(candidate, batch_id, index):
    """Coerce one model-proposed knowledge candidate into the stored schema.

    Args:
        candidate: Raw candidate from the model's JSON output. Anything that
            is not a dict is rejected.
        batch_id: Batch identifier such as ``"batch_001"``. It is used in the
            candidate id and as ``first_seen_at`` / ``last_confirmed_at``.
        index: 1-based position of the candidate within the batch.

    Returns:
        The normalized dict, or ``None`` when *candidate* is not a dict or
        has an empty claim. List fields are stripped and de-duplicated
        (quotes and contradictions are capped at 5 input items). Confidence
        is clamped to [0, 1], defaulting to 0.5. An unknown status becomes
        ``"provisional"``.
    """
    if not isinstance(candidate, dict):
        return None
    category = str(candidate.get("category") or "recurring-theme").strip()
    claim = str(candidate.get("claim") or "").strip()
    if not claim:
        return None
    try:
        confidence = float(candidate.get("confidence", 0.5))
    except (TypeError, ValueError):
        confidence = 0.5
    confidence = max(0.0, min(confidence, 1.0))
    status = str(candidate.get("status") or "provisional").strip().lower()
    if status not in {"provisional", "durable", "retracted"}:
        status = "provisional"
    return {
        "id": f"{batch_id}-candidate-{index:02d}",
        "category": category,
        "claim": claim,
        "evidence_tweet_ids": _unique_stripped_strings(candidate.get("evidence_tweet_ids")),
        "evidence_quotes": _unique_stripped_strings(candidate.get("evidence_quotes"), limit=5),
        "confidence": round(confidence, 3),
        "status": status,
        "first_seen_at": batch_id,
        "last_confirmed_at": batch_id,
        "contradicts": _unique_stripped_strings(candidate.get("contradicts"), limit=5),
    }


def _unique_stripped_strings(values, limit=None):
    """Return the distinct, non-empty stripped strings from a model list field.

    A lone scalar (e.g. a single quote string) counts as one item rather
    than being iterated character by character. ``None`` and other
    non-list values yield ``[]``. *limit* caps how many input items are
    considered, before de-duplication.
    """
    if values is None:
        return []
    if isinstance(values, (str, int, float)):
        values = [values]
    elif not isinstance(values, (list, tuple)):
        return []
    if limit is not None:
        values = values[:limit]
    unique = []
    for value in values:
        text = str(value).strip()
        if text and text not in unique:
            unique.append(text)
    return unique
def normalize_training_examples(examples, batch_id, tweet_ids, fallback_prompt, fallback_response):
    """Coerce model-proposed training examples into stored rows.

    Args:
        examples: List of example dicts from the model. ``prompt`` may be
            given as ``instruction``, and ``response`` as ``answer``. A
            single dict is treated as a one-item list. ``None`` or any
            other type counts as no examples.
        batch_id: Batch identifier used in ``example_id``.
        tweet_ids: Tweet ids attached to every row.
        fallback_prompt: Prompt for the fallback row.
        fallback_response: Response for the fallback row.

    Returns:
        One row per usable example. Non-dict items and examples missing a
        prompt or response are skipped; ``example_id`` numbering follows
        the original list position. When nothing is usable, a single
        fallback row built from *fallback_prompt* / *fallback_response* is
        returned.
    """
    if isinstance(examples, dict):
        examples = [examples]
    elif not isinstance(examples, (list, tuple)):
        examples = []
    normalized = []
    for index, example in enumerate(examples, start=1):
        if not isinstance(example, dict):
            continue
        prompt = str(example.get("prompt") or example.get("instruction") or "").strip()
        response = str(example.get("response") or example.get("answer") or "").strip()
        if not prompt or not response:
            continue
        normalized.append(
            {
                "example_id": f"{batch_id}-example-{index:02d}",
                "batch_id": batch_id,
                "task_type": str(example.get("task_type") or "analysis").strip() or "analysis",
                "prompt": prompt,
                "response": response,
                "tweet_ids": tweet_ids,
            }
        )
    if normalized:
        return normalized
    return [
        {
            "example_id": f"{batch_id}-example-01",
            "batch_id": batch_id,
            "task_type": "analysis",
            "prompt": fallback_prompt,
            "response": fallback_response,
            "tweet_ids": tweet_ids,
        }
    ]
def normalize_rubric_scores(scores):
    """Return the four rubric scores as floats.

    A missing or non-numeric score becomes 0.0. A *scores* value that is not
    a dict (e.g. ``null`` from the model) is treated as empty.
    """
    if not isinstance(scores, dict):
        scores = {}
    rubric = {}
    for key in ("grounding", "specificity", "source_distinction", "actionability"):
        try:
            rubric[key] = float(scores.get(key, 0))
        except (TypeError, ValueError):
            rubric[key] = 0.0
    return rubric
def build_archive_draft_prompt(batch_id, checkpoint, profile, prior_note, batch_rows):
    """Build the first-pass ("draft") prompt for one archive batch.

    The prompt embeds:
    - the required JSON output schema;
    - the checkpoint;
    - a summary of the current profile (up to 12 durable and 8 provisional
      claims);
    - the last 2500 characters of the previous batch note, or "none";
    - the batch's tweet ids and formatted tweets.

    Args:
        batch_id: Identifier of the batch being processed.
        checkpoint: Current checkpoint dict (serialized into the prompt).
        profile: Profile dict with a ``claims`` list.
        prior_note: Previous batch note text, or empty string.
        batch_rows: Tweet rows for this batch.

    Returns:
        The prompt string.
    """
    tweet_ids = [row.get("tweet_id") for row in batch_rows]
    previous_summary = archive_profile_summary(profile)
    return (
        "You are Timmy, reading Alexander's private Twitter archive.\n"
        "Work only from the supplied tweets. Do not invent facts. Separate explicit facts from inference.\n"
        "Return ONLY valid JSON with this schema:\n"
        # Schema literal; the keys must match what know_thy_father reads back.
        '{'
        '"notes_markdown":"...",'
        '"knowledge_candidates":[{'
        '"category":"trait|preference|project|relationship|value|recurring-theme",'
        '"claim":"...",'
        '"evidence_tweet_ids":["..."],'
        '"evidence_quotes":["..."],'
        '"confidence":0.0,'
        '"status":"provisional",'
        '"contradicts":["optional contradiction hint"]'
        '}],'
        '"training_examples":[{"prompt":"...","response":"...","task_type":"analysis"}],'
        '"phase":"discovery|synthesis|refinement",'
        '"confidence":"low|medium|high",'
        '"next_focus":"..."'
        '}\n\n'
        f"Batch id: {batch_id}\n"
        f"Checkpoint: {json.dumps(checkpoint, indent=2)}\n"
        f"Previous profile summary: {json.dumps(previous_summary, indent=2)}\n"
        # Only the tail of the previous note, to bound prompt size.
        f"Prior batch note excerpt: {prior_note[-2500:] if prior_note else 'none'}\n"
        f"Tweet ids in this batch: {tweet_ids}\n\n"
        "Tweets:\n"
        f"{format_tweets_for_prompt(batch_rows)}\n"
    )
def build_archive_critique_prompt(batch_id, draft_payload, batch_rows):
    """Build the second-pass prompt that critiques and rewrites a draft.

    The output schema matches the draft schema plus ``rubric_scores``,
    keyed by the four rubric criteria defined below.

    Args:
        batch_id: Identifier of the batch being processed.
        draft_payload: JSON object parsed from the draft reply.
        batch_rows: Tweet rows for this batch.

    Returns:
        The prompt string.
    """
    # Criteria the critique pass scores; the keys also appear in the schema.
    rubric = {
        "grounding": "Every material claim must be supported by quoted evidence and tweet ids.",
        "specificity": "Avoid bland summaries; identify concrete traits, projects, values, and relationships.",
        "source_distinction": "Mark inference carefully and never upgrade speculation into fact.",
        "actionability": "Training examples should teach Timmy how to read Alexander usefully.",
    }
    return (
        "You are the critique pass for Timmy's private Twitter archive learning loop.\n"
        "Rewrite the draft into a stronger, more grounded version.\n"
        "Return ONLY valid JSON with this schema:\n"
        '{'
        '"notes_markdown":"...",'
        '"knowledge_candidates":[{'
        '"category":"trait|preference|project|relationship|value|recurring-theme",'
        '"claim":"...",'
        '"evidence_tweet_ids":["..."],'
        '"evidence_quotes":["..."],'
        '"confidence":0.0,'
        '"status":"provisional",'
        '"contradicts":["optional contradiction hint"]'
        '}],'
        '"training_examples":[{"prompt":"...","response":"...","task_type":"analysis"}],'
        '"rubric_scores":{"grounding":0,"specificity":0,"source_distinction":0,"actionability":0},'
        '"phase":"discovery|synthesis|refinement",'
        '"confidence":"low|medium|high",'
        '"next_focus":"..."'
        '}\n\n'
        f"Batch id: {batch_id}\n"
        f"Rubric: {json.dumps(rubric, indent=2)}\n"
        f"Draft payload: {json.dumps(draft_payload, indent=2)}\n"
        "Tweets:\n"
        f"{format_tweets_for_prompt(batch_rows)}\n"
    )
def build_weekly_insight_prompt(profile, recent_batches):
    """Build the prompt for the weekly insight brief.

    Args:
        profile: Profile dict with a ``claims`` list; it is summarized
            before being embedded.
        recent_batches: JSON-serializable summaries of recent batches.

    Returns:
        The prompt string. It asks for ``markdown_report`` plus a list of
        ``opportunities`` in the embedded schema.
    """
    return (
        "You are Timmy preparing a private weekly insight brief about Alexander.\n"
        "Use the profile plus recent batch deltas to produce grounded, actionable insights.\n"
        "Return ONLY valid JSON with this schema:\n"
        '{'
        '"markdown_report":"...",'
        '"opportunities":[{'
        '"id":"...",'
        '"theme":"...",'
        '"insight":"...",'
        '"why_it_matters":"...",'
        '"evidence_tweet_ids":["..."],'
        '"suggested_action":"...",'
        '"confidence":0.0,'
        '"time_horizon":"this week|this month|long-term"'
        '}]'
        '}\n\n'
        f"Profile: {json.dumps(archive_profile_summary(profile), indent=2)}\n"
        f"Recent batches: {json.dumps(recent_batches, indent=2)}\n"
    )
def latest_eval_gate():
    """Evaluate the newest ``run_*.json`` eval file with the evaluate_candidate module.

    Returns:
        The module's result payload (60 s timeout), or ``None`` when no eval
        run exists yet.
    """
    newest_eval = latest_path(ARCHIVE_TRAINING_EVALS_DIR, "run_*.json")
    if not newest_eval:
        return None
    gate_args = ["--eval-file", str(newest_eval)]
    return run_timmy_home_module(
        "scripts.twitter_archive.evaluate_candidate",
        args=gate_args,
        timeout=60,
    )
def training_command_env():
    """Return the environment variables that point training commands at the archive."""
    return dict(
        TIMMY_ARCHIVE_DIR=str(ARCHIVE_DIR),
        TIMMY_HOME=str(TIMMY_HOME),
    )
def _archive_extract_impl():
    """Run the deterministic tweets.js extractor module and return its result payload."""
    extractor = "scripts.twitter_archive.extract_archive"
    return run_timmy_home_module(extractor)


@huey.task()
def archive_extract():
    """Huey task: extract tweets.js into the private JSONL workspace (deterministic)."""
    extraction = _archive_extract_impl()
    return extraction
def _archive_profile_consolidate_impl():
    """Run the profile-consolidation module and, on success, stamp the checkpoint.

    Returns:
        The module's result payload from run_timmy_home_module.
    """
    result = run_timmy_home_module("scripts.twitter_archive.consolidate_profile")
    if result.get("status") == "ok":
        # Read the checkpoint only after the (potentially slow) subprocess. A
        # copy loaded earlier may be stale by now, e.g. if another task
        # advanced next_offset meanwhile, and writing it back would undo that.
        checkpoint = load_archive_checkpoint()
        checkpoint["last_profile_update"] = datetime.now(timezone.utc).isoformat()
        write_json(ARCHIVE_CHECKPOINT, checkpoint)
    return result


@huey.task()
def archive_profile_consolidate():
    """Merge batch candidate files into a deterministic archive profile."""
    return _archive_profile_consolidate_impl()
def _archive_dpo_build_impl():
    """Run the DPO-pair builder module and, on success, stamp the checkpoint.

    Returns:
        The module's result payload from run_timmy_home_module.
    """
    result = run_timmy_home_module("scripts.twitter_archive.build_dpo_pairs")
    if result.get("status") == "ok":
        # Re-read after the subprocess so a checkpoint advanced meanwhile is
        # not overwritten with a stale copy.
        checkpoint = load_archive_checkpoint()
        checkpoint["last_dpo_build"] = datetime.now(timezone.utc).isoformat()
        write_json(ARCHIVE_CHECKPOINT, checkpoint)
    return result


@huey.task()
def archive_dpo_build():
    """Build local-only DPO pairs from completed archive batches."""
    return _archive_dpo_build_impl()
def _archive_pipeline_health_impl():
    """Run the pipeline-health module and add freshness checks plus a progress snapshot.

    If the newest Hermes session file is newer than the newest DPO pairs
    file, an issue is appended and ``ok`` is set to False.
    """
    report = run_timmy_home_module("scripts.twitter_archive.pipeline_health")
    session_file = latest_path(HERMES_HOME / "sessions", "session_*.json")
    dpo_file = latest_path(ARCHIVE_TRAINING_DPO_DIR, "pairs_*.jsonl")
    if session_file:
        report["latest_session"] = session_file.name
    if dpo_file:
        report["latest_dpo_file"] = dpo_file.name
    dpo_is_stale = bool(session_file and dpo_file) and (
        session_file.stat().st_mtime > dpo_file.stat().st_mtime
    )
    if dpo_is_stale:
        report.setdefault("issues", []).append(
            "latest Hermes session is newer than latest archive DPO file"
        )
        report["ok"] = False
    report["progress"] = archive_progress_snapshot()
    return report


@huey.task()
def archive_pipeline_health():
    """Huey task: check the private archive pipeline for stalled or missing stages."""
    health = _archive_pipeline_health_impl()
    return health
def _know_thy_father_impl():
    """Process the next archive batch through a draft pass and a critique pass.

    Steps:
    1. Re-run extraction.
    2. Take the next ``ARCHIVE_BATCH_SIZE`` tweets at the checkpoint offset.
    3. Ask the local model for a draft analysis, then for a critique/rewrite
       of that draft.
    4. Persist the note, training examples and candidate file; the DPO
       material is the draft as "rejected" and the critique as "chosen".
    5. Advance the checkpoint and run the profile, DPO and health follow-ups.

    Returns:
        A status dict:
        - ``"ok"`` with per-batch counts and follow-up results;
        - ``"complete"`` when every extracted tweet has been processed;
        - ``"error"`` with a ``reason`` (plus ``batch_id`` once one is
          assigned).
        The checkpoint is written only on success, so a failed batch is
        retried on the next run.
    """
    ensure_archive_layout()
    extraction = _archive_extract_impl()
    if extraction.get("status") != "ok":
        return {"status": "error", "reason": "archive extraction failed", "extract": extraction}
    checkpoint = load_archive_checkpoint()
    tweets = load_jsonl(ARCHIVE_TWEETS_FILE)
    if not tweets:
        return {"status": "error", "reason": "no extracted tweets found"}
    # Clamp: a negative offset would slice from the end of the list. The
    # resulting empty batch leaves next_offset unchanged, so the pipeline
    # would never advance.
    offset = max(0, int(checkpoint.get("next_offset", 0) or 0))
    if offset >= len(tweets):
        return {
            "status": "complete",
            "batches_completed": checkpoint.get("batches_completed", 0),
            "tweet_count": len(tweets),
            "progress": archive_progress_snapshot(),
        }
    batch_rows = tweets[offset:offset + ARCHIVE_BATCH_SIZE]
    batch_number = int(checkpoint.get("batches_completed", 0) or 0) + 1
    batch_id = archive_batch_id(batch_number)
    batch_tweet_ids = [str(row.get("tweet_id")) for row in batch_rows]

    profile = read_json(ARCHIVE_PROFILE_FILE, {"claims": []})
    previous_note = ""
    previous_batch = checkpoint.get("last_batch_id")
    if previous_batch:
        previous_note_path = ARCHIVE_NOTES_DIR / f"{previous_batch}.md"
        if previous_note_path.exists():
            previous_note = previous_note_path.read_text()

    # Pass 1: draft. The raw reply is kept under training/runs for auditing.
    draft_prompt = build_archive_draft_prompt(
        batch_id=batch_id,
        checkpoint=checkpoint,
        profile=profile,
        prior_note=previous_note,
        batch_rows=batch_rows,
    )
    draft_run = run_archive_hermes(
        prompt=draft_prompt,
        caller_tag=f"know-thy-father-draft:{batch_id}",
    )
    if not draft_run:
        return {"status": "error", "reason": "draft pass failed", "batch_id": batch_id}
    write_text(ARCHIVE_TRAINING_RUNS_DIR / f"{batch_id}_draft.txt", draft_run["response"])
    try:
        draft_payload = extract_first_json_object(draft_run["response"])
    except ValueError:
        return {"status": "error", "reason": "draft pass did not return JSON", "batch_id": batch_id}

    # Pass 2: critique/rewrite. Its output is what gets persisted.
    critique_prompt = build_archive_critique_prompt(batch_id=batch_id, draft_payload=draft_payload, batch_rows=batch_rows)
    critique_run = run_archive_hermes(
        prompt=critique_prompt,
        caller_tag=f"know-thy-father-critique:{batch_id}",
    )
    if not critique_run:
        return {"status": "error", "reason": "critique pass failed", "batch_id": batch_id}
    write_text(ARCHIVE_TRAINING_RUNS_DIR / f"{batch_id}_critique.txt", critique_run["response"])
    try:
        critique_payload = extract_first_json_object(critique_run["response"])
    except ValueError:
        return {"status": "error", "reason": "critique pass did not return JSON", "batch_id": batch_id}
    notes_markdown = str(critique_payload.get("notes_markdown") or "").strip()
    if not notes_markdown:
        return {"status": "error", "reason": "critique output missing notes", "batch_id": batch_id}

    # Model output is untrusted: null or wrongly-typed fields are treated as
    # empty instead of crashing the whole batch.
    raw_candidates = critique_payload.get("knowledge_candidates")
    if not isinstance(raw_candidates, list):
        raw_candidates = []
    knowledge_candidates = []
    for index, candidate in enumerate(raw_candidates, start=1):
        if not isinstance(candidate, dict):
            continue
        normalized = normalize_candidate_entry(candidate, batch_id, index)
        if normalized:
            knowledge_candidates.append(normalized)

    raw_examples = critique_payload.get("training_examples")
    if not isinstance(raw_examples, list):
        raw_examples = []
    training_examples = normalize_training_examples(
        [example for example in raw_examples if isinstance(example, dict)],
        batch_id=batch_id,
        tweet_ids=batch_tweet_ids,
        fallback_prompt="Read this batch of Alexander's tweets and write grounded notes with evidence.",
        fallback_response=notes_markdown,
    )
    rubric_scores = critique_payload.get("rubric_scores")
    if not isinstance(rubric_scores, dict):
        rubric_scores = {}

    note_body = (
        f"# {batch_id}\n\n"
        f"- Batch number: {batch_number}\n"
        f"- Tweet range: {offset} to {offset + len(batch_rows) - 1}\n"
        f"- Tweet ids: {', '.join(batch_tweet_ids)}\n\n"
        f"{notes_markdown}\n"
    )
    write_text(ARCHIVE_NOTES_DIR / f"{batch_id}.md", note_body)
    write_jsonl(ARCHIVE_TRAINING_EXAMPLES_DIR / f"{batch_id}.jsonl", training_examples)
    batch_payload = {
        "batch_id": batch_id,
        "batch_number": batch_number,
        "tweet_ids": batch_tweet_ids,
        "prompt": draft_prompt,
        "rejected": str(draft_payload.get("notes_markdown") or draft_run["response"]).strip(),
        "chosen": notes_markdown,
        "draft_session_id": draft_run.get("session_id"),
        "critique_session_id": critique_run.get("session_id"),
        "rubric_scores": normalize_rubric_scores(rubric_scores),
        "knowledge_candidates": knowledge_candidates,
        "training_examples": training_examples,
        "phase": str(critique_payload.get("phase") or checkpoint.get("phase") or "discovery"),
        "confidence": str(critique_payload.get("confidence") or checkpoint.get("confidence") or "low"),
        "next_focus": str(critique_payload.get("next_focus") or checkpoint.get("next_focus") or ""),
        "draft_response_file": f"{batch_id}_draft.txt",
        "critique_response_file": f"{batch_id}_critique.txt",
    }
    write_json(ARCHIVE_CANDIDATES_DIR / f"{batch_id}.json", batch_payload)

    # Advance the checkpoint only after every batch artifact is on disk.
    checkpoint["next_offset"] = offset + len(batch_rows)
    checkpoint["batches_completed"] = batch_number
    checkpoint["phase"] = batch_payload["phase"]
    checkpoint["confidence"] = batch_payload["confidence"]
    checkpoint["next_focus"] = batch_payload["next_focus"]
    checkpoint["understanding_version"] = batch_number
    checkpoint["last_batch_id"] = batch_id
    checkpoint["last_batch_sessions"] = {
        "draft": draft_run.get("session_id"),
        "critique": critique_run.get("session_id"),
    }
    write_json(ARCHIVE_CHECKPOINT, checkpoint)
    profile_result = _archive_profile_consolidate_impl()
    dpo_result = _archive_dpo_build_impl()
    health_result = _archive_pipeline_health_impl()
    return {
        "status": "ok",
        "batch_id": batch_id,
        "batch_number": batch_number,
        "tweets_processed": len(batch_rows),
        "next_offset": checkpoint["next_offset"],
        "knowledge_candidates": len(knowledge_candidates),
        "training_examples": len(training_examples),
        "profile": profile_result,
        "dpo": dpo_result,
        "health": health_result,
    }
@huey.task()
@huey.lock_task("know-thy-father")
def know_thy_father():
    """Run the archive reader over the next explicit 50-tweet batch.

    Thin Huey entry point. The ``"know-thy-father"`` lock serializes runs of
    this task; the actual batch processing lives in ``_know_thy_father_impl``.
    """
    outcome = _know_thy_father_impl()
    return outcome
def _archive_weekly_insights_impl():
    """Generate the weekly insight brief and opportunity list from the archive profile.

    Reads the consolidated profile and the three most recent candidate batch
    files, asks the archive Hermes model for a JSON insight payload, and
    writes ``weekly_YYYYMMDD.md`` plus ``opportunities.json`` into
    ``ARCHIVE_INSIGHTS_DIR``. The checkpoint's ``last_insight_file`` is
    updated afterwards.

    The payload is model output, so each opportunity is validated
    defensively: non-object entries are dropped, a non-numeric confidence
    becomes 0.0, and a missing/null evidence list becomes empty. Entries
    without a theme, insight, or suggested action are discarded.

    Returns:
        ``{"status": "ok", "weekly_file", "opportunities", "session_id"}`` on
        success, or ``{"status": "error", "reason": ...}`` when the profile is
        empty, the model pass fails, or the reply contains no JSON object.
    """
    ensure_archive_layout()
    profile = read_json(ARCHIVE_PROFILE_FILE, {"claims": []})
    if not profile.get("claims"):
        return {"status": "error", "reason": "profile is empty; run know_thy_father first"}
    recent_batches = []
    for path in sorted(ARCHIVE_CANDIDATES_DIR.glob("batch_*.json"))[-3:]:
        batch = read_json(path, {})
        recent_batches.append(
            {
                "batch_id": batch.get("batch_id", path.stem),
                "tweet_ids": batch.get("tweet_ids", [])[:10],
                "next_focus": batch.get("next_focus"),
                "knowledge_candidates": batch.get("knowledge_candidates", [])[:5],
            }
        )
    prompt = build_weekly_insight_prompt(profile=profile, recent_batches=recent_batches)
    insight_run = run_archive_hermes(prompt=prompt, caller_tag="archive-weekly-insights")
    if not insight_run:
        return {"status": "error", "reason": "insight pass failed"}
    try:
        insight_payload = extract_first_json_object(insight_run["response"])
    except ValueError:
        return {"status": "error", "reason": "insight pass did not return JSON"}
    if not isinstance(insight_payload, dict):
        # Everything below reads keys from the payload; anything else is unusable.
        return {"status": "error", "reason": "insight pass did not return JSON"}
    # Read the clock once so the file name and generated_at cannot straddle midnight.
    now = datetime.now(timezone.utc)
    date_key = now.strftime("%Y%m%d")
    weekly_file = ARCHIVE_INSIGHTS_DIR / f"weekly_{date_key}.md"
    opportunities_file = ARCHIVE_INSIGHTS_DIR / "opportunities.json"
    markdown_report = str(insight_payload.get("markdown_report") or "").strip()
    raw_opportunities = insight_payload.get("opportunities") or []
    if not isinstance(raw_opportunities, list):
        raw_opportunities = []
    opportunities = []
    for item in raw_opportunities:
        if not isinstance(item, dict):
            continue
        try:
            confidence = round(float(item.get("confidence", 0.0) or 0.0), 3)
        except (TypeError, ValueError):
            # e.g. "high" instead of a number; one bad field must not abort the brief.
            confidence = 0.0
        evidence = item.get("evidence_tweet_ids") or []
        if not isinstance(evidence, list):
            # A bare id would otherwise be iterated character by character.
            evidence = [evidence]
        opportunity = {
            "id": str(item.get("id") or f"opportunity-{len(opportunities) + 1}").strip(),
            "theme": str(item.get("theme") or "").strip(),
            "insight": str(item.get("insight") or "").strip(),
            "why_it_matters": str(item.get("why_it_matters") or "").strip(),
            "evidence_tweet_ids": [str(tweet_id) for tweet_id in evidence if str(tweet_id).strip()],
            "suggested_action": str(item.get("suggested_action") or "").strip(),
            "confidence": confidence,
            "time_horizon": str(item.get("time_horizon") or "this week").strip(),
        }
        if opportunity["theme"] and opportunity["insight"] and opportunity["suggested_action"]:
            opportunities.append(opportunity)
    write_text(weekly_file, markdown_report)
    write_json(opportunities_file, {"generated_at": now.isoformat(), "opportunities": opportunities})
    checkpoint = load_archive_checkpoint()
    checkpoint["last_insight_file"] = weekly_file.name
    write_json(ARCHIVE_CHECKPOINT, checkpoint)
    archive_progress_snapshot()
    return {
        "status": "ok",
        "weekly_file": weekly_file.name,
        "opportunities": len(opportunities),
        "session_id": insight_run.get("session_id"),
    }
@huey.task()
def archive_weekly_insights():
    """Queue-able entry point for the private weekly insight brief.

    Delegates entirely to ``_archive_weekly_insights_impl`` and returns its
    status dict unchanged.
    """
    report = _archive_weekly_insights_impl()
    return report
def _archive_train_adapter_impl():
    """Train a new archive-reading adapter once enough new DPO data has accrued.

    Gates, in order:
    - If a previous candidate is still ``awaiting_eval`` and the latest eval
      is missing or failing, return ``blocked``.
    - If fewer than 200 new pairs AND fewer than 10 new batches exist since
      the last successful run, return ``not-ready``.
    - If no ``train_command`` is configured, write a ``ready`` manifest and
      return it without training.

    Otherwise the command runs via ``/bin/zsh -lc`` in ``TIMMY_HOME`` with a
    one-hour timeout. Its output goes to ``train_<ts>.log`` and a manifest to
    ``train_<ts>.json``. A timeout or a launch failure is recorded as
    ``status: "error"`` with ``exit_code: None`` and an ``error`` message
    instead of escaping. Only a zero exit code advances the train state and
    marks the new candidate as awaiting eval.

    Returns:
        The status / manifest dict described above.
    """
    ensure_archive_layout()
    counts = archive_counts()
    state = load_train_state()
    eval_gate = latest_eval_gate()
    if state.get("awaiting_eval"):
        if not eval_gate or not eval_gate.get("pass"):
            return {
                "status": "blocked",
                "reason": "latest candidate eval is missing or still red",
                "last_candidate_id": state.get("last_candidate_id"),
                "eval": eval_gate,
            }
    new_pairs = max(0, counts["total_pairs"] - int(state.get("last_total_pairs", 0) or 0))
    new_batches = max(0, counts["total_batches"] - int(state.get("last_total_batches", 0) or 0))
    if new_pairs < 200 and new_batches < 10:
        return {
            "status": "not-ready",
            "new_pairs": new_pairs,
            "new_batches": new_batches,
            "threshold": {"pairs": 200, "batches": 10},
        }
    pipeline_config = load_pipeline_config()
    train_command = str(pipeline_config.get("train_command") or "").strip()
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    candidate_id = f"timmy-archive-{timestamp}"
    run_log = ARCHIVE_TRAINING_RUNS_DIR / f"train_{timestamp}.log"
    manifest_path = ARCHIVE_TRAINING_RUNS_DIR / f"train_{timestamp}.json"
    run_manifest = {
        "status": "ready" if not train_command else "started",
        "candidate_id": candidate_id,
        "new_pairs": new_pairs,
        "new_batches": new_batches,
        "train_command": train_command or None,
        "created_at": datetime.now(timezone.utc).isoformat(),
    }
    if not train_command:
        write_json(manifest_path, run_manifest)
        return run_manifest
    env = os.environ.copy()
    env.update(training_command_env())
    train_timeout = 3600
    failure = None
    try:
        result = subprocess.run(
            ["/bin/zsh", "-lc", train_command],
            cwd=str(TIMMY_HOME),
            capture_output=True,
            text=True,
            timeout=train_timeout,
            env=env,
        )
        exit_code = result.returncode
        log_text = (result.stdout or "") + ("\n" + result.stderr if result.stderr else "")
    except subprocess.TimeoutExpired as exc:
        exit_code = None
        failure = f"train command timed out after {train_timeout}s"
        # Partial output on TimeoutExpired is bytes even with text=True.
        parts = []
        for stream in (exc.stdout, exc.stderr):
            if isinstance(stream, bytes):
                stream = stream.decode("utf-8", errors="replace")
            if stream:
                parts.append(stream)
        parts.append(failure)
        log_text = "\n".join(parts)
    except OSError as exc:
        exit_code = None
        failure = f"train command could not start: {exc}"
        log_text = failure
    run_log.write_text(log_text)
    run_manifest["exit_code"] = exit_code
    run_manifest["log_file"] = run_log.name
    run_manifest["status"] = "ok" if exit_code == 0 else "error"
    if failure:
        run_manifest["error"] = failure
    write_json(manifest_path, run_manifest)
    finished_at = datetime.now(timezone.utc).isoformat()
    if exit_code == 0:
        state.update(
            {
                "last_total_batches": counts["total_batches"],
                "last_total_pairs": counts["total_pairs"],
                "last_candidate_id": candidate_id,
                "awaiting_eval": True,
                "last_run_status": "ok",
                "last_run_at": finished_at,
            }
        )
    else:
        state.update(
            {
                "last_run_status": "error",
                "last_run_at": finished_at,
            }
        )
    write_json(ARCHIVE_TRAIN_STATE_FILE, state)
    return run_manifest
@huey.task()
def archive_train_adapter():
    """Queue-able entry point for adapter training.

    All threshold and eval-gate logic lives in ``_archive_train_adapter_impl``;
    this wrapper just returns that function's result.
    """
    manifest = _archive_train_adapter_impl()
    return manifest
def _archive_promote_candidate_impl():
    """Promote the latest evaluated archive candidate when its eval gate passes.

    Outcomes:
    - No eval file: ``blocked``.
    - Eval failed: a ``blocked`` decision is written to
      ``ARCHIVE_PROMOTION_STATE_FILE`` and ``blocked`` is returned.
    - The eval's candidate is already recorded in ``ARCHIVE_ACTIVE_MODEL_FILE``:
      ``skipped``. Nothing is re-run or rewritten.
    - Otherwise the configured ``promote_command`` (if any) runs with
      ``TIMMY_ARCHIVE_CANDIDATE_ID`` set and a 1200 s timeout. On success, or
      when no command is configured, the candidate is recorded as the active
      model and ``awaiting_eval`` is cleared in the train state. A failing,
      timed-out, or unlaunchable command is recorded with ``status: "error"``
      and the active model is left untouched.

    Returns:
        The decision dict.
    """
    ensure_archive_layout()
    eval_gate = latest_eval_gate()
    if not eval_gate:
        return {"status": "blocked", "reason": "missing eval file"}
    if not eval_gate.get("pass"):
        write_json(
            ARCHIVE_PROMOTION_STATE_FILE,
            {
                "status": "blocked",
                "reason": "promotion gate failed",
                "evaluated_at": datetime.now(timezone.utc).isoformat(),
                "eval": eval_gate,
            },
        )
        return {"status": "blocked", "eval": eval_gate}
    candidate_id = eval_gate.get("candidate_id")
    # archive_pipeline_tick calls this every 4 hours. Without this check the
    # same passing eval re-runs promote_command and rewrites the active model
    # each time. If the latest eval still belongs to the previous candidate,
    # it would also clear awaiting_eval for a newly trained, unevaluated one.
    active_model = read_json(ARCHIVE_ACTIVE_MODEL_FILE, {})
    if candidate_id and isinstance(active_model, dict) and active_model.get("candidate_id") == candidate_id:
        return {"status": "skipped", "reason": "candidate already promoted", "candidate_id": candidate_id}
    pipeline_config = load_pipeline_config()
    promote_command = str(pipeline_config.get("promote_command") or "").strip()
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    decision = {
        "status": "ready" if not promote_command else "started",
        "candidate_id": candidate_id,
        "rollback_model": eval_gate.get("rollback_model"),
        "evaluated_at": datetime.now(timezone.utc).isoformat(),
        "eval": eval_gate,
    }
    if promote_command:
        env = os.environ.copy()
        env.update(training_command_env())
        env["TIMMY_ARCHIVE_CANDIDATE_ID"] = str(candidate_id or "")
        promote_timeout = 1200
        failure = None
        try:
            result = subprocess.run(
                ["/bin/zsh", "-lc", promote_command],
                cwd=str(TIMMY_HOME),
                capture_output=True,
                text=True,
                timeout=promote_timeout,
                env=env,
            )
            exit_code = result.returncode
            log_text = (result.stdout or "") + ("\n" + result.stderr if result.stderr else "")
        except subprocess.TimeoutExpired as exc:
            exit_code = None
            failure = f"promote command timed out after {promote_timeout}s"
            # Partial output on TimeoutExpired is bytes even with text=True.
            parts = []
            for stream in (exc.stdout, exc.stderr):
                if isinstance(stream, bytes):
                    stream = stream.decode("utf-8", errors="replace")
                if stream:
                    parts.append(stream)
            parts.append(failure)
            log_text = "\n".join(parts)
        except OSError as exc:
            exit_code = None
            failure = f"promote command could not start: {exc}"
            log_text = failure
        log_path = ARCHIVE_TRAINING_RUNS_DIR / f"promote_{timestamp}.log"
        log_path.write_text(log_text)
        decision["status"] = "ok" if exit_code == 0 else "error"
        decision["exit_code"] = exit_code
        decision["log_file"] = log_path.name
        if failure:
            decision["error"] = failure
        if exit_code != 0:
            write_json(ARCHIVE_PROMOTION_STATE_FILE, decision)
            return decision
    write_json(
        ARCHIVE_ACTIVE_MODEL_FILE,
        {
            "candidate_id": candidate_id,
            "rollback_model": eval_gate.get("rollback_model"),
            "promoted_at": datetime.now(timezone.utc).isoformat(),
        },
    )
    write_json(ARCHIVE_PROMOTION_STATE_FILE, decision)
    state = load_train_state()
    state["awaiting_eval"] = False
    state["last_run_status"] = "promoted"
    write_json(ARCHIVE_TRAIN_STATE_FILE, state)
    return decision
@huey.task()
def archive_promote_candidate():
    """Queue-able entry point for candidate promotion.

    The offline eval gate is enforced inside
    ``_archive_promote_candidate_impl``; its decision dict is returned as-is.
    """
    verdict = _archive_promote_candidate_impl()
    return verdict
@huey.periodic_task(crontab(hour="*/4", minute="15"))
def archive_pipeline_tick():
    """Advance the private archive learning loop on a regular cadence.

    Runs these steps in order:
    1. One know-thy-father batch.
    2. The adapter-training gate.
    3. The promotion gate.
    4. On UTC Mondays, the weekly insight brief, if today's
       ``weekly_YYYYMMDD.md`` does not exist yet.
    5. A pipeline-health report.

    The batch step takes the same ``"know-thy-father"`` Huey lock as the
    ``know_thy_father`` task, so a manually queued batch and this tick cannot
    work on the same batch concurrently. If the lock is already held, the
    batch step is reported as skipped and the remaining steps still run.
    """
    from huey.exceptions import TaskLockedException

    try:
        with huey.lock_task("know-thy-father"):
            batch = _know_thy_father_impl()
    except TaskLockedException:
        batch = {"status": "skipped", "reason": "know-thy-father batch already running"}
    train = _archive_train_adapter_impl()
    promote = _archive_promote_candidate_impl()
    insight = {"status": "skipped"}
    # One clock reading so the weekday test and the expected file name agree.
    now = datetime.now(timezone.utc)
    if now.weekday() == 0:
        expected = f"weekly_{now.strftime('%Y%m%d')}.md"
        if not (ARCHIVE_INSIGHTS_DIR / expected).exists():
            insight = _archive_weekly_insights_impl()
    return {
        "batch": batch,
        "train": train,
        "promote": promote,
        "insight": insight,
        "health": _archive_pipeline_health_impl(),
    }
# ── Existing: Orchestration ──────────────────────────────────────────
@huey.periodic_task(crontab(minute="*/15"))
def triage_issues():
    """Passively survey unassigned issues across REPOS (read-only, no comments).

    Collects up to 10 unassigned issues per repo and returns the total count
    plus a sample of at most 20 ``{"repo", "issue", "title"}`` entries.
    """
    client = GiteaClient()
    backlog = [
        {"repo": repo, "issue": issue.number, "title": issue.title}
        for repo in REPOS
        for issue in client.find_unassigned_issues(repo, limit=10)
    ]
    return {"unassigned": len(backlog), "sample": backlog[:20]}
@huey.periodic_task(crontab(minute="*/30"))
def review_prs():
    """Review open PRs against the net-line budget and flag violations.

    For up to 20 open PRs per repo in REPOS, the net line change
    (additions minus deletions, summed over all changed files) is compared
    with NET_LINE_LIMIT. Over-limit PRs count as rejected and receive one
    "❌ Net +N lines..." comment. Later runs see that comment and do not post
    it again, so a PR is not spammed every 30 minutes.

    Returns:
        ``{"reviewed": <PRs inspected>, "rejected": <PRs over the limit>}``.
    """
    rejection_prefix = "❌ Net +"
    g = GiteaClient()
    reviewed, rejected = 0, 0
    for repo in REPOS:
        for pr in g.list_pulls(repo, state="open", limit=20):
            reviewed += 1
            files = g.get_pull_files(repo, pr.number)
            net = sum(f.additions - f.deletions for f in files)
            if net <= NET_LINE_LIMIT:
                continue
            rejected += 1
            # PR comments live on the same issue-comment thread create_comment posts to.
            comments = g.list_comments(repo, pr.number)
            if any(c.body and c.body.lstrip().startswith(rejection_prefix) for c in comments):
                continue
            g.create_comment(
                repo, pr.number,
                f"❌ Net +{net} lines exceeds the {NET_LINE_LIMIT}-line limit. "
                f"Find {net - NET_LINE_LIMIT} lines to cut. See CONTRIBUTING.md."
            )
    return {"reviewed": reviewed, "rejected": rejected}
@huey.periodic_task(crontab(minute="*/10"))
def dispatch_assigned():
    """Queue a dispatch_work task for each agent-assigned issue not yet dispatched.

    An issue counts as already dispatched when any of its comments contains
    "dispatched" (case-insensitive). Returns ``{"dispatched": <tasks queued>}``.
    """
    client = GiteaClient()
    agents = ["claude", "gemini", "kimi", "grok", "perplexity"]
    queued = 0
    for repo in REPOS:
        for agent in agents:
            for issue in client.find_agent_issues(repo, agent, limit=5):
                already_sent = any(
                    comment.body and "dispatched" in comment.body.lower()
                    for comment in client.list_comments(repo, issue.number)
                )
                if already_sent:
                    continue
                dispatch_work(repo, issue.number, agent)
                queued += 1
    return {"dispatched": queued}
@huey.task(retries=3, retry_delay=60)
def dispatch_work(repo, issue_number, agent):
    """Post the dispatch notice for *issue_number* in *repo* on behalf of *agent*.

    The comment contains "Dispatched", which dispatch_assigned() uses to
    avoid queuing the same issue twice. Failures propagate, so Huey retries
    up to 3 times, 60 seconds apart.
    """
    notice = f"⚡ Dispatched to `{agent}`. Huey task queued."
    GiteaClient().create_comment(repo, issue_number, notice)
# ── NEW 1: Config Sync ───────────────────────────────────────────────
@huey.periodic_task(crontab(minute="0"))  # every hour on the hour
def sync_config_up():
    """Push live ~/.hermes config changes UP to the timmy-config repo.

    Runs ``~/.timmy/timmy-config/bin/sync-up.sh`` with bash and a 60 s limit.

    Returns:
        ``{"exit_code", "output" (last 500 chars of stdout),
        "error" (last 200 chars of stderr)}`` when the script ran, or
        ``{"error": ...}`` when the script is missing or timed out.
    """
    script = TIMMY_HOME / "timmy-config" / "bin" / "sync-up.sh"
    if not script.exists():
        return {"error": "sync-up.sh not found"}
    try:
        result = subprocess.run(
            ["bash", str(script)],
            capture_output=True, text=True, timeout=60
        )
    except subprocess.TimeoutExpired:
        # Report like the missing-script case instead of failing the periodic task.
        return {"error": "sync-up.sh timed out after 60s"}
    return {
        "exit_code": result.returncode,
        "output": result.stdout[-500:] if result.stdout else "",
        "error": result.stderr[-200:] if result.stderr else "",
    }
# ── NEW 2: Session Export for DPO ────────────────────────────────────
@huey.periodic_task(crontab(hour="*/4", minute="30"))  # every 4 hours
def session_export():
    """Export user→assistant turn pairs from new Hermes sessions as DPO raw material.

    Scans ``~/.hermes/sessions/session_*.json`` in name order. Files whose
    name sorts at or before the one stored in ``dpo-pairs/.last_export`` are
    skipped.

    In each remaining session, every user message immediately followed by an
    assistant message becomes ``{"prompt", "chosen", "session"}``, with each
    text truncated to 2000 characters. The pairs are written to
    ``~/.timmy/training-data/dpo-pairs/<session file name>``.

    Unreadable or malformed session files are skipped individually, so one
    bad file cannot abort the run. The marker is then advanced to the newest
    session file name.

    Returns:
        ``{"exported": <files written>, "total_sessions": <session files seen>}``.
    """
    sessions_dir = HERMES_HOME / "sessions"
    export_dir = TIMMY_HOME / "training-data" / "dpo-pairs"
    export_dir.mkdir(parents=True, exist_ok=True)
    marker_file = export_dir / ".last_export"
    last_export = ""
    if marker_file.exists():
        last_export = marker_file.read_text().strip()

    def as_text(content):
        """Coerce a message's ``content`` to a string suitable for slicing."""
        if isinstance(content, str):
            return content
        if isinstance(content, list):
            # Assumption: list content uses the multi-part shape
            # [{"type": "text", "text": ...}, ...]; keep only the text parts.
            return "\n".join(
                part["text"]
                for part in content
                if isinstance(part, dict) and isinstance(part.get("text"), str)
            )
        # None (e.g. a tool-call-only assistant turn) or anything unexpected.
        return ""

    exported = 0
    session_files = sorted(sessions_dir.glob("session_*.json"))
    for sf in session_files:
        if sf.name <= last_export:
            continue
        try:
            data = json.loads(sf.read_text())
            messages = data.get("messages") if isinstance(data, dict) else None
            if not isinstance(messages, list):
                messages = []
            # Extract user->assistant pairs (raw material for DPO curation)
            pairs = []
            for msg, next_msg in zip(messages, messages[1:]):
                if not (isinstance(msg, dict) and isinstance(next_msg, dict)):
                    continue
                if msg.get("role") == "user" and next_msg.get("role") == "assistant":
                    pairs.append({
                        "prompt": as_text(msg.get("content"))[:2000],
                        "chosen": as_text(next_msg.get("content"))[:2000],
                        "session": sf.name,
                    })
            if pairs:
                out_file = export_dir / sf.name
                out_file.write_text(json.dumps(pairs, indent=2))
                exported += 1
        except (OSError, ValueError, KeyError, TypeError):
            # ValueError covers JSONDecodeError and UnicodeDecodeError.
            continue
    # Update marker
    if session_files:
        marker_file.write_text(session_files[-1].name)
    return {"exported": exported, "total_sessions": len(session_files)}
# ── NEW 3: Model Health Check ────────────────────────────────────────
@huey.periodic_task(crontab(minute="*/5"))  # every 5 minutes
def model_health():
    """Check the active local inference surface and export freshness.

    Probes, in order:
    1. whether a llama-server/ollama process is running (``pgrep -f``);
    2. whether ``LOCAL_PROVIDER_BASE_URL/models`` answers, recording the
       model ids it lists;
    3. whether a 5-token chat completion against LOCAL_PROVIDER_MODEL returns
       HTTP 200;
    4. how far the newest DPO export lags the newest Hermes session, by mtime.
       The export counts as fresh at a lag of 300 minutes or less.

    The result is written atomically to ``~/.hermes/model_health.json``,
    which heartbeat_tick() and good_morning_report() read, and is also
    returned.
    """
    checks = {}
    models_url = f"{LOCAL_PROVIDER_BASE_URL}/models"
    chat_url = f"{LOCAL_PROVIDER_BASE_URL}/chat/completions"
    checks["provider"] = "local-llama.cpp"
    checks["provider_base_url"] = LOCAL_PROVIDER_BASE_URL
    checks["provider_model"] = LOCAL_PROVIDER_MODEL
    # 1. Is the local inference process running?
    try:
        result = subprocess.run(
            ["pgrep", "-f", "llama-server|ollama"],
            capture_output=True, timeout=5
        )
        checks["local_inference_running"] = result.returncode == 0
    except Exception:
        checks["local_inference_running"] = False
    # 2. Can we hit the configured API?
    try:
        req = urllib.request.Request(models_url)
        with urllib.request.urlopen(req, timeout=5) as resp:
            data = json.loads(resp.read())
            models = [m.get("id", "?") for m in data.get("data", [])]
            checks["models_loaded"] = models
        checks["api_responding"] = True
    except Exception as e:
        checks["api_responding"] = False
        checks["error"] = str(e)
    # 3. Can we do a tiny inference?
    if checks.get("api_responding"):
        try:
            payload = json.dumps({
                "model": LOCAL_PROVIDER_MODEL,
                "messages": [{"role": "user", "content": "ping"}],
                "max_tokens": 5,
                "stream": False,
            }).encode()
            req = urllib.request.Request(
                chat_url,
                data=payload,
                headers={"Content-Type": "application/json"},
            )
            with urllib.request.urlopen(req, timeout=30) as resp:
                checks["inference_ok"] = resp.status == 200
        except Exception as e:
            checks["inference_ok"] = False
            checks["inference_error"] = str(e)
    # 4. Is session export keeping up with new Hermes sessions?
    sessions_dir = HERMES_HOME / "sessions"
    export_dir = TIMMY_HOME / "training-data" / "dpo-pairs"
    latest_session = newest_file(sessions_dir, "session_*.json")
    latest_export = newest_file(export_dir, "session_*.json")
    checks["latest_session"] = latest_session.name if latest_session else None
    checks["latest_export"] = latest_export.name if latest_export else None
    if latest_session and latest_export:
        session_mtime = latest_session.stat().st_mtime
        export_mtime = latest_export.stat().st_mtime
        lag_minutes = max(0, int((session_mtime - export_mtime) // 60))
        checks["export_lag_minutes"] = lag_minutes
        checks["export_fresh"] = lag_minutes <= 300
    elif latest_session and not latest_export:
        checks["export_lag_minutes"] = None
        checks["export_fresh"] = False
    else:
        checks["export_lag_minutes"] = 0
        checks["export_fresh"] = True
    # Write health status to a file for other tools to read. Readers poll this
    # on their own schedule, so write a sibling temp file and rename it over
    # the target; a reader never observes a half-written JSON document.
    health_file = HERMES_HOME / "model_health.json"
    checks["timestamp"] = datetime.now(timezone.utc).isoformat()
    tmp_file = health_file.with_name(health_file.name + ".tmp")
    tmp_file.write_text(json.dumps(checks, indent=2))
    os.replace(tmp_file, health_file)
    return checks
# ── NEW 4: Heartbeat Tick ────────────────────────────────────────────
@huey.periodic_task(crontab(minute="*/10"))  # every 10 minutes
def heartbeat_tick():
    """Perceive — Reflect — Remember — Decide — Act — Learn.

    This is the nervous system. Each tick:
    1. Perceive: Gitea reachability, the model_health.json snapshot, and
       open issue/PR counts per repo.
    2. Reflect: note the previous tick id from ``last_tick.json``.
    3. Decide: ask the local model (``hermes_local``) for actions and a
       severity. If the model is unavailable or its reply contains no JSON
       object, hard-coded checks are used instead.
    4. Act: the chosen actions are recorded on the tick, both as
       ``decision.actions`` and as a top-level ``actions`` list. This
       function does not execute them.
    5. Remember / Learn: overwrite ``last_tick.json`` and append to the
       day's ``ticks_YYYYMMDD.jsonl`` log.

    Returns:
        The tick record.
    """
    tick_dir = TIMMY_HOME / "heartbeat"
    tick_dir.mkdir(parents=True, exist_ok=True)
    now = datetime.now(timezone.utc)
    tick_id = now.strftime("%Y%m%d_%H%M%S")
    perception = {}
    # PERCEIVE: gather state
    g = None
    try:
        g = GiteaClient()
        perception["gitea_alive"] = g.ping()
    except Exception:
        perception["gitea_alive"] = False
    # Model health (read from health file)
    health_file = HERMES_HOME / "model_health.json"
    if health_file.exists():
        try:
            perception["model_health"] = json.loads(health_file.read_text())
        except Exception:
            perception["model_health"] = "unreadable"
    # Open issue/PR counts. These are len() of a single page, so they saturate
    # at count_page; the old page size of 1 could only ever report 0 or 1.
    count_page = 50
    if perception.get("gitea_alive") and g is not None:
        try:
            for repo in REPOS:
                issues = g.list_issues(repo, state="open", limit=count_page)
                pulls = g.list_pulls(repo, state="open", limit=count_page)
                perception[repo] = {
                    "open_issues": len(issues),
                    "open_prs": len(pulls),
                }
        except Exception as e:
            perception["gitea_error"] = str(e)
    # Huey consumer alive (we're running, so yes)
    perception["huey_alive"] = True
    # REFLECT + REMEMBER: compare to last tick, log
    last_tick_file = tick_dir / "last_tick.json"
    last_tick = {}
    if last_tick_file.exists():
        try:
            last_tick = json.loads(last_tick_file.read_text())
        except Exception:
            pass
    tick_record = {
        "tick_id": tick_id,
        "timestamp": now.isoformat(),
        "perception": perception,
        "previous_tick": last_tick.get("tick_id", "none"),
    }
    # DECIDE: let hermes4:14b reason about what to do
    decide_prompt = (
        f"System state at {now.isoformat()}:\n\n"
        f"{json.dumps(perception, indent=2)}\n\n"
        f"Previous tick: {last_tick.get('tick_id', 'none')}\n\n"
        "You are the heartbeat monitor. Based on this state:\n"
        "1. List any actions needed (alerts, restarts, escalations). Empty if all OK.\n"
        "2. Rate severity: ok, warning, or critical.\n"
        "3. One sentence of reasoning.\n\n"
        'Respond ONLY with JSON: {"actions": [], "severity": "ok", "reasoning": "..."}'
    )

    def extract_decision(text):
        """Return the first JSON object embedded in *text*, or None.

        Handles single-line, pretty-printed, and markdown-fenced replies by
        trying a raw decode at every '{' position.
        """
        decoder = json.JSONDecoder()
        start = text.find("{")
        while start != -1:
            try:
                candidate, _ = decoder.raw_decode(text, start)
            except ValueError:
                candidate = None
            if isinstance(candidate, dict):
                return candidate
            start = text.find("{", start + 1)
        return None

    decision = None
    try:
        raw = hermes_local(decide_prompt, caller_tag="heartbeat_tick")
    except Exception:
        raw = None
    if isinstance(raw, str) and raw.strip():
        decision = extract_decision(raw)
    # Fallback to hardcoded logic if model fails or is down
    if decision is None:
        actions = []
        if not perception.get("gitea_alive"):
            actions.append("ALERT: Gitea unreachable")
        health = perception.get("model_health", {})
        if isinstance(health, dict) and not health.get("local_inference_running"):
            actions.append("ALERT: local inference surface not running")
        decision = {
            "actions": actions,
            "severity": "fallback",
            "reasoning": "model unavailable, used hardcoded checks",
        }
    # Model output may put a string (or nothing) under "actions"; normalize to a list.
    actions = decision.get("actions", [])
    if not isinstance(actions, list):
        actions = [actions] if actions else []
    decision["actions"] = actions
    tick_record["decision"] = decision
    # Top-level copy: memory_compress() reads tick["actions"].
    tick_record["actions"] = actions
    # Save tick
    last_tick_file.write_text(json.dumps(tick_record, indent=2))
    # LEARN: append to episodic log
    log_file = tick_dir / f"ticks_{now.strftime('%Y%m%d')}.jsonl"
    with open(log_file, "a") as f:
        f.write(json.dumps(tick_record) + "\n")
    return tick_record
# ── NEW 5: Memory Compress (Morning Briefing) ───────────────────────
@huey.periodic_task(crontab(hour="8", minute="0"))  # 8 AM daily
def memory_compress():
    """Morning briefing — compress yesterday's heartbeat ticks into a summary.

    Reads ``~/.timmy/heartbeat/ticks_<yesterday>.jsonl``, where "yesterday"
    is the UTC date. It collects each tick's decided actions as alerts and
    counts the ticks where Gitea or local inference was down. The result is
    written to ``~/.timmy/briefings/briefing_<yesterday>.json`` so it can be
    injected into a system prompt at startup.

    A tick's actions come from its top-level ``actions`` list when present,
    otherwise from ``decision.actions``, which is where heartbeat_tick()
    stores the model or fallback decision.

    Returns:
        The briefing dict, or ``{"status": ...}`` when there is nothing to
        summarize.
    """
    tick_dir = TIMMY_HOME / "heartbeat"
    briefing_dir = TIMMY_HOME / "briefings"
    briefing_dir.mkdir(parents=True, exist_ok=True)
    # Find yesterday's tick log
    yesterday = (datetime.now(timezone.utc) - timedelta(days=1)).strftime("%Y%m%d")
    tick_log = tick_dir / f"ticks_{yesterday}.jsonl"
    if not tick_log.exists():
        return {"status": "no ticks from yesterday"}
    # Read all ticks, skipping corrupt or non-object lines
    ticks = []
    for line in tick_log.read_text().strip().split("\n"):
        try:
            tick = json.loads(line)
        except ValueError:
            continue
        if isinstance(tick, dict):
            ticks.append(tick)
    if not ticks:
        return {"status": "empty tick log"}
    # Compress: extract key facts
    alerts = []
    gitea_down_count = 0
    inference_down_count = 0
    for t in ticks:
        actions = t.get("actions")
        if actions is None:
            decision = t.get("decision")
            actions = decision.get("actions", []) if isinstance(decision, dict) else []
        if not isinstance(actions, list):
            actions = [actions] if actions else []
        tick_label = t.get("tick_id", "?")
        for action in actions:
            alerts.append(f"[{tick_label}] {action}")
        p = t.get("perception", {})
        if not isinstance(p, dict):
            p = {}
        if not p.get("gitea_alive"):
            gitea_down_count += 1
        health = p.get("model_health", {})
        if isinstance(health, dict) and not health.get("local_inference_running"):
            inference_down_count += 1
    # Last tick's perception = current state
    last = ticks[-1].get("perception", {})
    briefing = {
        "date": yesterday,
        "total_ticks": len(ticks),
        "alerts": alerts[-10:],  # last 10 alerts
        "gitea_downtime_ticks": gitea_down_count,
        "local_inference_downtime_ticks": inference_down_count,
        "last_known_state": last,
    }
    briefing_file = briefing_dir / f"briefing_{yesterday}.json"
    briefing_file.write_text(json.dumps(briefing, indent=2))
    return briefing
# ── NEW 6: Good Morning Report ───────────────────────────────────────
@huey.periodic_task(crontab(hour="6", minute="0"))  # 6 AM daily
def good_morning_report():
    """Generate Alexander's official morning report.

    Delivery contract:
    - save markdown + beautiful HTML locally
    - open the HTML report in the browser on the Mac
    - send the full markdown artifact to Telegram plus a readable summary message
    - keep claims evidence-rich and honest

    Claims about the local world stack (Nexus 4200, Evennia web 4001, bridge
    8765) are derived from this run's port probes. A surface that is down is
    reported as down instead of being announced as alive.

    Returns:
        The verification dict, which is also written to
        ``BRIEFING_DIR/<date>-verification.json``.
    """
    now = datetime.now().astimezone()
    today = now.strftime("%Y-%m-%d")
    day_name = now.strftime("%A")
    today_tick_slug = now.strftime("%Y%m%d")
    g = GiteaClient()
    tick_log = TIMMY_HOME / "heartbeat" / f"ticks_{today_tick_slug}.jsonl"
    ticks = read_jsonl_rows(tick_log)
    tick_count = len(ticks)

    def tick_health(tick):
        """Return a tick's model_health dict, or {} when missing or unreadable."""
        health = (tick.get("perception", {}) or {}).get("model_health", {}) or {}
        # heartbeat_tick stores the string "unreadable" when the file can't be parsed.
        return health if isinstance(health, dict) else {}

    gitea_downtime_ticks = sum(1 for tick in ticks if not (tick.get("perception", {}) or {}).get("gitea_alive", True))
    inference_fail_ticks = sum(1 for tick in ticks if not tick_health(tick).get("inference_ok", False))
    first_green_tick = next(
        (tick.get("tick_id") for tick in ticks if tick_health(tick).get("inference_ok", False)),
        "none",
    )
    health_file = HERMES_HOME / "model_health.json"
    health_snapshot = read_json(health_file, {})
    provider = health_snapshot.get("provider", "unknown")
    provider_model = health_snapshot.get("provider_model", "unknown")
    provider_base_url = health_snapshot.get("provider_base_url", "unknown")
    model_status = "healthy" if health_snapshot.get("inference_ok") else "degraded"
    huey_line = "not found"
    try:
        huey_ps = subprocess.run(
            ["bash", "-lc", "ps aux | egrep 'huey_consumer|tasks.huey' | grep -v egrep || true"],
            capture_output=True,
            text=True,
            timeout=10,
        )
        huey_line = huey_ps.stdout.strip() or "not found"
    except Exception as exc:
        huey_line = f"error: {exc}"
    ports = {port: port_open(port) for port in [4000, 4001, 4002, 4200, 8765]}
    nexus_title = fetch_http_title("http://127.0.0.1:4200")
    evennia_title = fetch_http_title("http://127.0.0.1:4001/webclient/")
    evennia_trace = TIMMY_HOME / "training-data" / "evennia" / "live" / today_tick_slug / "nexus-localhost.jsonl"
    evennia_events = read_jsonl_rows(evennia_trace)
    last_evennia = evennia_events[-1] if evennia_events else {}
    recent_issue_lines = []
    for repo in ["Timmy_Foundation/timmy-config", "Timmy_Foundation/the-nexus", "Timmy_Foundation/timmy-home"]:
        try:
            issues = g.list_issues(repo, state="open", sort="created", direction="desc", limit=5)
            for issue in issues[:3]:
                recent_issue_lines.append(
                    f"{repo}#{issue.number} — {issue.title} ({g.base_url}/{repo}/issues/{issue.number})"
                )
        except Exception:
            continue
    recent_pr_lines = []
    for repo in ["Timmy_Foundation/timmy-config", "Timmy_Foundation/the-nexus", "Timmy_Foundation/timmy-home"]:
        try:
            prs = g.list_pulls(repo, state="open", sort="newest", limit=5)
            for pr in prs[:2]:
                recent_pr_lines.append(
                    f"{repo}#{pr.number} — {pr.title} ({g.base_url}/{repo}/pulls/{pr.number})"
                )
        except Exception:
            continue
    research_candidates = []
    for label, path in [
        ("research", TIMMY_HOME / "research"),
        ("reports", TIMMY_HOME / "reports"),
        ("specs", TIMMY_HOME / "specs"),
    ]:
        for item in latest_files(path, limit=3):
            research_candidates.append(f"{label}: {item['path']} (mtime {item['mtime']})")
    # Only claim the world stack is alive when the probes say so.
    world_surfaces = {
        "Nexus (4200)": ports[4200],
        "Evennia web (4001)": ports[4001],
        "local bridge (8765)": ports[8765],
    }
    down_surfaces = [name for name, is_open in world_surfaces.items() if not is_open]
    if down_surfaces:
        world_line = (
            f"The local world stack is only partly up: {', '.join(down_surfaces)} not answering when this report was generated. "
            "Check those surfaces before trusting the local world lane today."
        )
        executive_summary = (
            "The field is sharper this morning. The report lane is now integrated into timmy-config, but the local world stack is not fully up "
            f"({', '.join(down_surfaces)} not answering), and Bannerlord is being held to the thin-adapter standard instead of backlog gravity."
        )
        world_bullet = f"• Local world stack degraded: {html.escape(', '.join(down_surfaces))} not answering\n"
    else:
        if evennia_events:
            world_line = "The local world stack is alive: Nexus, Evennia, and the local bridge are all up, with replayable Evennia action telemetry already on disk."
        else:
            world_line = "The local world stack is alive: Nexus, Evennia, and the local bridge are all up, but no Evennia action telemetry was found on disk for today."
        executive_summary = (
            "The field is sharper this morning. The report lane is now integrated into timmy-config, the local world stack is visibly alive, "
            "and Bannerlord is being held to the thin-adapter standard instead of backlog gravity."
        )
        world_bullet = "• Local world stack is alive: Nexus <code>127.0.0.1:4200</code>, Evennia <code>127.0.0.1:4001/webclient/</code>, bridge <code>127.0.0.1:8765</code>\n"
    what_matters = [
        "The official report lane is tracked in timmy-config #87 and now runs through the integrated timmy-config automation path.",
        world_line,
        "Bannerlord remains an engineering substrate test. If it fails the thin-adapter test, reject it early instead of building falsework around it.",
    ]
    note_prompt = (
        "Write a short morning note from Timmy to Alexander. Keep it grounded, warm, and brief. "
        "Use the following real facts only: "
        f"heartbeat ticks={tick_count}; gitea downtime ticks={gitea_downtime_ticks}; inference fail ticks before recovery={inference_fail_ticks}; "
        f"current model={provider_model}; Nexus title={nexus_title}; Evennia title={evennia_title}; latest Evennia room/title={last_evennia.get('room_name', last_evennia.get('title', 'unknown'))}."
    )
    note_result = run_hermes_local(
        prompt=note_prompt,
        caller_tag="good_morning_report",
        disable_all_tools=True,
        skip_context_files=True,
        skip_memory=True,
        max_iterations=3,
    )
    personal_note = note_result.get("response") if note_result else None
    if not personal_note:
        personal_note = (
            "Good morning, Alexander. The stack held together through the night, and the local world lane is no longer theoretical. "
            "We have more proof than posture now."
        )
    markdown = f"""# Timmy Time — Good Morning Report
Date: {today}
Audience: Alexander Whitestone
Status: Generated by timmy-config automation
{today} · {day_name} · generated {now.strftime('%I:%M %p %Z')}
---
## Executive Summary
{executive_summary}
## Overnight / Local Pulse
- Heartbeat log for `{today_tick_slug}`: `{tick_count}` ticks recorded in `{tick_log}`
- Gitea downtime ticks: `{gitea_downtime_ticks}`
- Inference-failure ticks before recovery: `{inference_fail_ticks}`
- First green local-inference tick: `{first_green_tick}`
- Current model health file: `{health_file}`
- Current provider: `{provider}`
- Current model: `{provider_model}`
- Current base URL: `{provider_base_url}`
- Current inference status: `{model_status}`
- Huey consumer: `{huey_line}`
### Local surfaces right now
- Nexus port 4200: `{'open' if ports[4200] else 'closed'}` → title: `{nexus_title}`
- Evennia telnet 4000: `{'open' if ports[4000] else 'closed'}`
- Evennia web 4001: `{'open' if ports[4001] else 'closed'}` → title: `{evennia_title}`
- Evennia websocket 4002: `{'open' if ports[4002] else 'closed'}`
- Local bridge 8765: `{'open' if ports[8765] else 'closed'}`
### Evennia proof of life
- Trace path: `{evennia_trace}`
- Event count: `{len(evennia_events)}`
- Latest event type: `{last_evennia.get('type', 'unknown')}`
- Latest room/title: `{last_evennia.get('room_name', last_evennia.get('title', 'unknown'))}`
## Gitea Pulse
### Open issues
{chr(10).join(f'- {line}' for line in recent_issue_lines) if recent_issue_lines else '- quiet'}
### Open PRs
{chr(10).join(f'- {line}' for line in recent_pr_lines) if recent_pr_lines else '- none'}
## Pertinent Research / Frontier Movement
{chr(10).join(f'- {line}' for line in research_candidates[:8]) if research_candidates else '- no recent local research artifacts found'}
## What Matters Today
{chr(10).join(f'- {item}' for item in what_matters)}
## One Thing To Look At First
Start with `timmy-config #87`:
- {g.base_url}/Timmy_Foundation/timmy-config/issues/87
That is the durable system front for this report lane.
## Evidence Appendix
- `{health_file}`
- `{tick_log}`
- `{evennia_trace}`
- `http://127.0.0.1:4200`
- `http://127.0.0.1:4001/webclient/`
- `{newest_file(HERMES_HOME / 'cron' / 'output' / 'a77a87392582', '*.md') or 'no recent health monitor artifact found'}`
## From Timmy
{personal_note}
— Timmy
"""

    def gitea_link(line):
        """Turn "<label> (<url>)" into an HTML anchor with escaped text."""
        # Split on the LAST " (" so titles that themselves contain " (" keep their full text.
        label, _, url = line.rpartition(" (")
        href = html.escape(url.rstrip(")"))
        return f'<a href="{href}">{html.escape(label)}</a>'

    html_report = render_evening_html(
        title="Timmy Time — Good Morning Report",
        subtitle=f"{today} · {day_name} · generated {now.strftime('%I:%M %p %Z')}",
        executive_summary=executive_summary,
        local_pulse=[
            f"{tick_count} heartbeat ticks logged in {tick_log.name}",
            f"Gitea downtime ticks: {gitea_downtime_ticks}",
            f"Inference failure ticks before recovery: {inference_fail_ticks}",
            f"Current model: {provider_model}",
            f"Nexus title: {nexus_title}",
            f"Evennia title: {evennia_title}",
        ],
        gitea_lines=[gitea_link(line) for line in (recent_issue_lines[:5] + recent_pr_lines[:3])],
        research_lines=research_candidates[:6],
        what_matters=what_matters,
        look_first="Open timmy-config #87 first and read this report in the browser before diving into backlog gravity.",
    )
    BRIEFING_DIR.mkdir(parents=True, exist_ok=True)
    markdown_path = BRIEFING_DIR / f"{today}.md"
    html_path = BRIEFING_DIR / f"{today}.html"
    latest_md = BRIEFING_DIR / "latest.md"
    latest_html = BRIEFING_DIR / "latest.html"
    verification_path = BRIEFING_DIR / f"{today}-verification.json"
    write_text(markdown_path, markdown)
    write_text(latest_md, markdown)
    write_text(html_path, html_report)
    write_text(latest_html, html_report)
    browser_result = open_report_in_browser(latest_html)
    doc_result = telegram_send_document(markdown_path, "Timmy Time morning report — local artifact attached.")
    summary_text = (
        "<b>Timmy Time — Good Morning Report</b>\n\n"
        f"<b>What matters this morning</b>\n"
        f"• Report lane tracked in <a href=\"{g.base_url}/Timmy_Foundation/timmy-config/issues/87\">timmy-config #87</a>\n"
        f"{world_bullet}"
        f"• Bannerlord stays an engineering substrate test, not a builder trap\n\n"
        f"<b>Evidence</b>\n"
        f"• model health: <code>{health_file}</code>\n"
        f"• heartbeat: <code>{tick_log}</code>\n"
        f"• evennia trace: <code>{evennia_trace}</code>"
    )
    summary_result = telegram_send_message(summary_text)
    verification = {
        "markdown_path": str(markdown_path),
        "html_path": str(html_path),
        "latest_markdown": str(latest_md),
        "latest_html": str(latest_html),
        "browser_open": browser_result,
        "telegram_document": doc_result,
        "telegram_summary": summary_result,
        "ports": ports,
        "titles": {"nexus": nexus_title, "evennia": evennia_title},
    }
    write_json(verification_path, verification)
    return verification
# ── NEW 7: Repo Watchdog ─────────────────────────────────────────────
@huey.periodic_task(crontab(minute="*/20"))  # every 20 minutes
def repo_watchdog():
    """Poll Gitea for new issues/PRs since the last check. No webhooks needed.

    For each repo in REPOS, the 5 newest open issues and PRs are compared
    with per-repo high-water marks (``last_issue`` / ``last_pr``) stored in
    ``~/.hermes/watchdog_state.json``. Anything numbered above the mark is
    reported as new. Marks only ever move forward, so closing the newest
    item cannot cause older items to be reported again.

    Returns:
        ``{"new_items": <count>, "items": <first 10 items>,
        "errors": <per-repo fetch failures>}``.
    """
    state_file = HERMES_HOME / "watchdog_state.json"
    state = {}
    if state_file.exists():
        try:
            state = json.loads(state_file.read_text())
        except Exception:
            pass
    if not isinstance(state, dict):
        state = {}
    g = GiteaClient()
    new_items = []
    errors = []
    for repo in REPOS:
        repo_state = state.get(repo)
        repo_state = dict(repo_state) if isinstance(repo_state, dict) else {}
        # Older or partial state files may lack one of the marks.
        repo_state.setdefault("last_issue", 0)
        repo_state.setdefault("last_pr", 0)
        # Check issues
        try:
            last_issue = repo_state["last_issue"]
            issues = g.list_issues(repo, state="open", sort="created", direction="desc", limit=5)
            for issue in issues:
                if issue.number > last_issue:
                    user = getattr(issue, "user", None)
                    new_items.append({
                        "type": "issue",
                        "repo": repo,
                        "number": issue.number,
                        "title": issue.title,
                        "creator": user.login if user else "unknown",
                    })
            if issues:
                repo_state["last_issue"] = max(last_issue, max(i.number for i in issues))
        except Exception as exc:
            errors.append(f"{repo} issues: {exc}")
        # Check PRs
        try:
            last_pr = repo_state["last_pr"]
            prs = g.list_pulls(repo, state="open", sort="newest", limit=5)
            for pr in prs:
                if pr.number > last_pr:
                    new_items.append({
                        "type": "pr",
                        "repo": repo,
                        "number": pr.number,
                        "title": pr.title,
                    })
            if prs:
                repo_state["last_pr"] = max(last_pr, max(p.number for p in prs))
        except Exception as exc:
            errors.append(f"{repo} pulls: {exc}")
        state[repo] = repo_state
    state_file.write_text(json.dumps(state, indent=2))
    return {"new_items": len(new_items), "items": new_items[:10], "errors": errors}
# ── AGENT WORKERS: Gemini + Grok ─────────────────────────────────────
# Scratch clones for agent workers: one directory per agent+issue
# ("<agent>-<issue number>"), recreated from scratch for each run.
WORKTREE_BASE = Path.home() / "worktrees"
# Each worker appends timestamped lines to "<agent>-worker.log" here.
AGENT_LOG_DIR = HERMES_HOME / "logs"
# Worker settings keyed by Gitea agent login. _get_agent_issue and _run_agent
# look up their agent here. The keys double as the set of "known agents" when
# skipping issues that are shared with another configured agent.
#   tool             - CLI the worker drives (presumably invoked later in
#                      _run_agent; not visible here — TODO confirm)
#   model            - provider/model id for that tool (assumed)
#   api_key_env      - name of the environment variable that presumably
#                      holds the provider API key
#   gitea_token_file - file holding the agent's Gitea token; read for the API
#                      client and for the authenticated clone URL
#   timeout          - per-run limit, presumably in seconds — TODO confirm
AGENT_CONFIG = {
    "gemini": {
        "tool": "aider",
        "model": "gemini/gemini-2.5-pro-preview-05-06",
        "api_key_env": "GEMINI_API_KEY",
        "gitea_token_file": HERMES_HOME / "gemini_token",
        "timeout": 600,
    },
    "grok": {
        "tool": "opencode",
        "model": "xai/grok-3-fast",
        "api_key_env": "XAI_API_KEY",
        "gitea_token_file": HERMES_HOME / "grok_gitea_token",
        "timeout": 600,
    },
}
def _get_agent_issue(agent_name):
    """Return ``(repo, issue)`` for the next issue ``agent_name`` should work.

    Candidates come from ``GiteaClient.find_agent_issues`` for each repo in
    REPOS, in order. A candidate is skipped in two cases:

    * another configured agent is also assigned (avoids two workers
      colliding on one issue);
    * a comment already mentions this agent together with "working on".

    A repo whose Gitea calls raise is skipped. Returns ``(None, None)`` when
    the agent's token file is missing or no candidate qualifies.
    """
    token_path = AGENT_CONFIG[agent_name]["gitea_token_file"]
    if not token_path.exists():
        return None, None
    client = GiteaClient(token=token_path.read_text().strip())
    for repo in REPOS:
        try:
            for candidate in client.find_agent_issues(repo, agent_name, limit=10):
                if hasattr(candidate, 'assignees'):
                    logins = [person.login for person in (candidate.assignees or [])]
                else:
                    logins = []
                # Shared with another agent -> leave it alone.
                if any(login in AGENT_CONFIG and login != agent_name for login in logins):
                    continue
                already_claimed = False
                for note in client.list_comments(repo, candidate.number):
                    text = note.body
                    if text and "working on" in text.lower() and agent_name in text.lower():
                        already_claimed = True
                        break
                if already_claimed:
                    continue
                return repo, candidate
        except Exception:
            continue
    return None, None
def _run_agent(agent_name, repo, issue):
    """Clone, branch, run the agent's coding tool, commit, push and open a PR.

    Args:
        agent_name: Key into ``AGENT_CONFIG`` (``"gemini"`` or ``"grok"``).
        repo: ``"owner/name"`` slug of the Gitea repository.
        issue: Gitea issue object; its ``number``, ``title`` and ``body``
            are used.

    Returns:
        A dict whose ``status`` is one of:

        * ``clone_failed``, ``branch_failed``, ``commit_failed``,
          ``push_failed``, ``pr_failed``: failures, each with a truncated
          ``error`` string in which the Gitea token is redacted;
        * ``timeout``: the coding tool ran too long;
        * ``no_changes``: the tool left the worktree untouched;
        * ``pr_created``: success, with ``pr`` set to the PR number.

    The per-issue worktree is deleted on every exit path, including
    exceptions. Its ``.git/config`` holds the token-bearing clone URL, so it
    must not be left on disk.
    """
    import shutil

    cfg = AGENT_CONFIG[agent_name]
    token = cfg["gitea_token_file"].read_text().strip()
    branch = f"{agent_name}/issue-{issue.number}"
    workdir = WORKTREE_BASE / f"{agent_name}-{issue.number}"
    AGENT_LOG_DIR.mkdir(parents=True, exist_ok=True)
    WORKTREE_BASE.mkdir(parents=True, exist_ok=True)
    log_file = AGENT_LOG_DIR / f"{agent_name}-worker.log"

    def log(msg):
        with open(log_file, "a", encoding="utf-8") as f:
            f.write(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {msg}\n")

    def redact(text):
        # git can echo the credential-bearing remote URL in its output.
        text = text or ""
        return text.replace(token, "***") if token else text

    def run_git(*args, timeout):
        return subprocess.run(
            ["git", *args], cwd=str(workdir),
            capture_output=True, text=True, timeout=timeout,
        )

    log(f"=== Starting #{issue.number}: {issue.title} ===")
    # Comment that we're working on it. _get_agent_issue skips issues that
    # carry a "working on" comment naming this agent.
    g = GiteaClient(token=token)
    g.create_comment(repo, issue.number,
                     f"🔧 `{agent_name}` working on this via Huey. Branch: `{branch}`")
    clone_url = f"http://{agent_name}:{token}@143.198.27.163:3000/{repo}.git"
    try:
        # Clone
        if workdir.exists():
            shutil.rmtree(workdir, ignore_errors=True)
        try:
            result = subprocess.run(
                ["git", "clone", "--depth", "50", clone_url, str(workdir)],
                capture_output=True, text=True, timeout=120
            )
        except subprocess.TimeoutExpired:
            # Handled here because the exception's message embeds the command
            # line, i.e. the clone URL with the token.
            log("Clone failed: timed out")
            return {"status": "clone_failed", "error": "git clone timed out"}
        if result.returncode != 0:
            error = redact(result.stderr)
            log(f"Clone failed: {error}")
            return {"status": "clone_failed", "error": error[:200]}
        # Create branch
        result = run_git("checkout", "-b", branch, timeout=10)
        if result.returncode != 0:
            error = redact(result.stderr)
            log(f"Branch creation failed: {error}")
            return {"status": "branch_failed", "error": error[:200]}
        # Build prompt. The issue text is passed as a single argv element
        # (no shell), but it is still untrusted input to an autonomous tool.
        prompt = (
            f"Fix issue #{issue.number}: {issue.title}\n\n"
            f"{issue.body or 'No description.'}\n\n"
            f"Make minimal, focused changes. Only modify files directly related to this issue."
        )
        # Run agent tool
        env = os.environ.copy()
        if cfg["api_key_env"] == "XAI_API_KEY":
            env["XAI_API_KEY"] = (Path.home() / ".config/grok/api_key").read_text().strip()
        if cfg["tool"] == "aider":
            cmd = [
                "aider",
                "--model", cfg["model"],
                "--no-auto-commits",
                "--yes-always",
                "--no-suggest-shell-commands",
                "--message", prompt,
            ]
        else:  # opencode
            cmd = [
                "opencode", "run",
                "-m", cfg["model"],
                "--no-interactive",
                prompt,
            ]
        log(f"Running: {cfg['tool']} with {cfg['model']}")
        try:
            result = subprocess.run(
                cmd, cwd=str(workdir), capture_output=True, text=True,
                timeout=cfg["timeout"], env=env
            )
        except subprocess.TimeoutExpired:
            log("TIMEOUT")
            return {"status": "timeout"}
        log(f"Exit code: {result.returncode}")
        log(f"Stdout (last 500): {result.stdout[-500:]}")
        if result.stderr:
            log(f"Stderr (last 300): {result.stderr[-300:]}")
        # Check if anything changed. `git status --porcelain` also lists
        # untracked files; `git diff --stat` only covers tracked ones and would
        # report "no changes" for work that only adds new files.
        status = run_git("status", "--porcelain", timeout=10)
        if not status.stdout.strip():
            log("No changes produced")
            g.create_comment(repo, issue.number,
                             f"⚠️ `{agent_name}` produced no changes for this issue. Skipping.")
            return {"status": "no_changes"}
        # Commit, push, open PR
        run_git("add", "-A", timeout=10)
        result = run_git(
            "commit", "-m", f"[{agent_name}] {issue.title} (#{issue.number})",
            timeout=30,
        )
        if result.returncode != 0:
            # e.g. missing git identity; "nothing to commit" goes to stdout.
            error = redact(result.stderr or result.stdout)
            log(f"Commit failed: {error}")
            return {"status": "commit_failed", "error": error[:200]}
        push_result = run_git("push", "-u", "origin", branch, timeout=60)
        if push_result.returncode != 0:
            error = redact(push_result.stderr)
            log(f"Push failed: {error}")
            return {"status": "push_failed", "error": error[:200]}
        # Open PR
        try:
            pr = g.create_pull(
                repo,
                title=f"[{agent_name}] {issue.title} (#{issue.number})",
                head=branch,
                base="main",
                body=f"Closes #{issue.number}\n\nGenerated by `{agent_name}` via Huey worker.",
            )
        except Exception as e:
            error = redact(str(e))
            log(f"PR creation failed: {error}")
            return {"status": "pr_failed", "error": error[:200]}
        log(f"PR #{pr.number} created")
        return {"status": "pr_created", "pr": pr.number}
    finally:
        shutil.rmtree(workdir, ignore_errors=True)
@huey.periodic_task(crontab(minute="*/20"))
def gemini_worker():
    """Every 20 minutes, let Gemini take one assigned issue and work it.

    The issue is coded with aider (per AGENT_CONFIG) and turned into a PR
    by _run_agent. Returns that function's status dict, or an "idle"
    status when nothing is assigned to gemini.
    """
    agent = "gemini"
    repo, issue = _get_agent_issue(agent)
    if issue:
        return _run_agent(agent, repo, issue)
    return {"status": "idle", "reason": "no issues assigned to gemini"}
@huey.periodic_task(crontab(minute="*/20"))
def grok_worker():
    """Every 20 minutes, let Grok take one assigned issue and work it.

    The issue is coded with opencode (per AGENT_CONFIG) and turned into a
    PR by _run_agent. Returns that function's status dict, or an "idle"
    status when nothing is assigned to grok.
    """
    agent = "grok"
    repo, issue = _get_agent_issue(agent)
    if issue:
        return _run_agent(agent, repo, issue)
    return {"status": "idle", "reason": "no issues assigned to grok"}
# ── PR Cross-Review ──────────────────────────────────────────────────
@huey.periodic_task(crontab(minute="*/30"))
def cross_review_prs():
    """Gemini reviews Grok's PRs. Grok reviews Gemini's PRs.

    For each open PR whose title starts with ``[<author>]`` (the prefix
    _run_agent puts on its PR titles) and that has no earlier review comment
    from the reviewer, the reviewer's CLI tool is asked for an APPROVE /
    REQUEST_CHANGES verdict. The tool is given the PR's file list and net
    line count, not the patch itself. Its output is posted as a PR comment.

    Returns:
        ``{"reviews": <posted count>, "details": [...], "failed": [...]}``.
        ``failed`` lists PRs whose review run errored or produced no output.
        Those get no comment, so they are retried on the next run instead of
        being marked as reviewed.
    """
    import tempfile

    results = []
    failures = []
    for reviewer, author in [("gemini", "grok"), ("grok", "gemini")]:
        cfg = AGENT_CONFIG[reviewer]
        token_file = cfg["gitea_token_file"]
        if not token_file.exists():
            continue
        # The posted header lower-cases to "**reviewed by `<reviewer>`:**".
        # The un-backticked form is also accepted for older comments.
        markers = (f"reviewed by `{reviewer}`", f"reviewed by {reviewer}")
        # Build the tool environment once per reviewer, not once per PR.
        env = os.environ.copy()
        if cfg["api_key_env"] == "XAI_API_KEY":
            try:
                env["XAI_API_KEY"] = (Path.home() / ".config/grok/api_key").read_text().strip()
            except OSError:
                continue  # no API key -> this reviewer cannot run at all
        g = GiteaClient(token=token_file.read_text().strip())
        for repo in REPOS:
            try:
                prs = g.list_pulls(repo, state="open", limit=10)
            except Exception:
                continue
            for pr in prs:
                # Only review the other agent's PRs
                if not pr.title.startswith(f"[{author}]"):
                    continue
                # Per-PR best effort: one failing PR must not skip the rest.
                try:
                    # Skip if already reviewed
                    comments = g.list_comments(repo, pr.number)
                    if any(c.body and any(m in c.body.lower() for m in markers) for c in comments):
                        continue
                    # File-level stats only (names and line counts), no patch.
                    files = g.get_pull_files(repo, pr.number)
                except Exception:
                    continue
                net = sum(pf.additions - pf.deletions for pf in files)
                file_list = ", ".join(pf.filename for pf in files[:5])
                if len(files) > 5:
                    file_list += f" (+{len(files) - 5} more)"
                # Build review prompt. `:+d` renders "+3" / "-3" / "+0";
                # a literal "+" before the value would give "+-3".
                review_prompt = (
                    f"Review PR #{pr.number}: {pr.title}\n"
                    f"Files: {file_list}\n"
                    f"Net change: {net:+d} lines\n\n"
                    f"Is this PR focused, correct, and ready to merge? "
                    f"Reply with APPROVE or REQUEST_CHANGES and a brief reason."
                )
                if cfg["tool"] == "aider":
                    cmd = ["aider", "--model", cfg["model"],
                           "--no-auto-commits", "--yes-always",
                           "--no-suggest-shell-commands",
                           "--message", review_prompt]
                else:
                    cmd = ["opencode", "run", "-m", cfg["model"],
                           "--no-interactive", review_prompt]
                try:
                    # Run in a throwaway directory rather than the shared /tmp.
                    # The tool (aider with --yes-always in particular) may write
                    # history or other files into its working directory.
                    with tempfile.TemporaryDirectory(prefix=f"review-{reviewer}-") as scratch:
                        result = subprocess.run(
                            cmd, capture_output=True, text=True,
                            timeout=120, env=env, cwd=scratch
                        )
                except (OSError, subprocess.SubprocessError) as e:
                    failures.append({"reviewer": reviewer, "pr": pr.number,
                                     "repo": repo, "error": str(e)[:200]})
                    continue
                review_text = (result.stdout or "")[-1000:]
                if not review_text.strip():
                    failures.append({"reviewer": reviewer, "pr": pr.number,
                                     "repo": repo, "error": "no output"})
                    continue
                # Post review as comment
                try:
                    g.create_comment(repo, pr.number,
                                     f"**Reviewed by `{reviewer}`:**\n\n{review_text}")
                except Exception as e:
                    failures.append({"reviewer": reviewer, "pr": pr.number,
                                     "repo": repo, "error": str(e)[:200]})
                    continue
                results.append({"reviewer": reviewer, "pr": pr.number, "repo": repo})
    return {"reviews": len(results), "details": results, "failed": failures}